From: aa7133@att.com
Date: Mon, 1 Jun 2020 14:19:20 +0000 (+0300)
Subject: 5.0.1 Replace the Prometheus library with the new one Change in the API remove one...
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=96f712b813b81fe04bcf8adf1e2fb9c49b35e551;p=ric-plt%2Fe2.git

5.0.1 Replace the Prometheus library with the new one. Change in the API: remove one parameter from the Exposer constructor. Add push capability to Prometheus and add all statistic counters.

Change-Id: Idd8aa8258e4aedbb2a04c89dfac7ac1c1263774a
Signed-off-by: aa7133@att.com
---

diff --git a/RIC-E2-TERMINATION/CMakeLists.txt b/RIC-E2-TERMINATION/CMakeLists.txt
index 398fd6d..fbb9a12 100644
--- a/RIC-E2-TERMINATION/CMakeLists.txt
+++ b/RIC-E2-TERMINATION/CMakeLists.txt
@@ -103,6 +103,7 @@ link_libraries(nsl
                cgreen
                prometheus-cpp-core
                prometheus-cpp-pull
+               prometheus-cpp-push
                z
                curl
                pthread)
@@ -126,6 +127,7 @@ target_link_libraries(e2 libicuuc.a)
 target_link_libraries(e2 libicudata.a)
 target_link_libraries(e2 prometheus-cpp-core.a)
 target_link_libraries(e2 prometheus-cpp-pull.a)
+target_link_libraries(e2 prometheus-cpp-push.a)
 #target_link_libraries(e2 libnng.a)
diff --git a/RIC-E2-TERMINATION/config/config.conf b/RIC-E2-TERMINATION/config/config.conf
index 86d1180..d6f5008 100644
--- a/RIC-E2-TERMINATION/config/config.conf
+++ b/RIC-E2-TERMINATION/config/config.conf
@@ -4,6 +4,11 @@ volume=log
 #the key name of the environment holds the local ip address
 #ip address of the E2T in the RMR
 local-ip=127.0.0.1
+#prometheus mode can be pull or push
+prometheusMode=pull
+#timeout can be from 5 seconds to 300 seconds default is 10
+prometheusPushTimeOut=10
+prometheusPushAddr=127.0.0.1:7676
 prometheusPort=8088
 #trace is start, stop
 trace=start
diff --git a/RIC-E2-TERMINATION/container-tag.yaml b/RIC-E2-TERMINATION/container-tag.yaml
index adeeeda..6a6c6f6 100644
--- a/RIC-E2-TERMINATION/container-tag.yaml
+++ b/RIC-E2-TERMINATION/container-tag.yaml
@@ -1,3 +1,3 @@
 # The Jenkins job requires a tag to build the Docker image.
 # Global-JJB script assumes this file is in the repo root.
-tag: 5.0.0 +tag: 5.0.1 diff --git a/RIC-E2-TERMINATION/sctpThread.cpp b/RIC-E2-TERMINATION/sctpThread.cpp index 49a4c51..62f42a3 100644 --- a/RIC-E2-TERMINATION/sctpThread.cpp +++ b/RIC-E2-TERMINATION/sctpThread.cpp @@ -239,6 +239,25 @@ int buildConfiguration(sctp_params_t &sctpParams) { } jsonTrace = sctpParams.trace; + tmpStr = conf.getStringValue("prometheusMode"); + transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower); + if (tmpStr.length() != 0) { + if (tmpStr.compare("push")) { + sctpParams.prometheusPushAddress = tmpStr; + auto timeout = conf.getIntValue("prometheusPushTimeOut"); + if (timeout >= 5 && timeout <= 300) { + sctpParams.epollTimeOut = timeout; + } else { + sctpParams.epollTimeOut = 10; + } + } + } + + tmpStr = conf.getStringValue("prometheusPushAddr"); + if (tmpStr.length() != 0) { + sctpParams.prometheusMode = tmpStr; + } + tmpStr = conf.getStringValue("prometheusPort"); if (tmpStr.length() != 0) { sctpParams.prometheusPort = tmpStr; @@ -289,6 +308,15 @@ int buildConfiguration(sctp_params_t &sctpParams) { return 0; } +static std::string GetHostName() { + char hostname[1024]; + + if (::gethostname(hostname, sizeof(hostname))) { + return {}; + } + return hostname; +} + int main(const int argc, char **argv) { @@ -333,17 +361,15 @@ int main(const int argc, char **argv) { } //auto registry = std::make_shared(); - sctpParams.promteheusRegistry = std::make_shared(); + sctpParams.prometheusRegistry = std::make_shared(); - //sctpParams.prometheusFamily = new Family("E2T", "E2T message counter", {{"E", sctpParams.podName}}); + //sctpParams.promtheusFamily = new Family("E2T", "E2T message counter", {{"E", sctpParams.podName}}); sctpParams.prometheusFamily = &BuildCounter() .Name("E2T") .Help("E2T message counter") .Labels({{"E", sctpParams.podName}}) - .Register(*sctpParams.promteheusRegistry); - - Exposer exposer{sctpParams.myIP + ":" + sctpParams.prometheusPort, "/metrics", 1}; + .Register(*sctpParams.prometheusRegistry); // start epoll @@ -378,7 +404,26 @@ int main(const int argc, char **argv) { std::vector threads(num_cpus); // std::vector threads; - exposer.RegisterCollectable(sctpParams.promteheusRegistry); + if (sctpParams.prometheusMode.compare("pull") == 0) { + sctpParams.prometheusExposer = new Exposer(sctpParams.myIP + ":" + sctpParams.prometheusPort, 1); + sctpParams.prometheusExposer->RegisterCollectable(sctpParams.prometheusRegistry); + } else if (sctpParams.prometheusMode.compare("push") == 0) { + const auto labels = Gateway::GetInstanceLabel(GetHostName()); + string address {}; + string port {}; + char ch = ':'; + auto found = sctpParams.prometheusPushAddress.find_last_of(ch); + // If string doesn't have + // character ch present in it + if (found != string::npos) { + address = sctpParams.prometheusPushAddress.substr(0,found); + port = sctpParams.prometheusPushAddress.substr(found + 1); + sctpParams.prometheusGateway = new Gateway(address, port, "E2T", labels); + sctpParams.prometheusGateway->RegisterCollectable(sctpParams.prometheusRegistry); + } else { + mdclog_write(MDCLOG_ERR, "failed to build Prometheus gateway no stats will be sent"); + } + } num_cpus = 1; for (unsigned int i = 0; i < num_cpus; i++) { @@ -574,21 +619,39 @@ void listener(sctp_params_t *params) { // rmrMessageBuffer.sendBufferedMessages[i] = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE); // } + bool gatewayflag = false; while (true) { + future gateWay; + if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Start EPOLL Wait"); } - auto 
numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, -1); - if (numOfEvents < 0 && errno == EINTR) { - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno)); + auto numOfEvents = epoll_wait(params->epoll_fd, events, MAXEVENTS, params->epollTimeOut); + if (numOfEvents == 0) { + if (params->prometheusGateway != nullptr) { + gateWay = params->prometheusGateway->AsyncPush(); + gatewayflag = true; } continue; - } - if (numOfEvents < 0) { + } else if (numOfEvents < 0) { + if (errno == EINTR) { + if (mdclog_level_get() >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "got EINTR : %s", strerror(errno)); + } + continue; + } mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno)); return; } + if (gatewayflag) { + gatewayflag = false; + auto rc = gateWay.get(); + if (rc != 200) { + mdclog_write(MDCLOG_ERR, "Async Send to Promethues faild with Return Code %d", rc); + } else if (mdclog_level_get() >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Stats sent to Prometheus"); + } + } for (auto i = 0; i < numOfEvents; i++) { if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "handling epoll event %d out of %d", i + 1, numOfEvents); @@ -806,6 +869,14 @@ void handleConfigChange(sctp_params_t *sctpParams) { sctpParams->trace = false; } jsonTrace = sctpParams->trace; + + auto timeout = conf.getIntValue("prometheusPushTimeOut"); + if (timeout >= 5 && timeout <= 300) { + sctpParams->epollTimeOut = timeout; + } else { + mdclog_write(MDCLOG_ERR, "prometheusPushTimeOut set wrong value %d, values are [5..300]", timeout); + } + endlessLoop = false; } } @@ -1018,8 +1089,6 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ * @param rmrMessageBuffer */ void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer) { - rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *) (message.message.enodbName)); - message.message.asndata = rmrMessageBuffer.rcvMessage->payload; message.message.asnLength = rmrMessageBuffer.rcvMessage->len; @@ -1234,7 +1303,10 @@ static void buildAndsendSetupRequest(ReportingMessages_t &message, } // encode to xml - auto res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded); + string res {}; + if (!functionsToAdd_v.empty() || !functionsToModified_v.empty()) { + res = buildXmlData(messageName, ieName, functionsToAdd_v, functionsToModified_v, buffer, (size_t) er.encoded); + } rmr_mbuf_t *rmrMsg; if (res.length() == 0) { rmrMsg = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, buffer_size + 256); @@ -1391,7 +1463,7 @@ int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu, -void buildPromethuslist(ConnectedCU_t *peerInfo, Family *prometheusFamily) { +void buildPrometheuslist(ConnectedCU_t *peerInfo, Family *prometheusFamily) { peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}}); peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup - 1)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}}); @@ -1489,7 +1561,6 @@ int collectSetupRequestData(E2AP_PDU_t *pdu, memcpy(message.message.enodbName, message.peerInfo->enodbName, strlen(message.peerInfo->enodbName)); sctpMap->setkey(message.message.enodbName, message.peerInfo); - buildPromethuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily); } } else if (ie->id == 
ProtocolIE_ID_id_RANfunctionsAdded) { if (ie->value.present == E2setupRequestIEs__value_PR_RANfunctions_List) { @@ -1574,9 +1645,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, break; } + buildPrometheuslist(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily); + string messageName("E2setupRequest"); string ieName("E2setupRequestIEs"); message.message.messageType = RIC_E2_SETUP_REQ; + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v); break; } @@ -1596,6 +1671,9 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, string messageName("RICserviceUpdate"); string ieName("RICserviceUpdateIEs"); message.message.messageType = RIC_SERVICE_UPDATE; + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); + buildAndsendSetupRequest(message, rmrMessageBuffer, pdu, messageName, ieName, RANfunctionsAdded_v, RANfunctionsModified_v); break; } @@ -1603,6 +1681,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); } @@ -1613,6 +1693,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; } @@ -1622,12 +1704,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICcontrol: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName); - } - break; - } case ProcedureCode_id_RICindication: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); @@ -1661,6 +1737,8 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, ie->value.choice.RICrequestID.ricInstanceID, ie->value.choice.RICrequestID.ricRequestorID); } + message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication - 1]->Increment(); + message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1673,24 +1751,6 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICsubscription: { - if (logLevel >= 
MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICsubscriptionDelete: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); - } - break; - } default: { mdclog_write(MDCLOG_ERR, "Undefined or not supported message = %ld", procedureCode); message.message.messageType = 0; // no RMR message type yet @@ -1718,25 +1778,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_INFO, "Successful Outcome %ld", procedureCode); } switch (procedureCode) { - case ProcedureCode_id_E2setup: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got E2setup\n"); - } - break; - } - case ProcedureCode_id_ErrorIndication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); - } - break; - } case ProcedureCode_id_Reset: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; } @@ -1773,6 +1820,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName)); + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1786,66 +1835,12 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, break; } - case ProcedureCode_id_RICindication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); - } - for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) { - auto messageSent = false; - RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i]; - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->id == ProtocolIE_ID_id_RICrequestID) { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) { - static unsigned char tx[32]; - message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION; - snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++); - rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); - rmr_bytes2meid(rmrMessageBuffer.sendMessage, - (unsigned char *)message.message.enodbName, - strlen(message.message.enodbName)); - rmrMessageBuffer.sendMessage->state = 0; - rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID; - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d", - rmrMessageBuffer.sendMessage->sub_id, - rmrMessageBuffer.sendMessage->mtype); - } - 
sendRmrMessage(rmrMessageBuffer, message); - messageSent = true; - } else { - mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request"); - } - } - if (messageSent) { - break; - } - } - break; - } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICserviceUpdate: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP"); - } - break; - } case ProcedureCode_id_RICsubscription: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP"); } @@ -1855,6 +1850,8 @@ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } + message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP"); } @@ -1886,25 +1883,6 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, mdclog_write(MDCLOG_INFO, "Unsuccessful Outcome %ld", procedureCode); } switch (procedureCode) { - case ProcedureCode_id_E2setup: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got E2setup\n"); - } - break; - } - case ProcedureCode_id_ErrorIndication: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); - } - break; - } - case ProcedureCode_id_Reset: { - mdclog_write(MDCLOG_ERR, "Got Reset for %s, Protocol ERROR", message.message.enodbName); - break; - } case ProcedureCode_id_RICcontrol: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICcontrol %s", message.message.enodbName); @@ -1930,6 +1908,8 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName)); + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -1942,67 +1922,12 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, } break; } - case ProcedureCode_id_RICindication: { - if (logLevel >= MDCLOG_DEBUG) { - 
mdclog_write(MDCLOG_DEBUG, "Got RICindication %s", message.message.enodbName); - } - for (auto i = 0; i < pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.count; i++) { - auto messageSent = false; - RICindication_IEs_t *ie = pdu->choice.initiatingMessage->value.choice.RICindication.protocolIEs.list.array[i]; - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->id == ProtocolIE_ID_id_RICrequestID) { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RIC requestId entry, ie type (ProtocolIE_ID) = %ld", ie->id); - } - if (ie->value.present == RICindication_IEs__value_PR_RICrequestID) { - static unsigned char tx[32]; - message.message.messageType = rmrMessageBuffer.sendMessage->mtype = RIC_INDICATION; - snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++); - rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); - rmr_bytes2meid(rmrMessageBuffer.sendMessage, - (unsigned char *)message.message.enodbName, - strlen(message.message.enodbName)); - rmrMessageBuffer.sendMessage->state = 0; -// rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricRequestorID; - rmrMessageBuffer.sendMessage->sub_id = (int)ie->value.choice.RICrequestID.ricInstanceID; - if (mdclog_level_get() >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "RIC sub id = %d, message type = %d", - rmrMessageBuffer.sendMessage->sub_id, - rmrMessageBuffer.sendMessage->mtype); - } - sendRmrMessage(rmrMessageBuffer, message); - messageSent = true; - } else { - mdclog_write(MDCLOG_ERR, "RIC request id missing illigal request"); - } - } - if (messageSent) { - break; - } - } - break; - } - case ProcedureCode_id_RICserviceQuery: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceQuery %s", message.message.enodbName); - } - break; - } - case ProcedureCode_id_RICserviceUpdate: { - if (logLevel >= MDCLOG_DEBUG) { - mdclog_write(MDCLOG_DEBUG, "Got RICserviceUpdate %s", message.message.enodbName); - } - if (sendRequestToXapp(message, RIC_SERVICE_UPDATE, rmrMessageBuffer) != 0) { - mdclog_write(MDCLOG_ERR, "RIC_SERVICE_UPDATE message failed to send to xAPP"); - } - break; - } case ProcedureCode_id_RICsubscription: { if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP"); } @@ -2012,7 +1937,9 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } - if (sendRequestToXapp(message, RIC_SUB_DEL_FAILURE, rmrMessageBuffer) != 0) { + message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment((double)rmrMessageBuffer.rcvMessage->len); + if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP"); } break; @@ -2175,12 +2102,19 @@ int 
receiveXappMessages(Sctp_Map_t *sctpMap, return -1; } rmr_get_meid(rmrMessageBuffer.rcvMessage, (unsigned char *)message.message.enodbName); + message.peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName); + if (message.peerInfo == nullptr) { + mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName); + return -1; + } + switch (rmrMessageBuffer.rcvMessage->mtype) { case RIC_E2_SETUP_RESP : { if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } - + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP"); return -6; @@ -2191,6 +2125,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(); + message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE"); return -6; @@ -2198,6 +2134,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_ERROR_INDICATION: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION"); return -6; @@ -2205,6 +2143,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_SUB_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ"); return -6; @@ -2212,6 +2152,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_SUB_DEL_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ"); return -6; @@ -2219,6 +2161,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, break; } case RIC_CONTROL_REQ: { + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ"); return -6; @@ -2229,6 +2173,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + 
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY"); return -6; @@ -2239,6 +2185,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_ACK"); return -6; @@ -2249,6 +2197,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate - 1]->Increment(); + message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE"); return -6; @@ -2259,6 +2209,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET"); return -6; @@ -2269,6 +2221,8 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } + message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset - 1]->Increment(); + message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset - 1]->Increment(rmrMessageBuffer.rcvMessage->len); if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP"); return -6; @@ -2410,19 +2364,9 @@ int sendMessagetoCu(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, int failedMesgId) { - auto *peerInfo = (ConnectedCU_t *) sctpMap->find(message.message.enodbName); - if (peerInfo == nullptr) { - if (failedMesgId != 0) { - sendFailedSendingMessagetoXapp(messageBuffer, message, failedMesgId); - } else { - mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName); - } - return -1; - } - // get the FD message.message.messageType = messageBuffer.rcvMessage->mtype; - auto rc = sendSctpMsg(peerInfo, message, sctpMap); + auto rc = sendSctpMsg(message.peerInfo, message, sctpMap); return rc; } diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index 8b40784..2e286f8 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -50,6 +50,7 @@ #include #include #include +#include #include #include @@ -86,6 +87,7 @@ #include #include #include +#include #include using namespace prometheus; @@ -121,6 +123,7 @@ typedef mapWrapper Sctp_Map_t; #define KA_MESSAGE_SIZE 2048 
typedef struct sctp_params { + int epollTimeOut = -1; uint16_t rmrPort = 0; uint16_t sctpPort = SRC_PORT; int epoll_fd = 0; @@ -141,10 +144,13 @@ typedef struct sctp_params { string configFilePath {}; string configFileName {}; bool trace = true; - shared_ptr promteheusRegistry; + string prometheusMode {"pull"}; + string prometheusPushAddress {"127.0.0.1:7676"}; + shared_ptr prometheusRegistry; string prometheusPort {"8088"}; Family *prometheusFamily; - //shared_timed_mutex fence; // moved to mapWrapper + Gateway *prometheusGateway = nullptr; + Exposer *prometheusExposer = nullptr; } sctp_params_t; // RAN to RIC
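
Editor's note (not part of the patch): the core of this change is the pull/push mode selection — an Exposer built with the new two-argument constructor for pull mode, or a Gateway that is pushed to whenever epoll_wait() times out for push mode. Below is a minimal, self-contained sketch of that pattern, assuming the current prometheus-cpp Exposer/Gateway API; identifiers such as mode, pullBind and pushAddress are illustrative stand-ins for the sctp_params_t fields, not the actual E2T code.

    #include <future>
    #include <iostream>
    #include <memory>
    #include <string>

    #include <prometheus/counter.h>
    #include <prometheus/exposer.h>
    #include <prometheus/gateway.h>
    #include <prometheus/registry.h>

    int main() {
        using namespace prometheus;

        // Shared registry served by either the pull Exposer or the push Gateway.
        auto registry = std::make_shared<Registry>();
        auto &family = BuildCounter()
                           .Name("E2T")
                           .Help("E2T message counter")
                           .Labels({{"E", "e2term-pod"}})        // pod name label, as in the patch
                           .Register(*registry);
        auto &setupCounter = family.Add({{"gnb_example", "IN"}, {"SetupRequest", "Messages"}});
        setupCounter.Increment();

        const std::string mode = "pull";                  // would come from prometheusMode
        const std::string pullBind = "127.0.0.1:8088";    // local-ip + prometheusPort
        const std::string pushAddress = "127.0.0.1:7676"; // prometheusPushAddr

        std::unique_ptr<Exposer> exposer;
        std::unique_ptr<Gateway> gateway;

        if (mode == "pull") {
            // New Exposer API: bind address and thread count only (the "/metrics" uri
            // parameter was removed, which is the API change the commit refers to).
            exposer = std::make_unique<Exposer>(pullBind, 1);
            exposer->RegisterCollectable(registry);
        } else if (mode == "push") {
            // Split "host:port" on the last ':' exactly as the patch does.
            auto colon = pushAddress.find_last_of(':');
            if (colon == std::string::npos) {
                std::cerr << "failed to build Prometheus gateway, no stats will be sent\n";
                return 1;
            }
            gateway = std::make_unique<Gateway>(pushAddress.substr(0, colon),
                                                pushAddress.substr(colon + 1),
                                                "E2T",
                                                Gateway::GetInstanceLabel("my-host"));
            gateway->RegisterCollectable(registry);

            // In the patch the push is triggered when epoll_wait() returns 0 after
            // prometheusPushTimeOut seconds; here we push once for illustration.
            std::future<int> rc = gateway->AsyncPush();
            if (rc.get() != 200) {
                std::cerr << "Async send to Prometheus failed\n";
            }
        }
        return 0;
    }

Design note: keeping one Registry and registering it with whichever transport is configured means the per-procedure counters (the counters[IN_INITI][MSG_COUNTER][...] arrays added throughout sctpThread.cpp) are incremented the same way regardless of whether metrics are scraped or pushed.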