RIC:1060: Change in PTL
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.h
index 2149cd3..f5c4fb9 100644 (file)
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 /*
  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
  * platform project (RICP).
  */
 
-
 #ifndef X2_SCTP_THREAD_H
 #define X2_SCTP_THREAD_H
 
@@ -42,6 +41,8 @@
 #include <atomic>
 #include <sys/param.h>
 #include <sys/file.h>
+#include <sys/types.h>
+#include <ifaddrs.h>
 #include <ctime>
 #include <netdb.h>
 #include <sys/epoll.h>
 #include <shared_mutex>
 #include <iterator>
 #include <map>
+#include <sys/inotify.h>
+#include <csignal>
+#include <future>
+#include <bitset>
 
 #include <rmr/rmr.h>
 #include <rmr/RIC_message_types.h>
 #include <boost/log/sources/global_logger_storage.hpp>
 #include <boost/log/utility/setup/file.hpp>
 #include <boost/log/utility/setup/common_attributes.hpp>
-
+#include <boost/filesystem.hpp>
 
 #include <mdclog/mdclog.h>
 
-#include "3rdparty/asn1cFiles/E2AP-PDU.h"
-#include <3rdparty/asn1cFiles/ProtocolIE-Container.h>
-#include "3rdparty/asn1cFiles/InitiatingMessage.h"
-#include "3rdparty/asn1cFiles/SuccessfulOutcome.h"
-#include "3rdparty/asn1cFiles/UnsuccessfulOutcome.h"
-#include "3rdparty/asn1cFiles/ProtocolIE-Container.h"
-#include "3rdparty/asn1cFiles/ProtocolIE-Field.h"
-
-#ifdef __TRACING__
-#include "openTracing.h"
-#endif
+#include "oranE2/E2AP-PDU.h"
+#include "oranE2/ProtocolIE-Container.h"
+#include "oranE2/InitiatingMessage.h"
+#include "oranE2/SuccessfulOutcome.h"
+#include "oranE2/UnsuccessfulOutcome.h"
+#include "oranE2/ProtocolIE-Container.h"
+#include "oranE2/ProtocolIE-Field.h"
+#include "oranE2/GlobalE2node-gNB-ID.h"
+#include "oranE2/GlobalE2node-en-gNB-ID.h"
+#include "oranE2/GlobalE2node-ng-eNB-ID.h"
+#include "oranE2/GlobalE2node-eNB-ID.h"
+
+#include "cxxopts.hpp"
+//#include "config-cpp/include/config-cpp/config-cpp.h"
+#include <zlib.h>
+#include <prometheus/counter.h>
+#include <prometheus/exposer.h>
+#include <prometheus/gateway.h>
+#include <prometheus/registry.h>
+
+using namespace prometheus;
 
 #include "mapWrapper.h"
 
 #include "base64.h"
 
+#include "ReadConfigFile.h"
+
 using namespace std;
 namespace logging = boost::log;
 namespace src = boost::log::sources;
@@ -98,31 +115,67 @@ namespace expr = boost::log::expressions;
 
 #define MAXEVENTS 128
 
-#define RECEIVE_SCTP_BUFFER_SIZE (64*1024)
-#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE 
+#define RECEIVE_SCTP_BUFFER_SIZE (8 * 1024)
+#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE
 
 typedef mapWrapper Sctp_Map_t;
 
-#ifdef __TRACING__
-typedef const std::unique_ptr<opentracing::Span> otSpan;
-#else
-typedef const int otSpan;
-#endif
+
 
 #define VOLUME_URL_SIZE 256
+#define KA_MESSAGE_SIZE 2048
+
+enum E2T_Internal_Counters
+{
+    SCTP_ABORT_INITIATED_BY_E2NODE = 0,
+    INVALID_MESSAGE_RECEIVED = 1,
+    E2T_MAX_INTERNAL_COUNTER = 2,
+};
 
 typedef struct sctp_params {
+    int      epollTimeOut = -1;
     uint16_t rmrPort = 0;
+    uint16_t sctpPort = SRC_PORT;
     int      epoll_fd = 0;
+    int      listenFD = 0;
     int      rmrListenFd = 0;
+    int      inotifyFD = 0;
+    int      inotifyWD = 0;
     void     *rmrCtx = nullptr;
     Sctp_Map_t *sctpMap = nullptr;
-    char       rmrAddress[256] {}; // "tcp:portnumber" "tcp:5566" listen to all address on port 5566
-    mdclog_severity_t logLevel = MDCLOG_INFO;
+    char      ka_message[KA_MESSAGE_SIZE] {};
+    int       ka_message_length = 0;
+    char       rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566
     char volume[VOLUME_URL_SIZE];
-    //shared_timed_mutex fence; // moved to mapWrapper
+    string myIP {};
+    string fqdn {};
+    string podName {};
+    string configFilePath {};
+    string configFileName {};
+    bool trace = true;
+    shared_ptr<prometheus::Registry> prometheusRegistry;
+    string prometheusPort {"8088"};
+    Family<Counter> *prometheusFamily;
+    Exposer *prometheusExposer = nullptr;
+    Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
+    Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {};
 } sctp_params_t;
 
