From 96f712b813b81fe04bcf8adf1e2fb9c49b35e551 Mon Sep 17 00:00:00 2001 From: "aa7133@att.com" Date: Mon, 1 Jun 2020 17:19:20 +0300 Subject: [PATCH] 5.0.1 Replace the Prometheus library with the new one Change in the API remove one parameter from the Exposer constructor. Add push capability to Prometheus add all statistic counters Change-Id: Idd8aa8258e4aedbb2a04c89dfac7ac1c1263774a Signed-off-by: aa7133@att.com --- RIC-E2-TERMINATION/CMakeLists.txt | 2 + RIC-E2-TERMINATION/config/config.conf | 5 + RIC-E2-TERMINATION/container-tag.yaml | 2 +- RIC-E2-TERMINATION/sctpThread.cpp | 346 ++++++++++++++-------------------- RIC-E2-TERMINATION/sctpThread.h | 10 +- 5 files changed, 161 insertions(+), 204 deletions(-) diff --git a/RIC-E2-TERMINATION/CMakeLists.txt b/RIC-E2-TERMINATION/CMakeLists.txt index 398fd6d..fbb9a12 100644 --- a/RIC-E2-TERMINATION/CMakeLists.txt +++ b/RIC-E2-TERMINATION/CMakeLists.txt @@ -103,6 +103,7 @@ link_libraries(nsl cgreen prometheus-cpp-core prometheus-cpp-pull + prometheus-cpp-push z curl pthread) @@ -126,6 +127,7 @@ target_link_libraries(e2 libicuuc.a) target_link_libraries(e2 libicudata.a) target_link_libraries(e2 prometheus-cpp-core.a) target_link_libraries(e2 prometheus-cpp-pull.a) +target_link_libraries(e2 prometheus-cpp-push.a) #target_link_libraries(e2 libnng.a) diff --git a/RIC-E2-TERMINATION/config/config.conf b/RIC-E2-TERMINATION/config/config.conf index 86d1180..d6f5008 100644 --- a/RIC-E2-TERMINATION/config/config.conf +++ b/RIC-E2-TERMINATION/config/config.conf @@ -4,6 +4,11 @@ volume=log #the key name of the environment holds the local ip address #ip address of the E2T in the RMR local-ip=127.0.0.1 +#prometheus mode can be pull or push +prometheusMode=pull +#timeout can be from 5 seconds to 300 seconds default is 10 +prometheusPushTimeOut=10 +prometheusPushAddr=127.0.0.1:7676 prometheusPort=8088 #trace is start, stop trace=start diff --git a/RIC-E2-TERMINATION/container-tag.yaml b/RIC-E2-TERMINATION/container-tag.yaml index adeeeda..6a6c6f6 100644 --- a/RIC-E2-TERMINATION/container-tag.yaml +++ b/RIC-E2-TERMINATION/container-tag.yaml @@ -1,3 +1,3 @@ # The Jenkins job requires a tag to build the Docker image. # Global-JJB script assumes this file is in the repo root. -tag: 5.0.0 +tag: 5.0.1 diff --git a/RIC-E2-TERMINATION/sctpThread.cpp b/RIC-E2-TERMINATION/sctpThread.cpp index 49a4c51..62f42a3 100644 --- a/RIC-E2-TERMINATION/sctpThread.cpp +++ b/RIC-E2-TERMINATION/sctpThread.cpp @@ -239,6 +239,25 @@ int buildConfiguration(sctp_params_t &sctpParams) { } jsonTrace = sctpParams.trace; + tmpStr = conf.getStringValue("prometheusMode"); + transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower); + if (tmpStr.length() != 0) { + if (tmpStr.compare("push")) { + sctpParams.prometheusPushAddress = tmpStr; + auto timeout = conf.getIntValue("prometheusPushTimeOut"); + if (timeout >= 5 && timeout <= 300) { + sctpParams.epollTimeOut = timeout; + } else { + sctpParams.epollTimeOut = 10; + } + } + } + + tmpStr = conf.getStringValue("prometheusPushAddr"); + if (tmpStr.length() != 0) { + sctpParams.prometheusMode = tmpStr; + } + tmpStr = conf.getStringValue("prometheusPort"); if (tmpStr.length() != 0) { sctpParams.prometheusPort = tmpStr; @@ -289,6 +308,15 @@ int buildConfiguration(sctp_params_t &sctpParams) { return 0; } +static std::string GetHostName() { + char hostname[1024]; + + if (::gethostname(hostname, sizeof(hostname))) { + return {}; + } + return hostname; +} + int main(const int argc, char **argv) { @@ -333,17 +361,15 @@ int main(const int argc, char **argv) { } //auto registry = std::make_shared(); - sctpParams.promteheusRegistry = std::make_shared(); + sctpParams.prometheusRegistry = std::make_shared(); - //sctpParams.prometheusFamily = new Family("E2T", "E2T message counter", {{"E", sctpParams.podName}}); + //sctpParams.promtheusFamily = new Family("E2T", "E2T message counter", {{"E", sctpParams.podName}}); sctpParams.prometheusFamily = &BuildCounter() .Name("E2T") .Help("E2T message counter") .Labels({{"E", sctpParams.podName}}) - .Register(*sctpParams.promteheusRegistry); - - Exposer exposer{sctpParams.myIP + ":" + sctpParams.prometheusPort, "/metrics", 1}; + .Register(*sctpParams.prometheusRegistry); // start epoll @@ -378,7 +404,26 @@ int main(const int argc, char **argv) { std::vector threads(num_cpus); // std::vector threads; - exposer.RegisterCollectable(sctpParams.promteheusRegistry); + if (sctpParams.prometheusMode.compare("pull") == 0) { + sctpParams.prometheusExposer = new Exposer(sctpParams.myIP + ":" + sctpParams.prometheusPort, 1); + sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry); + } else if (sctpParams.prometheusMode.compare("push") == 0) { + const auto labels = Gateway::GetInstanceLabel(GetHostName()); + string address {}; + string port {}; + char ch = ':'; + auto found = sctpParams.prometheusPushAddress.find_last_of(ch); + // If string doesn't have + // character ch present in it + if (found != string::npos) { + address = sctpParams.prometheusPushAddress.substr(0,found); + port = sctpParams.prometheusPushAddress.substr(found + 1); + sctpParams.prometheusGateway = new Gateway(address, port, "E2T", labels); + sctpParams.prometheusGateway->RegisterCollectable(sctpParams.prometheusRegistry); + } else { + mdclog_write(MDCLOG_ERR, "failed to build Prometheus gateway no stats will be sent"); + } + } num_cpus = 1; for (unsigned int i = 0; i < num_cpus; i++) { @@ -574,21 +619,39 @@ void listener(sctp_params_t *params) { // rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE); // } + bool gatewayflag = false; while (true) { + future gateWay; + if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait"); } - auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1); - if (numOfEvents < 0 && errno == EINTR) { - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno)); + auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut); + if (numOfEvents == 0) { + if (params->prometheusGateway != nullptr) { + gateWay = params->prometheusGateway->AsyncPush(); + gatewayflag = true; } continue; - } - if (numOfEvents < 0) { + } else if (numOfEvents < 0) { + if (errno == EINTR) { + if (mdclog_level_get() >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno)); + } + continue; + } mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno)); return; } + if (gatewayflag) { + gatewayflag = false; + auto rc = gateWay.get(); + if (rc != 200) { + mdclog_write(MDCLOG_ERR, "Async Send to Promethues faild with Return Code %d", rc); + } else if (mdclog_level_get() >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Stats sent to Prometheus"); + } + } for (auto i = 0; i < numOfEvents; i++) { if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents); @@ -806,6 +869,14 @@ void handleConfigChange(sctp_params_t *sctpParams) { sctpParams->trace = false; } jsonTrace = sctpParams->trace; + + auto timeout = conf.getIntValue("prometheusPushTimeOut"); + if (timeout >= 5 && timeout <= 300) { + sctpParams->epollTimeOut = timeout; + } else { + mdclog_write(MDCLOG_ERR, "prometheusPushTimeOut set wrong value %d, values are [5..300]", timeout); + } + endlessLoop = false; } } @@ -1018,8 +1089,6 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ * @param rmrMessageBuffer */ void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) { - rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName)); - message.message.asndata = rmrMessageBuffer.rcvMessage->payload; message.message.asnLength = rmrMessageBuffer.rcvMessage->len; @@ -1234,7 +1303,10 @@ static void buildAndsendSetupRequest(ReportingMessages_t &message, } // encode to xml - auto res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded); + string res {}; + if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) { + res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded); + } rmr_mbuf_t *rmrMsg; if (res.length() == 0) { rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256); @@ -1391,7 +1463,7 @@ int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu, -void buildPromethuslist(ConnectedCU_t *peerInfo, Family *prometheusFamily) { +void buildPrometheuslist(ConnectedCU_t *peerInfo, Family *prometheusFamily) { peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}}); peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}}); @@ -1489,7 +1561,6 @@ int collectSetupRequestData(E2AP_PDU_t *pdu, memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName)); sctpMap->setkey(message.message.enodbName, message.peerInfo); - buildPromethuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily); } } else if (ie->id == ProtocolIE_ID_id_RANfunctionsAdded) { if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) { @@ -1574,9 +1645,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, break; } + buildPrometheuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily); + string messageName("E2setupRequest"); string ieName("E2setupRequestIEs"); message.message.messageType = RIC_E2_SETUP_REQ; + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v); break; } @@ -1596,6 +1671,9 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, string messageName("RICserviceUpdate"); string ieName("RICserviceUpdateIEs"); message.message.messageType = RIC_SERVICE_UPDATE; + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); + buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v); break; } @@ -1603,6 +1681,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); } @@ -1613,6 +1693,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; } @@ -1622,12 +1704,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICcontrol: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName); - } - break; - } case ProcedureCode_id_RICindication: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); @@ -1661,6 +1737,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, ie->value.choice.RICrequestID.ricInstanceID, ie->value.choice.RICrequestID.ricRequestorID); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1673,24 +1751,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICsubscription: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICsubscriptionDelete: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); - } - break; - } default: { mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode); message.message.messageType = 0; // no RMR message type yet @@ -1718,25 +1778,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode); } switch (procedureCode) { - case ProcedureCode_id_E2setup: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got E2setup\n"); - } - break; - } - case ProcedureCode_id_ErrorIndication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); - } - break; - } case ProcedureCode_id_Reset: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; } @@ -1773,6 +1820,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName)); + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1786,66 +1835,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, break; } - case ProcedureCode_id_RICindication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); - } - for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) { - auto messageSent = false; - RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i]; - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->id == ProtocolIE_ID_id_RICrequestID) { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) { - static unsigned char tx[32]; - message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION; - snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++); - rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); - rmr_bytes2meid(rmrMessageBuffer.sendMessage, - (unsigned char *)message.message.enodbName, - strlen(message.message.enodbName)); - rmrMessageBuffer.sendMessage->state = 0; - rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID; - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d", - rmrMessageBuffer.sendMessage->sub_id, - rmrMessageBuffer.sendMessage->mtype); - } - sendRmrMessage(rmrMessageBuffer, message); - messageSent = true; - } else { - mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request"); - } - } - if (messageSent) { - break; - } - } - break; - } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICserviceUpdate: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP"); - } - break; - } case ProcedureCode_id_RICsubscription: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP"); } @@ -1855,6 +1850,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP"); } @@ -1886,25 +1883,6 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode); } switch (procedureCode) { - case ProcedureCode_id_E2setup: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got E2setup\n"); - } - break; - } - case ProcedureCode_id_ErrorIndication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); - } - break; - } - case ProcedureCode_id_Reset: { - mdclog_write(MDCLOG_ERR, "Got Reset for %s, Protocol ERROR", message.message.enodbName); - break; - } case ProcedureCode_id_RICcontrol: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName); @@ -1930,6 +1908,8 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName)); + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1942,67 +1922,12 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICindication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); - } - for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) { - auto messageSent = false; - RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i]; - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->id == ProtocolIE_ID_id_RICrequestID) { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) { - static unsigned char tx[32]; - message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION; - snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++); - rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); - rmr_bytes2meid(rmrMessageBuffer.sendMessage, - (unsigned char *)message.message.enodbName, - strlen(message.message.enodbName)); - rmrMessageBuffer.sendMessage->state = 0; -// rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID; - rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID; - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d", - rmrMessageBuffer.sendMessage->sub_id, - rmrMessageBuffer.sendMessage->mtype); - } - sendRmrMessage(rmrMessageBuffer, message); - messageSent = true; - } else { - mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request"); - } - } - if (messageSent) { - break; - } - } - break; - } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICserviceUpdate: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP"); - } - break; - } case ProcedureCode_id_RICsubscription: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP"); } @@ -2012,7 +1937,9 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } - if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) { + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); + if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP"); } break; @@ -2175,12 +2102,19 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, return -1; } rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName); + message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName); + if (message.peerInfo == nullptr) { + mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName); + return -1; + } + switch (rmrMessageBuffer.rcvMessage->mtype) { case RIC_E2_SETUP_RESP : { if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } - + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP"); return -6; @@ -2191,6 +2125,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE"); return -6; @@ -2198,6 +2134,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_ERROR_INDICATION: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION"); return -6; @@ -2205,6 +2143,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_SUB_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ"); return -6; @@ -2212,6 +2152,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_SUB_DEL_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ"); return -6; @@ -2219,6 +2161,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_CONTROL_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ"); return -6; @@ -2229,6 +2173,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY"); return -6; @@ -2239,6 +2185,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK"); return -6; @@ -2249,6 +2197,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE"); return -6; @@ -2259,6 +2209,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET"); return -6; @@ -2269,6 +2221,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP"); return -6; @@ -2410,19 +2364,9 @@ int sendMessagetoCu(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, int failedMesgId) { - auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName); - if (peerInfo == nullptr) { - if (failedMesgId != 0) { - sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId); - } else { - mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName); - } - return -1; - } - // get the FD message.message.messageType = messageBuffer.rcvMessage->mtype; - auto rc = sendSctpMsg(peerInfo, message, sctpMap); + auto rc = sendSctpMsg(message.peerInfo, message, sctpMap); return rc; } diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index 8b40784..2e286f8 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -50,6 +50,7 @@ #include #include #include +#include #include #include @@ -86,6 +87,7 @@ #include #include #include +#include #include using namespace prometheus; @@ -121,6 +123,7 @@ typedef mapWrapper Sctp_Map_t; #define KA_MESSAGE_SIZE 2048 typedef struct sctp_params { + int epollTimeOut = -1; uint16_t rmrPort = 0; uint16_t sctpPort = SRC_PORT; int epoll_fd = 0; @@ -141,10 +144,13 @@ typedef struct sctp_params { string configFilePath {}; string configFileName {}; bool trace = true; - shared_ptr promteheusRegistry; + string prometheusMode {"pull"}; + string prometheusPushAddress {"127.0.0.1:7676"}; + shared_ptr prometheusRegistry; string prometheusPort {"8088"}; Family *prometheusFamily; - //shared_timed_mutex fence; // moved to mapWrapper + Gateway *prometheusGateway = nullptr; + Exposer *prometheusExposer = nullptr; } sctp_params_t; // RAN to RIC -- 2.16.6