double cpuClock = 0.0;
bool jsonTrace = false;
+char* getinterfaceip()
+{
+ char hostname[256];
+ char *IP;
+ struct hostent *host_entry;
+ int retVal;
+ retVal = gethostname(hostname, sizeof(hostname));
+ if ( retVal == -1 )
+ return NULL;
+ host_entry = gethostbyname(hostname);
+ if ( host_entry == NULL )
+ return NULL;
+ IP = inet_ntoa(*((struct in_addr*) host_entry->h_addr_list[0]));
+ return IP;
+}
+
static int enable_log_change_notify(const char* fileName)
{
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
if( errno == EAGAIN ) {
} else {
- fprintf( stderr, "### CRIT ### config listener read err: %s\n", strerror( errno ) );
+ printf( "### CRIT ### config listener read err: %s\n", strerror( errno ) );
}
continue;
#endif
std::atomic<int64_t> num_of_messages{0};
std::atomic<int64_t> num_of_XAPP_messages{0};
static long transactionCounter = 0;
+pthread_mutex_t thread_lock;
int buildListeningPort(sctp_params_t &sctpParams) {
sctpParams.listenFD = socket(AF_INET6, SOCK_STREAM, IPPROTO_SCTP);
return -1;
#endif
}
+ struct sctp_initmsg initmsg;
+ memset (&initmsg, 0, sizeof (initmsg));
+ initmsg.sinit_num_ostreams = 2;
+ initmsg.sinit_max_instreams = 2;
+ initmsg.sinit_max_attempts = 4;
+ setsockopt (sctpParams.listenFD, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, sizeof (initmsg));
struct sockaddr_in6 serverAddress {};
serverAddress.sin6_family = AF_INET6;
}
void startPrometheus(sctp_params_t &sctpParams) {
+ auto podName = std::getenv("POD_NAME");
+ string metric = "E2TBeta";
+ if (strstr(podName, "alpha") != NULL) {
+ metric = "E2TAlpha";
+ }
+ //Get eth0 interface IP
+ char* host = getinterfaceip();
+ string hostip = host;
+
sctpParams.prometheusFamily = &BuildCounter()
- .Name("E2T")
- .Help("E2T message counter")
+ .Name(metric.c_str())
+ .Help("E2T instance metrics")
.Labels({{"POD_NAME", sctpParams.podName}})
.Register(*sctpParams.prometheusRegistry);
- string prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
+ // Build E2T instance level metrics
+ buildE2TPrometheusCounters(sctpParams);
+
+ string prometheusPath;
+ if (hostip.empty())
+ prometheusPath = sctpParams.prometheusPort + "," + "[::]:" + sctpParams.prometheusPort;
+ else
+ prometheusPath = hostip + ":" + sctpParams.prometheusPort;
+
if (mdclog_level_get() >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "Start Prometheus Pull mode on %s", prometheusPath.c_str());
}
sctpParams.sctpMap = new mapWrapper();
+ if (pthread_mutex_init(&thread_lock, NULL) != 0) {
+ mdclog_write(MDCLOG_ERR, "failed to init thread lock");
+ exit(-1);
+ }
std::vector<std::thread> threads(num_cpus);
// std::vector<std::thread> threads;
for (auto &t : threads) {
t.join();
}
-
+ pthread_mutex_destroy(&thread_lock);
return 0;
}
#endif
continue;
}
mdclog_write(MDCLOG_ERR, "Epoll wait failed, errno = %s", strerror(errno));
+ if(events)
+ {
+ free(events);
+ events = nullptr;
+ }
return;
#endif
}
/* We have processed all incoming connections. */
if(peerInfo)
free(peerInfo);
+ peerInfo = nullptr;
break;
} else {
if(peerInfo)
free(peerInfo);
+ peerInfo = nullptr;
mdclog_write(MDCLOG_ERR, "Accept error, errno = %s", strerror(errno));
break;
}
close(peerInfo->fileDescriptor);
if(peerInfo)
free(peerInfo);
+ peerInfo = nullptr;
break;
#endif
}
+ struct sctp_event_subscribe sctpevents;
+ memset( (void *)&sctpevents, 0, sizeof(sctpevents) );
+ sctpevents.sctp_data_io_event = 1;
+ setsockopt(peerInfo->fileDescriptor, SOL_SCTP, SCTP_EVENTS,(const void *)&sctpevents, sizeof(sctpevents) );
+
auto ans = getnameinfo(&in_addr, in_len,
peerInfo->hostName, NI_MAXHOST,
peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
close(peerInfo->fileDescriptor);
if(peerInfo)
free(peerInfo);
+ peerInfo = nullptr;
break;
}
if (mdclog_level_get() >= MDCLOG_DEBUG) {
0) != 0) {
if(peerInfo)
free(peerInfo);
+ peerInfo = nullptr;
break;
}
break;
ReportingMessages_t &message,
RmrMessagesBuffer_t &rmrMessageBuffer,
sctp_params_t *params) {
- if (event.data.fd != params->rmrListenFd) {
+ if ((event.data.fd != params->rmrListenFd) && (event.data.ptr != nullptr)) {
auto *peerInfo = (ConnectedCU_t *)event.data.ptr;
mdclog_write(MDCLOG_ERR, "epoll error, events %0x on fd %d, RAN NAME : %s",
event.events, peerInfo->fileDescriptor, peerInfo->enodbName);
}
#endif
close(peerInfo->fileDescriptor);
- params->sctpMap->erase(peerInfo->enodbName);
+ //params->sctpMap->erase(peerInfo->enodbName);
cleanHashEntry((ConnectedCU_t *) event.data.ptr, params->sctpMap);
} else {
mdclog_write(MDCLOG_ERR, "epoll error, events %0x on RMR FD", event.events);
* @param m
*/
void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
+ if(val != nullptr)
+ {
char *dummy;
auto port = (uint16_t) strtol(val->portNumber, &dummy, 10);
char searchBuff[2048]{};
snprintf(searchBuff, sizeof searchBuff, "host:%s:%d", val->hostName, port);
+ if(m->find(searchBuff))
+ {
m->erase(searchBuff);
+ }
+ if(m->find(val->enodbName))
+ {
+ mdclog_write(MDCLOG_DEBUG, "remove key enodbName = %s from %s at line %d", val->enodbName, __FUNCTION__, __LINE__);
m->erase(val->enodbName);
+ }
#ifndef UNIT_TEST
- free(val);
+ if(val) {
+ free(val);
+ val = nullptr;
+ }
#endif
+ }
}
/**
auto loglevel = mdclog_level_get();
#ifndef UNIT_TEST
int fd = peerInfo->fileDescriptor;
+ int streamId = fetchStreamId(peerInfo,message);
#else
int fd = FILE_DESCRIPTOR;
+ int streamId = 0;
#endif
if (loglevel >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
}
while (true) {
- if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
+ if (sctp_sendmsg(fd,message.message.asndata, message.message.asnLength,(struct sockaddr *) NULL, 0, 0, 0,streamId,0,0) < 0) {
if (errno == EINTR) {
continue;
}
auto tmp = m->find(key);
if (tmp) {
free(tmp);
+ tmp = nullptr;
}
m->erase(key);
#ifndef UNIT_TEST
ReportingMessages_t message {};
auto done = 0;
auto loglevel = mdclog_level_get();
+ struct sctp_sndrcvinfo sndrcvinfo;
+ int flags;
+ int streamId;
// get the identity of the interface
+ if (events->data.ptr != nullptr){
message.peerInfo = (ConnectedCU_t *)events->data.ptr;
+ }
struct timespec start{0, 0};
struct timespec decodeStart{0, 0};
message.message.asndata = rmrMessageBuffer.sendMessage->payload;
#ifndef UNIT_TEST
message.message.asnLength = rmrMessageBuffer.sendMessage->len =
- read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
+ sctp_recvmsg(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE,(struct sockaddr *) NULL, 0, &sndrcvinfo, &flags);
+ mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP fd %d stream %d ", message.peerInfo->fileDescriptor, sndrcvinfo.sinfo_stream);
+ streamId = sndrcvinfo.sinfo_stream;
#else
message.message.asnLength = rmrMessageBuffer.sendMessage->len;
+ streamId = 0;
#endif
if (loglevel >= MDCLOG_DEBUG) {
switch (pdu->present) {
case E2AP_PDU_PR_initiatingMessage: {//initiating message
- asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
+ asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer, streamId);
break;
}
case E2AP_PDU_PR_successfulOutcome: { //successful outcome
#endif
/* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
+#ifndef UNIT_TEST
+ pthread_mutex_lock(&thread_lock);
+ if (fcntl(message.peerInfo->fileDescriptor, F_GETFD) != -1) {
+ mdclog_write(MDCLOG_DEBUG, "Closing connection - descriptor = %d", message.peerInfo->fileDescriptor);
+ close(message.peerInfo->fileDescriptor);
+ cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
+ }
+ pthread_mutex_unlock(&thread_lock);
+#else
close(message.peerInfo->fileDescriptor);
cleanHashEntry((ConnectedCU_t *) events->data.ptr, sctpMap);
+#endif
}
if (loglevel >= MDCLOG_DEBUG) {
clock_gettime(CLOCK_MONOTONIC, &end);
// out of memory
mdclog_write(MDCLOG_ERR, "Reallocating buffer for %s failed, %s", asn_DEF_E2AP_PDU.name, strerror(errno));
free(buffer);
+ buffer = nullptr;
return;
}
buffer = newBuffer;
rmr_free_msg(rmrMsg);
}
free(buffer);
+ buffer = nullptr;
return;
}
#endif
+void buildE2TPrometheusCounters(sctp_params_t &sctpParams) {
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestBytes"}});
+
+ sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseMsgs"}});
+ sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &sctpParams.prometheusFamily->Add({{"counter", "SetupResponseBytes"}});
+
+ sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureMsgs"}});
+ sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureBytes"}});
+
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateBytes"}});
+
+ sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateResponseMsgs"}});
+ sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateResponseBytes"}});
+
+ sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateFailureMsgs"}});
+ sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateFailureBytes"}});
+
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationBytes"}});
+
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetRequestBytes"}});
+
+ sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckMsgs"}});
+ sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset] = &sctpParams.prometheusFamily->Add({{"counter", "ResetAckBytes"}});
+
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateBytes"}});
+
+ sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespMsgs"}});
+ sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateRespBytes"}});
+
+ sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureMsgs"}});
+ sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceUpdateFailureBytes"}});
+
+ sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlMsgs"}});
+ sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlBytes"}});
+
+ sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckMsgs"}});
+ sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlAckBytes"}});
+
+ sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureMsgs"}});
+ sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol] = &sctpParams.prometheusFamily->Add({{"counter", "RICControlFailureBytes"}});
+
+ sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionMsgs"}});
+ sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionBytes"}});
+
+ sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckMsgs"}});
+ sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionAckBytes"}});
+
+ sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureMsgs"}});
+ sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionFailureBytes"}});
+
+ sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteMsgs"}});
+ sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteBytes"}});
+
+ sctpParams.e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckMsgs"}});
+ sctpParams.e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteAckBytes"}});
+
+ sctpParams.e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailMsgs"}});
+ sctpParams.e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete] = &sctpParams.prometheusFamily->Add({{"counter", "RICSubscriptionDeleteFailBytes"}});
+
+ sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationMsgs"}});
+ sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication] = &sctpParams.prometheusFamily->Add({{"counter", "RICIndicationBytes"}});
+
+ sctpParams.e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryMsgs"}});
+ sctpParams.e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery] = &sctpParams.prometheusFamily->Add({{"counter", "RICServiceQueryBytes"}});
+}
+
void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFamily) {
peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
+ peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"E2NodeConfigUpdate", "Messages"}});
+ peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"E2NodeConfigUpdate", "Bytes"}});
+
peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
+ peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateSuccess", "Messages"}});
+ peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateSuccess", "Bytes"}});
+
peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
+ peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateFailure", "Messages"}});
+ peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateFailure", "Bytes"}});
+
peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
}
+
/**
*
* @param pdu
void asnInitiatingRequest(E2AP_PDU_t *pdu,
Sctp_Map_t *sctpMap,
ReportingMessages_t &message,
- RmrMessagesBuffer_t &rmrMessageBuffer) {
+ RmrMessagesBuffer_t &rmrMessageBuffer, int streamId) {
auto logLevel = mdclog_level_get();
auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
if (logLevel >= MDCLOG_DEBUG) {
if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
break;
}
+ struct sctp_status status;
+ int stat_size = sizeof(status);
+ getsockopt( message.peerInfo->fileDescriptor, SOL_SCTP, SCTP_STATUS,(void *)&status, (socklen_t *)&stat_size );
+ if (logLevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "Start from SCTP %d fd", message.peerInfo->fileDescriptor);
+ mdclog_write(MDCLOG_DEBUG, "SCTP status assoc id %d instrms %d outstrms %d", status.sstat_assoc_id,
+ status.sstat_instrms, status.sstat_outstrms);
+ }
+ if(status.sstat_outstrms == 1 || status.sstat_instrms == 1)
+ {
+ message.peerInfo->isSingleStream = true;
+ message.peerInfo->singleStreamId = streamId;
+ if (status.sstat_outstrms == 1 && status.sstat_instrms == 1){
+ if (logLevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "Single SCTP stream is used for sending from now on, assoc id %d streamId %d #instrms %d #outstrms %d, %s",status.sstat_assoc_id, streamId, status.sstat_instrms, status.sstat_outstrms, __FUNCTION__);
+ }
+ }
+ else {
+ mdclog_write(MDCLOG_ERR, "Single SCTP stream used for sending messages even if there is a mismatch in number of in & out streams, assoc id %d instrms %d outstrms %d", status.sstat_assoc_id,
+ status.sstat_instrms, status.sstat_outstrms);
+ }
+ }
buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
message.message.messageType = RIC_E2_SETUP_REQ;
message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment((double)message.message.asnLength);
+
buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
break;
}
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment((double)message.message.asnLength);
#endif
buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
break;
}
+
+case ProcedureCode_id_E2nodeConfigurationUpdate: {
+ if (logLevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "Got E2nodeConfigurationUpdate %s", message.message.enodbName);
+ }
+
+ string messageName("RICE2nodeConfigurationUpdate");
+ string ieName("RICE2nodeConfigurationUpdateIEs");
+ message.message.messageType = RIC_E2NODE_CONFIG_UPDATE;
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+ message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment((double)message.message.asnLength);
+#endif
+ buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
+ break;
+ }
+
case ProcedureCode_id_ErrorIndication: {
if (logLevel >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment((double)message.message.asnLength);
#endif
if (sendRequestToXapp(message, RIC_ERROR_INDICATION, rmrMessageBuffer) != 0) {
mdclog_write(MDCLOG_ERR, "RIC_ERROR_INDICATION failed to send to xAPP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
#endif
if (XML_From_PER(message, rmrMessageBuffer) < 0) {
break;
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_RICindication]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_RICindication]->Increment((double)message.message.asnLength);
#endif
sendRmrMessage(rmrMessageBuffer, message);
messageSent = true;
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment((double)message.message.asnLength);
#endif
if (XML_From_PER(message, rmrMessageBuffer) < 0) {
break;
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
#endif
sendRmrMessage(rmrMessageBuffer, message);
messageSent = true;
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
#endif
if (sendRequestToXapp(message, RIC_SUB_RESP, rmrMessageBuffer) != 0) {
mdclog_write(MDCLOG_ERR, "Subscription successful message failed to send to xAPP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
message.peerInfo->counters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
#endif
if (sendRequestToXapp(message, RIC_SUB_DEL_RESP, rmrMessageBuffer) != 0) {
mdclog_write(MDCLOG_ERR, "Subscription delete successful message failed to send to xAPP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
#endif
sendRmrMessage(rmrMessageBuffer, message);
messageSent = true;
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment((double)message.message.asnLength);
#endif
if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
mdclog_write(MDCLOG_ERR, "Subscription unsuccessful message failed to send to xAPP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment((double)message.message.asnLength);
#endif
if (sendRequestToXapp(message, RIC_SUB_FAILURE, rmrMessageBuffer) != 0) {
mdclog_write(MDCLOG_ERR, "Subscription Delete unsuccessful message failed to send to xAPP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
}
break;
}
+
+ case RIC_E2NODE_CONFIG_UPDATE_ACK: {
+ if (loglevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "RIC_E2NODE_CONFIG_UPDATE_ACK");
+ }
+ if (PER_FromXML(message, rmrMessageBuffer) != 0) {
+ break;
+ }
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+ message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif
+ if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
+ mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2NODE_CONFIG_UPDATE_ACK");
+ return -6;
+ }
+ break;
+ }
+
+ case RIC_E2NODE_CONFIG_UPDATE_FAILURE: {
+ if (loglevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "RIC_E2NODE_CONFIG_UPDATE_FAILURE");
+ }
+ if (PER_FromXML(message, rmrMessageBuffer) != 0) {
+ break;
+ }
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+ message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif
+ if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
+ mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2NODE_CONFIG_UPDATE_FAILURE");
+ return -6;
+ }
+ break;
+ }
+
case RIC_ERROR_INDICATION: {
if (loglevel >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (loglevel >= MDCLOG_DEBUG) {
mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+ // Update E2T instance level metrics
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
+ message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
#endif
if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
}
free(peerInfo);
+ peerInfo = nullptr;
}
}
} else if (rmrMessageBuffer.rcvMessage->state != 0) {
mdclog_write(MDCLOG_ERR, "Failed to send RIC_HEALTH_CHECK_RESP, on RMR state = %d ( %s)",
rmrMessageBuffer.rcvMessage->state, translateRmrErrorMessages(rmrMessageBuffer.rcvMessage->state).c_str());
- } else if (loglevel >= MDCLOG_DEBUG && ++counter % 100 == 0) {
+ } else if (loglevel >= MDCLOG_DEBUG && (++counter % 100 == 0)) {
mdclog_write(MDCLOG_DEBUG, "Got %d RIC_HEALTH_CHECK_REQ Request send : OK", counter);
}
auto tmp = sctpMap->find(key);
if (tmp) {
free(tmp);
+ tmp = nullptr;
sctpMap->erase(key);
}
} else {
auto tmp = sctpMap->find(key);
if (tmp) {
free(tmp);
- }
+ tmp = nullptr;
sctpMap->erase(key);
+ }
mdclog_write(MDCLOG_ERR, "epoll_ctl EPOLL_CTL_ADD (may check not to quit here)");
return -1;
}
}
return str;
}
+int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message)
+{
+ auto loglevel = mdclog_level_get();
+ int streamId = INVALID_STREAM_ID;
+ if(message.peerInfo->isSingleStream != false)
+ {
+ streamId = message.peerInfo->singleStreamId;
+ if (loglevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "Send SCTP message for SINGLE_STREAM streamId %d , Messeage Type %d ,%s",
+ streamId,message.message.messageType, __FUNCTION__);
+ }
+ return streamId;
+ }
+ int msgType = message.message.messageType;
+ switch (msgType){
+ case RIC_E2_RESET_REQ:
+ case RIC_E2_RESET_RESP:
+ case RIC_E2_SETUP_RESP:
+ case RIC_E2_SETUP_FAILURE:
+ case RIC_ERROR_INDICATION:
+ case RIC_SERVICE_QUERY:
+ case RIC_SERVICE_UPDATE_ACK:
+ case RIC_SERVICE_UPDATE_FAILURE:
+ streamId = 0;
+ break;
+ case RIC_SUB_REQ:
+ case RIC_SUB_DEL_REQ:
+ case RIC_CONTROL_REQ:
+ streamId = 1;
+ break;
+ default:
+ streamId = 0;
+ break;
+ }
+ if (loglevel >= MDCLOG_DEBUG) {
+ mdclog_write(MDCLOG_DEBUG, "Send SCTP message for streamId %d Messeage Type %d, %s",
+ streamId, message.message.messageType, __FUNCTION__);
+ }
+ return streamId;
+}