1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 * Print VES-formatted records to the console.
19 * Each line is a JSON record.
20 * Based on gsprintconsole.c; only the formatting differs.
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
42 #include <rmr/RIC_message_types.h>
43 #include <sys/epoll.h>
49 #include "simple_http.h"
51 #include <schemaparser.h>
53 #define MAXLINE 100000
54 static unsigned tcpport=0;
55 static char linebuf[MAXLINE];
59 // how frequently we will log stats (expressed in tuples posted)
60 #define STAT_FREQUENCY 5
63 // Not all systems have timersub defined, so make sure it's there
66 #define timersub(tvp, uvp, vvp) \
68 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
69 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
70 if ((vvp)->tv_usec < 0) { \
72 (vvp)->tv_usec += 1000000; \
80 fprintf(stderr, "exiting via signal handler %d...\n", iv);
84 static void wait_for_client() {
85 struct sockaddr_in serv_addr,cli_addr;
87 if (listensockfd==0) {
89 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
90 if (listensockfd < 0) {
91 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
94 bzero((char *) &serv_addr, sizeof(serv_addr));
95 serv_addr.sin_family = AF_INET;
96 serv_addr.sin_addr.s_addr = INADDR_ANY;
97 serv_addr.sin_port = htons(tcpport);
99 /* make sure we can reuse the common port rapidly */
100 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
101 (gs_sp_t )&on, sizeof(on)) != 0) {
102 gslog(LOG_EMERG,"Error::could not set socket option");
106 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
107 (gs_sp_t )&on, sizeof(on)) != 0) {
108 gslog(LOG_EMERG,"Error::could not set socket option");
112 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
113 sizeof(serv_addr)) < 0) {
114 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
120 listen(listensockfd,5);
121 clilen = sizeof(cli_addr);
122 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
124 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
130 static void emit_socket() {
136 if((w=write(fd,&linebuf[o],l))==0) {
144 static void emit_line() {
147 printf("%s",linebuf);
154 int main(int argc, char* argv[]) {
155 gs_sp_t me = argv[0];
157 gs_int32_t schema, ch;
161 gs_uint32_t bufsz=8*1024*1024;
162 gs_int8_t rbuf[2*MAXTUPLESZ];
164 gs_int32_t numberoffields;
165 gs_int32_t verbose=0;
170 gs_int32_t n_actual_param;
171 gs_int32_t n_expected_param;
174 struct timeval tvs, tve, tvd;
178 gs_uint32_t tip1,tip2,tip3,tip4;
179 gs_sp_t instance_name;
180 gs_sp_t rmr_port = NULL;
183 // RMR-related parameters
184 gs_int32_t rmr_mtype = MC_REPORT;
187 gs_sp_t curl_address = NULL;
188 endpoint curl_endpoint;
189 gs_sp_t curl_url = NULL;
190 gs_sp_t curl_auth = NULL;
191 gs_uint32_t http_code;
193 gs_uint32_t ves_version=7;
195 gs_uint32_t tlimit = 0; // time limit in seconds
196 time_t start_time, curr_time;
198 gs_uint64_t post_success_cnt = 0ULL;
199 gs_uint64_t post_failure_cnt = 0ULL;
202 void* mrc; //msg router context
203 struct epoll_event events[1]; // list of events to give to epoll
204 struct epoll_event epe; // event definition for event to listen to
205 gs_int32_t ep_fd = -1; // epoll's file des (given to epoll_wait)
206 gs_int32_t rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
207 gs_int32_t nready; // number of events ready for receive
208 rmr_mbuf_t* rmr_sbuf; // send buffer
209 rmr_mbuf_t* rmr_rbuf; // received buffer
211 gs_uint64_t rmr_post_success_cnt = 0ULL;
212 gs_uint64_t rmr_post_failure_cnt = 0ULL;
217 while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:R:A:V:")) != -1) {
223 tcpport=atoi(optarg);
235 tlimit = atoi(optarg);
238 ves_version = atoi(optarg);
241 curl_address = strdup(optarg);
242 if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
243 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
246 curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
247 curl_endpoint.port=htons(curl_endpoint.port);
250 rmr_port=strdup(optarg);
253 curl_url = strdup(optarg);
256 curl_auth = strdup(optarg);
260 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] [-R <rmr_port>] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
267 if (argc<3) goto usage;
269 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
270 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
273 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
274 gshub.port=htons(gshub.port);
275 instance_name=strdup(argv[1]);
276 if (set_hub(gshub)!=0) {
277 gslog(LOG_EMERG,"Could not set hub");
280 if (set_instance_name(instance_name)!=0) {
281 gslog(LOG_EMERG,"Could not set instance name");
285 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
286 gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
290 // If this uses curl output, ensure consistency in the curl args
291 if(curl_address != NULL){
292 if(curl_url == NULL){
293 gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
301 gettimeofday(&tvs, 0);
309 /* initialize RMR library */
310 if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) {
311 fprintf(stderr, "%s::error:unable to initialise RMR\n", me);
315 rcv_fd = rmr_get_rcvfd( mrc ); // set up epoll things, start by getting the FD from RMR
317 fprintf(stderr, "%s::error:unable to set up polling fd\n", me);
320 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
321 fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno);
324 epe.events = EPOLLIN;
325 epe.data.fd = rcv_fd;
327 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
328 fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno));
332 rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE ); // alloc first send buffer; subsequent buffers allocated on send
333 rmr_rbuf = NULL; // don't need to alloc receive buffer
335 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait until RMR says it has one
338 fprintf( stderr, "%s: RMR is ready\n", argv[0]);
340 fprintf(stderr,"Runtime libraries built without RMR support. Rebuild with RMR_ENABLED defined in gsoptions.h\n");
345 /* initialize host library and the sgroup */
347 if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
349 if (ftaapp_init(bufsz)!=0) {
350 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
354 signal(SIGTERM, hand);
355 signal(SIGINT, hand);
357 schema = ftaapp_get_fta_schema_by_name(argv[0]);
359 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
363 n_expected_param = ftaschema_parameter_len(schema);
364 if (n_expected_param == 0) {
368 n_actual_param = argc-1;
369 if(n_actual_param < n_expected_param){
370 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
373 /* parse the params */
374 for (lcv = 1 ; lcv < argc ; lcv++) {
379 while (*e && *e != '=') e++;
381 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
386 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
389 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
394 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
395 fprintf(stderr, "ftaschema_create_param_block failed!\n");
399 // ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
402 if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
404 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
405 if (fta_id.streamid==0) {
406 fprintf(stderr,"%s::error:could not initialize fta %s\n",
410 /* XXXCDC: pblk is malloc'd, should we free it? */
412 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
414 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
415 fprintf(stderr,"%s::error:could not get schema\n", me);
419 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
420 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
426 for(y=0; y<numberoffields;y++) {
427 printf("%s",ftaschema_field_name(schema,y));
428 if (y<numberoffields-1) printf("|");
432 if (xit) { // -X in command line
433 gettimeofday(&tve, 0);
434 timersub(&tve, &tvs, &tvd);
435 printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
436 hand(0); // effectively an exit
442 start_time = time(NULL);
444 int measurement_interval_pos = -1; // extract measurementInterval if present
445 char *field_names[numberoffields];
446 for(y=0; y<numberoffields;y++) {
447 field_names[y] = strdup(ftaschema_field_name(schema,y));
448 if(strcmp(field_names[y], "measurementInterval")==0)
449 measurement_interval_pos = y;
453 struct timeval tsample;
454 gettimeofday(&tsample, 0);
455 char start_ts[100], curr_ts[100];
456 sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
458 long unsigned int lineno=0;
459 long unsigned int seqno=0;
460 double measurement_interval;
461 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
463 if (dump) // -D in command line
465 if (ftaschema_is_eof_tuple(schema, rbuf)) {
466 /* initiate shutdown or something of that nature */
467 printf("#All data proccessed\n");
473 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
476 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
478 gettimeofday(&tsample, 0);
479 sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
482 pos = snprintf(linebuf, MAXLINE,
483 "{\"event\": { \"commonEventHeader\": { "
484 "\"domain\": \"measurementsForVfScaling\", "
485 "\"eventId\": \"%s%u\", "
486 "\"eventType\": \"%s\", "
487 "\"eventName\": \"Measurement_MC_%s\", "
488 "\"lastEpochMicrosec\": %s, "
489 "\"priority\": \"Normal\", "
490 "\"reportingEntityName\": \"GS-LITE MC\", "
492 "\"sourceName\": \"meas_cmpgn_xapp\", "
493 "\"startEpochMicrosec\": %s, "
496 "\"measurementsForVfScalingFields\": { "
497 "\"additionalFields\": ["
498 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
501 pos = snprintf(linebuf, MAXLINE,
502 "{\"event\": { \"commonEventHeader\": { "
503 "\"domain\": \"measurement\", "
504 "\"eventId\": \"%s%u\", "
505 "\"eventType\": \"%s\", "
506 "\"eventName\": \"Measurement_MC_%s\", "
507 "\"lastEpochMicrosec\": %s, "
508 "\"priority\": \"Normal\", "
509 "\"reportingEntityName\": \"GS-LITE MC\", "
511 "\"sourceName\": \"meas_cmpgn_xapp\", "
512 "\"startEpochMicrosec\": %s, "
513 "\"version\": \"4.0.1\", "
514 "\"vesEventListenerVersion\": \"7.0.1\" "
516 "\"measurementFields\": { "
517 "\"additionalFields\": {"
518 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
522 measurement_interval = 0;
523 for(y=0; y<numberoffields;y++) {
524 struct access_result ar;
526 // printf("%s->",ftaschema_field_name(schema,y));
531 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
532 switch (ar.field_data_type) {
535 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
537 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
538 if(y==measurement_interval_pos)
539 measurement_interval = (double)ar.r.i;
543 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
545 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
546 if(y==measurement_interval_pos)
547 measurement_interval = (double)ar.r.ui;
551 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
556 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
564 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
566 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
569 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
571 snprintf(linebuf,MAXLINE,"");
573 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
575 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
576 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
578 pos += snprintf(linebuf+pos,MAXLINE-pos,":");
582 pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
585 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
587 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
593 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
595 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u\"}",field_names[y], ar.r.ui);
596 if(y==measurement_interval_pos)
597 measurement_interval = (double)ar.r.ui;
602 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
604 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
608 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
610 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
616 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
618 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
619 if(y==measurement_interval_pos)
620 measurement_interval = (double)ar.r.ul;
624 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
626 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
627 if(y==measurement_interval_pos)
628 measurement_interval = (double)ar.r.l;
632 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
634 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
635 if(y==measurement_interval_pos)
636 measurement_interval = (double)ar.r.f;
645 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
647 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
653 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
655 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
659 src=(char*)ar.r.vs.offset;
660 for(x=0;x<ar.r.vs.length;x++){
662 if ((c<='~') && (c>=' ')) {
675 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
677 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
686 snprintf(linebuf+pos, MAXLINE-pos,
687 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
688 "}}}\n", measurement_interval
691 snprintf(linebuf+pos, MAXLINE-pos,
692 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
693 "}}}\n", measurement_interval
699 rmr_sbuf->mtype = rmr_mtype; // fill in the message bits
700 rmr_sbuf->len = strlen(linebuf) + 1; // our receiver likely wants a nice ASCII-Z (NUL-terminated) string
701 memcpy(rmr_sbuf->payload, linebuf, rmr_sbuf->len);
703 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // send it (send returns an empty payload on success, or the original payload on fail/retry)
704 while( rmr_sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
705 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // retry send until it's good (simple test; real programmes should do better)
707 if(rmr_sbuf->state != RMR_OK) {
708 gslog(LOG_WARNING, "rmr_send_msg() failure, strerror(errno) is %s", strerror(errno));
709 rmr_post_failure_cnt++;
711 rmr_post_success_cnt++;
712 if (((rmr_post_success_cnt+rmr_post_failure_cnt) % STAT_FREQUENCY) == 0)
713 gslog(LOG_WARNING, "%s: successful RMR posts - %llu, failed RMR posts - %llu", argv[0], rmr_post_success_cnt, rmr_post_failure_cnt);
717 if(curl_address==NULL){
718 if (!rmr_port) // if neither VES collector nor RMR is specified print to standard output
721 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
722 if(http_code != 200 && http_code != 202){
724 gslog(LOG_WARNING, "http return code is %d",http_code);
728 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
729 gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
731 if (verbose!=0) fflush(stdout);
733 if (rfta_id.streamid != fta_id.streamid)
734 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
737 // whenever we receive a temp tuple, check whether we have reached the time limit
738 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
739 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);