Added support for publishing metrics using RMR and new descriptor file format 88/3288/1 1.0.4
authorvlad shkapenyuk <vshkap@research.att.com>
Wed, 15 Apr 2020 23:07:44 +0000 (19:07 -0400)
committervlad shkapenyuk <vshkap@research.att.com>
Wed, 15 Apr 2020 23:07:44 +0000 (19:07 -0400)
Signed-off-by: vlad shkapenyuk <vshkap@research.att.com>
Change-Id: Ifd15f6ad2c1bda7b7067012320dee04ece744516

mc-core/Dockerfile
mc-core/container-tag.yaml
mc-core/container_start.sh
mc-core/mc/extract_params.py
mc-core/mc/extract_rmr_port.py [new file with mode: 0644]
mc-core/mc/mc_deployment.json
mc-core/mc/mcnib/Makefile
mc-core/mc/mcnib/gsprintconsole_ves.c [new file with mode: 0644]
mc-core/mc/queries/generate_runall.py

index b46eb17..6c09a51 100644 (file)
 
 ARG STAGE_DIR=/mc
 
-FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:7-u18.04 as project-build
-# Update & installation of linux packages
-RUN apt-get update -y && \
-        apt-get install -y curl && \
-        apt-get install -y procps && \
-        apt-get install -y python-pip
+FROM nexus3.o-ran-sc.org:10004/bldr-ubuntu18-c-go:7-u18.04 AS project-build
 
 ARG STAGE_DIR
 
+ARG RMR_VER=3.7.4
+
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMR_VER}_amd64.deb/download.deb
+RUN wget -nv --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMR_VER}_amd64.deb/download.deb
+RUN dpkg -i rmr_${RMR_VER}_amd64.deb
+RUN dpkg -i rmr-dev_${RMR_VER}_amd64.deb
+RUN ldconfig
+
 COPY mc ${STAGE_DIR}
 WORKDIR ${STAGE_DIR}
 RUN apt-get install -y libboost-all-dev
@@ -53,7 +56,6 @@ RUN ./configure
 RUN make
 RUN make install
 RUN ldconfig
-RUN pip install protobuf
 WORKDIR ${STAGE_DIR}
 RUN git clone -b release/0.1.0 https://gerrit.o-ran-sc.org/r/com/gs-lite
 WORKDIR ${STAGE_DIR}/gs-lite
@@ -70,7 +72,7 @@ RUN python generate_runall.py
 
 
 # now install the binaries and libraries into smaller docker image
-FROM nexus3.o-ran-sc.org:10004/o-ran-sc/ric-app-mc-listener:1.4.0
+FROM nexus3.o-ran-sc.org:10004/o-ran-sc/ric-app-mc-listener:1.5.0
 
 ARG STAGE_DIR
 
@@ -78,6 +80,7 @@ COPY --from=project-build ${STAGE_DIR}/gs-lite/demo/queries /mc/gs-lite/demo/que
 COPY --from=project-build ${STAGE_DIR}/gs-lite/bin /mc/gs-lite/bin
 COPY --from=project-build ${STAGE_DIR}/data_gen /mc/data_gen
 COPY --from=project-build ${STAGE_DIR}/extract_params.py /mc/
+COPY --from=project-build ${STAGE_DIR}/extract_rmr_port.py /mc/
 COPY --from=project-build /usr/local/lib/libproto* /usr/local/lib/
 COPY --from=project-build /usr/local/lib/libsdl* /usr/local/lib/
 
@@ -87,8 +90,7 @@ COPY --from=project-build ${STAGE_DIR}/mc_deployment.json /opt/ric/config/config
 COPY container_start.sh /playpen/bin/
 
 RUN apt-get update && \
-    apt-get install -y curl python python-pip libboost-all-dev libhiredis-dev \
- redis-tools && \
+    apt-get install -y curl python python-pip libboost-all-dev libhiredis-dev && \
     apt-get clean
 
 RUN ldconfig
index 9a0530a..5975875 100644 (file)
@@ -1,4 +1,4 @@
 ---
-tag: '1.0.3'
+tag: '1.0.4'
 
 # this is used by the CI jobs to tag the image it builds
