Added quantiling UDAFs
[com/gs-lite.git] / src / lib / gscprts / rts_csv.c
1 /* ------------------------------------------------\r
2  Copyright 2014 AT&T Intellectual Property\r
3  Licensed under the Apache License, Version 2.0 (the "License");\r
4  you may not use this file except in compliance with the License.\r
5  You may obtain a copy of the License at\r
6  \r
7  http://www.apache.org/licenses/LICENSE-2.0\r
8  \r
9  Unless required by applicable law or agreed to in writing, software\r
10  distributed under the License is distributed on an "AS IS" BASIS,\r
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12  See the License for the specific language governing permissions and\r
13  limitations under the License.\r
14  ------------------------------------------- */\r
15 #include <time.h>\r
16 #include <stdlib.h>\r
17 #include <string.h>\r
18 #include <unistd.h>\r
19 #include <fcntl.h>\r
20 #include <sys/time.h>\r
21 #include <sys/stat.h>\r
22 #include <sys/types.h>\r
23 #include <sys/socket.h>\r
24 #include <netinet/in.h>\r
25 #include "errno.h"\r
26 \r
27 #include "gsconfig.h"\r
28 #include "gshub.h"\r
29 #include "gstypes.h"\r
30 #include "lapp.h"\r
31 #include "fta.h"\r
32 #include "stdio.h"\r
33 #include "stdlib.h"\r
34 #include "packet.h"\r
35 #include "schemaparser.h"\r
36 #include "lfta/rts.h"\r
37 \r
38 void rts_fta_process_packet(struct packet * p);\r
39 void rts_fta_done();\r
40 void fta_init(gs_sp_t device);\r
41 \r
42 \r
43 #define CSVMAXLINE 1000000\r
44 \r
45 static FILE *pd;\r
46 static int listensockfd=-1;\r
47 static int fd=-1;\r
48 static struct packet cur_packet;\r
49 static gs_sp_t name;\r
50 static gs_uint8_t line[CSVMAXLINE];\r
51 static gs_uint32_t lineend=0;\r
52 static gs_uint8_t csvdel=',';\r
53 static gs_uint32_t verbose=0;\r
54 static gs_uint32_t startupdelay=0;\r
55 static gs_uint32_t singlefile=0;\r
56 static gs_uint32_t gshub=0;\r
57 static int socket_desc=0;\r
58 \r
59 static void csv_replay_check_messages() {\r
60     if (fta_start_service(0)<0) {\r
61         print_error("Error:in processing the msg queue for a replay file");\r
62         exit(9);\r
63     }\r
64 }\r
65 \r
66 static gs_uint32_t gs_read_line(gs_uint8_t * buffer, gs_uint32_t length){\r
67     gs_uint32_t used=0;\r
68     gs_uint32_t cur;\r
69     fd_set socket_rset;\r
70     fd_set socket_eset;\r
71     struct timeval socket_timeout;\r
72     int retval;\r
73     \r
74     FD_ZERO(&socket_rset);\r
75     FD_SET(socket_desc,&socket_rset);\r
76     FD_ZERO(&socket_eset);\r
77     FD_SET(socket_desc,&socket_eset);\r
78     // timeout in one millisecon\r
79     socket_timeout.tv_sec=0;\r
80     socket_timeout.tv_usec=1000;\r
81     \r
82     if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {\r
83         if (retval==0) {\r
84             // caught a timeout\r
85             return -1;\r
86         }\r
87         return -2;\r
88     }\r
89     \r
90     while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {\r
91         if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {\r
92             print_error("ERROR:could not read data from gdat stream");\r
93             return -2;\r
94         }\r
95         used+=cur;\r
96     }\r
97         buffer[used]=0;\r
98         return 0;\r
99 }\r
100 \r
101 static void init_socket() {\r
102         endpoint gshub;\r
103         endpoint srcinfo;\r
104         struct sockaddr_in server;\r
105     gs_int32_t parserversion;\r
106     gs_uint32_t schemalen;\r
107     static char * asciischema=0;\r
108         gs_int8_t buf[1024];\r
109         \r
110         if (get_hub(&gshub)!=0) {\r
111                 print_error("ERROR:could not find gshub for data source");\r
112                 exit(0);\r
113         }\r
114     \r
115         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {\r
116                 print_error("ERROR:could not find data source for stream\n");\r
117                 exit(0);\r
118         }\r
119     \r
120         socket_desc = socket(AF_INET , SOCK_STREAM , 0);\r
121     if (socket_desc == -1)\r
122     {\r
123         print_error("ERROR:could not create socket for data stream");\r
124                 exit(0);\r
125     }\r
126         server.sin_addr.s_addr = srcinfo.ip;\r
127     server.sin_family = AF_INET;\r
128     server.sin_port = srcinfo.port;\r
129     \r
130         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)\r
131     {\r
132                 print_error("ERROR: could not open connection to data source");\r
133                 exit(0);\r
134         }\r
135     \r
136 }\r
137 \r
138 static void next_file() {\r
139         struct stat s;\r
140         if (verbose) {\r
141                 fprintf(stderr,"Opening %s\n",name);\r
142         }\r
143         if (singlefile==0) {\r
144                 while (lstat(name,&s)!=0) {\r
145                         if (errno!=ENOENT) {\r
146                                 print_error("csv::lstat unexpected return value");\r
147                                 exit(10);\r
148                         }\r
149                         csv_replay_check_messages();\r
150                         usleep(10000);\r
151                 }\r
152                 if  (pd!=0) {\r
153                         fclose(pd);\r
154                 }\r
155         }\r
156     if ((pd=fopen(name,"r"))==0) {\r
157         print_error("csv::open failed ");\r
158         exit(10);\r
159     }\r
160         if (singlefile==0) {\r
161                 unlink(name);\r
162         }\r
163 }\r
164 \r
165 \r
166 static gs_retval_t csv_replay_init(gs_sp_t device)\r
167 {\r
168     gs_sp_t  verbosetmp;\r
169     gs_sp_t  delaytmp;\r
170     gs_sp_t  gshubtmp;\r
171     gs_sp_t  tempdel;\r
172     gs_sp_t  singlefiletmp;\r
173     \r
174     if ((name=get_iface_properties(device,"filename"))==0) {\r
175                 print_error("csv_init::No CSV \"Filename\" defined");\r
176                 exit(0);\r
177         }\r
178     tempdel=get_iface_properties(device,"csvseparator");\r
179     if (tempdel != 0 ) {\r
180         csvdel=(gs_uint8_t) tempdel[0];\r
181     }\r
182     \r
183     if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {\r
184         if (strncmp(verbosetmp,"TRUE",4)==0) {\r
185             verbose=1;\r
186             fprintf(stderr,"VERBOSE ENABLED\n");\r
187         } else {\r
188             fprintf(stderr,"VERBOSE DISABLED\n");\r
189         }\r
190     }\r
191     if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {\r
192         if (strncmp(singlefiletmp,"TRUE",4)==0) {\r
193             singlefile=1;\r
194             if (verbose)\r
195                 fprintf(stderr,"SINGLEFILE ENABLED\n");\r
196         } else {\r
197             if (verbose)\r
198                 fprintf(stderr,"SINGLEFILE DISABLED\n");\r
199         }\r
200     }\r
201     \r
202     if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {\r
203         if (verbose) {\r
204             fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));\r
205         }\r
206         startupdelay=atoi(get_iface_properties(device,"startupdelay"));\r
207     }\r
208     if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {\r
209         if (verbose) {\r
210             fprintf(stderr,"GDAT format using gshub\n");\r
211         }\r
212         gshub=1;\r
213     }\r
214     \r
215     cur_packet.ptype=PTYPE_CSV;\r
216     return 0;\r
217 }\r
218 \r
219 static gs_retval_t csv_read_socket()\r
220 {\r
221     gs_uint32_t i;\r
222     gs_uint32_t p;\r
223         gs_uint32_t x;\r
224         gs_int32_t r;\r
225         gs_retval_t ret;\r
226     gs_uint32_t done;\r
227     \r
228     if ((ret=gs_read_line(line,CSVMAXLINE-1))<0) { return ret;}\r
229         cur_packet.systemTime=time(0);\r
230         p=0;\r
231         i=0;\r
232         done=0;\r
233         while((done==0)&&(i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){\r
234             cur_packet.record.csv.fields[p]=&line[i];\r
235             p++;\r
236             while((line[i] != 0) && (line[i] != csvdel)) {\r
237                 i++;\r
238             }\r
239             if (line[i]==0) done=1;\r
240             line[i]=0;\r
241             i++;\r
242         }\r
243         cur_packet.record.csv.numberfields=p;\r
244         //fprintf(stderr,"XX,%s,%s,%u,%u\n",cur_packet.record.csv.fields[0],cur_packet.record.csv.fields[1],x,lineend);\r
245         rts_fta_process_packet(&cur_packet);\r
246         if (lineend>x+1)  {\r
247             memcpy(&(line[0]),&(line[x+1]),lineend-x-1);\r
248             lineend=lineend-x-1;\r
249         } else {\r
250             lineend=0;\r
251         }\r
252         line[lineend]=0;\r
253         return 0;\r
254     }\r
255     \r
256     static void csv_read_tuple()\r
257     {\r
258         gs_uint32_t i=0;\r
259         gs_uint32_t p=0;\r
260         gs_uint32_t flen=0;\r
261         if (pd==0) next_file();\r
262         while(fgets((char *)&(line[0]),CSVMAXLINE,pd)==0) {\r
263             if (singlefile==1) {\r
264                 if(verbose) {\r
265                     fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");\r
266                 }\r
267                 rts_fta_done();\r
268                 if (verbose) {\r
269                     fprintf(stderr,"RTS SAYS BY\n");\r
270                 }\r
271                 while(1==1) sleep(1);\r
272             } else {\r
273                 next_file();\r
274             }\r
275         }\r
276         cur_packet.systemTime=time(0);\r
277         while((i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){\r
278             cur_packet.record.csv.fields[p]=&line[i];\r
279             p++;\r
280             while((line[i] != 0) && (line[i] != csvdel)) {\r
281                 i++;\r
282             }\r
283             if(line[i] != 0){\r
284                 line[i]=0;\r
285                 i++;\r
286             }\r
287         }\r
288         //                      Get rid of trailing \n and \r.\r
289         while(i>0 && (line[i-1] == '\n' || line[i-1] == '\r')){\r
290             i--;\r
291             line[i] = '\0';\r
292         }\r
293         cur_packet.record.csv.numberfields=p;\r
294         rts_fta_process_packet(&cur_packet);\r
295     }\r
296     \r
297     \r
298     \r
299     \r
300     \r
301     static gs_retval_t csv_process_file()\r
302     {\r
303         unsigned cnt=0;\r
304         static unsigned totalcnt=0;\r
305         for(cnt=0;cnt<50000;cnt++) {\r
306             if (gshub!=0) {\r
307                 gs_retval_t retval;\r
308                 retval=csv_read_socket();\r
309                 if (retval==-1) return 0; // got a timeout so service message queue\r
310                 if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {\r
311                     // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer\r
312                     if (verbose)\r
313                         fprintf(stderr,"Done processing waiting for things to shut down\n");\r
314                     rts_fta_done();\r
315                     // now just service message queue until we get killed or loose connectivity\r
316                     while (0==0) {\r
317                         fta_start_service(0); // service all waiting messages\r
318                         usleep(1000); // sleep a millisecond\r
319                     }\r
320                 }\r
321             } else {\r
322                 csv_read_tuple();\r
323             }\r
324         }\r
325         totalcnt=totalcnt+cnt;\r
326         if (verbose) {\r
327             fprintf(stderr,"Processesd %u tuple\n",totalcnt);\r
328         }\r
329         return 0;\r
330     }\r
331     \r
332     gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {\r
333         gs_uint32_t cont;\r
334         endpoint mygshub;\r
335         \r
336         csv_replay_init(device);\r
337         \r
338         /* initalize host_lib */\r
339         if (verbose) {\r
340             fprintf(stderr,"Init LFTAs for %s\n",device);\r
341         }\r
342         \r
343         if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {\r
344             fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",\r
345                     device);\r
346             exit(7);\r
347         }\r
348         \r
349         fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/\r
350         \r
351         cont=startupdelay+time(0);\r
352         \r
353         if (verbose) { fprintf(stderr,"Start startup delay"); }\r
354         \r
355         while (cont>time(NULL)) {\r
356             if (fta_start_service(0)<0) {\r
357                 fprintf(stderr,"%s::error:in processing the msg queue\n",\r
358                         device);\r
359                 exit(9);\r
360             }\r
361             usleep(1000); /* sleep for one millisecond */\r
362         }\r
363         \r
364         if (verbose) { fprintf(stderr,"... Done\n"); }\r
365         \r
366         // open the connection to the data source\r
367         if (gshub!=0) { init_socket();}\r
368         \r
369         // wait to process till we get the signal from GSHUB\r
370         if (get_hub(&mygshub)!=0) {\r
371             print_error("ERROR:could not find gshub for data source");\r
372             exit(0);\r
373         }\r
374         while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {\r
375             usleep(100);\r
376             if (fta_start_service(0)<0) {\r
377                 fprintf(stderr,"%s::error:in processing the msg queue\n",\r
378                         device);\r
379                 exit(9);\r
380             }\r
381         }\r
382         \r
383         /* now we enter an endless loop to process data */\r
384         if (verbose) {\r
385             fprintf(stderr,"Start processing %s\n",device);\r
386         }\r
387         \r
388         while (1==1) {\r
389             if (csv_process_file()<0) {\r
390                 fprintf(stderr,"%s::error:in processing packets\n",\r
391                         device);\r
392                 exit(8);\r
393             }\r
394             /* process all messages on the message queue*/\r
395             if (fta_start_service(0)<0) {\r
396                 fprintf(stderr,"%s::error:in processing the msg queue\n",\r
397                         device);\r
398                 exit(9);\r
399             }\r
400         }\r
401         return 0;\r
402     }\r
403     \r
404     \r