1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
22 #include <sys/types.h>
23 #include <sys/socket.h>
24 #include <netinet/in.h>
35 #include "schemaparser.h"
39 void rts_fta_process_packet(struct packet * p);
41 void fta_init(gs_sp_t device);
43 #define CSVMAXLINE 1000000
46 static struct packet cur_packet;
48 static gs_uint32_t verbose=0;
49 static gs_uint32_t startupdelay=0;
50 static gs_uint32_t gshub=0;
51 static int socket_desc=0;
52 static gs_uint32_t singlefile=0;
54 static void gdat_replay_check_messages() {
55 if (fta_start_service(0)<0) {
56 print_error("Error:in processing the msg queue for a replay file");
61 static gs_retval_t gs_read(gs_sp_t buffer, gs_uint32_t length){
66 struct timeval socket_timeout;
69 FD_ZERO(&socket_rset);
70 FD_SET(socket_desc,&socket_rset);
71 FD_ZERO(&socket_eset);
72 FD_SET(socket_desc,&socket_eset);
73 // timeout in one millisecon
74 socket_timeout.tv_sec=0;
75 socket_timeout.tv_usec=1000;
77 if ((retval=select(socket_desc+1,&socket_rset,0,&socket_eset,&socket_timeout))<=0) {
82 print_error("ERROR:select error in reading data from socket");
86 while(used < length) {
87 if ((cur=read(socket_desc,&(buffer[used]),length-used))<=0) {
88 if (errno==115) return -2; // error code we get if the server closes the connection on us
89 print_error("ERROR:could not read data from gdat stream");
97 static gs_uint32_t gs_read_line(gs_sp_t buffer, gs_uint32_t length){
101 while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
102 if ((cur=read(socket_desc,&(buffer[used]),1))<=0) {
103 print_error("ERROR:could not read data from gdat stream");
113 static void init_socket() {
116 struct sockaddr_in server;
117 gs_int32_t parserversion;
118 gs_uint32_t schemalen;
119 static char * asciischema=0;
122 if (get_hub(&gshub)!=0) {
123 print_error("ERROR:could not find gshub for data source");
127 if (get_streamsource(gshub,name,&srcinfo,1) !=0) {
128 print_error("ERROR:could not find data source for stream\n");
132 socket_desc = socket(AF_INET , SOCK_STREAM , 0);
133 if (socket_desc == -1)
135 print_error("ERROR:could not create socket for data stream");
138 server.sin_addr.s_addr = srcinfo.ip;
139 server.sin_family = AF_INET;
140 server.sin_port = srcinfo.port;
142 if (connect(socket_desc , (struct sockaddr *)&server , sizeof(server)) < 0)
144 print_error("ERROR: could not open connection to data source");
149 gs_read_line(buf,1024);
150 if (strncmp(buf,"GDAT",4)!=0) {
151 print_error("ERROR: not a GDAT stream\n");
154 gs_read_line(buf,1024);
155 if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
156 print_error("ERROR: no GDAT VERSION given\n");
159 gs_read_line(buf,1024);
160 if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
161 print_error("ERROR: no GDAT SCHEMALENGTH given\n");
164 if (schemaparser_accepts_version(parserversion)!=1) {
165 print_error("gdatinput::wrong gdat version\n");
168 if ((asciischema=malloc(schemalen))==0) {
169 print_error("gdatinput::could not allocate memory for schema\n");
172 if (gs_read(asciischema,schemalen)!=1) {
173 print_error("gdatinput::could not read schema from file\n");
176 if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
177 print_error("gdatinput::could not parse schema\n");
180 cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
184 static void next_file() {
186 gs_int32_t parserversion;
187 gs_uint32_t schemalen;
188 static char * asciischema=0;
191 fprintf(stderr,"Opening %s\n",name);
193 while (lstat(name,&s)!=0) {
195 print_error("gdat::lstat unexpected return value");
198 gdat_replay_check_messages();
205 if (asciischema!=0) free(asciischema);
206 if (cur_packet.record.gdat.schema>=0) {
207 ftaschema_free(cur_packet.record.gdat.schema);
211 if ((pd=fopen(name,"r"))==0) return;
216 if (fscanf(pd,"GDAT\nVERSION:%u\nSCHEMALENGTH:%u\n",&parserversion,&schemalen)!=2) {
217 if (verbose) fprintf(stderr,"gdatinput::could not parse GDAT header\n");
222 if (schemaparser_accepts_version(parserversion)!=1) {
223 if (verbose) fprintf(stderr,"gdatinput::wrong gdat version\n");
228 if ((asciischema=malloc(schemalen))==0) {
229 if (verbose) fprintf(stderr,"gdatinput::could not allocate memory for schema\n");
234 if (fread(asciischema,schemalen,1,pd)!=1) {
235 if (verbose) fprintf(stderr,"gdatinput::could not read schema from file\n");
240 if ((cur_packet.record.gdat.schema=ftaschema_parse_string(asciischema))<0) {
241 if (verbose) fprintf(stderr,"gdatinput::could not parse schema\n");
246 cur_packet.record.gdat.numfields=ftaschema_tuple_len(cur_packet.record.gdat.schema);
249 static gs_retval_t gdat_replay_init(gs_sp_t device)
254 gs_csp_t singlefiletmp;
258 if ((tmp_name=get_iface_properties(device,"filename"))==0) {
259 print_error("csv_init::No GDAT \"Filename\" defined");
262 name = strdup(tmp_name);
264 if ((verbosetmp=get_iface_properties(device,"verbose"))!=0) {
265 if (strncmp(verbosetmp,"TRUE",4)==0) {
267 fprintf(stderr,"VERBOSE ENABLED\n");
269 fprintf(stderr,"VERBOSE DISABLED\n");
272 if ((singlefiletmp=get_iface_properties(device,"singlefile"))!=0) {
273 if (strncmp(singlefiletmp,"TRUE",4)==0) {
276 fprintf(stderr,"SINGLEFILE ENABLED\n");
279 fprintf(stderr,"SINGLEFILE DISABLED\n");
283 if ((delaytmp=get_iface_properties(device,"startupdelay"))!=0) {
285 fprintf(stderr,"Startup delay of %u seconds\n",atoi(get_iface_properties(device,"startupdelay")));
287 startupdelay=atoi(get_iface_properties(device,"startupdelay"));
289 if ((gshubtmp=get_iface_properties(device,"gshub"))!=0) {
291 fprintf(stderr,"GDAT format using gshub\n");
296 cur_packet.ptype=PTYPE_GDAT;
300 static gs_retval_t gdat_read_socket(){
303 if ((retval=gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t)))<0) {
309 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
312 print_error("Error::Illegal tuple received");
316 cur_packet.record.gdat.datasz=sz;
318 if (gs_read((gs_sp_t)cur_packet.record.gdat.data,(sz))!=1) {
320 fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
323 print_error("Error::Illegal tuple received");
326 cur_packet.systemTime=time(0);
330 static gs_retval_t gdat_read_tuple(){
333 if (pd==0) next_file();
335 while((pd==0) || (fread(&nsz,sizeof(gs_uint32_t),1,pd)!=1)) {
338 fprintf(stderr,"SINGLEFILE PROCESSING DONE!\n");
342 fprintf(stderr,"RTS SAYS BY\n");
344 while(1==1) sleep(1);
352 fprintf(stderr,"INTERNAL ERROR tuple to long for fixed buffer. Tuple sz %u\n",
360 cur_packet.record.gdat.datasz=sz;
362 if (fread(cur_packet.record.gdat.data,(sz),1,pd)!=1) {
364 fprintf(stderr,"UNEXPECTED END OF FILE. Tryed to read tuple of size %u\n",
371 cur_packet.systemTime=time(0);
375 static gs_retval_t gdat_process_file()
378 static unsigned totalcnt=0;
379 for(cnt=0;cnt<50000;cnt++) {
382 retval=gdat_read_socket();
383 if (retval==-1) return 0; // got a timeout so service message queue
384 if ((retval==-2) || (ftaschema_is_eof_tuple(cur_packet.record.gdat.schema,(void *)cur_packet.record.gdat.data))) {
385 // we signal that everything is done if we either see an EOF tuple OR the socket is closed by the peer
387 fprintf(stderr,"Done processing waiting for things to shut down\n");
389 // now just service message queue until we get killed or loose connectivity
391 fta_start_service(0); // service all waiting messages
392 usleep(1000); // sleep a millisecond
398 rts_fta_process_packet(&cur_packet);
400 totalcnt=totalcnt+cnt;
402 fprintf(stderr,"Processesd %u tuple\n",totalcnt);
407 gs_retval_t main_gdat(gs_int32_t devicenum, gs_sp_t device, gs_int32_t mapcnt, gs_sp_t map[]) {
411 gdat_replay_init(device);
413 /* initalize host_lib */
415 fprintf(stderr,"Init LFTAs for %s\n",device);
418 if (hostlib_init(LFTA,0,devicenum,mapcnt,map)<0) {
419 fprintf(stderr,"%s::error:could not initiate host lib for clearinghouse\n",
426 cont=startupdelay+time(0);
428 if (verbose) { fprintf(stderr,"Start startup delay"); }
430 while (cont>time(NULL)) {
431 if (fta_start_service(0)<0) {
432 fprintf(stderr,"%s::error:in processing the msg queue\n",
436 usleep(1); /* sleep for one millisecond */
439 if (verbose) { fprintf(stderr,"... Done\n"); }
441 // open the connection to the data source
443 if (gshub!=0) { init_socket();}
445 // wait to process till we get the signal from GSHUB
446 if (get_hub(&mygshub)!=0) {
447 print_error("ERROR:could not find gshub for data source");
450 while(get_startprocessing(mygshub,get_instance_name(),0)!=0) {
452 if (fta_start_service(0)<0) {
453 fprintf(stderr,"%s::error:in processing the msg queue\n",
459 /* now we enter an endless loop to process data */
461 fprintf(stderr,"Start processing %s\n",device);
465 /* proess packets data stream*/
466 if (gdat_process_file()<0) {
467 fprintf(stderr,"%s::error:in processing packets\n",
471 /* process all messages on the message queue*/
472 if (fta_start_service(0)<0) {
473 fprintf(stderr,"%s::error:in processing the msg queue\n",