index 1d4d271..a2195ea 100755 (executable)
@@ -42,6 +42,7 @@
 set -e
 
 SIMULATOR_MODE=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json simulator_mode`
+RMR_PORT=`python /mc/extract_rmr_port.py ${XAPP_DESCRIPTOR_PATH}/config-file.json rmr_data_in`
 
 if [ "$SIMULATOR_MODE" != "true" ]
 then
@@ -49,7 +50,12 @@ then
 
 (
        cd /playpen
-       bin/mc_listener
+       if [ "$RMR_PORT" != "" ]
+       then
+               bin/mc_listener -p $RMR_PORT
+       else
+               bin/mc_listener
+       fi
 )
 
 echo "listener was started" >&2
index 3368651..21d7207 100644 (file)
@@ -30,7 +30,7 @@ ret = ''
 with open(xapp_descriptor_file) as f:
     data = json.load(f)
     
-    if 'parameters' in data.keys()  and param_name in data['parameters'].keys():
-        ret = data['parameters'][param_name]
+    if 'controls' in data.keys()  and param_name in data['controls'].keys():
+        ret = data['controls'][param_name]
 
 print ret
diff --git a/mc-core/mc/extract_rmr_port.py b/mc-core/mc/extract_rmr_port.py
new file mode 100644 (file)
index 0000000..5fbc77b
--- /dev/null
@@ -0,0 +1,39 @@
+# -------------------------------------------------------------------------------
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# -------------------------------------------------------------------------------
+
+import json
+import sys
+
+usage = "extract_rmr_port.py xapp_descriptor_file port_name"
+
+if len(sys.argv) < 3 :
+    sys.exit(-1)
+
+xapp_descriptor_file = sys.argv[1]
+port_name = sys.argv[2]
+
+ret = ''
+
+with open(xapp_descriptor_file) as f:
+    data = json.load(f)
+    
+    if 'messaging' in data.keys() and 'ports' in data['messaging'].keys():
+        port_list = data['messaging']['ports']
+        for port in port_list :
+            if 'name' in port.keys() and 'port' in port.keys() and port['name'] == port_name :
+                ret = port['port']
+
+print ret
index 635fdd5..2a3f2ec 100644 (file)
 {
-       "local": {
-               "host": ":8080"
-       },
-       "logger": {
-               "level": 3
+       "xapp_name": "mcxapp",
+       "version": "1.0.3",
+       "containers": [
+               {
+                       "name": "mcxapp",
+                       "image": {
+                               "registry": "nexus3.o-ran-sc.org:10002",
+                               "name": "o-ran-sc/ric-app-mc",
+                               "tag": "1.0.4"
+                       },
+                       "command": "/playpen/bin/container_start.sh"
+               }
+       ],
+       "livenessProbe": {
+           "exec": {
+               "command": ["/usr/local/bin/health_ck"]
+           },
+           "initialDelaySeconds": 5,
+           "periodSeconds": 15
        },
-       "rmr": {
-               "protPort": "tcp:4560",
-               "maxSize": 2072,
-               "numWorkers": 1,
-               "txMessages": [
-                       "RIC_SUB_REQ",
-                       "RIC_SUB_DEL_REQ"
-               ],
-               "rxMessages": [
-                       "RIC_SUB_RESP",
-                       "RIC_SUB_FAILURE",
-                       "RIC_SUB_DEL_RESP",
-                       "RIC_INDICATION"
+       "messaging": {
+               "ports": [
+                       {
+                               "name": "rmr_data_in",
+                               "container": "mcxapp",
+                               "port": 4560,
+                               "rxMessages":
+                               [
+                                       "RIC_UE_CONTEXT_RELEASE",
+                                       "RIC_SGNB_ADDITION_REQ",
+                                       "RIC_SGNB_ADDITION_ACK",
+                                       "RIC_SGNB_ADDITION_REJECT",
+                                       "RIC_SGNB_RECONF_COMPLETE",
+                                       "RIC_RRC_TRANSFER",
+                                       "RIC_SGNB_MOD_REQUEST",
+                                       "RIC_SGNB_MOD_REQUEST_ACK",
+                                       "RIC_SGNB_MOD_REQUEST_REJ",
+                                       "RIC_SGNB_MOD_REQUIRED",
+                                       "RIC_SGNB_MOD_CONFIRM",
+                                       "RIC_SGNB_MOD_REFUSE",
+                                       "RIC_SGNB_RELEASE_REQUEST",
+                                       "RIC_SGNB_RELEASE_REQUEST_ACK",
+                                       "RIC_SGNB_RELEASE_REQUIRED",
+                                       "RIC_SGNB_RELEASE_CONFIRM",
+                                       "RIC_SECONDARY_RAT_DATA_USAGE_REPORT"                                   
+                               ],
+                               "description": "rmr receive data port for mcxapp"
+                       },
+                       {
+                               "name": "rmr_data_out",
+                               "container": "mcxapp",
+                               "port": 4562,
+                               "txMessages":
+                               [
+                                       "MC_REPORT"
+                               ],
+                               "description": "rmr send data port for mcxapp"
+                       },                      
+                       {
+                               "name": "rmr_route",
+                               "container": "mcxapp",
+                               "port": 4561,
+                               "description": "rmr route port for mcxapp"
+                       }
                ]
        },
        "controls": {
-               "active": true,
-               "interfaceId": {
-                       "globalENBId": {
-                               "plmnId": 123456,
-                               "eNBId": 5678
-                       }
-               }
-       },
-       "parameters": {
-               "ves_collector_address":"xapp-sandbox2.research.att.com:8888",
+               "ves_collector_address": "xapp-sandbox2.research.att.com:8888",
                "measurement_interval": 10000,
                "simulator_mode": "true",
                "debug_mode": "true"
-       }
-}
+       },
+       "rmr":{
+       "protPort": "tcp:4560",
+       "maxSize": 2072,
+       "numWorkers": 1,
+       "txMessages": [
+                       "MC_REPORT"
+          ],                      
+       "rxMessages": [
+                       "RIC_UE_CONTEXT_RELEASE",
+                       "RIC_SGNB_ADDITION_REQ",
+                       "RIC_SGNB_ADDITION_ACK",
+                       "RIC_SGNB_ADDITION_REJECT",
+                       "RIC_SGNB_RECONF_COMPLETE",
+                       "RIC_RRC_TRANSFER",
+                       "RIC_SGNB_MOD_REQUEST",
+                       "RIC_SGNB_MOD_REQUEST_ACK",
+                       "RIC_SGNB_MOD_REQUEST_REJ",
+                       "RIC_SGNB_MOD_REQUIRED",
+                       "RIC_SGNB_MOD_CONFIRM",
+                       "RIC_SGNB_MOD_REFUSE",
+                       "RIC_SGNB_RELEASE_REQUEST",
+                       "RIC_SGNB_RELEASE_REQUEST_ACK",
+                       "RIC_SGNB_RELEASE_REQUIRED",
+                       "RIC_SGNB_RELEASE_CONFIRM",
+                       "RIC_SECONDARY_RAT_DATA_USAGE_REPORT"
+       ]
+   }
+}
\ No newline at end of file
index 2e36b36..5350289 100644 (file)
@@ -60,7 +60,7 @@ gsprintconsole: gsprintconsole.o ../../lib/libgscphostaux.a ../../lib/libgscphos
        g++ -g -o gsprintconsole gsprintconsole.o -L../../lib -lgscpapp  -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux  
 
 gsprintconsole_ves: gsprintconsole_ves.o ../../lib/libgscphostaux.a ../../lib/libgscphost.a ../../lib/libgscpinterface.a ../../lib/libgscpapp.a
-       g++ -g -o gsprintconsole_ves gsprintconsole_ves.o -L../../lib -lgscpapp  -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux  
+       g++ -g -o gsprintconsole_ves gsprintconsole_ves.o -L../../lib -lgscpapp  -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux -lrmr_si -lm -lpthread
 
 gsmcnib: gsmcnib.o ../../lib/libgscphostaux.a ../../lib/libgscphost.a ../../lib/libgscpinterface.a ../../lib/libgscpapp.a
        g++ -g -o gsmcnib gsmcnib.o -L../../lib -lgscpapp  -lgscphostaux -lgscphost -lgscpinterface -lgscplftaaux -lclearinghouse -lgscpaux -lsdl
diff --git a/mc-core/mc/mcnib/gsprintconsole_ves.c b/mc-core/mc/mcnib/gsprintconsole_ves.c
new file mode 100644 (file)
index 0000000..75dccb0
--- /dev/null
@@ -0,0 +1,727 @@
+/* ------------------------------------------------
+ Copyright 2014 AT&T Intellectual Property
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ------------------------------------------- */
+
+
+/*
+ * Print ves formatted records to the console.
+ * Each line is a json record.
+ * Based on gsprintconsole.c, just differences in formatting.
+*/
+
+
+#include <app.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <signal.h>
+#include <time.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#include <errno.h>
+#include <string.h>
+#include <sys/epoll.h>
+
+#include <rmr/rmr.h>
+#include <rmr/RIC_message_types.h>
+
+
+#include "gsconfig.h"
+#include "gstypes.h"
+#include "gshub.h"
+#include "simple_http.h"
+
+#include <schemaparser.h>
+
+#define MAXLINE 100000
+static unsigned tcpport=0;
+static char linebuf[MAXLINE];
+int listensockfd=0;
+int fd=0;
+
+// how frequently we will log stats (expressed in tuples posted)
+#define STAT_FREQUENCY 5
+
+
+// Not all systems have timersub defined so make sure its ther
+#ifndef timersub
+
+#define timersub(tvp, uvp, vvp)                                         \
+do {                                                            \
+(vvp)->tv_sec = (tvp)->tv_sec - (uvp)->tv_sec;          \
+(vvp)->tv_usec = (tvp)->tv_usec - (uvp)->tv_usec;       \
+if ((vvp)->tv_usec < 0) {                               \
+(vvp)->tv_sec--;                                \
+(vvp)->tv_usec += 1000000;                      \
+}                                                       \
+} while (0)
+
+#endif
+
+void hand(int iv) {
+    ftaapp_exit();
+    fprintf(stderr, "exiting via signal handler %d...\n", iv);
+    exit(1);
+}
+
+static void wait_for_client() {
+    struct sockaddr_in serv_addr,cli_addr;
+    socklen_t clilen;
+    if (listensockfd==0) {
+               gs_int32_t on = 1;
+               listensockfd=socket(AF_INET, SOCK_STREAM, 0);
+        if (listensockfd < 0) {
+                       gslog(LOG_EMERG,"Error:Could not create socket for tcp data stream");
+                       exit(1);
+               }
+               bzero((char *) &serv_addr, sizeof(serv_addr));
+               serv_addr.sin_family = AF_INET;
+               serv_addr.sin_addr.s_addr = INADDR_ANY;
+               serv_addr.sin_port = htons(tcpport);
+#ifndef __linux__
+        /* make sure we can reuse the common port rapidly */
+        if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEPORT,
+                       (gs_sp_t )&on, sizeof(on)) != 0) {
+            gslog(LOG_EMERG,"Error::could not set socket option");
+            exit(1);
+        }
+#endif
+        if (setsockopt(listensockfd, SOL_SOCKET, SO_REUSEADDR,
+                       (gs_sp_t )&on, sizeof(on)) != 0) {
+            gslog(LOG_EMERG,"Error::could not set socket option");
+            exit(1);
+               }
+        
+               if (bind(listensockfd, (struct sockaddr *) &serv_addr,
+                 sizeof(serv_addr)) < 0) {
+                       gslog(LOG_EMERG,"Error:Could not bind socket for tcp data stream");
+            exit(1);
+        }
+       }
+    
+       do {
+               listen(listensockfd,5);
+               clilen = sizeof(cli_addr);
+               fd=accept(listensockfd, (struct sockaddr *) &cli_addr, &clilen);
+               if (fd<0) {
+            gslog(LOG_EMERG,"Error:Could not accept connection on tcp socket");
+               }
+       } while (fd==0);
+}
+
+
+static void emit_socket() {
+       unsigned o,w,l;
+       o=0;
+       w=0;
+       l=strlen(linebuf);
+       do {
+               if((w=write(fd,&linebuf[o],l))==0) {
+                       close(fd);
+                       wait_for_client();
+               }
+               o=o+w;
+       } while (o<l);
+}
+
+static void emit_line() {
+    
+    if (tcpport==0) {
+        printf("%s",linebuf);
+    } else {
+        emit_socket();
+    }
+    
+}
+
+int main(int argc, char* argv[]) {
+    gs_sp_t me = argv[0];
+    FTAID fta_id;
+    gs_int32_t schema, ch;
+    
+    FTAID rfta_id;
+    gs_uint32_t rsize;
+    gs_uint32_t bufsz=8*1024*1024;
+    gs_int8_t rbuf[2*MAXTUPLESZ];
+    
+    gs_int32_t numberoffields;
+    gs_int32_t verbose=0;
+    gs_int32_t y, lcv;
+    
+    void *pblk;
+    gs_int32_t pblklen;
+       gs_int32_t n_actual_param;
+       gs_int32_t n_expected_param;
+    gs_int32_t xit = 0;
+    gs_int32_t dump = 0;
+    struct timeval tvs, tve, tvd;
+    gs_retval_t code;
+    endpoint gshub;
+    endpoint dummyep;
+    gs_uint32_t tip1,tip2,tip3,tip4;
+    gs_sp_t instance_name;
+
+    // RMR-related parameters
+    gs_sp_t    rmr_port = NULL;
+    gs_int32_t rmr_mtype = MC_REPORT;    
+
+       gs_sp_t curl_address = NULL;
+       endpoint curl_endpoint;
+       gs_sp_t curl_url = NULL;
+       gs_sp_t curl_auth = NULL;
+       gs_uint32_t http_code;
+
+       gs_uint32_t ves_version=7;
+
+       void* mrc;                                                      //msg router context
+       struct epoll_event events[1];                   // list of events to give to epoll
+       struct epoll_event epe;                 // event definition for event to listen to
+       gs_int32_t     ep_fd = -1;                                              // epoll's file des (given to epoll_wait)
+       gs_int32_t rcv_fd;                                              // file des that NNG tickles -- give this to epoll to listen on
+       gs_int32_t nready;                                                              // number of events ready for receive
+       rmr_mbuf_t*             rmr_sbuf;                                       // send buffer
+       rmr_mbuf_t*             rmr_rbuf;                                       // received buffer
+
+    gs_uint32_t tlimit = 0;     // time limit in seconds
+    time_t start_time, curr_time;
+
+    gs_uint64_t post_success_cnt = 0ULL;
+    gs_uint64_t post_failure_cnt = 0ULL;    
+
+    gs_uint64_t rmr_post_success_cnt = 0ULL;
+    gs_uint64_t rmr_post_failure_cnt = 0ULL;    
+
+    
+       gsopenlog(argv[0]);
+    
+    while ((ch = getopt(argc, argv, "l:p:r:vXDC:U:R:A:V:")) != -1) {
+        switch (ch) {
+            case 'r':
+                bufsz=atoi(optarg);
+                break;
+            case 'p':
+                tcpport=atoi(optarg);
+                break;
+            case 'v':
+                verbose++;
+                break;
+            case 'X':
+                xit++;
+                break;
+            case 'D':
+                dump++;
+                break;
+            case 'l':
+                tlimit = atoi(optarg);
+                break;
+            case 'V':
+                ves_version = atoi(optarg);
+                break;
+                       case 'C':
+                               curl_address = strdup(optarg);
+                       if (sscanf(curl_address,"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(curl_endpoint.port))!= 5 ) {
+                               gslog(LOG_EMERG,"Curl IP NOT DEFINED");
+                               exit(1);
+                       }
+                       curl_endpoint.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
+                       curl_endpoint.port=htons(curl_endpoint.port);
+                               break;
+                       case 'R':
+                rmr_port=strdup(optarg);
+                break;
+            case 'U':
+                               curl_url = strdup(optarg);
+                break;
+            case 'A':
+                               curl_auth = strdup(optarg);
+                break;
+            default:
+            usage:
+                fprintf(stderr, "usage: %s [-r <bufsz>] [-p <port>] [-l <time_limit>] [-v] [-X] [-D] [-C <curl_dest>:<curl_port>] [-U <curl_url>] [-A <authentication_string>] [-V <ves_version>] [-R <rmr_port>] <gshub-hostname>:<gshub-port> <gsinstance_name>  query param1 param2...\n",
+                        *argv);
+                exit(1);
+        }
+    }
+    argc -= optind;
+    argv += optind;
+    if (argc<3) goto usage;
+    
+    if (sscanf(argv[0],"%u.%u.%u.%u:%hu",&tip1,&tip2,&tip3,&tip4,&(gshub.port))!= 5 ) {
+        gslog(LOG_EMERG,"HUB IP NOT DEFINED");
+        exit(1);
+    }
+    gshub.ip=htonl(tip1<<24|tip2<<16|tip3<<8|tip4);
+    gshub.port=htons(gshub.port);
+    instance_name=strdup(argv[1]);
+    if (set_hub(gshub)!=0) {
+        gslog(LOG_EMERG,"Could not set hub");
+        exit(1);
+    }
+    if (set_instance_name(instance_name)!=0) {
+        gslog(LOG_EMERG,"Could not set instance name");
+        exit(1);
+    }
+    
+    if (get_initinstance(gshub,instance_name,&dummyep,1)!=0) {
+        gslog(LOG_EMERG,"Did not receive signal that GS is initiated");
+    }
+
+
+//     If this uses curl output, ensure consistency in the curl args
+       if(curl_address != NULL){
+               if(curl_url == NULL){
+                       gslog(LOG_EMERG,"Curl IP defined, but there is no url (e.g. /foo/bar");
+                       exit(1);
+               }
+               if(curl_auth==NULL){
+                       curl_auth = "";
+               } 
+       }
+    
+    gettimeofday(&tvs, 0);
+    argc -=2;
+    argv +=2;
+    if (argc < 1)
+        goto usage;
+
+    if (rmr_port) {
+        /* initialize RMR library */
+        if( (mrc = rmr_init( rmr_port, 1400, RMRFL_NONE )) == NULL ) {
+            fprintf(stderr, "%s::error:unable to initialise RMR\n", me);
+            exit( 1 );
+        }
+
+        rcv_fd = rmr_get_rcvfd( mrc );                                 // set up epoll things, start by getting the FD from MRr
+        if( rcv_fd < 0 ) {
+            fprintf(stderr, "%s::error:unable to set up polling fd\n", me);
+            exit( 1 );
+        }
+        if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+            fprintf(stderr, "%s::error:unable to create epoll fd: %d\n", me, errno);
+            exit( 1 );
+        }
+        epe.events = EPOLLIN;
+        epe.data.fd = rcv_fd;
+
+        if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+            fprintf(stderr, "%s::error:epoll_ctl status not 0 : %s\n", me, strerror(errno));
+            exit( 1 );
+        }
+
+        rmr_sbuf = rmr_alloc_msg( mrc, MAXLINE );      // alloc first send buffer; subsequent buffers allcoated on send
+        rmr_rbuf = NULL;                                               // don't need to alloc receive buffer
+
+        while( ! rmr_ready( mrc ) ) {          // must have a route table before we can send; wait til RMr say it has one
+            sleep( 1 );
+        }
+        fprintf( stderr, "RMR is ready\n" );        
+    }
+        
+    /* initialize host library and the sgroup  */
+    
+    if (verbose>=2) fprintf(stderr,"Initializing gscp\n");
+    
+    if (ftaapp_init(bufsz)!=0) {
+        fprintf(stderr,"%s::error:could not initialize gscp\n", me);
+        exit(1);
+    }
+    
+    signal(SIGTERM, hand);
+    signal(SIGINT, hand);
+    
+    schema = ftaapp_get_fta_schema_by_name(argv[0]);
+    if (schema < 0) {
+        fprintf(stderr,"%s::error:could not get fta '%s' schema\n",
+                me ,argv[0]);
+        exit(1);
+    }
+       n_expected_param = ftaschema_parameter_len(schema);
+    if (n_expected_param == 0) {
+        pblk = 0;
+        pblklen = 0;
+    } else {
+               n_actual_param = argc-1;
+               if(n_actual_param < n_expected_param){
+                       fprintf(stderr,"Error, %d query parameters expected, %d provided.\n",n_expected_param, n_actual_param);
+                       exit(1);
+               }
+        /* parse the params */
+        for (lcv = 1 ; lcv < argc ; lcv++) {
+            char *k, *e;
+            int rv;
+            k = argv[lcv];
+            e = k;
+            while (*e && *e != '=') e++;
+            if (*e == 0) {
+                fprintf(stderr,"param parse error '%s' (fmt 'key=val')\n",
+                        argv[lcv]);
+                exit(1);
+            }
+            *e = 0;
+            rv = ftaschema_setparam_by_name(schema, k, e+1, strlen(e+1));
+            *e = '=';
+            if (rv < 0) {
+                fprintf(stderr,"param setparam error '%s' (fmt 'key=val')\n",
+                        argv[lcv]);
+                exit(1);
+            }
+        }
+        if (ftaschema_create_param_block(schema, &pblk, &pblklen) < 0) {
+            fprintf(stderr, "ftaschema_create_param_block failed!\n");
+            exit(1);
+        }
+    }
+//    ftaschema_free(schema); /* XXXCDC */ // the schema continues to be used
+    
+    
+    if (verbose>=2) fprintf(stderr,"Initalized FTA\n");
+    
+    fta_id=ftaapp_add_fta(argv[0],0,0,0,pblklen,pblk);
+    if (fta_id.streamid==0) {
+        fprintf(stderr,"%s::error:could not initialize fta %s\n",
+                me, argv[0]);
+        exit(1);
+    }
+    /* XXXCDC: pblk is malloc'd, should we free it? */
+    
+    if (verbose>=2) fprintf(stderr,"Get schema handle\n");
+    
+    if ((schema=ftaapp_get_fta_schema(fta_id))<0) {
+        fprintf(stderr,"%s::error:could not get schema\n", me);
+        exit(1);
+    }
+    
+    if ((numberoffields=ftaschema_tuple_len(schema))<0) {
+        fprintf(stderr,"%s::error:could not get number of fields in schema\n",
+                me);
+        exit(1);
+    }
+    
+    if (verbose>=1) {
+        for(y=0; y<numberoffields;y++) {
+            printf("%s",ftaschema_field_name(schema,y));
+            if (y<numberoffields-1) printf("|");
+        }
+        printf("\n");
+    }
+    if (xit) { // -X in command line
+        gettimeofday(&tve, 0);
+        timersub(&tve, &tvs, &tvd);
+        printf("TIME= %ld%06d sec\n", tvd.tv_sec, tvd.tv_usec);
+        hand(0);       // effectively an exit
+    }
+    if (tcpport!=0) {
+       wait_for_client();
+    }
+
+    start_time = time(NULL);
+
+       int measurement_interval_pos = -1; // extract measurementInterval if present
+       char *field_names[numberoffields];
+    for(y=0; y<numberoffields;y++) {
+               field_names[y] = strdup(ftaschema_field_name(schema,y));
+               if(strcmp(field_names[y], "measurementInterval")==0)
+                       measurement_interval_pos = y;
+       }
+
+
+       struct timeval tsample;
+       gettimeofday(&tsample, 0);
+       char start_ts[100], curr_ts[100];
+       sprintf(start_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
+
+       long unsigned int lineno=0;
+       long unsigned int seqno=0;
+       double measurement_interval;
+    while((code=ftaapp_get_tuple(&rfta_id,&rsize,rbuf,2*MAXTUPLESZ,0))>=0) {
+               lineno++;
+        if (dump)      // -D in command line
+            continue;
+        if (ftaschema_is_eof_tuple(schema, rbuf)) {
+            /* initiate shutdown or something of that nature */
+            printf("#All data proccessed\n");
+            exit(0);
+        }
+        if (!rsize)
+            continue;
+        if (verbose >=2) {
+            snprintf(linebuf,MAXLINE,"RESULT CODE => %u\n",code);
+            emit_line();
+        }
+        if ((code==0)&&(rfta_id.streamid == fta_id.streamid)) {
+                       seqno++;
+                       gettimeofday(&tsample, 0);
+                       sprintf(curr_ts,"%ld%06d", tsample.tv_sec, tsample.tv_usec);
+                       int pos;
+                       if(ves_version < 7){
+                               pos = snprintf(linebuf, MAXLINE,
+  "{\"event\": { \"commonEventHeader\": { "
+        "\"domain\": \"measurementsForVfScaling\", "
+        "\"eventId\": \"%s%u\", "
+        "\"eventType\": \"%s\", "
+        "\"eventName\": \"Measurement_MC_%s\", "
+        "\"lastEpochMicrosec\": %s, "
+        "\"priority\": \"Normal\", "
+        "\"reportingEntityName\": \"GS-LITE MC\", "
+        "\"sequence\": %u, "
+        "\"sourceName\": \"meas_cmpgn_xapp\", "
+        "\"startEpochMicrosec\": %s, "
+        "\"version\": 5 "
+      "}, "
+      "\"measurementsForVfScalingFields\": { "
+               "\"additionalFields\": ["
+                               ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
+                               );
+                       }else{
+                               pos = snprintf(linebuf, MAXLINE,
+  "{\"event\": { \"commonEventHeader\": { "
+        "\"domain\": \"measurement\", "
+        "\"eventId\": \"%s%u\", "
+        "\"eventType\": \"%s\", "
+        "\"eventName\": \"Measurement_MC_%s\", "
+        "\"lastEpochMicrosec\": %s, "
+        "\"priority\": \"Normal\", "
+        "\"reportingEntityName\": \"GS-LITE MC\", "
+        "\"sequence\": %u, "
+        "\"sourceName\": \"meas_cmpgn_xapp\", "
+        "\"startEpochMicrosec\": %s, "
+        "\"version\": \"4.0.1\", "
+        "\"vesEventListenerVersion\": \"7.0.1\" "
+      "}, "
+      "\"measurementFields\": { "
+               "\"additionalFields\": {"
+                               ,argv[0],lineno, argv[0], argv[0], curr_ts, seqno, start_ts
+                               );
+                       }
+
+                       measurement_interval = 0;
+            for(y=0; y<numberoffields;y++) {
+                struct access_result ar;
+//                if (verbose>=2)
+//                    printf("%s->",ftaschema_field_name(schema,y));
+               if(y>0){
+                                       linebuf[pos]=',';
+                                       pos++;
+                               }
+                ar=ftaschema_get_field_by_index(schema,y,rbuf,rsize);
+                switch (ar.field_data_type) {
+                    case INT_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%d\"}",field_names[y], ar.r.i);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%d\"",field_names[y], ar.r.i);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.i;
+                        break;
+                    case UINT_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"%u\"",field_names[y], ar.r.ui);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.ui;
+                        break;
+                    case IP_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u.%u.%u.%u\"}",field_names[y], ar.r.ui>>24&0xff,
+                                 ar.r.ui>>16&0xff,
+                                 ar.r.ui>>8&0xff,
+                                 ar.r.ui&0xff);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%u.%u.%u.%u\"",field_names[y], ar.r.ui>>24&0xff,
+                                 ar.r.ui>>16&0xff,
+                                 ar.r.ui>>8&0xff,
+                                 ar.r.ui&0xff);
+                        break;
+                    case IPV6_TYPE:
+                    {
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos," \"%s\": \"",field_names[y]);
+                        unsigned x;
+                        unsigned zc=0;
+                        for(x=0;x<4;x++) { if (ar.r.ip6.v[x]==0) zc++;}
+                        if (zc!=4) {
+                            snprintf(linebuf,MAXLINE,"");
+                            for(x=0;x<8;x++) {
+                                unsigned char * a = (unsigned char *)  &(ar.r.ip6.v[0]);
+                                unsigned y;
+                                y=((unsigned)a[2*x])<<8|((unsigned) a[2*x+1]);
+                                pos += snprintf(linebuf+pos,MAXLINE-pos,"%04x",y);
+                                if (x<7){
+                                                                       pos += snprintf(linebuf+pos,MAXLINE-pos,":");
+                                                               }
+                            }
+                        } else {
+                            pos+=snprintf(linebuf+pos,MAXLINE-pos,"::");
+                        }
+                                               if(ves_version < 7)
+                                                       pos += snprintf(linebuf+pos, MAXLINE-pos,"\"}");
+                                               else
+                                                       pos += snprintf(linebuf+pos, MAXLINE-pos,"\"");
+                    }
+                        break;
+                        
+                    case USHORT_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%u\"}",field_names[y], ar.r.ui);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\":  \"%u\"}",field_names[y], ar.r.ui);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.ui;
+                        break;
+                    case BOOL_TYPE:
+                                               if(ves_version < 7){
+                               if (ar.r.ui==0) {
+                                       pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"FALSE\"}",field_names[y]);
+                                    } else {
+                                       pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"TRUE\"}",field_names[y]);
+                               }
+                                               }else{
+                               if (ar.r.ui==0) {
+                                       pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"FALSE\"",field_names[y]);
+                                    } else {
+                                       pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"TRUE\"",field_names[y]);
+                               }
+                                               }
+                        break;
+                    case ULLONG_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%llu\"}",field_names[y], ar.r.ul);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%llu\"",field_names[y], ar.r.ul);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.ul;
+                        break;
+                    case LLONG_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%lld\"}",field_names[y], ar.r.l);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%lld\"",field_names[y], ar.r.l);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.l;
+                        break;
+                    case FLOAT_TYPE:
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], ar.r.f);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], ar.r.f);
+                                               if(y==measurement_interval_pos)
+                                                       measurement_interval = (double)ar.r.f;                            
+                        break;
+                    case TIMEVAL_TYPE:
+                    {
+                        gs_float_t t;
+                        t= ar.r.t.tv_usec;
+                        t=t/1000000;
+                        t=t+ar.r.t.tv_sec;
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"%f\"}",field_names[y], t);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"%f\"",field_names[y], t);
+                    }
+                        break;
+                    case VSTR_TYPE:
+                    {
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"{\"name\": \"%s\", \"value\": \"",field_names[y]);
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"%s\": \"",field_names[y]);
+                        int x;
+                        int c;
+                        char * src;
+                        src=(char*)ar.r.vs.offset;
+                                               for(x=0;x<ar.r.vs.length;x++){
+                            c=src[x];
+                            if ((c<='~') && (c>=' ')) {
+                                if (pos<MAXLINE-1) {
+                                    linebuf[pos]=c;
+                                    pos++;
+                                }
+                            } else {
+                                if (pos<MAXLINE-1) {
+                                    linebuf[pos]='.';
+                                    pos++;
+                                }
+                            }
+                                               }
+                                               if(ves_version < 7)
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"}");
+                                               else
+                               pos += snprintf(linebuf+pos,MAXLINE-pos,"\"");
+                    }
+                        break;
+                    default:
+                        linebuf[0]=0;
+                        break;
+                }
+                       }
+                       if(ves_version < 7){
+                               snprintf(linebuf+pos, MAXLINE-pos,
+               "], \"measurementInterval\": %f, \"measurementsForVfScalingVersion\": 1"
+               "}}}\n", measurement_interval
+                               );
+                       }else{
+                               snprintf(linebuf+pos, MAXLINE-pos,
+               "}, \"measurementInterval\": %f, \"measurementFieldsVersion\": \"4.0\""
+               "}}}\n", measurement_interval
+                               );
+                       }
+
+            if (rmr_port) {
+                rmr_sbuf->mtype = rmr_mtype;                                                   // fill in the message bits
+                rmr_sbuf->len =  strlen(linebuf) + 1;          // our receiver likely wants a nice acsii-z string
+                rmr_sbuf->state = 0;
+                rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
+                while( rmr_sbuf->state == RMR_ERR_RETRY ) {                    // soft failure (device busy?) retry
+                    rmr_sbuf = rmr_send_msg( mrc, rmr_sbuf);                   // retry send until it's good (simple test; real programmes should do better)
+                }
+                rmr_post_success_cnt++;
+
+            }
+
+                       if(curl_address==NULL){
+               emit_line();
+                       }else{
+                               http_post_request_hdr(curl_endpoint, curl_url, linebuf, &http_code, curl_auth);
+                               if(http_code != 200 && http_code != 202){
+                    post_failure_cnt++; 
+                                       gslog(LOG_WARNING, "http return code is %d",http_code);
+                               } else {
+                    post_success_cnt++;   
+                }  
+                if (((post_success_cnt+post_failure_cnt) % STAT_FREQUENCY) == 0)
+                    gslog(LOG_WARNING, "%s: successful ves posts - %llu, failed ves posts - %llu", argv[0], post_success_cnt, post_failure_cnt);
+                       }
+            if (verbose!=0) fflush(stdout);
+        } else {
+            if (rfta_id.streamid != fta_id.streamid)
+                fprintf(stderr,"Got unknown streamid %llu \n",rfta_id.streamid);
+        }
+
+        // whenever we receive a temp tuple check if we reached time limit
+        if ((code==2)  && tlimit && (time(NULL)-start_time)>=tlimit) {
+            fprintf(stderr,"Reached time limit of %d seconds\n",tlimit);    
+            ftaapp_exit();
+            exit(0);
+        }        
+    }
+}
+
index c443dcc..04d5f36 100644 (file)
@@ -69,6 +69,8 @@ DEBUG_MODE=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.jso
 
 WINDOW=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json measurement_interval`
 
