X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=RIC-E2-TERMINATION%2FsctpThread.cpp;h=55f90833ec011a944d4293c5b462061f79d800b4;hb=c9d8d2891c793e9b90ae0f4d91e32e28df0faef1;hp=2d7e76b4b27627099f71f9fdc1b073311cfb934b;hpb=60f8bef9fe0d1dcec9505783bcc94d42e2c8529e;p=ric-plt%2Fe2.git diff --git a/RIC-E2-TERMINATION/sctpThread.cpp b/RIC-E2-TERMINATION/sctpThread.cpp index 2d7e76b..55f9083 100644 --- a/RIC-E2-TERMINATION/sctpThread.cpp +++ b/RIC-E2-TERMINATION/sctpThread.cpp @@ -33,8 +33,6 @@ #include #include - - using namespace std; //using namespace std::placeholders; using namespace boost::filesystem; @@ -62,6 +60,22 @@ boost::shared_ptr> boostLogger double cpuClock = 0.0; bool jsonTrace = false; +char* getinterfaceip() +{ + char hostname[256]; + char *IP; + struct hostent *host_entry; + int retVal; + retVal = gethostname(hostname, sizeof(hostname)); + if ( retVal == -1 ) + return NULL; + host_entry = gethostbyname(hostname); + if ( host_entry == NULL ) + return NULL; + IP = inet_ntoa(*((struct in_addr*) host_entry->h_addr_list[0])); + return IP; +} + static int enable_log_change_notify(const char* fileName) { @@ -81,7 +95,7 @@ static int register_log_change_notify(const char *fileName) pthread_t tid; pthread_attr_init(&cb_attr); pthread_attr_setdetachstate(&cb_attr,PTHREAD_CREATE_DETACHED); - return pthread_create(&tid, &cb_attr,&monitor_loglevel_change_handler,(void *)strdup(fileName)); + return pthread_create(&tid, &cb_attr,&monitor_loglevel_change_handler,(void *)fileName); } @@ -128,11 +142,13 @@ static void * monitor_loglevel_change_handler(void* arg) { n = read( ifd, rbuf, sizeof( rbuf ) ); // read the event if( n < 0 ) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if( errno == EAGAIN ) { } else { - fprintf( stderr, "### CRIT ### config listener read err: %s\n", strerror( errno ) ); + printf( "### CRIT ### config listener read err: %s\n", strerror( errno ) ); } continue; +#endif } //Retrieving Log Level from configmap by parsing configmap file @@ -271,8 +287,10 @@ static long transactionCounter = 0; int buildListeningPort(sctp_params_t &sctpParams) { sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP); if (sctpParams.listenFD <= 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Error Opening socket, %s", strerror(errno)); return -1; +#endif } struct sockaddr_in6 serverAddress {}; @@ -280,8 +298,10 @@ int buildListeningPort(sctp_params_t &sctpParams) { serverAddress.sin6_addr = in6addr_any; serverAddress.sin6_port = htons(sctpParams.sctpPort); if (bind(sctpParams.listenFD, (SA *)&serverAddress, sizeof(serverAddress)) < 0 ) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Error binding port %d. %s", sctpParams.sctpPort, strerror(errno)); return -1; +#endif } if (setSocketNoBlocking(sctpParams.listenFD) == -1) { //mdclog_write(MDCLOG_ERR, "Error binding. %s", strerror(errno)); @@ -297,8 +317,10 @@ int buildListeningPort(sctp_params_t &sctpParams) { } if (listen(sctpParams.listenFD, SOMAXCONN) < 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Error listening. %s\n", strerror(errno)); return -1; +#endif } struct epoll_event event {}; event.events = EPOLLIN | EPOLLET; @@ -306,9 +328,11 @@ int buildListeningPort(sctp_params_t &sctpParams) { // add listening port to epoll if (epoll_ctl(sctpParams.epoll_fd, EPOLL_CTL_ADD, sctpParams.listenFD, &event)) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) printf("Failed to add descriptor to epoll\n"); mdclog_write(MDCLOG_ERR, "Failed to add descriptor to epoll. %s\n", strerror(errno)); return -1; +#endif } return 0; @@ -320,31 +344,41 @@ int buildConfiguration(sctp_params_t &sctpParams) { const int size = 2048; auto fileSize = file_size(p); if (fileSize > size) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "File %s larger than %d", p.string().c_str(), size); return -1; +#endif } } else { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Configuration File %s not exists", p.string().c_str()); return -1; +#endif } ReadConfigFile conf; if (conf.openConfigFile(p.string()) == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Filed to open config file %s, %s", p.string().c_str(), strerror(errno)); return -1; +#endif } int rmrPort = conf.getIntValue("nano"); if (rmrPort == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "illegal RMR port "); return -1; +#endif } sctpParams.rmrPort = (uint16_t)rmrPort; snprintf(sctpParams.rmrAddress, sizeof(sctpParams.rmrAddress), "%d", (int) (sctpParams.rmrPort)); auto tmpStr = conf.getStringValue("volume"); if (tmpStr.length() == 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "illegal volume."); return -1; +#endif } char tmpLogFilespec[VOLUME_URL_SIZE]; @@ -360,21 +394,27 @@ int buildConfiguration(sctp_params_t &sctpParams) { sctpParams.myIP = conf.getStringValue("local-ip"); if (sctpParams.myIP.length() == 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "illegal local-ip."); return -1; +#endif } int sctpPort = conf.getIntValue("sctp-port"); if (sctpPort == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "illegal SCTP port "); return -1; +#endif } sctpParams.sctpPort = (uint16_t)sctpPort; sctpParams.fqdn = conf.getStringValue("external-fqdn"); if (sctpParams.fqdn.length() == 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "illegal external-fqdn"); return -1; +#endif } std::string pod = conf.getStringValue("pod_name"); @@ -399,14 +439,18 @@ int buildConfiguration(sctp_params_t &sctpParams) { tmpStr = conf.getStringValue("trace"); transform(tmpStr.begin(), tmpStr.end(), tmpStr.begin(), ::tolower); if ((tmpStr.compare("start")) == 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_INFO, "Trace set to: start"); sctpParams.trace = true; +#endif } else if ((tmpStr.compare("stop")) == 0) { mdclog_write(MDCLOG_INFO, "Trace set to: stop"); sctpParams.trace = false; } else { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Trace was set to wrong value %s, set to stop", tmpStr.c_str()); sctpParams.trace = false; +#endif } jsonTrace = sctpParams.trace; @@ -462,13 +506,30 @@ int buildConfiguration(sctp_params_t &sctpParams) { } void startPrometheus(sctp_params_t &sctpParams) { + auto podName = std::getenv("POD_NAME"); + string metric = "E2TBeta"; + if (strstr(podName, "alpha") != NULL) { + metric = "E2TAlpha"; + } + //Get eth0 interface IP + char* host = getinterfaceip(); + string hostip = host; + sctpParams.prometheusFamily = &BuildCounter() - .Name("E2T") - .Help("E2T message counter") + .Name(metric.c_str()) + .Help("E2T instance metrics") .Labels({{"POD_NAME", sctpParams.podName}}) .Register(*sctpParams.prometheusRegistry); - string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort; + // Build E2T instance level metrics + buildE2TPrometheusCounters(sctpParams); + + string prometheusPath; + if (hostip.empty()) + prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort; + else + prometheusPath = hostip + ":" + sctpParams.prometheusPort; + if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str()); } @@ -762,6 +823,7 @@ void listener(sctp_params_t *params) { auto numOfEvents = 1; #endif if (numOfEvents == 0) { // time out +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "got epoll timeout"); } @@ -774,7 +836,12 @@ void listener(sctp_params_t *params) { continue; } mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno)); + if(events) + { + free(events); + } return; +#endif } for (auto i = 0; i < numOfEvents; i++) { if (mdclog_level_get() >= MDCLOG_DEBUG) { @@ -809,10 +876,15 @@ void listener(sctp_params_t *params) { peerInfo->sctpParams = params; peerInfo->fileDescriptor = accept(params->listenFD, &in_addr, &in_len); if (peerInfo->fileDescriptor == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { /* We have processed all incoming connections. */ + if(peerInfo) + free(peerInfo); break; } else { + if(peerInfo) + free(peerInfo); mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno)); break; } @@ -820,7 +892,10 @@ void listener(sctp_params_t *params) { if (setSocketNoBlocking(peerInfo->fileDescriptor) == -1) { mdclog_write(MDCLOG_ERR, "setSocketNoBlocking failed to set new connection %s on port %s\n", hostBuff, portBuff); close(peerInfo->fileDescriptor); + if(peerInfo) + free(peerInfo); break; +#endif } auto ans = getnameinfo(&in_addr, in_len, peerInfo->hostName, NI_MAXHOST, @@ -828,6 +903,8 @@ void listener(sctp_params_t *params) { if (ans < 0) { mdclog_write(MDCLOG_ERR, "Failed to get info on connection request. %s\n", strerror(errno)); close(peerInfo->fileDescriptor); + if(peerInfo) + free(peerInfo); break; } if (mdclog_level_get() >= MDCLOG_DEBUG) { @@ -840,6 +917,8 @@ void listener(sctp_params_t *params) { (EPOLLIN | EPOLLET), params->sctpMap, nullptr, 0) != 0) { + if(peerInfo) + free(peerInfo); break; } break; @@ -886,7 +965,7 @@ void listener(sctp_params_t *params) { } } #ifdef UNIT_TEST - break; + break; #endif } } @@ -905,12 +984,13 @@ void handleConfigChange(sctp_params_t *sctpParams) { path p = (sctpParams->configFilePath + "/" + sctpParams->configFileName).c_str(); auto endlessLoop = true; while (endlessLoop) { -#ifndef UNIT_TEST - auto len = read(sctpParams->inotifyFD, buf, sizeof buf); +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) + auto len = read(sctpParams->inotifyFD, buf, sizeof buf); #else auto len=10; #endif if (len == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (errno != EAGAIN) { mdclog_write(MDCLOG_ERR, "read %s ", strerror(errno)); endlessLoop = false; @@ -920,6 +1000,7 @@ void handleConfigChange(sctp_params_t *sctpParams) { endlessLoop = false; continue; } +#endif } for (ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) { @@ -938,10 +1019,12 @@ void handleConfigChange(sctp_params_t *sctpParams) { // not the directory } if (event->len) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) auto retVal = strcmp(sctpParams->configFileName.c_str(), event->name); if (retVal != 0) { continue; } +#endif } // only the file we want if (event->mask & (uint32_t)IN_CLOSE_WRITE) { @@ -976,7 +1059,9 @@ void handleConfigChange(sctp_params_t *sctpParams) { if ((tmpStr.compare("debug")) == 0) { mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_DEBUG"); sctpParams->logLevel = MDCLOG_DEBUG; - } else if ((tmpStr.compare("info")) == 0) { + } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) + else if ((tmpStr.compare("info")) == 0) { mdclog_write(MDCLOG_INFO, "Log level set to MDCLOG_INFO"); sctpParams->logLevel = MDCLOG_INFO; } else if ((tmpStr.compare("warning")) == 0) { @@ -989,6 +1074,7 @@ void handleConfigChange(sctp_params_t *sctpParams) { mdclog_write(MDCLOG_ERR, "illegal loglevel = %s. Set loglevel to MDCLOG_INFO", tmpStr.c_str()); sctpParams->logLevel = MDCLOG_INFO; } +#endif mdclog_level_set(sctpParams->logLevel); tmpStr = conf.getStringValue("trace"); if (tmpStr.length() == 0) { @@ -1038,7 +1124,7 @@ void handleEinprogressMessages(struct epoll_event &event, socklen_t retValLen = 0; auto rc = getsockopt(peerInfo->fileDescriptor, SOL_SOCKET, SO_ERROR, &retVal, &retValLen); if (rc != 0 || retVal != 0) { -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (rc != 0) { rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256, "%s|Failed SCTP Connection, after EINPROGRESS the getsockopt%s", @@ -1062,7 +1148,7 @@ void handleEinprogressMessages(struct epoll_event &event, peerInfo->mtype = 0; return; } - +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) peerInfo->isConnected = true; if (modifyToEpoll(params->epoll_fd, peerInfo, (EPOLLIN | EPOLLET), params->sctpMap, peerInfo->enodbName, @@ -1090,6 +1176,7 @@ void handleEinprogressMessages(struct epoll_event &event, memset(peerInfo->asnData, 0, peerInfo->asnLength); peerInfo->asnLength = 0; peerInfo->mtype = 0; +#endif } @@ -1101,8 +1188,7 @@ void handlepoll_error(struct epoll_event &event, auto *peerInfo = (ConnectedCU_t *)event.data.ptr; mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s", event.events, peerInfo->fileDescriptor, peerInfo->enodbName); -#ifndef UNIT_TEST - +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) rmrMessageBuffer.sendMessage->len = snprintf((char *)rmrMessageBuffer.sendMessage->payload, 256, "%s|Failed SCTP Connection", peerInfo->enodbName); @@ -1175,7 +1261,11 @@ void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) { */ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) { auto loglevel = mdclog_level_get(); +#ifndef UNIT_TEST int fd = peerInfo->fileDescriptor; +#else + int fd = FILE_DESCRIPTOR; +#endif if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s", message.message.enodbName, __FUNCTION__); @@ -1187,14 +1277,16 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ continue; } mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno)); +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (!peerInfo->isConnected) { mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName); -#ifndef UNIT_TEST return -1; -#endif } +#endif +#ifndef UNIT_TEST cleanHashEntry(peerInfo, m); close(fd); +#endif char key[MAX_ENODB_NAME_SIZE * 2]; snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName, message.message.messageType); @@ -1278,8 +1370,12 @@ int receiveDataFromSctp(struct epoll_event *events, } // read the buffer directly to rmr payload message.message.asndata = rmrMessageBuffer.sendMessage->payload; +#ifndef UNIT_TEST message.message.asnLength = rmrMessageBuffer.sendMessage->len = read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE); +#else + message.message.asnLength = rmrMessageBuffer.sendMessage->len; +#endif if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld", @@ -1329,12 +1425,20 @@ int receiveDataFromSctp(struct epoll_event *events, printBuffer); clock_gettime(CLOCK_MONOTONIC, &decodeStart); } - +#ifndef UNIT_TEST auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu, - message.message.asndata, message.message.asnLength); + message.message.asndata, message.message.asnLength); +#else + asn_dec_rval_t rval = {RC_OK, 0}; + pdu = (E2AP_PDU_t*)rmrMessageBuffer.sendMessage->tp_buf; +#endif if (rval.code != RC_OK) { mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code, message.peerInfo->enodbName); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } break; } @@ -1377,11 +1481,16 @@ int receiveDataFromSctp(struct epoll_event *events, message.peerInfo->enodbName, end.tv_sec - decodeStart.tv_sec, end.tv_nsec - decodeStart.tv_nsec); } numOfMessages++; +#ifndef UNIT_TEST if (pdu != nullptr) { - ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); - //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); - //pdu = nullptr; + // ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, pdu); /* With reset we were not freeing the memory and was causing the leak here. */ + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; } +#else + done = 1; + break; +#endif } if (done) { @@ -1394,12 +1503,13 @@ int receiveDataFromSctp(struct epoll_event *events, "%s|CU disconnected unexpectedly", message.peerInfo->enodbName); message.message.asndata = rmrMessageBuffer.sendMessage->payload; - +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (sendRequestToXapp(message, RIC_SCTP_CONNECTION_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP"); } +#endif /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */ close(message.peerInfo->fileDescriptor); @@ -1430,16 +1540,21 @@ static void buildAndSendSetupRequest(ReportingMessages_t &message, buffer = (unsigned char *) calloc(buffer_size, sizeof(unsigned char)); if(!buffer) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "Allocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno)); return; +#endif } while (true) { er = asn_encode_to_buffer(nullptr, ATS_BASIC_XER, &asn_DEF_E2AP_PDU, pdu, buffer, buffer_size); if (er.encoded == -1) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno)); return; +#endif } else if (er.encoded > (ssize_t) buffer_size) { buffer_size = er.encoded + 128; +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) mdclog_write(MDCLOG_WARN, "Buffer of size %d is to small for %s. Reallocate buffer of size %d", (int) buffer_size, asn_DEF_E2AP_PDU.name, buffer_size); @@ -1456,6 +1571,7 @@ static void buildAndSendSetupRequest(ReportingMessages_t &message, } buffer = newBuffer; continue; +#endif } buffer[er.encoded] = '\0'; break; @@ -1635,6 +1751,68 @@ int collectServiceUpdate_RequestData(E2AP_PDU_t *pdu, #endif +void buildE2TPrometheusCounters(sctp_params_t &sctpParams) { + sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestMsgs"}}); + sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestBytes"}}); + + sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseMsgs"}}); + sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseBytes"}}); + + sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureMsgs"}}); + sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureBytes"}}); + + sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationMsgs"}}); + sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationBytes"}}); + + sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestMsgs"}}); + sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestBytes"}}); + + sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckMsgs"}}); + sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckBytes"}}); + + sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateMsgs"}}); + sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateBytes"}}); + + sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespMsgs"}}); + sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespBytes"}}); + + sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureMsgs"}}); + sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureBytes"}}); + + sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlMsgs"}}); + sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlBytes"}}); + + sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckMsgs"}}); + sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckBytes"}}); + + sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureMsgs"}}); + sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureBytes"}}); + + sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionMsgs"}}); + sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionBytes"}}); + + sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckMsgs"}}); + sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckBytes"}}); + + sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureMsgs"}}); + sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureBytes"}}); + + sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteMsgs"}}); + sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteBytes"}}); + + sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckMsgs"}}); + sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckBytes"}}); + + sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailMsgs"}}); + sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailBytes"}}); + + sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationMsgs"}}); + sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationBytes"}}); + + sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryMsgs"}}); + sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryBytes"}}); +} + void buildPrometheusList(ConnectedCU_t *peerInfo, Family *prometheusFamily) { peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}}); peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}}); @@ -1707,6 +1885,7 @@ void buildPrometheusList(ConnectedCU_t *peerInfo, Family *prometheusFam peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}}); peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}}); } + /** * * @param pdu @@ -1765,6 +1944,10 @@ int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBu mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response from E2MGR : %s", rval.code, message.message.enodbName); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } @@ -1773,6 +1956,10 @@ int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBu rmrMessageBuffer.sendMessage->payload, buff_size); if (er.encoded == -1) { mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno)); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } else if (er.encoded > (ssize_t)buff_size) { mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d", @@ -1780,9 +1967,17 @@ int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBu asn_DEF_E2AP_PDU.name, __func__, __LINE__); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } rmrMessageBuffer.sendMessage->len = er.encoded; + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return 0; } @@ -1823,6 +2018,11 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, message.message.messageType = RIC_E2_SETUP_REQ; message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength); + buildAndSendSetupRequest(message, rmrMessageBuffer, pdu); break; } @@ -1842,9 +2042,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, string messageName("RICserviceUpdate"); string ieName("RICserviceUpdateIEs"); message.message.messageType = RIC_SERVICE_UPDATE; -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength); #endif buildAndSendSetupRequest(message, rmrMessageBuffer, pdu); break; @@ -1853,9 +2057,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(); message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength); #endif if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP"); @@ -1866,9 +2074,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength); #endif if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; @@ -1912,9 +2124,13 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, ie->value.choice.RICrequestID.ricInstanceID, ie->value.choice.RICrequestID.ricRequestorID); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment(); message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength); #endif sendRmrMessage(rmrMessageBuffer, message); messageSent = true; @@ -1959,9 +2175,13 @@ void asnSuccessfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got Reset %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength); #endif if (XML_From_PER(message, rmrMessageBuffer) < 0) { break; @@ -1998,9 +2218,13 @@ void asnSuccessfulMsg(E2AP_PDU_t *pdu, rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *)message.message.enodbName, strlen(message.message.enodbName)); -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength); #endif sendRmrMessage(rmrMessageBuffer, message); messageSent = true; @@ -2019,9 +2243,13 @@ void asnSuccessfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength); #endif if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP"); @@ -2032,9 +2260,13 @@ void asnSuccessfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength); #endif if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP"); @@ -2092,10 +2324,14 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, strlen((const char *) tx)); rmr_bytes2meid(rmrMessageBuffer.sendMessage, (unsigned char *) message.message.enodbName, strlen(message.message.enodbName)); -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength); -#endif + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength); +#endif sendRmrMessage(rmrMessageBuffer, message); messageSent = true; } else { @@ -2112,9 +2348,13 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscription %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength); #endif if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP"); @@ -2125,9 +2365,13 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, if (logLevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Got RICsubscriptionDelete %s", message.message.enodbName); } -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength); #endif if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) { mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP"); @@ -2137,9 +2381,9 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, default: { mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode); message.message.messageType = 0; // no RMR message type yet - +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) buildJsonMessage(message); - +#endif break; } } @@ -2234,9 +2478,16 @@ int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuf mdclog_write(MDCLOG_DEBUG, "%s After decoding the XML to PDU", __func__ ); } if (rval.code != RC_OK) { +#ifdef UNIT_TEST + return 0; +#endif mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) setup response from E2MGR : %s", rval.code, message.message.enodbName); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } @@ -2248,6 +2499,10 @@ int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuf } if (er.encoded == -1) { mdclog_write(MDCLOG_ERR, "encoding of %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno)); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } else if (er.encoded > (ssize_t)buff_size) { mdclog_write(MDCLOG_ERR, "Buffer of size %d is to small for %s, at %s line %d", @@ -2255,9 +2510,17 @@ int PER_FromXML(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuf asn_DEF_E2AP_PDU.name, __func__, __LINE__); + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return -1; } rmrMessageBuffer.rcvMessage->len = er.encoded; + if (pdu != nullptr) { + ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu); + pdu = nullptr; + } return 0; } @@ -2295,7 +2558,7 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, // get message payload //auto msgData = msg->payload; #ifdef UNIT_TEST - rmrMessageBuffer.rcvMessage->state = 0; + rmrMessageBuffer.rcvMessage->state = 0; #endif if (rmrMessageBuffer.rcvMessage->state != 0) { mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state); @@ -2311,6 +2574,9 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, case RIC_HEALTH_CHECK_REQ: break; default: +#ifdef UNIT_TEST + break; +#endif mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName); return -1; } @@ -2328,8 +2594,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP"); return -6; @@ -2343,8 +2615,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE"); return -6; @@ -2355,8 +2633,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION"); } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION"); return -6; @@ -2367,8 +2651,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ"); } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ"); return -6; @@ -2379,8 +2669,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ"); } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ"); return -6; @@ -2391,8 +2687,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ"); } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ"); return -6; @@ -2406,8 +2708,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY"); return -6; @@ -2422,8 +2730,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, mdclog_write(MDCLOG_ERR, "error in PER_FromXML"); break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Before sending to CU"); } @@ -2440,8 +2754,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE"); return -6; @@ -2455,8 +2775,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); + message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET"); return -6; @@ -2470,8 +2796,14 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, if (PER_FromXML(message, rmrMessageBuffer) != 0) { break; } +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len); + + // Update E2T instance level metrics + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment(); + message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len); +#endif if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP"); return -6; @@ -2523,7 +2855,7 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, static unsigned char tx[32]; auto txLen = snprintf((char *) tx, sizeof tx, "%15ld", transactionCounter++); rmr_bytes2xact(rmrMessageBuffer.sendMessage, tx, txLen); -#ifndef UNIT_TEST +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage); #endif if (rmrMessageBuffer.sendMessage == nullptr) { @@ -2557,7 +2889,7 @@ int receiveXappMessages(Sctp_Map_t *sctpMap, } else if (rmrMessageBuffer.rcvMessage->state != 0) { mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)", rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str()); - } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) { + } else if (loglevel >= MDCLOG_DEBUG && (++counter % 100 == 0)) { mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter); } @@ -2648,6 +2980,7 @@ int addToEpoll(int epoll_fd, event.data.ptr = peerInfo; event.events = events; if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, peerInfo->fileDescriptor, &event) < 0) { +#if !(defined(UNIT_TEST) || defined(MODULE_TEST)) if (mdclog_level_get() >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here), %s, %s %d", strerror(errno), __func__, __LINE__); @@ -2670,6 +3003,7 @@ int addToEpoll(int epoll_fd, } mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)"); return -1; +#endif } return 0; } @@ -2723,7 +3057,7 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m #ifndef UNIT_TEST rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage); #else - rmrMessageBuffer.sendMessage->state = RMR_ERR_RETRY; + rmrMessageBuffer.sendMessage->state = RMR_ERR_RETRY; #endif if (rmrMessageBuffer.sendMessage == nullptr) { rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE); @@ -2768,7 +3102,7 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m void buildJsonMessage(ReportingMessages_t &message) { #ifdef UNIT_TEST - jsonTrace = true; + jsonTrace = true; #endif if (jsonTrace) { message.outLen = sizeof(message.base64Data);