Add new udafs and RMR support to gsprintconsole_ves
[com/gs-lite.git] / src / tools / gsprintconsole.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
#include <app.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <string.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>


#include "gsconfig.h"
#include "gstypes.h"
#include "gshub.h"

#include <schemaparser.h>
34
/* Capacity, in bytes, of the shared output line buffer. */
#define MAXLINE 100000

/* TCP port to serve output on; 0 means "write to outf instead". */
static unsigned tcpport = 0;

/* Holds the text most recently formatted for output. */
static char linebuf[MAXLINE];

/* Listening socket; stays 0 until wait_for_client() creates it. */
int listensockfd = 0;

/* Descriptor of the currently connected TCP client. */
int fd = 0;

/* Stream that receives the output when no TCP port is configured. */
FILE *outf;
42
/*
 * Not every system provides timersub(), so supply a fallback when it is
 * missing.  Computes *vvp = *tvp - *uvp, borrowing one second whenever the
 * microsecond difference would otherwise be negative.
 */
#ifndef timersub

#define timersub(tvp, uvp, vvp)                                 \
    do {                                                        \
        (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
        (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
        if ((vvp)->tv_usec < 0) {                               \
            (vvp)->tv_sec--;                                    \
            (vvp)->tv_usec += 1000000;                          \
        }                                                       \
    } while (0)

#endif
57
/*
 * Termination routine: shuts the application down through ftaapp_exit(),
 * reports the value it was invoked with on stderr, and ends the process
 * with exit status 1.
 *
 * iv - the value echoed in the message (main() installs this function for
 *      SIGTERM and SIGINT, and also calls it directly with 0).
 */
void hand(int iv)
{
    ftaapp_exit();
    fprintf(stderr, "exiting via signal handler %d...\n", iv);
    exit(1);
}
63
64 static void wait_for_client() {
65     struct sockaddr_in serv_addr,cli_addr;
66     socklen_t clilen;
67     if (listensockfd==0) {
68                 gs_int32_t on = 1;
69                 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
70         if (listensockfd < 0) {
71                         gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
72                         exit(1);
73                 }
74                 bzero((char *) &serv_addr, sizeof(serv_addr));
75                 serv_addr.sin_family = AF_INET;
76                 serv_addr.sin_addr.s_addr = INADDR_ANY;
77                 serv_addr.sin_port = htons(tcpport);
78 #ifndef __linux__
79         /* make sure we can reuse the common port rapidly */
80         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
81                        (gs_sp_t )&on, sizeof(on)) != 0) {
82             gslog(LOG_EMERG,"Error::could not set socket option\n");
83             exit(1);
84         }
85 #endif
86         if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
87                        (gs_sp_t )&on, sizeof(on)) != 0) {
88             gslog(LOG_EMERG,"Error::could not set socket option\n");
89             exit(1);
90                 }
91         
92                 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
93                  sizeof(serv_addr)) < 0) {
94                         gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
95             exit(1);
96         }
97         }
98     
99         do {
100                 listen(listensockfd,5);
101                 clilen = sizeof(cli_addr);
102                 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
103                 if (fd<0) {
104             gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
105                 }
106         } while (fd==0);
107 }
108
109
110 static void emit_socket() {
111         unsigned o,w,l;
112         o=0;
113         w=0;
114         l=strlen(linebuf);
115         do {
116                 if((w=write(fd,&linebuf[o],l))==0) {
117                         close(fd);
118                         wait_for_client();
119                 }
120                 o=o+w;
121         } while (o<l);
122 }
123
124 static void emit_line() {
125     
126     if (tcpport==0) {
127         fprintf(outf,"%s",linebuf);
128     } else {
129         emit_socket();
130     }
131     
132 }
133
134 int main(int argc, char* argv[]) {
135     gs_sp_t me = argv[0];
136     FTAID fta_id;
137     gs_int32_t schema, ch;
138     
139     FTAID rfta_id;
140     gs_uint32_t rsize;
141     gs_uint32_t bufsz=8*1024*1024;
142     gs_int8_t rbuf[2*MAXTUPLESZ];
143     
144     gs_int32_t numberoffields;
145     gs_int32_t verbose=0;
146     gs_int32_t y, lcv;
147     
148     void *pblk;
149     gs_int32_t pblklen;
150         gs_int32_t n_actual_param;
151         gs_int32_t n_expected_param;
152     gs_int32_t xit = 0;
153     gs_int32_t dump = 0;
154     struct timeval tvs, tve, tvd;
155     gs_retval_t code;
156     endpoint gshub;
157     endpoint dummyep;
158     gs_uint32_t tip1,tip2,tip3,tip4;
159     gs_sp_t instance_name;
160
161         char sep_str[2];
162
163     gs_uint32_t tlimit = 0;     // time limit in seconds
164     time_t start_time, curr_time;
165
166         sep_str[0] = '|';
167         sep_str[1] = '\0';
168     
169         gsopenlog(argv[0]);
170
171     // by default the output will go to stdout
172     outf = stdout;
173     
174     while ((ch = getopt(argc, argv, "l:p:r:sveXD")) != -1) {
175         switch (ch) {
176             case 's':
177                 sep_str[0]=',';
178                 break;
179             case 'r':
180                 bufsz=atoi(optarg);
181                 break;
182             case 'p':
183                 tcpport=atoi(optarg);
184                 break;
185             case 'v':
186                 verbose++;
187                 break;
188             case 'e':
189                 outf = stderr;
190                 break;
191             case 'X':
192                 xit++;
193                 break;
194             case 'D':
195                 dump++;
196                 break;
197             case 'l':
198                 tlimit = atoi(optarg);
199                 break;
200             default:
201             usage:
202                 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
203                         *argv);
204                 exit(1);
205         }
206     }
207     argc -= optind;
208     argv += optind;
209     if (argc<3) goto usage;
210     
211     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
212         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
213         exit(1);
214     }
215     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
216     gshub.port=htons(gshub.port);
217     instance_name=strdup(argv[1]);
218     if (set_hub(gshub)!=0) {
219         gslog(LOG_EMERG,"Could not set hub");
220         exit(1);
221     }
222     if (set_instance_name(instance_name)!=0) {
223         gslog(LOG_EMERG,"Could not set instance name");
224         exit(1);
225     }
226     
227     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
228         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
229     }
230     
231     gettimeofday(&tvs, 0);
232     argc -=2;
233     argv +=2;
234     if (argc < 1)
235         goto usage;
236     
237     /* initialize host library and the sgroup  */
238     
239     if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
240     
241     if (ftaapp_init(bufsz)!=0) {
242         fprintf(stderr,"%s::error:could not initialize gscp\n", me);
243         exit(1);
244     }
245     
246     signal(SIGTERM, hand);
247     signal(SIGINT, hand);
248     
249     schema = ftaapp_get_fta_schema_by_name(argv[0]);
250     if (schema < 0) {
251         fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
252                 me ,argv[0]);
253         exit(1);
254     }
255         n_expected_param = ftaschema_parameter_len(schema);
256     if (n_expected_param == 0) {
257         pblk = 0;
258         pblklen = 0;
259     } else {
260         /* parse the params */
261                 n_actual_param = argc-1;
262                 if(n_actual_param < n_expected_param){
263                         fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
264                         exit(1);
265                 }
266         for (lcv = 1 ; lcv < argc ; lcv++) {
267             char *k, *e;
268             int rv;
269             k = argv[lcv];
270             e = k;
271             while (*e && *e != '=') e++;
272             if (*e == 0) {
273                 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
274                         argv[lcv]);
275                 exit(1);
276             }
277             *e = 0;
278             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
279             *e = '=';
280             if (rv < 0) {
281                 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
282                         argv[lcv]);
283                 exit(1);
284             }
285         }
286         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
287             fprintf(stderr, "ftaschema_create_param_block failed!\n");
288             exit(1);
289         }
290     }
291     ftaschema_free(schema); /* XXXCDC */
292     
293     
294     if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
295     
296     fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
297     if (fta_id.streamid==0) {
298         fprintf(stderr,"%s::error:could not initialize fta %s\n",
299                 me, argv[0]);
300         exit(1);
301     }
302     /* XXXCDC: pblk is malloc'd, should we free it? */
303     
304     if (verbose>=2) fprintf(stderr,"Get schema handle\n");
305     
306     if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
307         fprintf(stderr,"%s::error:could not get schema\n", me);
308         exit(1);
309     }
310     
311     if ((numberoffields=ftaschema_tuple_len(schema))<0) {
312         fprintf(stderr,"%s::error:could not get number of fields in schema\n",
313                 me);
314         exit(1);
315     }
316     
317     if (verbose>=1) {
318         for(y=0; y<numberoffields;y++) {
319             printf("%s",ftaschema_field_name(schema,y));
320             if (y<numberoffields-1) printf("|");
321         }
322         printf("\n");
323     }
324     if (xit) {
325         gettimeofday(&tve, 0);
326         timersub(&tve, &tvs, &tvd);
327         printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
328         hand(0);
329     }
330     if (tcpport!=0) {
331         wait_for_client();
332     }
333
334     start_time = time(NULL);
335
336     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
337         if (dump)
338             continue;
339         if (ftaschema_is_eof_tuple(schema, rbuf)) {
340             /* initiate shutdown or something of that nature */
341             printf("#All data proccessed\n");
342             exit(0);
343         }
344         if (!rsize)
345             continue;
346         if (verbose >=2) {
347             snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
348             emit_line();
349         }
350         if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
351             for(y=0; y<numberoffields;y++) {
352                 struct access_result ar;
353                 if (verbose>=2)
354                     printf("%s->",ftaschema_field_name(schema,y));
355                 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
356                 switch (ar.field_data_type) {
357                     case INT_TYPE:
358                         snprintf(linebuf,MAXLINE,"%d",ar.r.i);
359                         break;
360                     case UINT_TYPE:
361                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
362                         break;
363                     case IP_TYPE:
364                         snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
365                                  ar.r.ui>>16&0xff,
366                                  ar.r.ui>>8&0xff,
367                                  ar.r.ui&0xff);
368                         break;
369                     case IPV6_TYPE:
370                     {
371                         unsigned x;
372                         unsigned zc=0;
373                         for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
374                         if (zc!=4) {
375                             snprintf(linebuf,MAXLINE,"");
376                             for(x=0;x<8;x++) {
377                                 unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
378                                 unsigned y;
379                                 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
380                                 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
381                                 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
382                             }
383                         } else {
384                             snprintf(linebuf,MAXLINE,"::");
385                         }
386                     }
387                         break;
388                         
389                     case USHORT_TYPE:
390                         snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
391                         break;
392                     case BOOL_TYPE:
393                         if (ar.r.ui==0) {
394                             snprintf(linebuf,MAXLINE,"FALSE");
395                         } else {
396                             snprintf(linebuf,MAXLINE,"TRUE");
397                         }
398                         break;
399                     case ULLONG_TYPE:
400                         snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
401                         break;
402                     case LLONG_TYPE:
403                         snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
404                         break;
405                     case FLOAT_TYPE:
406                         snprintf(linebuf,MAXLINE,"%f",ar.r.f);
407                         break;
408                     case TIMEVAL_TYPE:
409                     {
410                         gs_float_t t;
411                         t= ar.r.t.tv_usec;
412                         t=t/1000000;
413                         t=t+ar.r.t.tv_sec;
414                         snprintf(linebuf,MAXLINE,"%f sec",t);
415                     }
416                         break;
417                     case VSTR_TYPE:
418                     {
419                         int x;
420                         int c;
421                         int d=0;
422                         char * src;
423                         src=(char*)ar.r.vs.offset;
424                         if(d<MAXLINE){
425                             linebuf[d] = '\0';
426                         }
427                         for(x=0;x<ar.r.vs.length;x++) {
428                             c=src[x];
429                             if ((c<='~') && (c>=' ')) {
430                                 if (d<MAXLINE-1) {
431                                     linebuf[d]=c;
432                                     linebuf[d+1]=0;
433                                     d++;
434                                 }
435                             } else {
436                                 if (d<MAXLINE-1) {
437                                     linebuf[d]='.';
438                                     linebuf[d+1]=0;
439                                     d++;
440                                 }
441                             }
442                         }
443                     }
444                         break;
445                     default:
446                         linebuf[0]=0;
447                         break;
448                 }
449 //                if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
450                 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,sep_str);
451                 emit_line();
452             }
453             snprintf(linebuf,MAXLINE,"\n");
454             emit_line();
455             if (verbose!=0) fflush(stdout);
456         } else {
457             if (rfta_id.streamid != fta_id.streamid)
458                 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
459         }
460
461         // whenever we receive a temp tuple check if we reached time limit
462         if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
463             fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
464             ftaapp_exit();
465             exit(0);
466         }        
467     }
468 }
469