1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
35 #include "schemaparser.h"
39 void rts_fta_process_packet(struct packet * p);
41 void fta_init(gs_sp_t device);
43 #define CSVMAXLINE 1000000
46 static struct packet cur_packet;
48 static gs_uint32_t verbose=0;
49 static gs_uint32_t startupdelay=0;
50 static gs_uint32_t gshub=0;
51 static int socket_desc=0;
52 static gs_uint32_t singlefile=0;
54 static void gdat_replay_check_messages() {
55 if (fta_start_service(0)<0) {
56 print_error("Error:in processing the msg queue for a replay file");
61 static gs_retval_t gs_read(gs_sp_t buffer, gs_uint32_t length){
66 struct timeval socket_timeout;
69 FD_ZERO(&socket_rset);
70 FD_SET(socket_desc,&socket_rset);
71 FD_ZERO(&socket_eset);
72 FD_SET(socket_desc,&socket_eset);
73 // timeout in one millisecon
74 socket_timeout.tv_sec=0;
75 socket_timeout.tv_usec=1000;
77 if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
82 print_error("ERROR:select error in reading data from socket");
86 while(used < length) {
87 if ((cur=read(socket_desc,&(buffer[used]),length-used))<=0) {
88 if (errno==115) return -2; // error code we get if the server closes the connection on us
89 print_error("ERROR:could not read data from gdat stream");
97 static gs_uint32_t gs_read_line(gs_sp_t buffer, gs_uint32_t length){
101 while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
102 if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
103 print_error("ERROR:could not read data from gdat stream");
113 static void init_socket() {
116 struct sockaddr_in server;
117 gs_int32_t parserversion;
118 gs_uint32_t schemalen;
119 static char * asciischema=0;
122 if (get_hub(&gshub)!=0) {
123 print_error("ERROR:could not find gshub for data source");
127 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
128 print_error("ERROR:could not find data source for stream\n");
132 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
133 if (socket_desc == -1)
135 print_error("ERROR:could not create socket for data stream");
138 server.sin_addr.s_addr = srcinfo.ip;
139 server.sin_family = AF_INET;
140 server.sin_port = srcinfo.port;
142 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
144 print_error("ERROR: could not open connection to data source");
149 gs_read_line(buf,1024);
150 if (strncmp(buf,"GDAT",4)!=0) {
151 print_error("ERROR: not a GDAT stream\n");
154 gs_read_line(buf,1024);
155 if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
156 print_error("ERROR: no GDAT VERSION given\n");
159 gs_read_line(buf,1024);
160 if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
161 print_error("ERROR: no GDAT SCHEMALENGTH given\n");
164 if (schemaparser_accepts_version(parserversion)!=1) {
165 print_error("gdatinput::wrong gdat version\n");
168 if ((asciischema=malloc(schemalen))==0) {
169 print_error("gdatinput::could not allocate memory for schema\n");
172 if (gs_read(asciischema,schemalen)!=1) {
173 print_error("gdatinput::could not read schema from file\n");
176 if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
177 print_error("gdatinput::could not parse schema\n");
180 cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
184 static void next_file() {
186 gs_int32_t parserversion;
187 gs_uint32_t schemalen;
188 static char * asciischema=0;
191 fprintf(stderr,"Opening %s\n",name);
193 while (lstat(name,&s)!=0) {
195 print_error("gdat::lstat unexpected return value");
198 gdat_replay_check_messages();
205 if (asciischema!=0) free(asciischema);
206 if (cur_packet.record.gdat.schema>=0) {
207 ftaschema_free(cur_packet.record.gdat.schema);
211 if ((pd=fopen(name,"r"))==0) return;
216 if (fscanf(pd,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",&parserversion,&schemalen)!=2) {
217 if (verbose) fprintf(stderr,"gdatinput::could not parse GDAT header\n");
222 if (schemaparser_accepts_version(parserversion)!=1) {
223 if (verbose) fprintf(stderr,"gdatinput::wrong gdat version\n");
228 if ((asciischema=malloc(schemalen))==0) {
229 if (verbose) fprintf(stderr,"gdatinput::could not allocate memory for schema\n");
234 if (fread(asciischema,schemalen,1,pd)!=1) {
235 if (verbose) fprintf(stderr,"gdatinput::could not read schema from file\n");
240 if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
241 if (verbose) fprintf(stderr,"gdatinput::could not parse schema\n");
246 cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
249 static gs_retval_t gdat_replay_init(gs_sp_t device)
254 gs_sp_t singlefiletmp;
256 if ((name=get_iface_properties(device,"filename"))==0) {
257 print_error("csv_init::No GDAT \"Filename\" defined");
261 if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
262 if (strncmp(verbosetmp,"TRUE",4)==0) {
264 fprintf(stderr,"VERBOSE ENABLED\n");
266 fprintf(stderr,"VERBOSE DISABLED\n");
269 if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
270 if (strncmp(singlefiletmp,"TRUE",4)==0) {
273 fprintf(stderr,"SINGLEFILE ENABLED\n");
276 fprintf(stderr,"SINGLEFILE DISABLED\n");
280 if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
282 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
284 startupdelay=atoi(get_iface_properties(device,"startupdelay"));
286 if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
288 fprintf(stderr,"GDAT format using gshub\n");
293 cur_packet.ptype=PTYPE_GDAT;
297 static gs_retval_t gdat_read_socket(){
300 if ((retval=gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)))<0) {
306 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
309 print_error("Error::Illegal tuple received");
313 cur_packet.record.gdat.datasz=sz;
315 if (gs_read((gs_sp_t)cur_packet.record.gdat.data,(sz))!=1) {
317 fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
320 print_error("Error::Illegal tuple received");
323 cur_packet.systemTime=time(0);
327 static gs_retval_t gdat_read_tuple(){
330 if (pd==0) next_file();
332 while((pd==0) || (fread(&nsz,sizeof(gs_uint32_t),1,pd)!=1)) {
335 fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
339 fprintf(stderr,"RTS SAYS BY\n");
341 while(1==1) sleep(1);
349 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
357 cur_packet.record.gdat.datasz=sz;
359 if (fread(cur_packet.record.gdat.data,(sz),1,pd)!=1) {
361 fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
368 cur_packet.systemTime=time(0);
372 static gs_retval_t gdat_process_file()
375 static unsigned totalcnt=0;
376 for(cnt=0;cnt<50000;cnt++) {
379 retval=gdat_read_socket();
380 if (retval==-1) return 0; // got a timeout so service message queue
381 if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
382 // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
384 fprintf(stderr,"Done processing waiting for things to shut down\n");
386 // now just service message queue until we get killed or loose connectivity
388 fta_start_service(0); // service all waiting messages
389 usleep(1000); // sleep a millisecond
395 rts_fta_process_packet(&cur_packet);
397 totalcnt=totalcnt+cnt;
399 fprintf(stderr,"Processesd %u tuple\n",totalcnt);
404 gs_retval_t main_gdat(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
408 gdat_replay_init(device);
410 /* initalize host_lib */
412 fprintf(stderr,"Init LFTAs for %s\n",device);
415 if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
416 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
423 cont=startupdelay+time(0);
425 if (verbose) { fprintf(stderr,"Start startup delay"); }
427 while (cont>time(NULL)) {
428 if (fta_start_service(0)<0) {
429 fprintf(stderr,"%s::error:in processing the msg queue\n",
433 usleep(1); /* sleep for one millisecond */
436 if (verbose) { fprintf(stderr,"... Done\n"); }
438 // open the connection to the data source
440 if (gshub!=0) { init_socket();}
442 // wait to process till we get the signal from GSHUB
443 if (get_hub(&mygshub)!=0) {
444 print_error("ERROR:could not find gshub for data source");
447 while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
449 if (fta_start_service(0)<0) {
450 fprintf(stderr,"%s::error:in processing the msg queue\n",
456 /* now we enter an endless loop to process data */
458 fprintf(stderr,"Start processing %s\n",device);
462 /* proess packets data stream*/
463 if (gdat_process_file()<0) {
464 fprintf(stderr,"%s::error:in processing packets\n",
468 /* process all messages on the message queue*/
469 if (fta_start_service(0)<0) {
470 fprintf(stderr,"%s::error:in processing the msg queue\n",