Fixed newline characters throughout the code
[com/gs-lite.git] / src / tools / gsgdatprint.c
1 /* ------------------------------------------------
2  Copyright 2014 AT&T Intellectual Property
3  Licensed under the Apache License, Version 2.0 (the "License");
4  you may not use this file except in compliance with the License.
5  You may obtain a copy of the License at
6  
7  http://www.apache.org/licenses/LICENSE-2.0
8  
9  Unless required by applicable law or agreed to in writing, software
10  distributed under the License is distributed on an "AS IS" BASIS,
11  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  See the License for the specific language governing permissions and
13  limitations under the License.
14  ------------------------------------------- */
15 #include <app.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <stdio.h>
19 #include <unistd.h>
20 #include <signal.h>
21 #include <time.h>
22 #include <string.h>
23 #include <sys/time.h>
24 #include <sys/stat.h>
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
28
29 #include "gsconfig.h"
30 #include "gstypes.h"
31 #include <schemaparser.h>
32 #include <gshub.h>
33
34 #define MAXSTRLEN 256
35 #define MAXNUMFIELDS 64
36
37 //#define GSBZIP2
38
39 #ifdef GSBZIP2
40 #include "bzlib.h"
41 #endif
42
43 #ifdef ZLIB
44 #include "zlib.h"
45 #endif
46
47 gs_sp_t me;     /* saved copy of argv[0]; used as the prefix of usage and error messages */
48
/* Handles and derived information for the single query instantiated by init(). */
49 struct FTA_state {
50     FTAID fta_id;               /* id from ftaapp_add_fta(); compared against each tuple's stream id */
51     gs_schemahandle_t schema;   /* from ftaapp_get_fta_schema(); used for field lookups and EOF-tuple checks */
52     gs_sp_t asciischema;        /* private strdup() of the ASCII schema; written after every GDAT header */
53     gs_int32_t numfields;       /* ftaschema_tuple_len() of the query schema */
54     gs_int32_t timefieldoffset; /* offset of the -b field, passed to fta_unpack_uint() */
55 };
56
/* Command-line options (filled in by main()) plus the per-query runtime state. */
57 struct gsasciiprint_state {
58     // configuration state (set from the command line in main())
59     gs_uint32_t bufsz;          // -r: ring buffer size handed to ftaapp_init() (main() defaults it to 8*1024*1024)
60     gs_int32_t stream;          // -s: write a single GDAT stream to stdout instead of binned files
61     gs_int32_t flush;           // -f: flush the output after every tuple
62     gs_int32_t parserversion;   // set by init() from get_schemaparser_version(); emitted as VERSION in the header
63     gs_int32_t compressed;      // -z: compress the output files
64     gs_int32_t verbose;         // -v: extra log output
65     gs_int32_t notemp;          // -n: skip tuples that ftaapp_get_tuple() returns with code 2
66     gs_sp_t timefield;          // -b: field used to bin output files (0 = use the UNIX time)
67     gs_int32_t  interval;       // -t: width of one output-file bin (main() defaults it to 60)
68     gs_int32_t  quitcnt;        // -c: stop after this many tuples (<= 0 means no limit)
69     gs_int32_t  quittime;       // -q: stop after this many seconds via SIGALRM (0 means no limit)
70     gs_int32_t  numfields;      // NOTE(review): never read in this file; init() fills fs.numfields instead
71     gs_sp_t extension;          // -e: suffix of the output file names
72     gs_sp_t query;              // name of the query to instantiate
73     gs_int32_t remote_print;    // -h: if >= 0, init() uses ftaapp_add_fta_print() instead of local output
74     
75     
76     // runtime state
77     struct FTA_state fs;        // handles for the instantiated query, filled in by init()
78 };
79
80
81 void hand(int iv) {
82     ftaapp_exit();
83     gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv);
84     exit(0);
85 }
86
/*
 * SIGALRM handler that process_data() arms when -q <seconds> is given.
 * It shuts down the FTA application layer and exits with status 0.
 */
