4b18ab56862d2c441d6c68c59c656edc2c8b09d8
[com/gs-lite.git] / src / tools / gsprintconsole.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <app.h>
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <unistd.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <string.h>
22 #include <sys/time.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27
28
29 #include "gsconfig.h"
30 #include "gstypes.h"
31 #include "gshub.h"
32
33 #include <schemaparser.h>
34
/* Capacity in bytes of the shared output staging buffer (including the NUL). */
#define MAXLINE 100000

/* TCP port to serve output on (set with -p); 0 means print to stdout. */
static unsigned int tcpport;

/* Staging buffer: text is formatted here and then handed to emit_line(). */
static char linebuf[MAXLINE];

/* Listening socket for the TCP output mode; 0 means "not created yet". */
int listensockfd = 0;

/* Socket of the currently connected TCP client (set by wait_for_client()). */
int fd = 0;
40
41
// Not all systems define timersub, so provide a fallback when it is missing.
#ifndef timersub

/*
 * timersub(tvp, uvp, vvp): store *tvp - *uvp in *vvp.
 * All three arguments are pointers to struct timeval. When the microsecond
 * difference is negative, one second is borrowed so that tv_usec ends up
 * non-negative.
 */
#define timersub(tvp, uvp, vvp)                                   \
    do {                                                          \
        (vvp)->tv_sec  = (tvp)->tv_sec  - (uvp)->tv_sec;          \
        (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;         \
        if ((vvp)->tv_usec < 0) {                                 \
            (vvp)->tv_sec--;                                      \
            (vvp)->tv_usec += 1000000;                            \
        }                                                         \
    } while (0)

#endif
56
/*
 * Shutdown routine: installed as the SIGTERM/SIGINT handler and also called
 * directly by main() (with iv == 0) for the -X option.
 *
 * iv: value reported in the exit message (the signal number when invoked as
 *     a signal handler).
 *
 * Calls ftaapp_exit(), reports on stderr and terminates the process with
 * exit status 1; it never returns.
 *
 * NOTE(review): fprintf() and exit() are not async-signal-safe, so running
 * this from a real signal handler is technically unsafe -- confirm whether
 * that matters for this tool.
 */
void hand(int iv) {
    ftaapp_exit();
    fprintf(stderr, "exiting via signal handler %d...\n", iv);
    exit(1);
}
62
63 static void wait_for_client() {
64     struct sockaddr_in serv_addr,cli_addr;
65     socklen_t clilen;
66     if (listensockfd==0) {
67                 gs_int32_t on = 1;
68                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
69         if (listensockfd < 0) {
70                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
71                         exit(1);
72                 }
73                 bzero((char *) &serv_addr, sizeof(serv_addr));
74                 serv_addr.sin_family = AF_INET;
75                 serv_addr.sin_addr.s_addr = INADDR_ANY;
76                 serv_addr.sin_port = htons(tcpport);
77 #ifndef __linux__
78         /* make sure we can reuse the common port rapidly */
79         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
80                        (gs_sp_t )&on, sizeof(on)) != 0) {
81             gslog(LOG_EMERG,"Error::could not set socket option\n");
82             exit(1);
83         }
84 #endif
85         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
86                        (gs_sp_t )&on, sizeof(on)) != 0) {
87             gslog(LOG_EMERG,"Error::could not set socket option\n");
88             exit(1);
89                 }
90         
91                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
92                  sizeof(serv_addr)) < 0) {
93                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
94             exit(1);
95         }
96         }
97     
98         do {
99                 listen(listensockfd,5);
100                 clilen = sizeof(cli_addr);
101                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
102                 if (fd<0) {
103             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
104                 }
105         } while (fd==0);
106 }
107
108
109 static void emit_socket() {
110         unsigned o,w,l;
111         o=0;
112         w=0;
113         l=strlen(linebuf);
114         do {
115                 if((w=write(fd,&linebuf[o],l))==0) {
116                         close(fd);
117                         wait_for_client();
118                 }
119                 o=o+w;
120         } while (o<l);
121 }
122
123 static void emit_line() {
124     
125     if (tcpport==0) {
126         printf("%s",linebuf);
127     } else {
128         emit_socket();
129     }
130     
131 }
132
133 int main(int argc, char* argv[]) {
134     gs_sp_t me = argv[0];
135     FTAID fta_id;
136     gs_int32_t schema, ch;
137     
138     FTAID rfta_id;
139     gs_uint32_t rsize;
140     gs_uint32_t bufsz=8*1024*1024;
141     gs_int8_t rbuf[2*MAXTUPLESZ];
142     
143     gs_int32_t numberoffields;
144     gs_int32_t verbose=0;
145     gs_int32_t y, lcv;
146     
147     void *pblk;
148     gs_int32_t pblklen;
149         gs_int32_t n_actual_param;
150         gs_int32_t n_expected_param;
151     gs_int32_t xit = 0;
152     gs_int32_t dump = 0;
153     struct timeval tvs, tve, tvd;
154     gs_retval_t code;
155     endpoint gshub;
156     endpoint dummyep;
157     gs_uint32_t tip1,tip2,tip3,tip4;
158     gs_sp_t instance_name;
159
160     gs_uint32_t tlimit = 0;     // time limit in seconds
161     time_t start_time, curr_time;
162     
163         gsopenlog(argv[0]);
164     
165     
166     while ((ch = getopt(argc, argv, "l:p:r:vXD")) != -1) {
167         switch (ch) {
168             case 'r':
169                 bufsz=atoi(optarg);
170                 break;
171             case 'p':
172                 tcpport=atoi(optarg);
173                 break;
174             case 'v':
175                 verbose++;
176                 break;
177             case 'X':
178                 xit++;
179                 break;
180             case 'D':
181                 dump++;
182                 break;
183             case 'l':
184                 tlimit = atoi(optarg);
185                 break;
186             default:
187             usage:
188                 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
189                         *argv);
190                 exit(1);
191         }
192     }
193     argc -= optind;
194     argv += optind;
195     if (argc<3) goto usage;
196     
197     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
198         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
199         exit(1);
200     }
201     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
202     gshub.port=htons(gshub.port);
203     instance_name=strdup(argv[1]);
204     if (set_hub(gshub)!=0) {
205         gslog(LOG_EMERG,"Could not set hub");
206         exit(1);
207     }
208     if (set_instance_name(instance_name)!=0) {
209         gslog(LOG_EMERG,"Could not set instance name");
210         exit(1);
211     }
212     
213     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
214         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
215     }
216     
217     gettimeofday(&tvs, 0);
218     argc -=2;
219     argv +=2;
220     if (argc < 1)
221         goto usage;
222     
223     /* initialize host library and the sgroup  */
224     
225     if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
226     
227     if (ftaapp_init(bufsz)!=0) {
228         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
229         exit(1);
230     }
231     
232     signal(SIGTERM, hand);
233     signal(SIGINT, hand);
234     
235     schema = ftaapp_get_fta_schema_by_name(argv[0]);
236     if (schema < 0) {
237         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
238                 me ,argv[0]);
239         exit(1);
240     }
241         n_expected_param = ftaschema_parameter_len(schema);
242     if (n_expected_param == 0) {
243         pblk = 0;
244         pblklen = 0;
245     } else {
246         /* parse the params */
247                 n_actual_param = argc-1;
248                 if(n_actual_param < n_expected_param){
249                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
250                         exit(1);
251                 }
252         for (lcv = 1 ; lcv < argc ; lcv++) {
253             char *k, *e;
254             int rv;
255             k = argv[lcv];
256             e = k;
257             while (*e && *e != '=') e++;
258             if (*e == 0) {
259                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
260                         argv[lcv]);
261                 exit(1);
262             }
263             *e = 0;
264             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
265             *e = '=';
266             if (rv < 0) {
267                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
268                         argv[lcv]);
269                 exit(1);
270             }
271         }
272         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
273             fprintf(stderr, "ftaschema_create_param_block failed!\n");
274             exit(1);
275         }
276     }
277     ftaschema_free(schema); /* XXXCDC */
278     
279     
280     if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
281     
282     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
283     if (fta_id.streamid==0) {
284         fprintf(stderr,"%s::error:could not initialize fta %s\n",
285                 me, argv[0]);
286         exit(1);
287     }
288     /* XXXCDC: pblk is malloc'd, should we free it? */
289     
290     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
291     
292     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
293         fprintf(stderr,"%s::error:could not get schema\n", me);
294         exit(1);
295     }
296     
297     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
298         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
299                 me);
300         exit(1);
301     }
302     
303     if (verbose>=1) {
304         for(y=0; y<numberoffields;y++) {
305             printf("%s",ftaschema_field_name(schema,y));
306             if (y<numberoffields-1) printf("|");
307         }
308         printf("\n");
309     }
310     if (xit) {
311         gettimeofday(&tve, 0);
312         timersub(&tve, &tvs, &tvd);
313         printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
314         hand(0);
315     }
316     if (tcpport!=0) {
317         wait_for_client();
318     }
319
320     start_time = time(NULL);
321
322     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
323         if (dump)
324             continue;
325         if (ftaschema_is_eof_tuple(schema, rbuf)) {
326             /* initiate shutdown or something of that nature */
327             printf("#All data proccessed\n");
328             exit(0);
329         }
330         if (!rsize)
331             continue;
332         if (verbose >=2) {
333             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
334             emit_line();
335         }
336         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
337             for(y=0; y<numberoffields;y++) {
338                 struct access_result ar;
339                 if (verbose>=2)
340                     printf("%s->",ftaschema_field_name(schema,y));
341                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
342                 switch (ar.field_data_type) {
343                     case INT_TYPE:
344                         snprintf(linebuf,MAXLINE,"%d",ar.r.i);
345                         break;
346                     case UINT_TYPE:
347                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
348                         break;
349                     case IP_TYPE:
350                         snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
351                                  ar.r.ui>>16&0xff,
352                                  ar.r.ui>>8&0xff,
353                                  ar.r.ui&0xff);
354                         break;
355                     case IPV6_TYPE:
356                     {
357                         unsigned x;
358                         unsigned zc=0;
359                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
360                         if (zc!=4) {
361                             snprintf(linebuf,MAXLINE,"");
362                             for(x=0;x<8;x++) {
363                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
364                                 unsigned y;
365                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
366                                 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
367                                 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
368                             }
369                         } else {
370                             snprintf(linebuf,MAXLINE,"::");
371                         }
372                     }
373                         break;
374                         
375                     case USHORT_TYPE:
376                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
377                         break;
378                     case BOOL_TYPE:
379                         if (ar.r.ui==0) {
380                             snprintf(linebuf,MAXLINE,"FALSE");
381                         } else {
382                             snprintf(linebuf,MAXLINE,"TRUE");
383                         }
384                         break;
385                     case ULLONG_TYPE:
386                         snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
387                         break;
388                     case LLONG_TYPE:
389                         snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
390                         break;
391                     case FLOAT_TYPE:
392                         snprintf(linebuf,MAXLINE,"%f",ar.r.f);
393                         break;
394                     case TIMEVAL_TYPE:
395                     {
396                         gs_float_t t;
397                         t= ar.r.t.tv_usec;
398                         t=t/1000000;
399                         t=t+ar.r.t.tv_sec;
400                         snprintf(linebuf,MAXLINE,"%f sec",t);
401                     }
402                         break;
403                     case VSTR_TYPE:
404                     {
405                         int x;
406                         int c;
407                         int d=0;
408                         char * src;
409                         src=(char*)ar.r.vs.offset;
410                         if(d<MAXLINE){
411                             linebuf[d] = '\0';
412                         }
413                         for(x=0;x<ar.r.vs.length;x++) {
414                             c=src[x];
415                             if ((c<='~') && (c>=' ')) {
416                                 if (d<MAXLINE-1) {
417                                     linebuf[d]=c;
418                                     linebuf[d+1]=0;
419                                     d++;
420                                 }
421                             } else {
422                                 if (d<MAXLINE-1) {
423                                     linebuf[d]='.';
424                                     linebuf[d+1]=0;
425                                     d++;
426                                 }
427                             }
428                         }
429                     }
430                         break;
431                     default:
432                         linebuf[0]=0;
433                         break;
434                 }
435                 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
436                 emit_line();
437             }
438             snprintf(linebuf,MAXLINE,"\n");
439             emit_line();
440             if (verbose!=0) fflush(stdout);
441         } else {
442             if (rfta_id.streamid != fta_id.streamid)
443                 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
444         }
445
446         // whenever we receive a temp tuple check if we reached time limit
447         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
448             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
449             ftaapp_exit();
450             exit(0);
451         }        
452     }
453 }
454