Refactor csv input processing. Add support for kafka interfaces. Fix bug in join...
[com/gs-lite.git] / src / lib / gscprts / rts_csv.cc
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <zlib.h>
26 #include "errno.h"
27 #include "stdio.h"
28 #include "stdlib.h"
29
30
31 extern "C" {
32 #include "gsconfig.h"
33 #include "gshub.h"
34 #include "gstypes.h"
35 #include "lapp.h"
36 #include "fta.h"
37 #include "packet.h"
38 #include "schemaparser.h"
39 #include "lfta/rts.h"
40
41
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
44 void rts_fta_done();
45 }
46
/* ---- sizing constants -------------------------------------------------- */

// number of bytes requested per read()/inflate() step
#define CHUNK 262144

// extra room appended to each I/O buffer; the unconsumed tail of a chunk is
// kept at the front of the buffer and new data is appended behind it
#define CSVMAXLINE 1000000

// timeout value for getting next file (in microseconds)
#define FILEWAIT_TIMEOUT 10000

/* ---- buffers ----------------------------------------------------------- */

// raw bytes as read from the source when gzip decoding is enabled
static gs_uint8_t in[CHUNK + CSVMAXLINE];
// line data handed to consume_chunk() (inflated output, or the raw read)
static gs_uint8_t out[CHUNK + CSVMAXLINE];

/* ---- externally visible state ------------------------------------------ */

// wall-clock time at which the processing loop started (set in main_csv)
time_t st_time;

// field limit passed to csv_set_maxfield() during start-up
gs_uint32_t max_field_csv = CSVELEMENTS;

// zlib inflate state shared by init_inflate() and csv_process_chunk()
z_stream strm;

#ifdef BSA_ENABLED
#include "bsa_stream.hpp"
#include "bsa_util.hpp"
// BSA sub-stream, the handle of the file currently being read, and its reader
BSA::FileStream::ISubStream *stream;
BSA::FileStream::IFileHandle *ifh;
BSA::FileStream::Reader *reader;
#endif
68
69 gs_sp_t dev;
70
71 static int listensockfd=-1;
72 static int fd=-1;
73 static struct packet cur_packet;
74 static gs_sp_t name;
75 static gs_sp_t line;
76 static ssize_t len;
77 static size_t line_len;
78 static gs_uint32_t lineend=0;
79 static gs_uint8_t csvdel = ',';
80 static gs_uint32_t verbose=0;
81 static gs_uint32_t startupdelay=0;
82 static gs_uint32_t singlefile=0;
83 static gs_uint32_t use_gzip=0;
84 static gs_uint32_t use_bsa=0;
85 static gs_uint32_t gshub=0;
86 static int socket_desc=0;
87
88 #include "lfta/csv_parser.h"
89
90 // leftover bytes not consumed at the end of the data chunk
91  gs_uint32_t leftover = 0;
92
/*
 * Returns the current CLOCK_MONOTONIC reading in milliseconds,
 * or 0 if clock_gettime() fails.
 */
