Add gsprintconsole option to print to stderr
[com/gs-lite.git] / src / tools / gsprintconsole.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <app.h>
16 #include <stdlib.h>
17 #include <stdio.h>
18 #include <unistd.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <string.h>
22 #include <sys/time.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27
28
29 #include "gsconfig.h"
30 #include "gstypes.h"
31 #include "gshub.h"
32
33 #include <schemaparser.h>
34
35 #define MAXLINE 100000
36 static unsigned tcpport=0;
37 static char linebuf[MAXLINE];
38 int listensockfd=0;
39 int fd=0;
40
41 FILE* outf;
42
43 // Not all systems have timersub defined so make sure its ther
44 #ifndef timersub
45
46 #define timersub(tvp, uvp, vvp)                                         \
47 do {                                                            \
48 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
49 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
50 if ((vvp)->tv_usec < 0) {                               \
51 (vvp)->tv_sec--;                                \
52 (vvp)->tv_usec += 1000000;                      \
53 }                                                       \
54 } while (0)
55
56 #endif
57
58 void hand(int iv) {
59     ftaapp_exit();
60     fprintf(stderr, "exiting via signal handler %d...\n", iv);
61     exit(1);
62 }
63
64 static void wait_for_client() {
65     struct sockaddr_in serv_addr,cli_addr;
66     socklen_t clilen;
67     if (listensockfd==0) {
68                 gs_int32_t on = 1;
69                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
70         if (listensockfd < 0) {
71                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
72                         exit(1);
73                 }
74                 bzero((char *) &serv_addr, sizeof(serv_addr));
75                 serv_addr.sin_family = AF_INET;
76                 serv_addr.sin_addr.s_addr = INADDR_ANY;
77                 serv_addr.sin_port = htons(tcpport);
78 #ifndef __linux__
79         /* make sure we can reuse the common port rapidly */
80         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
81                        (gs_sp_t )&on, sizeof(on)) != 0) {
82             gslog(LOG_EMERG,"Error::could not set socket option\n");
83             exit(1);
84         }
85 #endif
86         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
87                        (gs_sp_t )&on, sizeof(on)) != 0) {
88             gslog(LOG_EMERG,"Error::could not set socket option\n");
89             exit(1);
90                 }
91         
92                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
93                  sizeof(serv_addr)) < 0) {
94                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
95             exit(1);
96         }
97         }
98     
99         do {
100                 listen(listensockfd,5);
101                 clilen = sizeof(cli_addr);
102                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
103                 if (fd<0) {
104             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
105                 }
106         } while (fd==0);
107 }
108
109
110 static void emit_socket() {
111         unsigned o,w,l;
112         o=0;
113         w=0;
114         l=strlen(linebuf);
115         do {
116                 if((w=write(fd,&linebuf[o],l))==0) {
117                         close(fd);
118                         wait_for_client();
119                 }
120                 o=o+w;
121         } while (o<l);
122 }
123
124 static void emit_line() {
125     
126     if (tcpport==0) {
127         fprintf(outf,"%s",linebuf);
128     } else {
129         emit_socket();
130     }
131     
132 }
133
134 int main(int argc, char* argv[]) {
135     gs_sp_t me = argv[0];
136     FTAID fta_id;
137     gs_int32_t schema, ch;
138     
139     FTAID rfta_id;
140     gs_uint32_t rsize;
141     gs_uint32_t bufsz=8*1024*1024;
142     gs_int8_t rbuf[2*MAXTUPLESZ];
143     
144     gs_int32_t numberoffields;
145     gs_int32_t verbose=0;
146     gs_int32_t y, lcv;
147     
148     void *pblk;
149     gs_int32_t pblklen;
150         gs_int32_t n_actual_param;
151         gs_int32_t n_expected_param;
152     gs_int32_t xit = 0;
153     gs_int32_t dump = 0;
154     struct timeval tvs, tve, tvd;
155     gs_retval_t code;
156     endpoint gshub;
157     endpoint dummyep;
158     gs_uint32_t tip1,tip2,tip3,tip4;
159     gs_sp_t instance_name;
160
161     gs_uint32_t tlimit = 0;     // time limit in seconds
162     time_t start_time, curr_time;
163     
164         gsopenlog(argv[0]);
165
166     // by default the output will go to stdout
167     outf = stdout;
168     
169     while ((ch = getopt(argc, argv, "l:p:r:vXD")) != -1) {
170         switch (ch) {
171             case 'r':
172                 bufsz=atoi(optarg);
173                 break;
174             case 'p':
175                 tcpport=atoi(optarg);
176                 break;
177             case 'v':
178                 verbose++;
179                 break;
180             case 'e':
181                 outf = stderr;
182                 break;
183             case 'X':
184                 xit++;
185                 break;
186             case 'D':
187                 dump++;
188                 break;
189             case 'l':
190                 tlimit = atoi(optarg);
191                 break;
192             default:
193             usage:
194                 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
195                         *argv);
196                 exit(1);
197         }
198     }
199     argc -= optind;
200     argv += optind;
201     if (argc<3) goto usage;
202     
203     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
204         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
205         exit(1);
206     }
207     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
208     gshub.port=htons(gshub.port);
209     instance_name=strdup(argv[1]);
210     if (set_hub(gshub)!=0) {
211         gslog(LOG_EMERG,"Could not set hub");
212         exit(1);
213     }
214     if (set_instance_name(instance_name)!=0) {
215         gslog(LOG_EMERG,"Could not set instance name");
216         exit(1);
217     }
218     
219     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
220         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
221     }
222     
223     gettimeofday(&tvs, 0);
224     argc -=2;
225     argv +=2;
226     if (argc < 1)
227         goto usage;
228     
229     /* initialize host library and the sgroup  */
230     
231     if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
232     
233     if (ftaapp_init(bufsz)!=0) {
234         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
235         exit(1);
236     }
237     
238     signal(SIGTERM, hand);
239     signal(SIGINT, hand);
240     
241     schema = ftaapp_get_fta_schema_by_name(argv[0]);
242     if (schema < 0) {
243         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
244                 me ,argv[0]);
245         exit(1);
246     }
247         n_expected_param = ftaschema_parameter_len(schema);
248     if (n_expected_param == 0) {
249         pblk = 0;
250         pblklen = 0;
251     } else {
252         /* parse the params */
253                 n_actual_param = argc-1;
254                 if(n_actual_param < n_expected_param){
255                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
256                         exit(1);
257                 }
258         for (lcv = 1 ; lcv < argc ; lcv++) {
259             char *k, *e;
260             int rv;
261             k = argv[lcv];
262             e = k;
263             while (*e && *e != '=') e++;
264             if (*e == 0) {
265                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
266                         argv[lcv]);
267                 exit(1);
268             }
269             *e = 0;
270             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
271             *e = '=';
272             if (rv < 0) {
273                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
274                         argv[lcv]);
275                 exit(1);
276             }
277         }
278         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
279             fprintf(stderr, "ftaschema_create_param_block failed!\n");
280             exit(1);
281         }
282     }
283     ftaschema_free(schema); /* XXXCDC */
284     
285     
286     if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
287     
288     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
289     if (fta_id.streamid==0) {
290         fprintf(stderr,"%s::error:could not initialize fta %s\n",
291                 me, argv[0]);
292         exit(1);
293     }
294     /* XXXCDC: pblk is malloc'd, should we free it? */
295     
296     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
297     
298     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
299         fprintf(stderr,"%s::error:could not get schema\n", me);
300         exit(1);
301     }
302     
303     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
304         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
305                 me);
306         exit(1);
307     }
308     
309     if (verbose>=1) {
310         for(y=0; y<numberoffields;y++) {
311             printf("%s",ftaschema_field_name(schema,y));
312             if (y<numberoffields-1) printf("|");
313         }
314         printf("\n");
315     }
316     if (xit) {
317         gettimeofday(&tve, 0);
318         timersub(&tve, &tvs, &tvd);
319         printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
320         hand(0);
321     }
322     if (tcpport!=0) {
323         wait_for_client();
324     }
325
326     start_time = time(NULL);
327
328     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
329         if (dump)
330             continue;
331         if (ftaschema_is_eof_tuple(schema, rbuf)) {
332             /* initiate shutdown or something of that nature */
333             printf("#All data proccessed\n");
334             exit(0);
335         }
336         if (!rsize)
337             continue;
338         if (verbose >=2) {
339             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
340             emit_line();
341         }
342         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
343             for(y=0; y<numberoffields;y++) {
344                 struct access_result ar;
345                 if (verbose>=2)
346                     printf("%s->",ftaschema_field_name(schema,y));
347                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
348                 switch (ar.field_data_type) {
349                     case INT_TYPE:
350                         snprintf(linebuf,MAXLINE,"%d",ar.r.i);
351                         break;
352                     case UINT_TYPE:
353                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
354                         break;
355                     case IP_TYPE:
356                         snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
357                                  ar.r.ui>>16&0xff,
358                                  ar.r.ui>>8&0xff,
359                                  ar.r.ui&0xff);
360                         break;
361                     case IPV6_TYPE:
362                     {
363                         unsigned x;
364                         unsigned zc=0;
365                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
366                         if (zc!=4) {
367                             snprintf(linebuf,MAXLINE,"");
368                             for(x=0;x<8;x++) {
369                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
370                                 unsigned y;
371                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
372                                 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
373                                 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
374                             }
375                         } else {
376                             snprintf(linebuf,MAXLINE,"::");
377                         }
378                     }
379                         break;
380                         
381                     case USHORT_TYPE:
382                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
383                         break;
384                     case BOOL_TYPE:
385                         if (ar.r.ui==0) {
386                             snprintf(linebuf,MAXLINE,"FALSE");
387                         } else {
388                             snprintf(linebuf,MAXLINE,"TRUE");
389                         }
390                         break;
391                     case ULLONG_TYPE:
392                         snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
393                         break;
394                     case LLONG_TYPE:
395                         snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
396                         break;
397                     case FLOAT_TYPE:
398                         snprintf(linebuf,MAXLINE,"%f",ar.r.f);
399                         break;
400                     case TIMEVAL_TYPE:
401                     {
402                         gs_float_t t;
403                         t= ar.r.t.tv_usec;
404                         t=t/1000000;
405                         t=t+ar.r.t.tv_sec;
406                         snprintf(linebuf,MAXLINE,"%f sec",t);
407                     }
408                         break;
409                     case VSTR_TYPE:
410                     {
411                         int x;
412                         int c;
413                         int d=0;
414                         char * src;
415                         src=(char*)ar.r.vs.offset;
416                         if(d<MAXLINE){
417                             linebuf[d] = '\0';
418                         }
419                         for(x=0;x<ar.r.vs.length;x++) {
420                             c=src[x];
421                             if ((c<='~') && (c>=' ')) {
422                                 if (d<MAXLINE-1) {
423                                     linebuf[d]=c;
424                                     linebuf[d+1]=0;
425                                     d++;
426                                 }
427                             } else {
428                                 if (d<MAXLINE-1) {
429                                     linebuf[d]='.';
430                                     linebuf[d+1]=0;
431                                     d++;
432                                 }
433                             }
434                         }
435                     }
436                         break;
437                     default:
438                         linebuf[0]=0;
439                         break;
440                 }
441                 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
442                 emit_line();
443             }
444             snprintf(linebuf,MAXLINE,"\n");
445             emit_line();
446             if (verbose!=0) fflush(stdout);
447         } else {
448             if (rfta_id.streamid != fta_id.streamid)
449                 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
450         }
451
452         // whenever we receive a temp tuple check if we reached time limit
453         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
454             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
455             ftaapp_exit();
456             exit(0);
457         }        
458     }
459 }
460