X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=RIC-E2-TERMINATION%2FsctpThread.h;h=637dd17da5832949c9a03d1320cf2457b814f977;hb=96c9b882e2aed3838bcec26689dda01b3d11bc94;hp=419a2f64fddabca9ddb4bfce84793a0eb25c61a9;hpb=84c6004e378fa3ae9bfb8a8e7c8c83389003e484;p=ric-plt%2Fe2.git diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index 419a2f6..637dd17 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -15,6 +15,11 @@ * limitations under the License. */ +/* + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). + */ + #ifndef X2_SCTP_THREAD_H #define X2_SCTP_THREAD_H @@ -43,6 +48,8 @@ #include #include #include +#include +#include #include #include @@ -58,26 +65,33 @@ #include #include #include - +#include #include -#include "3rdparty/asn1cFiles/E2AP-PDU.h" -#include <3rdparty/asn1cFiles/ProtocolIE-Container.h> -#include "3rdparty/asn1cFiles/InitiatingMessage.h" -#include "3rdparty/asn1cFiles/SuccessfulOutcome.h" -#include "3rdparty/asn1cFiles/UnsuccessfulOutcome.h" -#include "3rdparty/asn1cFiles/ProtocolIE-Container.h" -#include "3rdparty/asn1cFiles/ProtocolIE-Field.h" +#include "oranE2/E2AP-PDU.h" +#include "oranE2/ProtocolIE-Container.h" +#include "oranE2/InitiatingMessage.h" +#include "oranE2/SuccessfulOutcome.h" +#include "oranE2/UnsuccessfulOutcome.h" +#include "oranE2/ProtocolIE-Container.h" +#include "oranE2/ProtocolIE-Field.h" +#include "oranE2/GlobalE2node-gNB-ID.h" +#include "oranE2/GlobalE2node-en-gNB-ID.h" +#include "oranE2/GlobalE2node-ng-eNB-ID.h" +#include "oranE2/GlobalE2node-eNB-ID.h" + +#include "cxxopts.hpp" +//#include "config-cpp/include/config-cpp/config-cpp.h" -#ifdef __TRACING__ -#include "openTracing.h" -#endif #include "mapWrapper.h" +#include "statCollector.h" #include "base64.h" +#include "ReadConfigFile.h" + using namespace std; namespace logging = boost::log; namespace src = boost::log::sources; @@ -92,28 +106,37 @@ namespace expr = boost::log::expressions; #define MAXEVENTS 128 -#define RECEIVE_SCTP_BUFFER_SIZE (64*1024) +#define RECEIVE_SCTP_BUFFER_SIZE (8 * 1024) #define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE typedef mapWrapper Sctp_Map_t; -#ifdef __TRACING__ -typedef const std::unique_ptr otSpan; -#else -typedef const int otSpan; -#endif + #define VOLUME_URL_SIZE 256 +#define KA_MESSAGE_SIZE 2048 typedef struct sctp_params { uint16_t rmrPort = 0; + uint16_t sctpPort = SRC_PORT; int epoll_fd = 0; + int listenFD = 0; int rmrListenFd = 0; + int inotifyFD = 0; + int inotifyWD = 0; void *rmrCtx = nullptr; Sctp_Map_t *sctpMap = nullptr; + char ka_message[KA_MESSAGE_SIZE] {}; + int ka_message_length = 0; char rmrAddress[256] {}; // "tcp:portnumber" "tcp:5566" listen to all address on port 5566 mdclog_severity_t logLevel = MDCLOG_INFO; char volume[VOLUME_URL_SIZE]; + string myIP {}; + string fqdn {}; + string podName {}; + string configFilePath {}; + string configFileName {}; + bool trace = true; //shared_timed_mutex fence; // moved to mapWrapper } sctp_params_t; @@ -126,15 +149,19 @@ typedef struct ConnectedCU { size_t asnLength = 0; int mtype = 0; bool isConnected = false; + bool gotSetup = false; + sctp_params_t *sctpParams = nullptr; } ConnectedCU_t ; #define MAX_RMR_BUFF_ARRY 32 typedef struct RmrMessagesBuffer { - void *rmrCtx; - rmr_mbuf_t *sendMessage; - rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY]; - rmr_mbuf_t *rcvMessage; - rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY]; + char ka_message[KA_MESSAGE_SIZE] {}; + int ka_message_len = 0; + void *rmrCtx = nullptr; + rmr_mbuf_t *sendMessage= nullptr; + rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRY] {}; + rmr_mbuf_t *rcvMessage= nullptr; + rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRY] {}; } RmrMessagesBuffer_t; typedef struct formatedMessage { @@ -147,29 +174,48 @@ typedef struct formatedMessage { } FormatedMessage_t; typedef struct ReportingMessages { - FormatedMessage_t message; - int outLen; - unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2]; - char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8]; - size_t bufferLen; + FormatedMessage_t message {}; + ConnectedCU_t *peerInfo = nullptr; + long outLen = 0; + unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {}; + char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {}; + StatCollector *statCollector = nullptr; } ReportingMessages_t; +cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams); + +int buildInotify(sctp_params_t &sctpParams); + +void handleTermInit(sctp_params_t &sctpParams); + +void handleConfigChange(sctp_params_t *sctpParams); void listener(sctp_params_t *params); +void sendTermInit(sctp_params_t &sctpParams); + int setSocketNoBlocking(int socket); -void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m, otSpan *pSpan); +void handleEinprogressMessages(struct epoll_event &event, + ReportingMessages_t &message, + RmrMessagesBuffer_t &rmrMessageBuffer, + sctp_params_t *params); + +void handlepoll_error(struct epoll_event &event, + ReportingMessages_t &message, + RmrMessagesBuffer_t &rmrMessageBuffer, + sctp_params_t *params); + + +void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m); -int getSetupRequestMetaData(ReportingMessages_t &message, char *data, char *host, uint16_t &port, otSpan *pSpan); /** * * @param message * @param rmrMessageBuffer - * @param pSpan */ -void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, otSpan *pSpan); +void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer); /** * @@ -177,23 +223,20 @@ void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMe * @param messagBuffer * @param message * @param failedMesgId - * @param pSpan * @return */ int sendMessagetoCu(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &messagBuffer, ReportingMessages_t &message, - int failedMesgId, otSpan *pSpan); + int failedMesgId); void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, - int failedMesgId, - otSpan *pSpan); + int failedMesgId); int sendRequestToXapp(ReportingMessages_t &message, int requestId, - RmrMessagesBuffer_t &rmrMmessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMmessageBuffer); /** * @@ -202,28 +245,26 @@ int sendRequestToXapp(ReportingMessages_t &message, * @param requestType * @param rmrMessageBuffer * @param sctpMap - * @param pSpan * @return */ +/* int sendResponseToXapp(ReportingMessages_t &message, int msgType, int requestType, RmrMessagesBuffer_t &rmrMessageBuffer, - Sctp_Map_t *sctpMap, - otSpan *pSpan); + Sctp_Map_t *sctpMap); +*/ /** * * @param peerInfo * @param message * @param m - * @param pSpan * @return */ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, - Sctp_Map_t *m, - otSpan *pSpan); + Sctp_Map_t *m); /** * @@ -232,23 +273,20 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, * @param numOfMessages * @param rmrMessageBuffer * @param ts - * @param pSpan * @return */ int receiveDataFromSctp(struct epoll_event *events, Sctp_Map_t *sctpMap, int &numOfMessages, RmrMessagesBuffer_t &rmrMessageBuffer, - struct timespec &ts, - otSpan *pSpan); + struct timespec &ts); /** * * @param rmrAddress - * @param pSpan * @return */ -void *getRmrContext(char *rmrAddress, otSpan *pSpan); +void getRmrContext(sctp_params_t &pSctpParams); /** * @@ -256,90 +294,60 @@ void *getRmrContext(char *rmrAddress, otSpan *pSpan); * @param rmrCtx * @param sctpMap * @param messagBuffer - * @param pSpan * @return */ -int receiveXappMessages(int epoll_fd, - Sctp_Map_t *sctpMap, +int receiveXappMessages(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &rmrMessageBuffer, - struct timespec &ts, - otSpan *pSpan); - -/** - * - * @param rmrMessageBuffer - * @param message - * @param epoll_fd - * @param sctpMap - * @param pSpan - * @return - */ -int connectToCUandSetUp(RmrMessagesBuffer_t &rmrMessageBuffer, - ReportingMessages_t &message, - int epoll_fd, - Sctp_Map_t *sctpMap, - otSpan *pSpan); + struct timespec &ts); /** * * @param messagBuffer * @param failedMsgId * @param sctpMap - * @param pSpan * @return */ int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messagBuffer, ReportingMessages_t &message, int failedMsgId, - Sctp_Map_t *sctpMap, - otSpan *pSpan); + Sctp_Map_t *sctpMap); /** * * @param pdu * @param message * @param rmrMessageBuffer - * @param pSpan */ void asnInitiatingRequest(E2AP_PDU_t *pdu, ReportingMessages_t &message, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer - * @param pSpan */ void asnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, - Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer - * @param pSpan */ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, ReportingMessages_t &message, - Sctp_Map_t *sctpMap, - RmrMessagesBuffer_t &rmrMessageBuffer, - otSpan *pSpan); + RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param rmrMessageBuffer * @param message - * @param pSpan * @return */ -int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, otSpan *pSpan); - +int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message); /** * * @param epoll_fd @@ -348,10 +356,9 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m * @param sctpMap * @param enodbName * @param msgType - * @param pSpan * @returnsrc::logger_mt& lg = my_logger::get(); */ -int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan); +int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * * @param epoll_fd @@ -360,10 +367,9 @@ int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_ * @param sctpMap * @param enodbName * @param msgType - * @param pSpan * @return */ -int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType, otSpan *pSpan); +int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * @@ -379,6 +385,12 @@ void buildJsonMessage(ReportingMessages_t &message); */ string translateRmrErrorMessages(int state); + +static inline uint64_t rdtscp(uint32_t &aux) { + uint64_t rax,rdx; + asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :); + return (rdx << (unsigned)32) + rax; +} #ifndef RIC_SCTP_CONNECTION_FAILURE #define RIC_SCTP_CONNECTION_FAILURE 10080 #endif