Default to non-blocking ringbuffers
[com/gs-lite.git] / src / lib / gscprts / rts_csv.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <time.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <unistd.h>
19 #include <fcntl.h>
20 #include <sys/time.h>
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
25 #include "errno.h"
26
27 #include "gsconfig.h"
28 #include "gshub.h"
29 #include "gstypes.h"
30 #include "lapp.h"
31 #include "fta.h"
32 #include "stdio.h"
33 #include "stdlib.h"
34 #include "packet.h"
35 #include "schemaparser.h"
36 #include "lfta/rts.h"
37
38 void rts_fta_process_packet(struct packet * p);
39 void rts_fta_done();
40 void fta_init(gs_sp_t device);
41
42
43 #define CSVMAXLINE 1000000
44
45 static FILE *pd;
46 static int listensockfd=-1;
47 static int fd=-1;
48 static struct packet cur_packet;
49 static gs_sp_t name;
50 static gs_uint8_t line[CSVMAXLINE];
51 static gs_uint32_t lineend=0;
52 static gs_uint8_t csvdel=',';
53 static gs_uint32_t verbose=0;
54 static gs_uint32_t startupdelay=0;
55 static gs_uint32_t singlefile=0;
56 static gs_uint32_t gshub=0;
57 static int socket_desc=0;
58
59 static void csv_replay_check_messages() {
60     if (fta_start_service(0)<0) {
61         print_error("Error:in processing the msg queue for a replay file");
62         exit(9);
63     }
64 }
65
66 static gs_uint32_t gs_read_line(gs_uint8_t * buffer, gs_uint32_t length){
67     gs_uint32_t used=0;
68     gs_uint32_t cur;
69     fd_set socket_rset;
70     fd_set socket_eset;
71     struct timeval socket_timeout;
72     int retval;
73     
74     FD_ZERO(&socket_rset);
75     FD_SET(socket_desc,&socket_rset);
76     FD_ZERO(&socket_eset);
77     FD_SET(socket_desc,&socket_eset);
78     // timeout in one millisecon
79     socket_timeout.tv_sec=0;
80     socket_timeout.tv_usec=1000;
81     
82     if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
83         if (retval==0) {
84             // caught a timeout
85             return -1;
86         }
87         return -2;
88     }
89     
90     while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
91         if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
92             print_error("ERROR:could not read data from gdat stream");
93             return -2;
94         }
95         used+=cur;
96     }
97         buffer[used]=0;
98         return 0;
99 }
100
101 static void init_socket() {
102         endpoint gshub;
103         endpoint srcinfo;
104         struct sockaddr_in server;
105     gs_int32_t parserversion;
106     gs_uint32_t schemalen;
107     static char * asciischema=0;
108         gs_int8_t buf[1024];
109         
110         if (get_hub(&gshub)!=0) {
111                 print_error("ERROR:could not find gshub for data source");
112                 exit(0);
113         }
114     
115         if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
116                 print_error("ERROR:could not find data source for stream\n");
117                 exit(0);
118         }
119     
120         socket_desc = socket(AF_INET , SOCK_STREAM , 0);
121     if (socket_desc == -1)
122     {
123         print_error("ERROR:could not create socket for data stream");
124                 exit(0);
125     }
126         server.sin_addr.s_addr = srcinfo.ip;
127     server.sin_family = AF_INET;
128     server.sin_port = srcinfo.port;
129     
130         if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
131     {
132                 print_error("ERROR: could not open connection to data source");
133                 exit(0);
134         }
135     
136 }
137
138 static void next_file() {
139         struct stat s;
140         if (verbose) {
141                 fprintf(stderr,"Opening %s\n",name);
142         }
143         if (singlefile==0) {
144                 while (lstat(name,&s)!=0) {
145                         if (errno!=ENOENT) {
146                                 print_error("csv::lstat unexpected return value");
147                                 exit(10);
148                         }
149                         csv_replay_check_messages();
150                         usleep(10000);
151                 }
152                 if  (pd!=0) {
153                         fclose(pd);
154                 }
155         }
156     if ((pd=fopen(name,"r"))==0) {
157         print_error("csv::open failed ");
158         exit(10);
159     }
160         if (singlefile==0) {
161                 unlink(name);
162         }
163 }
164
165
166 static gs_retval_t csv_replay_init(gs_sp_t device)
167 {
168     gs_sp_t  verbosetmp;
169     gs_sp_t  delaytmp;
170     gs_sp_t  gshubtmp;
171     gs_sp_t  tempdel;
172     gs_sp_t  singlefiletmp;
173     
174     if ((name=get_iface_properties(device,"filename"))==0) {
175                 print_error("csv_init::No CSV \"Filename\" defined");
176                 exit(0);
177         }
178     tempdel=get_iface_properties(device,"csvseparator");
179     if (tempdel != 0 ) {
180         csvdel=(gs_uint8_t) tempdel[0];
181     }
182     
183     if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
184         if (strncmp(verbosetmp,"TRUE",4)==0) {
185             verbose=1;
186             fprintf(stderr,"VERBOSE ENABLED\n");
187         } else {
188             fprintf(stderr,"VERBOSE DISABLED\n");
189         }
190     }
191     if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
192         if (strncmp(singlefiletmp,"TRUE",4)==0) {
193             singlefile=1;
194             if (verbose)
195                 fprintf(stderr,"SINGLEFILE ENABLED\n");
196         } else {
197             if (verbose)
198                 fprintf(stderr,"SINGLEFILE DISABLED\n");
199         }
200     }
201     
202     if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
203         if (verbose) {
204             fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
205         }
206         startupdelay=atoi(get_iface_properties(device,"startupdelay"));
207     }
208     if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
209         if (verbose) {
210             fprintf(stderr,"GDAT format using gshub\n");
211         }
212         gshub=1;
213     }
214     
215     cur_packet.ptype=PTYPE_CSV;
216     return 0;
217 }
218
219 static gs_retval_t csv_read_socket()
220 {
221     gs_uint32_t i;
222     gs_uint32_t p;
223         gs_uint32_t x;
224         gs_int32_t r;
225         gs_retval_t ret;
226     gs_uint32_t done;
227     
228     if ((ret=gs_read_line(line,CSVMAXLINE-1))<0) { return ret;}
229         cur_packet.systemTime=time(0);
230         p=0;
231         i=0;
232         done=0;
233         while((done==0)&&(i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
234             cur_packet.record.csv.fields[p]=&line[i];
235             p++;
236             while((line[i] != 0) && (line[i] != csvdel)) {
237                 i++;
238             }
239             if (line[i]==0) done=1;
240             line[i]=0;
241             i++;
242         }
243         cur_packet.record.csv.numberfields=p;
244         //fprintf(stderr,"XX,%s,%s,%u,%u\n",cur_packet.record.csv.fields[0],cur_packet.record.csv.fields[1],x,lineend);
245         rts_fta_process_packet(&cur_packet);
246         if (lineend>x+1)  {
247             memcpy(&(line[0]),&(line[x+1]),lineend-x-1);
248             lineend=lineend-x-1;
249         } else {
250             lineend=0;
251         }
252         line[lineend]=0;
253         return 0;
254     }
255     
256     static void csv_read_tuple()
257     {
258         gs_uint32_t i=0;
259         gs_uint32_t p=0;
260         gs_uint32_t flen=0;
261         if (pd==0) next_file();
262         while(fgets((char *)&(line[0]),CSVMAXLINE,pd)==0) {
263             if (singlefile==1) {
264                 if(verbose) {
265                     fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
266                 }
267                 rts_fta_done();
268                 if (verbose) {
269                     fprintf(stderr,"RTS SAYS BY\n");
270                 }
271                 while(1==1) sleep(1);
272             } else {
273                 next_file();
274             }
275         }
276         cur_packet.systemTime=time(0);
277         while((i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
278             cur_packet.record.csv.fields[p]=&line[i];
279             p++;
280             while((line[i] != 0) && (line[i] != csvdel)) {
281                 i++;
282             }
283             if(line[i] != 0){
284                 line[i]=0;
285                 i++;
286             }
287         }
288         //                      Get rid of trailing \n and \r.
289         while(i>0 && (line[i-1] == '\n' || line[i-1] == '\r')){
290             i--;
291             line[i] = '\0';
292         }
293         cur_packet.record.csv.numberfields=p;
294         rts_fta_process_packet(&cur_packet);
295     }
296     
297     
298     
299     
300     
301     static gs_retval_t csv_process_file()
302     {
303         unsigned cnt=0;
304         static unsigned totalcnt=0;
305         for(cnt=0;cnt<50000;cnt++) {
306             if (gshub!=0) {
307                 gs_retval_t retval;
308                 retval=csv_read_socket();
309                 if (retval==-1) return 0; // got a timeout so service message queue
310                 if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
311                     // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
312                     if (verbose)
313                         fprintf(stderr,"Done processing waiting for things to shut down\n");
314                     rts_fta_done();
315                     // now just service message queue until we get killed or loose connectivity
316                     while (0==0) {
317                         fta_start_service(0); // service all waiting messages
318                         usleep(1000); // sleep a millisecond
319                     }
320                 }
321             } else {
322                 csv_read_tuple();
323             }
324         }
325         totalcnt=totalcnt+cnt;
326         if (verbose) {
327             fprintf(stderr,"Processesd %u tuple\n",totalcnt);
328         }
329         return 0;
330     }
331     
332     gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
333         gs_uint32_t cont;
334         endpoint mygshub;
335         
336         csv_replay_init(device);
337         
338         /* initalize host_lib */
339         if (verbose) {
340             fprintf(stderr,"Init LFTAs for %s\n",device);
341         }
342         
343         if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
344             fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
345                     device);
346             exit(7);
347         }
348         
349         fta_init(device); /*xxx probably should get error code back put Ted doesn't give me one*/
350         
351         cont=startupdelay+time(0);
352         
353         if (verbose) { fprintf(stderr,"Start startup delay"); }
354         
355         while (cont>time(NULL)) {
356             if (fta_start_service(0)<0) {
357                 fprintf(stderr,"%s::error:in processing the msg queue\n",
358                         device);
359                 exit(9);
360             }
361             usleep(1000); /* sleep for one millisecond */
362         }
363         
364         if (verbose) { fprintf(stderr,"... Done\n"); }
365         
366         // open the connection to the data source
367         if (gshub!=0) { init_socket();}
368         
369         // wait to process till we get the signal from GSHUB
370         if (get_hub(&mygshub)!=0) {
371             print_error("ERROR:could not find gshub for data source");
372             exit(0);
373         }
374         while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
375             usleep(100);
376             if (fta_start_service(0)<0) {
377                 fprintf(stderr,"%s::error:in processing the msg queue\n",
378                         device);
379                 exit(9);
380             }
381         }
382         
383         /* now we enter an endless loop to process data */
384         if (verbose) {
385             fprintf(stderr,"Start processing %s\n",device);
386         }
387         
388         while (1==1) {
389             if (csv_process_file()<0) {
390                 fprintf(stderr,"%s::error:in processing packets\n",
391                         device);
392                 exit(8);
393             }
394             /* process all messages on the message queue*/
395             if (fta_start_service(0)<0) {
396                 fprintf(stderr,"%s::error:in processing the msg queue\n",
397                         device);
398                 exit(9);
399             }
400         }
401         return 0;
402     }
403     
404