5.0.1 Replace the Prometheus library with the new one Change in the API remove one... 37/3937/1
authoraa7133@att.com <aa7133@att.com>
Mon, 1 Jun 2020 14:19:20 +0000 (17:19 +0300)
committeraa7133@att.com <aa7133@att.com>
Mon, 1 Jun 2020 14:19:51 +0000 (17:19 +0300)
Add push capability to Prometheus
add all statistic counters

Change-Id: Idd8aa8258e4aedbb2a04c89dfac7ac1c1263774a
Signed-off-by: aa7133@att.com <aa7133@att.com>
RIC-E2-TERMINATION/CMakeLists.txt
RIC-E2-TERMINATION/config/config.conf
RIC-E2-TERMINATION/container-tag.yaml
RIC-E2-TERMINATION/sctpThread.cpp
RIC-E2-TERMINATION/sctpThread.h

index 398fd6d..fbb9a12 100644 (file)
@@ -103,6 +103,7 @@ link_libraries(nsl
         cgreen
         prometheus-cpp-core
         prometheus-cpp-pull
+        prometheus-cpp-push
         z
         curl
         pthread)
@@ -126,6 +127,7 @@ target_link_libraries(e2 libicuuc.a)
 target_link_libraries(e2 libicudata.a)
 target_link_libraries(e2 prometheus-cpp-core.a)
 target_link_libraries(e2 prometheus-cpp-pull.a)
+target_link_libraries(e2 prometheus-cpp-push.a)
 
 #target_link_libraries(e2 libnng.a)
 
index 86d1180..d6f5008 100644 (file)
@@ -4,6 +4,11 @@ volume=log
 #the key name of the environment holds the local ip address
 #ip address of the E2T in the RMR
 local-ip=127.0.0.1
+#prometheus mode can be pull or push
+prometheusMode=pull
+#timeout can be from 5 seconds to 300 seconds default is 10
+prometheusPushTimeOut=10
+prometheusPushAddr=127.0.0.1:7676
 prometheusPort=8088
 #trace is start, stop
 trace=start
index adeeeda..6a6c6f6 100644 (file)
@@ -1,3 +1,3 @@
 # The Jenkins job requires a tag to build the Docker image.
 # Global-JJB script assumes this file is in the repo root.
-tag: 5.0.0
+tag: 5.0.1
index 49a4c51..62f42a3 100644 (file)
@@ -239,6 +239,25 @@ int buildConfiguration(sctp_params_t &sctpParams) {
     }
     jsonTrace = sctpParams.trace;
 