void timeouthand(int signum)
{
    (void)signum; /* which signal fired is irrelevant here */
    ftaapp_exit();
    exit(0);
}
92
93
94 static void print_usage_exit(gs_sp_t reason) {
95     fprintf(stderr,
96             "%s::error: %s\n"
97             "%s::usage: %s -r <int> -v -f -s -z -c <int> -q <int> -b <field> -t <int> -e <string> <gshub-hostname>:<gshub-port> <gsinstance_name>   <query_name> <parameters>\n"
98             "\t-v makes output verbose\n"
99             "\t-f flushes each tuple individually\n"
100             "\t-h <int> print within the hfta int identifies how many output streams based on the SubInterface field\n"
101             "\t-r sets the ringbuffer size default is 8MB\n"
102             "\t-s uses %s in streaming mode cannot be used with -v -z -b -t -e\n"
103             "\t-c <int> indicates that %s should terminate after <int> tuples.\n"
104             "\t-q <int> indicates that %s should terminate after <int> seconds.\n"
105             "\t-z the output is compressed with gzip \n"
106             "\t-b <field> identifies the field which is increasing and is used to\n"
107             "\t\tbin output files. Field has to be of type uint.\n"
108             "\t\tThe default is the UNIX system time\n"
109             "\t-t <int> specifies the number the <field> specified with -b\n"
110             "\t\thas to increase before a new output file is used\n"
111             "\t\tthe default is 60 for 1 minute if the default UNIX time is used\n"
112             "\t-e <string> identifies the file string extension used for the \n"
113             "\t\toutput file. The output file always starts with the current value\n"
114             "\t\tof the b field (or UNIX time) the default extension is .txt \n"
115             "\t\twithout the -z option and .txt.gz with the -z option\n"
116             "\t<query_name> specifies the query which will be\n"
117             "\t\t instanciated.\n"
118             "\t<parameters> sets the parameters for the query\n"
119             , me, reason, me, me, me,me,me);
120     exit(1);
121 }
122
123
124
125
126 static void init(gs_int32_t argc, gs_sp_t argv[], struct gsasciiprint_state *s) {
127     void *pblk;
128     gs_int32_t x, y, schema, pblklen, lcv;
129         gs_int32_t n_actual_param;
130         gs_int32_t n_expected_param;
131     gs_sp_t c;
132     gs_int8_t name[1024];
133     
134     sprintf(name,"gsgdatprint:%s",argv[0]);
135     
136     gsopenlog(name);
137     
138     
139     if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing gscp\n");
140     if (ftaapp_init(s->bufsz)!=0) {
141         gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me);
142         exit(1);
143     }
144     
145     signal(SIGTERM, hand);
146     signal(SIGINT, hand);
147     signal(SIGPIPE, hand);
148     
149     if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n");
150     
151     schema = ftaapp_get_fta_schema_by_name(argv[0]);
152     if (schema < 0) {
153         gslog(LOG_EMERG,"%s::error:can't get fta '%s' schema\n", me ,argv[0]);
154         exit(1);
155     }
156         n_expected_param = ftaschema_parameter_len(schema);
157     if (n_expected_param == 0) {
158         pblk = 0;
159         pblklen = 0;
160         if (s->verbose) gslog(LOG_DEBUG,"[query does not have any params]\n");
161     } else {
162                 n_actual_param = argc-1;
163                 if(n_actual_param < n_expected_param){
164                         gslog(LOG_EMERG,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
165                         exit(1);
166                 }
167         /* parse the params */
168         for (lcv = 1 ; lcv < argc ; lcv++) {
169             char *k, *e;
170             int rv;
171             k = argv[lcv];
172             e = k;
173             while (*e && *e != '=') e++;
174             if (*e == 0) {
175                 gslog(LOG_EMERG,"param parse error '%s' (fmt 'key=val')\n",
176                       argv[lcv]);
177                 exit(1);
178             }
179             *e = 0;
180             rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
181             *e = '=';
182             if (rv < 0) {
183                 gslog(LOG_EMERG,"param setparam error '%s' (fmt 'key=val')\n",
184                       argv[lcv]);
185                 exit(1);
186             }
187         }
188         if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
189             gslog(LOG_EMERG, "ftaschema_create_param_block failed!\n");
190             exit(1);
191         }
192     }
193     ftaschema_free(schema); /* XXXCDC */
194     
195     
196         if (s->remote_print>=0) {
197                 s->fs.fta_id=ftaapp_add_fta_print(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk,"./data",s->extension,s->timefield,"SubInterface",s->interval,s->remote_print);
198                 if (s->fs.fta_id.streamid==0){
199             gslog(LOG_EMERG,"%s::error:could not initialize fta_print %s\n",
200                   me,s->query);
201             exit(1);
202                 }
203     again:
204         // wait forever
205         sleep(60);
206         goto again;
207         }
208     
209     
210     s->fs.fta_id=ftaapp_add_fta(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk);
211     if (s->fs.fta_id.streamid==0){
212         gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n",
213               me,s->query);
214         exit(1);
215     }
216     /* XXXCDC: pblk is malloc'd, should we free it? */
217     
218     if ((c=ftaapp_get_fta_ascii_schema_by_name(s->query))==0){
219         gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n",
220               me,s->query);
221         exit(1);
222     }
223     
224     //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy
225     s->fs.asciischema=strdup(c);
226     
227     
228     // Set parser version here
229     s->parserversion=get_schemaparser_version();
230     
231     if ((s->fs.schema=ftaapp_get_fta_schema(s->fs.fta_id))<0) {
232         gslog(LOG_EMERG,"%s::error:could not get schema for query\n",
233               me,s->query);
234         exit(1);
235     }
236     
237     // Use all available fields
238     if ((s->fs.numfields=ftaschema_tuple_len(s->fs.schema))<0) {
239         gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n",
240               me,s->query);
241         exit(1);
242     }
243     if (s->timefield!=0) {
244         if ((s->fs.timefieldoffset=ftaschema_get_field_offset_by_name(
245                                                                       s->fs.schema,s->timefield))<0) {
246             gslog(LOG_EMERG,"%s::error:could not get "
247                   "offset for timefield %s in query %s\n",
248                   me,s->timefield,s->query);
249             exit(1);
250         }
251         
252         if (ftaschema_get_field_type_by_name(
253                                              s->fs.schema,s->timefield)!=UINT_TYPE) {
254             gslog(LOG_EMERG,"%s::error: illegal type for timefield "
255                   "%s in query %s UINT expected\n",
256                   me,s->timefield,s->query);
257             exit(1);
258         }
259     }
260 }
261
262
263 static void process_data(struct gsasciiprint_state *s)
264 {
265         gs_int32_t x;
266     gs_int32_t y;
267     gs_int32_t z;
268     gs_int32_t tb;
269     gs_uint32_t nsz;
270     FTAID rfta_id;
271     gs_uint32_t rsize;
272     gs_int8_t rbuf[2*MAXTUPLESZ];
273     gs_int32_t problem;
274     
275     gs_int32_t ctb=0;
276     gs_int8_t fname[1024];
277     gs_int8_t tmpname[1024];
278     gs_int8_t command[1024];
279         gs_int8_t topb[1024];
280     gs_int32_t rcnt=0;
281     gs_retval_t code;
282 #ifdef GSBZIP2
283         int bzerror;
284         BZFILE * b;
285         int abandon=0;
286         unsigned int bytes_in;
287         unsigned int bytes_out;
288 #endif
289 #ifdef ZLIB
290         gzFile of=0;
291 #else
292         FILE * of=0;
293 #endif
294     
295     
296     tmpname[0]=0;
297     
298     if (s->verbose!=0) gslog(LOG_INFO,"Getting Data for %s",s->query);
299     
300         sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",
301                         s->parserversion,strlen(s->fs.asciischema)+1);
302     if (s->stream!=0) {
303                 of=stdout;
304                 // need to get ASCII version of schema
305                 fwrite(&topb[0],strlen(topb),1,of);
306                 fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of);
307     }
308     
309     if (s->quittime !=0 ) {
310                 signal(SIGALRM,timeouthand);
311                 alarm(s->quittime);
312     }
313     
314     
315     while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
316                 rcnt++;
317                 if ((s->notemp==1) && (code==2)) continue;
318                 if (((s->quitcnt>0) && (rcnt>s->quitcnt))||ftaschema_is_eof_tuple(s->fs.schema, rbuf)) {
319                         if (s->verbose!=0)
320                                 gslog(LOG_EMERG, "exiting reached tuple limit or all data has been proccessed\n");
321                         if (of!=0) {
322                                 fclose(of);
323                 
324                                 if (s->compressed) {
325                                         system(command);
326                                 }
327                                 rename(tmpname,fname);
328                         }
329                         exit(0);
330                 }
331                 if (((code==0) || (code==2))&&(s->fs.fta_id.streamid==rfta_id.streamid)) {
332                         if (s->stream==0) {
333                                 if (s->timefield!=0) {
334                                         tb=fta_unpack_uint(rbuf,
335                                        rsize,
336                                        s->fs.timefieldoffset,
337                                        &problem);
338                                 } else {
339                                         tb=time(0);
340                                 }
341                                 if ((ctb+s->interval)<=tb) {
342                                         gsstats();
343                                         if (of!=0) {
344 #ifdef GSBZIP2
345                                                 BZ2_bzWriteClose(&bzerror, b, abandon, &bytes_in, &bytes_out );
346                                                 if (bzerror!=BZ_OK) {
347                                                         gslog(LOG_EMERG,"Could not bz close file .. EXITING\n");
348                                                         exit(0);
349                                                 }
350 #endif
351 #ifdef ZLIB
352                                                 gzclose(of);
353 #else
354                                                 fclose(of);
355 #endif
356 #ifndef GSBZIP2
357                                                 if (s->compressed) {
358                                                         system(command);
359                                                 }
360 #endif
361                                                 rename(tmpname,fname);
362                                         }
363                                         while((ctb+s->interval)<=tb) {
364                                                 if (ctb==0) {
365                                                         ctb=(tb/s->interval)*s->interval;
366                                                 } else {
367                                                         ctb=ctb+s->interval;
368                                                 }
369                                         }
370 #ifdef ZLIB
371                                         sprintf(tmpname,"%u%s.gz.tmp",ctb,s->extension);
372                                         sprintf(fname,"%u%s.gz",ctb,s->extension);
373 #else
374                     sprintf(tmpname,"%u%s.tmp",ctb,s->extension);
375                     sprintf(fname,"%u%s",ctb,s->extension);
376 #endif
377 #ifndef GSBZIP2
378                                         if (s->compressed) {
379                                                 sprintf(command,"gzip -S .tmpgz %s ; mv %s.tmpgz %s",tmpname,tmpname,tmpname);
380                                         }
381 #endif
382 #ifdef ZLIB
383                                         if ((of=gzopen(tmpname,"wb"))==0) {
384                                                 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n",
385                               tmpname);
386                                                 exit(0);
387                                         }
388 #else
389                                         if ((of=fopen(tmpname,"w"))==0) {
390                                                 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n",
391                               tmpname);
392                                                 exit(0);
393                                         }
394 #endif
395 #ifdef GSBZIP2
396                                         if (s->compressed) {
397                                                 b=BZ2_bzWriteOpen(&bzerror,of,5,0,30);
398                                                 if (bzerror!=BZ_OK) {
399                                                         gslog(LOG_EMERG,"Could not bz open file \"%s\".. EXITING\n",
400                                   tmpname);
401                                                         exit(0);
402                                                 }
403                                         }
404 #endif
405                     
406                                         if (s->compressed) {
407 #ifdef GSBZIP2
408                                                 BZ2_bzWrite ( &bzerror, b, &topb[0],strlen(topb));
409                                                 BZ2_bzWrite ( &bzerror, b, s->fs.asciischema,strlen(s->fs.asciischema)+1);
410 #endif
411                                         } else {
412                                                 // need to get ASCII version of schema
413 #ifdef ZLIB
414                                                 gzwrite(of,&topb[0],strlen(topb));
415                                                 gzwrite(of,s->fs.asciischema,strlen(s->fs.asciischema)+1);
416 #else
417                                                 fwrite(&topb[0],strlen(topb),1,of);
418                                                 fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of);
419 #endif
420                                         }
421                                 }
422                         }
423                         if (code==0) {
424                                 nsz=htonl(rsize);
425                                 if (s->compressed) {
426 #ifdef GSBZIP2
427                                         BZ2_bzWrite ( &bzerror, b,&nsz,sizeof(gs_uint32_t) );
428                                         BZ2_bzWrite ( &bzerror, b,rbuf,rsize);
429 #endif
430                                 } else {
431 #ifdef ZLIB
432                                         gzwrite(of,&nsz,sizeof(gs_uint32_t));
433                                         gzwrite(of,rbuf,rsize);
434                                 }
435 #else
436                 if (fwrite(&nsz,sizeof(gs_uint32_t),1,of)!=1) {
437                     ftaapp_exit();
438                     if (s->verbose!=0)
439                         gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n",
440                               tmpname);
441                     exit(0);
442                 }
443                 
444                 if (fwrite(rbuf,rsize,1,of)!=1) {
445                     ftaapp_exit();
446                     if (s->verbose!=0)
447                         gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n",
448                               tmpname);
449                     exit(0);
450                 }
451                 if ((s->stream!=0) || (s->flush!=0)) {
452                     fflush(of);
453                 }
454             }
455 #endif
456         }
457     }
458 }
459 }
460
461
462
463 int main(int argc, char** argv) {
464     
465     struct gsasciiprint_state s;
466     gs_retval_t ch;
467     endpoint gshub;
468     endpoint dummyep;
469     gs_uint32_t tip1,tip2,tip3,tip4;
470     gs_sp_t instance_name;
471     
472     me = argv[0];
473     
474     /* initialize host library and the sgroup  */
475     
476     if (argc<=1) {
477         print_usage_exit("Not enough arguments");
478     }
479     
480     /* parse args */
481     bzero(&s, sizeof(s));
482     s.interval = 60;    /* default */
483     s.quittime=0;
484     s.quitcnt=-1;
485     s.remote_print=-1;
486     s.bufsz=8*1024*1024;
487     
488     while ((ch = getopt(argc, argv, "nr:b:e:t:c:q:sfvzmh:")) != -1) {
489         switch(ch) {
490             case 'r':
491                 s.bufsz=atoi(optarg);
492                 break;
493             case 'c':
494                 s.quitcnt = atoi(optarg);
495                 break;
496             case 'q':
497                 s.quittime = atoi(optarg);
498                 break;
499             case 'b':
500                 s.timefield = optarg;
501                 break;
502             case 'e':
503                 s.extension = optarg;
504                 break;
505             case 'n':
506                 s.notemp=1;
507                 break;
508             case 't':
509                 s.interval = atoi(optarg);
510                 if (s.interval < 1) {
511                     goto usage;
512                 }
513                 break;
514             case 's':
515                 s.stream++;
516                 break;
517             case 'f':
518                 s.flush++;
519                 break;
520             case 'v':
521                 s.verbose++;
522                 break;
523             case 'z':
524                 s.compressed++;
525                 break;
526             case 'h':
527                 s.remote_print=atoi(optarg);
528                 break;
529             default:
530             usage:
531                 print_usage_exit("invalid args");
532         }
533     }
534     
535     if ((s.stream!=0) & (s.compressed!=0 | s.verbose!=0 |
536                          s.timefield!=0 | s.extension!=0)) {
537         print_usage_exit("illegal argument combination with -s");
538     }
539     
540     if (!s.extension) {
541         s.extension = (s.compressed) ? ".txt.gz" : ".txt";
542     }
543     
544     argc -= optind;
545     argv += optind;
546     if (argc<3) print_usage_exit("must specify hub info and query");
547     
548     if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
549         gslog(LOG_EMERG,"HUB IP NOT DEFINED");
550         exit(1);
551     }
552     gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
553     gshub.port=htons(gshub.port);
554     instance_name=strdup(argv[1]);
555     if (set_hub(gshub)!=0) {
556         gslog(LOG_EMERG,"Could not set hub");
557         exit(1);
558     }
559     if (set_instance_name(instance_name)!=0) {
560         gslog(LOG_EMERG,"Could not set instance name");
561         exit(1);
562     }
563     
564     if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
565         gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
566     }
567     
568     
569     argc -=2;
570     argv +=2;
571     
572     s.query = argv[0];
573     
574     init(argc, argv, &s);
575     
576     process_data(&s);
577     
578     gslog(LOG_EMERG,"%s::internal error reached unexpected end", me);
579 }
580