+// RAN to RIC
+#define IN_INITI 0 //INITIATING
+#define IN_SUCC 1 //SUCCESSFUL
+#define IN_UN_SUCC 2 //UN-Successful
+
+// RIC To RAN
+#define OUT_INITI 3 //INITIATING
+#define OUT_SUCC 4 //SUCCESSFUL
+#define OUT_UN_SUCC 5 //UN-Successful
+
+#define MSG_COUNTER 0
+#define BYTES_COUNTER 1
+
+#define INVALID_STREAM_ID -1
+
 typedef struct ConnectedCU {
     int fileDescriptor = 0;
     char hostName[NI_MAXHOST] {};
@@ -132,15 +185,24 @@ typedef struct ConnectedCU {
     size_t asnLength = 0;
     int mtype = 0;
     bool isConnected = false;
+    bool gotSetup = false;
+    sctp_params_t *sctpParams = nullptr;
+    Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {};
+    bool isSingleStream = false;
+    int singleStreamId = 0;
+    Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {};
 } ConnectedCU_t ;
 
-#define MAX_RMR_BUFF_ARRY 32
+
+#define MAX_RMR_BUFF_ARRAY 32
 typedef struct RmrMessagesBuffer {
-    void *rmrCtx;
-    rmr_mbuf_t *sendMessage;
-    rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY];
-    rmr_mbuf_t *rcvMessage;
-    rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY];
+    char ka_message[KA_MESSAGE_SIZE] {};
+    int  ka_message_len = 0;
+    void *rmrCtx = nullptr;
+    rmr_mbuf_t *sendMessage= nullptr;
+    //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
+    rmr_mbuf_t *rcvMessage= nullptr;
+    //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {};
 } RmrMessagesBuffer_t;
 
 typedef struct formatedMessage {
@@ -153,53 +215,115 @@ typedef struct formatedMessage {
 } FormatedMessage_t;
 
 typedef struct ReportingMessages {
-    FormatedMessage_t message;
-    int outLen;
-    unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2];
-    char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8];
-    size_t bufferLen;
+    FormatedMessage_t message {};
+    ConnectedCU_t *peerInfo = nullptr;
+    long outLen = 0;
+    unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {};
+    char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {};
 } ReportingMessages_t;
 
+enum E2T_Procedure_States
+{
+    E2_SETUP_PROCEDURE_NOT_INITIATED = 0,
+    E2_SETUP_PROCEDURE_ONGOING = 1,
+    E2_SETUP_PROCEDURE_COMPLETED = 2,
+    RIC_SERVICE_UPDATE_PROCEDURE_ONGOING = 3,
+    RIC_SERVICE_UPDATE_PROCEDURE_COMPLETED = 4,
+    RIC_SUBS_PROCEDURE_ONGOING = 5,
+    RIC_SUBS_PROCEDURE_COMPLETED = 6,
+    RIC_INDICATION_PROCEDURE_ONGOING = 7,
+    RIC_INDICATION_PROCEDURE_COMPLETED = 8,
+    RIC_SUBS_DEL_PROCEDURE_ONGOING = 9,
+    RIC_SUBS_DEL_PROCEDURE_COMPLETED = 10,
+    CONTROL_PROCEDURE_ONGOING = 11,
+    CONTROL_PROCEDURE_COMPLETED = 12,
+    E2_NODE_CONF_UPDATE_PROCEDURE_ONGOING = 13,
+    E2_NODE_CONF_UPDATE_PROCEDURE_COMPLETED = 14,
+    RESET_PROCEDURE_ONGOING = 15,
+    RESET_PROCEDURE_COMPLETED = 16,
+};
+
+struct E2NodeConnectionHandling
+{
+    E2T_Procedure_States e2tProcedureOngoingStatus;
+    long e2SetupProcedureTransactionId;
+};
+
+constexpr int negativeOne = -1;
+constexpr int negativeSix = -6;
+constexpr int negativeSeven = -7;
+constexpr int numberZero = 0;
+constexpr int numberOne = 1;
+constexpr int numberTwo = 2;
+constexpr int numberThree = 3;
+constexpr int numberFour = 4;
+constexpr int numberFive = 5;
+constexpr int numberTwenty = 20;
+
+constexpr uint8_t sendMsgMaxBitPosition = 2;
+constexpr uint8_t sendMsgToE2MBitSetPosition = 0;
+constexpr uint8_t sendMsgToSubMgrBitSetPosition = 1;
+
+constexpr uint8_t requiredIePresentMaxBitSetPosition = 3;
+constexpr uint8_t transactionIdIeBitSetPosition = 0;
+constexpr uint8_t ricRequestIdIeBitSetPosition = 1;
+constexpr uint8_t causeIeBitSetPosition = 2;
+
+cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams);
+
+int buildInotify(sctp_params_t &sctpParams);
+
+void handleTermInit(sctp_params_t &sctpParams);
+
+void handleConfigChange(sctp_params_t *sctpParams);
 
 void listener(sctp_params_t *params);
 
