1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 * Print VES-formatted records to the console.
19 * Each line is a JSON record.
20 * Based on gsprintconsole.c; only the output formatting differs.
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
41 #include "simple_http.h"
43 #include <schemaparser.h>
45 #define MAXLINE 100000
46 static unsigned tcpport=0;
47 static char linebuf[MAXLINE];
52 // Not all systems have timersub defined, so make sure it's there.
// timersub(tvp, uvp, vvp) stores the difference (*tvp - *uvp) into *vvp,
// field by field (tv_sec, tv_usec). When the microsecond difference is
// negative, 1000000 is added back to tv_usec.
// NOTE(review): the macro's guard/wrapper lines and the matching seconds
// adjustment are not visible in this excerpt -- confirm against the full file.
55 #define timersub(tvp, uvp, vvp) \
57 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
58 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
59 if ((vvp)->tv_usec < 0) { \
61 (vvp)->tv_usec += 1000000; \
69 fprintf(stderr, "exiting via signal handler %d...\n", iv);
/*
 * wait_for_client: prepare the TCP listening socket and accept a client.
 *
 * When listensockfd is still 0, an AF_INET stream socket is created, the
 * SO_REUSEPORT and SO_REUSEADDR options are requested, and the socket is
 * bound to INADDR_ANY on the port held in the file-scope variable tcpport.
 * The socket is then put into listening mode and accept() is called; the
 * accepted descriptor is stored in fd.
 *
 * Each failure is reported through gslog() at LOG_EMERG.
 *
 * NOTE(review): several lines of this function (local declarations, closing
 * braces, and whatever follows each gslog call) are not visible in this
 * excerpt, so the exact extent of the listensockfd==0 branch and the
 * post-error behaviour should be confirmed against the full file.
 */
73 static void wait_for_client() {
74 struct sockaddr_in serv_addr,cli_addr;
76 if (listensockfd==0) {
78 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
79 if (listensockfd < 0) {
80 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
/* bind address: any local interface, port taken from tcpport */
83 bzero((char *) &serv_addr, sizeof(serv_addr));
84 serv_addr.sin_family = AF_INET;
85 serv_addr.sin_addr.s_addr = INADDR_ANY;
86 serv_addr.sin_port = htons(tcpport);
88 /* make sure we can reuse the common port rapidly */
89 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
90 (gs_sp_t )&on, sizeof(on)) != 0) {
91 gslog(LOG_EMERG,"Error::could not set socket option\n");
95 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
96 (gs_sp_t )&on, sizeof(on)) != 0) {
97 gslog(LOG_EMERG,"Error::could not set socket option\n");
101 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
102 sizeof(serv_addr)) < 0) {
103 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
/* listen with a backlog of 5; the return value of listen() is not checked */
109 listen(listensockfd,5);
110 clilen = sizeof(cli_addr);
/* accept a connection; the resulting descriptor is kept in fd */
111 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
113 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
/*
 * emit_socket: send data from linebuf to the descriptor fd using write().
 *
 * The visible call writes l bytes starting at linebuf[o] and tests for a
 * return value of 0.
 * NOTE(review): the declarations of o, l and w, any surrounding loop, and the
 * handling of the zero-return case are not visible in this excerpt -- confirm
 * against the full file.
 */
119 static void emit_socket() {
125 if((w=write(fd,&linebuf[o],l))==0) {
/*
 * emit_line: print the current contents of linebuf to standard output.
 *
 * linebuf is passed as a "%s" argument, so it is never interpreted as a
 * format string.
 * NOTE(review): the rest of this function's body is not visible in this
 * excerpt.
 */
133 static void emit_line() {
136 printf("%s",linebuf);
143 int main(int argc, char* argv[]) {
144 gs_sp_t me = argv[0];
146 gs_int32_t schema, ch;
150 gs_uint32_t bufsz=8*1024*1024;
151 gs_int8_t rbuf[2*MAXTUPLESZ];
153 gs_int32_t numberoffields;
154 gs_int32_t verbose=0;
159 gs_int32_t n_actual_param;
160 gs_int32_t n_expected_param;
163 struct timeval tvs, tve, tvd;
167 gs_uint32_t tip1,tip2,tip3,tip4;
168 gs_sp_t instance_name;
170 gs_sp_t curl_address = NULL;
171 endpoint curl_endpoint;
172 gs_sp_t curl_url = NULL;
173 gs_sp_t curl_auth = NULL;
174 gs_uint32_t http_code;
176 gs_uint32_t ves_version=5;
179 gs_uint32_t tlimit = 0; // time limit in seconds
180 time_t start_time, curr_time;
185 while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:A:V:")) != -1) {
191 tcpport=atoi(optarg);
203 tlimit = atoi(optarg);
206 ves_version = atoi(optarg);
209 curl_address = strdup(optarg);
210 if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
211 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
214 curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
215 curl_endpoint.port=htons(curl_endpoint.port);
218 curl_url = strdup(optarg);
221 curl_auth = strdup(optarg);
225 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
232 if (argc<3) goto usage;
234 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
235 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
238 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
239 gshub.port=htons(gshub.port);
240 instance_name=strdup(argv[1]);
241 if (set_hub(gshub)!=0) {
242 gslog(LOG_EMERG,"Could not set hub");
245 if (set_instance_name(instance_name)!=0) {
246 gslog(LOG_EMERG,"Could not set instance name");
250 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
251 gslog(LOG_EMERG,"Did not receive signal that GS is initiated\n");
255 // If this uses curl output, ensure consistency in the curl args
256 if(curl_address != NULL){
257 if(curl_url == NULL){
258 gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
266 gettimeofday(&tvs, 0);
272 /* initialize host library and the sgroup */
274 if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
276 if (ftaapp_init(bufsz)!=0) {
277 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
281 signal(SIGTERM, hand);
282 signal(SIGINT, hand);
284 schema = ftaapp_get_fta_schema_by_name(argv[0]);
286 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
290 n_expected_param = ftaschema_parameter_len(schema);
291 if (n_expected_param == 0) {
295 n_actual_param = argc-1;
296 if(n_actual_param < n_expected_param){
297 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
300 /* parse the params */
301 for (lcv = 1 ; lcv < argc ; lcv++) {
306 while (*e && *e != '=') e++;
308 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
313 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
316 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
321 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
322 fprintf(stderr, "ftaschema_create_param_block failed!\n");
326 // ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
329 if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
331 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
332 if (fta_id.streamid==0) {
333 fprintf(stderr,"%s::error:could not initialize fta %s\n",
337 /* XXXCDC: pblk is malloc'd, should we free it? */
339 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
341 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
342 fprintf(stderr,"%s::error:could not get schema\n", me);
346 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
347 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
353 for(y=0; y<numberoffields;y++) {
354 printf("%s",ftaschema_field_name(schema,y));
355 if (y<numberoffields-1) printf("|");
359 if (xit) { // -X in command line
360 gettimeofday(&tve, 0);
361 timersub(&tve, &tvs, &tvd);
362 printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
363 hand(0); // effectively an exit
369 start_time = time(NULL);
371 int measurement_interval_pos = -1; // extract measurementInterval if present
372 char *field_names[numberoffields];
373 for(y=0; y<numberoffields;y++) {
374 field_names[y] = strdup(ftaschema_field_name(schema,y));
375 if(strcmp(field_names[y], "measurementInterval")==0)
376 measurement_interval_pos = y;
380 struct timeval tsample;
381 gettimeofday(&tsample, 0);
382 char start_ts[100], curr_ts[100];
383 sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
385 long unsigned int lineno=0;
386 long unsigned int seqno=0;
387 unsigned int measurement_interval;
388 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
390 if (dump) // -D in command line
392 if (ftaschema_is_eof_tuple(schema, rbuf)) {
393 /* initiate shutdown or something of that nature */
394 printf("#All data proccessed\n");
400 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
403 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
405 gettimeofday(&tsample, 0);
406 sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
409 pos = snprintf(linebuf, MAXLINE,
410 "{\"event\": { \"commonEventHeader\": { "
411 "\"domain\": \"measurementsForVfScaling\", "
412 "\"eventId\": \"%s%u\", "
413 "\"eventType\": \"%s\", "
414 "\"eventName\": \"Measurement_MC_%s\", "
415 "\"lastEpochMicrosec\": %s, "
416 "\"priority\": \"Normal\", "
417 "\"reportingEntityName\": \"GS-LITE MC\", "
419 "\"sourceName\": \"meas_cmpgn_xapp\", "
420 "\"startEpochMicrosec\": %s, "
423 "\"measurementsForVfScalingFields\": { "
424 "\"additionalFields\": ["
425 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
428 pos = snprintf(linebuf, MAXLINE,
429 "{\"event\": { \"commonEventHeader\": { "
430 "\"domain\": \"measurement\", "
431 "\"eventId\": \"%s%u\", "
432 "\"eventType\": \"%s\", "
433 "\"eventName\": \"Measurement_MC_%s\", "
434 "\"lastEpochMicrosec\": %s, "
435 "\"priority\": \"Normal\", "
436 "\"reportingEntityName\": \"GS-LITE MC\", "
438 "\"sourceName\": \"meas_cmpgn_xapp\", "
439 "\"startEpochMicrosec\": %s, "
440 "\"version\": \"4.0.1\", "
441 "\"vesEventListenerVersion\": \"7.0.1\" "
443 "\"measurementFields\": { "
444 "\"additionalFields\": {"
445 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
449 measurement_interval = 0;
450 for(y=0; y<numberoffields;y++) {
451 struct access_result ar;
453 // printf("%s->",ftaschema_field_name(schema,y));
458 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
459 switch (ar.field_data_type) {
462 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
464 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
465 if(y==measurement_interval_pos)
466 measurement_interval = (unsigned int)ar.r.i;
470 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
472 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
473 if(y==measurement_interval_pos)
474 measurement_interval = (unsigned int)ar.r.ui;
478 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
483 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
491 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
493 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
496 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
498 snprintf(linebuf,MAXLINE,"");
500 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
502 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
503 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
505 pos += snprintf(linebuf+pos,MAXLINE-pos,":");
509 pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
512 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
514 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
520 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
522 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u\"}",field_names[y], ar.r.ui);
523 if(y==measurement_interval_pos)
524 measurement_interval = (unsigned int)ar.r.ui;
529 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
531 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
535 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
537 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
543 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
545 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
546 if(y==measurement_interval_pos)
547 measurement_interval = (unsigned int)ar.r.ul;
551 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
553 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
554 if(y==measurement_interval_pos)
555 measurement_interval = (unsigned int)ar.r.l;
559 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
561 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
570 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
572 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
578 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
580 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
584 src=(char*)ar.r.vs.offset;
585 for(x=0;x<ar.r.vs.length;x++){
587 if ((c<='~') && (c>=' ')) {
600 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
602 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
611 snprintf(linebuf+pos, MAXLINE-pos,
612 "], \"measurementInterval\": %u, \"measurementsForVfScalingVersion\": 1"
613 "}}}\n", measurement_interval
616 snprintf(linebuf+pos, MAXLINE-pos,
617 "}, \"measurementInterval\": %u, \"measurementFieldsVersion\": \"4.0\""
618 "}}}\n", measurement_interval
621 if(curl_address==NULL){
624 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
625 if(http_code != 200 && http_code != 202){
626 gslog(LOG_WARNING, "http return code is %d\n",http_code);
629 if (verbose!=0) fflush(stdout);
631 if (rfta_id.streamid != fta_id.streamid)
632 fprintf(stderr,"Got unkown streamid %llu \n",rfta_id.streamid);
635 // whenever we receive a temp tuple check if we reached time limit
636 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
637 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);