LC for E2 for ORAN
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
index 9dfbf9d..10f572b 100644 (file)
@@ -50,6 +50,7 @@
 #include <map>
 #include <sys/inotify.h>
 #include <csignal>
+#include <future>
 
 #include <rmr/rmr.h>
 #include <rmr/RIC_message_types.h>
 
 #include "cxxopts.hpp"
 //#include "config-cpp/include/config-cpp/config-cpp.h"
+#include <zlib.h>
+#include <prometheus/counter.h>
+#include <prometheus/exposer.h>
+#include <prometheus/gateway.h>
+#include <prometheus/registry.h>
 
-#ifdef __TRACING__
-#include "openTracing.h"
-#endif
+using namespace prometheus;
 
 #include "mapWrapper.h"
 
@@ -108,21 +112,20 @@ namespace expr = boost::log::expressions;
 
 #define MAXEVENTS 128
 
-#define RECEIVE_SCTP_BUFFER_SIZE (8*1024)
-#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE 
+#define RECEIVE_SCTP_BUFFER_SIZE (256 * 1024)
+#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
 
 typedef mapWrapper Sctp_Map_t;
 
-#ifdef __TRACING__
-typedef const std::unique_ptr<opentracing::Span> otSpan;
-#else
-typedef const int otSpan;
-#endif
+
 
 #define VOLUME_URL_SIZE 256
+#define KA_MESSAGE_SIZE 2048
 
 typedef struct sctp_params {
+    int      epollTimeOut = -1;
     uint16_t rmrPort = 0;
+    uint16_t sctpPort = SRC_PORT;
     int      epoll_fd = 0;
     int      listenFD = 0;
     int      rmrListenFd = 0;
@@ -130,9 +133,9 @@ typedef struct sctp_params {
     int      inotifyWD = 0;
     void     *rmrCtx = nullptr;
     Sctp_Map_t *sctpMap = nullptr;
-    char      ka_message[4096] {};
+    char      ka_message[KA_MESSAGE_SIZE] {};
     int       ka_message_length = 0;
-    char       rmrAddress[256] {}; // "tcp:portnumber" "tcp:5566" listen to all address on port 5566
+    char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
     mdclog_severity_t logLevel = MDCLOG_INFO;
     char volume[VOLUME_URL_SIZE];
     string myIP {};
@@ -141,9 +144,25 @@ typedef struct sctp_params {
     string configFilePath {};
     string configFileName {};
     bool trace = true;
-    //shared_timed_mutex fence; // moved to mapWrapper
+    shared_ptr<prometheus::Registry> prometheusRegistry;
+    string prometheusPort {"8088"};
+    Family<Counter> *prometheusFamily;
+    Exposer *prometheusExposer = nullptr;
 } sctp_params_t;
 
+// RAN to RIC
+#define IN_INITI 0 //INITIATING
+#define IN_SUCC 1 //SUCCESSFUL
+#define IN_UN_SUCC 2 //UN-Successful
+
+// RIC To RAN
+#define OUT_INITI 3 //INITIATING
+#define OUT_SUCC 4 //SUCCESSFUL
+#define OUT_UN_SUCC 5 //UN-Successful
+
+#define MSG_COUNTER 0
+#define BYTES_COUNTER 1
+
 typedef struct ConnectedCU {
     int fileDescriptor = 0;
     char hostName[NI_MAXHOST] {};
@@ -155,17 +174,19 @@ typedef struct ConnectedCU {
     bool isConnected = false;
     bool gotSetup = false;
     sctp_params_t *sctpParams = nullptr;
+    Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {};
 } ConnectedCU_t ;
 
-#define MAX_RMR_BUFF_ARRY 32
+
+#define MAX_RMR_BUFF_ARRAY 32
 typedef struct RmrMessagesBuffer {
-    char ka_message[4096] {};
+    char ka_message[KA_MESSAGE_SIZE] {};
     int  ka_message_len = 0;
     void *rmrCtx = nullptr;
     rmr_mbuf_t *sendMessage= nullptr;
-    rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY] {};
+    //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
     rmr_mbuf_t *rcvMessage= nullptr;
-    rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY] {};
+    //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
 } RmrMessagesBuffer_t;
 
 typedef struct formatedMessage {
@@ -202,51 +223,44 @@ int setSocketNoBlocking(int socket);
 void handleEinprogressMessages(struct epoll_event &event,
                                ReportingMessages_t &message,
                                RmrMessagesBuffer_t &rmrMessageBuffer,
-                               sctp_params_t *params,
-                               otSpan *pSpan);
+                               sctp_params_t *params);
 
 void handlepoll_error(struct epoll_event &event,
                       ReportingMessages_t &message,
                       RmrMessagesBuffer_t &rmrMessageBuffer,
-                      sctp_params_t *params,
-                      otSpan *pSpan);
+                      sctp_params_t *params);
 
 
-void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m, otSpan *pSpan);
+void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
 