+void sendTermInit(sctp_params_t &sctpParams);
+
 int setSocketNoBlocking(int socket);
 
-void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m, otSpan *pSpan);
+void handleEinprogressMessages(struct epoll_event &event,
+                               ReportingMessages_t &message,
+                               RmrMessagesBuffer_t &rmrMessageBuffer,
+                               sctp_params_t *params);
+
+void handlepoll_error(struct epoll_event &event,
+                      ReportingMessages_t &message,
+                      RmrMessagesBuffer_t &rmrMessageBuffer,
+                      sctp_params_t *params);
+
+
+void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m);
 
-int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan);
 
 /**
  *
  * @param message
  * @param rmrMessageBuffer
- * @param pSpan
  */
-void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan);
+void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer);
 
 /**
  *
  * @param sctpMap
- * @param messagBuffer
+ * @param messageBuffer
  * @param message
  * @param failedMesgId
- * @param pSpan
  * @return
  */
 int sendMessagetoCu(Sctp_Map_t *sctpMap,
-                    RmrMessagesBuffer_t &messagBuffer,
+                    RmrMessagesBuffer_t &messageBuffer,
                     ReportingMessages_t &message,
-                    int failedMesgId, otSpan *pSpan);
+                    int failedMesgId);
 
 void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer,
                                     ReportingMessages_t &message,
-                                    int failedMesgId,
-                                    otSpan *pSpan);
+                                    int failedMesgId);
 
 int sendRequestToXapp(ReportingMessages_t &message,
                       int requestId,
-                      RmrMessagesBuffer_t &rmrMmessageBuffer,
-                      otSpan *pSpan);
+                      RmrMessagesBuffer_t &rmrMmessageBuffer);
 
 /**
  *
@@ -208,28 +332,26 @@ int sendRequestToXapp(ReportingMessages_t &message,
  * @param requestType
  * @param rmrMessageBuffer
  * @param sctpMap
- * @param pSpan
  * @return
  */
+/*
 int sendResponseToXapp(ReportingMessages_t &message,
                        int msgType,
                        int requestType,
                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                       Sctp_Map_t *sctpMap,
-                       otSpan *pSpan);
+                       Sctp_Map_t *sctpMap);
+*/
 
 /**
  *
  * @param peerInfo
  * @param message
  * @param m
- * @param pSpan
  * @return
  */
 int sendSctpMsg(ConnectedCU_t *peerInfo,
                 ReportingMessages_t &message,
-                Sctp_Map_t *m,
-                otSpan *pSpan);
+                Sctp_Map_t *m);
 
 /**
  *
@@ -238,114 +360,83 @@ int sendSctpMsg(ConnectedCU_t *peerInfo,
  * @param numOfMessages
  * @param rmrMessageBuffer
  * @param ts
- * @param pSpan
  * @return
  */
 int receiveDataFromSctp(struct epoll_event *events,
                         Sctp_Map_t *sctpMap,
                         int &numOfMessages,
                         RmrMessagesBuffer_t &rmrMessageBuffer,
-                        struct timespec &ts,
-                        otSpan *pSpan);
+                        struct timespec &ts);
 
 /**
  *
  * @param rmrAddress
- * @param pSpan
  * @return
  */
-void *getRmrContext(char *rmrAddress, otSpan *pSpan);
+void getRmrContext(sctp_params_t &pSctpParams);
 
 /**
  *
- * @param epoll_fd
- * @param rmrCtx
  * @param sctpMap
- * @param messagBuffer
- * @param pSpan
- * @return
- */
-int receiveXappMessages(int epoll_fd,
-                        Sctp_Map_t *sctpMap,
-                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                        struct timespec &ts,
-                        otSpan *pSpan);
-
-/**
- *
  * @param rmrMessageBuffer
- * @param message
- * @param epoll_fd
- * @param sctpMap
- * @param pSpan
+ * @param ts
  * @return
  */
