1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 * Print VES-formatted records to the console.
19 * Each line is a JSON record.
20 * Based on gsprintconsole.c, just differences in formatting.
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
41 #include "simple_http.h"
43 #include <schemaparser.h>
/* Size in bytes of linebuf, the buffer in which each output record is assembled. */
45 #define MAXLINE 100000
/* TCP port the listening socket is bound to in wait_for_client(); 0 by default. */
46 static unsigned tcpport=0;
/* Shared output buffer: main() formats each record into it with snprintf(),
 * and emit_line()/emit_socket() send its contents out. */
47 static char linebuf[MAXLINE];
51 // how frequently we will log stats (expressed in tuples posted)
// main() logs the post counters whenever (successful + failed posts) is a
// multiple of this value.
52 #define STAT_FREQUENCY 5
55 // Not all systems have timersub defined, so make sure it's there.
// timersub(tvp, uvp, vvp): store the difference *tvp - *uvp in *vvp.
// The tv_sec and tv_usec fields are subtracted separately; when the
// microsecond result is negative, 1000000 is added back to it.
// NOTE(review): some lines of this macro are missing from the listing; they
// presumably also borrow one second from tv_sec in that case -- confirm.
58 #define timersub(tvp, uvp, vvp) \
60 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
61 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
62 if ((vvp)->tv_usec < 0) { \
64 (vvp)->tv_usec += 1000000; \
72 fprintf(stderr, "exiting via signal handler %d...\n", iv);
/*
 * wait_for_client: prepare the TCP listening socket (if needed) and accept
 * one client connection.
 *
 * From the statements visible here:
 *  - when listensockfd is 0, a new AF_INET/SOCK_STREAM socket is created;
 *  - SO_REUSEPORT and SO_REUSEADDR are both enabled on it;
 *  - it is bound to INADDR_ANY on the port held in tcpport;
 *  - listen() is called with a backlog of 5;
 *  - the descriptor returned by accept() is stored in fd.
 * Every failure that is checked is reported through gslog() at LOG_EMERG.
 *
 * NOTE(review): this listing omits some lines of the function (the
 * declarations of on and clilen, closing braces, whatever follows each
 * gslog call), so the exact extent of the "listensockfd==0" branch and the
 * action taken after an error are not visible here -- confirm against the
 * full file.
 */
76 static void wait_for_client() {
77 struct sockaddr_in serv_addr,cli_addr;
/* listensockfd == 0 is treated as "no listening socket created yet" */
79 if (listensockfd==0) {
81 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
82 if (listensockfd < 0) {
83 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
/* listen on any local address, on port tcpport (converted to network byte order) */
86 bzero((char *) &serv_addr, sizeof(serv_addr));
87 serv_addr.sin_family = AF_INET;
88 serv_addr.sin_addr.s_addr = INADDR_ANY;
89 serv_addr.sin_port = htons(tcpport);
91 /* make sure we can reuse the common port rapidly */
92 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
93 (gs_sp_t )&on, sizeof(on)) != 0) {
94 gslog(LOG_EMERG,"Error::could not set socket option");
98 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
99 (gs_sp_t )&on, sizeof(on)) != 0) {
100 gslog(LOG_EMERG,"Error::could not set socket option");
104 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
105 sizeof(serv_addr)) < 0) {
106 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
/* backlog of 5 pending connections; the return value of listen() is not checked */
112 listen(listensockfd,5);
113 clilen = sizeof(cli_addr);
/* the accepted connection becomes fd, the descriptor emit_socket() writes to */
114 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
116 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
/*
 * emit_socket: send the contents of linebuf to the descriptor fd using
 * write().
 *
 * NOTE(review): only the write() call is visible in this listing; o and l
 * look like the current offset into linebuf and the number of bytes still
 * to send -- confirm against the full function.
 * NOTE(review): the visible test only compares the result of write() with 0;
 * write() reports errors by returning -1, so check that this case is handled
 * in the lines not shown.
 */
122 static void emit_socket() {
128 if((w=write(fd,&linebuf[o],l))==0) {
/*
 * emit_line: print the current contents of linebuf to stdout via
 * printf("%s").
 * NOTE(review): the remaining statements of this function are not visible
 * in this listing.
 */
136 static void emit_line() {
139 printf("%s",linebuf);
146 int main(int argc, char* argv[]) {
147 gs_sp_t me = argv[0];
149 gs_int32_t schema, ch;
153 gs_uint32_t bufsz=8*1024*1024;
154 gs_int8_t rbuf[2*MAXTUPLESZ];
156 gs_int32_t numberoffields;
157 gs_int32_t verbose=0;
162 gs_int32_t n_actual_param;
163 gs_int32_t n_expected_param;
166 struct timeval tvs, tve, tvd;
170 gs_uint32_t tip1,tip2,tip3,tip4;
171 gs_sp_t instance_name;
173 gs_sp_t curl_address = NULL;
174 endpoint curl_endpoint;
175 gs_sp_t curl_url = NULL;
176 gs_sp_t curl_auth = NULL;
177 gs_uint32_t http_code;
179 gs_uint32_t ves_version=7;
182 gs_uint32_t tlimit = 0; // time limit in seconds
183 time_t start_time, curr_time;
185 gs_uint64_t post_success_cnt = 0ULL;
186 gs_uint64_t post_failure_cnt = 0ULL;
191 while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:A:V:")) != -1) {
197 tcpport=atoi(optarg);
209 tlimit = atoi(optarg);
212 ves_version = atoi(optarg);
215 curl_address = strdup(optarg);
216 if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
217 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
220 curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
221 curl_endpoint.port=htons(curl_endpoint.port);
224 curl_url = strdup(optarg);
227 curl_auth = strdup(optarg);
231 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
238 if (argc<3) goto usage;
240 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
241 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
244 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
245 gshub.port=htons(gshub.port);
246 instance_name=strdup(argv[1]);
247 if (set_hub(gshub)!=0) {
248 gslog(LOG_EMERG,"Could not set hub");
251 if (set_instance_name(instance_name)!=0) {
252 gslog(LOG_EMERG,"Could not set instance name");
256 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
257 gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
261 // If this uses curl output, ensure consistency in the curl args
262 if(curl_address != NULL){
263 if(curl_url == NULL){
264 gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
272 gettimeofday(&tvs, 0);
278 /* initialize host library and the sgroup */
280 if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
282 if (ftaapp_init(bufsz)!=0) {
283 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
287 signal(SIGTERM, hand);
288 signal(SIGINT, hand);
290 schema = ftaapp_get_fta_schema_by_name(argv[0]);
292 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
296 n_expected_param = ftaschema_parameter_len(schema);
297 if (n_expected_param == 0) {
301 n_actual_param = argc-1;
302 if(n_actual_param < n_expected_param){
303 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
306 /* parse the params */
307 for (lcv = 1 ; lcv < argc ; lcv++) {
312 while (*e && *e != '=') e++;
314 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
319 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
322 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
327 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
328 fprintf(stderr, "ftaschema_create_param_block failed!\n");
332 // ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
335 if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
337 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
338 if (fta_id.streamid==0) {
339 fprintf(stderr,"%s::error:could not initialize fta %s\n",
343 /* XXXCDC: pblk is malloc'd, should we free it? */
345 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
347 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
348 fprintf(stderr,"%s::error:could not get schema\n", me);
352 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
353 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
359 for(y=0; y<numberoffields;y++) {
360 printf("%s",ftaschema_field_name(schema,y));
361 if (y<numberoffields-1) printf("|");
365 if (xit) { // -X in command line
366 gettimeofday(&tve, 0);
367 timersub(&tve, &tvs, &tvd);
368 printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
369 hand(0); // effectively an exit
375 start_time = time(NULL);
377 int measurement_interval_pos = -1; // extract measurementInterval if present
378 char *field_names[numberoffields];
379 for(y=0; y<numberoffields;y++) {
380 field_names[y] = strdup(ftaschema_field_name(schema,y));
381 if(strcmp(field_names[y], "measurementInterval")==0)
382 measurement_interval_pos = y;
386 struct timeval tsample;
387 gettimeofday(&tsample, 0);
388 char start_ts[100], curr_ts[100];
389 sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
391 long unsigned int lineno=0;
392 long unsigned int seqno=0;
393 double measurement_interval;
394 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
396 if (dump) // -D in command line
398 if (ftaschema_is_eof_tuple(schema, rbuf)) {
399 /* initiate shutdown or something of that nature */
400 printf("#All data proccessed\n");
406 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
409 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
411 gettimeofday(&tsample, 0);
412 sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
415 pos = snprintf(linebuf, MAXLINE,
416 "{\"event\": { \"commonEventHeader\": { "
417 "\"domain\": \"measurementsForVfScaling\", "
418 "\"eventId\": \"%s%u\", "
419 "\"eventType\": \"%s\", "
420 "\"eventName\": \"Measurement_MC_%s\", "
421 "\"lastEpochMicrosec\": %s, "
422 "\"priority\": \"Normal\", "
423 "\"reportingEntityName\": \"GS-LITE MC\", "
425 "\"sourceName\": \"meas_cmpgn_xapp\", "
426 "\"startEpochMicrosec\": %s, "
429 "\"measurementsForVfScalingFields\": { "
430 "\"additionalFields\": ["
431 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
434 pos = snprintf(linebuf, MAXLINE,
435 "{\"event\": { \"commonEventHeader\": { "
436 "\"domain\": \"measurement\", "
437 "\"eventId\": \"%s%u\", "
438 "\"eventType\": \"%s\", "
439 "\"eventName\": \"Measurement_MC_%s\", "
440 "\"lastEpochMicrosec\": %s, "
441 "\"priority\": \"Normal\", "
442 "\"reportingEntityName\": \"GS-LITE MC\", "
444 "\"sourceName\": \"meas_cmpgn_xapp\", "
445 "\"startEpochMicrosec\": %s, "
446 "\"version\": \"4.0.1\", "
447 "\"vesEventListenerVersion\": \"7.0.1\" "
449 "\"measurementFields\": { "
450 "\"additionalFields\": {"
451 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
455 measurement_interval = 0;
456 for(y=0; y<numberoffields;y++) {
457 struct access_result ar;
459 // printf("%s->",ftaschema_field_name(schema,y));
464 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
465 switch (ar.field_data_type) {
468 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
470 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
471 if(y==measurement_interval_pos)
472 measurement_interval = (double)ar.r.i;
476 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
478 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
479 if(y==measurement_interval_pos)
480 measurement_interval = (double)ar.r.ui;
484 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
489 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
497 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
499 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
502 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
504 snprintf(linebuf,MAXLINE,"");
506 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
508 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
509 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
511 pos += snprintf(linebuf+pos,MAXLINE-pos,":");
515 pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
518 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
520 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
526 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
528 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u\"}",field_names[y], ar.r.ui);
529 if(y==measurement_interval_pos)
530 measurement_interval = (double)ar.r.ui;
535 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
537 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
541 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
543 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
549 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
551 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
552 if(y==measurement_interval_pos)
553 measurement_interval = (double)ar.r.ul;
557 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
559 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
560 if(y==measurement_interval_pos)
561 measurement_interval = (double)ar.r.l;
565 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
567 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
568 if(y==measurement_interval_pos)
569 measurement_interval = (double)ar.r.f;
578 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
580 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
586 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
588 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
592 src=(char*)ar.r.vs.offset;
593 for(x=0;x<ar.r.vs.length;x++){
595 if ((c<='~') && (c>=' ')) {
608 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
610 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
619 snprintf(linebuf+pos, MAXLINE-pos,
620 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
621 "}}}\n", measurement_interval
624 snprintf(linebuf+pos, MAXLINE-pos,
625 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
626 "}}}\n", measurement_interval
629 if(curl_address==NULL){
632 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
633 if(http_code != 200 && http_code != 202){
635 gslog(LOG_WARNING, "http return code is %d",http_code);
639 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
640 gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
642 if (verbose!=0) fflush(stdout);
644 if (rfta_id.streamid != fta_id.streamid)
645 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
648 // whenever we receive a temp tuple check if we reached time limit
649 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
650 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);