Added protobuf support
[com/gs-lite.git] / src / lib / gscprts / rts_gdat.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include "errno.h"
26
27 #include "gsconfig.h"
28 #include "gshub.h"
29 #include "gstypes.h"
30 #include "lapp.h"
31 #include "fta.h"
32 #include "stdio.h"
33 #include "stdlib.h"
34 #include "packet.h"
35 #include "schemaparser.h"
36 #include "lfta/rts.h"
37
38
39 void rts_fta_process_packet(struct packet * p);
40 void rts_fta_done();
41 void fta_init(gs_sp_t device);
42
43 #define CSVMAXLINE 1000000
44
45 static FILE *pd;
46 static struct packet cur_packet;
47 static gs_sp_t name;
48 static gs_uint32_t verbose=0;
49 static gs_uint32_t startupdelay=0;
50 static gs_uint32_t gshub=0;
51 static int socket_desc=0;
52 static gs_uint32_t singlefile=0;
53
54 static void gdat_replay_check_messages() {
55     if (fta_start_service(0)<0) {
56         print_error("Error:in processing the msg queue for a replay file");
57         exit(9);
58     }
59 }
60
61 static gs_retval_t gs_read(gs_sp_t buffer, gs_uint32_t length){
62         gs_uint32_t used=0;
63         gs_uint32_t cur;
64     fd_set socket_rset;
65     fd_set socket_eset;
66     struct timeval socket_timeout;
67     int retval;
68     
69     FD_ZERO(&socket_rset);
70     FD_SET(socket_desc,&socket_rset);
71     FD_ZERO(&socket_eset);
72     FD_SET(socket_desc,&socket_eset);
73     // timeout in one millisecon
74     socket_timeout.tv_sec=0;
75     socket_timeout.tv_usec=1000;
76     
77     if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
78         if (retval==0) {
79             // caught a timeout
80             return -1;
81         }
82         print_error("ERROR:select error in reading data from socket");
83         exit(0);
84     }
85     
86         while(used < length) {
87                 if ((cur=read(socket_desc,&(buffer[used]),length-used))<=0) {
88             if (errno==115) return -2; // error code we get if the server closes the connection on us
89                         print_error("ERROR:could not read data from gdat stream");
90                         exit(0);
91                 }
92                 used+=cur;
93         }
94         return 1;
95 }
96
97 static gs_uint32_t gs_read_line(gs_sp_t buffer, gs_uint32_t length){
98     gs_uint32_t used=0;
99     gs_uint32_t cur;
100     
101     while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
102         if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
103             print_error("ERROR:could not read data from gdat stream");
104             exit(0);
105         }
106         used+=cur;
107     }
108         buffer[used]=0;
109         return 1;
110 }
111
112
113 static void init_socket() {
114         endpoint gshub;
115         endpoint srcinfo;
116         struct sockaddr_in server;
117     gs_int32_t parserversion;
118     gs_uint32_t schemalen;
119     static char * asciischema=0;
120         gs_int8_t buf[1024];
121         
122         if (get_hub(&gshub)!=0) {
123                 print_error("ERROR:could not find gshub for data source");
124                 exit(0);
125         }
126     
127         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
128                 print_error("ERROR:could not find data source for stream\n");
129                 exit(0);
130         }
131     
132         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
133     if (socket_desc == -1)
134     {
135         print_error("ERROR:could not create socket for data stream");
136                 exit(0);
137     }
138         server.sin_addr.s_addr = srcinfo.ip;
139     server.sin_family = AF_INET;
140     server.sin_port = srcinfo.port;
141     
142         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
143     {
144                 print_error("ERROR: could not open connection to data source");
145                 exit(0);
146         }
147     
148     
149         gs_read_line(buf,1024);
150         if (strncmp(buf,"GDAT",4)!=0) {
151                 print_error("ERROR: not a GDAT stream\n");
152         exit(0);
153     }
154         gs_read_line(buf,1024);
155         if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
156         print_error("ERROR: no GDAT VERSION given\n");
157         exit(0);
158     }
159         gs_read_line(buf,1024);
160     if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
161         print_error("ERROR: no GDAT SCHEMALENGTH given\n");
162         exit(0);
163     }
164     if (schemaparser_accepts_version(parserversion)!=1) {
165         print_error("gdatinput::wrong gdat version\n");
166         exit(0);
167     }
168     if ((asciischema=malloc(schemalen))==0) {
169         print_error("gdatinput::could not allocate memory for schema\n");
170         exit(0);
171     }
172     if (gs_read(asciischema,schemalen)!=1) {
173         print_error("gdatinput::could not read schema from file\n");
174         exit(0);
175     }
176     if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
177         print_error("gdatinput::could not parse schema\n");
178         exit(0);
179     }
180     cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
181 }
182
183
184 static void next_file() {
185     struct stat s;
186     gs_int32_t parserversion;
187     gs_uint32_t schemalen;
188         static char * asciischema=0;
189     
190     if (verbose) {
191         fprintf(stderr,"Opening %s\n",name);
192     }
193     while (lstat(name,&s)!=0) {
194         if (errno!=ENOENT) {
195             print_error("gdat::lstat unexpected return value");
196             exit(10);
197         }
198         gdat_replay_check_messages();
199         usleep(10000);
200     }
201     
202     
203     if (pd!=0) {
204         fclose(pd);
205         if (asciischema!=0) free(asciischema);
206         if (cur_packet.record.gdat.schema>=0) {
207             ftaschema_free(cur_packet.record.gdat.schema);
208         }
209     }
210     
211     if ((pd=fopen(name,"r"))==0) return;
212         if (singlefile==0) {
213                 unlink(name);
214         }
215     
216         if (fscanf(pd,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",&parserversion,&schemalen)!=2) {
217                 if (verbose) fprintf(stderr,"gdatinput::could not parse GDAT header\n");
218                 fclose(pd);
219                 pd=0;
220                 return;
221     }
222     if (schemaparser_accepts_version(parserversion)!=1) {
223         if (verbose) fprintf(stderr,"gdatinput::wrong gdat version\n");
224         fclose(pd);
225         pd=0;
226                 return;
227         }
228         if ((asciischema=malloc(schemalen))==0) {
229         if (verbose) fprintf(stderr,"gdatinput::could not allocate memory for schema\n");
230                 fclose(pd);
231                 pd=0;
232         return;
233     }
234         if (fread(asciischema,schemalen,1,pd)!=1) {
235         if (verbose) fprintf(stderr,"gdatinput::could not read schema from file\n");
236                 fclose(pd);
237                 pd=0;
238         return;
239     }
240         if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
241         if (verbose) fprintf(stderr,"gdatinput::could not parse schema\n");
242                 fclose(pd);
243                 pd=0;
244                 return;
245         }
246         cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
247 }
248
249 static gs_retval_t gdat_replay_init(gs_sp_t device)
250 {
251     gs_sp_t  verbosetmp;
252     gs_sp_t  delaytmp;
253     gs_sp_t  gshubtmp;
254     gs_sp_t  singlefiletmp;
255
256     if ((name=get_iface_properties(device,"filename"))==0) {
257                 print_error("csv_init::No GDAT \"Filename\" defined");
258                 exit(0);
259         }
260     
261     if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
262         if (strncmp(verbosetmp,"TRUE",4)==0) {
263             verbose=1;
264             fprintf(stderr,"VERBOSE ENABLED\n");
265         } else {
266             fprintf(stderr,"VERBOSE DISABLED\n");
267         }
268     }
269     if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
270         if (strncmp(singlefiletmp,"TRUE",4)==0) {
271             singlefile=1;
272             if (verbose)
273                 fprintf(stderr,"SINGLEFILE ENABLED\n");
274         } else {
275             if (verbose)
276                 fprintf(stderr,"SINGLEFILE DISABLED\n");
277         }
278     }
279
280     if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
281         if (verbose) {
282             fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
283         }
284         startupdelay=atoi(get_iface_properties(device,"startupdelay"));
285     }
286     if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
287         if (verbose) {
288             fprintf(stderr,"GDAT format using gshub\n");
289         }
290         gshub=1;
291     }
292     
293     cur_packet.ptype=PTYPE_GDAT;
294     return 0;
295 }
296
297 static gs_retval_t gdat_read_socket(){
298     gs_uint32_t nsz,sz;
299     gs_retval_t retval;
300     if ((retval=gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)))<0) {
301         return retval;
302     }
303     sz=ntohl(nsz);
304     if (sz>MAXTUPLESZ) {
305         if (verbose) {
306             fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
307                     (sz));
308         }
309         print_error("Error::Illegal tuple received");
310         exit(0);
311     }
312     
313     cur_packet.record.gdat.datasz=sz;
314     
315     if (gs_read((gs_sp_t)cur_packet.record.gdat.data,(sz))!=1) {
316         if (verbose){
317             fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
318                     (sz));
319         }
320         print_error("Error::Illegal tuple received");
321         exit(0);
322     }
323     cur_packet.systemTime=time(0);
324     return 0;
325 }
326
327 static gs_retval_t  gdat_read_tuple(){
328     gs_uint32_t nsz,sz;
329
330     if (pd==0) next_file();
331 again:
332     while((pd==0) || (fread(&nsz,sizeof(gs_uint32_t),1,pd)!=1)) {
333         if (singlefile==1) {
334             if(verbose) {
335                 fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
336             }
337             rts_fta_done();
338             if (verbose) {
339                 fprintf(stderr,"RTS SAYS BY\n");
340             }
341             while(1==1) sleep(1);
342         } else {
343             next_file(); 
344         }
345     }
346     sz=ntohl(nsz);
347     if (sz>MAXTUPLESZ) {
348         if (verbose) {
349                 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
350                     (sz));
351         }
352         fclose(pd);
353         pd=0;
354         goto again;
355     }
356     
357     cur_packet.record.gdat.datasz=sz;
358         
359     if (fread(cur_packet.record.gdat.data,(sz),1,pd)!=1) {
360         if (verbose){
361             fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
362                     (sz));
363         }
364         fclose(pd);
365         pd=0;
366         goto again;
367     }
368     cur_packet.systemTime=time(0);
369     return 0;
370 }
371
372 static gs_retval_t gdat_process_file()
373 {
374         unsigned cnt=0;
375         static unsigned totalcnt=0;
376         for(cnt=0;cnt<50000;cnt++) {
377                 if (gshub!=0) {
378             gs_retval_t retval;
379             retval=gdat_read_socket();
380                         if (retval==-1) return 0; // got a timeout so service message queue
381             if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
382                 // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
383                 if (verbose)
384                     fprintf(stderr,"Done processing waiting for things to shut down\n");
385                 rts_fta_done();
386                 // now just service message queue until we get killed or loose connectivity
387                 while (0==0) {
388                     fta_start_service(0); // service all waiting messages
389                     usleep(1000); // sleep a millisecond
390                 }
391             }
392                 } else {
393                         gdat_read_tuple();
394                 }
395                 rts_fta_process_packet(&cur_packet);
396         }
397         totalcnt=totalcnt+cnt;
398         if (verbose) {
399                 fprintf(stderr,"Processesd %u tuple\n",totalcnt);
400         }
401         return 0;
402 }
403
404 gs_retval_t main_gdat(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
405     gs_uint32_t cont;
406     endpoint mygshub;
407     
408     gdat_replay_init(device);
409     
410     /* initalize host_lib */
411     if (verbose) {
412         fprintf(stderr,"Init LFTAs for %s\n",device);
413     }
414     
415     if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
416         fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
417                 device);
418         exit(7);
419     }
420     
421     fta_init(device);
422     
423     cont=startupdelay+time(0);
424     
425     if (verbose) { fprintf(stderr,"Start startup delay"); }
426     
427     while (cont>time(NULL)) {
428         if (fta_start_service(0)<0) {
429             fprintf(stderr,"%s::error:in processing the msg queue\n",
430                     device);
431             exit(9);
432         }
433         usleep(1); /* sleep for one millisecond */
434     }
435     
436     if (verbose) { fprintf(stderr,"... Done\n"); }
437     
438     // open the connection to the data source
439     
440     if (gshub!=0) { init_socket();}
441     
442     // wait to process till we get the signal from GSHUB
443         if (get_hub(&mygshub)!=0) {
444                 print_error("ERROR:could not find gshub for data source");
445                 exit(0);
446         }
447     while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
448         usleep(100);
449         if (fta_start_service(0)<0) {
450             fprintf(stderr,"%s::error:in processing the msg queue\n",
451                     device);
452             exit(9);
453         }
454     }
455
456     /* now we enter an endless loop to process data */
457     if (verbose) {
458         fprintf(stderr,"Start processing %s\n",device);
459     }
460     
461     while (1==1) {
462         /* proess packets data stream*/
463         if (gdat_process_file()<0) {
464             fprintf(stderr,"%s::error:in processing packets\n",
465                     device);
466             exit(8);
467         }
468         /* process all messages on the message queue*/
469         if (fta_start_service(0)<0) {
470             fprintf(stderr,"%s::error:in processing the msg queue\n",
471                     device);
472             exit(9);
473         }
474     }
475     return 0;
476 }
477
478
479