-int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer,
-                           ReportingMessages_t &message,
-                           int epoll_fd,
-                           Sctp_Map_t *sctpMap,
-                           otSpan *pSpan);
+int receiveXappMessages(Sctp_Map_t *sctpMap,
+                        RmrMessagesBuffer_t &rmrMessageBuffer,
+                        struct timespec &ts);
 
 /**
  *
- * @param messagBuffer
+ * @param messageBuffer
  * @param failedMsgId
  * @param sctpMap
- * @param pSpan
  * @return
  */
-int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messagBuffer,
+int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer,
                            ReportingMessages_t &message,
                            int failedMsgId,
-                           Sctp_Map_t *sctpMap,
-                           otSpan *pSpan);
+                           Sctp_Map_t *sctpMap);
 /**
  *
  * @param pdu
  * @param message
  * @param rmrMessageBuffer
- * @param pSpan
  */
 void asnInitiatingRequest(E2AP_PDU_t *pdu,
+                          Sctp_Map_t *sctpMap,
                           ReportingMessages_t &message,
-                          RmrMessagesBuffer_t &rmrMessageBuffer,
-                          otSpan *pSpan);
+                          RmrMessagesBuffer_t &rmrMessageBuffer,int streamId);
 /**
  *
  * @param pdu
  * @param message
  * @param sctpMap
  * @param rmrMessageBuffer
- * @param pSpan
  */
-void asnSuccsesfulMsg(E2AP_PDU_t *pdu,
-                      ReportingMessages_t &message,
+void asnSuccessfulMsg(E2AP_PDU_t *pdu,
                       Sctp_Map_t *sctpMap,
-                      RmrMessagesBuffer_t &rmrMessageBuffer,
-                      otSpan *pSpan);
+                      ReportingMessages_t &message,
+                      RmrMessagesBuffer_t &rmrMessageBuffer);
 /**
  *
  * @param pdu
  * @param message
  * @param sctpMap
  * @param rmrMessageBuffer
- * @param pSpan
  */
 void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
-                        ReportingMessages_t &message,
                         Sctp_Map_t *sctpMap,
-                        RmrMessagesBuffer_t &rmrMessageBuffer,
-                        otSpan *pSpan);
+                        ReportingMessages_t &message,
+                        RmrMessagesBuffer_t &rmrMessageBuffer);
 
 /**
  *
  * @param rmrMessageBuffer
  * @param message
- * @param pSpan
  * @return
  */
-int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan);
-
+int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message);
 /**
  *
  * @param epoll_fd
@@ -354,10 +445,9 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m
  * @param sctpMap
  * @param enodbName
  * @param msgType
- * @param pSpan
  * @returnsrc::logger_mt& lg = my_logger::get();
  */
-int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
 /**
  *
  * @param epoll_fd
@@ -366,10 +456,9 @@ int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_
  * @param sctpMap
  * @param enodbName
  * @param msgType
- * @param pSpan
  * @return
  */
-int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan);
+int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType);
 
 /**
  *
@@ -385,8 +474,39 @@ void buildJsonMessage(ReportingMessages_t &message);
  */
 string translateRmrErrorMessages(int state);
 
+int buildConfiguration(sctp_params_t &sctpParams);
+void startPrometheus(sctp_params_t &sctpParams);
+static int enable_log_change_notify(const char* fileName);
+static int register_log_change_notify(const char *fileName);
+static void * monitor_loglevel_change_handler(void* arg);
+void  update_mdc_log_level_severity(char* log_level);
+char* getinterfaceip();
+static char* parse_file(char* filename);
+
+
+static inline uint64_t rdtscp(uint32_t &aux) {
+    uint64_t rax,rdx;
+    asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :);
+    return (rdx << (unsigned)32) + rax;
+}
 #ifndef RIC_SCTP_CONNECTION_FAILURE
 #define RIC_SCTP_CONNECTION_FAILURE  10080
 #endif
 
+#ifdef UNIT_TEST
+    #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/
+#endif
+
+int buildListeningPort(sctp_params_t &sctpParams);
+void buildE2TPrometheusCounters(sctp_params_t &sctpParams);
+
+int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message);
+void removeE2ConnectionEntryFromMap(char* eNBName);
+bool getE2tProcedureOngoingStatus(char *enbName, E2NodeConnectionHandling &e2NodeConnectionHandling);
+void setE2ProcedureOngoingStatus(char *enbName, E2T_Procedure_States state);
+void insertE2ProcedureOngoing(char *enbName, long &transactionID);
+E2T_Procedure_States currentE2tProcedureOngoingStatus(char *enbName);
+void printEntryPresentInMap();
+void handleE2SetupReq(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, E2AP_PDU_t *pdu, long &transactionID, int streamId, Sctp_Map_t *sctpMap);
+bitset<sendMsgMaxBitPosition> getSendMsgBitSetValue(int procedureCode, bitset<requiredIePresentMaxBitSetPosition> isRequiredIesPresent, char* enbName);
 #endif //X2_SCTP_THREAD_H