/* * Copyright 2019 AT&T Intellectual Property * Copyright 2019 Nokia * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * This source code is part of the near-RT RIC (RAN Intelligent Controller) * platform project (RICP). */ #ifndef X2_SCTP_THREAD_H #define X2_SCTP_THREAD_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "oranE2/E2AP-PDU.h" #include "oranE2/ProtocolIE-Container.h" #include "oranE2/InitiatingMessage.h" #include "oranE2/SuccessfulOutcome.h" #include "oranE2/UnsuccessfulOutcome.h" #include "oranE2/ProtocolIE-Container.h" #include "oranE2/ProtocolIE-Field.h" #include "oranE2/GlobalE2node-gNB-ID.h" #include "oranE2/GlobalE2node-en-gNB-ID.h" #include "oranE2/GlobalE2node-ng-eNB-ID.h" #include "oranE2/GlobalE2node-eNB-ID.h" #include "cxxopts.hpp" //#include "config-cpp/include/config-cpp/config-cpp.h" #include #include #include #include #include using namespace prometheus; #include "mapWrapper.h" #include "base64.h" #include "ReadConfigFile.h" using namespace std; namespace logging = boost::log; namespace src = boost::log::sources; namespace keywords = boost::log::keywords; namespace sinks = boost::log::sinks; namespace posix_time = boost::posix_time; namespace expr = boost::log::expressions; #define SRC_PORT 36422 #define SA struct sockaddr #define MAX_ENODB_NAME_SIZE 64 #define MAXEVENTS 128 #define RECEIVE_SCTP_BUFFER_SIZE (8 * 1024) #define RECEIVE_XAPP_BUFFER_SIZE RECEIVE_SCTP_BUFFER_SIZE typedef mapWrapper Sctp_Map_t; #define VOLUME_URL_SIZE 256 #define KA_MESSAGE_SIZE 2048 enum E2T_Internal_Counters { SCTP_ABORT_INITIATED_BY_E2NODE = 0, INVALID_MESSAGE_RECEIVED = 1, E2T_MAX_INTERNAL_COUNTER = 2, }; typedef struct sctp_params { int epollTimeOut = -1; uint16_t rmrPort = 0; uint16_t sctpPort = SRC_PORT; int epoll_fd = 0; int listenFD = 0; int rmrListenFd = 0; int inotifyFD = 0; int inotifyWD = 0; void *rmrCtx = nullptr; Sctp_Map_t *sctpMap = nullptr; char ka_message[KA_MESSAGE_SIZE] {}; int ka_message_length = 0; char rmrAddress[256] {}; // "tcp:port number" "tcp:5566" listen to all address on port 5566 char volume[VOLUME_URL_SIZE]; string myIP {}; string fqdn {}; string podName {}; string configFilePath {}; string configFileName {}; bool trace = true; shared_ptr prometheusRegistry; string prometheusPort {"8088"}; Family *prometheusFamily; Exposer *prometheusExposer = nullptr; Counter *e2tCounters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {}; Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {}; } sctp_params_t; // RAN to RIC #define IN_INITI 0 //INITIATING #define IN_SUCC 1 //SUCCESSFUL #define IN_UN_SUCC 2 //UN-Successful // RIC To RAN #define OUT_INITI 3 //INITIATING #define OUT_SUCC 4 //SUCCESSFUL #define OUT_UN_SUCC 5 //UN-Successful #define MSG_COUNTER 0 #define BYTES_COUNTER 1 #define INVALID_STREAM_ID -1 typedef struct ConnectedCU { int fileDescriptor = 0; char hostName[NI_MAXHOST] {}; char portNumber[NI_MAXSERV] {}; char enodbName[MAX_ENODB_NAME_SIZE] {}; char asnData[RECEIVE_SCTP_BUFFER_SIZE] {}; size_t asnLength = 0; int mtype = 0; bool isConnected = false; bool gotSetup = false; sctp_params_t *sctpParams = nullptr; Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDeleteRequired + 1] {}; bool isSingleStream = false; int singleStreamId = 0; Counter *e2tInternalCounters[E2T_Internal_Counters::E2T_MAX_INTERNAL_COUNTER] {}; } ConnectedCU_t ; #define MAX_RMR_BUFF_ARRAY 32 typedef struct RmrMessagesBuffer { char ka_message[KA_MESSAGE_SIZE] {}; int ka_message_len = 0; void *rmrCtx = nullptr; rmr_mbuf_t *sendMessage= nullptr; //rmr_mbuf_t *sendBufferedMessages[MAX_RMR_BUFF_ARRAY] {}; rmr_mbuf_t *rcvMessage= nullptr; //rmr_mbuf_t *rcvBufferedMessages[MAX_RMR_BUFF_ARRAY] {}; } RmrMessagesBuffer_t; typedef struct formatedMessage { char enodbName[MAX_ENODB_NAME_SIZE]; struct timespec time; int messageType; char direction; ssize_t asnLength; unsigned char *asndata; } FormatedMessage_t; typedef struct ReportingMessages { FormatedMessage_t message {}; ConnectedCU_t *peerInfo = nullptr; long outLen = 0; unsigned char base64Data[RECEIVE_SCTP_BUFFER_SIZE * 2] {}; char buffer[RECEIVE_SCTP_BUFFER_SIZE * 8] {}; } ReportingMessages_t; enum E2T_Procedure_States { E2_SETUP_PROCEDURE_NOT_INITIATED = 0, E2_SETUP_PROCEDURE_ONGOING = 1, E2_SETUP_PROCEDURE_COMPLETED = 2, RIC_SERVICE_UPDATE_PROCEDURE_ONGOING = 3, RIC_SERVICE_UPDATE_PROCEDURE_COMPLETED = 4, RIC_SUBS_PROCEDURE_ONGOING = 5, RIC_SUBS_PROCEDURE_COMPLETED = 6, RIC_INDICATION_PROCEDURE_ONGOING = 7, RIC_INDICATION_PROCEDURE_COMPLETED = 8, RIC_SUBS_DEL_PROCEDURE_ONGOING = 9, RIC_SUBS_DEL_PROCEDURE_COMPLETED = 10, CONTROL_PROCEDURE_ONGOING = 11, CONTROL_PROCEDURE_COMPLETED = 12, E2_NODE_CONF_UPDATE_PROCEDURE_ONGOING = 13, E2_NODE_CONF_UPDATE_PROCEDURE_COMPLETED = 14, RESET_PROCEDURE_ONGOING = 15, RESET_PROCEDURE_COMPLETED = 16, }; struct E2NodeConnectionHandling { E2T_Procedure_States e2tProcedureOngoingStatus; long e2SetupProcedureTransactionId; }; constexpr int negativeOne = -1; constexpr int negativeSix = -6; constexpr int negativeSeven = -7; constexpr int numberZero = 0; constexpr int numberOne = 1; constexpr int numberTwo = 2; constexpr int numberThree = 3; constexpr int numberFour = 4; constexpr int numberFive = 5; constexpr int numberTwenty = 20; constexpr uint8_t sendMsgMaxBitPosition = 2; constexpr uint8_t sendMsgToE2MBitSetPosition = 0; constexpr uint8_t sendMsgToSubMgrBitSetPosition = 1; constexpr uint8_t requiredIePresentMaxBitSetPosition = 3; constexpr uint8_t transactionIdIeBitSetPosition = 0; constexpr uint8_t ricRequestIdIeBitSetPosition = 1; constexpr uint8_t causeIeBitSetPosition = 2; cxxopts::ParseResult parse(int argc, char *argv[], sctp_params_t &pSctpParams); int buildInotify(sctp_params_t &sctpParams); void handleTermInit(sctp_params_t &sctpParams); void handleConfigChange(sctp_params_t *sctpParams); void listener(sctp_params_t *params); void sendTermInit(sctp_params_t &sctpParams); int setSocketNoBlocking(int socket); void handleEinprogressMessages(struct epoll_event &event, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, sctp_params_t *params); void handlepoll_error(struct epoll_event &event, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, sctp_params_t *params); void cleanHashEntry(ConnectedCU_t *peerInfo, Sctp_Map_t *m); /** * * @param message * @param rmrMessageBuffer */ void getRequestMetaData(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param sctpMap * @param messageBuffer * @param message * @param failedMesgId * @return */ int sendMessagetoCu(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, int failedMesgId); void sendFailedSendingMessagetoXapp(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message, int failedMesgId); int sendRequestToXapp(ReportingMessages_t &message, int requestId, RmrMessagesBuffer_t &rmrMmessageBuffer); /** * * @param message * @param msgType * @param requestType * @param rmrMessageBuffer * @param sctpMap * @return */ /* int sendResponseToXapp(ReportingMessages_t &message, int msgType, int requestType, RmrMessagesBuffer_t &rmrMessageBuffer, Sctp_Map_t *sctpMap); */ /** * * @param peerInfo * @param message * @param m * @return */ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m); /** * * @param events * @param sctpMap * @param numOfMessages * @param rmrMessageBuffer * @param ts * @return */ int receiveDataFromSctp(struct epoll_event *events, Sctp_Map_t *sctpMap, int &numOfMessages, RmrMessagesBuffer_t &rmrMessageBuffer, struct timespec &ts); /** * * @param rmrAddress * @return */ void getRmrContext(sctp_params_t &pSctpParams); /** * * @param sctpMap * @param rmrMessageBuffer * @param ts * @return */ int receiveXappMessages(Sctp_Map_t *sctpMap, RmrMessagesBuffer_t &rmrMessageBuffer, struct timespec &ts); /** * * @param messageBuffer * @param failedMsgId * @param sctpMap * @return */ int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer, ReportingMessages_t &message, int failedMsgId, Sctp_Map_t *sctpMap); /** * * @param pdu * @param message * @param rmrMessageBuffer */ void asnInitiatingRequest(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer,int streamId); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer */ void asnSuccessfulMsg(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param pdu * @param message * @param sctpMap * @param rmrMessageBuffer */ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer); /** * * @param rmrMessageBuffer * @param message * @return */ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &message); /** * * @param epoll_fd * @param peerInfo * @param events * @param sctpMap * @param enodbName * @param msgType * @returnsrc::logger_mt& lg = my_logger::get(); */ int addToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * * @param epoll_fd * @param peerInfo * @param events * @param sctpMap * @param enodbName * @param msgType * @return */ int modifyToEpoll(int epoll_fd, ConnectedCU_t *peerInfo, uint32_t events, Sctp_Map_t *sctpMap, char *enodbName, int msgType); /** * * @param message */ void buildJsonMessage(ReportingMessages_t &message); /** * * * @param state * @return */ string translateRmrErrorMessages(int state); int buildConfiguration(sctp_params_t &sctpParams); void startPrometheus(sctp_params_t &sctpParams); static int enable_log_change_notify(const char* fileName); static int register_log_change_notify(const char *fileName); static void * monitor_loglevel_change_handler(void* arg); void update_mdc_log_level_severity(char* log_level); char* getinterfaceip(); static char* parse_file(char* filename); static inline uint64_t rdtscp(uint32_t &aux) { uint64_t rax,rdx; asm volatile ("rdtscp\n" : "=a" (rax), "=d" (rdx), "=c" (aux) : :); return (rdx << (unsigned)32) + rax; } #ifndef RIC_SCTP_CONNECTION_FAILURE #define RIC_SCTP_CONNECTION_FAILURE 10080 #endif #ifdef UNIT_TEST #define FILE_DESCRIPTOR 53424 /*Dummy value for file descriptor only when UT is defined*/ #endif int buildListeningPort(sctp_params_t &sctpParams); void buildE2TPrometheusCounters(sctp_params_t &sctpParams); int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message); void removeE2ConnectionEntryFromMap(char* eNBName); bool getE2tProcedureOngoingStatus(char *enbName, E2NodeConnectionHandling &e2NodeConnectionHandling); void setE2ProcedureOngoingStatus(char *enbName, E2T_Procedure_States state); void insertE2ProcedureOngoing(char *enbName, long &transactionID); E2T_Procedure_States currentE2tProcedureOngoingStatus(char *enbName); void printEntryPresentInMap(); void handleE2SetupReq(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBuffer, E2AP_PDU_t *pdu, long &transactionID, int streamId, Sctp_Map_t *sctpMap); bitset getSendMsgBitSetValue(int procedureCode, bitset isRequiredIesPresent, char* enbName); #endif //X2_SCTP_THREAD_H