+RMR_PORT=`python /mc/extract_rmr_port.py ${XAPP_DESCRIPTOR_PATH}/config-file.json rmr_data_out`
+
 # export DBAAS_SERVICE_HOST=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json __DBAAS_SERVICE_HOST__`
 # export DBAAS_SERVICE_PORT=`python /mc/extract_params.py ${XAPP_DESCRIPTOR_PATH}/config-file.json __DBAAS_SERVICE_PORT__`
 
@@ -119,12 +121,17 @@ fi
 
 runall += """
 
+if [ "$RMR_PORT" != "" ]
+then
+    RMR_OPTION="-R $RMR_PORT"
+fi
+
 # invoke gsprintconsole_ves gsmcnib for all non-debug queries
 """
 
 for q in oqy:
        if "debug" not in q:
-               runall += " /mc/gs-lite/bin/gsprintconsole_ves -C $VES_IP:$VES_PORT -U /vescollector/eventListener/v7 -V 7 `cat gshub.log` default "+q+" window=$WINDOW &\n"
+               runall += " /mc/gs-lite/bin/gsprintconsole_ves -C $VES_IP:$VES_PORT -U /vescollector/eventListener/v7 -V 7 $RMR_OPTION `cat gshub.log` default "+q+" window=$WINDOW &\n"
                keys = nib[q]["keys"]
                if len(keys)>0:
                        keys_str = ",".join(keys)