ed7403a52ae3611ed6a676cb17326de12ff71d98
[com/gs-lite.git] / src / lib / gscprts / rts_csv.cc
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <zlib.h>
26 #include <errno.h>
27 #include <stdio.h>
28 #include <dirent.h>
29
30
31 extern "C" {
32 #include "gsconfig.h"
33 #include "gshub.h"
34 #include "gstypes.h"
35 #include "lapp.h"
36 #include "fta.h"
37 #include "packet.h"
38 #include "schemaparser.h"
39 #include "lfta/rts.h"
40
41
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
44 void rts_fta_done();
45 }
46
47 time_t st_time;
48
49 #define CSVMAXLINE 1000000
50
51 #define CHUNK 262144
52 static gs_uint8_t in[CHUNK + CSVMAXLINE];
53 static gs_uint8_t out[CHUNK + CSVMAXLINE];
54
55 #define FILEWAIT_TIMEOUT 10000          // timeout value for getting next file (in microseconds)
56
57 gs_uint32_t max_field_csv = CSVELEMENTS;
58
59 z_stream strm;
60
61 #ifdef BSA_ENABLED
62 #include "bsa_stream.hpp"
63 #include "bsa_util.hpp"
64 BSA::FileStream::ISubStream* stream;
65 BSA::FileStream::IFileHandle* ifh;
66 BSA::FileStream::Reader* reader;
67
68 #endif
69
70 #ifdef SSL_ENABLED
71 #include <openssl/pem.h>
72 #include <openssl/x509.h>
73 #include <openssl/x509v3.h>
74 #include <openssl/ssl.h>
75 #include <openssl/crypto.h>
76 #include <openssl/err.h>
77
78 EVP_PKEY *rkey;
79 PKCS7 *p7;
80 BIO* mem_io;
81 char pwd[CSVMAXLINE];
82
83 // callback for passing password to private key reader
84 int pass_cb(char *buf, int size, int rwflag, void *u) {
85     int len = strlen(pwd);
86     memcpy(buf, pwd, len);
87     return len;
88 }
89
90 #endif
91
92 gs_sp_t dev;
93
94 static int listensockfd=-1;
95 static int fd=-1;
96 static struct packet cur_packet;
97 static gs_sp_t name;
98 static gs_sp_t dir_name;
99 struct dirent **namelist;
100 static gs_int32_t num_dir_files;
101 static gs_sp_t line;
102 static ssize_t len;
103 static size_t line_len;
104 static gs_uint32_t lineend=0;
105 static gs_uint8_t csvdel = ',';
106 static gs_uint32_t verbose=0;
107 static gs_uint32_t startupdelay=0;
108 static gs_uint32_t singlefile=0;
109 static gs_uint32_t use_gzip=0;
110 static gs_uint32_t use_bsa=0;
111 static gs_uint32_t use_decryption=0;
112 static gs_uint32_t gshub=0;
113 static int socket_desc=0;
114
115 #include "lfta/csv_parser.h"
116
117 // leftover bytes not consumed at the end of the data chunk
118  gs_uint32_t leftover = 0;
119
// Returns the current CLOCK_MONOTONIC reading in milliseconds,
// or 0 if the clock cannot be read.
uint64_t get_posix_clock_time ()
{
	struct timespec ts;

	if (clock_gettime(CLOCK_MONOTONIC, &ts) != 0)
		return 0;

	// widen before multiplying so the product cannot overflow a narrow time_t
	return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u;
}
129
130
131 static void init_inflate() {
132         gs_int32_t ret;
133
134     /* allocate inflate state */
135     strm.zalloc = Z_NULL;
136     strm.zfree = Z_NULL;
137     strm.opaque = Z_NULL;
138     strm.avail_in = 0;
139     strm.next_in = Z_NULL;
140     ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* use gzip */);
141     if (ret != Z_OK) {
142                 print_error((gs_sp_t)"csv::inflateInit2");
143                 exit(10);
144     }
145 }
146
147 static void csv_replay_check_messages() {
148   if (fta_start_service(0)<0) {
149                 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
150                 exit(9);
151   }
152 }
153
154 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
155         gs_uint32_t r;
156         fd_set socket_rset;
157         fd_set socket_eset;
158         struct timeval socket_timeout;
159         gs_int32_t retval;
160
161         FD_ZERO(&socket_rset);
162         FD_SET(socket_desc,&socket_rset);
163         FD_ZERO(&socket_eset);
164         FD_SET(socket_desc,&socket_eset);
165         // timeout in one millisecond
166         socket_timeout.tv_sec = 0;
167         socket_timeout.tv_usec = 1000;
168
169         if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
170                 if (retval==0) {
171                         // caught a timeout
172                         return -1;
173                 }
174                 return -2;
175         }
176
177         if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
178                 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
179                 return -2;
180         }
181
182         return r;
183 }
184
185 static void init_socket() {
186         endpoint gshub;
187         endpoint srcinfo;
188         struct sockaddr_in server;
189         gs_int32_t parserversion;
190         gs_uint32_t schemalen;
191         static gs_sp_t asciischema=0;
192         gs_int8_t buf[1024];
193
194         if (get_hub(&gshub)!=0) {
195                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
196                 exit(0);
197         }
198
199         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
200                 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
201                 exit(0);
202         }
203
204         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
205         if (socket_desc == -1)
206         {
207                 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
208                 exit(0);
209         }
210         server.sin_addr.s_addr = srcinfo.ip;
211         server.sin_family = AF_INET;
212         server.sin_port = srcinfo.port;
213
214         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)  {
215                 print_error((gs_sp_t)"ERROR: could not open connection to data source");
216                 exit(0);
217         }
218 }
219
220 static void next_file() {
221
222         static gs_uint32_t file_pos = 0;
223         static gs_uint32_t scan_finished = 0;
224
225         char buf[CSVMAXLINE];
226
227         if (dir_name) {
228                 if (scan_finished) {
229                         if (verbose)
230                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
231                         rts_fta_done();
232                         // now just service message queue until we get killed or loose connectivity
233                         while (true) {
234                                 fta_start_service(0); // service all waiting messages
235                                 usleep(1000); // sleep a millisecond
236                         }
237                 }
238                 if (num_dir_files) {            // we already started directory scan
239                         free(name);
240                         if (file_pos < num_dir_files) {
241                                 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
242                                 name = strdup(buf);
243                                 free(namelist[file_pos]);
244                                 file_pos++;
245                         } else {
246                                 free(namelist);
247                                 scan_finished = 1;
248                                 return;
249                         }
250                 } else {
251                         num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
252                         if (num_dir_files == -1) {
253                                 num_dir_files = 0;
254                                 print_error((gs_sp_t)"ERROR: Unable to scan directory");
255                                 return;
256                         }
257                         if (num_dir_files == 2) {       // only . and . are there, empty dir
258                                 free(namelist[0]);
259                                 free(namelist[1]);
260                                 scan_finished = 1;
261                                 return;
262                         } else
263                                 file_pos = 2;
264                         
265                         sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
266                         name = strdup(buf);
267                         free(namelist[file_pos]);
268                         file_pos++;
269                 }
270         }
271
272         struct stat s;
273         if (verbose) {
274                 fprintf(stderr,"Opening %s\n",name);
275         }
276         if (singlefile == 0) {
277                 while (lstat(name, &s) != 0) {
278                         if (errno != ENOENT) {
279                                 print_error((gs_sp_t)"csv::lstat unexpected return value");
280                                 exit(10);
281                         }
282                         csv_replay_check_messages();
283                         usleep(FILEWAIT_TIMEOUT);
284                 }
285                 if      (fd > 0) {
286                         close(fd);
287                 }
288         }
289         if ((fd = open(name, O_RDONLY)) < 0) {
290                 print_error((gs_sp_t)"csv::open failed ");
291                 exit(10);
292         }
293         posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
294
295 #ifdef SSL_ENABLED
296         if (use_decryption) {
297                 // free SSL resources
298                 if (mem_io)
299                         BIO_free(mem_io);
300                 if (p7)
301                         PKCS7_free(p7);
302
303                 FILE *fp = fdopen(fd, "r");
304                 p7 = d2i_PKCS7_fp(fp, NULL);
305         if (p7 == NULL) {
306                 print_error((gs_sp_t)"Error reading SMIME message from file");
307                 exit(-1);
308         }
309
310         if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
311                 print_error((gs_sp_t)"Error decoding PKCS7 file\n");
312                 exit(-1);
313         }
314
315                 fclose(fp);
316         }
317 #endif  
318         if (!dir_name && !singlefile) {
319                 unlink(name);
320         }
321         if (use_gzip) {
322                 init_inflate();
323         }
324 }
325
326 #ifdef BSA_ENABLED
327
328 uint64_t bsa_file_start_time = 0;
329 uint64_t bsa_total_elapsed_time = 0;
330
331 static void next_file_bsa() {
332     int ret;
333
334         if (bsa_file_start_time) {
335                 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
336                 bsa_file_start_time = 0;
337         }
338
339     ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
340     if (!ifh) {
341         return;
342         }
343         if (verbose) {
344                 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
345         }
346         bsa_file_start_time = get_posix_clock_time();
347     reader = ifh->openFile();
348
349         if (use_gzip) {
350                 init_inflate();
351         }
352 }
353
354 static void close_file_bsa() {
355
356         if (reader) {
357                 reader->close();
358                 delete reader;
359         }
360         reader = NULL;
361
362         if (ifh) {
363         ifh->finished();
364                 delete ifh;
365         }
366         ifh = NULL;
367 }
368
369 #endif
370
371 static gs_retval_t csv_replay_init(gs_sp_t device)
372 {
373         gs_sp_t verbosetmp;
374         gs_sp_t delaytmp;
375         gs_sp_t gshubtmp;
376         gs_sp_t tempdel;
377         gs_sp_t singlefiletmp;
378         gs_sp_t compressortmp;
379         gs_sp_t bsatmp;  
380         gs_sp_t encryptedtmp;  
381         gs_sp_t maxfieldtmp
382
383         gs_sp_t pkey_fname;  
384         gs_sp_t pwd_fname;              
385
386
387         if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
388                 if (strncmp(verbosetmp,"TRUE",4)==0) {
389                         verbose=1;
390                         fprintf(stderr,"VERBOSE ENABLED\n");
391                 } else {
392                         fprintf(stderr,"VERBOSE DISABLED\n");
393                 }
394         }
395
396         name=get_iface_properties(device,(gs_sp_t)"filename");
397         dir_name=get_iface_properties(device,(gs_sp_t)"directoryname");
398         if (!name && !dir_name) {
399                 print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
400                 exit(0);
401         }
402
403         tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
404         if (tempdel != 0 ) {
405                 csvdel = tempdel[0];
406                 csv_set_delim(csvdel);
407         }
408
409         if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
410                 if (strncmp(singlefiletmp,"TRUE",4)==0) {
411                         singlefile=1;
412                         if (verbose)
413                                 fprintf(stderr,"SINGLEFILE ENABLED\n");
414                 } else {
415                         if (verbose)
416                                 fprintf(stderr,"SINGLEFILE DISABLED\n");
417                 }
418         }
419
420         if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
421                 if (strncmp(compressortmp,"GZIP",4)==0) {
422                         use_gzip=1;
423                         if (verbose)
424                                 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
425                 } else {
426                         print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
427                         exit(0);
428                 }
429         }
430
431         if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
432                 if (strncmp(bsatmp,"TRUE",4)==0) {
433                         #ifndef BSA_ENABLED
434                                 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");              
435                                 exit(0);                                                
436                         #endif
437
438                         use_bsa=1;
439                         if (verbose)
440                                 fprintf(stderr,"USING BSA STREAMS\n");
441                 } 
442         }    
443
444         if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
445                 if (verbose) {
446                                 fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
447                 }
448                 startupdelay=atoi(delaytmp);
449         }
450
451         if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
452                 max_field_csv=atoi(maxfieldtmp);
453         }       
454
455         if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
456                 if (verbose) {
457                                 fprintf(stderr,"CSV format using gshub\n");
458                 }
459                 gshub=1;
460                 if (!name) {
461                         print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
462                         exit(0);                        
463                 }
464         }
465
466         pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
467         pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
468
469         if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
470                 if (strncmp(encryptedtmp,"TRUE",4)==0) {
471                         #ifndef SSL_ENABLED
472                                 print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");                
473                                 exit(0);
474                         #else
475                                 use_decryption=1;
476                                 if (verbose) {
477                                         fprintf(stderr,"CSV file is encrypted\n");
478                                 }
479                                 if (!pkey_fname || !pwd_fname) {
480                                         print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");            
481                                         exit(0);
482                                 }
483
484                                 OpenSSL_add_all_algorithms();
485                                 ERR_load_crypto_strings();
486
487                                 // Read password file
488                                 FILE* in_fd = fopen(pwd_fname, "r");
489                                 if (!in_fd) {
490                                         fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
491                                         exit(0);        
492                                 }
493
494                                 if (!fgets(pwd, CSVMAXLINE, in_fd)) {
495                                         fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
496                                         exit(0);                                        
497                                 }
498                                 strtok(pwd, "\r\n\t ");
499                                 fclose(in_fd);                  
500
501                                 // Read the private key
502                                 in_fd = fopen(pkey_fname, "r");
503                                 if (!in_fd) {
504                                         fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
505                                         exit(0);        
506                                 }
507
508                                 rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
509                                 if (!rkey) {
510                                         fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
511                                         exit(-1);        
512                                 }
513                                 
514                                 fclose(in_fd);
515                         #endif
516                 }
517         }
518
519         cur_packet.ptype=PTYPE_CSV;
520         return 0;
521 }
522
523 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
524     int tuple_consumed = 0;     
525         gs_sp_t linepos = chunk;
526         gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
527         gs_sp_t end_pos = chunk + chunk_size + leftover;
528         leftover = chunk_size;
529
530         while (new_linepos) {
531                 // *new_linepos = 0;                            // terminate the line
532                 csv_parse_line(linepos, new_linepos - linepos);
533                 rts_fta_process_packet(&cur_packet);
534         tuple_consumed++;               
535                 linepos = new_linepos + 1;
536                 leftover = end_pos - linepos;
537                 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
538         }
539         memcpy(chunk, linepos, leftover);
540
541     return tuple_consumed;      
542 }
543
544 static int csv_process_chunk(gs_uint32_t chunk_size)
545 {
546     gs_int32_t ret;
547     gs_uint32_t have = chunk_size;
548     gs_uint32_t tuple_consumed = 0;
549
550         if (use_gzip) {
551                 strm.avail_in = have;
552                 strm.next_in = in;
553                 /* run inflate() on input until output buffer not full */
554                 do {
555                         strm.avail_out = CHUNK;
556                         strm.next_out = out + leftover;
557                         ret = inflate(&strm, Z_NO_FLUSH);
558                         /* assert(ret != Z_STREAM_ERROR);  state not clobbered */
559                         switch (ret) {
560                                 case Z_NEED_DICT:
561                                         ret = Z_DATA_ERROR;     /* and fall through */
562                                 case Z_DATA_ERROR:
563                                 case Z_MEM_ERROR:
564                                         (void)inflateEnd(&strm);
565 #ifdef BSA_ENABLED      
566                                         close_file_bsa();
567 #endif
568                                         fprintf(stderr,"Error inflating data chunk\n");
569                                         return 0;
570                         }
571                         have = CHUNK - strm.avail_out;
572                         tuple_consumed += consume_chunk((gs_sp_t)out, have);
573                 } while (strm.avail_out == 0);
574                 /* done when inflate() says it's done */
575
576                 if (ret == Z_STREAM_END) {
577                         inflateEnd(&strm);
578 #ifdef BSA_ENABLED      
579                         close_file_bsa();
580 #endif                         
581         }
582         } else {
583                 tuple_consumed += consume_chunk((gs_sp_t)out, have);
584         }
585     
586     return tuple_consumed;
587 }
588
589 static gs_int32_t csv_read_chunk() {
590
591         gs_int32_t have;
592
593         if (gshub!=0) {
594                 return read_chunk_socket((gs_sp_t)out, CHUNK);
595         } else {
596                 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
597
598 #ifdef BSA_ENABLED
599                 if (use_bsa) {
600                         if (ifh == 0) next_file_bsa();
601                         if (ifh == 0)           // if no new files available return
602                                 return -1;              // -1 indicates a timeout
603
604                 while ((have = reader->read(read_pos, CHUNK)) == 0) {
605                                 close_file_bsa();
606
607                         next_file_bsa();
608     
609                         if (ifh == 0) { // if no new files available return
610                         return -1;      // -1 indicates a timeout
611                                 }
612                 }
613                 } else {
614 #endif
615                 if (fd <= 0) next_file();
616                         
617 #ifdef SSL_ENABLED
618                 if (use_decryption) {
619
620                 while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
621                                 if (singlefile==1) {
622                                         if(verbose) {
623                                                 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
624                                         }
625                                         return -2;
626
627                                 } else {
628                                         next_file();
629                                 }
630                 }
631
632                 }       else {                  
633 #endif
634                 while ((have = read(fd, read_pos, CHUNK)) == 0) {
635                         if (singlefile==1) {
636                                 if(verbose) {
637                                         fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
638                                 }
639                                 return -2;
640
641                         } else {
642                                 next_file();
643                         }
644                 }
645 #ifdef SSL_ENABLED              
646                 }
647 #endif
648 #ifdef BSA_ENABLED              
649                 }
650 #endif
651         }
652         return have;
653 }
654
655 static gs_retval_t csv_process_input()
656 {
657         unsigned cnt = 0;
658         static unsigned totalcnt = 0;
659
660         gs_int32_t retval;
661         while(cnt < 50000) {                    // process up to 50000 tuples at a time
662                 retval = csv_read_chunk();
663                 if (retval == -1) return 0; // got a timeout so service message queue
664                 if (retval == -2) {
665                         // we signal that everything is done
666                         if (verbose)
667                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
668                         rts_fta_done();
669                         // now just service message queue until we get killed or loose connectivity
670                         while (true) {
671                                 fta_start_service(0); // service all waiting messages
672                                 usleep(1000); // sleep a millisecond
673                         }
674                 }
675                 cnt += csv_process_chunk((gs_uint32_t)retval);
676         }
677         totalcnt = totalcnt + cnt;
678         if (verbose) {
679 #ifdef BSA_ENABLED              
680                 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
681 #else
682                 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
683 #endif
684         }
685         return 0;
686 }
687
688 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
689         gs_uint32_t cont;
690         endpoint mygshub;
691
692     dev = device;
693
694         csv_replay_init(device);
695
696         /* initalize host_lib */
697         if (verbose) {
698                 fprintf(stderr,"Init LFTAs for %s\n",device);
699         }
700
701         if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
702                 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
703                         device);
704                 exit(7);
705         }
706
707         fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
708
709         // set maximum field nubmer to be extracted by csv parser
710         csv_set_maxfield(max_field_csv);
711
712         cont = startupdelay + time(0);
713
714         if (verbose) { fprintf(stderr,"Start startup delay"); }
715
716         while (cont > time(NULL)) {
717                 if (fta_start_service(0) < 0) {
718                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
719                         exit(9);
720                 }
721                 usleep(1000); /* sleep for one millisecond */
722         }
723
724         if (verbose) { fprintf(stderr,"... Done\n"); }
725
726         // open the connection to the data source
727         if (gshub != 0) { init_socket();}
728
729         // wait to process till we get the signal from GSHUB
730         if (get_hub(&mygshub) != 0) {
731                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
732                 exit(0);
733         }
734         while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
735                 usleep(100);
736                 if (fta_start_service(0) < 0) {
737                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
738                         exit(9);
739                 }
740         }
741
742         /* now we enter an endless loop to process data */
743         if (verbose) {
744                 fprintf(stderr,"Start processing %s\n",device);
745         }
746
747 #ifdef BSA_ENABLED
748     if (use_bsa) {
749             stream = BSA::FileStream::ISubStream::construct(std::string(name));
750             stream->init ();
751     }
752
753 #endif
754         st_time = time(NULL);
755         while (true) {
756                 if (csv_process_input() < 0) {
757                         fprintf(stderr,"%s::error:in processing records\n", device);
758                         exit(8);
759                 }
760                 /* process all messages on the message queue*/
761                 if (fta_start_service(0) < 0) {
762                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
763                         exit(9);
764                 }
765         }
766
767         return 0;
768 }