1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
24 #include <sys/types.h>
25 #include <sys/socket.h>
26 #include <netinet/in.h>
33 #include <schemaparser.h>
35 #define MAXLINE 100000
36 static unsigned tcpport=0;
37 static char linebuf[MAXLINE];
42 // Not all systems have timersub defined so make sure its ther
45 #define timersub(tvp, uvp, vvp) \
47 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
48 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
49 if ((vvp)->tv_usec < 0) { \
51 (vvp)->tv_usec += 1000000; \
59 fprintf(stderr, "exiting via signal handler %d...\n", iv);
63 static void wait_for_client() {
64 struct sockaddr_in serv_addr,cli_addr;
66 if (listensockfd==0) {
68 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
69 if (listensockfd < 0) {
70 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
73 bzero((char *) &serv_addr, sizeof(serv_addr));
74 serv_addr.sin_family = AF_INET;
75 serv_addr.sin_addr.s_addr = INADDR_ANY;
76 serv_addr.sin_port = htons(tcpport);
78 /* make sure we can reuse the common port rapidly */
79 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
80 (gs_sp_t )&on, sizeof(on)) != 0) {
81 gslog(LOG_EMERG,"Error::could not set socket option\n");
85 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
86 (gs_sp_t )&on, sizeof(on)) != 0) {
87 gslog(LOG_EMERG,"Error::could not set socket option\n");
91 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
92 sizeof(serv_addr)) < 0) {
93 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
99 listen(listensockfd,5);
100 clilen = sizeof(cli_addr);
101 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
103 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
109 static void emit_socket() {
115 if((w=write(fd,&linebuf[o],l))==0) {
123 static void emit_line() {
126 printf("%s",linebuf);
133 int main(int argc, char* argv[]) {
134 gs_sp_t me = argv[0];
136 gs_int32_t schema, ch;
140 gs_uint32_t bufsz=8*1024*1024;
141 gs_int8_t rbuf[2*MAXTUPLESZ];
143 gs_int32_t numberoffields;
144 gs_int32_t verbose=0;
149 gs_int32_t n_actual_param;
150 gs_int32_t n_expected_param;
153 struct timeval tvs, tve, tvd;
157 gs_uint32_t tip1,tip2,tip3,tip4;
158 gs_sp_t instance_name;
160 gs_uint32_t tlimit = 0; // time limit in seconds
161 time_t start_time, curr_time;
166 while ((ch = getopt(argc, argv, "l:p:r:vXD")) != -1) {
172 tcpport=atoi(optarg);
184 tlimit = atoi(optarg);
188 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
195 if (argc<3) goto usage;
197 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
198 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
201 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
202 gshub.port=htons(gshub.port);
203 instance_name=strdup(argv[1]);
204 if (set_hub(gshub)!=0) {
205 gslog(LOG_EMERG,"Could not set hub");
208 if (set_instance_name(instance_name)!=0) {
209 gslog(LOG_EMERG,"Could not set instance name");
213 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
214 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
217 gettimeofday(&tvs, 0);
223 /* initialize host library and the sgroup */
225 if (verbose>=2) fprintf(stderr,"Inializin gscp\n");
227 if (ftaapp_init(bufsz)!=0) {
228 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
232 signal(SIGTERM, hand);
233 signal(SIGINT, hand);
235 schema = ftaapp_get_fta_schema_by_name(argv[0]);
237 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
241 n_expected_param = ftaschema_parameter_len(schema);
242 if (n_expected_param == 0) {
246 /* parse the params */
247 n_actual_param = argc-1;
248 if(n_actual_param < n_expected_param){
249 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
252 for (lcv = 1 ; lcv < argc ; lcv++) {
257 while (*e && *e != '=') e++;
259 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
264 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
267 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
272 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
273 fprintf(stderr, "ftaschema_create_param_block failed!\n");
277 ftaschema_free(schema); /* XXXCDC */
280 if (verbose>=2) fprintf(stderr,"Initalice FTA\n");
282 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
283 if (fta_id.streamid==0) {
284 fprintf(stderr,"%s::error:could not initialize fta %s\n",
288 /* XXXCDC: pblk is malloc'd, should we free it? */
290 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
292 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
293 fprintf(stderr,"%s::error:could not get schema\n", me);
297 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
298 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
304 for(y=0; y<numberoffields;y++) {
305 printf("%s",ftaschema_field_name(schema,y));
306 if (y<numberoffields-1) printf("|");
311 gettimeofday(&tve, 0);
312 timersub(&tve, &tvs, &tvd);
313 printf("TIME= %ld.%06d sec\n", tvd.tv_sec, tvd.tv_usec);
320 start_time = time(NULL);
322 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
325 if (ftaschema_is_eof_tuple(schema, rbuf)) {
326 /* initiate shutdown or something of that nature */
327 printf("#All data proccessed\n");
333 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
336 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
337 for(y=0; y<numberoffields;y++) {
338 struct access_result ar;
340 printf("%s->",ftaschema_field_name(schema,y));
341 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
342 switch (ar.field_data_type) {
344 snprintf(linebuf,MAXLINE,"%d",ar.r.i);
347 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
350 snprintf(linebuf,MAXLINE,"%u.%u.%u.%u",ar.r.ui>>24&0xff,
359 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
361 snprintf(linebuf,MAXLINE,"");
363 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
365 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
366 snprintf(&linebuf[strlen(linebuf)],MAXLINE,"%04x",y);
367 if (x<7) snprintf(&linebuf[strlen(linebuf)],MAXLINE,":");
370 snprintf(linebuf,MAXLINE,"::");
376 snprintf(linebuf,MAXLINE,"%u",ar.r.ui);
380 snprintf(linebuf,MAXLINE,"FALSE");
382 snprintf(linebuf,MAXLINE,"TRUE");
386 snprintf(linebuf,MAXLINE,"%llu",ar.r.ul);
389 snprintf(linebuf,MAXLINE,"%lld",ar.r.l);
392 snprintf(linebuf,MAXLINE,"%f",ar.r.f);
400 snprintf(linebuf,MAXLINE,"%f sec",t);
409 src=(char*)ar.r.vs.offset;
413 for(x=0;x<ar.r.vs.length;x++) {
415 if ((c<='~') && (c>=' ')) {
435 if (y<numberoffields-1) snprintf(&linebuf[strlen(linebuf)],MAXLINE,"|");
438 snprintf(linebuf,MAXLINE,"\n");
440 if (verbose!=0) fflush(stdout);
442 if (rfta_id.streamid != fta_id.streamid)
443 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
446 // whenever we receive a temp tuple check if we reached time limit
447 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
448 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);