1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
33 #include <schemaparser.h>
/* Capacity in bytes of linebuf; also the size bound handed to snprintf()
 * wherever this file formats output into linebuf. */
35 #define MAXLINE 100000
/* Port stored into serv_addr.sin_port (via htons) before bind() in
 * wait_for_client(); main() assigns it from an option argument with atoi(). */
36 static unsigned tcpport=0;
/* Shared scratch buffer: main() formats text into it with snprintf(), and
 * emit_socket()/emit_line() read from it when writing output. */
37 static char linebuf[MAXLINE];
43 // Not all systems have timersub defined, so make sure it is there.
// The visible part of this fallback stores (tvp - uvp) into vvp field by
// field, then adds 1000000 to vvp->tv_usec when the microsecond difference
// came out negative.
// NOTE(review): the surrounding guard, the matching adjustment of tv_sec and
// the end of the macro are not visible in this excerpt -- confirm them there.
46 #define timersub(tvp, uvp, vvp) \
48 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
49 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
50 if ((vvp)->tv_usec < 0) { \
52 (vvp)->tv_usec += 1000000; \
60 fprintf(stderr, "exiting via signal handler %d...\n", iv);
/*
 * Set up the listening TCP socket (once) and accept a client connection.
 *
 * Takes no arguments and returns nothing; it works on the file-scope
 * variables listensockfd, tcpport and fd. The accepted connection's
 * descriptor is stored in fd, which emit_socket() later writes to.
 * Failures are reported through gslog(LOG_EMERG, ...).
 * NOTE(review): what happens after each error message (exit, return, ...)
 * is in lines not visible in this excerpt.
 */
64 static void wait_for_client() {
65 struct sockaddr_in serv_addr,cli_addr;
/* listensockfd == 0 is used as the "listening socket not created yet" marker.
 * NOTE(review): 0 is itself a valid descriptor value -- confirm this cannot
 * collide with a real socket in this program. */
67 if (listensockfd==0) {
69 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
70 if (listensockfd < 0) {
71 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
/* Listen on every local IPv4 address (INADDR_ANY) at port tcpport. */
74 bzero((char *) &serv_addr, sizeof(serv_addr));
75 serv_addr.sin_family = AF_INET;
76 serv_addr.sin_addr.s_addr = INADDR_ANY;
77 serv_addr.sin_port = htons(tcpport);
79 /* make sure we can reuse the common port rapidly */
80 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
81 (gs_sp_t )&on, sizeof(on)) != 0) {
82 gslog(LOG_EMERG,"Error::could not set socket option\n");
/* Same option value "on", this time for SO_REUSEADDR. */
86 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
87 (gs_sp_t )&on, sizeof(on)) != 0) {
88 gslog(LOG_EMERG,"Error::could not set socket option\n");
92 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
93 sizeof(serv_addr)) < 0) {
94 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
/* Backlog of 5 pending connections.
 * NOTE(review): the return value of listen() is not checked here. */
100 listen(listensockfd,5);
/* accept() fills cli_addr with the peer address and returns the connected
 * descriptor, which becomes the output descriptor fd. */
101 clilen = sizeof(cli_addr);
102 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
104 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
/*
 * Send the contents of linebuf to the connected client descriptor fd.
 * NOTE(review): only two lines of this function are visible; the setup of
 * o and l and any loop around the write are outside this excerpt.
 */
110 static void emit_socket() {
/* Write l bytes starting at offset o of linebuf; a return value of 0 from
 * write() selects the branch that follows (its body is not visible here). */
116 if((w=write(fd,&linebuf[o],l))==0) {
/*
 * Print the current contents of linebuf to the stdio stream outf.
 * NOTE(review): the rest of the function body is not visible in this excerpt.
 */
124 static void emit_line() {
/* linebuf is passed as data behind a literal "%s" format, so '%' characters
 * inside it are printed verbatim. */
127 fprintf(outf,"%s",linebuf);
/*
 * Program entry point. From the lines visible in this excerpt it:
 *   1. parses the options "l:p:r:sveXD" with getopt();
 *   2. reads the hub address ("a.b.c.d:port") and the instance name from the
 *      arguments and registers them with set_hub()/set_instance_name();
 *   3. waits for the instance via get_initinstance(), then initialises the
 *      ftaapp library with a buffer of bufsz bytes;
 *   4. looks up the query schema, applies key=val parameters, builds the
 *      parameter block and adds the FTA;
 *   5. loops on ftaapp_get_tuple(), formatting the fields of each received
 *      tuple into linebuf.
 * NOTE(review): this listing has gaps (option switch bodies, case labels,
 * error exits, the return value), so the comments below describe only the
 * visible statements.
 */
134 int main(int argc, char* argv[]) {
/* Program name as invoked, kept for the error messages below. */
135 gs_sp_t me = argv[0];
137 gs_int32_t schema, ch;
/* Buffer size passed to ftaapp_init(); the usage text lists "-r <bufsz>". */
141 gs_uint32_t bufsz=8*1024*1024;
/* Receive buffer for one tuple; its size is also passed to ftaapp_get_tuple(). */
142 gs_int8_t rbuf[2*MAXTUPLESZ];
144 gs_int32_t numberoffields;
145 gs_int32_t verbose=0;
150 gs_int32_t n_actual_param;
151 gs_int32_t n_expected_param;
/* Start, end and difference timestamps for the start-up timing printed below. */
154 struct timeval tvs, tve, tvd;
/* The four dotted-quad octets of the hub address parsed by sscanf(). */
158 gs_uint32_t tip1,tip2,tip3,tip4;
159 gs_sp_t instance_name;
163 gs_uint32_t tlimit = 0; // time limit in seconds
164 time_t start_time, curr_time;
171 // by default the output will go to stdout
174 while ((ch = getopt(argc, argv, "l:p:r:sveXD")) != -1) {
/* NOTE(review): atoi() gives no error indication for non-numeric input. */
183 tcpport=atoi(optarg);
198 tlimit = atoi(optarg);
202 fprintf(stderr, "usage: %s [-r <bufsz>] [-e] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
/* NOTE(review): argv[0] is parsed as the hub address below, so argc/argv have
 * presumably been advanced past the options in lines not shown -- confirm. */
209 if (argc<3) goto usage;
211 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
212 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
/* Pack the four octets into one 32-bit value, first octet in the top byte,
 * and pass both address and port through htonl()/htons(). */
215 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
216 gshub.port=htons(gshub.port);
217 instance_name=strdup(argv[1]);
218 if (set_hub(gshub)!=0) {
219 gslog(LOG_EMERG,"Could not set hub");
222 if (set_instance_name(instance_name)!=0) {
223 gslog(LOG_EMERG,"Could not set instance name");
227 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
228 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
/* Start of the interval reported by the "TIME=" line further down. */
231 gettimeofday(&tvs, 0);
237 /* initialize host library and the sgroup */
239 if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
241 if (ftaapp_init(bufsz)!=0) {
242 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
/* Route SIGTERM and SIGINT to the handler hand. */
246 signal(SIGTERM, hand);
247 signal(SIGINT, hand);
/* NOTE(review): argv[0] is used here as the query name, so argv has
 * presumably been advanced again past hub and instance -- confirm. */
249 schema = ftaapp_get_fta_schema_by_name(argv[0]);
251 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
255 n_expected_param = ftaschema_parameter_len(schema);
256 if (n_expected_param == 0) {
260 /* parse the params */
/* Every argument after the first counts as one supplied parameter. */
261 n_actual_param = argc-1;
262 if(n_actual_param < n_expected_param){
263 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
266 for (lcv = 1 ; lcv < argc ; lcv++) {
/* Advance e to the '=' that separates key from value (or to the end of the string). */
271 while (*e && *e != '=') e++;
273 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
/* The value is everything after the '='. */
278 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
281 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
286 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
287 fprintf(stderr, "ftaschema_create_param_block failed!\n");
291 ftaschema_free(schema); /* XXXCDC */
294 if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
/* Instantiate the query with the parameter block built above. */
296 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
/* A streamid of 0 in the returned id is treated as failure. */
297 if (fta_id.streamid==0) {
298 fprintf(stderr,"%s::error:could not initialize fta %s\n",
302 /* XXXCDC: pblk is malloc'd, should we free it? */
304 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
/* Re-fetch the schema by FTA id; the earlier handle was released with
 * ftaschema_free() above. */
306 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
307 fprintf(stderr,"%s::error:could not get schema\n", me);
311 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
312 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
/* Print the field names separated by '|'. */
318 for(y=0; y<numberoffields;y++) {
319 printf("%s",ftaschema_field_name(schema,y));
320 if (y<numberoffields-1) printf("|");
/* Report the time elapsed since the gettimeofday(&tvs, 0) call above. */
325 gettimeofday(&tve, 0);
326 timersub(&tve, &tvs, &tvd);
/* NOTE(review): tv_usec has type suseconds_t, which is not int on every
 * platform; "%06d" may not match the argument -- confirm. */
327 printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
/* Reference point for the tlimit check at the bottom of the loop. */
334 start_time = time(NULL);
/* Main receive loop: runs as long as ftaapp_get_tuple() returns >= 0. */
336 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
339 if (ftaschema_is_eof_tuple(schema, rbuf)) {
340 /* initiate shutdown or something of that nature */
341 printf("#All data proccessed\n");
347 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
/* Only tuples with code 0 that belong to our own stream are decoded. */
350 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
351 for(y=0; y<numberoffields;y++) {
352 struct access_result ar;
354 printf("%s->",ftaschema_field_name(schema,y));
355 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
/* Format the field into linebuf according to its data type; the case labels
 * are not visible in this excerpt. */
356 switch (ar.field_data_type) {
358 snprintf(linebuf,MAXLINE,"%d",ar.r.i);
361 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
/* Dotted-quad rendering of ar.r.ui, starting with its top byte. */
364 snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
/* Count how many of the four words of the IPv6 value are zero. */
373 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
/* Empty format: resets linebuf to "" before the groups are appended. */
375 snprintf(linebuf,MAXLINE,"");
377 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
/* Combine bytes 2*x and 2*x+1 into one 16-bit group, first byte high. */
379 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
/* NOTE(review): these appends start strlen(linebuf) bytes into the buffer but
 * still pass MAXLINE as the size, which overstates the remaining space. */
380 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
381 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
384 snprintf(linebuf,MAXLINE,"::");
390 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
394 snprintf(linebuf,MAXLINE,"FALSE");
396 snprintf(linebuf,MAXLINE,"TRUE");
400 snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
403 snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
406 snprintf(linebuf,MAXLINE,"%f",ar.r.f);
414 snprintf(linebuf,MAXLINE,"%f sec",t);
/* Variable-length field: ar.r.vs.offset is used as the address of the bytes
 * and ar.r.vs.length as their count. */
423 src=(char*)ar.r.vs.offset;
427 for(x=0;x<ar.r.vs.length;x++) {
/* Printable ASCII (space through '~') is handled by this branch. */
429 if ((c<='~') && (c>=' ')) {
449 // if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
/* Append the field separator after every field except the last one.
 * NOTE(review): sep_str is passed as the format string itself; if it can
 * contain '%' this is unsafe -- confirm where sep_str is set. The size
 * argument MAXLINE also ignores the strlen(linebuf) offset. */
450 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,sep_str);
453 snprintf(linebuf,MAXLINE,"\n");
455 if (verbose!=0) fflush(stdout);
457 if (rfta_id.streamid != fta_id.streamid)
458 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
461 // whenever we receive a temp tuple check if we reached time limit
/* A tlimit of 0 disables this check. */
462 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
/* NOTE(review): tlimit is declared gs_uint32_t but printed with "%d" -- confirm. */
463 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);