1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
33 #include <schemaparser.h>
/* Capacity, in bytes, of the shared text buffer linebuf below; also passed
 * as the size bound to the snprintf calls that format into it. */
35 #define MAXLINE 100000
/* Port number used by wait_for_client() when binding the listening socket
 * (converted with htons there). Defaults to 0 and is overwritten from a
 * command-line option argument in main (presumably -p; the case label is
 * not visible in this excerpt). */
36 static unsigned tcpport=0;
/* Shared scratch buffer: output text is formatted into it with snprintf and
 * then handed to emit_line() (fprintf to outf) or emit_socket() (write to fd). */
37 static char linebuf[MAXLINE];
43 // Not all systems have timersub defined, so make sure it's there
/*
 * Fallback definition of timersub(tvp, uvp, vvp): stores the difference
 * *tvp - *uvp into *vvp, subtracting tv_sec and tv_usec separately. When the
 * microsecond difference comes out negative, 1000000 is added back to
 * tv_usec.
 * NOTE(review): the enclosing #ifndef guard, any do/while wrapper and the
 * matching tv_sec adjustment are not visible in this excerpt -- confirm
 * against the full file.
 */
46 #define timersub(tvp, uvp, vvp) \
48 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
49 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
50 if ((vvp)->tv_usec < 0) { \
52 (vvp)->tv_usec += 1000000; \
60 fprintf(stderr, "exiting via signal handler %d...\n", iv);
/*
 * Sets up the TCP listening socket (only when listensockfd is still 0),
 * then listens and accepts a client connection, storing the accepted
 * descriptor in fd.
 *
 * The socket is an IPv4 stream socket bound to INADDR_ANY on tcpport, with
 * SO_REUSEPORT and SO_REUSEADDR enabled. Every failure is reported through
 * gslog at LOG_EMERG.
 * NOTE(review): the declarations of on/clilen, the statements that follow
 * each error log, and the closing braces are not visible in this excerpt,
 * so what happens after a failure cannot be stated from here.
 */
64 static void wait_for_client() {
65 struct sockaddr_in serv_addr,cli_addr;
/* A value of 0 in listensockfd is treated as "listening socket not created yet". */
67 if (listensockfd==0) {
69 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
70 if (listensockfd < 0) {
71 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
/* Bind address: IPv4, any local interface, port from tcpport in network byte order. */
74 bzero((char *) &serv_addr, sizeof(serv_addr));
75 serv_addr.sin_family = AF_INET;
76 serv_addr.sin_addr.s_addr = INADDR_ANY;
77 serv_addr.sin_port = htons(tcpport);
79 /* make sure we can reuse the common port rapidly */
80 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
81 (gs_sp_t )&on, sizeof(on)) != 0) {
82 gslog(LOG_EMERG,"Error::could not set socket option\n");
/* Same option value (on) is applied for SO_REUSEADDR; the log text is identical
 * to the SO_REUSEPORT case, so the message alone does not say which call failed. */
86 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
87 (gs_sp_t )&on, sizeof(on)) != 0) {
88 gslog(LOG_EMERG,"Error::could not set socket option\n");
92 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
93 sizeof(serv_addr)) < 0) {
94 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
/* Backlog of 5 pending connections; the return value of listen() is discarded here. */
100 listen(listensockfd,5);
/* clilen must hold the size of cli_addr before accept() fills the address in. */
101 clilen = sizeof(cli_addr);
/* The accepted connection's descriptor lands in fd, which emit_socket() writes to. */
102 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
104 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
/*
 * Sends data from linebuf to the client descriptor fd using write().
 * NOTE(review): only the write() call is visible in this excerpt; o and l
 * look like an offset into linebuf and a byte count still to send --
 * confirm against the full file.
 */
110 static void emit_socket() {
/* The byte count returned by write() is kept in w; a result of exactly 0 is
 * singled out by this test (the branch body is not visible here). */
116 if((w=write(fd,&linebuf[o],l))==0) {
/*
 * Prints the current contents of linebuf, as a NUL-terminated string, to
 * the output stream outf.
 * NOTE(review): lines between the header and the fprintf are missing from
 * this excerpt, so any additional handling is not shown.
 */
124 static void emit_line() {
/* "%s" is used so that any '%' characters inside linebuf are printed literally. */
127 fprintf(outf,"%s",linebuf);
134 int main(int argc, char* argv[]) {
135 gs_sp_t me = argv[0];
137 gs_int32_t schema, ch;
141 gs_uint32_t bufsz=8*1024*1024;
142 gs_int8_t rbuf[2*MAXTUPLESZ];
144 gs_int32_t numberoffields;
145 gs_int32_t verbose=0;
150 gs_int32_t n_actual_param;
151 gs_int32_t n_expected_param;
154 struct timeval tvs, tve, tvd;
158 gs_uint32_t tip1,tip2,tip3,tip4;
159 gs_sp_t instance_name;
161 gs_uint32_t tlimit = 0; // time limit in seconds
162 time_t start_time, curr_time;
166 // by default the output will go to stdout
169 while ((ch = getopt(argc, argv, "l:p:r:veXD")) != -1) {
175 tcpport=atoi(optarg);
190 tlimit = atoi(optarg);
194 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
201 if (argc<3) goto usage;
203 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
204 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
207 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
208 gshub.port=htons(gshub.port);
209 instance_name=strdup(argv[1]);
210 if (set_hub(gshub)!=0) {
211 gslog(LOG_EMERG,"Could not set hub");
214 if (set_instance_name(instance_name)!=0) {
215 gslog(LOG_EMERG,"Could not set instance name");
219 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
220 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
223 gettimeofday(&tvs, 0);
229 /* initialize host library and the sgroup */
231 if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
233 if (ftaapp_init(bufsz)!=0) {
234 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
238 signal(SIGTERM, hand);
239 signal(SIGINT, hand);
241 schema = ftaapp_get_fta_schema_by_name(argv[0]);
243 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
247 n_expected_param = ftaschema_parameter_len(schema);
248 if (n_expected_param == 0) {
252 /* parse the params */
253 n_actual_param = argc-1;
254 if(n_actual_param < n_expected_param){
255 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
258 for (lcv = 1 ; lcv < argc ; lcv++) {
263 while (*e && *e != '=') e++;
265 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
270 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
273 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
278 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
279 fprintf(stderr, "ftaschema_create_param_block failed!\n");
283 ftaschema_free(schema); /* XXXCDC */
286 if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
288 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
289 if (fta_id.streamid==0) {
290 fprintf(stderr,"%s::error:could not initialize fta %s\n",
294 /* XXXCDC: pblk is malloc'd, should we free it? */
296 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
298 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
299 fprintf(stderr,"%s::error:could not get schema\n", me);
303 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
304 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
310 for(y=0; y<numberoffields;y++) {
311 printf("%s",ftaschema_field_name(schema,y));
312 if (y<numberoffields-1) printf("|");
317 gettimeofday(&tve, 0);
318 timersub(&tve, &tvs, &tvd);
319 printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
326 start_time = time(NULL);
328 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
331 if (ftaschema_is_eof_tuple(schema, rbuf)) {
332 /* initiate shutdown or something of that nature */
333 printf("#All data proccessed\n");
339 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
342 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
343 for(y=0; y<numberoffields;y++) {
344 struct access_result ar;
346 printf("%s->",ftaschema_field_name(schema,y));
347 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
348 switch (ar.field_data_type) {
350 snprintf(linebuf,MAXLINE,"%d",ar.r.i);
353 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
356 snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
365 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
367 snprintf(linebuf,MAXLINE,"");
369 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
371 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
372 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
373 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
376 snprintf(linebuf,MAXLINE,"::");
382 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
386 snprintf(linebuf,MAXLINE,"FALSE");
388 snprintf(linebuf,MAXLINE,"TRUE");
392 snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
395 snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
398 snprintf(linebuf,MAXLINE,"%f",ar.r.f);
406 snprintf(linebuf,MAXLINE,"%f sec",t);
415 src=(char*)ar.r.vs.offset;
419 for(x=0;x<ar.r.vs.length;x++) {
421 if ((c<='~') && (c>=' ')) {
441 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
444 snprintf(linebuf,MAXLINE,"\n");
446 if (verbose!=0) fflush(stdout);
448 if (rfta_id.streamid != fta_id.streamid)
449 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
452 // whenever we receive a temp tuple check if we reached time limit
453 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
454 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);