+    tmpStr = conf.getStringValue("prometheusMode");
+    transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower);
+    if (tmpStr.length() != 0) {
+        if (tmpStr.compare("push")) {
+            sctpParams.prometheusPushAddress = tmpStr;
+            auto timeout = conf.getIntValue("prometheusPushTimeOut");
+            if (timeout >= 5 && timeout <= 300) {
+                sctpParams.epollTimeOut = timeout;
+            } else {
+                sctpParams.epollTimeOut = 10;
+            }
+        }
+    }
+
+    tmpStr = conf.getStringValue("prometheusPushAddr");
+    if (tmpStr.length() != 0) {
+        sctpParams.prometheusMode = tmpStr;
+    }
+
     tmpStr = conf.getStringValue("prometheusPort");
     if (tmpStr.length() != 0) {
         sctpParams.prometheusPort = tmpStr;
@@ -289,6 +308,15 @@ int buildConfiguration(sctp_params_t &sctpParams) {
     return 0;
 }
 
+static std::string GetHostName() {
+    char hostname[1024];
+
+    if (::gethostname(hostname, sizeof(hostname))) {
+        return {};
+    }
+    return hostname;
+}
+
 
 
 int main(const int argc, char **argv) {
@@ -333,17 +361,15 @@ int main(const int argc, char **argv) {
     }
 
     //auto registry = std::make_shared<Registry>();
-    sctpParams.promteheusRegistry = std::make_shared<Registry>();
+    sctpParams.prometheusRegistry = std::make_shared<Registry>();
 
-    //sctpParams.prometheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
+    //sctpParams.promtheusFamily = new Family<Counter>("E2T", "E2T message counter", {{"E", sctpParams.podName}});
 
     sctpParams.prometheusFamily = &BuildCounter()
             .Name("E2T")
             .Help("E2T message counter")
             .Labels({{"E", sctpParams.podName}})
-            .Register(*sctpParams.promteheusRegistry);
-
-    Exposer exposer{sctpParams.myIP + ":" + sctpParams.prometheusPort, "/metrics", 1};
+            .Register(*sctpParams.prometheusRegistry);
 
 
     // start epoll
@@ -378,7 +404,26 @@ int main(const int argc, char **argv) {
     std::vector<std::thread> threads(num_cpus);
 //    std::vector<std::thread> threads;
 
-    exposer.RegisterCollectable(sctpParams.promteheusRegistry);
+    if (sctpParams.prometheusMode.compare("pull") == 0) {
+        sctpParams.prometheusExposer = new Exposer(sctpParams.myIP + ":" + sctpParams.prometheusPort, 1);
+        sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry);
+    } else if (sctpParams.prometheusMode.compare("push") == 0) {
+        const auto labels = Gateway::GetInstanceLabel(GetHostName());
+        string address {};
+        string port {};
+        char ch = ':';
+        auto found = sctpParams.prometheusPushAddress.find_last_of(ch);
+        // If string doesn't have
+        // character ch present in it
+        if (found != string::npos) {
+            address = sctpParams.prometheusPushAddress.substr(0,found);
+            port = sctpParams.prometheusPushAddress.substr(found + 1);
+            sctpParams.prometheusGateway = new Gateway(address, port, "E2T", labels);
+            sctpParams.prometheusGateway->RegisterCollectable(sctpParams.prometheusRegistry);
+        } else {
+            mdclog_write(MDCLOG_ERR, "failed to build Prometheus gateway no stats will be sent");
+        }
+    }
 
     num_cpus = 1;
     for (unsigned int i = 0; i < num_cpus; i++) {
@@ -574,21 +619,39 @@ void listener(sctp_params_t *params) {
 //        rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
 //    }
 
+    bool gatewayflag = false;
     while (true) {
+        future<int> gateWay;
+
         if (mdclog_level_get() >= MDCLOG_DEBUG) {
             mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait");
         }
-        auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1);
-        if (numOfEvents < 0 && errno == EINTR) {
-            if (mdclog_level_get() >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
+        auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut);
+        if (numOfEvents == 0) {
+            if (params->prometheusGateway != nullptr) {
+                gateWay = params->prometheusGateway->AsyncPush();
+                gatewayflag = true;
             }
             continue;
-        }
-        if (numOfEvents < 0) {
+        } else if (numOfEvents < 0) {
+            if (errno == EINTR) {
+                if (mdclog_level_get() >= MDCLOG_DEBUG) {
+                    mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno));
+                }
+                continue;
+            }
             mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
             return;
         }
+        if (gatewayflag) {
+            gatewayflag = false;
+            auto rc = gateWay.get();
+            if (rc != 200) {
+                mdclog_write(MDCLOG_ERR, "Async Send to Promethues faild with Return Code %d", rc);
+            } else if (mdclog_level_get() >= MDCLOG_DEBUG) {
+                mdclog_write(MDCLOG_DEBUG, "Stats sent to Prometheus");
+            }
+        }
         for (auto i = 0; i < numOfEvents; i++) {
             if (mdclog_level_get() >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents);
@@ -806,6 +869,14 @@ void handleConfigChange(sctp_params_t *sctpParams) {
                     sctpParams->trace = false;
                 }
                 jsonTrace = sctpParams->trace;
+
+                auto timeout = conf.getIntValue("prometheusPushTimeOut");
+                if (timeout >= 5 && timeout <= 300) {
+                    sctpParams->epollTimeOut = timeout;
+                } else {
+                    mdclog_write(MDCLOG_ERR, "prometheusPushTimeOut set wrong value %d, values are [5..300]", timeout);
+                }
+
                 endlessLoop = false;
             }
         }
@@ -1018,8 +1089,6 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_
  * @param rmrMessageBuffer
  */
 void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) {
-    rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName));
-
     message.message.asndata = rmrMessageBuffer.rcvMessage->payload;
     message.message.asnLength = rmrMessageBuffer.rcvMessage->len;
 