-int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan);
 
 /**
  *
  * @param message
  * @param rmrMessageBuffer
- * @param pSpan
  */
-void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan);
+void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
 
 /**
  *
  * @param sctpMap
- * @param messagBuffer
+ * @param messageBuffer
  * @param message
  * @param failedMesgId
- * @param pSpan
  * @return
  */
 int sendMessagetoCu(Sctp_Map_t *sctpMap,
-                    RmrMessagesBuffer_t &messagBuffer,
+                    RmrMessagesBuffer_t &messageBuffer,
                     ReportingMessages_t &message,
-                    int failedMesgId, otSpan *pSpan);
+                    int failedMesgId);
 
 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
                                     ReportingMessages_t &message,
-                                    int failedMesgId,
-                                    otSpan *pSpan);
+                                    int failedMesgId);
 
 int sendRequestToXapp(ReportingMessages_t &message,
                       int requestId,
-                      RmrMessagesBuffer_t &rmrMmessageBuffer,
-                      otSpan *pSpan);
+                      RmrMessagesBuffer_t &rmrMmessageBuffer);
 
 /**
  *
@@ -255,7 +269,6 @@ int sendRequestToXapp(ReportingMessages_t &message,
  * @param requestType
  * @param rmrMessageBuffer
  * @param sctpMap
- * @param pSpan
  * @return
  */
 /*
@@ -263,8 +276,7 @@ int sendResponseToXapp(ReportingMessages_t &message,
                        int msgType,
                        int requestType,
                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                       Sctp_Map_t *sctpMap,
-                       otSpan *pSpan);
+                       Sctp_Map_t *sctpMap);
 */
 
 /**
@@ -272,13 +284,11 @@ int sendResponseToXapp(ReportingMessages_t &message,
  * @param peerInfo
  * @param message
  * @param m
- * @param pSpan
  * @return
  */
 int sendSctpMsg(ConnectedCU_t *peerInfo,
                 ReportingMessages_t &message,
-                Sctp_Map_t *m,
-                otSpan *pSpan);
+                Sctp_Map_t *m);
 
 /**
  *
@@ -287,113 +297,83 @@ int sendSctpMsg(ConnectedCU_t *peerInfo,
  * @param numOfMessages
  * @param rmrMessageBuffer
  * @param ts
- * @param pSpan
  * @return
  */
 int receiveDataFromSctp(struct epoll_event *events,
                         Sctp_Map_t *sctpMap,
                         int &numOfMessages,
                         RmrMessagesBuffer_t &rmrMessageBuffer,
-                        struct timespec &ts,
-                        otSpan *pSpan);
+                        struct timespec &ts);
 
 /**
  *
  * @param rmrAddress
- * @param pSpan
  * @return
  */
-void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan);
+void getRmrContext(sctp_params_t &pSctpParams);
 
 /**
  *
- * @param epoll_fd
- * @param rmrCtx
  * @param sctpMap
- * @param messagBuffer
- * @param pSpan
- * @return
- */
-int receiveXappMessages(int epoll_fd,
-                        Sctp_Map_t *sctpMap,
-                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                        struct timespec &ts,
-                        otSpan *pSpan);
-
-/**
- *
  * @param rmrMessageBuffer
- * @param message
- * @param epoll_fd
- * @param sctpMap
- * @param pSpan
+ * @param ts
  * @return
  */
