Fixed newline characters throughout the code
[com/gs-lite.git] / src / tools / gssinksim.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <unistd.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <string.h>
21 #include <sys/time.h>
22 #include <sys/stat.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netinet/in.h>
26
27 #include <schemaparser.h>
28 #include "gsconfig.h"
29 #include "gstypes.h"
30 #include "gshub.h"
31
32 #define MAXLINE 100000
33 static unsigned tcpport=0;
34 static char linebuf[MAXLINE];
35 int listensockfd=0;
36 int fd=0;
37 gs_int32_t verbose=0;
38 endpoint hub;
39 endpoint ds;
40 gs_sp_t sink_name;
41
42
43 static void wait_for_feed() {
44     struct sockaddr_in serv_addr,cli_addr;
45     socklen_t clilen;
46     struct sockaddr_in sin;
47     socklen_t sin_sz;
48     if (listensockfd==0) {
49                 gs_int32_t on = 1;
50         
51         if (verbose==2) {
52             fprintf(stderr,"Create listen socket for port %u\n",tcpport);
53         }
54                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
55         if (listensockfd < 0) {
56                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
57                         exit(1);
58                 }
59                 bzero((char *) &serv_addr, sizeof(serv_addr));
60                 serv_addr.sin_family = AF_INET;
61                 serv_addr.sin_addr.s_addr = INADDR_ANY;
62                 serv_addr.sin_port = htons(tcpport);
63 #ifndef __linux__
64         /* make sure we can reuse the common port rapidly */
65         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
66                        (gs_sp_t )&on, sizeof(on)) != 0) {
67             gslog(LOG_EMERG,"Error::could not set socket option\n");
68             exit(1);
69         }
70 #endif
71         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
72                        (gs_sp_t )&on, sizeof(on)) != 0) {
73             gslog(LOG_EMERG,"Error::could not set socket option\n");
74             exit(1);
75                 }
76         
77                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
78                  sizeof(serv_addr)) < 0) {
79                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
80             exit(1);
81         }
82         }
83     
84     if (verbose==2) {
85         fprintf(stderr,"Socket created waiting for data producer\n");
86     }
87     if (listen(listensockfd,5)< 0) {
88         fprintf(stderr,"Error::could not listen to socket for port %u \n",ntohs(serv_addr.sin_port));
89         close(listensockfd);
90         exit(1);
91     }
92     sin_sz=sizeof(sin);
93     if (getsockname(listensockfd, (struct sockaddr *) &sin, &sin_sz) < 0) {
94         fprintf(stderr,"Error::could not get local port number of listen socket\n");
95         exit(1);
96     }
97     ds.ip=htonl(127<<24|1);
98     ds.port=sin.sin_port;
99     if (set_streamsink(hub,sink_name,ds)!=0) {
100         fprintf(stderr,"Error::could not set sink in GSHUB for %s source name\n",sink_name);
101         exit(1);
102     }
103     
104         do {
105                 clilen = sizeof(cli_addr);
106                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
107                 if (fd<0) {
108             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
109                 }
110         } while (fd==0);
111     if (verbose) {
112         fprintf(stderr,"Producer found ready to rock!\n");
113     }
114     
115 }
116
117
118 static void gs_read(gs_sp_t buffer, gs_uint32_t length){
119         gs_uint32_t used=0;
120         gs_uint32_t cur;
121     
122         while(used < length) {
123         if (verbose==2) {
124             fprintf(stderr,"\tread %u out of %u\n",used,length);
125         }
126                 if ((cur=read(fd,&(buffer[used]),length-used))<=0) {
127                         gslog(LOG_EMERG,"ERROR:could not read data from gdat stream");
128                         exit(0);
129                 }
130                 used+=cur;
131         }
132 }
133
134 static void gs_read_line(gs_sp_t buffer, gs_uint32_t length){
135     gs_uint32_t used=0;
136     gs_uint32_t cur;
137     
138     while((used < (length-1)) && ((used==0)|| (buffer[used-1]!='\n'))) {
139         if ((cur=read(fd,&(buffer[used]),1))<=0) {
140             gslog(LOG_EMERG,"ERROR:could not read data from gdat stream");
141             exit(0);
142         }
143         used+=cur;
144     }
145         buffer[used]=0;
146 }
147
148 static gs_uint32_t get_stream_tuple(gs_uint32_t * rsize,gs_sp_t rbuf,gs_uint32_t bufsz){
149     
150     gs_uint32_t nsz,sz;
151     gs_read((gs_sp_t)&nsz,sizeof(gs_uint32_t));
152     sz=ntohl(nsz);
153     if (sz>MAXTUPLESZ) {
154         gslog(LOG_EMERG,"Error::Illegal tuple received");
155         exit(0);
156     }
157     
158     if (sz>bufsz) {
159         gslog(LOG_EMERG,"Error::Illegal tuple size received -- to long");
160         exit(0);
161     }
162     *rsize=sz;
163     
164     gs_read(rbuf,(sz));
165     return 0;
166 }
167
168 int main(int argc, char** argv) {
169     gs_uint32_t streamid;
170     gs_schemahandle_t schema;
171     
172     gs_uint32_t rstreamid;
173     gs_uint32_t rsize;
174     gs_int8_t rbuf[2*MAXTUPLESZ];
175     
176     gs_int32_t numberoffields;
177     gs_int32_t y;
178     gs_int32_t parserversion;
179     gs_uint32_t schemalen;
180     gs_sp_t asciischema;
181     gs_sp_t me;
182     gs_int32_t ch;
183     
184     gs_int8_t buf[1024];
185     gs_uint32_t tip1,tip2,tip3,tip4;
186     
187     /* initialize host library and the sgroup  */
188     
189     me=argv[0];
190     
191     while ((ch = getopt(argc, argv, "hvxp:")) != -1) {
192         switch(ch) {
193             case 'h':
194                 fprintf(stderr,"%s::usage: %s -v -x -p <port>  <IP>:<port> <source_name> \n",argv[0],argv[0]);
195                 exit(0);
196                 break;
197             case 'p':
198                 tcpport=atoi(optarg);;
199                 break;
200             case 'v':
201                 verbose=1;
202                 break;
203             case 'x':
204                 verbose=2;
205                 break;
206             default:
207                 break;
208         }
209     }
210     if (optind+2>argc) {
211         fprintf(stderr,"Could not find hub and stream source name on command line\n");
212         exit(1);
213     }
214     if (sscanf(argv[optind],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(hub.port))!= 5 ) {
215         fprintf(stderr,"Could not parse hub endpoint\n");
216         exit(1);
217     }
218     hub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
219     hub.port=htons(hub.port);
220     sink_name=strdup(argv[optind+1]);
221     
222     wait_for_feed();
223     
224     gs_read_line(buf,1024);
225     if (strncmp(buf,"GDAT",4)!=0) {
226         gslog(LOG_EMERG,"ERROR: not a GDAT stream\n");
227         exit(0);
228     }
229     gs_read_line(buf,1024);
230     if (sscanf(buf,"VERSION:%u\n",&parserversion)!=1) {
231         gslog(LOG_EMERG,"ERROR: no GDAT VERSION given\n");
232         exit(0);
233     }
234     gs_read_line(buf,1024);
235     if (sscanf(buf,"SCHEMALENGTH:%u\n",&schemalen)!=1) {
236         gslog(LOG_EMERG,"ERROR: no GDAT SCHEMALENGTH given\n");
237         exit(0);
238     }
239     
240     if(verbose==2) {
241         fprintf (stderr,"GDAT version %u schemalength %u\n",parserversion,schemalen);
242     }
243         
244     if (schemaparser_accepts_version(parserversion)!=1) {
245         fprintf(stderr,"%s::error: wrong parser version %u for file %s\n",
246                 me,parserversion,argv[0]);
247         exit(1);
248     }
249     
250     if ((asciischema=malloc(schemalen))==0) {
251         fprintf(stderr,"%s::error: could not allocate schema buffer of sz %u "
252                 "for file %s\n",
253                 me,schemalen,argv[0]);
254         exit(1);
255     }
256     
257     if(verbose==2) {
258         fprintf (stderr,"Reading schema from scocket\n");
259     }
260     
261     gs_read(asciischema,schemalen);
262     
263     if(verbose==2) {
264         fprintf (stderr,"Received schema\n");
265     }
266     
267     if (verbose==2) {
268         fprintf(stderr,"%s\n",asciischema);
269     }
270     if ((schema=ftaschema_parse_string(asciischema))<0) {
271         fprintf(stderr,"%s::error: could not parse schema  \n",
272                 me);
273         exit(1);
274     }
275     
276     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
277         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
278                 me);
279         exit(1);
280     }
281     if (verbose==1) {
282         for(y=0; y<numberoffields;y++) {
283             printf("%s",ftaschema_field_name(schema,y));
284             if (y<numberoffields-1) printf("|");
285         }
286         printf("\n");
287     }
288     while(get_stream_tuple(&rsize,rbuf,2*MAXTUPLESZ)==0) {
289         if (verbose) {
290             if (ftaschema_is_eof_tuple(schema, rbuf)) {
291                 /* initiate shutdown or something of that nature */
292                 fprintf(stderr,"%s::All data proccessed\n",me);
293             } else {
294                 if (ftaschema_is_temporal_tuple(schema, rbuf)) {
295                     /* initiate shutdown or something of that nature */
296                     fprintf(stderr,"%s:: temporal tuple\n",me);
297                 } else {
298                     fprintf(stderr,"%s:: regular tuple\n",me);
299                 }
300             }
301         }
302         
303         if ((!rsize) )
304             continue;
305         for(y=0; y<numberoffields;y++) {
306             struct access_result ar;
307             if (verbose==2)
308                 printf("%s->",ftaschema_field_name(schema,y));
309             ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
310             switch (ar.field_data_type) {
311                 case INT_TYPE:
312                     printf("%d",ar.r.i);
313                     break;
314                 case UINT_TYPE:
315                     printf("%u",ar.r.ui);
316                     break;
317                 case IP_TYPE:
318                     printf("%u.%u.%u.%u",ar.r.ui>>24&0xff,
319                            ar.r.ui>>16&0xff,
320                            ar.r.ui>>8&0xff,
321                            ar.r.ui&0xff);
322                     break;
323                 case IPV6_TYPE:
324                 {
325                     unsigned x;
326                     unsigned zc=0;
327                     for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
328                     if (zc!=4) {
329                         for(x=0;x<8;x++) {
330                             unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
331                             unsigned y;
332                             y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
333                             printf("%04x",y);
334                             if (x<7) printf(":");
335                         }
336                     } else {
337                         printf("::");
338                     }
339                 }
340                     break;
341                 case USHORT_TYPE:
342                     printf("%hu",(unsigned short)ar.r.ui);
343                     break;
344                 case BOOL_TYPE:
345                     if (ar.r.ui==0) {
346                         printf("FALSE");
347                     } else {
348                         printf("TRUE");
349                     }
350                     break;
351                 case ULLONG_TYPE:
352                 {
353                     gs_uint64_t ul;
354                     gs_uint64_t t1;
355                     gs_uint64_t t2;
356                     ul=ar.r.ul;
357                     printf("%llu",ul);
358                 }
359                     break;
360                 case LLONG_TYPE:
361                     printf("%lld",ar.r.l);
362                     break;
363                 case FLOAT_TYPE:
364                     printf("%f",ar.r.f);
365                     break;
366                 case TIMEVAL_TYPE:
367                 { 
368                     gs_float_t t;
369                     t= ar.r.t.tv_usec;
370                     t=t/1000000;
371                     t=t+ar.r.t.tv_sec;
372                     printf("%lf sec",t);
373                 }
374                     break;
375                 case VSTR_TYPE:
376                 {
377                     int x;
378                     int c;
379                     char * src;
380                     src=(gs_sp_t)ar.r.vs.offset;
381                     if ((ar.r.vs.length>0) && (src[ar.r.vs.length-1]==0)) {
382                         ar.r.vs.length = ar.r.vs.length-1;
383                     }
384                     for(x=0;x<ar.r.vs.length;x++) {
385                         c=src[x];
386                         if (((c<='~') && (c>=' '))&&(c!='|')) {
387                             printf("%c",c);
388                         } else {
389                             printf("(0x%x)",(gs_uint8_t)c);
390                         }
391                     }
392                 }
393                     break;
394                 default:
395                     break;
396             }
397             if (y<numberoffields-1) printf("|");
398         }
399         printf("\n");
400         if (verbose!=0) fflush(stdout);
401     }
402     exit(0);
403 }
404