1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
25 #include <sys/types.h>
26 #include <sys/socket.h>
27 #include <netinet/in.h>
31 #include <schemaparser.h>
35 #define MAXNUMFIELDS 64
47 gs_sp_t me; /* saved copy argv[0] */
51 gs_schemahandle_t schema;
54 gs_int32_t timefieldoffset;
57 struct gsasciiprint_state {
58 // configuration state
62 gs_int32_t parserversion;
63 gs_int32_t compressed;
73 gs_int32_t remote_print;
83 gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv);
87 void timeouthand(int iv) {
89 // if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n");
94 static void print_usage_exit(gs_sp_t reason) {
97 "%s::usage: %s -r <int> -v -f -s -z -c <int> -q <int> -b <field> -t <int> -e <string> <gshub-hostname>:<gshub-port> <gsinstance_name> <query_name> <parameters>\n"
98 "\t-v makes output verbose\n"
99 "\t-f flushes each tuple individually\n"
100 "\t-h <int> print within the hfta int identifies how many output streams based on the SubInterface field\n"
101 "\t-r sets the ringbuffer size default is 8MB\n"
102 "\t-s uses %s in streaming mode cannot be used with -v -z -b -t -e\n"
103 "\t-c <int> indicates that %s should terminate after <int> tuples.\n"
104 "\t-q <int> indicates that %s should terminate after <int> seconds.\n"
105 "\t-z the output is compressed with gzip \n"
106 "\t-b <field> identifies the field which is increasing and is used to\n"
107 "\t\tbin output files. Field has to be of type uint.\n"
108 "\t\tThe default is the UNIX system time\n"
109 "\t-t <int> specifies the number the <field> specified with -b\n"
110 "\t\thas to increase before a new output file is used\n"
111 "\t\tthe default is 60 for 1 minute if the default UNIX time is used\n"
112 "\t-e <string> identifies the file string extension used for the \n"
113 "\t\toutput file. The output file always starts with the current value\n"
114 "\t\tof the b field (or UNIX time) the default extension is .txt \n"
115 "\t\twithout the -z option and .txt.gz with the -z option\n"
116 "\t<query_name> specifies the query which will be\n"
117 "\t\t instanciated.\n"
118 "\t<parameters> sets the parameters for the query\n"
119 , me, reason, me, me, me,me,me);
126 static void init(gs_int32_t argc, gs_sp_t argv[], struct gsasciiprint_state *s) {
128 gs_int32_t x, y, schema, pblklen, lcv;
129 gs_int32_t n_actual_param;
130 gs_int32_t n_expected_param;
132 gs_int8_t name[1024];
134 sprintf(name,"gsgdatprint:%s",argv[0]);
139 if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing gscp\n");
140 if (ftaapp_init(s->bufsz)!=0) {
141 gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me);
145 signal(SIGTERM, hand);
146 signal(SIGINT, hand);
147 signal(SIGPIPE, hand);
149 if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n");
151 schema = ftaapp_get_fta_schema_by_name(argv[0]);
153 gslog(LOG_EMERG,"%s::error:can't get fta '%s' schema\n", me ,argv[0]);
156 n_expected_param = ftaschema_parameter_len(schema);
157 if (n_expected_param == 0) {
160 if (s->verbose) gslog(LOG_DEBUG,"[query does not have any params]\n");
162 n_actual_param = argc-1;
163 if(n_actual_param < n_expected_param){
164 gslog(LOG_EMERG,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
167 /* parse the params */
168 for (lcv = 1 ; lcv < argc ; lcv++) {
173 while (*e && *e != '=') e++;
175 gslog(LOG_EMERG,"param parse error '%s' (fmt 'key=val')\n",
180 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
183 gslog(LOG_EMERG,"param setparam error '%s' (fmt 'key=val')\n",
188 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
189 gslog(LOG_EMERG, "ftaschema_create_param_block failed!\n");
193 ftaschema_free(schema); /* XXXCDC */
196 if (s->remote_print>=0) {
197 s->fs.fta_id=ftaapp_add_fta_print(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk,"./data",s->extension,s->timefield,"SubInterface",s->interval,s->remote_print);
198 if (s->fs.fta_id.streamid==0){
199 gslog(LOG_EMERG,"%s::error:could not initialize fta_print %s\n",
210 s->fs.fta_id=ftaapp_add_fta(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk);
211 if (s->fs.fta_id.streamid==0){
212 gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n",
216 /* XXXCDC: pblk is malloc'd, should we free it? */
218 if ((c=ftaapp_get_fta_ascii_schema_by_name(s->query))==0){
219 gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n",
224 //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy
225 s->fs.asciischema=strdup(c);
228 // Set parser version here
229 s->parserversion=get_schemaparser_version();
231 if ((s->fs.schema=ftaapp_get_fta_schema(s->fs.fta_id))<0) {
232 gslog(LOG_EMERG,"%s::error:could not get schema for query\n",
237 // Use all available fields
238 if ((s->fs.numfields=ftaschema_tuple_len(s->fs.schema))<0) {
239 gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n",
243 if (s->timefield!=0) {
244 if ((s->fs.timefieldoffset=ftaschema_get_field_offset_by_name(
245 s->fs.schema,s->timefield))<0) {
246 gslog(LOG_EMERG,"%s::error:could not get "
247 "offset for timefield %s in query %s\n",
248 me,s->timefield,s->query);
252 if (ftaschema_get_field_type_by_name(
253 s->fs.schema,s->timefield)!=UINT_TYPE) {
254 gslog(LOG_EMERG,"%s::error: illegal type for timefield "
255 "%s in query %s UINT expected\n",
256 me,s->timefield,s->query);
263 static void process_data(struct gsasciiprint_state *s)
272 gs_int8_t rbuf[2*MAXTUPLESZ];
276 gs_int8_t fname[1024];
277 gs_int8_t tmpname[1024];
278 gs_int8_t command[1024];
279 gs_int8_t topb[1024];
286 unsigned int bytes_in;
287 unsigned int bytes_out;
298 if (s->verbose!=0) gslog(LOG_INFO,"Getting Data for %s",s->query);
300 sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n",
301 s->parserversion,strlen(s->fs.asciischema)+1);
304 // need to get ASCII version of schema
305 fwrite(&topb[0],strlen(topb),1,of);
306 fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of);
309 if (s->quittime !=0 ) {
310 signal(SIGALRM,timeouthand);
315 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
317 if ((s->notemp==1) && (code==2)) continue;
318 if (((s->quitcnt>0) && (rcnt>s->quitcnt))||ftaschema_is_eof_tuple(s->fs.schema, rbuf)) {
320 gslog(LOG_EMERG, "exiting reached tuple limit or all data has been proccessed\n");
327 rename(tmpname,fname);
331 if (((code==0) || (code==2))&&(s->fs.fta_id.streamid==rfta_id.streamid)) {
333 if (s->timefield!=0) {
334 tb=fta_unpack_uint(rbuf,
336 s->fs.timefieldoffset,
341 if ((ctb+s->interval)<=tb) {
345 BZ2_bzWriteClose(&bzerror, b, abandon, &bytes_in, &bytes_out );
346 if (bzerror!=BZ_OK) {
347 gslog(LOG_EMERG,"Could not bz close file .. EXITING\n");
361 rename(tmpname,fname);
363 while((ctb+s->interval)<=tb) {
365 ctb=(tb/s->interval)*s->interval;
371 sprintf(tmpname,"%u%s.gz.tmp",ctb,s->extension);
372 sprintf(fname,"%u%s.gz",ctb,s->extension);
374 sprintf(tmpname,"%u%s.tmp",ctb,s->extension);
375 sprintf(fname,"%u%s",ctb,s->extension);
379 sprintf(command,"gzip -S .tmpgz %s ; mv %s.tmpgz %s",tmpname,tmpname,tmpname);
383 if ((of=gzopen(tmpname,"wb"))==0) {
384 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n",
389 if ((of=fopen(tmpname,"w"))==0) {
390 gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n",
397 b=BZ2_bzWriteOpen(&bzerror,of,5,0,30);
398 if (bzerror!=BZ_OK) {
399 gslog(LOG_EMERG,"Could not bz open file \"%s\".. EXITING\n",
408 BZ2_bzWrite ( &bzerror, b, &topb[0],strlen(topb));
409 BZ2_bzWrite ( &bzerror, b, s->fs.asciischema,strlen(s->fs.asciischema)+1);
412 // need to get ASCII version of schema
414 gzwrite(of,&topb[0],strlen(topb));
415 gzwrite(of,s->fs.asciischema,strlen(s->fs.asciischema)+1);
417 fwrite(&topb[0],strlen(topb),1,of);
418 fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of);
427 BZ2_bzWrite ( &bzerror, b,&nsz,sizeof(gs_uint32_t) );
428 BZ2_bzWrite ( &bzerror, b,rbuf,rsize);
432 gzwrite(of,&nsz,sizeof(gs_uint32_t));
433 gzwrite(of,rbuf,rsize);
436 if (fwrite(&nsz,sizeof(gs_uint32_t),1,of)!=1) {
439 gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n",
444 if (fwrite(rbuf,rsize,1,of)!=1) {
447 gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n",
451 if ((s->stream!=0) || (s->flush!=0)) {
463 int main(int argc, char** argv) {
465 struct gsasciiprint_state s;
469 gs_uint32_t tip1,tip2,tip3,tip4;
470 gs_sp_t instance_name;
474 /* initialize host library and the sgroup */
477 print_usage_exit("Not enough arguments");
481 bzero(&s, sizeof(s));
482 s.interval = 60; /* default */
488 while ((ch = getopt(argc, argv, "nr:b:e:t:c:q:sfvzmh:")) != -1) {
491 s.bufsz=atoi(optarg);
494 s.quitcnt = atoi(optarg);
497 s.quittime = atoi(optarg);
500 s.timefield = optarg;
503 s.extension = optarg;
509 s.interval = atoi(optarg);
510 if (s.interval < 1) {
527 s.remote_print=atoi(optarg);
531 print_usage_exit("invalid args");
535 if ((s.stream!=0) & (s.compressed!=0 | s.verbose!=0 |
536 s.timefield!=0 | s.extension!=0)) {
537 print_usage_exit("illegal argument combination with -s");
541 s.extension = (s.compressed) ? ".txt.gz" : ".txt";
546 if (argc<3) print_usage_exit("must specify hub info and query");
548 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
549 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
552 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
553 gshub.port=htons(gshub.port);
554 instance_name=strdup(argv[1]);
555 if (set_hub(gshub)!=0) {
556 gslog(LOG_EMERG,"Could not set hub");
559 if (set_instance_name(instance_name)!=0) {
560 gslog(LOG_EMERG,"Could not set instance name");
564 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
565 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
574 init(argc, argv, &s);
578 gslog(LOG_EMERG,"%s::internal error reached unexpected end", me);