1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
35 #include "schemaparser.h"
/* Forward declarations; the definitions are not in the visible part of this file. */
38 void rts_fta_process_packet(struct packet * p);
40 void fta_init(gs_sp_t device);
/* Size in bytes of the static line[] buffer below; also the limit passed to fgets(). */
43 #define CSVMAXLINE 1000000
/* Initialised to -1; not referenced in the visible part of this file. */
46 static int listensockfd=-1;
/* Packet record filled for each input line and handed to rts_fta_process_packet(). */
48 static struct packet cur_packet;
/* Raw input line; the field pointers stored in cur_packet point into this buffer. */
50 static gs_uint8_t line[CSVMAXLINE];
/* Used by csv_read_socket() when moving leftover bytes to the front of line[].
 * Presumably the number of valid bytes in line[] -- TODO confirm. */
51 static gs_uint32_t lineend=0;
/* Field delimiter; defaults to a comma, replaced in csv_replay_init() by the first
 * character of the "csvseparator" interface property. */
52 static gs_uint8_t csvdel=',';
/* Gates progress messages in main_csv(). Presumably set from the "verbose"
 * interface property; the assignment is not visible in this listing. */
53 static gs_uint32_t verbose=0;
/* Startup delay in seconds, read from the "startupdelay" interface property. */
54 static gs_uint32_t startupdelay=0;
/* Presumably set from the "singlefile" interface property; the assignment is
 * not visible in this listing. */
55 static gs_uint32_t singlefile=0;
/* main_csv() calls init_socket() only when this is non-zero; init_socket() passes
 * its address to get_hub() and its value to get_streamsource(). */
56 static gs_uint32_t gshub=0;
/* Descriptor of the data stream socket: assigned in init_socket(), read in
 * gs_read_line(). Note the initial value is 0, not -1. */
57 static int socket_desc=0;
/*
 * Services the message queue by calling fta_start_service(0) and reports an
 * error through print_error() when that call returns a negative value.
 * NOTE(review): the remainder of the error branch is not visible in this listing.
 */
