Change lfta code generation form C to C++
[com/gs-lite.git] / src / lib / gscprts / rts_gdat.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include "errno.h"
26
27 #include "gsconfig.h"
28 #include "gshub.h"
29 #include "gstypes.h"
30 #include "lapp.h"
31 #include "fta.h"
32 #include "stdio.h"
33 #include "stdlib.h"
34 #include "packet.h"
35 #include "schemaparser.h"
36 #include "lfta/rts.h"
37
38
39 void rts_fta_process_packet(struct packet * p);
40 void rts_fta_done();
41 void fta_init(gs_sp_t device);
42
43 #define CSVMAXLINE 1000000
44
45 static FILE *pd;
46 static struct packet cur_packet;
47 static gs_sp_t name;
48 static gs_uint32_t verbose=0;
49 static gs_uint32_t startupdelay=0;
50 static gs_uint32_t gshub=0;
51 static int socket_desc=0;
52 static gs_uint32_t singlefile=0;
53
54 static void gdat_replay_check_messages() {
55     if (fta_start_service(0)<0) {
56         print_error("Error:in processing the msg queue for a replay file");
57         exit(9);
58     }
59 }
60
61 static gs_retval_t gs_read(gs_sp_t buffer, gs_uint32_t length){
62         gs_uint32_t used=0;
63         gs_uint32_t cur;
64     fd_set socket_rset;
65     fd_set socket_eset;
66     struct timeval socket_timeout;
67     int retval;
68     
69     FD_ZERO(&socket_rset);
70     FD_SET(socket_desc,&socket_rset);
71     FD_ZERO(&socket_eset);
72     FD_SET(socket_desc,&socket_eset);
73     // timeout in one millisecon
74     socket_timeout.tv_sec=0;
75     socket_timeout.tv_usec=1000;
76     
77     if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
78         if (retval==0) {
79             // caught a timeout
80             return -1;
81         }
82         print_error("ERROR:select error in reading data from socket");
83         exit(0);
84     }
85     
86         while(used < length) {
87                 if ((cur=read(socket_desc,&(buffer[used]),length-used))<=0) {
88             if (errno==115) return -2; // error code we get if the server closes the connection on us
89                         print_error("ERROR:could not read data from gdat stream");
90                         exit(0);
91                 }
92                 used+=cur;
93         }
94         return 1;
95 }
96
97 static gs_uint32_t gs_read_line(gs_sp_t buffer, gs_uint32_t length){
98     gs_uint32_t used=0;
99     gs_uint32_t cur;
100     
101     while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
102         if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
103             print_error("ERROR:could not read data from gdat stream");
104             exit(0);
105         }
106         used+=cur;
107     }
108         buffer[used]=0;
109         return 1;
110 }
111
112
113 static void init_socket() {
114         endpoint gshub;
115         endpoint srcinfo;
116         struct sockaddr_in server;
117     gs_int32_t parserversion;
118     gs_uint32_t schemalen;
119     static char * asciischema=0;
120         gs_int8_t buf[1024];
121         
122         if (get_hub(&gshub)!=0) {
123                 print_error("ERROR:could not find gshub for data source");
124                 exit(0);
125         }
126     
127         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
128                 print_error("ERROR:could not find data source for stream\n");
129                 exit(0);
130         }
131     
132         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
133     if (socket_desc == -1)
134     {
135         print_error("ERROR:could not create socket for data stream");
136                 exit(0);
137     }
138         server.sin_addr.s_addr = srcinfo.ip;
139     server.sin_family = AF_INET;
140     server.sin_port = srcinfo.port;
141     
142         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
143     {
144                 print_error("ERROR: could not open connection to data source");
145                 exit(0);
146         }
147     
148     
149         gs_read_line(buf,1024);
150         if (strncmp(buf,"GDAT",4)!=0) {
151                 print_error("ERROR: not a GDAT stream\n");
152         exit(0);
153     }
154         gs_read_line(buf,1024);
155         if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
156         print_error("ERROR: no GDAT VERSION given\n");
157         exit(0);
158     }
159         gs_read_line(buf,1024);
160     if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
161         print_error("ERROR: no GDAT SCHEMALENGTH given\n");
162         exit(0);
163     }
164     if (schemaparser_accepts_version(parserversion)!=1) {
165         print_error("gdatinput::wrong gdat version\n");
166         exit(0);
167     }
168     if ((asciischema=malloc(schemalen))==0) {
169         print_error("gdatinput::could not allocate memory for schema\n");
170         exit(0);
171     }
172     if (gs_read(asciischema,schemalen)!=1) {
173         print_error("gdatinput::could not read schema from file\n");
174         exit(0);
175     }
176     if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
177         print_error("gdatinput::could not parse schema\n");
178         exit(0);
179     }
180     cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
181 }
182
183
184 static void next_file() {
185     struct stat s;
186     gs_int32_t parserversion;
187     gs_uint32_t schemalen;
188         static char * asciischema=0;
189     
190     if (verbose) {
191         fprintf(stderr,"Opening %s\n",name);
192     }
193     while (lstat(name,&s)!=0) {
194         if (errno!=ENOENT) {
195             print_error("gdat::lstat unexpected return value");
196             exit(10);
197         }
198         gdat_replay_check_messages();
199         usleep(10000);
200     }
201     
202     
203     if (pd!=0) {
204         fclose(pd);
205         if (asciischema!=0) free(asciischema);
206         if (cur_packet.record.gdat.schema>=0) {
207             ftaschema_free(cur_packet.record.gdat.schema);
208         }
209     }
210     
211     if ((pd=fopen(name,"r"))==0) return;
212         if (singlefile==0) {
213                 unlink(name);
214         }
215     
216         if (fscanf(pd,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",&parserversion,&schemalen)!=2) {
217                 if (verbose) fprintf(stderr,"gdatinput::could not parse GDAT header\n");
218                 fclose(pd);
219                 pd=0;
220                 return;
221     }
222     if (schemaparser_accepts_version(parserversion)!=1) {
223         if (verbose) fprintf(stderr,"gdatinput::wrong gdat version\n");
224         fclose(pd);
225         pd=0;
226                 return;
227         }
228         if ((asciischema=malloc(schemalen))==0) {
229         if (verbose) fprintf(stderr,"gdatinput::could not allocate memory for schema\n");
230                 fclose(pd);
231                 pd=0;
232         return;
233     }
234         if (fread(asciischema,schemalen,1,pd)!=1) {
235         if (verbose) fprintf(stderr,"gdatinput::could not read schema from file\n");
236                 fclose(pd);
237                 pd=0;
238         return;
239     }
240         if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
241         if (verbose) fprintf(stderr,"gdatinput::could not parse schema\n");
242                 fclose(pd);
243                 pd=0;
244                 return;
245         }
246         cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
247 }
248
249 static gs_retval_t gdat_replay_init(gs_sp_t device)
250 {
251     gs_csp_t  verbosetmp;
252     gs_csp_t  delaytmp;
253     gs_csp_t  gshubtmp;
254     gs_csp_t  singlefiletmp;
255
256         gs_csp_t tmp_name;
257
258     if ((tmp_name=get_iface_properties(device,"filename"))==0) {
259                 print_error("csv_init::No GDAT \"Filename\" defined");
260                 exit(0);
261         }
262         name = strdup(tmp_name);
263     
264     if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
265         if (strncmp(verbosetmp,"TRUE",4)==0) {
266             verbose=1;
267             fprintf(stderr,"VERBOSE ENABLED\n");
268         } else {
269             fprintf(stderr,"VERBOSE DISABLED\n");
270         }
271     }
272     if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
273         if (strncmp(singlefiletmp,"TRUE",4)==0) {
274             singlefile=1;
275             if (verbose)
276                 fprintf(stderr,"SINGLEFILE ENABLED\n");
277         } else {
278             if (verbose)
279                 fprintf(stderr,"SINGLEFILE DISABLED\n");
280         }
281     }
282
283     if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
284         if (verbose) {
285             fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
286         }
287         startupdelay=atoi(get_iface_properties(device,"startupdelay"));
288     }
289     if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
290         if (verbose) {
291             fprintf(stderr,"GDAT format using gshub\n");
292         }
293         gshub=1;
294     }
295     
296     cur_packet.ptype=PTYPE_GDAT;
297     return 0;
298 }
299
300 static gs_retval_t gdat_read_socket(){
301     gs_uint32_t nsz,sz;
302     gs_retval_t retval;
303     if ((retval=gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)))<0) {
304         return retval;
305     }
306     sz=ntohl(nsz);
307     if (sz>MAXTUPLESZ) {
308         if (verbose) {
309             fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
310                     (sz));
311         }
312         print_error("Error::Illegal tuple received");
313         exit(0);
314     }
315     
316     cur_packet.record.gdat.datasz=sz;
317     
318     if (gs_read((gs_sp_t)cur_packet.record.gdat.data,(sz))!=1) {
319         if (verbose){
320             fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
321                     (sz));
322         }
323         print_error("Error::Illegal tuple received");
324         exit(0);
325     }
326     cur_packet.systemTime=time(0);
327     return 0;
328 }
329
330 static gs_retval_t  gdat_read_tuple(){
331     gs_uint32_t nsz,sz;
332
333     if (pd==0) next_file();
334 again:
335     while((pd==0) || (fread(&nsz,sizeof(gs_uint32_t),1,pd)!=1)) {
336         if (singlefile==1) {
337             if(verbose) {
338                 fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
339             }
340             rts_fta_done();
341             if (verbose) {
342                 fprintf(stderr,"RTS SAYS BY\n");
343             }
344             while(1==1) sleep(1);
345         } else {
346             next_file(); 
347         }
348     }
349     sz=ntohl(nsz);
350     if (sz>MAXTUPLESZ) {
351         if (verbose) {
352                 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
353                     (sz));
354         }
355         fclose(pd);
356         pd=0;
357         goto again;
358     }
359     
360     cur_packet.record.gdat.datasz=sz;
361         
362     if (fread(cur_packet.record.gdat.data,(sz),1,pd)!=1) {
363         if (verbose){
364             fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
365                     (sz));
366         }
367         fclose(pd);
368         pd=0;
369         goto again;
370     }
371     cur_packet.systemTime=time(0);
372     return 0;
373 }
374
375 static gs_retval_t gdat_process_file()
376 {
377         unsigned cnt=0;
378         static unsigned totalcnt=0;
379         for(cnt=0;cnt<50000;cnt++) {
380                 if (gshub!=0) {
381             gs_retval_t retval;
382             retval=gdat_read_socket();
383                         if (retval==-1) return 0; // got a timeout so service message queue
384             if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
385                 // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
386                 if (verbose)
387                     fprintf(stderr,"Done processing waiting for things to shut down\n");
388                 rts_fta_done();
389                 // now just service message queue until we get killed or loose connectivity
390                 while (0==0) {
391                     fta_start_service(0); // service all waiting messages
392                     usleep(1000); // sleep a millisecond
393                 }
394             }
395                 } else {
396                         gdat_read_tuple();
397                 }
398                 rts_fta_process_packet(&cur_packet);
399         }
400         totalcnt=totalcnt+cnt;
401         if (verbose) {
402                 fprintf(stderr,"Processesd %u tuple\n",totalcnt);
403         }
404         return 0;
405 }
406
407 gs_retval_t main_gdat(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
408     gs_uint32_t cont;
409     endpoint mygshub;
410     
411     gdat_replay_init(device);
412     
413     /* initalize host_lib */
414     if (verbose) {
415         fprintf(stderr,"Init LFTAs for %s\n",device);
416     }
417     
418     if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
419         fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
420                 device);
421         exit(7);
422     }
423     
424     fta_init(device);
425     
426     cont=startupdelay+time(0);
427     
428     if (verbose) { fprintf(stderr,"Start startup delay"); }
429     
430     while (cont>time(NULL)) {
431         if (fta_start_service(0)<0) {
432             fprintf(stderr,"%s::error:in processing the msg queue\n",
433                     device);
434             exit(9);
435         }
436         usleep(1); /* sleep for one millisecond */
437     }
438     
439     if (verbose) { fprintf(stderr,"... Done\n"); }
440     
441     // open the connection to the data source
442     
443     if (gshub!=0) { init_socket();}
444     
445     // wait to process till we get the signal from GSHUB
446         if (get_hub(&mygshub)!=0) {
447                 print_error("ERROR:could not find gshub for data source");
448                 exit(0);
449         }
450     while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
451         usleep(100);
452         if (fta_start_service(0)<0) {
453             fprintf(stderr,"%s::error:in processing the msg queue\n",
454                     device);
455             exit(9);
456         }
457     }
458
459     /* now we enter an endless loop to process data */
460     if (verbose) {
461         fprintf(stderr,"Start processing %s\n",device);
462     }
463     
464     while (1==1) {
465         /* proess packets data stream*/
466         if (gdat_process_file()<0) {
467             fprintf(stderr,"%s::error:in processing packets\n",
468                     device);
469             exit(8);
470         }
471         /* process all messages on the message queue*/
472         if (fta_start_service(0)<0) {
473             fprintf(stderr,"%s::error:in processing the msg queue\n",
474                     device);
475             exit(9);
476         }
477     }
478     return 0;
479 }
480
481
482