From 82cfb2f10b4efdd3c9c95c2486c508848d0db3e3 Mon Sep 17 00:00:00 2001 From: sandeeku Date: Wed, 11 May 2022 20:32:45 +0530 Subject: [PATCH] Sctp MultiStream changes for RIC Change-Id: I202af9fbb8ebd2c12142ca2bda4fb8f16efec462 Signed-off-by: sandeeku --- RIC-E2-TERMINATION/Dockerfile | 2 +- RIC-E2-TERMINATION/TEST/sctp_thread_test.cpp | 18 +++--- RIC-E2-TERMINATION/sctpThread.cpp | 89 ++++++++++++++++++++++++++-- RIC-E2-TERMINATION/sctpThread.h | 7 ++- 4 files changed, 103 insertions(+), 13 deletions(-) diff --git a/RIC-E2-TERMINATION/Dockerfile b/RIC-E2-TERMINATION/Dockerfile index 0000d7a..78dedfa 100644 --- a/RIC-E2-TERMINATION/Dockerfile +++ b/RIC-E2-TERMINATION/Dockerfile @@ -156,7 +156,7 @@ COPY --from=ubuntu /usr/lib/x86_64-linux-gnu/libstdc++.so.6 /usr/lib/libstdc++.s COPY --from=ubuntu /usr/lib/x86_64-linux-gnu/libcurl-gnutls.so.4.6.0 /usr/lib/libcurl-gnutls.so.4.6.0 COPY --from=ubuntu /usr/lib/x86_64-linux-gnu/libcurl-gnutls.so.4 /usr/lib/libcurl-gnutls.so.4 COPY --from=ubuntu /usr/lib/x86_64-linux-gnu/libcurl-gnutls.so /usr/lib/libcurl-gnutls.so - +COPY --from=ubuntu /usr/lib/x86_64-linux-gnu/libsctp.so.1 /usr/lib/libsctp.so.1 COPY --from=ubuntu /usr/local/bin/rmr_probe /opt/e2/rmr_probe WORKDIR /opt/e2/ diff --git a/RIC-E2-TERMINATION/TEST/sctp_thread_test.cpp b/RIC-E2-TERMINATION/TEST/sctp_thread_test.cpp index 3c851e5..73655ab 100644 --- a/RIC-E2-TERMINATION/TEST/sctp_thread_test.cpp +++ b/RIC-E2-TERMINATION/TEST/sctp_thread_test.cpp @@ -184,8 +184,9 @@ void create_asnInitiatingReq_Procedure_RICserviceUpdate(E2AP_PDU_t *pdu, /* Sending E2AP_PDU_PR_initiatingMessage and procedure code as: ProcedureCode_id_RICserviceUpdate */ pdu->choice.initiatingMessage->procedureCode = ProcedureCode_id_RICserviceUpdate; pdu->choice.initiatingMessage->value.present = InitiatingMessage__value_PR_RICserviceUpdate; - - asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer); + + int streamId = 0; + asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer,streamId); delete_memories_initiatingMessage(pdu, rmrMessageBuffer); } @@ -208,8 +209,8 @@ void create_asnInitiatingReq_Procedure_RICindication(E2AP_PDU_t *pdu, ie->value.present = RICindication_IEs__value_PR_RICrequestID; ie->value.choice.RICrequestID.ricRequestorID = 12345; ie->value.choice.RICrequestID.ricInstanceID = 1; - - asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer); + int streamId = 0; + asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer,streamId); delete_memories_initiatingMessage(pdu, rmrMessageBuffer); } @@ -223,7 +224,8 @@ void create_asnInitiatingReq_Procedure_ErrorIndication(E2AP_PDU_t *pdu, /* Sending E2AP_PDU_PR_initiatingMessage and procedure code as: ProcedureCode_id_ErrorIndication */ pdu->choice.initiatingMessage->procedureCode = ProcedureCode_id_ErrorIndication; pdu->choice.initiatingMessage->value.present = InitiatingMessage__value_PR_ErrorIndication; - asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer); + int streamId = 0; + asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer, streamId); delete_memories_initiatingMessage(pdu, rmrMessageBuffer); } @@ -237,7 +239,8 @@ void create_asnInitiatingReq_Procedure_Reset(E2AP_PDU_t *pdu, /* Sending E2AP_PDU_PR_initiatingMessage and procedure code as: ProcedureCode_id_Reset */ pdu->choice.initiatingMessage->procedureCode = ProcedureCode_id_Reset; pdu->choice.initiatingMessage->value.present = InitiatingMessage__value_PR_ResetRequest; - asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer); + int streamId =0; + asnInitiatingRequest(pdu, sctpMap, message, rmrMessageBuffer,streamId); delete_memories_initiatingMessage(pdu, rmrMessageBuffer); } @@ -267,7 +270,8 @@ TEST(sctp, TEST7) { create_asnInitiatingReq_Procedure_Reset(&pdu, sctpMap, message, rmrMessageBuffer, sctp_ut_params); /* For Procedure's Default case. */ pdu.choice.initiatingMessage->procedureCode = ((ProcedureCode_t)100); - asnInitiatingRequest(&pdu, sctpMap, message, rmrMessageBuffer); + int streamId =0; + asnInitiatingRequest(&pdu, sctpMap, message, rmrMessageBuffer,streamId); if(pdu.choice.initiatingMessage) { free(pdu.choice.initiatingMessage); diff --git a/RIC-E2-TERMINATION/sctpThread.cpp b/RIC-E2-TERMINATION/sctpThread.cpp index e67efdb..088ce0e 100644 --- a/RIC-E2-TERMINATION/sctpThread.cpp +++ b/RIC-E2-TERMINATION/sctpThread.cpp @@ -292,6 +292,12 @@ int buildListeningPort(sctp_params_t &sctpParams) { return -1; #endif } + struct sctp_initmsg initmsg; + memset (&initmsg, 0, sizeof (initmsg)); + initmsg.sinit_num_ostreams = 2; + initmsg.sinit_max_instreams = 2; + initmsg.sinit_max_attempts = 4; + setsockopt (sctpParams.listenFD, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, sizeof (initmsg)); struct sockaddr_in6 serverAddress {}; serverAddress.sin6_family = AF_INET6; @@ -897,6 +903,11 @@ void listener(sctp_params_t *params) { break; #endif } + struct sctp_event_subscribe sctpevents; + memset( (void *)&sctpevents, 0, sizeof(sctpevents) ); + sctpevents.sctp_data_io_event = 1; + setsockopt(peerInfo->fileDescriptor, SOL_SCTP, SCTP_EVENTS,(const void *)&sctpevents, sizeof(sctpevents) ); + auto ans = getnameinfo(&in_addr, in_len, peerInfo->hostName, NI_MAXHOST, peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV)); @@ -1263,8 +1274,10 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ auto loglevel = mdclog_level_get(); #ifndef UNIT_TEST int fd = peerInfo->fileDescriptor; + int streamId = fetchStreamId(peerInfo,message); #else int fd = FILE_DESCRIPTOR; + int streamId = 0; #endif if (loglevel >= MDCLOG_DEBUG) { mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s", @@ -1272,7 +1285,7 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_ } while (true) { - if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) { + if (sctp_sendmsg(fd,message.message.asndata, message.message.asnLength,(struct sockaddr *) NULL, 0, 0, 0,streamId,0,0) < 0) { if (errno == EINTR) { continue; } @@ -1353,6 +1366,9 @@ int receiveDataFromSctp(struct epoll_event *events, ReportingMessages_t message {}; auto done = 0; auto loglevel = mdclog_level_get(); + struct sctp_sndrcvinfo sndrcvinfo; + int flags; + int streamId; // get the identity of the interface message.peerInfo = (ConnectedCU_t *)events->data.ptr; @@ -1372,9 +1388,12 @@ int receiveDataFromSctp(struct epoll_event *events, message.message.asndata = rmrMessageBuffer.sendMessage->payload; #ifndef UNIT_TEST message.message.asnLength = rmrMessageBuffer.sendMessage->len = - read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE); + sctp_recvmsg(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE,(struct sockaddr *) NULL, 0, &sndrcvinfo, &flags); + mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP fd %d stream %d ", message.peerInfo->fileDescriptor, sndrcvinfo.sinfo_stream); + streamId = sndrcvinfo.sinfo_stream; #else message.message.asnLength = rmrMessageBuffer.sendMessage->len; + streamId = 0; #endif if (loglevel >= MDCLOG_DEBUG) { @@ -1459,7 +1478,7 @@ int receiveDataFromSctp(struct epoll_event *events, switch (pdu->present) { case E2AP_PDU_PR_initiatingMessage: {//initiating message - asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer); + asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer, streamId); break; } case E2AP_PDU_PR_successfulOutcome: { //successful outcome @@ -2009,7 +2028,7 @@ int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBu void asnInitiatingRequest(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, ReportingMessages_t &message, - RmrMessagesBuffer_t &rmrMessageBuffer) { + RmrMessagesBuffer_t &rmrMessageBuffer, int streamId) { auto logLevel = mdclog_level_get(); auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode; if (logLevel >= MDCLOG_DEBUG) { @@ -2028,6 +2047,28 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu, if (collectSetupRequestData(pdu, sctpMap, message) != 0) { break; } + struct sctp_status status; + int stat_size = sizeof(status); + getsockopt( message.peerInfo->fileDescriptor, SOL_SCTP, SCTP_STATUS,(void *)&status, (socklen_t *)&stat_size ); + if (logLevel >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Start from SCTP %d fd", message.peerInfo->fileDescriptor); + mdclog_write(MDCLOG_DEBUG, "SCTP status assoc id %d instrms %d outstrms %d", status.sstat_assoc_id, + status.sstat_instrms, status.sstat_outstrms); + } + if(status.sstat_outstrms == 1 || status.sstat_instrms == 1) + { + message.peerInfo->isSingleStream = true; + message.peerInfo->singleStreamId = streamId; + if (status.sstat_outstrms == 1 && status.sstat_instrms == 1){ + if (logLevel >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Single SCTP stream is used for sending from now on, assoc id %d streamId %d #instrms %d #outstrms %d, %s",status.sstat_assoc_id, streamId, status.sstat_instrms, status.sstat_outstrms, __FUNCTION__); + } + } + else { + mdclog_write(MDCLOG_ERR, "Single SCTP stream used for sending messages even if there is a mismatch in number of in & out streams, assoc id %d instrms %d outstrms %d", status.sstat_assoc_id, + status.sstat_instrms, status.sstat_outstrms); + } + } buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily); @@ -3288,3 +3329,43 @@ string translateRmrErrorMessages(int state) { } return str; } +int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message) +{ + auto loglevel = mdclog_level_get(); + int streamId = INVALID_STREAM_ID; + if(message.peerInfo->isSingleStream != false) + { + streamId = message.peerInfo->singleStreamId; + if (loglevel >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Send SCTP message for SINGLE_STREAM streamId %d , Messeage Type %d ,%s", + streamId,message.message.messageType, __FUNCTION__); + } + return streamId; + } + int msgType = message.message.messageType; + switch (msgType){ + case RIC_E2_RESET_REQ: + case RIC_E2_RESET_RESP: + case RIC_E2_SETUP_RESP: + case RIC_E2_SETUP_FAILURE: + case RIC_ERROR_INDICATION: + case RIC_SERVICE_QUERY: + case RIC_SERVICE_UPDATE_ACK: + case RIC_SERVICE_UPDATE_FAILURE: + streamId = 0; + break; + case RIC_SUB_REQ: + case RIC_SUB_DEL_REQ: + case RIC_CONTROL_REQ: + streamId = 1; + break; + default: + streamId = 0; + break; + } + if (loglevel >= MDCLOG_DEBUG) { + mdclog_write(MDCLOG_DEBUG, "Send SCTP message for streamId %d Messeage Type %d, %s", + streamId, message.message.messageType, __FUNCTION__); + } + return streamId; +} diff --git a/RIC-E2-TERMINATION/sctpThread.h b/RIC-E2-TERMINATION/sctpThread.h index 51527c0..3b68672 100644 --- a/RIC-E2-TERMINATION/sctpThread.h +++ b/RIC-E2-TERMINATION/sctpThread.h @@ -166,6 +166,8 @@ typedef struct sctp_params { #define MSG_COUNTER 0 #define BYTES_COUNTER 1 +#define INVALID_STREAM_ID -1 + typedef struct ConnectedCU { int fileDescriptor = 0; char hostName[NI_MAXHOST] {}; @@ -178,6 +180,8 @@ typedef struct ConnectedCU { bool gotSetup = false; sctp_params_t *sctpParams = nullptr; Counter *counters[6][2][ProcedureCode_id_RICsubscriptionDelete + 1] {}; + bool isSingleStream = false; + int singleStreamId = 0; } ConnectedCU_t ; @@ -346,7 +350,7 @@ int sendDirectionalSctpMsg(RmrMessagesBuffer_t &messageBuffer, void asnInitiatingRequest(E2AP_PDU_t *pdu, Sctp_Map_t *sctpMap, ReportingMessages_t &message, - RmrMessagesBuffer_t &rmrMessageBuffer); + RmrMessagesBuffer_t &rmrMessageBuffer,int streamId); /** * * @param pdu @@ -440,4 +444,5 @@ static inline uint64_t rdtscp(uint32_t &aux) { int buildListeningPort(sctp_params_t &sctpParams); void buildE2TPrometheusCounters(sctp_params_t &sctpParams); +int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message); #endif //X2_SCTP_THREAD_H -- 2.16.6