59 static void csv_replay_check_messages() {
60 if (fta_start_service(0)<0) {
61 print_error("Error:in processing the msg queue for a replay file");
/*
 * Reads bytes from socket_desc into buffer, one read() call per byte.
 *
 * buffer  destination for the bytes read
 * length  the copy loop stops once length-1 bytes have been stored
 *
 * The socket is first polled with select() for readability or an exceptional
 * condition, using a one millisecond timeout. Bytes are then stored until the
 * last stored byte is a newline or length-1 bytes are in the buffer.
 *
 * NOTE(review): the return statements are not visible in this listing. Callers
 * test the result for negative values while the declared return type looks
 * unsigned -- confirm the intended error codes survive the conversion.
 * NOTE(review): if length is 0 and gs_uint32_t is unsigned, length-1 wraps to a
 * very large bound.
 */
66 static gs_uint32_t gs_read_line(gs_uint8_t * buffer, gs_uint32_t length){
71 struct timeval socket_timeout;
/* Watch the stream socket for both readable data and exceptional conditions. */
74 FD_ZERO(&socket_rset);
75 FD_SET(socket_desc,&socket_rset);
76 FD_ZERO(&socket_eset);
77 FD_SET(socket_desc,&socket_eset);
78 // timeout in one millisecond
79 socket_timeout.tv_sec=0;
80 socket_timeout.tv_usec=1000;
/* select() returning 0 means the timeout expired, a negative value means failure. */
82 if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
/* Copy single bytes until a newline has been stored or the buffer limit is reached. */
90 while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
/* read() returning 0 signals end of stream, a negative value signals an error. */
91 if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
92 print_error("ERROR:could not read data from gdat stream");
/*
 * Opens the TCP connection to the data source and stores the descriptor in
 * socket_desc.
 *
 * Steps visible here: look up the hub with get_hub(), resolve the stream source
 * for name with get_streamsource(), create an AF_INET stream socket and
 * connect() it to the address and port taken from srcinfo. Each failure is
 * reported through print_error().
 *
 * NOTE(review): what follows each print_error() call is not visible in this
 * listing.
 */
101 static void init_socket() {
104 struct sockaddr_in server;
/* NOTE(review): the next three locals are not used in the visible lines. */
105 gs_int32_t parserversion;
106 gs_uint32_t schemalen;
107 static char * asciischema=0;
110 if (get_hub(&gshub)!=0) {
111 print_error("ERROR:could not find gshub for data source");
115 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
116 print_error("ERROR:could not find data source for stream\n");
120 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
121 if (socket_desc == -1)
123 print_error("ERROR:could not create socket for data stream");
/* Address and port are copied without conversion, so srcinfo is assumed to
 * already hold them in network byte order -- TODO confirm. */
126 server.sin_addr.s_addr = srcinfo.ip;
127 server.sin_family = AF_INET;
128 server.sin_port = srcinfo.port;
130 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
132 print_error("ERROR: could not open connection to data source");
/*
 * Opens the input file called name for reading and stores the stream in pd.
 *
 * While lstat() on name fails, the message queue keeps being serviced through
 * csv_replay_check_messages(); once lstat() succeeds the file is opened with
 * fopen() in "r" mode and a failure is reported through print_error().
 *
 * NOTE(review): the condition guarding the lstat error report and the rest of
 * the wait loop are not visible in this listing.
 */
138 static void next_file() {
141 fprintf(stderr,"Opening %s\n",name);
/* Loop for as long as the file cannot be stat-ed. */
144 while (lstat(name,&s)!=0) {
146 print_error("csv::lstat unexpected return value");
/* Keep the message queue serviced while waiting. */
149 csv_replay_check_messages();
156 if ((pd=fopen(name,"r"))==0) {
157 print_error("csv::open failed ");
/*
 * Reads the interface properties of device and configures the CSV reader.
 *
 * Properties looked up through get_iface_properties():
 *   "filename"      required; stored in name, an error is reported when missing
 *   "csvseparator"  its first character becomes the field delimiter csvdel
 *   "verbose"       compared against "TRUE"; the outcome is logged to stderr
 *   "singlefile"    compared against "TRUE"; the outcome is logged to stderr
 *   "startupdelay"  parsed with atoi() into startupdelay
 *   "gshub"         its presence is logged to stderr
 * Finally cur_packet.ptype is set to PTYPE_CSV.
 *
 * NOTE(review): the flag assignments, the branch structure and the return
 * statement are not visible in this listing.
 */
166 static gs_retval_t csv_replay_init(gs_sp_t device)
172 gs_sp_t singlefiletmp;
174 if ((name=get_iface_properties(device,"filename"))==0) {
175 print_error("csv_init::No CSV \"Filename\" defined");
178 tempdel=get_iface_properties(device,"csvseparator");
/* NOTE(review): a null check on tempdel presumably sits on a line that is not
 * visible here -- confirm before relying on tempdel[0]. */
180 csvdel=(gs_uint8_t) tempdel[0];
183 if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
/* Only the first four characters are compared, so any value starting with
 * TRUE matches; the comparison is case sensitive. */
184 if (strncmp(verbosetmp,"TRUE",4)==0) {
186 fprintf(stderr,"VERBOSE ENABLED\n");
188 fprintf(stderr,"VERBOSE DISABLED\n");
191 if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
/* Same four character prefix comparison as for "verbose". */
192 if (strncmp(singlefiletmp,"TRUE",4)==0) {
195 fprintf(stderr,"SINGLEFILE ENABLED\n");
198 fprintf(stderr,"SINGLEFILE DISABLED\n");
202 if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
/* NOTE(review): the property is looked up again here and on the assignment
 * below although delaytmp already holds it; atoi() yields an int that is
 * printed with %u and stored in an unsigned variable, so a negative value
 * would wrap. */
204 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
206 startupdelay=atoi(get_iface_properties(device,"startupdelay"));
208 if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
210 fprintf(stderr,"GDAT format using gshub\n");
/* Tag the shared packet record as CSV once during initialisation. */
215 cur_packet.ptype=PTYPE_CSV;
/*
 * Reads one line from the socket via gs_read_line(), splits it into fields at
 * csvdel and passes the resulting packet to rts_fta_process_packet().
 *
 * A negative result from gs_read_line() is returned to the caller unchanged.
 * At most CSVELEMENTS field pointers are recorded; each points into line[].
 *
 * NOTE(review): the statements that advance i, p and x, terminate the fields
 * and update lineend are not visible in this listing, and neither is the
 * final return.
 */
219 static gs_retval_t csv_read_socket()
228 if ((ret=gs_read_line(line,CSVMAXLINE-1))<0) { return ret;}
229 cur_packet.systemTime=time(0);
/* One iteration per field: record where the field starts, then scan to its end. */
233 while((done==0)&&(i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
234 cur_packet.record.csv.fields[p]=&line[i];
236 while((line[i] != 0) && (line[i] != csvdel)) {
/* Reaching the terminating zero byte ends the field loop. */
239 if (line[i]==0) done=1;
243 cur_packet.record.csv.numberfields=p;
244 //fprintf(stderr,"XX,%s,%s,%u,%u\n",cur_packet.record.csv.fields[0],cur_packet.record.csv.fields[1],x,lineend);
245 rts_fta_process_packet(&cur_packet);
/* NOTE(review): source and destination both lie inside line[], so the regions
 * may overlap, which memcpy does not permit; memmove looks like the intended
 * call -- confirm against the lines that set x and lineend. */
247 memcpy(&(line[0]),&(line[x+1]),lineend-x-1);
/*
 * Reads the next line from the input file pd with fgets(), splits it into
 * fields at csvdel and passes the resulting packet to rts_fta_process_packet().
 *
 * The file is opened through next_file() when pd is still null. While fgets()
 * yields no line, the visible code logs completion messages and can end up in
 * an endless one second sleep loop. Trailing newline and carriage return
 * characters are handled before the packet is dispatched.
 *
 * NOTE(review): the conditions around the end of file handling and the
 * statements that advance i and p are not visible in this listing.
 */
256 static void csv_read_tuple()
261 if (pd==0) next_file();
/* fgets() returns null on end of file or on a read error. */
262 while(fgets((char *)&(line[0]),CSVMAXLINE,pd)==0) {
265 fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
269 fprintf(stderr,"RTS SAYS BY\n");
/* Parks the caller forever instead of returning. */
271 while(1==1) sleep(1);
276 cur_packet.systemTime=time(0);
/* One iteration per field: record where the field starts, then scan to its end. */
277 while((i<CSVMAXLINE)&&(line[i]!=0)&&(p<CSVELEMENTS)){
278 cur_packet.record.csv.fields[p]=&line[i];
280 while((line[i] != 0) && (line[i] != csvdel)) {
288 // Get rid of trailing \n and \r.
289 while(i>0 && (line[i-1] == '\n' || line[i-1] == '\r')){
293 cur_packet.record.csv.numberfields=p;
294 rts_fta_process_packet(&cur_packet);
/*
 * Processes a batch of up to 50000 tuples and keeps a running total.
 *
 * In the visible path each iteration calls csv_read_socket(). A result of -1
 * makes the function return 0 so that the caller can service the message
 * queue. A result of -2, or an EOF tuple, switches to a mode that only
 * services the message queue, sleeping one millisecond per round.
 *
 * NOTE(review): the branch that selects between socket and file input, and
 * the final return, are not visible in this listing.
 */
301 static gs_retval_t csv_process_file()
/* Tuple count accumulated across calls. */
304 static unsigned totalcnt=0;
305 for(cnt=0;cnt<50000;cnt++) {
308 retval=csv_read_socket();
309 if (retval==-1) return 0; // got a timeout so service message queue
/* NOTE(review): this reads record.gdat although csv_replay_init() tags the
 * packet as PTYPE_CSV and the readers fill record.csv -- confirm that the
 * gdat members are valid for CSV packets. */
310 if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
311 // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
313 fprintf(stderr,"Done processing waiting for things to shut down\n");
315 // now just service message queue until we get killed or lose connectivity
317 fta_start_service(0); // service all waiting messages
318 usleep(1000); // sleep a millisecond
/* cnt holds the number of iterations completed in this call. */
325 totalcnt=totalcnt+cnt;
327 fprintf(stderr,"Processesd %u tuple\n",totalcnt);
/*
 * Entry point of the CSV interface: configures the reader, initialises the
 * host library and the FTAs, waits out the startup delay and then processes
 * data while servicing the message queue.
 *
 * devicenum  forwarded to hostlib_init()
 * device     interface name; used for property lookup, fta_init() and log output
 * mapcnt     forwarded to hostlib_init()
 * map        forwarded to hostlib_init()
 *
 * NOTE(review): the error branches, the loop headers and the return statements
 * are not visible in this listing, so the return contract cannot be stated.
 */
332 gs_retval_t main_csv(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
/* NOTE(review): the return value of csv_replay_init() is not checked here. */
336 csv_replay_init(device);
338 /* initialize host_lib */
340 fprintf(stderr,"Init LFTAs for %s\n",device);
343 if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
344 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
349 fta_init(device); /*xxx probably should get error code back but Ted doesn't give me one*/
/* Absolute time at which the startup delay is over. */
351 cont=startupdelay+time(0);
353 if (verbose) { fprintf(stderr,"Start startup delay"); }
/* Service the message queue once per millisecond until the delay has elapsed. */
355 while (cont>time(NULL)) {
356 if (fta_start_service(0)<0) {
357 fprintf(stderr,"%s::error:in processing the msg queue\n",
361 usleep(1000); /* sleep for one millisecond */
364 if (verbose) { fprintf(stderr,"... Done\n"); }
366 // open the connection to the data source
367 if (gshub!=0) { init_socket();}
369 // wait to process till we get the signal from GSHUB
370 if (get_hub(&mygshub)!=0) {
371 print_error("ERROR:could not find gshub for data source");
/* Keep servicing the message queue for as long as get_startprocessing() returns non-zero. */
374 while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
376 if (fta_start_service(0)<0) {
377 fprintf(stderr,"%s::error:in processing the msg queue\n",
383 /* now we enter an endless loop to process data */
385 fprintf(stderr,"Start processing %s\n",device);
389 if (csv_process_file()<0) {
390 fprintf(stderr,"%s::error:in processing packets\n",
394 /* process all messages on the message queue*/
395 if (fta_start_service(0)<0) {
396 fprintf(stderr,"%s::error:in processing the msg queue\n",