X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Ftools%2Fgsgdatprint.c;h=baf64aca1e51008b06f2f435a68b58ac0dfa1257;hb=804ea15b01566ac0de58781ca61870b4824d0e02;hp=a98f49c0ba375fda5c188826ce3a07f59d0efdcc;hpb=07495effe193ca3f73c3bf0ce417068f9ac9dcdd;p=com%2Fgs-lite.git diff --git a/src/tools/gsgdatprint.c b/src/tools/gsgdatprint.c index a98f49c..baf64ac 100644 --- a/src/tools/gsgdatprint.c +++ b/src/tools/gsgdatprint.c @@ -1,580 +1,580 @@ -/* ------------------------------------------------ - Copyright 2014 AT&T Intellectual Property - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ------------------------------------------- */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "gsconfig.h" -#include "gstypes.h" -#include -#include - -#define MAXSTRLEN 256 -#define MAXNUMFIELDS 64 - -//#define GSBZIP2 - -#ifdef GSBZIP2 -#include "bzlib.h" -#endif - -#ifdef ZLIB -#include "zlib.h" -#endif - -gs_sp_t me; /* saved copy argv[0] */ - -struct FTA_state { - FTAID fta_id; - gs_schemahandle_t schema; - gs_sp_t asciischema; - gs_int32_t numfields; - gs_int32_t timefieldoffset; -}; - -struct gsasciiprint_state { - // configuration state - gs_uint32_t bufsz; - gs_int32_t stream; - gs_int32_t flush; - gs_int32_t parserversion; - gs_int32_t compressed; - gs_int32_t verbose; - gs_int32_t notemp; - gs_sp_t timefield; - gs_int32_t interval; - gs_int32_t quitcnt; - gs_int32_t quittime; - gs_int32_t numfields; - gs_sp_t extension; - gs_sp_t query; - gs_int32_t remote_print; - - - // runtime state - struct FTA_state fs; -}; - - -void hand(int iv) { - ftaapp_exit(); - gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv); - exit(0); -} - -void timeouthand(int iv) { - ftaapp_exit(); - // if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n"); - exit(0); -} - - -static void print_usage_exit(gs_sp_t reason) { - fprintf(stderr, - "%s::error: %s\n" - "%s::usage: %s -r -v -f -s -z -c -q -b -t -e : \n" - "\t-v makes output verbose\n" - "\t-f flushes each tuple individually\n" - "\t-h print within the hfta int identifies how many output streams based on the SubInterface field\n" - "\t-r sets the ringbuffer size default is 8MB\n" - "\t-s uses %s in streaming mode cannot be used with -v -z -b -t -e\n" - "\t-c indicates that %s should terminate after tuples.\n" - "\t-q indicates that %s should terminate after seconds.\n" - "\t-z the output is compressed with gzip \n" - "\t-b identifies the field which is increasing and is used to\n" - "\t\tbin output files. Field has to be of type uint.\n" - "\t\tThe default is the UNIX system time\n" - "\t-t specifies the number the specified with -b\n" - "\t\thas to increase before a new output file is used\n" - "\t\tthe default is 60 for 1 minute if the default UNIX time is used\n" - "\t-e identifies the file string extension used for the \n" - "\t\toutput file. The output file always starts with the current value\n" - "\t\tof the b field (or UNIX time) the default extension is .txt \n" - "\t\twithout the -z option and .txt.gz with the -z option\n" - "\t specifies the query which will be\n" - "\t\t instanciated.\n" - "\t sets the parameters for the query\n" - , me, reason, me, me, me,me,me); - exit(1); -} - - - - -static void init(gs_int32_t argc, gs_sp_t argv[], struct gsasciiprint_state *s) { - void *pblk; - gs_int32_t x, y, schema, pblklen, lcv; - gs_int32_t n_actual_param; - gs_int32_t n_expected_param; - gs_sp_t c; - gs_int8_t name[1024]; - - sprintf(name,"gsgdatprint:%s",argv[0]); - - gsopenlog(name); - - - if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing gscp\n"); - if (ftaapp_init(s->bufsz)!=0) { - gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me); - exit(1); - } - - signal(SIGTERM, hand); - signal(SIGINT, hand); - signal(SIGPIPE, hand); - - if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n"); - - schema = ftaapp_get_fta_schema_by_name(argv[0]); - if (schema < 0) { - gslog(LOG_EMERG,"%s::error:can't get fta '%s' schema\n", me ,argv[0]); - exit(1); - } - n_expected_param = ftaschema_parameter_len(schema); - if (n_expected_param == 0) { - pblk = 0; - pblklen = 0; - if (s->verbose) gslog(LOG_DEBUG,"[query does not have any params]\n"); - } else { - n_actual_param = argc-1; - if(n_actual_param < n_expected_param){ - gslog(LOG_EMERG,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param); - exit(1); - } - /* parse the params */ - for (lcv = 1 ; lcv < argc ; lcv++) { - char *k, *e; - int rv; - k = argv[lcv]; - e = k; - while (*e && *e != '=') e++; - if (*e == 0) { - gslog(LOG_EMERG,"param parse error '%s' (fmt 'key=val')\n", - argv[lcv]); - exit(1); - } - *e = 0; - rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1)); - *e = '='; - if (rv < 0) { - gslog(LOG_EMERG,"param setparam error '%s' (fmt 'key=val')\n", - argv[lcv]); - exit(1); - } - } - if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) { - gslog(LOG_EMERG, "ftaschema_create_param_block failed!\n"); - exit(1); - } - } - ftaschema_free(schema); /* XXXCDC */ - - - if (s->remote_print>=0) { - s->fs.fta_id=ftaapp_add_fta_print(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk,"./data",s->extension,s->timefield,"SubInterface",s->interval,s->remote_print); - if (s->fs.fta_id.streamid==0){ - gslog(LOG_EMERG,"%s::error:could not initialize fta_print %s\n", - me,s->query); - exit(1); - } - again: - // wait forever - sleep(60); - goto again; - } - - - s->fs.fta_id=ftaapp_add_fta(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk); - if (s->fs.fta_id.streamid==0){ - gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n", - me,s->query); - exit(1); - } - /* XXXCDC: pblk is malloc'd, should we free it? */ - - if ((c=ftaapp_get_fta_ascii_schema_by_name(s->query))==0){ - gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n", - me,s->query); - exit(1); - } - - //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy - s->fs.asciischema=strdup(c); - - - // Set parser version here - s->parserversion=get_schemaparser_version(); - - if ((s->fs.schema=ftaapp_get_fta_schema(s->fs.fta_id))<0) { - gslog(LOG_EMERG,"%s::error:could not get schema for query\n", - me,s->query); - exit(1); - } - - // Use all available fields - if ((s->fs.numfields=ftaschema_tuple_len(s->fs.schema))<0) { - gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n", - me,s->query); - exit(1); - } - if (s->timefield!=0) { - if ((s->fs.timefieldoffset=ftaschema_get_field_offset_by_name( - s->fs.schema,s->timefield))<0) { - gslog(LOG_EMERG,"%s::error:could not get " - "offset for timefield %s in query %s\n", - me,s->timefield,s->query); - exit(1); - } - - if (ftaschema_get_field_type_by_name( - s->fs.schema,s->timefield)!=UINT_TYPE) { - gslog(LOG_EMERG,"%s::error: illegal type for timefield " - "%s in query %s UINT expected\n", - me,s->timefield,s->query); - exit(1); - } - } -} - - -static void process_data(struct gsasciiprint_state *s) -{ - gs_int32_t x; - gs_int32_t y; - gs_int32_t z; - gs_int32_t tb; - gs_uint32_t nsz; - FTAID rfta_id; - gs_uint32_t rsize; - gs_int8_t rbuf[2*MAXTUPLESZ]; - gs_int32_t problem; - - gs_int32_t ctb=0; - gs_int8_t fname[1024]; - gs_int8_t tmpname[1024]; - gs_int8_t command[1024]; - gs_int8_t topb[1024]; - gs_int32_t rcnt=0; - gs_retval_t code; -#ifdef GSBZIP2 - int bzerror; - BZFILE * b; - int abandon=0; - unsigned int bytes_in; - unsigned int bytes_out; -#endif -#ifdef ZLIB - gzFile of=0; -#else - FILE * of=0; -#endif - - - tmpname[0]=0; - - if (s->verbose!=0) gslog(LOG_INFO,"Getting Data for %s",s->query); - - sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n", - s->parserversion,strlen(s->fs.asciischema)+1); - if (s->stream!=0) { - of=stdout; - // need to get ASCII version of schema - fwrite(&topb[0],strlen(topb),1,of); - fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of); - } - - if (s->quittime !=0 ) { - signal(SIGALRM,timeouthand); - alarm(s->quittime); - } - - - while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) { - rcnt++; - if ((s->notemp==1) && (code==2)) continue; - if (((s->quitcnt>0) && (rcnt>s->quitcnt))||ftaschema_is_eof_tuple(s->fs.schema, rbuf)) { - if (s->verbose!=0) - gslog(LOG_EMERG, "exiting reached tuple limit or all data has been proccessed\n"); - if (of!=0) { - fclose(of); - - if (s->compressed) { - system(command); - } - rename(tmpname,fname); - } - exit(0); - } - if (((code==0) || (code==2))&&(s->fs.fta_id.streamid==rfta_id.streamid)) { - if (s->stream==0) { - if (s->timefield!=0) { - tb=fta_unpack_uint(rbuf, - rsize, - s->fs.timefieldoffset, - &problem); - } else { - tb=time(0); - } - if ((ctb+s->interval)<=tb) { - gsstats(); - if (of!=0) { -#ifdef GSBZIP2 - BZ2_bzWriteClose(&bzerror, b, abandon, &bytes_in, &bytes_out ); - if (bzerror!=BZ_OK) { - gslog(LOG_EMERG,"Could not bz close file .. EXITING\n"); - exit(0); - } -#endif -#ifdef ZLIB - gzclose(of); -#else - fclose(of); -#endif -#ifndef GSBZIP2 - if (s->compressed) { - system(command); - } -#endif - rename(tmpname,fname); - } - while((ctb+s->interval)<=tb) { - if (ctb==0) { - ctb=(tb/s->interval)*s->interval; - } else { - ctb=ctb+s->interval; - } - } -#ifdef ZLIB - sprintf(tmpname,"%u%s.gz.tmp",ctb,s->extension); - sprintf(fname,"%u%s.gz",ctb,s->extension); -#else - sprintf(tmpname,"%u%s.tmp",ctb,s->extension); - sprintf(fname,"%u%s",ctb,s->extension); -#endif -#ifndef GSBZIP2 - if (s->compressed) { - sprintf(command,"gzip -S .tmpgz %s ; mv %s.tmpgz %s",tmpname,tmpname,tmpname); - } -#endif -#ifdef ZLIB - if ((of=gzopen(tmpname,"wb"))==0) { - gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", - tmpname); - exit(0); - } -#else - if ((of=fopen(tmpname,"w"))==0) { - gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", - tmpname); - exit(0); - } -#endif -#ifdef GSBZIP2 - if (s->compressed) { - b=BZ2_bzWriteOpen(&bzerror,of,5,0,30); - if (bzerror!=BZ_OK) { - gslog(LOG_EMERG,"Could not bz open file \"%s\".. EXITING\n", - tmpname); - exit(0); - } - } -#endif - - if (s->compressed) { -#ifdef GSBZIP2 - BZ2_bzWrite ( &bzerror, b, &topb[0],strlen(topb)); - BZ2_bzWrite ( &bzerror, b, s->fs.asciischema,strlen(s->fs.asciischema)+1); -#endif - } else { - // need to get ASCII version of schema -#ifdef ZLIB - gzwrite(of,&topb[0],strlen(topb)); - gzwrite(of,s->fs.asciischema,strlen(s->fs.asciischema)+1); -#else - fwrite(&topb[0],strlen(topb),1,of); - fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of); -#endif - } - } - } - if (code==0) { - nsz=htonl(rsize); - if (s->compressed) { -#ifdef GSBZIP2 - BZ2_bzWrite ( &bzerror, b,&nsz,sizeof(gs_uint32_t) ); - BZ2_bzWrite ( &bzerror, b,rbuf,rsize); -#endif - } else { -#ifdef ZLIB - gzwrite(of,&nsz,sizeof(gs_uint32_t)); - gzwrite(of,rbuf,rsize); - } -#else - if (fwrite(&nsz,sizeof(gs_uint32_t),1,of)!=1) { - ftaapp_exit(); - if (s->verbose!=0) - gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n", - tmpname); - exit(0); - } - - if (fwrite(rbuf,rsize,1,of)!=1) { - ftaapp_exit(); - if (s->verbose!=0) - gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n", - tmpname); - exit(0); - } - if ((s->stream!=0) || (s->flush!=0)) { - fflush(of); - } - } -#endif - } - } -} -} - - - -int main(int argc, char** argv) { - - struct gsasciiprint_state s; - gs_retval_t ch; - endpoint gshub; - endpoint dummyep; - gs_uint32_t tip1,tip2,tip3,tip4; - gs_sp_t instance_name; - - me = argv[0]; - - /* initialize host library and the sgroup */ - - if (argc<=1) { - print_usage_exit("Not enough arguments"); - } - - /* parse args */ - bzero(&s, sizeof(s)); - s.interval = 60; /* default */ - s.quittime=0; - s.quitcnt=-1; - s.remote_print=-1; - s.bufsz=8*1024*1024; - - while ((ch = getopt(argc, argv, "nr:b:e:t:c:q:sfvzmh:")) != -1) { - switch(ch) { - case 'r': - s.bufsz=atoi(optarg); - break; - case 'c': - s.quitcnt = atoi(optarg); - break; - case 'q': - s.quittime = atoi(optarg); - break; - case 'b': - s.timefield = optarg; - break; - case 'e': - s.extension = optarg; - break; - case 'n': - s.notemp=1; - break; - case 't': - s.interval = atoi(optarg); - if (s.interval < 1) { - goto usage; - } - break; - case 's': - s.stream++; - break; - case 'f': - s.flush++; - break; - case 'v': - s.verbose++; - break; - case 'z': - s.compressed++; - break; - case 'h': - s.remote_print=atoi(optarg); - break; - default: - usage: - print_usage_exit("invalid args"); - } - } - - if ((s.stream!=0) & (s.compressed!=0 | s.verbose!=0 | - s.timefield!=0 | s.extension!=0)) { - print_usage_exit("illegal argument combination with -s"); - } - - if (!s.extension) { - s.extension = (s.compressed) ? ".txt.gz" : ".txt"; - } - - argc -= optind; - argv += optind; - if (argc<3) print_usage_exit("must specify hub info and query"); - - if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) { - gslog(LOG_EMERG,"HUB IP NOT DEFINED"); - exit(1); - } - gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); - gshub.port=htons(gshub.port); - instance_name=strdup(argv[1]); - if (set_hub(gshub)!=0) { - gslog(LOG_EMERG,"Could not set hub"); - exit(1); - } - if (set_instance_name(instance_name)!=0) { - gslog(LOG_EMERG,"Could not set instance name"); - exit(1); - } - - if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) { - gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n"); - } - - - argc -=2; - argv +=2; - - s.query = argv[0]; - - init(argc, argv, &s); - - process_data(&s); - - gslog(LOG_EMERG,"%s::internal error reached unexpected end", me); -} - +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gsconfig.h" +#include "gstypes.h" +#include +#include + +#define MAXSTRLEN 256 +#define MAXNUMFIELDS 64 + +//#define GSBZIP2 + +#ifdef GSBZIP2 +#include "bzlib.h" +#endif + +#ifdef ZLIB +#include "zlib.h" +#endif + +gs_sp_t me; /* saved copy argv[0] */ + +struct FTA_state { + FTAID fta_id; + gs_schemahandle_t schema; + gs_sp_t asciischema; + gs_int32_t numfields; + gs_int32_t timefieldoffset; +}; + +struct gsasciiprint_state { + // configuration state + gs_uint32_t bufsz; + gs_int32_t stream; + gs_int32_t flush; + gs_int32_t parserversion; + gs_int32_t compressed; + gs_int32_t verbose; + gs_int32_t notemp; + gs_sp_t timefield; + gs_int32_t interval; + gs_int32_t quitcnt; + gs_int32_t quittime; + gs_int32_t numfields; + gs_sp_t extension; + gs_sp_t query; + gs_int32_t remote_print; + + + // runtime state + struct FTA_state fs; +}; + + +void hand(int iv) { + ftaapp_exit(); + gslog(LOG_NOTICE, "exiting via signal handler %d...\n", iv); + exit(0); +} + +void timeouthand(int iv) { + ftaapp_exit(); + // if (s->verbose!=0) fprintf(stderr, "exiting because of timeout...\n"); + exit(0); +} + + +static void print_usage_exit(gs_sp_t reason) { + fprintf(stderr, + "%s::error: %s\n" + "%s::usage: %s -r -v -f -s -z -c -q -b -t -e : \n" + "\t-v makes output verbose\n" + "\t-f flushes each tuple individually\n" + "\t-h print within the hfta int identifies how many output streams based on the SubInterface field\n" + "\t-r sets the ringbuffer size default is 8MB\n" + "\t-s uses %s in streaming mode cannot be used with -v -z -b -t -e\n" + "\t-c indicates that %s should terminate after tuples.\n" + "\t-q indicates that %s should terminate after seconds.\n" + "\t-z the output is compressed with gzip \n" + "\t-b identifies the field which is increasing and is used to\n" + "\t\tbin output files. Field has to be of type uint.\n" + "\t\tThe default is the UNIX system time\n" + "\t-t specifies the number the specified with -b\n" + "\t\thas to increase before a new output file is used\n" + "\t\tthe default is 60 for 1 minute if the default UNIX time is used\n" + "\t-e identifies the file string extension used for the \n" + "\t\toutput file. The output file always starts with the current value\n" + "\t\tof the b field (or UNIX time) the default extension is .txt \n" + "\t\twithout the -z option and .txt.gz with the -z option\n" + "\t specifies the query which will be\n" + "\t\t instanciated.\n" + "\t sets the parameters for the query\n" + , me, reason, me, me, me,me,me); + exit(1); +} + + + + +static void init(gs_int32_t argc, gs_sp_t argv[], struct gsasciiprint_state *s) { + void *pblk; + gs_int32_t x, y, schema, pblklen, lcv; + gs_int32_t n_actual_param; + gs_int32_t n_expected_param; + gs_sp_t c; + gs_int8_t name[1024]; + + sprintf(name,"gsgdatprint:%s",argv[0]); + + gsopenlog(name); + + + if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing gscp\n"); + if (ftaapp_init(s->bufsz)!=0) { + gslog(LOG_EMERG,"%s::error:could not initialize gscp\n", me); + exit(1); + } + + signal(SIGTERM, hand); + signal(SIGINT, hand); + signal(SIGPIPE, hand); + + if (s->verbose!=0) gslog(LOG_DEBUG,"Initializing FTAs\n"); + + schema = ftaapp_get_fta_schema_by_name(argv[0]); + if (schema < 0) { + gslog(LOG_EMERG,"%s::error:can't get fta '%s' schema\n", me ,argv[0]); + exit(1); + } + n_expected_param = ftaschema_parameter_len(schema); + if (n_expected_param == 0) { + pblk = 0; + pblklen = 0; + if (s->verbose) gslog(LOG_DEBUG,"[query does not have any params]\n"); + } else { + n_actual_param = argc-1; + if(n_actual_param < n_expected_param){ + gslog(LOG_EMERG,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param); + exit(1); + } + /* parse the params */ + for (lcv = 1 ; lcv < argc ; lcv++) { + char *k, *e; + int rv; + k = argv[lcv]; + e = k; + while (*e && *e != '=') e++; + if (*e == 0) { + gslog(LOG_EMERG,"param parse error '%s' (fmt 'key=val')\n", + argv[lcv]); + exit(1); + } + *e = 0; + rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1)); + *e = '='; + if (rv < 0) { + gslog(LOG_EMERG,"param setparam error '%s' (fmt 'key=val')\n", + argv[lcv]); + exit(1); + } + } + if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) { + gslog(LOG_EMERG, "ftaschema_create_param_block failed!\n"); + exit(1); + } + } + ftaschema_free(schema); /* XXXCDC */ + + + if (s->remote_print>=0) { + s->fs.fta_id=ftaapp_add_fta_print(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk,"./data",s->extension,s->timefield,"SubInterface",s->interval,s->remote_print); + if (s->fs.fta_id.streamid==0){ + gslog(LOG_EMERG,"%s::error:could not initialize fta_print %s\n", + me,s->query); + exit(1); + } + again: + // wait forever + sleep(60); + goto again; + } + + + s->fs.fta_id=ftaapp_add_fta(s->query,pblk?0:1,pblk?0:1,0,pblklen,pblk); + if (s->fs.fta_id.streamid==0){ + gslog(LOG_EMERG,"%s::error:could not initialize fta %s\n", + me,s->query); + exit(1); + } + /* XXXCDC: pblk is malloc'd, should we free it? */ + + if ((c=ftaapp_get_fta_ascii_schema_by_name(s->query))==0){ + gslog(LOG_EMERG,"%s::error:could not get ascii schema for %s\n", + me,s->query); + exit(1); + } + + //ftaapp_get_fta_ascii_schema_by_name uses static buffer so make a copy + s->fs.asciischema=strdup(c); + + + // Set parser version here + s->parserversion=get_schemaparser_version(); + + if ((s->fs.schema=ftaapp_get_fta_schema(s->fs.fta_id))<0) { + gslog(LOG_EMERG,"%s::error:could not get schema for query\n", + me,s->query); + exit(1); + } + + // Use all available fields + if ((s->fs.numfields=ftaschema_tuple_len(s->fs.schema))<0) { + gslog(LOG_EMERG,"%s::error:could not get number of fields for query %s\n", + me,s->query); + exit(1); + } + if (s->timefield!=0) { + if ((s->fs.timefieldoffset=ftaschema_get_field_offset_by_name( + s->fs.schema,s->timefield))<0) { + gslog(LOG_EMERG,"%s::error:could not get " + "offset for timefield %s in query %s\n", + me,s->timefield,s->query); + exit(1); + } + + if (ftaschema_get_field_type_by_name( + s->fs.schema,s->timefield)!=UINT_TYPE) { + gslog(LOG_EMERG,"%s::error: illegal type for timefield " + "%s in query %s UINT expected\n", + me,s->timefield,s->query); + exit(1); + } + } +} + + +static void process_data(struct gsasciiprint_state *s) +{ + gs_int32_t x; + gs_int32_t y; + gs_int32_t z; + gs_int32_t tb; + gs_uint32_t nsz; + FTAID rfta_id; + gs_uint32_t rsize; + gs_int8_t rbuf[2*MAXTUPLESZ]; + gs_int32_t problem; + + gs_int32_t ctb=0; + gs_int8_t fname[1024]; + gs_int8_t tmpname[1024]; + gs_int8_t command[1024]; + gs_int8_t topb[1024]; + gs_int32_t rcnt=0; + gs_retval_t code; +#ifdef GSBZIP2 + int bzerror; + BZFILE * b; + int abandon=0; + unsigned int bytes_in; + unsigned int bytes_out; +#endif +#ifdef ZLIB + gzFile of=0; +#else + FILE * of=0; +#endif + + + tmpname[0]=0; + + if (s->verbose!=0) gslog(LOG_INFO,"Getting Data for %s",s->query); + + sprintf(&topb[0],"GDAT\nVERSION:%u\nSCHEMALENGTH:%lu\n", + s->parserversion,strlen(s->fs.asciischema)+1); + if (s->stream!=0) { + of=stdout; + // need to get ASCII version of schema + fwrite(&topb[0],strlen(topb),1,of); + fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of); + } + + if (s->quittime !=0 ) { + signal(SIGALRM,timeouthand); + alarm(s->quittime); + } + + + while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) { + rcnt++; + if ((s->notemp==1) && (code==2)) continue; + if (((s->quitcnt>0) && (rcnt>s->quitcnt))||ftaschema_is_eof_tuple(s->fs.schema, rbuf)) { + if (s->verbose!=0) + gslog(LOG_EMERG, "exiting reached tuple limit or all data has been proccessed\n"); + if (of!=0) { + fclose(of); + + if (s->compressed) { + system(command); + } + rename(tmpname,fname); + } + exit(0); + } + if (((code==0) || (code==2))&&(s->fs.fta_id.streamid==rfta_id.streamid)) { + if (s->stream==0) { + if (s->timefield!=0) { + tb=fta_unpack_uint(rbuf, + rsize, + s->fs.timefieldoffset, + &problem); + } else { + tb=time(0); + } + if ((ctb+s->interval)<=tb) { + gsstats(); + if (of!=0) { +#ifdef GSBZIP2 + BZ2_bzWriteClose(&bzerror, b, abandon, &bytes_in, &bytes_out ); + if (bzerror!=BZ_OK) { + gslog(LOG_EMERG,"Could not bz close file .. EXITING\n"); + exit(0); + } +#endif +#ifdef ZLIB + gzclose(of); +#else + fclose(of); +#endif +#ifndef GSBZIP2 + if (s->compressed) { + system(command); + } +#endif + rename(tmpname,fname); + } + while((ctb+s->interval)<=tb) { + if (ctb==0) { + ctb=(tb/s->interval)*s->interval; + } else { + ctb=ctb+s->interval; + } + } +#ifdef ZLIB + sprintf(tmpname,"%u%s.gz.tmp",ctb,s->extension); + sprintf(fname,"%u%s.gz",ctb,s->extension); +#else + sprintf(tmpname,"%u%s.tmp",ctb,s->extension); + sprintf(fname,"%u%s",ctb,s->extension); +#endif +#ifndef GSBZIP2 + if (s->compressed) { + sprintf(command,"gzip -S .tmpgz %s ; mv %s.tmpgz %s",tmpname,tmpname,tmpname); + } +#endif +#ifdef ZLIB + if ((of=gzopen(tmpname,"wb"))==0) { + gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", + tmpname); + exit(0); + } +#else + if ((of=fopen(tmpname,"w"))==0) { + gslog(LOG_EMERG,"Could not open file \"%s\".. EXITING\n", + tmpname); + exit(0); + } +#endif +#ifdef GSBZIP2 + if (s->compressed) { + b=BZ2_bzWriteOpen(&bzerror,of,5,0,30); + if (bzerror!=BZ_OK) { + gslog(LOG_EMERG,"Could not bz open file \"%s\".. EXITING\n", + tmpname); + exit(0); + } + } +#endif + + if (s->compressed) { +#ifdef GSBZIP2 + BZ2_bzWrite ( &bzerror, b, &topb[0],strlen(topb)); + BZ2_bzWrite ( &bzerror, b, s->fs.asciischema,strlen(s->fs.asciischema)+1); +#endif + } else { + // need to get ASCII version of schema +#ifdef ZLIB + gzwrite(of,&topb[0],strlen(topb)); + gzwrite(of,s->fs.asciischema,strlen(s->fs.asciischema)+1); +#else + fwrite(&topb[0],strlen(topb),1,of); + fwrite(s->fs.asciischema,strlen(s->fs.asciischema)+1,1,of); +#endif + } + } + } + if (code==0) { + nsz=htonl(rsize); + if (s->compressed) { +#ifdef GSBZIP2 + BZ2_bzWrite ( &bzerror, b,&nsz,sizeof(gs_uint32_t) ); + BZ2_bzWrite ( &bzerror, b,rbuf,rsize); +#endif + } else { +#ifdef ZLIB + gzwrite(of,&nsz,sizeof(gs_uint32_t)); + gzwrite(of,rbuf,rsize); + } +#else + if (fwrite(&nsz,sizeof(gs_uint32_t),1,of)!=1) { + ftaapp_exit(); + if (s->verbose!=0) + gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n", + tmpname); + exit(0); + } + + if (fwrite(rbuf,rsize,1,of)!=1) { + ftaapp_exit(); + if (s->verbose!=0) + gslog(LOG_EMERG,"Could not write to output\"%s\".. EXITING\n", + tmpname); + exit(0); + } + if ((s->stream!=0) || (s->flush!=0)) { + fflush(of); + } + } +#endif + } + } +} +} + + + +int main(int argc, char** argv) { + + struct gsasciiprint_state s; + gs_retval_t ch; + endpoint gshub; + endpoint dummyep; + gs_uint32_t tip1,tip2,tip3,tip4; + gs_sp_t instance_name; + + me = argv[0]; + + /* initialize host library and the sgroup */ + + if (argc<=1) { + print_usage_exit("Not enough arguments"); + } + + /* parse args */ + bzero(&s, sizeof(s)); + s.interval = 60; /* default */ + s.quittime=0; + s.quitcnt=-1; + s.remote_print=-1; + s.bufsz=8*1024*1024; + + while ((ch = getopt(argc, argv, "nr:b:e:t:c:q:sfvzmh:")) != -1) { + switch(ch) { + case 'r': + s.bufsz=atoi(optarg); + break; + case 'c': + s.quitcnt = atoi(optarg); + break; + case 'q': + s.quittime = atoi(optarg); + break; + case 'b': + s.timefield = optarg; + break; + case 'e': + s.extension = optarg; + break; + case 'n': + s.notemp=1; + break; + case 't': + s.interval = atoi(optarg); + if (s.interval < 1) { + goto usage; + } + break; + case 's': + s.stream++; + break; + case 'f': + s.flush++; + break; + case 'v': + s.verbose++; + break; + case 'z': + s.compressed++; + break; + case 'h': + s.remote_print=atoi(optarg); + break; + default: + usage: + print_usage_exit("invalid args"); + } + } + + if ((s.stream!=0) & (s.compressed!=0 | s.verbose!=0 | + s.timefield!=0 | s.extension!=0)) { + print_usage_exit("illegal argument combination with -s"); + } + + if (!s.extension) { + s.extension = (s.compressed) ? ".txt.gz" : ".txt"; + } + + argc -= optind; + argv += optind; + if (argc<3) print_usage_exit("must specify hub info and query"); + + if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) { + gslog(LOG_EMERG,"HUB IP NOT DEFINED"); + exit(1); + } + gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); + gshub.port=htons(gshub.port); + instance_name=strdup(argv[1]); + if (set_hub(gshub)!=0) { + gslog(LOG_EMERG,"Could not set hub"); + exit(1); + } + if (set_instance_name(instance_name)!=0) { + gslog(LOG_EMERG,"Could not set instance name"); + exit(1); + } + + if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) { + gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n"); + } + + + argc -=2; + argv +=2; + + s.query = argv[0]; + + init(argc, argv, &s); + + process_data(&s); + + gslog(LOG_EMERG,"%s::internal error reached unexpected end", me); +} +