#include <atomic>
#include <sys/param.h>
#include <sys/file.h>
+#include <sys/types.h>
+#include <ifaddrs.h>
#include <ctime>
#include <netdb.h>
#include <sys/epoll.h>
#include <map>
#include <sys/inotify.h>
#include <csignal>
+#include <future>
#include <rmr/rmr.h>
#include <rmr/RIC_message_types.h>
#include <mdclog/mdclog.h>
-#include "asn1cFiles/E2AP-PDU.h"
-#include "asn1cFiles/ProtocolIE-Container.h"
-#include "asn1cFiles/InitiatingMessage.h"
-#include "asn1cFiles/SuccessfulOutcome.h"
-#include "asn1cFiles/UnsuccessfulOutcome.h"
-#include "asn1cFiles/ProtocolIE-Container.h"
-#include "asn1cFiles/ProtocolIE-Field.h"
+#include "oranE2/E2AP-PDU.h"
+#include "oranE2/ProtocolIE-Container.h"
+#include "oranE2/InitiatingMessage.h"
+#include "oranE2/SuccessfulOutcome.h"
+#include "oranE2/UnsuccessfulOutcome.h"
+#include "oranE2/ProtocolIE-Container.h"
+#include "oranE2/ProtocolIE-Field.h"
+#include "oranE2/GlobalE2node-gNB-ID.h"
+#include "oranE2/GlobalE2node-en-gNB-ID.h"
+#include "oranE2/GlobalE2node-ng-eNB-ID.h"
+#include "oranE2/GlobalE2node-eNB-ID.h"
#include "cxxopts.hpp"
//#include "config-cpp/include/config-cpp/config-cpp.h"
+#include <zlib.h>
+#include <prometheus/counter.h>
+#include <prometheus/exposer.h>
+#include <prometheus/gateway.h>
+#include <prometheus/registry.h>
-#ifdef __TRACING__
-#include "openTracing.h"
-#endif
+using namespace prometheus;
#include "mapWrapper.h"
#define MAXEVENTS 128
-#define RECEIVE_SCTP_BUFFER_SIZE (64*1024)
-#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
+#define RECEIVE_SCTP_BUFFER_SIZE (256 * 1024)
+#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
typedef mapWrapper Sctp_Map_t;
-#ifdef __TRACING__
-typedef const std::unique_ptr<opentracing::Span> otSpan;
-#else
-typedef const int otSpan;
-#endif
+
#define VOLUME_URL_SIZE 256
+#define KA_MESSAGE_SIZE 2048
typedef struct sctp_params {
+ int epollTimeOut = -1;
uint16_t rmrPort = 0;
+ uint16_t sctpPort = SRC_PORT;
int epoll_fd = 0;
+ int listenFD = 0;
int rmrListenFd = 0;
int inotifyFD = 0;
int inotifyWD = 0;
void *rmrCtx = nullptr;
Sctp_Map_t *sctpMap = nullptr;
- char ka_message[4096] {};
+ char ka_message[KA_MESSAGE_SIZE] {};
int ka_message_length = 0;
- char rmrAddress[256] {}; // "tcp:portnumber" "tcp:5566" listen to all address on port 5566
+ char rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
mdclog_severity_t logLevel = MDCLOG_INFO;
char volume[VOLUME_URL_SIZE];
string myIP {};
string configFilePath {};
string configFileName {};
bool trace = true;
- //shared_timed_mutex fence; // moved to mapWrapper
+ shared_ptr<prometheus::Registry> prometheusRegistry;
+ string prometheusPort {"8088"};
+ Family<Counter> *prometheusFamily;
+ Exposer *prometheusExposer = nullptr;
+ Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
} sctp_params_t;
+// RAN to RIC
+#define IN_INITI 0 //INITIATING
+#define IN_SUCC 1 //SUCCESSFUL
+#define IN_UN_SUCC 2 //UN-Successful
+
+// RIC To RAN
+#define OUT_INITI 3 //INITIATING
+#define OUT_SUCC 4 //SUCCESSFUL
+#define OUT_UN_SUCC 5 //UN-Successful
+
+#define MSG_COUNTER 0
+#define BYTES_COUNTER 1
+
typedef struct ConnectedCU {
int fileDescriptor = 0;
char hostName[NI_MAXHOST] {};
size_t asnLength = 0;
int mtype = 0;
bool isConnected = false;
+ bool gotSetup = false;
+ sctp_params_t *sctpParams = nullptr;
+ Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
} ConnectedCU_t ;
-#define MAX_RMR_BUFF_ARRY 32
+
+#define MAX_RMR_BUFF_ARRAY 32
typedef struct RmrMessagesBuffer {
- char ka_message[4096] {};
+ char ka_message[KA_MESSAGE_SIZE] {};
int ka_message_len = 0;
void *rmrCtx = nullptr;
rmr_mbuf_t *sendMessage= nullptr;
- rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY] {};
+ //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
rmr_mbuf_t *rcvMessage= nullptr;
- rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY] {};
+ //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
} RmrMessagesBuffer_t;
typedef struct formatedMessage {
} FormatedMessage_t;
typedef struct ReportingMessages {
- FormatedMessage_t message;
- long outLen;
- unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2];
- char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8];
+ FormatedMessage_t message {};
+ ConnectedCU_t *peerInfo = nullptr;
+ long outLen = 0;
+ unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
+ char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
} ReportingMessages_t;
cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
void handleEinprogressMessages(struct epoll_event &event,
ReportingMessages_t &message,
RmrMessagesBuffer_t &rmrMessageBuffer,
- sctp_params_t *params,
- otSpan *pSpan);
+ sctp_params_t *params);
void handlepoll_error(struct epoll_event &event,
ReportingMessages_t &message,
RmrMessagesBuffer_t &rmrMessageBuffer,
- sctp_params_t *params,
- otSpan *pSpan);
+ sctp_params_t *params);
-void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m, otSpan *pSpan);
+void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
-int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan);
/**
*
* @param message
* @param rmrMessageBuffer
- * @param pSpan
*/
-void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan);
+void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
/**
*
* @param sctpMap
- * @param messagBuffer
+ * @param messageBuffer
* @param message
* @param failedMesgId
- * @param pSpan
* @return
*/
int sendMessagetoCu(Sctp_Map_t *sctpMap,
- RmrMessagesBuffer_t &messagBuffer,
+ RmrMessagesBuffer_t &messageBuffer,
ReportingMessages_t &message,
- int failedMesgId, otSpan *pSpan);
+ int failedMesgId);
void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
ReportingMessages_t &message,
- int failedMesgId,
- otSpan *pSpan);
+ int failedMesgId);
int sendRequestToXapp(ReportingMessages_t &message,
int requestId,
- RmrMessagesBuffer_t &rmrMmessageBuffer,
- otSpan *pSpan);
+ RmrMessagesBuffer_t &rmrMmessageBuffer);
/**
*
* @param requestType
* @param rmrMessageBuffer
* @param sctpMap
- * @param pSpan
* @return
*/
+/*
int sendResponseToXapp(ReportingMessages_t &message,
int msgType,
int requestType,
RmrMessagesBuffer_t &rmrMessageBuffer,
- Sctp_Map_t *sctpMap,
- otSpan *pSpan);
+ Sctp_Map_t *sctpMap);
+*/
/**
*
* @param peerInfo
* @param message
* @param m
- * @param pSpan
* @return
*/
int sendSctpMsg(ConnectedCU_t *peerInfo,
ReportingMessages_t &message,
- Sctp_Map_t *m,
- otSpan *pSpan);
+ Sctp_Map_t *m);
/**
*
* @param numOfMessages
* @param rmrMessageBuffer
* @param ts
- * @param pSpan
* @return
*/
int receiveDataFromSctp(struct epoll_event *events,
Sctp_Map_t *sctpMap,
int &numOfMessages,
RmrMessagesBuffer_t &rmrMessageBuffer,
- struct timespec &ts,
- otSpan *pSpan);
+ struct timespec &ts);
/**
*
* @param rmrAddress
- * @param pSpan
* @return
*/
-void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan);
+void getRmrContext(sctp_params_t &pSctpParams);
/**
*
- * @param epoll_fd
- * @param rmrCtx
* @param sctpMap
- * @param messagBuffer
- * @param pSpan
- * @return
- */
-int receiveXappMessages(int epoll_fd,
- Sctp_Map_t *sctpMap,
- RmrMessagesBuffer_t &rmrMessageBuffer,
- struct timespec &ts,
- otSpan *pSpan);
-
-/**
- *
* @param rmrMessageBuffer
- * @param message
- * @param epoll_fd
- * @param sctpMap
- * @param pSpan
+ * @param ts
* @return
*/
-int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
- ReportingMessages_t &message,
- int epoll_fd,
- Sctp_Map_t *sctpMap,
- otSpan *pSpan);
+int receiveXappMessages(Sctp_Map_t *sctpMap,
+ RmrMessagesBuffer_t &rmrMessageBuffer,
+ struct timespec &ts);
/**
*
- * @param messagBuffer
+ * @param messageBuffer
* @param failedMsgId
* @param sctpMap
- * @param pSpan
* @return
*/
-int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messagBuffer,
+int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
ReportingMessages_t &message,
int failedMsgId,
- Sctp_Map_t *sctpMap,
- otSpan *pSpan);
+ Sctp_Map_t *sctpMap);
/**
*
* @param pdu
* @param message
* @param rmrMessageBuffer
- * @param pSpan
*/
void asnInitiatingRequest(E2AP_PDU_t *pdu,
+ Sctp_Map_t *sctpMap,
ReportingMessages_t &message,
- RmrMessagesBuffer_t &rmrMessageBuffer,
- otSpan *pSpan);
+ RmrMessagesBuffer_t &rmrMessageBuffer);
/**
*
* @param pdu
* @param message
* @param sctpMap
* @param rmrMessageBuffer
- * @param pSpan
*/
-void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
- ReportingMessages_t &message,
+void asnSuccessfulMsg(E2AP_PDU_t *pdu,
Sctp_Map_t *sctpMap,
- RmrMessagesBuffer_t &rmrMessageBuffer,
- otSpan *pSpan);
+ ReportingMessages_t &message,
+ RmrMessagesBuffer_t &rmrMessageBuffer);
/**
*
* @param pdu
* @param message
* @param sctpMap
* @param rmrMessageBuffer
- * @param pSpan
*/
void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
- ReportingMessages_t &message,
Sctp_Map_t *sctpMap,
- RmrMessagesBuffer_t &rmrMessageBuffer,
- otSpan *pSpan);
+ ReportingMessages_t &message,
+ RmrMessagesBuffer_t &rmrMessageBuffer);
/**
*
* @param rmrMessageBuffer
* @param message
- * @param pSpan
* @return
*/
-int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan);
-
+int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
/**
*
* @param epoll_fd
* @param sctpMap
* @param enodbName
* @param msgType
- * @param pSpan
* @returnsrc::logger_mt& lg = my_logger::get();
*/
-int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
/**
*
* @param epoll_fd
* @param sctpMap
* @param enodbName
* @param msgType
- * @param pSpan
* @return
*/
-int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
/**
*
*/
string translateRmrErrorMessages(int state);
+int buildConfiguration(sctp_params_t &sctpParams);
+void startPrometheus(sctp_params_t &sctpParams);
+static int enable_log_change_notify(const char* fileName);
+static int register_log_change_notify(const char *fileName);
+static void * monitor_loglevel_change_handler(void* arg);
+void update_mdc_log_level_severity(char* log_level);
+char* getinterfaceip();
+static char* parse_file(char* filename);
+
static inline uint64_t rdtscp(uint32_t &aux) {
uint64_t rax,rdx;
asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
- return (rdx << 32) + rax;
+ return (rdx << (unsigned)32) + rax;
}
#ifndef RIC_SCTP_CONNECTION_FAILURE
#define RIC_SCTP_CONNECTION_FAILURE 10080
#endif
+#ifdef UNIT_TEST
+ #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
+#endif
+
+int buildListeningPort(sctp_params_t &sctpParams);
+void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
+
#endif //X2_SCTP_THREAD_H