-int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
-                           ReportingMessages_t &message,
-                           int epoll_fd,
-                           Sctp_Map_t *sctpMap,
-                           otSpan *pSpan);
+int receiveXappMessages(Sctp_Map_t *sctpMap,
+                        RmrMessagesBuffer_t &rmrMessageBuffer,
+                        struct timespec &ts);
 
 /**
  *
- * @param messagBuffer
+ * @param messageBuffer
  * @param failedMsgId
  * @param sctpMap
- * @param pSpan
  * @return
  */
-int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messagBuffer,
+int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
                            ReportingMessages_t &message,
                            int failedMsgId,
-                           Sctp_Map_t *sctpMap,
-                           otSpan *pSpan);
+                           Sctp_Map_t *sctpMap);
 /**
  *
  * @param pdu
  * @param message
  * @param rmrMessageBuffer
- * @param pSpan
  */
 void asnInitiatingRequest(E2AP_PDU_t *pdu,
+                          Sctp_Map_t *sctpMap,
                           ReportingMessages_t &message,
-                          RmrMessagesBuffer_t &rmrMessageBuffer,
-                          otSpan *pSpan);
+                          RmrMessagesBuffer_t &rmrMessageBuffer);
 /**
  *
  * @param pdu
  * @param message
  * @param sctpMap
  * @param rmrMessageBuffer
- * @param pSpan
  */
-void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
-                      ReportingMessages_t &message,
+void asnSuccessfulMsg(E2AP_PDU_t *pdu,
                       Sctp_Map_t *sctpMap,
-                      RmrMessagesBuffer_t &rmrMessageBuffer,
-                      otSpan *pSpan);
+                      ReportingMessages_t &message,
+                      RmrMessagesBuffer_t &rmrMessageBuffer);
 /**
  *
  * @param pdu
  * @param message
  * @param sctpMap
  * @param rmrMessageBuffer
- * @param pSpan
  */
 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
-                        ReportingMessages_t &message,
                         Sctp_Map_t *sctpMap,
-                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                        otSpan *pSpan);
+                        ReportingMessages_t &message,
+                        RmrMessagesBuffer_t &rmrMessageBuffer);
 
 /**
  *
  * @param rmrMessageBuffer
  * @param message
- * @param pSpan
  * @return
  */
-int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan);
+int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
 /**
  *
  * @param epoll_fd
@@ -402,10 +382,9 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m
  * @param sctpMap
  * @param enodbName
  * @param msgType
- * @param pSpan
  * @returnsrc::logger_mt& lg = my_logger::get();
  */
-int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
 /**
  *
  * @param epoll_fd
@@ -414,10 +393,9 @@ int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_
  * @param sctpMap
  * @param enodbName
  * @param msgType
- * @param pSpan
  * @return
  */
-int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
 
 /**
  *
@@ -433,6 +411,14 @@ void buildJsonMessage(ReportingMessages_t &message);
  */
 string translateRmrErrorMessages(int state);
 
+int buildConfiguration(sctp_params_t &sctpParams);
+void startPrometheus(sctp_params_t &sctpParams);
+static int enable_log_change_notify(const char* fileName);
+static int register_log_change_notify(const char *fileName);
+static void * monitor_loglevel_change_handler(void* arg);
+void  update_mdc_log_level_severity(char* log_level);
+static char* parse_file(char* filename);
+
 
 static inline uint64_t rdtscp(uint32_t &aux) {
     uint64_t rax,rdx;
@@ -443,4 +429,10 @@ static inline uint64_t rdtscp(uint32_t &aux) {
 #define RIC_SCTP_CONNECTION_FAILURE  10080
 #endif
 
+#ifdef UNIT_TEST
+    #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
+#endif
+
+int buildListeningPort(sctp_params_t &sctpParams);
+
 #endif //X2_SCTP_THREAD_H