@@ -1234,7 +1303,10 @@ static void buildAndsendSetupRequest(ReportingMessages_t &message,
     }
     // encode to xml
 
-    auto res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
+    string res {};
+    if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) {
+        res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded);
+    }
     rmr_mbuf_t *rmrMsg;
     if (res.length() == 0) {
         rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256);
@@ -1391,7 +1463,7 @@ int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu,
 
 
 
-void buildPromethuslist(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
+void buildPrometheuslist(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
 
@@ -1489,7 +1561,6 @@ int collectSetupRequestData(E2AP_PDU_t *pdu,
 
                 memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName));
                 sctpMap->setkey(message.message.enodbName, message.peerInfo);
-                buildPromethuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
             }
         } else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) {
             if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) {
@@ -1574,9 +1645,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
                 break;
             }
 
+            buildPrometheuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
+
             string messageName("E2setupRequest");
             string ieName("E2setupRequestIEs");
             message.message.messageType = RIC_E2_SETUP_REQ;
+            message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
+            message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v);
             break;
         }
@@ -1596,6 +1671,9 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             string messageName("RICserviceUpdate");
             string ieName("RICserviceUpdateIEs");
             message.message.messageType = RIC_SERVICE_UPDATE;
+            message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
+            message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
+
             buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v);
             break;
         }
@@ -1603,6 +1681,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
             }
+            message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment();
+            message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
                 mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
             }
@@ -1613,6 +1693,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
             }
 
+            message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
+            message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
                 break;
             }
@@ -1622,12 +1704,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             }
             break;
         }
-        case ProcedureCode_id_RICcontrol: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
-            }
-            break;
-        }
         case ProcedureCode_id_RICindication: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
@@ -1661,6 +1737,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
                                          ie->value.choice.RICrequestID.ricInstanceID,
                                          ie->value.choice.RICrequestID.ricRequestorID);
                         }
+                        message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication - 1]->Increment();
+                        message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
                         sendRmrMessage(rmrMessageBuffer, message);
                         messageSent = true;
                     } else {
@@ -1673,24 +1751,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             }
             break;
         }
-        case ProcedureCode_id_RICserviceQuery: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
-            }
-            break;
-        }
-        case ProcedureCode_id_RICsubscription: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
-            }
-            break;
-        }
-        case ProcedureCode_id_RICsubscriptionDelete: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
-            }
-            break;
-        }
         default: {
             mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode);
             message.message.messageType = 0; // no RMR message type yet
@@ -1718,25 +1778,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
         mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode);
     }
     switch (procedureCode) {
-        case ProcedureCode_id_E2setup: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
-            }
-            break;
-        }
-        case ProcedureCode_id_ErrorIndication: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
-            }
-            if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
-                mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
-            }
-            break;
-        }
         case ProcedureCode_id_Reset: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName);
             }
+            message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
+            message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (XML_From_PER(message, rmrMessageBuffer) < 0) {
                 break;
             }
@@ -1773,6 +1820,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
                                        (unsigned char *)message.message.enodbName,
                                        strlen(message.message.enodbName));
 
+                        message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
+                        message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
                         sendRmrMessage(rmrMessageBuffer, message);
                         messageSent = true;
                     } else {
@@ -1786,66 +1835,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
 
             break;
         }