uint64_t get_posix_clock_time ()
{
    struct timespec ts;

    if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
        return 0;

    // Widen each term before scaling: multiplying tv_sec by 1000 in time_t
    // can overflow on platforms where time_t is 32 bits wide.
    return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u;
}
102
103
104 static void init_inflate() {
105         gs_int32_t ret;
106
107     /* allocate inflate state */
108     strm.zalloc = Z_NULL;
109     strm.zfree = Z_NULL;
110     strm.opaque = Z_NULL;
111     strm.avail_in = 0;
112     strm.next_in = Z_NULL;
113     ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* use gzip */);
114     if (ret != Z_OK) {
115                 print_error((gs_sp_t)"csv::inflateInit2");
116                 exit(10);
117     }
118 }
119
120 static void csv_replay_check_messages() {
121   if (fta_start_service(0)<0) {
122                 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
123                 exit(9);
124   }
125 }
126
127 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
128         gs_uint32_t r;
129         fd_set socket_rset;
130         fd_set socket_eset;
131         struct timeval socket_timeout;
132         gs_int32_t retval;
133
134         FD_ZERO(&socket_rset);
135         FD_SET(socket_desc,&socket_rset);
136         FD_ZERO(&socket_eset);
137         FD_SET(socket_desc,&socket_eset);
138         // timeout in one millisecond
139         socket_timeout.tv_sec = 0;
140         socket_timeout.tv_usec = 1000;
141
142         if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
143                 if (retval==0) {
144                         // caught a timeout
145                         return -1;
146                 }
147                 return -2;
148         }
149
150         if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
151                 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
152                 return -2;
153         }
154
155         return r;
156 }
157
158 static void init_socket() {
159         endpoint gshub;
160         endpoint srcinfo;
161         struct sockaddr_in server;
162         gs_int32_t parserversion;
163         gs_uint32_t schemalen;
164         static gs_sp_t asciischema=0;
165         gs_int8_t buf[1024];
166
167         if (get_hub(&gshub)!=0) {
168                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
169                 exit(0);
170         }
171
172         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
173                 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
174                 exit(0);
175         }
176
177         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
178         if (socket_desc == -1)
179         {
180                 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
181                 exit(0);
182         }
183         server.sin_addr.s_addr = srcinfo.ip;
184         server.sin_family = AF_INET;
185         server.sin_port = srcinfo.port;
186
187         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)  {
188                 print_error((gs_sp_t)"ERROR: could not open connection to data source");
189                 exit(0);
190         }
191 }
192
193 static void next_file() {
194         struct stat s;
195         if (verbose) {
196                 fprintf(stderr,"Opening %s\n",name);
197         }
198         if (singlefile == 0) {
199                 while (lstat(name, &s) != 0) {
200                         if (errno != ENOENT) {
201                                 print_error((gs_sp_t)"csv::lstat unexpected return value");
202                                 exit(10);
203                         }
204                         csv_replay_check_messages();
205                         usleep(FILEWAIT_TIMEOUT);
206                 }
207                 if      (fd > 0) {
208                         close(fd);
209                 }
210         }
211         if ((fd = open(name, O_RDONLY)) < 0) {
212                 print_error((gs_sp_t)"csv::open failed ");
213                 exit(10);
214         }
215         posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
216         if (singlefile == 0) {
217                 unlink(name);
218         }
219         if (use_gzip) {
220                 init_inflate();
221         }
222 }
223
224 #ifdef BSA_ENABLED
225
226 uint64_t bsa_file_start_time = 0;
227 uint64_t bsa_total_elapsed_time = 0;
228
229 static void next_file_bsa() {
230     int ret;
231
232         if (bsa_file_start_time) {
233                 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
234                 bsa_file_start_time = 0;
235         }
236
237     ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
238     if (!ifh) {
239         return;
240         }
241         if (verbose) {
242                 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
243         }
244         bsa_file_start_time = get_posix_clock_time();
245     reader = ifh->openFile();
246
247         if (use_gzip) {
248                 init_inflate();
249         }
250 }
251
252 static void close_file_bsa() {
253
254         if (reader) {
255                 reader->close();
256                 delete reader;
257         }
258         reader = NULL;
259
260         if (ifh) {
261         ifh->finished();
262                 delete ifh;
263         }
264         ifh = NULL;
265 }
266
267 #endif
268
269 static gs_retval_t csv_replay_init(gs_sp_t device)
270 {
271         gs_sp_t verbosetmp;
272         gs_sp_t delaytmp;
273         gs_sp_t gshubtmp;
274         gs_sp_t tempdel;
275         gs_sp_t singlefiletmp;
276         gs_sp_t compressortmp;
277         gs_sp_t bsatmp;    
278
279         if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
280                 if (strncmp(verbosetmp,"TRUE",4)==0) {
281                         verbose=1;
282                         fprintf(stderr,"VERBOSE ENABLED\n");
283                 } else {
284                         fprintf(stderr,"VERBOSE DISABLED\n");
285                 }
286         }
287
288         if ((name=get_iface_properties(device,(gs_sp_t)"filename"))==0) {
289                 print_error((gs_sp_t)"csv_replay_init::No CSV \"Filename\" defined");
290                 exit(0);
291         }
292         tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
293         if (tempdel != 0 ) {
294                 csvdel = tempdel[0];
295                 csv_set_delim(csvdel);
296         }
297
298         if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
299                 if (strncmp(singlefiletmp,"TRUE",4)==0) {
300                         singlefile=1;
301                         if (verbose)
302                                 fprintf(stderr,"SINGLEFILE ENABLED\n");
303                 } else {
304                         if (verbose)
305                                 fprintf(stderr,"SINGLEFILE DISABLED\n");
306                 }
307         }
308
309         if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
310                 if (strncmp(compressortmp,"GZIP",4)==0) {
311                         use_gzip=1;
312                         if (verbose)
313                                 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
314                 } else {
315                         print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
316                         exit(0);
317                 }
318         }
319
320         if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
321                 if (strncmp(bsatmp,"TRUE",4)==0) {
322                         #ifndef BSA_ENABLED
323                                 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");              
324                                 exit(0);                                                
325                         #endif
326
327                         use_bsa=1;
328                         if (verbose)
329                                 fprintf(stderr,"USING BSA STREAMS\n");
330                 } 
331         }    
332
333         if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
334                 if (verbose) {
335                                 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,(gs_sp_t)"startupdelay")));
336                 }
337                 startupdelay=atoi(get_iface_properties(device,(gs_sp_t)"startupdelay"));
338         }
339         if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
340                 if (verbose) {
341                                 fprintf(stderr,"CSV format using gshub\n");
342                 }
343                 gshub=1;
344         }
345
346         cur_packet.ptype=PTYPE_CSV;
347         return 0;
348 }
349
350 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
351     int tuple_consumed = 0;     
352         gs_sp_t linepos = chunk;
353         gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
354         gs_sp_t end_pos = chunk + chunk_size + leftover;
355         leftover = chunk_size;
356
357         while (new_linepos) {
358                 // *new_linepos = 0;                            // terminate the line
359                 csv_parse_line(linepos, new_linepos - linepos);
360                 rts_fta_process_packet(&cur_packet);
361         tuple_consumed++;               
362                 linepos = new_linepos + 1;
363                 leftover = end_pos - linepos;
364                 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
365         }
366         memcpy(chunk, linepos, leftover);
367
368     return tuple_consumed;      
369 }
370
371 static int csv_process_chunk(gs_uint32_t chunk_size)
372 {
373     gs_int32_t ret;
374     gs_uint32_t have = chunk_size;
375     gs_uint32_t tuple_consumed = 0;
376
377         if (use_gzip) {
378                 strm.avail_in = have;
379                 strm.next_in = in;
380                 /* run inflate() on input until output buffer not full */
381                 do {
382                         strm.avail_out = CHUNK;
383                         strm.next_out = out + leftover;
384                         ret = inflate(&strm, Z_NO_FLUSH);
385                         /* assert(ret != Z_STREAM_ERROR);  state not clobbered */
386                         switch (ret) {
387                                 case Z_NEED_DICT:
388                                         ret = Z_DATA_ERROR;     /* and fall through */
389                                 case Z_DATA_ERROR:
390                                 case Z_MEM_ERROR:
391                                         (void)inflateEnd(&strm);
392 #ifdef BSA_ENABLED      
393                                         close_file_bsa();
394 #endif
395                                         fprintf(stderr,"Error inflating data chunk\n");
396                                         return 0;
397                         }
398                         have = CHUNK - strm.avail_out;
399                         tuple_consumed += consume_chunk((gs_sp_t)out, have);
400                 } while (strm.avail_out == 0);
401                 /* done when inflate() says it's done */
402
403                 if (ret == Z_STREAM_END) {
404                         inflateEnd(&strm);
405 #ifdef BSA_ENABLED      
406                         close_file_bsa();
407 #endif                         
408         }
409         } else {
410                 tuple_consumed += consume_chunk((gs_sp_t)out, have);
411         }
412     
413     return tuple_consumed;
414 }
415
416 static gs_int32_t csv_read_chunk() {
417
418         gs_int32_t have;
419
420         if (gshub!=0) {
421                 return read_chunk_socket((gs_sp_t)out, CHUNK);
422         } else {
423                 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
424
425 #ifdef BSA_ENABLED
426                 if (use_bsa) {
427                         if (ifh == 0) next_file_bsa();
428                         if (ifh == 0)           // if no new files available return
429                                 return -1;              // -1 indicates a timeout
430
431                 while ((have = reader->read(read_pos, CHUNK)) == 0) {
432                                 close_file_bsa();
433
434                         next_file_bsa();
435     
436                         if (ifh == 0) { // if no new files available return
437                         return -1;      // -1 indicates a timeout
438                                 }
439                 }
440                 } else {
441 #endif
442                         if (fd <= 0) next_file();
443                         while ((have = read(fd, read_pos, CHUNK)) == 0) {
444                                 if (singlefile==1) {
445                                         if(verbose) {
446                                                 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
447                                         }
448                                         return -2;
449
450                                 } else {
451                                         next_file();
452                                 }
453                         }
454 #ifdef BSA_ENABLED              
455                 }
456 #endif
457         }
458         return have;
459 }
460
461 static gs_retval_t csv_process_input()
462 {
463         unsigned cnt = 0;
464         static unsigned totalcnt = 0;
465
466         gs_int32_t retval;
467         while(cnt < 50000) {                    // process up to 50000 tuples at a time
468                 retval = csv_read_chunk();
469                 if (retval == -1) return 0; // got a timeout so service message queue
470                 if (retval == -2) {
471                         // we signal that everything is done
472                         if (verbose)
473                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
474                         rts_fta_done();
475                         // now just service message queue until we get killed or loose connectivity
476                         while (true) {
477                                 fta_start_service(0); // service all waiting messages
478                                 usleep(1000); // sleep a millisecond
479                         }
480                 }
481                 cnt += csv_process_chunk((gs_uint32_t)retval);
482         }
483         totalcnt = totalcnt + cnt;
484         if (verbose) {
485 #ifdef BSA_ENABLED              
486                 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
487 #else
488                 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
489 #endif
490         }
491         return 0;
492 }
493
494 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
495         gs_uint32_t cont;
496         endpoint mygshub;
497
498     dev = device;
499
500         csv_replay_init(device);
501
502         /* initalize host_lib */
503         if (verbose) {
504                 fprintf(stderr,"Init LFTAs for %s\n",device);
505         }
506
507         if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
508                 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
509                         device);
510                 exit(7);
511         }
512
513         fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
514
515         // set maximum field nubmer to be extracted by csv parser
516         csv_set_maxfield(max_field_csv);
517
518         cont = startupdelay + time(0);
519
520         if (verbose) { fprintf(stderr,"Start startup delay"); }
521
522         while (cont > time(NULL)) {
523                 if (fta_start_service(0) < 0) {
524                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
525                         exit(9);
526                 }
527                 usleep(1000); /* sleep for one millisecond */
528         }
529
530         if (verbose) { fprintf(stderr,"... Done\n"); }
531
532         // open the connection to the data source
533         if (gshub != 0) { init_socket();}
534
535         // wait to process till we get the signal from GSHUB
536         if (get_hub(&mygshub) != 0) {
537                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
538                 exit(0);
539         }
540         while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
541                 usleep(100);
542                 if (fta_start_service(0) < 0) {
543                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
544                         exit(9);
545                 }
546         }
547
548         /* now we enter an endless loop to process data */
549         if (verbose) {
550                 fprintf(stderr,"Start processing %s\n",device);
551         }
552
553 #ifdef BSA_ENABLED
554     if (use_bsa) {
555             stream = BSA::FileStream::ISubStream::construct(std::string(name));
556             stream->init ();
557     }
558
559 #endif
560         st_time = time(NULL);
561         while (true) {
562                 if (csv_process_input() < 0) {
563                         fprintf(stderr,"%s::error:in processing records\n", device);
564                         exit(8);
565                 }
566                 /* process all messages on the message queue*/
567                 if (fta_start_service(0) < 0) {
568                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
569                         exit(9);
570                 }
571         }
572
573         return 0;
574 }