X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=RIC-E2-TERMINATION%2FsctpThread.h;h=f5c4fb9a329e0eae9fdf92ed2403e0100ae00226;hb=refs%2Fchanges%2F62%2F12862%2F1;hp=26422bf4473ec6f5313d78a1eb606eff7409c74f;hpb=342843e307ac2e1d915b37350424f20123c65d10;p=ric-plt%2Fe2.git diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index 26422bf..f5c4fb9 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -41,6 +41,8 @@ #include #include #include +#include +#include #include #include #include @@ -50,6 +52,8 @@ #include #include #include +#include +#include #include #include @@ -69,20 +73,27 @@ #include -#include "asn1cFiles/E2AP-PDU.h" -#include "asn1cFiles/ProtocolIE-Container.h" -#include "asn1cFiles/InitiatingMessage.h" -#include "asn1cFiles/SuccessfulOutcome.h" -#include "asn1cFiles/UnsuccessfulOutcome.h" -#include "asn1cFiles/ProtocolIE-Container.h" -#include "asn1cFiles/ProtocolIE-Field.h" +#include "oranE2/E2AP-PDU.h" +#include "oranE2/ProtocolIE-Container.h" +#include "oranE2/InitiatingMessage.h" +#include "oranE2/SuccessfulOutcome.h" +#include "oranE2/UnsuccessfulOutcome.h" +#include "oranE2/ProtocolIE-Container.h" +#include "oranE2/ProtocolIE-Field.h" +#include "oranE2/GlobalE2node-gNB-ID.h" +#include "oranE2/GlobalE2node-en-gNB-ID.h" +#include "oranE2/GlobalE2node-ng-eNB-ID.h" +#include "oranE2/GlobalE2node-eNB-ID.h" #include "cxxopts.hpp" //#include "config-cpp/include/config-cpp/config-cpp.h" +#include +#include +#include +#include +#include -#ifdef __TRACING__ -#include "openTracing.h" -#endif +using namespace prometheus; #include "mapWrapper.h" @@ -104,31 +115,37 @@ namespace expr = boost::log::expressions; #define MAXEVENTS 128 -#define RECEIVE_SCTP_BUFFER_SIZE (64*1024) -#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE +#define RECEIVE_SCTP_BUFFER_SIZE (8 * 1024) +#define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE typedef mapWrapper Sctp_Map_t; -#ifdef __TRACING__ -typedef const std::unique_ptr otSpan; -#else -typedef const int otSpan; -#endif + #define VOLUME_URL_SIZE 256 +#define KA_MESSAGE_SIZE 2048 + +enum E2T_Internal_Counters +{ + SCTP_ABORT_INITIATED_BY_E2NODE = 0, + INVALID_MESSAGE_RECEIVED = 1, + E2T_MAX_INTERNAL_COUNTER = 2, +}; typedef struct sctp_params { + int epollTimeOut = -1; uint16_t rmrPort = 0; + uint16_t sctpPort = SRC_PORT; int epoll_fd = 0; + int listenFD = 0; int rmrListenFd = 0; int inotifyFD = 0; int inotifyWD = 0; void *rmrCtx = nullptr; Sctp_Map_t *sctpMap = nullptr; - char ka_message[4096] {}; + char ka_message[KA_MESSAGE_SIZE] {}; int ka_message_length = 0; - char rmrAddress[256] {}; // "tcp:portnumber" "tcp:5566" listen to all address on port 5566 - mdclog_severity_t logLevel = MDCLOG_INFO; + char rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566 char volume[VOLUME_URL_SIZE]; string myIP {}; string fqdn {}; @@ -136,9 +153,29 @@ typedef struct sctp_params { string configFilePath {}; string configFileName {}; bool trace = true; - //shared_timed_mutex fence; // moved to mapWrapper + shared_ptr prometheusRegistry; + string prometheusPort {"8088"}; + Family *prometheusFamily; + Exposer *prometheusExposer = nullptr; + Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {}; + Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {}; } sctp_params_t; +// RAN to RIC +#define IN_INITI 0 //INITIATING +#define IN_SUCC 1 //SUCCESSFUL +#define IN_UN_SUCC 2 //UN-Successful + +// RIC To RAN +#define OUT_INITI 3 //INITIATING +#define OUT_SUCC 4 //SUCCESSFUL +#define OUT_UN_SUCC 5 //UN-Successful + +#define MSG_COUNTER 0 +#define BYTES_COUNTER 1 + +#define INVALID_STREAM_ID -1 + typedef struct ConnectedCU { int fileDescriptor = 0; char hostName[NI_MAXHOST] {}; @@ -148,17 +185,24 @@ typedef struct ConnectedCU { size_t asnLength = 0; int mtype = 0; bool isConnected = false; + bool gotSetup = false; + sctp_params_t *sctpParams = nullptr; + Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {}; + bool isSingleStream = false; + int singleStreamId = 0; + Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {}; } ConnectedCU_t ; -#define MAX_RMR_BUFF_ARRY 32 + +#define MAX_RMR_BUFF_ARRAY 32 typedef struct RmrMessagesBuffer { - char ka_message[4096] {}; + char ka_message[KA_MESSAGE_SIZE] {}; int ka_message_len = 0; void *rmrCtx = nullptr; rmr_mbuf_t *sendMessage= nullptr; - rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY] {}; + //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {}; rmr_mbuf_t *rcvMessage= nullptr; - rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY] {}; + //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {}; } RmrMessagesBuffer_t; typedef struct formatedMessage { @@ -171,12 +215,60 @@ typedef struct formatedMessage { } FormatedMessage_t; typedef struct ReportingMessages { - FormatedMessage_t message; - long outLen; - unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2]; - char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8]; + FormatedMessage_t message {}; + ConnectedCU_t *peerInfo = nullptr; + long outLen = 0; + unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {}; + char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {}; } ReportingMessages_t; +enum E2T_Procedure_States +{ + E2_SETUP_PROCEDURE_NOT_INITIATED = 0, + E2_SETUP_PROCEDURE_ONGOING = 1, + E2_SETUP_PROCEDURE_COMPLETED = 2, + RIC_SERVICE_UPDATE_PROCEDURE_ONGOING = 3, + RIC_SERVICE_UPDATE_PROCEDURE_COMPLETED = 4, + RIC_SUBS_PROCEDURE_ONGOING = 5, + RIC_SUBS_PROCEDURE_COMPLETED = 6, + RIC_INDICATION_PROCEDURE_ONGOING = 7, + RIC_INDICATION_PROCEDURE_COMPLETED = 8, + RIC_SUBS_DEL_PROCEDURE_ONGOING = 9, + RIC_SUBS_DEL_PROCEDURE_COMPLETED = 10, + CONTROL_PROCEDURE_ONGOING = 11, + CONTROL_PROCEDURE_COMPLETED = 12, + E2_NODE_CONF_UPDATE_PROCEDURE_ONGOING = 13, + E2_NODE_CONF_UPDATE_PROCEDURE_COMPLETED = 14, + RESET_PROCEDURE_ONGOING = 15, + RESET_PROCEDURE_COMPLETED = 16, +}; + +struct E2NodeConnectionHandling +{ + E2T_Procedure_States e2tProcedureOngoingStatus; + long e2SetupProcedureTransactionId; +}; + +constexpr int negativeOne = -1; +constexpr int negativeSix = -6; +constexpr int negativeSeven = -7; +constexpr int numberZero = 0; +constexpr int numberOne = 1; +constexpr int numberTwo = 2; +constexpr int numberThree = 3; +constexpr int numberFour = 4; +constexpr int numberFive = 5; +constexpr int numberTwenty = 20; + +constexpr uint8_t sendMsgMaxBitPosition = 2; +constexpr uint8_t sendMsgToE2MBitSetPosition = 0; +constexpr uint8_t sendMsgToSubMgrBitSetPosition = 1; + +constexpr uint8_t requiredIePresentMaxBitSetPosition = 3; +constexpr uint8_t transactionIdIeBitSetPosition = 0; +constexpr uint8_t ricRequestIdIeBitSetPosition = 1; +constexpr uint8_t causeIeBitSetPosition = 2; + cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams); int buildInotify(sctp_params_t &sctpParams); @@ -194,51 +286,44 @@ int setSocketNoBlocking(int socket); void handleEinprogressMessages(struct epoll_event &event, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, - sctp_params_t *params, - otSpan *pSpan); + sctp_params_t *params); void handlepoll_error(struct epoll_event &event, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, - sctp_params_t *params, - otSpan *pSpan); + sctp_params_t *params); -void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m, otSpan *pSpan); +void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m); -int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan); /** * * @param message * @param rmrMessageBuffer - * @param pSpan */ -void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan); +void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param sctpMap - * @param messagBuffer + * @param messageBuffer * @param message * @param failedMesgId - * @param pSpan * @return */ int sendMessagetoCu(Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &messagBuffer, + RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, - int failedMesgId, otSpan *pSpan); + int failedMesgId); void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, - int failedMesgId, - otSpan *pSpan); + int failedMesgId); int sendRequestToXapp(ReportingMessages_t &message, int requestId, - RmrMessagesBuffer_t &rmrMmessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMmessageBuffer); /** * @@ -247,28 +332,26 @@ int sendRequestToXapp(ReportingMessages_t &message, * @param requestType * @param rmrMessageBuffer * @param sctpMap - * @param pSpan * @return */ +/* int sendResponseToXapp(ReportingMessages_t &message, int msgType, int requestType, RmrMessagesBuffer_t &rmrMessageBuffer, - Sctp_Map_t *sctpMap, - otSpan *pSpan); + Sctp_Map_t *sctpMap); +*/ /** * * @param peerInfo * @param message * @param m - * @param pSpan * @return */ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, - Sctp_Map_t *m, - otSpan *pSpan); + Sctp_Map_t *m); /** * @@ -277,114 +360,83 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, * @param numOfMessages * @param rmrMessageBuffer * @param ts - * @param pSpan * @return */ int receiveDataFromSctp(struct epoll_event *events, Sctp_Map_t *sctpMap, int &numOfMessages, RmrMessagesBuffer_t &rmrMessageBuffer, - struct timespec &ts, - otSpan *pSpan); + struct timespec &ts); /** * * @param rmrAddress - * @param pSpan * @return */ -void getRmrContext(sctp_params_t &pSctpParams, otSpan *pSpan); +void getRmrContext(sctp_params_t &pSctpParams); /** * - * @param epoll_fd - * @param rmrCtx * @param sctpMap - * @param messagBuffer - * @param pSpan - * @return - */ -int receiveXappMessages(int epoll_fd, - Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &rmrMessageBuffer, - struct timespec &ts, - otSpan *pSpan); - -/** - * * @param rmrMessageBuffer - * @param message - * @param epoll_fd - * @param sctpMap - * @param pSpan + * @param ts * @return */ -int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer, - ReportingMessages_t &message, - int epoll_fd, - Sctp_Map_t *sctpMap, - otSpan *pSpan); +int receiveXappMessages(Sctp_Map_t *sctpMap, + RmrMessagesBuffer_t &rmrMessageBuffer, + struct timespec &ts); /** * - * @param messagBuffer + * @param messageBuffer * @param failedMsgId * @param sctpMap - * @param pSpan * @return */ -int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messagBuffer, +int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, int failedMsgId, - Sctp_Map_t *sctpMap, - otSpan *pSpan); + Sctp_Map_t *sctpMap); /** * * @param pdu * @param message * @param rmrMessageBuffer - * @param pSpan */ void asnInitiatingRequest(E2AP_PDU_t *pdu, + Sctp_Map_t *sctpMap, ReportingMessages_t &message, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMessageBuffer,int streamId); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer - * @param pSpan */ -void asnSuccsesfulMsg(E2AP_PDU_t *pdu, - ReportingMessages_t &message, +void asnSuccessfulMsg(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + ReportingMessages_t &message, + RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer - * @param pSpan */ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, - ReportingMessages_t &message, Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + ReportingMessages_t &message, + RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param rmrMessageBuffer * @param message - * @param pSpan * @return */ -int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan); - +int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message); /** * * @param epoll_fd @@ -393,10 +445,9 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m * @param sctpMap * @param enodbName * @param msgType - * @param pSpan * @returnsrc::logger_mt& lg = my_logger::get(); */ -int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan); +int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * * @param epoll_fd @@ -405,10 +456,9 @@ int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_ * @param sctpMap * @param enodbName * @param msgType - * @param pSpan * @return */ -int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan); +int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * @@ -424,14 +474,39 @@ void buildJsonMessage(ReportingMessages_t &message); */ string translateRmrErrorMessages(int state); +int buildConfiguration(sctp_params_t &sctpParams); +void startPrometheus(sctp_params_t &sctpParams); +static int enable_log_change_notify(const char* fileName); +static int register_log_change_notify(const char *fileName); +static void * monitor_loglevel_change_handler(void* arg); +void update_mdc_log_level_severity(char* log_level); +char* getinterfaceip(); +static char* parse_file(char* filename); + static inline uint64_t rdtscp(uint32_t &aux) { uint64_t rax,rdx; asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :); - return (rdx << 32) + rax; + return (rdx << (unsigned)32) + rax; } #ifndef RIC_SCTP_CONNECTION_FAILURE #define RIC_SCTP_CONNECTION_FAILURE 10080 #endif +#ifdef UNIT_TEST + #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/ +#endif + +int buildListeningPort(sctp_params_t &sctpParams); +void buildE2TPrometheusCounters(sctp_params_t &sctpParams); + +int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message); +void removeE2ConnectionEntryFromMap(char* eNBName); +bool getE2tProcedureOngoingStatus(char *enbName, E2NodeConnectionHandling &e2NodeConnectionHandling); +void setE2ProcedureOngoingStatus(char *enbName, E2T_Procedure_States state); +void insertE2ProcedureOngoing(char *enbName, long &transactionID); +E2T_Procedure_States currentE2tProcedureOngoingStatus(char *enbName); +void printEntryPresentInMap(); +void handleE2SetupReq(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, E2AP_PDU_t *pdu, long &transactionID, int streamId, Sctp_Map_t *sctpMap); +bitset getSendMsgBitSetValue(int procedureCode, bitset isRequiredIesPresent, char* enbName); #endif //X2_SCTP_THREAD_H