1 /* ------------------------------------------------
2 Copyright 2014 AT&T Intellectual Property
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ------------------------------------------- */
18 * Print VES-formatted records to the console.
19 * Each line is a JSON record.
20 * Based on gsprintconsole.c, just differences in formatting.
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/in.h>
39 #include <sys/epoll.h>
42 #include <rmr/RIC_message_types.h>
48 #include "simple_http.h"
50 #include <schemaparser.h>
52 #define MAXLINE 100000
53 static unsigned tcpport=0;
54 static char linebuf[MAXLINE];
58 // how frequently we will log stats (expressed in tuples posted)
59 #define STAT_FREQUENCY 5
62 // Not all systems have timersub defined, so make sure it's there
65 #define timersub(tvp, uvp, vvp) \
67 (vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \
68 (vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \
69 if ((vvp)->tv_usec < 0) { \
71 (vvp)->tv_usec += 1000000; \
79 fprintf(stderr, "exiting via signal handler %d...\n", iv);
83 static void wait_for_client() {
84 struct sockaddr_in serv_addr,cli_addr;
86 if (listensockfd==0) {
88 listensockfd=socket(AF_INET, SOCK_STREAM, 0);
89 if (listensockfd < 0) {
90 gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
93 bzero((char *) &serv_addr, sizeof(serv_addr));
94 serv_addr.sin_family = AF_INET;
95 serv_addr.sin_addr.s_addr = INADDR_ANY;
96 serv_addr.sin_port = htons(tcpport);
98 /* make sure we can reuse the common port rapidly */
99 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
100 (gs_sp_t )&on, sizeof(on)) != 0) {
101 gslog(LOG_EMERG,"Error::could not set socket option");
105 if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
106 (gs_sp_t )&on, sizeof(on)) != 0) {
107 gslog(LOG_EMERG,"Error::could not set socket option");
111 if (bind(listensockfd, (struct sockaddr *) &serv_addr,
112 sizeof(serv_addr)) < 0) {
113 gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
119 listen(listensockfd,5);
120 clilen = sizeof(cli_addr);
121 fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
123 gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
129 static void emit_socket() {
135 if((w=write(fd,&linebuf[o],l))==0) {
143 static void emit_line() {
146 printf("%s",linebuf);
153 int main(int argc, char* argv[]) {
154 gs_sp_t me = argv[0];
156 gs_int32_t schema, ch;
160 gs_uint32_t bufsz=8*1024*1024;
161 gs_int8_t rbuf[2*MAXTUPLESZ];
163 gs_int32_t numberoffields;
164 gs_int32_t verbose=0;
169 gs_int32_t n_actual_param;
170 gs_int32_t n_expected_param;
173 struct timeval tvs, tve, tvd;
177 gs_uint32_t tip1,tip2,tip3,tip4;
178 gs_sp_t instance_name;
180 // RMR-related parameters
181 gs_sp_t rmr_port = NULL;
182 gs_int32_t rmr_mtype = MC_REPORT;
184 gs_sp_t curl_address = NULL;
185 endpoint curl_endpoint;
186 gs_sp_t curl_url = NULL;
187 gs_sp_t curl_auth = NULL;
188 gs_uint32_t http_code;
190 gs_uint32_t ves_version=7;
192 void* mrc; //msg router context
193 struct epoll_event events[1]; // list of events to give to epoll
194 struct epoll_event epe; // event definition for event to listen to
195 gs_int32_t ep_fd = -1; // epoll's file des (given to epoll_wait)
196 gs_int32_t rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
197 gs_int32_t nready; // number of events ready for receive
198 rmr_mbuf_t* rmr_sbuf; // send buffer
199 rmr_mbuf_t* rmr_rbuf; // received buffer
201 gs_uint32_t tlimit = 0; // time limit in seconds
202 time_t start_time, curr_time;
204 gs_uint64_t post_success_cnt = 0ULL;
205 gs_uint64_t post_failure_cnt = 0ULL;
207 gs_uint64_t rmr_post_success_cnt = 0ULL;
208 gs_uint64_t rmr_post_failure_cnt = 0ULL;
213 while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:R:A:V:")) != -1) {
219 tcpport=atoi(optarg);
231 tlimit = atoi(optarg);
234 ves_version = atoi(optarg);
237 curl_address = strdup(optarg);
238 if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
239 gslog(LOG_EMERG,"Curl IP NOT DEFINED");
242 curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
243 curl_endpoint.port=htons(curl_endpoint.port);
246 rmr_port=strdup(optarg);
249 curl_url = strdup(optarg);
252 curl_auth = strdup(optarg);
256 fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] [-R <rmr_port>] <gshub-hostname>:<gshub-port> <gsinstance_name> query param1 param2...\n",
263 if (argc<3) goto usage;
265 if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
266 gslog(LOG_EMERG,"HUB IP NOT DEFINED");
269 gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
270 gshub.port=htons(gshub.port);
271 instance_name=strdup(argv[1]);
272 if (set_hub(gshub)!=0) {
273 gslog(LOG_EMERG,"Could not set hub");
276 if (set_instance_name(instance_name)!=0) {
277 gslog(LOG_EMERG,"Could not set instance name");
281 if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
282 gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
286 // If this uses curl output, ensure consistency in the curl args
287 if(curl_address != NULL){
288 if(curl_url == NULL){
289 gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
297 gettimeofday(&tvs, 0);
304 /* initialize RMR library */
305 if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) {
306 fprintf(stderr, "%s::error:unable to initialise RMR\n", me);
310 rcv_fd = rmr_get_rcvfd( mrc ); // set up epoll things, start by getting the FD from MRr
312 fprintf(stderr, "%s::error:unable to set up polling fd\n", me);
315 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
316 fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno);
319 epe.events = EPOLLIN;
320 epe.data.fd = rcv_fd;
322 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
323 fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno));
327 rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE ); // alloc first send buffer; subsequent buffers allcoated on send
328 rmr_rbuf = NULL; // don't need to alloc receive buffer
330 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr say it has one
333 fprintf( stderr, "RMR is ready\n" );
336 /* initialize host library and the sgroup */
338 if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
340 if (ftaapp_init(bufsz)!=0) {
341 fprintf(stderr,"%s::error:could not initialize gscp\n", me);
345 signal(SIGTERM, hand);
346 signal(SIGINT, hand);
348 schema = ftaapp_get_fta_schema_by_name(argv[0]);
350 fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
354 n_expected_param = ftaschema_parameter_len(schema);
355 if (n_expected_param == 0) {
359 n_actual_param = argc-1;
360 if(n_actual_param < n_expected_param){
361 fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
364 /* parse the params */
365 for (lcv = 1 ; lcv < argc ; lcv++) {
370 while (*e && *e != '=') e++;
372 fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
377 rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
380 fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
385 if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
386 fprintf(stderr, "ftaschema_create_param_block failed!\n");
390 // ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
393 if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
395 fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
396 if (fta_id.streamid==0) {
397 fprintf(stderr,"%s::error:could not initialize fta %s\n",
401 /* XXXCDC: pblk is malloc'd, should we free it? */
403 if (verbose>=2) fprintf(stderr,"Get schema handle\n");
405 if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
406 fprintf(stderr,"%s::error:could not get schema\n", me);
410 if ((numberoffields=ftaschema_tuple_len(schema))<0) {
411 fprintf(stderr,"%s::error:could not get number of fields in schema\n",
417 for(y=0; y<numberoffields;y++) {
418 printf("%s",ftaschema_field_name(schema,y));
419 if (y<numberoffields-1) printf("|");
423 if (xit) { // -X in command line
424 gettimeofday(&tve, 0);
425 timersub(&tve, &tvs, &tvd);
426 printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
427 hand(0); // effectively an exit
433 start_time = time(NULL);
435 int measurement_interval_pos = -1; // extract measurementInterval if present
436 char *field_names[numberoffields];
437 for(y=0; y<numberoffields;y++) {
438 field_names[y] = strdup(ftaschema_field_name(schema,y));
439 if(strcmp(field_names[y], "measurementInterval")==0)
440 measurement_interval_pos = y;
444 struct timeval tsample;
445 gettimeofday(&tsample, 0);
446 char start_ts[100], curr_ts[100];
447 sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
449 long unsigned int lineno=0;
450 long unsigned int seqno=0;
451 double measurement_interval;
452 while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
454 if (dump) // -D in command line
456 if (ftaschema_is_eof_tuple(schema, rbuf)) {
457 /* initiate shutdown or something of that nature */
458 printf("#All data proccessed\n");
464 snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
467 if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
469 gettimeofday(&tsample, 0);
470 sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
473 pos = snprintf(linebuf, MAXLINE,
474 "{\"event\": { \"commonEventHeader\": { "
475 "\"domain\": \"measurementsForVfScaling\", "
476 "\"eventId\": \"%s%u\", "
477 "\"eventType\": \"%s\", "
478 "\"eventName\": \"Measurement_MC_%s\", "
479 "\"lastEpochMicrosec\": %s, "
480 "\"priority\": \"Normal\", "
481 "\"reportingEntityName\": \"GS-LITE MC\", "
483 "\"sourceName\": \"meas_cmpgn_xapp\", "
484 "\"startEpochMicrosec\": %s, "
487 "\"measurementsForVfScalingFields\": { "
488 "\"additionalFields\": ["
489 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
492 pos = snprintf(linebuf, MAXLINE,
493 "{\"event\": { \"commonEventHeader\": { "
494 "\"domain\": \"measurement\", "
495 "\"eventId\": \"%s%u\", "
496 "\"eventType\": \"%s\", "
497 "\"eventName\": \"Measurement_MC_%s\", "
498 "\"lastEpochMicrosec\": %s, "
499 "\"priority\": \"Normal\", "
500 "\"reportingEntityName\": \"GS-LITE MC\", "
502 "\"sourceName\": \"meas_cmpgn_xapp\", "
503 "\"startEpochMicrosec\": %s, "
504 "\"version\": \"4.0.1\", "
505 "\"vesEventListenerVersion\": \"7.0.1\" "
507 "\"measurementFields\": { "
508 "\"additionalFields\": {"
509 ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
513 measurement_interval = 0;
514 for(y=0; y<numberoffields;y++) {
515 struct access_result ar;
517 // printf("%s->",ftaschema_field_name(schema,y));
522 ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
523 switch (ar.field_data_type) {
526 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
528 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
529 if(y==measurement_interval_pos)
530 measurement_interval = (double)ar.r.i;
534 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
536 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
537 if(y==measurement_interval_pos)
538 measurement_interval = (double)ar.r.ui;
542 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
547 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
555 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
557 pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
560 for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
562 snprintf(linebuf,MAXLINE,"");
564 unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]);
566 y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
567 pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
569 pos += snprintf(linebuf+pos,MAXLINE-pos,":");
573 pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
576 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
578 pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
584 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
586 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u\"}",field_names[y], ar.r.ui);
587 if(y==measurement_interval_pos)
588 measurement_interval = (double)ar.r.ui;
593 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
595 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
599 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
601 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
607 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
609 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
610 if(y==measurement_interval_pos)
611 measurement_interval = (double)ar.r.ul;
615 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
617 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
618 if(y==measurement_interval_pos)
619 measurement_interval = (double)ar.r.l;
623 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
625 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
626 if(y==measurement_interval_pos)
627 measurement_interval = (double)ar.r.f;
636 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
638 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
644 pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
646 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
650 src=(char*)ar.r.vs.offset;
651 for(x=0;x<ar.r.vs.length;x++){
653 if ((c<='~') && (c>=' ')) {
666 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
668 pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
677 snprintf(linebuf+pos, MAXLINE-pos,
678 "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
679 "}}}\n", measurement_interval
682 snprintf(linebuf+pos, MAXLINE-pos,
683 "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
684 "}}}\n", measurement_interval
689 rmr_sbuf->mtype = rmr_mtype; // fill in the message bits
690 rmr_sbuf->len = strlen(linebuf) + 1; // our receiver likely wants a nice acsii-z string
691 memcpy(rmr_sbuf->payload, linebuf, rmr_sbuf->len);
693 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // send it (send returns an empty payload on success, or the original payload on fail/retry)
694 while( rmr_sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
695 rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // retry send until it's good (simple test; real programmes should do better)
697 if(rmr_sbuf->state != RMR_OK) {
698 gslog(LOG_WARNING, "rmr_send_msg() failure, strerror(errno) is %s", strerror(errno));
699 rmr_post_failure_cnt++;
701 rmr_post_success_cnt++;
702 if (((rmr_post_success_cnt+rmr_post_failure_cnt) % STAT_FREQUENCY) == 0)
703 gslog(LOG_WARNING, "%s: successful RMR posts - %llu, failed RMR posts - %llu", argv[0], rmr_post_success_cnt, rmr_post_failure_cnt);
706 if(curl_address==NULL){
707 if (!rmr_port) // if neither VES collector nor RMR is specified print to standard output
710 http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
711 if(http_code != 200 && http_code != 202){
713 gslog(LOG_WARNING, "http return code is %d",http_code);
717 if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
718 gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
720 if (verbose!=0) fflush(stdout);
722 if (rfta_id.streamid != fta_id.streamid)
723 fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
726 // whenever we receive a temp tuple check if we reached time limit
727 if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) {
728 fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);