-        case ProcedureCode_id_RICindication: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
-            }
-            for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
-                auto messageSent = false;
-                RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
-                if (logLevel >= MDCLOG_DEBUG) {
-                    mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
-                }
-                if (ie->id == ProtocolIE_ID_id_RICrequestID) {
-                    if (logLevel >= MDCLOG_DEBUG) {
-                        mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
-                    }
-                    if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
-                        static unsigned char tx[32];
-                        message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
-                        snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
-                        rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
-                        rmr_bytes2meid(rmrMessageBuffer.sendMessage,
-                                       (unsigned char *)message.message.enodbName,
-                                       strlen(message.message.enodbName));
-                        rmrMessageBuffer.sendMessage->state = 0;
-                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
-                        if (mdclog_level_get() >= MDCLOG_DEBUG) {
-                            mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
-                                         rmrMessageBuffer.sendMessage->sub_id,
-                                         rmrMessageBuffer.sendMessage->mtype);
-                        }
-                        sendRmrMessage(rmrMessageBuffer, message);
-                        messageSent = true;
-                    } else {
-                        mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
-                    }
-                }
-                if (messageSent) {
-                    break;
-                }
-            }
-            break;
-        }
-        case ProcedureCode_id_RICserviceQuery: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
-            }
-            break;
-        }
-        case ProcedureCode_id_RICserviceUpdate: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
-            }
-            if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
-                mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
-            }
-            break;
-        }
         case ProcedureCode_id_RICsubscription: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
             }
+            message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
+            message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
                 mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
             }
@@ -1855,6 +1850,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
             }
+            message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
+            message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
                 mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
             }
@@ -1886,25 +1883,6 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
         mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode);
     }
     switch (procedureCode) {
-        case ProcedureCode_id_E2setup: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got E2setup\n");
-            }
-            break;
-        }
-        case ProcedureCode_id_ErrorIndication: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
-            }
-            if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
-                mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
-            }
-            break;
-        }
-        case ProcedureCode_id_Reset: {
-            mdclog_write(MDCLOG_ERR, "Got Reset for %s, Protocol ERROR", message.message.enodbName);
-            break;
-        }
         case ProcedureCode_id_RICcontrol: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName);
@@ -1930,6 +1908,8 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
                         rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
                         rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName,
                                        strlen(message.message.enodbName));
+                        message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
+                        message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
                         sendRmrMessage(rmrMessageBuffer, message);
                         messageSent = true;
                     } else {
@@ -1942,67 +1922,12 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
             }
             break;
         }
-        case ProcedureCode_id_RICindication: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName);
-            }
-            for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) {
-                auto messageSent = false;
-                RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i];
-                if (logLevel >= MDCLOG_DEBUG) {
-                    mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id);
-                }
-                if (ie->id == ProtocolIE_ID_id_RICrequestID) {
-                    if (logLevel >= MDCLOG_DEBUG) {
-                        mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id);
-                    }
-                    if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) {
-                        static unsigned char tx[32];
-                        message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION;
-                        snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++);
-                        rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx));
-                        rmr_bytes2meid(rmrMessageBuffer.sendMessage,
-                                       (unsigned char *)message.message.enodbName,
-                                       strlen(message.message.enodbName));
-                        rmrMessageBuffer.sendMessage->state = 0;
-//                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID;
-                        rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID;
-                        if (mdclog_level_get() >= MDCLOG_DEBUG) {
-                            mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d",
-                                         rmrMessageBuffer.sendMessage->sub_id,
-                                         rmrMessageBuffer.sendMessage->mtype);
-                        }
-                        sendRmrMessage(rmrMessageBuffer, message);
-                        messageSent = true;
-                    } else {
-                        mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request");
-                    }
-                }
-                if (messageSent) {
-                    break;
-                }
-            }
-            break;
-        }
-        case ProcedureCode_id_RICserviceQuery: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName);
-            }
-            break;
-        }
-        case ProcedureCode_id_RICserviceUpdate: {
-            if (logLevel >= MDCLOG_DEBUG) {
-                mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName);
-            }
-            if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) {
-                mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP");
-            }
-            break;
-        }
         case ProcedureCode_id_RICsubscription: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName);
             }
