Change lfta code generation form C to C++
[com/gs-lite.git] / src / lib / gscprts / rts_csv.cc
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6
7  http://www.apache.org/licenses/LICENSE-2.0
8
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include <zlib.h>
26 #include <errno.h>
27 #include <stdio.h>
28 #include <dirent.h>
29
30
31 extern "C" {
32 #include "gsconfig.h"
33 #include "gshub.h"
34 #include "gstypes.h"
35 #include "lapp.h"
36 #include "fta.h"
37 #include "packet.h"
38 #include "schemaparser.h"
39 #include "lfta/rts.h"
40
41
42 void fta_init(gs_sp_t device);
43 void rts_fta_process_packet(struct packet * p);
44 void rts_fta_done();
45 }
46
47 time_t st_time;
48
49 #define CSVMAXLINE 1000000
50
51 #define CHUNK 262144
52 static gs_uint8_t in[CHUNK + CSVMAXLINE];
53 static gs_uint8_t out[CHUNK + CSVMAXLINE];
54
55 #define FILEWAIT_TIMEOUT 10000          // timeout value for getting next file (in microseconds)
56
57 gs_uint32_t max_field_csv = CSVELEMENTS;
58
59 z_stream strm;
60
61 #ifdef BSA_ENABLED
62 #include "bsa_stream.hpp"
63 #include "bsa_util.hpp"
64 BSA::FileStream::ISubStream* stream;
65 BSA::FileStream::IFileHandle* ifh;
66 BSA::FileStream::Reader* reader;
67
68 #endif
69
70 #ifdef SSL_ENABLED
71 #include <openssl/pem.h>
72 #include <openssl/x509.h>
73 #include <openssl/x509v3.h>
74 #include <openssl/ssl.h>
75 #include <openssl/crypto.h>
76 #include <openssl/err.h>
77
78 EVP_PKEY *rkey;
79 PKCS7 *p7;
80 BIO* mem_io;
81 char pwd[CSVMAXLINE];
82
83 // callback for passing password to private key reader
84 int pass_cb(char *buf, int size, int rwflag, void *u) {
85     int len = strlen(pwd);
86     memcpy(buf, pwd, len);
87     return len;
88 }
89
90 #endif
91
92 gs_sp_t dev;
93
94 static int listensockfd=-1;
95 static int fd=-1;
96 static struct packet cur_packet;
97 static gs_sp_t name;
98 static gs_sp_t dir_name;
99 struct dirent **namelist;
100 static gs_int32_t num_dir_files;
101 static gs_sp_t line;
102 static ssize_t len;
103 static size_t line_len;
104 static gs_uint32_t lineend=0;
105 static gs_uint8_t csvdel = ',';
106 static gs_uint32_t verbose=0;
107 static gs_uint32_t startupdelay=0;
108 static gs_uint32_t singlefile=0;
109 static gs_uint32_t use_gzip=0;
110 static gs_uint32_t use_bsa=0;
111 static gs_uint32_t use_decryption=0;
112 static gs_uint32_t gshub=0;
113 static int socket_desc=0;
114
115 #include "lfta/csv_parser.h"
116
117 // leftover bytes not consumed at the end of the data chunk
118  gs_uint32_t leftover = 0;
119
120  uint64_t get_posix_clock_time ()
121 {
122     struct timespec ts;
123
124     if (clock_gettime (CLOCK_MONOTONIC, &ts) == 0)
125         return (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
126     else
127         return 0;
128 }
129
130
131 static void init_inflate() {
132         gs_int32_t ret;
133
134     /* allocate inflate state */
135     strm.zalloc = Z_NULL;
136     strm.zfree = Z_NULL;
137     strm.opaque = Z_NULL;
138     strm.avail_in = 0;
139     strm.next_in = Z_NULL;
140     ret = inflateInit2(&strm, 15 /* window bits */ | 32 /* use gzip */);
141     if (ret != Z_OK) {
142                 print_error((gs_sp_t)"csv::inflateInit2");
143                 exit(10);
144     }
145 }
146
147 static void csv_replay_check_messages() {
148   if (fta_start_service(0)<0) {
149                 print_error((gs_sp_t)"Error:in processing the msg queue for a replay file");
150                 exit(9);
151   }
152 }
153
154 static gs_int32_t read_chunk_socket(gs_sp_t buffer, gs_uint32_t length){
155         gs_uint32_t r;
156         fd_set socket_rset;
157         fd_set socket_eset;
158         struct timeval socket_timeout;
159         gs_int32_t retval;
160
161         FD_ZERO(&socket_rset);
162         FD_SET(socket_desc,&socket_rset);
163         FD_ZERO(&socket_eset);
164         FD_SET(socket_desc,&socket_eset);
165         // timeout in one millisecond
166         socket_timeout.tv_sec = 0;
167         socket_timeout.tv_usec = 1000;
168
169         if ((retval = select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
170                 if (retval==0) {
171                         // caught a timeout
172                         return -1;
173                 }
174                 return -2;
175         }
176
177         if ((r=read(socket_desc, buffer + leftover, length)) <= 0) {
178                 print_error((gs_sp_t)"ERROR:could not read data from csv stream");
179                 return -2;
180         }
181
182         return r;
183 }
184
185 static void init_socket() {
186         endpoint gshub;
187         endpoint srcinfo;
188         struct sockaddr_in server;
189         gs_int32_t parserversion;
190         gs_uint32_t schemalen;
191         static gs_sp_t asciischema=0;
192         gs_int8_t buf[1024];
193
194         if (get_hub(&gshub)!=0) {
195                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
196                 exit(0);
197         }
198
199         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
200                 print_error((gs_sp_t)"ERROR:could not find data source for stream\n");
201                 exit(0);
202         }
203
204         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
205         if (socket_desc == -1)
206         {
207                 print_error((gs_sp_t)"ERROR:could not create socket for data stream");
208                 exit(0);
209         }
210         server.sin_addr.s_addr = srcinfo.ip;
211         server.sin_family = AF_INET;
212         server.sin_port = srcinfo.port;
213
214         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)  {
215                 print_error((gs_sp_t)"ERROR: could not open connection to data source");
216                 exit(0);
217         }
218 }
219
220 static void next_file() {
221
222         static gs_uint32_t file_pos = 0;
223         static gs_uint32_t scan_finished = 0;
224
225         char buf[CSVMAXLINE];
226
227         if (dir_name) {
228                 if (scan_finished) {
229                         if (verbose)
230                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
231                         rts_fta_done();
232                         // now just service message queue until we get killed or loose connectivity
233                         while (true) {
234                                 fta_start_service(0); // service all waiting messages
235                                 usleep(1000); // sleep a millisecond
236                         }
237                 }
238                 if (num_dir_files) {            // we already started directory scan
239                         free(name);
240                         if (file_pos < num_dir_files) {
241                                 sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
242                                 name = strdup(buf);
243                                 free(namelist[file_pos]);
244                                 file_pos++;
245                         } else {
246                                 free(namelist);
247                                 scan_finished = 1;
248                                 return;
249                         }
250                 } else {
251                         num_dir_files = scandir(dir_name, &namelist, NULL, alphasort);
252                         if (num_dir_files == -1) {
253                                 num_dir_files = 0;
254                                 print_error((gs_sp_t)"ERROR: Unable to scan directory");
255                                 return;
256                         }
257                         if (num_dir_files == 2) {       // only . and . are there, empty dir
258                                 free(namelist[0]);
259                                 free(namelist[1]);
260                                 scan_finished = 1;
261                                 return;
262                         } else
263                                 file_pos = 2;
264                         
265                         sprintf(buf, "%s/%s", dir_name, namelist[file_pos]->d_name);
266                         name = strdup(buf);
267                         free(namelist[file_pos]);
268                         file_pos++;
269                 }
270         }
271
272         struct stat s;
273         if (verbose) {
274                 fprintf(stderr,"Opening %s\n",name);
275         }
276         if (singlefile == 0) {
277                 while (lstat(name, &s) != 0) {
278                         if (errno != ENOENT) {
279                                 print_error((gs_sp_t)"csv::lstat unexpected return value");
280                                 exit(10);
281                         }
282                         csv_replay_check_messages();
283                         usleep(FILEWAIT_TIMEOUT);
284                 }
285                 if      (fd > 0) {
286                         close(fd);
287                 }
288         }
289         if ((fd = open(name, O_RDONLY)) < 0) {
290                 print_error((gs_sp_t)"csv::open failed ");
291                 exit(10);
292         }
293         posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
294
295 #ifdef SSL_ENABLED
296         if (use_decryption) {
297                 // free SSL resources
298                 if (mem_io)
299                         BIO_free(mem_io);
300                 if (p7)
301                         PKCS7_free(p7);
302
303                 FILE *fp = fdopen(fd, "r");
304                 p7 = d2i_PKCS7_fp(fp, NULL);
305         if (p7 == NULL) {
306                 print_error((gs_sp_t)"Error reading SMIME message from file");
307                 exit(-1);
308         }
309
310         if(!(mem_io = PKCS7_dataDecode(p7, rkey, NULL, NULL))) {
311                 print_error((gs_sp_t)"Error decoding PKCS7 file\n");
312                 exit(-1);
313         }
314
315                 fclose(fp);
316         }
317 #endif  
318         if (!dir_name && !singlefile) {
319                 unlink(name);
320         }
321         if (use_gzip) {
322                 init_inflate();
323         }
324 }
325
326 #ifdef BSA_ENABLED
327
328 uint64_t bsa_file_start_time = 0;
329 uint64_t bsa_total_elapsed_time = 0;
330
331 static void next_file_bsa() {
332     int ret;
333
334         if (bsa_file_start_time) {
335                 bsa_total_elapsed_time += (get_posix_clock_time()- bsa_file_start_time);
336                 bsa_file_start_time = 0;
337         }
338
339     ifh = stream->getNextFileHandle(FILEWAIT_TIMEOUT / 1000);
340     if (!ifh) {
341         return;
342         }
343         if (verbose) {
344                 fprintf(stderr,"%s: Opening %s %s\n", dev, ifh->getHandle().c_str(), stream->getPositionHandle().c_str());
345         }
346         bsa_file_start_time = get_posix_clock_time();
347     reader = ifh->openFile();
348
349         if (use_gzip) {
350                 init_inflate();
351         }
352 }
353
354 static void close_file_bsa() {
355
356         if (reader) {
357                 reader->close();
358                 delete reader;
359         }
360         reader = NULL;
361
362         if (ifh) {
363         ifh->finished();
364                 delete ifh;
365         }
366         ifh = NULL;
367 }
368
369 #endif
370
371 static gs_retval_t csv_replay_init(gs_sp_t device)
372 {
373         gs_csp_t verbosetmp;
374         gs_csp_t delaytmp;
375         gs_csp_t gshubtmp;
376         gs_csp_t tempdel;
377         gs_csp_t singlefiletmp;
378         gs_csp_t compressortmp;
379         gs_csp_t bsatmp;  
380         gs_csp_t encryptedtmp;  
381         gs_csp_t maxfieldtmp;
382
383         gs_csp_t pkey_fname;  
384         gs_csp_t pwd_fname;             
385
386         gs_csp_t stringtmp;
387
388
389         if ((verbosetmp=get_iface_properties(device,(gs_sp_t)"verbose"))!=0) {
390                 if (strncmp(verbosetmp,"TRUE",4)==0) {
391                         verbose=1;
392                         fprintf(stderr,"VERBOSE ENABLED\n");
393                 } else {
394                         fprintf(stderr,"VERBOSE DISABLED\n");
395                 }
396         }
397
398         stringtmp=get_iface_properties(device,(gs_sp_t)"filename");
399         if(stringtmp){
400                 name = strdup(stringtmp);
401         }else{
402                 name = NULL;
403         }
404         stringtmp=get_iface_properties(device,(gs_sp_t)"directoryname");
405         if(stringtmp){
406                 dir_name = strdup(stringtmp);
407         }else{
408                 dir_name = NULL;
409         }
410         if (!name && !dir_name) {
411                 print_error((gs_sp_t)"csv_replay_init::Either \"Filename\" or \"Dirname\" must be defined");
412                 exit(0);
413         }
414
415         tempdel=get_iface_properties(device,(gs_sp_t)"csvseparator");
416         if (tempdel != 0 ) {
417                 csvdel = tempdel[0];
418                 csv_set_delim(csvdel);
419         }
420
421         if ((singlefiletmp=get_iface_properties(device,(gs_sp_t)"singlefile"))!=0) {
422                 if (strncmp(singlefiletmp,"TRUE",4)==0) {
423                         singlefile=1;
424                         if (verbose)
425                                 fprintf(stderr,"SINGLEFILE ENABLED\n");
426                 } else {
427                         if (verbose)
428                                 fprintf(stderr,"SINGLEFILE DISABLED\n");
429                 }
430         }
431
432         if ((compressortmp=get_iface_properties(device,(gs_sp_t)"compressor"))!=0) {
433                 if (strncmp(compressortmp,"GZIP",4)==0) {
434                         use_gzip=1;
435                         if (verbose)
436                                 fprintf(stderr,"USING ZLIP COMPRESSOR ENABLED\n");
437                 } else {
438                         print_error((gs_sp_t)"csv_replay_init::Unknown value for interface property \"Compressor\"");
439                         exit(0);
440                 }
441         }
442
443         if ((bsatmp=get_iface_properties(device,(gs_sp_t)"bsa"))!=0) {
444                 if (strncmp(bsatmp,"TRUE",4)==0) {
445                         #ifndef BSA_ENABLED
446                                 print_error((gs_sp_t)"csv_replay_init::runtime not built with BSA support to use BSA interfaces");              
447                                 exit(0);                                                
448                         #endif
449
450                         use_bsa=1;
451                         if (verbose)
452                                 fprintf(stderr,"USING BSA STREAMS\n");
453                 } 
454         }    
455
456         if ((delaytmp=get_iface_properties(device,(gs_sp_t)"startupdelay"))!=0) {
457                 if (verbose) {
458                                 fprintf(stderr,"Startup delay of %u seconds\n",atoi(delaytmp));
459                 }
460                 startupdelay=atoi(delaytmp);
461         }
462
463         if ((maxfieldtmp=get_iface_properties(device,(gs_sp_t)"_max_csv_pos"))!=0) {
464                 max_field_csv=atoi(maxfieldtmp);
465         }       
466
467         if ((gshubtmp=get_iface_properties(device,(gs_sp_t)"gshub"))!=0) {
468                 if (verbose) {
469                                 fprintf(stderr,"CSV format using gshub\n");
470                 }
471                 gshub=1;
472                 if (!name) {
473                         print_error((gs_sp_t)"csv_replay_init::Filename must be defined for gshub interfaces");
474                         exit(0);                        
475                 }
476         }
477
478         pkey_fname=get_iface_properties(device,(gs_sp_t)"privatekey");
479         pwd_fname=get_iface_properties(device,(gs_sp_t)"password");
480
481         if ((encryptedtmp=get_iface_properties(device,(gs_sp_t)"encrypted"))!=0) {
482                 if (strncmp(encryptedtmp,"TRUE",4)==0) {
483                         #ifndef SSL_ENABLED
484                                 print_error((gs_sp_t)"csv_replay_init::runtime not built with SSL support to use encrypted interfaces");                
485                                 exit(0);
486                         #else
487                                 use_decryption=1;
488                                 if (verbose) {
489                                         fprintf(stderr,"CSV file is encrypted\n");
490                                 }
491                                 if (!pkey_fname || !pwd_fname) {
492                                         print_error((gs_sp_t)"csv_replay_init::privatekey and/or password filenames not specified for encrypted itnerface");            
493                                         exit(0);
494                                 }
495
496                                 OpenSSL_add_all_algorithms();
497                                 ERR_load_crypto_strings();
498
499                                 // Read password file
500                                 FILE* in_fd = fopen(pwd_fname, "r");
501                                 if (!in_fd) {
502                                         fprintf(stderr, "Unable to open password file %s\n", pwd_fname);
503                                         exit(0);        
504                                 }
505
506                                 if (!fgets(pwd, CSVMAXLINE, in_fd)) {
507                                         fprintf(stderr, "Error reading password from file %s\n", pwd_fname);
508                                         exit(0);                                        
509                                 }
510                                 strtok(pwd, "\r\n\t ");
511                                 fclose(in_fd);                  
512
513                                 // Read the private key
514                                 in_fd = fopen(pkey_fname, "r");
515                                 if (!in_fd) {
516                                         fprintf(stderr, "Unable to open private key file %s\n", pkey_fname);
517                                         exit(0);        
518                                 }
519
520                                 rkey = PEM_read_PrivateKey(in_fd, NULL, pass_cb, NULL);
521                                 if (!rkey) {
522                                         fprintf(stderr, "Unable to read private key file %s\n", pkey_fname);
523                                         exit(-1);        
524                                 }
525                                 
526                                 fclose(in_fd);
527                         #endif
528                 }
529         }
530
531         cur_packet.ptype=PTYPE_CSV;
532         return 0;
533 }
534
535 static inline int consume_chunk(gs_sp_t chunk, gs_uint32_t chunk_size) {
536     int tuple_consumed = 0;     
537         gs_sp_t linepos = chunk;
538         gs_sp_t new_linepos = (gs_sp_t)memchr(linepos + leftover, '\n', chunk_size);
539         gs_sp_t end_pos = chunk + chunk_size + leftover;
540         leftover = chunk_size;
541
542         while (new_linepos) {
543                 // *new_linepos = 0;                            // terminate the line
544                 csv_parse_line(linepos, new_linepos - linepos);
545                 rts_fta_process_packet(&cur_packet);
546         tuple_consumed++;               
547                 linepos = new_linepos + 1;
548                 leftover = end_pos - linepos;
549                 new_linepos = (gs_sp_t)memchr(linepos, '\n', leftover);
550         }
551         memcpy(chunk, linepos, leftover);
552
553     return tuple_consumed;      
554 }
555
556 static int csv_process_chunk(gs_uint32_t chunk_size)
557 {
558     gs_int32_t ret;
559     gs_uint32_t have = chunk_size;
560     gs_uint32_t tuple_consumed = 0;
561
562         if (use_gzip) {
563                 strm.avail_in = have;
564                 strm.next_in = in;
565                 /* run inflate() on input until output buffer not full */
566                 do {
567                         strm.avail_out = CHUNK;
568                         strm.next_out = out + leftover;
569                         ret = inflate(&strm, Z_NO_FLUSH);
570                         /* assert(ret != Z_STREAM_ERROR);  state not clobbered */
571                         switch (ret) {
572                                 case Z_NEED_DICT:
573                                         ret = Z_DATA_ERROR;     /* and fall through */
574                                 case Z_DATA_ERROR:
575                                 case Z_MEM_ERROR:
576                                         (void)inflateEnd(&strm);
577 #ifdef BSA_ENABLED      
578                                         close_file_bsa();
579 #endif
580                                         fprintf(stderr,"Error inflating data chunk\n");
581                                         return 0;
582                         }
583                         have = CHUNK - strm.avail_out;
584                         tuple_consumed += consume_chunk((gs_sp_t)out, have);
585                 } while (strm.avail_out == 0);
586                 /* done when inflate() says it's done */
587
588                 if (ret == Z_STREAM_END) {
589                         inflateEnd(&strm);
590 #ifdef BSA_ENABLED      
591                         close_file_bsa();
592 #endif                         
593         }
594         } else {
595                 tuple_consumed += consume_chunk((gs_sp_t)out, have);
596         }
597     
598     return tuple_consumed;
599 }
600
601 static gs_int32_t csv_read_chunk() {
602
603         gs_int32_t have;
604
605         if (gshub!=0) {
606                 return read_chunk_socket((gs_sp_t)out, CHUNK);
607         } else {
608                 gs_sp_t read_pos = (gs_sp_t)(use_gzip ? in : (out + leftover));
609
610 #ifdef BSA_ENABLED
611                 if (use_bsa) {
612                         if (ifh == 0) next_file_bsa();
613                         if (ifh == 0)           // if no new files available return
614                                 return -1;              // -1 indicates a timeout
615
616                 while ((have = reader->read(read_pos, CHUNK)) == 0) {
617                                 close_file_bsa();
618
619                         next_file_bsa();
620     
621                         if (ifh == 0) { // if no new files available return
622                         return -1;      // -1 indicates a timeout
623                                 }
624                 }
625                 } else {
626 #endif
627                 if (fd <= 0) next_file();
628                         
629 #ifdef SSL_ENABLED
630                 if (use_decryption) {
631
632                 while ((have = BIO_read (mem_io, read_pos, CHUNK)) == 0) {
633                                 if (singlefile==1) {
634                                         if(verbose) {
635                                                 fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
636                                         }
637                                         return -2;
638
639                                 } else {
640                                         next_file();
641                                 }
642                 }
643
644                 }       else {                  
645 #endif
646                 while ((have = read(fd, read_pos, CHUNK)) == 0) {
647                         if (singlefile==1) {
648                                 if(verbose) {
649                                         fprintf(stderr,"SINGLEFILE PROCESSING DONE! RTS SAYS BYE\n");
650                                 }
651                                 return -2;
652
653                         } else {
654                                 next_file();
655                         }
656                 }
657 #ifdef SSL_ENABLED              
658                 }
659 #endif
660 #ifdef BSA_ENABLED              
661                 }
662 #endif
663         }
664         return have;
665 }
666
667 static gs_retval_t csv_process_input()
668 {
669         unsigned cnt = 0;
670         static unsigned totalcnt = 0;
671
672         gs_int32_t retval;
673         while(cnt < 50000) {                    // process up to 50000 tuples at a time
674                 retval = csv_read_chunk();
675                 if (retval == -1) return 0; // got a timeout so service message queue
676                 if (retval == -2) {
677                         // we signal that everything is done
678                         if (verbose)
679                                 fprintf(stderr,"Done processing, waiting for things to shut down\n");
680                         rts_fta_done();
681                         // now just service message queue until we get killed or loose connectivity
682                         while (true) {
683                                 fta_start_service(0); // service all waiting messages
684                                 usleep(1000); // sleep a millisecond
685                         }
686                 }
687                 cnt += csv_process_chunk((gs_uint32_t)retval);
688         }
689         totalcnt = totalcnt + cnt;
690         if (verbose) {
691 #ifdef BSA_ENABLED              
692                 fprintf(stderr,"%s: Processed %u tuples, rate = %lf tup/sec\n", dev, totalcnt, 1000.0 * (double)totalcnt / (double)bsa_total_elapsed_time);
693 #else
694                 fprintf(stderr,"Processed %u tuples, rate = %lf tup/sec\n", totalcnt, (double)totalcnt / (double)(time(NULL) - st_time));
695 #endif
696         }
697         return 0;
698 }
699
700 extern "C" gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
701         gs_uint32_t cont;
702         endpoint mygshub;
703
704     dev = device;
705
706         csv_replay_init(device);
707
708         /* initalize host_lib */
709         if (verbose) {
710                 fprintf(stderr,"Init LFTAs for %s\n",device);
711         }
712
713         if (hostlib_init(LFTA,0,devicenum,mapcnt,map) < 0) {
714                 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
715                         device);
716                 exit(7);
717         }
718
719         fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
720
721         // set maximum field nubmer to be extracted by csv parser
722         csv_set_maxfield(max_field_csv);
723
724         cont = startupdelay + time(0);
725
726         if (verbose) { fprintf(stderr,"Start startup delay"); }
727
728         while (cont > time(NULL)) {
729                 if (fta_start_service(0) < 0) {
730                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
731                         exit(9);
732                 }
733                 usleep(1000); /* sleep for one millisecond */
734         }
735
736         if (verbose) { fprintf(stderr,"... Done\n"); }
737
738         // open the connection to the data source
739         if (gshub != 0) { init_socket();}
740
741         // wait to process till we get the signal from GSHUB
742         if (get_hub(&mygshub) != 0) {
743                 print_error((gs_sp_t)"ERROR:could not find gshub for data source");
744                 exit(0);
745         }
746         while(get_startprocessing(mygshub,get_instance_name(),0) != 0) {
747                 usleep(100);
748                 if (fta_start_service(0) < 0) {
749                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
750                         exit(9);
751                 }
752         }
753
754         /* now we enter an endless loop to process data */
755         if (verbose) {
756                 fprintf(stderr,"Start processing %s\n",device);
757         }
758
759 #ifdef BSA_ENABLED
760     if (use_bsa) {
761             stream = BSA::FileStream::ISubStream::construct(std::string(name));
762             stream->init ();
763     }
764
765 #endif
766         st_time = time(NULL);
767         while (true) {
768                 if (csv_process_input() < 0) {
769                         fprintf(stderr,"%s::error:in processing records\n", device);
770                         exit(8);
771                 }
772                 /* process all messages on the message queue*/
773                 if (fta_start_service(0) < 0) {
774                         fprintf(stderr,"%s::error:in processing the msg queue\n", device);
775                         exit(9);
776                 }
777         }
778
779         return 0;
780 }