Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscprts / rts_csv.c
index 98c84c1..2602b3a 100644 (file)
-/* ------------------------------------------------
- Copyright 2014 AT&T Intellectual Property
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ------------------------------------------- */
-#include <time.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/time.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include "errno.h"
-
-#include "gsconfig.h"
-#include "gshub.h"
-#include "gstypes.h"
-#include "lapp.h"
-#include "fta.h"
-#include "stdio.h"
-#include "stdlib.h"
-#include "packet.h"
-#include "schemaparser.h"
-#include "lfta/rts.h"
-
-void rts_fta_process_packet(struct packet * p);
-void rts_fta_done();
-void fta_init(gs_sp_t device);
-
-
-#define CSVMAXLINE 1000000
-
-static FILE *pd;
-static int listensockfd=-1;
-static int fd=-1;
-static struct packet cur_packet;
-static gs_sp_t name;
-static gs_uint8_t line[CSVMAXLINE];
-static gs_uint32_t lineend=0;
-static gs_uint8_t csvdel=',';
-static gs_uint32_t verbose=0;
-static gs_uint32_t startupdelay=0;
-static gs_uint32_t singlefile=0;
-static gs_uint32_t gshub=0;
-static int socket_desc=0;
-
-static void csv_replay_check_messages() {
-    if (fta_start_service(0)<0) {
-        print_error("Error:in processing the msg queue for a replay file");
-        exit(9);
-    }
-}
-
-static gs_uint32_t gs_read_line(gs_uint8_t * buffer, gs_uint32_t length){
-    gs_uint32_t used=0;
-    gs_uint32_t cur;
-    fd_set socket_rset;
-    fd_set socket_eset;
-    struct timeval socket_timeout;
-    int retval;
-    
-    FD_ZERO(&socket_rset);
-    FD_SET(socket_desc,&socket_rset);
-    FD_ZERO(&socket_eset);
-    FD_SET(socket_desc,&socket_eset);
-    // timeout in one millisecon
-    socket_timeout.tv_sec=0;
-    socket_timeout.tv_usec=1000;
-    
-    if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
-        if (retval==0) {
-            // caught a timeout
-            return -1;
-        }
-        return -2;
-    }
-    
-    while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
-        if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
-            print_error("ERROR:could not read data from gdat stream");
-            return -2;
-        }
-        used+=cur;
-    }
-       buffer[used]=0;
-       return 0;
-}
-
-static void init_socket() {
-       endpoint gshub;
-       endpoint srcinfo;
-       struct sockaddr_in server;
-    gs_int32_t parserversion;
-    gs_uint32_t schemalen;
-    static char * asciischema=0;
-       gs_int8_t buf[1024];
-       
-       if (get_hub(&gshub)!=0) {
-               print_error("ERROR:could not find gshub for data source");
-               exit(0);
-       }
-    
-       if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
-               print_error("ERROR:could not find data source for stream\n");
-               exit(0);
-       }
-    
-       socket_desc = socket(AF_INET , SOCK_STREAM , 0);
-    if (socket_desc == -1)
-    {
-        print_error("ERROR:could not create socket for data stream");
-               exit(0);
-    }
-       server.sin_addr.s_addr = srcinfo.ip;
-    server.sin_family = AF_INET;
-    server.sin_port = srcinfo.port;
-    
-       if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
-    {
-               print_error("ERROR: could not open connection to data source");
-               exit(0);
-       }
-    
-}
-
-static void next_file() {
-       struct stat s;
-       if (verbose) {
-               fprintf(stderr,"Opening %s\n",name);
-       }
-       if (singlefile==0) {
-               while (lstat(name,&s)!=0) {
-                       if (errno!=ENOENT) {
-                               print_error("csv::lstat unexpected return value");
-                               exit(10);
-                       }
-                       csv_replay_check_messages();
-                       usleep(10000);
-               }
-               if  (pd!=0) {
-                       fclose(pd);
-               }
-       }
-    if ((pd=fopen(name,"r"))==0) {
-        print_error("csv::open failed ");
-        exit(10);
-    }
-       if (singlefile==0) {
-               unlink(name);
-       }
-}
-
-
-static gs_retval_t csv_replay_init(gs_sp_t device)
-{
-    gs_sp_t  verbosetmp;
-    gs_sp_t  delaytmp;
-    gs_sp_t  gshubtmp;
-    gs_sp_t  tempdel;
-    gs_sp_t  singlefiletmp;
-    
-    if ((name=get_iface_properties(device,"filename"))==0) {
-               print_error("csv_init::No CSV \"Filename\" defined");
-               exit(0);
-       }
-    tempdel=get_iface_properties(device,"csvseparator");
-    if (tempdel != 0 ) {
-        csvdel=(gs_uint8_t) tempdel[0];
-    }
-    
-    if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
-        if (strncmp(verbosetmp,"TRUE",4)==0) {
-            verbose=1;
-            fprintf(stderr,"VERBOSE ENABLED\n");
-        } else {
-            fprintf(stderr,"VERBOSE DISABLED\n");
-        }
-    }
-    if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
-        if (strncmp(singlefiletmp,"TRUE",4)==0) {
-            singlefile=1;
-            if (verbose)
-                fprintf(stderr,"SINGLEFILE ENABLED\n");
-        } else {
-            if (verbose)
-                fprintf(stderr,"SINGLEFILE DISABLED\n");
-        }
-    }
-    
-    if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
-        if (verbose) {
-            fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
-        }
-        startupdelay=atoi(get_iface_properties(device,"startupdelay"));
-    }
-    if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
-        if (verbose) {
-            fprintf(stderr,"GDAT format using gshub\n");
-        }
-        gshub=1;
-    }
-    
-    cur_packet.ptype=PTYPE_CSV;
-    return 0;
-}
-
-static gs_retval_t csv_read_socket()
-{
-    gs_uint32_t i;
-    gs_uint32_t p;
-       gs_uint32_t x;
-       gs_int32_t r;
-       gs_retval_t ret;
-    gs_uint32_t done;
-    
-    if ((ret=gs_read_line(line,CSVMAXLINE-1))<0) { return ret;}
-        cur_packet.systemTime=time(0);
-        p=0;
-        i=0;
-        done=0;
-        while((done==0)&&(i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
-            cur_packet.record.csv.fields[p]=&line[i];
-            p++;
-            while((line[i] != 0) && (line[i] != csvdel)) {
-                i++;
-            }
-            if (line[i]==0) done=1;
-            line[i]=0;
-            i++;
-        }
-        cur_packet.record.csv.numberfields=p;
-        //fprintf(stderr,"XX,%s,%s,%u,%u\n",cur_packet.record.csv.fields[0],cur_packet.record.csv.fields[1],x,lineend);
-        rts_fta_process_packet(&cur_packet);
-        if (lineend>x+1)  {
-            memcpy(&(line[0]),&(line[x+1]),lineend-x-1);
-            lineend=lineend-x-1;
-        } else {
-            lineend=0;
-        }
-        line[lineend]=0;
-        return 0;
-    }
-    
-    static void csv_read_tuple()
-    {
-        gs_uint32_t i=0;
-        gs_uint32_t p=0;
-        gs_uint32_t flen=0;
-        if (pd==0) next_file();
-        while(fgets((char *)&(line[0]),CSVMAXLINE,pd)==0) {
-            if (singlefile==1) {
-                if(verbose) {
-                    fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
-                }
-                rts_fta_done();
-                if (verbose) {
-                    fprintf(stderr,"RTS SAYS BY\n");
-                }
-                while(1==1) sleep(1);
-            } else {
-                next_file();
-            }
-        }
-        cur_packet.systemTime=time(0);
-        while((i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
-            cur_packet.record.csv.fields[p]=&line[i];
-            p++;
-            while((line[i] != 0) && (line[i] != csvdel)) {
-                i++;
-            }
-            if(line[i] != 0){
-                line[i]=0;
-                i++;
-            }
-        }
-        //                     Get rid of trailing \n and \r.
-        while(i>0 && (line[i-1] == '\n' || line[i-1] == '\r')){
-            i--;
-            line[i] = '\0';
-        }
-        cur_packet.record.csv.numberfields=p;
-        rts_fta_process_packet(&cur_packet);
-    }
-    
-    
-    
-    
-    
-    static gs_retval_t csv_process_file()
-    {
-        unsigned cnt=0;
-        static unsigned totalcnt=0;
-        for(cnt=0;cnt<50000;cnt++) {
-            if (gshub!=0) {
-                gs_retval_t retval;
-                retval=csv_read_socket();
-                if (retval==-1) return 0; // got a timeout so service message queue
-                if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
-                    // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
-                    if (verbose)
-                        fprintf(stderr,"Done processing waiting for things to shut down\n");
-                    rts_fta_done();
-                    // now just service message queue until we get killed or loose connectivity
-                    while (0==0) {
-                        fta_start_service(0); // service all waiting messages
-                        usleep(1000); // sleep a millisecond
-                    }
-                }
-            } else {
-                csv_read_tuple();
-            }
-        }
-        totalcnt=totalcnt+cnt;
-        if (verbose) {
-            fprintf(stderr,"Processesd %u tuple\n",totalcnt);
-        }
-        return 0;
-    }
-    
-    gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
-        gs_uint32_t cont;
-        endpoint mygshub;
-        
-        csv_replay_init(device);
-        
-        /* initalize host_lib */
-        if (verbose) {
-            fprintf(stderr,"Init LFTAs for %s\n",device);
-        }
-        
-        if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
-            fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
-                    device);
-            exit(7);
-        }
-        
-        fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
-        
-        cont=startupdelay+time(0);
-        
-        if (verbose) { fprintf(stderr,"Start startup delay"); }
-        
-        while (cont>time(NULL)) {
-            if (fta_start_service(0)<0) {
-                fprintf(stderr,"%s::error:in processing the msg queue\n",
-                        device);
-                exit(9);
-            }
-            usleep(1000); /* sleep for one millisecond */
-        }
-        
-        if (verbose) { fprintf(stderr,"... Done\n"); }
-        
-        // open the connection to the data source
-        if (gshub!=0) { init_socket();}
-        
-        // wait to process till we get the signal from GSHUB
-        if (get_hub(&mygshub)!=0) {
-            print_error("ERROR:could not find gshub for data source");
-            exit(0);
-        }
-        while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
-            usleep(100);
-            if (fta_start_service(0)<0) {
-                fprintf(stderr,"%s::error:in processing the msg queue\n",
-                        device);
-                exit(9);
-            }
-        }
-        
-        /* now we enter an endless loop to process data */
-        if (verbose) {
-            fprintf(stderr,"Start processing %s\n",device);
-        }
-        
-        while (1==1) {
-            if (csv_process_file()<0) {
-                fprintf(stderr,"%s::error:in processing packets\n",
-                        device);
-                exit(8);
-            }
-            /* process all messages on the message queue*/
-            if (fta_start_service(0)<0) {
-                fprintf(stderr,"%s::error:in processing the msg queue\n",
-                        device);
-                exit(9);
-            }
-        }
-        return 0;
-    }
-    
-    
+/* ------------------------------------------------\r
+ Copyright 2014 AT&T Intellectual Property\r
+ Licensed under the Apache License, Version 2.0 (the "License");\r
+ you may not use this file except in compliance with the License.\r
+ You may obtain a copy of the License at\r
\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+ ------------------------------------------- */\r
+#include <time.h>\r
+#include <stdlib.h>\r
+#include <string.h>\r
+#include <unistd.h>\r
+#include <fcntl.h>\r
+#include <sys/time.h>\r
+#include <sys/stat.h>\r
+#include <sys/types.h>\r
+#include <sys/socket.h>\r
+#include <netinet/in.h>\r
+#include "errno.h"\r
+\r
+#include "gsconfig.h"\r
+#include "gshub.h"\r
+#include "gstypes.h"\r
+#include "lapp.h"\r
+#include "fta.h"\r
+#include "stdio.h"\r
+#include "stdlib.h"\r
+#include "packet.h"\r
+#include "schemaparser.h"\r
+#include "lfta/rts.h"\r
+\r
+void rts_fta_process_packet(struct packet * p);\r
+void rts_fta_done();\r
+void fta_init(gs_sp_t device);\r
+\r
+\r
+#define CSVMAXLINE 1000000\r
+\r
+static FILE *pd;\r
+static int listensockfd=-1;\r
+static int fd=-1;\r
+static struct packet cur_packet;\r
+static gs_sp_t name;\r
+static gs_uint8_t line[CSVMAXLINE];\r
+static gs_uint32_t lineend=0;\r
+static gs_uint8_t csvdel=',';\r
+static gs_uint32_t verbose=0;\r
+static gs_uint32_t startupdelay=0;\r
+static gs_uint32_t singlefile=0;\r
+static gs_uint32_t gshub=0;\r
+static int socket_desc=0;\r
+\r
+static void csv_replay_check_messages() {\r
+    if (fta_start_service(0)<0) {\r
+        print_error("Error:in processing the msg queue for a replay file");\r
+        exit(9);\r
+    }\r
+}\r
+\r
+static gs_uint32_t gs_read_line(gs_uint8_t * buffer, gs_uint32_t length){\r
+    gs_uint32_t used=0;\r
+    gs_uint32_t cur;\r
+    fd_set socket_rset;\r
+    fd_set socket_eset;\r
+    struct timeval socket_timeout;\r
+    int retval;\r
+    \r
+    FD_ZERO(&socket_rset);\r
+    FD_SET(socket_desc,&socket_rset);\r
+    FD_ZERO(&socket_eset);\r
+    FD_SET(socket_desc,&socket_eset);\r
+    // timeout in one millisecon\r
+    socket_timeout.tv_sec=0;\r
+    socket_timeout.tv_usec=1000;\r
+    \r
+    if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {\r
+        if (retval==0) {\r
+            // caught a timeout\r
+            return -1;\r
+        }\r
+        return -2;\r
+    }\r
+    \r
+    while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {\r
+        if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {\r
+            print_error("ERROR:could not read data from gdat stream");\r
+            return -2;\r
+        }\r
+        used+=cur;\r
+    }\r
+       buffer[used]=0;\r
+       return 0;\r
+}\r
+\r
+static void init_socket() {\r
+       endpoint gshub;\r
+       endpoint srcinfo;\r
+       struct sockaddr_in server;\r
+    gs_int32_t parserversion;\r
+    gs_uint32_t schemalen;\r
+    static char * asciischema=0;\r
+       gs_int8_t buf[1024];\r
+       \r
+       if (get_hub(&gshub)!=0) {\r
+               print_error("ERROR:could not find gshub for data source");\r
+               exit(0);\r
+       }\r
+    \r
+       if (get_streamsource(gshub,name,&srcinfo,1) !=0) {\r
+               print_error("ERROR:could not find data source for stream\n");\r
+               exit(0);\r
+       }\r
+    \r
+       socket_desc = socket(AF_INET , SOCK_STREAM , 0);\r
+    if (socket_desc == -1)\r
+    {\r
+        print_error("ERROR:could not create socket for data stream");\r
+               exit(0);\r
+    }\r
+       server.sin_addr.s_addr = srcinfo.ip;\r
+    server.sin_family = AF_INET;\r
+    server.sin_port = srcinfo.port;\r
+    \r
+       if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)\r
+    {\r
+               print_error("ERROR: could not open connection to data source");\r
+               exit(0);\r
+       }\r
+    \r
+}\r
+\r
+static void next_file() {\r
+       struct stat s;\r
+       if (verbose) {\r
+               fprintf(stderr,"Opening %s\n",name);\r
+       }\r
+       if (singlefile==0) {\r
+               while (lstat(name,&s)!=0) {\r
+                       if (errno!=ENOENT) {\r
+                               print_error("csv::lstat unexpected return value");\r
+                               exit(10);\r
+                       }\r
+                       csv_replay_check_messages();\r
+                       usleep(10000);\r
+               }\r
+               if  (pd!=0) {\r
+                       fclose(pd);\r
+               }\r
+       }\r
+    if ((pd=fopen(name,"r"))==0) {\r
+        print_error("csv::open failed ");\r
+        exit(10);\r
+    }\r
+       if (singlefile==0) {\r
+               unlink(name);\r
+       }\r
+}\r
+\r
+\r
+static gs_retval_t csv_replay_init(gs_sp_t device)\r
+{\r
+    gs_sp_t  verbosetmp;\r
+    gs_sp_t  delaytmp;\r
+    gs_sp_t  gshubtmp;\r
+    gs_sp_t  tempdel;\r
+    gs_sp_t  singlefiletmp;\r
+    \r
+    if ((name=get_iface_properties(device,"filename"))==0) {\r
+               print_error("csv_init::No CSV \"Filename\" defined");\r
+               exit(0);\r
+       }\r
+    tempdel=get_iface_properties(device,"csvseparator");\r
+    if (tempdel != 0 ) {\r
+        csvdel=(gs_uint8_t) tempdel[0];\r
+    }\r
+    \r
+    if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {\r
+        if (strncmp(verbosetmp,"TRUE",4)==0) {\r
+            verbose=1;\r
+            fprintf(stderr,"VERBOSE ENABLED\n");\r
+        } else {\r
+            fprintf(stderr,"VERBOSE DISABLED\n");\r
+        }\r
+    }\r
+    if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {\r
+        if (strncmp(singlefiletmp,"TRUE",4)==0) {\r
+            singlefile=1;\r
+            if (verbose)\r
+                fprintf(stderr,"SINGLEFILE ENABLED\n");\r
+        } else {\r
+            if (verbose)\r
+                fprintf(stderr,"SINGLEFILE DISABLED\n");\r
+        }\r
+    }\r
+    \r
+    if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {\r
+        if (verbose) {\r
+            fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));\r
+        }\r
+        startupdelay=atoi(get_iface_properties(device,"startupdelay"));\r
+    }\r
+    if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {\r
+        if (verbose) {\r
+            fprintf(stderr,"GDAT format using gshub\n");\r
+        }\r
+        gshub=1;\r
+    }\r
+    \r
+    cur_packet.ptype=PTYPE_CSV;\r
+    return 0;\r
+}\r
+\r
+static gs_retval_t csv_read_socket()\r
+{\r
+    gs_uint32_t i;\r
+    gs_uint32_t p;\r
+       gs_uint32_t x;\r
+       gs_int32_t r;\r
+       gs_retval_t ret;\r
+    gs_uint32_t done;\r
+    \r
+    if ((ret=gs_read_line(line,CSVMAXLINE-1))<0) { return ret;}\r
+        cur_packet.systemTime=time(0);\r
+        p=0;\r
+        i=0;\r
+        done=0;\r
+        while((done==0)&&(i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){\r
+            cur_packet.record.csv.fields[p]=&line[i];\r
+            p++;\r
+            while((line[i] != 0) && (line[i] != csvdel)) {\r
+                i++;\r
+            }\r
+            if (line[i]==0) done=1;\r
+            line[i]=0;\r
+            i++;\r
+        }\r
+        cur_packet.record.csv.numberfields=p;\r
+        //fprintf(stderr,"XX,%s,%s,%u,%u\n",cur_packet.record.csv.fields[0],cur_packet.record.csv.fields[1],x,lineend);\r
+        rts_fta_process_packet(&cur_packet);\r
+        if (lineend>x+1)  {\r
+            memcpy(&(line[0]),&(line[x+1]),lineend-x-1);\r
+            lineend=lineend-x-1;\r
+        } else {\r
+            lineend=0;\r
+        }\r
+        line[lineend]=0;\r
+        return 0;\r
+    }\r
+    \r
+    static void csv_read_tuple()\r
+    {\r
+        gs_uint32_t i=0;\r
+        gs_uint32_t p=0;\r
+        gs_uint32_t flen=0;\r
+        if (pd==0) next_file();\r
+        while(fgets((char *)&(line[0]),CSVMAXLINE,pd)==0) {\r
+            if (singlefile==1) {\r
+                if(verbose) {\r
+                    fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");\r
+                }\r
+                rts_fta_done();\r
+                if (verbose) {\r
+                    fprintf(stderr,"RTS SAYS BY\n");\r
+                }\r
+                while(1==1) sleep(1);\r
+            } else {\r
+                next_file();\r
+            }\r
+        }\r
+        cur_packet.systemTime=time(0);\r
+        while((i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){\r
+            cur_packet.record.csv.fields[p]=&line[i];\r
+            p++;\r
+            while((line[i] != 0) && (line[i] != csvdel)) {\r
+                i++;\r
+            }\r
+            if(line[i] != 0){\r
+                line[i]=0;\r
+                i++;\r
+            }\r
+        }\r
+        //                     Get rid of trailing \n and \r.\r
+        while(i>0 && (line[i-1] == '\n' || line[i-1] == '\r')){\r
+            i--;\r
+            line[i] = '\0';\r
+        }\r
+        cur_packet.record.csv.numberfields=p;\r
+        rts_fta_process_packet(&cur_packet);\r
+    }\r
+    \r
+    \r
+    \r
+    \r
+    \r
+    static gs_retval_t csv_process_file()\r
+    {\r
+        unsigned cnt=0;\r
+        static unsigned totalcnt=0;\r
+        for(cnt=0;cnt<50000;cnt++) {\r
+            if (gshub!=0) {\r
+                gs_retval_t retval;\r
+                retval=csv_read_socket();\r
+                if (retval==-1) return 0; // got a timeout so service message queue\r
+                if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {\r
+                    // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer\r
+                    if (verbose)\r
+                        fprintf(stderr,"Done processing waiting for things to shut down\n");\r
+                    rts_fta_done();\r
+                    // now just service message queue until we get killed or loose connectivity\r
+                    while (0==0) {\r
+                        fta_start_service(0); // service all waiting messages\r
+                        usleep(1000); // sleep a millisecond\r
+                    }\r
+                }\r
+            } else {\r
+                csv_read_tuple();\r
+            }\r
+        }\r
+        totalcnt=totalcnt+cnt;\r
+        if (verbose) {\r
+            fprintf(stderr,"Processesd %u tuple\n",totalcnt);\r
+        }\r
+        return 0;\r
+    }\r
+    \r
+    gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {\r
+        gs_uint32_t cont;\r
+        endpoint mygshub;\r
+        \r
+        csv_replay_init(device);\r
+        \r
+        /* initalize host_lib */\r
+        if (verbose) {\r
+            fprintf(stderr,"Init LFTAs for %s\n",device);\r
+        }\r
+        \r
+        if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {\r
+            fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",\r
+                    device);\r
+            exit(7);\r
+        }\r
+        \r
+        fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/\r
+        \r
+        cont=startupdelay+time(0);\r
+        \r
+        if (verbose) { fprintf(stderr,"Start startup delay"); }\r
+        \r
+        while (cont>time(NULL)) {\r
+            if (fta_start_service(0)<0) {\r
+                fprintf(stderr,"%s::error:in processing the msg queue\n",\r
+                        device);\r
+                exit(9);\r
+            }\r
+            usleep(1000); /* sleep for one millisecond */\r
+        }\r
+        \r
+        if (verbose) { fprintf(stderr,"... Done\n"); }\r
+        \r
+        // open the connection to the data source\r
+        if (gshub!=0) { init_socket();}\r
+        \r
+        // wait to process till we get the signal from GSHUB\r
+        if (get_hub(&mygshub)!=0) {\r
+            print_error("ERROR:could not find gshub for data source");\r
+            exit(0);\r
+        }\r
+        while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {\r
+            usleep(100);\r
+            if (fta_start_service(0)<0) {\r
+                fprintf(stderr,"%s::error:in processing the msg queue\n",\r
+                        device);\r
+                exit(9);\r
+            }\r
+        }\r
+        \r
+        /* now we enter an endless loop to process data */\r
+        if (verbose) {\r
+            fprintf(stderr,"Start processing %s\n",device);\r
+        }\r
+        \r
+        while (1==1) {\r
+            if (csv_process_file()<0) {\r
+                fprintf(stderr,"%s::error:in processing packets\n",\r
+                        device);\r
+                exit(8);\r
+            }\r
+            /* process all messages on the message queue*/\r
+            if (fta_start_service(0)<0) {\r
+                fprintf(stderr,"%s::error:in processing the msg queue\n",\r
+                        device);\r
+                exit(9);\r
+            }\r
+        }\r
+        return 0;\r
+    }\r
+    \r
+    \r