+            message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
+            message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
             if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
                 mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
             }
@@ -2012,7 +1937,9 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName);
             }
-            if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) {
+            message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
+            message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len);
+            if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
                 mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
             }
             break;
@@ -2175,12 +2102,19 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
         return -1;
     }
     rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName);
+    message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
+    if (message.peerInfo == nullptr) {
+         mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
+        return -1;
+    }
+
     switch (rmrMessageBuffer.rcvMessage->mtype) {
         case RIC_E2_SETUP_RESP : {
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
-
+            message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
+            message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
                 return -6;
@@ -2191,6 +2125,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment();
+            message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
                 return -6;
@@ -2198,6 +2134,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             break;
         }
         case RIC_ERROR_INDICATION: {
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
                 return -6;
@@ -2205,6 +2143,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             break;
         }
         case RIC_SUB_REQ: {
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
                 return -6;
@@ -2212,6 +2152,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             break;
         }
         case RIC_SUB_DEL_REQ: {
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
                 return -6;
@@ -2219,6 +2161,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             break;
         }
         case RIC_CONTROL_REQ: {
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
                 return -6;
@@ -2229,6 +2173,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
                 return -6;
@@ -2239,6 +2185,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
+            message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK");
                 return -6;
@@ -2249,6 +2197,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment();
+            message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
                 return -6;
@@ -2259,6 +2209,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
+            message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
                 return -6;
@@ -2269,6 +2221,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+            message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment();
+            message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len);
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
                 return -6;
@@ -2410,19 +2364,9 @@ int sendMessagetoCu(Sctp_Map_t *sctpMap,
                     RmrMessagesBuffer_t &messageBuffer,
                     ReportingMessages_t &message,
                     int failedMesgId) {
-    auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName);
-    if (peerInfo == nullptr) {
-        if (failedMesgId != 0) {
-            sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId);
-        } else {
-            mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
-        }
-        return -1;
-    }
-
     // get the FD
     message.message.messageType = messageBuffer.rcvMessage->mtype;
-    auto rc = sendSctpMsg(peerInfo, message, sctpMap);
+    auto rc = sendSctpMsg(message.peerInfo, message, sctpMap);
     return rc;
 }
 
index 8b40784..2e286f8 100644 (file)
@@ -50,6 +50,7 @@
 #include <map>
 #include <sys/inotify.h>
 #include <csignal>
+#include <future>
 
 #include <rmr/rmr.h>
 #include <rmr/RIC_message_types.h>
@@ -86,6 +87,7 @@
 #include <zlib.h>
 #include <prometheus/counter.h>
 #include <prometheus/exposer.h>
+#include <prometheus/gateway.h>
 #include <prometheus/registry.h>
 
 using namespace prometheus;
@@ -121,6 +123,7 @@ typedef mapWrapper Sctp_Map_t;
 #define KA_MESSAGE_SIZE 2048
 
 typedef struct sctp_params {
+    int      epollTimeOut = -1;
     uint16_t rmrPort = 0;
     uint16_t sctpPort = SRC_PORT;
     int      epoll_fd = 0;
@@ -141,10 +144,13 @@ typedef struct sctp_params {
     string configFilePath {};
     string configFileName {};
     bool trace = true;
-    shared_ptr<prometheus::Registry> promteheusRegistry;
+    string prometheusMode {"pull"};
+    string prometheusPushAddress {"127.0.0.1:7676"};
+    shared_ptr<prometheus::Registry> prometheusRegistry;
     string prometheusPort {"8088"};
     Family<Counter> *prometheusFamily;
-    //shared_timed_mutex fence; // moved to mapWrapper
+    Gateway *prometheusGateway = nullptr;
+    Exposer *prometheusExposer = nullptr;
 } sctp_params_t;
 
 // RAN to RIC