From: vlad shkapenyuk Date: Wed, 15 Apr 2020 23:07:44 +0000 (-0400) Subject: Added support for publishing metrics using RMR and new descriptor file format X-Git-Tag: 1.0.4^0 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=50db50e8785e69eee93a1f79bbe179f26cd6e589;p=ric-app%2Fmc.git Added support for publishing metrics using RMR and new descriptor file format Signed-off-by: vlad shkapenyuk Change-Id: Ifd15f6ad2c1bda7b7067012320dee04ece744516 --- diff --git a/mc-core/Dockerfile b/mc-core/Dockerfile index b46eb17..6c09a51 100644 --- a/mc-core/Dockerfile +++ b/mc-core/Dockerfile @@ -16,15 +16,18 @@ ARG STAGE_DIR=/mc -FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:7-u18.04 as project-build -# Update & installation of linux packages -RUN apt-get update -y && \ - apt-get install -y curl && \ - apt-get install -y procps && \ - apt-get install -y python-pip +FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:7-u18.04 AS project-build ARG STAGE_DIR +ARG RMR_VER=3.7.4 + +RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMR_VER}_amd64.deb/download.deb +RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMR_VER}_amd64.deb/download.deb +RUN dpkg -i rmr_${RMR_VER}_amd64.deb +RUN dpkg -i rmr-dev_${RMR_VER}_amd64.deb +RUN ldconfig + COPY mc ${STAGE_DIR} WORKDIR ${STAGE_DIR} RUN apt-get install -y libboost-all-dev @@ -53,7 +56,6 @@ RUN ./configure RUN make RUN make install RUN ldconfig -RUN pip install protobuf WORKDIR ${STAGE_DIR} RUN git clone -b release/0.1.0 https://gerrit.o-ran-sc.org/r/com/gs-lite WORKDIR ${STAGE_DIR}/gs-lite @@ -70,7 +72,7 @@ RUN python generate_runall.py # now install the binaries and libraries into smaller docker image -FROM nexus3.o-ran-sc.org:10004/o-ran-sc/ric-app-mc-listener:1.4.0 +FROM nexus3.o-ran-sc.org:10004/o-ran-sc/ric-app-mc-listener:1.5.0 ARG STAGE_DIR @@ -78,6 +80,7 @@ COPY --from=project-build ${STAGE_DIR}/gs-lite/demo/queries /mc/gs-lite/demo/que COPY --from=project-build ${STAGE_DIR}/gs-lite/bin /mc/gs-lite/bin COPY --from=project-build ${STAGE_DIR}/data_gen /mc/data_gen COPY --from=project-build ${STAGE_DIR}/extract_params.py /mc/ +COPY --from=project-build ${STAGE_DIR}/extract_rmr_port.py /mc/ COPY --from=project-build /usr/local/lib/libproto* /usr/local/lib/ COPY --from=project-build /usr/local/lib/libsdl* /usr/local/lib/ @@ -87,8 +90,7 @@ COPY --from=project-build ${STAGE_DIR}/mc_deployment.json /opt/ric/config/config COPY container_start.sh /playpen/bin/ RUN apt-get update && \ - apt-get install -y curl python python-pip libboost-all-dev libhiredis-dev \ - redis-tools && \ + apt-get install -y curl python python-pip libboost-all-dev libhiredis-dev && \ apt-get clean RUN ldconfig diff --git a/mc-core/container-tag.yaml b/mc-core/container-tag.yaml index 9a0530a..5975875 100644 --- a/mc-core/container-tag.yaml +++ b/mc-core/container-tag.yaml @@ -1,4 +1,4 @@ --- -tag: '1.0.3' +tag: '1.0.4' # this is used by the CI jobs to tag the image it builds diff --git a/mc-core/container_start.sh b/mc-core/container_start.sh index 1d4d271..a2195ea 100755 --- a/mc-core/container_start.sh +++ b/mc-core/container_start.sh @@ -42,6 +42,7 @@ set -e SIMULATOR_MODE=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json simulator_mode` +RMR_PORT=`python /mc/extract_rmr_port.py ${XAPP_DESCRIPTOR_PATH}/config-file.json rmr_data_in` if [ "$SIMULATOR_MODE" != "true" ] then @@ -49,7 +50,12 @@ then ( cd /playpen - bin/mc_listener + if [ "$RMR_PORT" != "" ] + then + bin/mc_listener -p $RMR_PORT + else + bin/mc_listener + fi ) echo "listener was started" >&2 diff --git a/mc-core/mc/extract_params.py b/mc-core/mc/extract_params.py index 3368651..21d7207 100644 --- a/mc-core/mc/extract_params.py +++ b/mc-core/mc/extract_params.py @@ -30,7 +30,7 @@ ret = '' with open(xapp_descriptor_file) as f: data = json.load(f) - if 'parameters' in data.keys() and param_name in data['parameters'].keys(): - ret = data['parameters'][param_name] + if 'controls' in data.keys() and param_name in data['controls'].keys(): + ret = data['controls'][param_name] print ret diff --git a/mc-core/mc/extract_rmr_port.py b/mc-core/mc/extract_rmr_port.py new file mode 100644 index 0000000..5fbc77b --- /dev/null +++ b/mc-core/mc/extract_rmr_port.py @@ -0,0 +1,39 @@ +# ------------------------------------------------------------------------------- +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------------------------- + +import json +import sys + +usage = "extract_rmr_port.py xapp_descriptor_file port_name" + +if len(sys.argv) < 3 : + sys.exit(-1) + +xapp_descriptor_file = sys.argv[1] +port_name = sys.argv[2] + +ret = '' + +with open(xapp_descriptor_file) as f: + data = json.load(f) + + if 'messaging' in data.keys() and 'ports' in data['messaging'].keys(): + port_list = data['messaging']['ports'] + for port in port_list : + if 'name' in port.keys() and 'port' in port.keys() and port['name'] == port_name : + ret = port['port'] + +print ret diff --git a/mc-core/mc/mc_deployment.json b/mc-core/mc/mc_deployment.json index 635fdd5..2a3f2ec 100644 --- a/mc-core/mc/mc_deployment.json +++ b/mc-core/mc/mc_deployment.json @@ -1,38 +1,101 @@ { - "local": { - "host": ":8080" - }, - "logger": { - "level": 3 + "xapp_name": "mcxapp", + "version": "1.0.3", + "containers": [ + { + "name": "mcxapp", + "image": { + "registry": "nexus3.o-ran-sc.org:10002", + "name": "o-ran-sc/ric-app-mc", + "tag": "1.0.4" + }, + "command": "/playpen/bin/container_start.sh" + } + ], + "livenessProbe": { + "exec": { + "command": ["/usr/local/bin/health_ck"] + }, + "initialDelaySeconds": 5, + "periodSeconds": 15 }, - "rmr": { - "protPort": "tcp:4560", - "maxSize": 2072, - "numWorkers": 1, - "txMessages": [ - "RIC_SUB_REQ", - "RIC_SUB_DEL_REQ" - ], - "rxMessages": [ - "RIC_SUB_RESP", - "RIC_SUB_FAILURE", - "RIC_SUB_DEL_RESP", - "RIC_INDICATION" + "messaging": { + "ports": [ + { + "name": "rmr_data_in", + "container": "mcxapp", + "port": 4560, + "rxMessages": + [ + "RIC_UE_CONTEXT_RELEASE", + "RIC_SGNB_ADDITION_REQ", + "RIC_SGNB_ADDITION_ACK", + "RIC_SGNB_ADDITION_REJECT", + "RIC_SGNB_RECONF_COMPLETE", + "RIC_RRC_TRANSFER", + "RIC_SGNB_MOD_REQUEST", + "RIC_SGNB_MOD_REQUEST_ACK", + "RIC_SGNB_MOD_REQUEST_REJ", + "RIC_SGNB_MOD_REQUIRED", + "RIC_SGNB_MOD_CONFIRM", + "RIC_SGNB_MOD_REFUSE", + "RIC_SGNB_RELEASE_REQUEST", + "RIC_SGNB_RELEASE_REQUEST_ACK", + "RIC_SGNB_RELEASE_REQUIRED", + "RIC_SGNB_RELEASE_CONFIRM", + "RIC_SECONDARY_RAT_DATA_USAGE_REPORT" + ], + "description": "rmr receive data port for mcxapp" + }, + { + "name": "rmr_data_out", + "container": "mcxapp", + "port": 4562, + "txMessages": + [ + "MC_REPORT" + ], + "description": "rmr send data port for mcxapp" + }, + { + "name": "rmr_route", + "container": "mcxapp", + "port": 4561, + "description": "rmr route port for mcxapp" + } ] }, "controls": { - "active": true, - "interfaceId": { - "globalENBId": { - "plmnId": 123456, - "eNBId": 5678 - } - } - }, - "parameters": { - "ves_collector_address":"xapp-sandbox2.research.att.com:8888", + "ves_collector_address": "xapp-sandbox2.research.att.com:8888", "measurement_interval": 10000, "simulator_mode": "true", "debug_mode": "true" - } -} + }, + "rmr":{ + "protPort": "tcp:4560", + "maxSize": 2072, + "numWorkers": 1, + "txMessages": [ + "MC_REPORT" + ], + "rxMessages": [ + "RIC_UE_CONTEXT_RELEASE", + "RIC_SGNB_ADDITION_REQ", + "RIC_SGNB_ADDITION_ACK", + "RIC_SGNB_ADDITION_REJECT", + "RIC_SGNB_RECONF_COMPLETE", + "RIC_RRC_TRANSFER", + "RIC_SGNB_MOD_REQUEST", + "RIC_SGNB_MOD_REQUEST_ACK", + "RIC_SGNB_MOD_REQUEST_REJ", + "RIC_SGNB_MOD_REQUIRED", + "RIC_SGNB_MOD_CONFIRM", + "RIC_SGNB_MOD_REFUSE", + "RIC_SGNB_RELEASE_REQUEST", + "RIC_SGNB_RELEASE_REQUEST_ACK", + "RIC_SGNB_RELEASE_REQUIRED", + "RIC_SGNB_RELEASE_CONFIRM", + "RIC_SECONDARY_RAT_DATA_USAGE_REPORT" + ] + } +} \ No newline at end of file diff --git a/mc-core/mc/mcnib/Makefile b/mc-core/mc/mcnib/Makefile index 2e36b36..5350289 100644 --- a/mc-core/mc/mcnib/Makefile +++ b/mc-core/mc/mcnib/Makefile @@ -60,7 +60,7 @@ gsprintconsole: gsprintconsole.o ../../lib/libgscphostaux.a ../../lib/libgscphos g++ -g -o gsprintconsole gsprintconsole.o -L../../lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux gsprintconsole_ves: gsprintconsole_ves.o ../../lib/libgscphostaux.a ../../lib/libgscphost.a ../../lib/libgscpinterface.a ../../lib/libgscpapp.a - g++ -g -o gsprintconsole_ves gsprintconsole_ves.o -L../../lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux + g++ -g -o gsprintconsole_ves gsprintconsole_ves.o -L../../lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux -lrmr_si -lm -lpthread gsmcnib: gsmcnib.o ../../lib/libgscphostaux.a ../../lib/libgscphost.a ../../lib/libgscpinterface.a ../../lib/libgscpapp.a g++ -g -o gsmcnib gsmcnib.o -L../../lib -lgscpapp -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux -lsdl diff --git a/mc-core/mc/mcnib/gsprintconsole_ves.c b/mc-core/mc/mcnib/gsprintconsole_ves.c new file mode 100644 index 0000000..75dccb0 --- /dev/null +++ b/mc-core/mc/mcnib/gsprintconsole_ves.c @@ -0,0 +1,727 @@ +/* ------------------------------------------------ + Copyright 2014 AT&T Intellectual Property + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ------------------------------------------- */ + + +/* + * Print ves formatted records to the console. + * Each line is a json record. + * Based on gsprintconsole.c, just differences in formatting. +*/ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + + +#include "gsconfig.h" +#include "gstypes.h" +#include "gshub.h" +#include "simple_http.h" + +#include + +#define MAXLINE 100000 +static unsigned tcpport=0; +static char linebuf[MAXLINE]; +int listensockfd=0; +int fd=0; + +// how frequently we will log stats (expressed in tuples posted) +#define STAT_FREQUENCY 5 + + +// Not all systems have timersub defined so make sure its ther +#ifndef timersub + +#define timersub(tvp, uvp, vvp) \ +do { \ +(vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec; \ +(vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec; \ +if ((vvp)->tv_usec < 0) { \ +(vvp)->tv_sec--; \ +(vvp)->tv_usec += 1000000; \ +} \ +} while (0) + +#endif + +void hand(int iv) { + ftaapp_exit(); + fprintf(stderr, "exiting via signal handler %d...\n", iv); + exit(1); +} + +static void wait_for_client() { + struct sockaddr_in serv_addr,cli_addr; + socklen_t clilen; + if (listensockfd==0) { + gs_int32_t on = 1; + listensockfd=socket(AF_INET, SOCK_STREAM, 0); + if (listensockfd < 0) { + gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream"); + exit(1); + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(tcpport); +#ifndef __linux__ + /* make sure we can reuse the common port rapidly */ + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT, + (gs_sp_t )&on, sizeof(on)) != 0) { + gslog(LOG_EMERG,"Error::could not set socket option"); + exit(1); + } +#endif + if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR, + (gs_sp_t )&on, sizeof(on)) != 0) { + gslog(LOG_EMERG,"Error::could not set socket option"); + exit(1); + } + + if (bind(listensockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) { + gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream"); + exit(1); + } + } + + do { + listen(listensockfd,5); + clilen = sizeof(cli_addr); + fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen); + if (fd<0) { + gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket"); + } + } while (fd==0); +} + + +static void emit_socket() { + unsigned o,w,l; + o=0; + w=0; + l=strlen(linebuf); + do { + if((w=write(fd,&linebuf[o],l))==0) { + close(fd); + wait_for_client(); + } + o=o+w; + } while (o] [-p ] [-l ] [-v] [-X] [-D] [-C :] [-U ] [-A ] [-V ] [-R ] : query param1 param2...\n", + *argv); + exit(1); + } + } + argc -= optind; + argv += optind; + if (argc<3) goto usage; + + if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) { + gslog(LOG_EMERG,"HUB IP NOT DEFINED"); + exit(1); + } + gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4); + gshub.port=htons(gshub.port); + instance_name=strdup(argv[1]); + if (set_hub(gshub)!=0) { + gslog(LOG_EMERG,"Could not set hub"); + exit(1); + } + if (set_instance_name(instance_name)!=0) { + gslog(LOG_EMERG,"Could not set instance name"); + exit(1); + } + + if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) { + gslog(LOG_EMERG,"Did not receive signal that GS is initiated"); + } + + +// If this uses curl output, ensure consistency in the curl args + if(curl_address != NULL){ + if(curl_url == NULL){ + gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar"); + exit(1); + } + if(curl_auth==NULL){ + curl_auth = ""; + } + } + + gettimeofday(&tvs, 0); + argc -=2; + argv +=2; + if (argc < 1) + goto usage; + + if (rmr_port) { + /* initialize RMR library */ + if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) { + fprintf(stderr, "%s::error:unable to initialise RMR\n", me); + exit( 1 ); + } + + rcv_fd = rmr_get_rcvfd( mrc ); // set up epoll things, start by getting the FD from MRr + if( rcv_fd < 0 ) { + fprintf(stderr, "%s::error:unable to set up polling fd\n", me); + exit( 1 ); + } + if( (ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno); + exit( 1 ); + } + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno)); + exit( 1 ); + } + + rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE ); // alloc first send buffer; subsequent buffers allcoated on send + rmr_rbuf = NULL; // don't need to alloc receive buffer + + while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr say it has one + sleep( 1 ); + } + fprintf( stderr, "RMR is ready\n" ); + } + + /* initialize host library and the sgroup */ + + if (verbose>=2) fprintf(stderr,"Initializing gscp\n"); + + if (ftaapp_init(bufsz)!=0) { + fprintf(stderr,"%s::error:could not initialize gscp\n", me); + exit(1); + } + + signal(SIGTERM, hand); + signal(SIGINT, hand); + + schema = ftaapp_get_fta_schema_by_name(argv[0]); + if (schema < 0) { + fprintf(stderr,"%s::error:could not get fta '%s' schema\n", + me ,argv[0]); + exit(1); + } + n_expected_param = ftaschema_parameter_len(schema); + if (n_expected_param == 0) { + pblk = 0; + pblklen = 0; + } else { + n_actual_param = argc-1; + if(n_actual_param < n_expected_param){ + fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param); + exit(1); + } + /* parse the params */ + for (lcv = 1 ; lcv < argc ; lcv++) { + char *k, *e; + int rv; + k = argv[lcv]; + e = k; + while (*e && *e != '=') e++; + if (*e == 0) { + fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n", + argv[lcv]); + exit(1); + } + *e = 0; + rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1)); + *e = '='; + if (rv < 0) { + fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n", + argv[lcv]); + exit(1); + } + } + if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) { + fprintf(stderr, "ftaschema_create_param_block failed!\n"); + exit(1); + } + } +// ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used + + + if (verbose>=2) fprintf(stderr,"Initalized FTA\n"); + + fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk); + if (fta_id.streamid==0) { + fprintf(stderr,"%s::error:could not initialize fta %s\n", + me, argv[0]); + exit(1); + } + /* XXXCDC: pblk is malloc'd, should we free it? */ + + if (verbose>=2) fprintf(stderr,"Get schema handle\n"); + + if ((schema=ftaapp_get_fta_schema(fta_id))<0) { + fprintf(stderr,"%s::error:could not get schema\n", me); + exit(1); + } + + if ((numberoffields=ftaschema_tuple_len(schema))<0) { + fprintf(stderr,"%s::error:could not get number of fields in schema\n", + me); + exit(1); + } + + if (verbose>=1) { + for(y=0; y=0) { + lineno++; + if (dump) // -D in command line + continue; + if (ftaschema_is_eof_tuple(schema, rbuf)) { + /* initiate shutdown or something of that nature */ + printf("#All data proccessed\n"); + exit(0); + } + if (!rsize) + continue; + if (verbose >=2) { + snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code); + emit_line(); + } + if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) { + seqno++; + gettimeofday(&tsample, 0); + sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec); + int pos; + if(ves_version < 7){ + pos = snprintf(linebuf, MAXLINE, + "{\"event\": { \"commonEventHeader\": { " + "\"domain\": \"measurementsForVfScaling\", " + "\"eventId\": \"%s%u\", " + "\"eventType\": \"%s\", " + "\"eventName\": \"Measurement_MC_%s\", " + "\"lastEpochMicrosec\": %s, " + "\"priority\": \"Normal\", " + "\"reportingEntityName\": \"GS-LITE MC\", " + "\"sequence\": %u, " + "\"sourceName\": \"meas_cmpgn_xapp\", " + "\"startEpochMicrosec\": %s, " + "\"version\": 5 " + "}, " + "\"measurementsForVfScalingFields\": { " + "\"additionalFields\": [" + ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts + ); + }else{ + pos = snprintf(linebuf, MAXLINE, + "{\"event\": { \"commonEventHeader\": { " + "\"domain\": \"measurement\", " + "\"eventId\": \"%s%u\", " + "\"eventType\": \"%s\", " + "\"eventName\": \"Measurement_MC_%s\", " + "\"lastEpochMicrosec\": %s, " + "\"priority\": \"Normal\", " + "\"reportingEntityName\": \"GS-LITE MC\", " + "\"sequence\": %u, " + "\"sourceName\": \"meas_cmpgn_xapp\", " + "\"startEpochMicrosec\": %s, " + "\"version\": \"4.0.1\", " + "\"vesEventListenerVersion\": \"7.0.1\" " + "}, " + "\"measurementFields\": { " + "\"additionalFields\": {" + ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts + ); + } + + measurement_interval = 0; + for(y=0; y=2) +// printf("%s->",ftaschema_field_name(schema,y)); + if(y>0){ + linebuf[pos]=','; + pos++; + } + ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize); + switch (ar.field_data_type) { + case INT_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i); + else + pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.i; + break; + case UINT_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui); + else + pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.ui; + break; + case IP_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff, + ar.r.ui>>16&0xff, + ar.r.ui>>8&0xff, + ar.r.ui&0xff); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff, + ar.r.ui>>16&0xff, + ar.r.ui>>8&0xff, + ar.r.ui&0xff); + break; + case IPV6_TYPE: + { + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]); + else + pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]); + unsigned x; + unsigned zc=0; + for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;} + if (zc!=4) { + snprintf(linebuf,MAXLINE,""); + for(x=0;x<8;x++) { + unsigned char * a = (unsigned char *) &(ar.r.ip6.v[0]); + unsigned y; + y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]); + pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y); + if (x<7){ + pos += snprintf(linebuf+pos,MAXLINE-pos,":"); + } + } + } else { + pos+=snprintf(linebuf+pos,MAXLINE-pos,"::"); + } + if(ves_version < 7) + pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}"); + else + pos += snprintf(linebuf+pos, MAXLINE-pos,"\""); + } + break; + + case USHORT_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u\"}",field_names[y], ar.r.ui); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.ui; + break; + case BOOL_TYPE: + if(ves_version < 7){ + if (ar.r.ui==0) { + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]); + } else { + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]); + } + }else{ + if (ar.r.ui==0) { + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]); + } else { + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]); + } + } + break; + case ULLONG_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.ul; + break; + case LLONG_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.l; + break; + case FLOAT_TYPE: + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f); + if(y==measurement_interval_pos) + measurement_interval = (double)ar.r.f; + break; + case TIMEVAL_TYPE: + { + gs_float_t t; + t= ar.r.t.tv_usec; + t=t/1000000; + t=t+ar.r.t.tv_sec; + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t); + } + break; + case VSTR_TYPE: + { + if(ves_version < 7) + pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]); + else + pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]); + int x; + int c; + char * src; + src=(char*)ar.r.vs.offset; + for(x=0;x=' ')) { + if (posmtype = rmr_mtype; // fill in the message bits + rmr_sbuf->len = strlen(linebuf) + 1; // our receiver likely wants a nice acsii-z string + rmr_sbuf->state = 0; + rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // send it (send returns an empty payload on success, or the original payload on fail/retry) + while( rmr_sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry + rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf); // retry send until it's good (simple test; real programmes should do better) + } + rmr_post_success_cnt++; + + } + + if(curl_address==NULL){ + emit_line(); + }else{ + http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth); + if(http_code != 200 && http_code != 202){ + post_failure_cnt++; + gslog(LOG_WARNING, "http return code is %d",http_code); + } else { + post_success_cnt++; + } + if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0) + gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt); + } + if (verbose!=0) fflush(stdout); + } else { + if (rfta_id.streamid != fta_id.streamid) + fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid); + } + + // whenever we receive a temp tuple check if we reached time limit + if ((code==2) && tlimit && (time(NULL)-start_time)>=tlimit) { + fprintf(stderr,"Reached time limit of %d seconds\n",tlimit); + ftaapp_exit(); + exit(0); + } + } +} + diff --git a/mc-core/mc/queries/generate_runall.py b/mc-core/mc/queries/generate_runall.py index c443dcc..04d5f36 100644 --- a/mc-core/mc/queries/generate_runall.py +++ b/mc-core/mc/queries/generate_runall.py @@ -69,6 +69,8 @@ DEBUG_MODE=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.jso WINDOW=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json measurement_interval` +RMR_PORT=`python /mc/extract_rmr_port.py ${XAPP_DESCRIPTOR_PATH}/config-file.json rmr_data_out` + # export DBAAS_SERVICE_HOST=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json __DBAAS_SERVICE_HOST__` # export DBAAS_SERVICE_PORT=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json __DBAAS_SERVICE_PORT__` @@ -119,12 +121,17 @@ fi runall += """ +if [ "$RMR_PORT" != "" ] +then + RMR_OPTION="-R $RMR_PORT" +fi + # invoke gsprintconsole_ves gsmcnib for all non-debug queries """ for q in oqy: if "debug" not in q: - runall += " /mc/gs-lite/bin/gsprintconsole_ves -C $VES_IP:$VES_PORT -U /vescollector/eventListener/v7 -V 7 `cat gshub.log` default "+q+" window=$WINDOW &\n" + runall += " /mc/gs-lite/bin/gsprintconsole_ves -C $VES_IP:$VES_PORT -U /vescollector/eventListener/v7 -V 7 $RMR_OPTION `cat gshub.log` default "+q+" window=$WINDOW &\n" keys = nib[q]["keys"] if len(keys)>0: keys_str = ",".join(keys)