Squash-commit of e2ap-v2.0 branch
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
index 55f9083..088ce0e 100644 (file)
@@ -292,6 +292,12 @@ int buildListeningPort(sctp_params_t &sctpParams) {
         return -1;
 #endif        
     }
+    struct sctp_initmsg initmsg;
+    memset (&initmsg, 0, sizeof (initmsg));
+    initmsg.sinit_num_ostreams = 2;
+    initmsg.sinit_max_instreams = 2;
+    initmsg.sinit_max_attempts = 4;
+    setsockopt (sctpParams.listenFD, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, sizeof (initmsg));
 
     struct sockaddr_in6 serverAddress {};
     serverAddress.sin6_family = AF_INET6;
@@ -897,6 +903,11 @@ void listener(sctp_params_t *params) {
                         break;
 #endif                        
                     }
+                    struct sctp_event_subscribe sctpevents;
+                    memset( (void *)&sctpevents, 0, sizeof(sctpevents) );
+                    sctpevents.sctp_data_io_event = 1;
+                    setsockopt(peerInfo->fileDescriptor, SOL_SCTP, SCTP_EVENTS,(const void *)&sctpevents, sizeof(sctpevents) );
+
                     auto  ans = getnameinfo(&in_addr, in_len,
                                             peerInfo->hostName, NI_MAXHOST,
                                             peerInfo->portNumber, NI_MAXSERV, (unsigned )((unsigned int)NI_NUMERICHOST | (unsigned int)NI_NUMERICSERV));
@@ -1263,8 +1274,10 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_
     auto loglevel = mdclog_level_get();
 #ifndef UNIT_TEST    
     int fd = peerInfo->fileDescriptor;
+    int streamId = fetchStreamId(peerInfo,message);
 #else
     int fd = FILE_DESCRIPTOR;
+    int streamId = 0;
 #endif    
     if (loglevel >= MDCLOG_DEBUG) {
         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
@@ -1272,7 +1285,7 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_
     }
 
     while (true) {
-        if (send(fd,message.message.asndata, message.message.asnLength,MSG_NOSIGNAL) < 0) {
+        if (sctp_sendmsg(fd,message.message.asndata, message.message.asnLength,(struct sockaddr *) NULL, 0, 0, 0,streamId,0,0) < 0) {
             if (errno == EINTR) {
                 continue;
             }
@@ -1353,6 +1366,9 @@ int receiveDataFromSctp(struct epoll_event *events,
     ReportingMessages_t message {};
     auto done = 0;
     auto loglevel = mdclog_level_get();
+    struct sctp_sndrcvinfo sndrcvinfo;
+    int flags;
+    int streamId;
 
     // get the identity of the interface
     message.peerInfo = (ConnectedCU_t *)events->data.ptr;
@@ -1372,9 +1388,12 @@ int receiveDataFromSctp(struct epoll_event *events,
         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
 #ifndef UNIT_TEST        
         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
-                read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
+                sctp_recvmsg(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE,(struct sockaddr *) NULL, 0, &sndrcvinfo, &flags);
+        mdclog_write(MDCLOG_DEBUG, "Start Read from SCTP fd %d stream %d ", message.peerInfo->fileDescriptor, sndrcvinfo.sinfo_stream);
+        streamId = sndrcvinfo.sinfo_stream;
 #else
         message.message.asnLength = rmrMessageBuffer.sendMessage->len;
+        streamId = 0;
 #endif
 
         if (loglevel >= MDCLOG_DEBUG) {
@@ -1459,7 +1478,7 @@ int receiveDataFromSctp(struct epoll_event *events,
 
         switch (pdu->present) {
             case E2AP_PDU_PR_initiatingMessage: {//initiating message
-                asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer);
+                asnInitiatingRequest(pdu, sctpMap,message, rmrMessageBuffer, streamId);
                 break;
             }
             case E2AP_PDU_PR_successfulOutcome: { //successful outcome
@@ -1761,6 +1780,15 @@ void buildE2TPrometheusCounters(sctp_params_t &sctpParams) {
     sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureMsgs"}});
     sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup] = &sctpParams.prometheusFamily->Add({{"counter", "SetupRequestFailureBytes"}});
 
+    sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateMsgs"}});
+    sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateBytes"}});
+
+    sctpParams.e2tCounters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateResponseMsgs"}});
+    sctpParams.e2tCounters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateResponseBytes"}});
+
+    sctpParams.e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateFailureMsgs"}});
+    sctpParams.e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate] = &sctpParams.prometheusFamily->Add({{"counter", "E2NodeConfigUpdateFailureBytes"}});
+
     sctpParams.e2tCounters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationMsgs"}});
     sctpParams.e2tCounters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &sctpParams.prometheusFamily->Add({{"counter", "ErrorIndicationBytes"}});
 
@@ -1817,6 +1845,9 @@ void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFam
     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Messages"}});
     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"SetupRequest", "Bytes"}});
 
+    peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"E2NodeConfigUpdate", "Messages"}});
+    peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"E2NodeConfigUpdate", "Bytes"}});
+
     peerInfo->counters[IN_INITI][MSG_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Messages"}});
     peerInfo->counters[IN_INITI][BYTES_COUNTER][(ProcedureCode_id_ErrorIndication)] = &prometheusFamily->Add({{peerInfo->enodbName, "IN"}, {"ErrorIndication", "Bytes"}});
 
@@ -1873,6 +1904,9 @@ void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFam
     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Messages"}});
     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupResponse", "Bytes"}});
 
+    peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateSuccess", "Messages"}});
+    peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateSuccess", "Bytes"}});
+
     peerInfo->counters[OUT_SUCC][MSG_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Messages"}});
     peerInfo->counters[OUT_SUCC][BYTES_COUNTER][(ProcedureCode_id_Reset)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"ResetACK", "Bytes"}});
 
@@ -1882,6 +1916,9 @@ void buildPrometheusList(ConnectedCU_t *peerInfo, Family<Counter> *prometheusFam
     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Messages"}});
     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2setup)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"SetupRequestFailure", "Bytes"}});
 
+    peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateFailure", "Messages"}});
+    peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_E2nodeConfigurationUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"E2NodeConfigUpdateFailure", "Bytes"}});
+
     peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Messages"}});
     peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][(ProcedureCode_id_RICserviceUpdate)] = &prometheusFamily->Add({{peerInfo->enodbName, "OUT"}, {"RICserviceUpdateFailure", "Bytes"}});
 }
@@ -1991,7 +2028,7 @@ int XML_From_PER(ReportingMessages_t &message, RmrMessagesBuffer_t &rmrMessageBu
 void asnInitiatingRequest(E2AP_PDU_t *pdu,
                           Sctp_Map_t *sctpMap,
                           ReportingMessages_t &message,
-                          RmrMessagesBuffer_t &rmrMessageBuffer) {
+                          RmrMessagesBuffer_t &rmrMessageBuffer, int streamId) {
     auto logLevel = mdclog_level_get();
     auto procedureCode = ((InitiatingMessage_t *) pdu->choice.initiatingMessage)->procedureCode;
     if (logLevel >= MDCLOG_DEBUG) {
@@ -2010,6 +2047,28 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             if (collectSetupRequestData(pdu, sctpMap, message) != 0) {
                 break;
             }
+            struct sctp_status status;
+            int stat_size = sizeof(status);
+            getsockopt( message.peerInfo->fileDescriptor, SOL_SCTP, SCTP_STATUS,(void *)&status, (socklen_t *)&stat_size );
+            if (logLevel >= MDCLOG_DEBUG) {
+                mdclog_write(MDCLOG_DEBUG, "Start from SCTP %d fd", message.peerInfo->fileDescriptor);
+                mdclog_write(MDCLOG_DEBUG, "SCTP status assoc id %d instrms %d outstrms %d", status.sstat_assoc_id,
+                        status.sstat_instrms, status.sstat_outstrms);
+            }
+            if(status.sstat_outstrms == 1 || status.sstat_instrms == 1)
+            {
+                message.peerInfo->isSingleStream = true;
+                message.peerInfo->singleStreamId = streamId;
+                if (status.sstat_outstrms == 1 && status.sstat_instrms == 1){
+                    if (logLevel >= MDCLOG_DEBUG) {
+                        mdclog_write(MDCLOG_DEBUG, "Single SCTP stream is used for sending from now on, assoc id %d streamId %d #instrms %d #outstrms %d, %s",status.sstat_assoc_id, streamId, status.sstat_instrms, status.sstat_outstrms, __FUNCTION__);
+                    }
+                }
+                else {
+                    mdclog_write(MDCLOG_ERR, "Single SCTP stream used for sending messages even if there is a mismatch in number of in & out streams, assoc id %d instrms %d outstrms %d", status.sstat_assoc_id,
+                            status.sstat_instrms, status.sstat_outstrms);
+                }
+            }
 
             buildPrometheusList(message.peerInfo, message.peerInfo->sctpParams->prometheusFamily);
 
@@ -2053,6 +2112,27 @@ void asnInitiatingRequest(E2AP_PDU_t *pdu,
             buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
             break;
         }
+
+case ProcedureCode_id_E2nodeConfigurationUpdate: {
+            if (logLevel >= MDCLOG_DEBUG) {
+                mdclog_write(MDCLOG_DEBUG, "Got E2nodeConfigurationUpdate %s", message.message.enodbName);
+            }
+
+            string messageName("RICE2nodeConfigurationUpdate");
+            string ieName("RICE2nodeConfigurationUpdateIEs");
+            message.message.messageType = RIC_E2NODE_CONFIG_UPDATE;
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+            message.peerInfo->counters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->counters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment((double)message.message.asnLength);
+
+            // Update E2T instance level metrics
+            message.peerInfo->sctpParams->e2tCounters[IN_INITI][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->sctpParams->e2tCounters[IN_INITI][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment((double)message.message.asnLength);
+#endif
+            buildAndSendSetupRequest(message, rmrMessageBuffer, pdu);
+            break;
+        }
+
         case ProcedureCode_id_ErrorIndication: {
             if (logLevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Got ErrorIndication %s", message.message.enodbName);
@@ -2629,6 +2709,51 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             }
             break;
         }
+
+        case RIC_E2NODE_CONFIG_UPDATE_ACK: {
+            if (loglevel >= MDCLOG_DEBUG) {
+                mdclog_write(MDCLOG_DEBUG, "RIC_E2NODE_CONFIG_UPDATE_ACK");
+            }
+            if (PER_FromXML(message, rmrMessageBuffer) != 0) {
+                break;
+            }
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+            message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+            // Update E2T instance level metrics
+            message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->sctpParams->e2tCounters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif
+            if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
+                mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2NODE_CONFIG_UPDATE_ACK");
+                return -6;
+            }
+            break;
+        }
+
+        case RIC_E2NODE_CONFIG_UPDATE_FAILURE: {
+            if (loglevel >= MDCLOG_DEBUG) {
+                mdclog_write(MDCLOG_DEBUG, "RIC_E2NODE_CONFIG_UPDATE_FAILURE");
+            }
+            if (PER_FromXML(message, rmrMessageBuffer) != 0) {
+                break;
+            }
+#if !(defined(UNIT_TEST) || defined(MODULE_TEST))
+            message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+
+            // Update E2T instance level metrics
+            message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment();
+            message.peerInfo->sctpParams->e2tCounters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2nodeConfigurationUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif
+            if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
+                mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2NODE_CONFIG_UPDATE_FAILURE");
+                return -6;
+            }
+            break;
+        }
+
         case RIC_ERROR_INDICATION: {
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
@@ -3204,3 +3329,43 @@ string translateRmrErrorMessages(int state) {
     }
     return str;
 }
+int fetchStreamId(ConnectedCU_t *peerInfo, ReportingMessages_t &message)
+{
+    auto loglevel = mdclog_level_get();
+    int streamId = INVALID_STREAM_ID;
+    if(message.peerInfo->isSingleStream != false)
+    {
+        streamId = message.peerInfo->singleStreamId;
+        if (loglevel >= MDCLOG_DEBUG) {
+        mdclog_write(MDCLOG_DEBUG, "Send SCTP message for SINGLE_STREAM streamId %d , Messeage Type %d ,%s",
+                     streamId,message.message.messageType,  __FUNCTION__);
+        }
+        return streamId;
+    }
+    int msgType = message.message.messageType;
+    switch (msgType){
+        case RIC_E2_RESET_REQ:
+        case RIC_E2_RESET_RESP:
+        case RIC_E2_SETUP_RESP:
+        case RIC_E2_SETUP_FAILURE:
+        case RIC_ERROR_INDICATION:
+        case RIC_SERVICE_QUERY:
+        case RIC_SERVICE_UPDATE_ACK:
+        case RIC_SERVICE_UPDATE_FAILURE:
+            streamId = 0;
+            break;
+        case RIC_SUB_REQ:
+        case RIC_SUB_DEL_REQ:
+        case RIC_CONTROL_REQ:
+            streamId = 1;
+            break;
+        default:
+            streamId = 0;
+            break;
+    }
+    if (loglevel >= MDCLOG_DEBUG) {
+        mdclog_write(MDCLOG_DEBUG, "Send SCTP message for streamId %d Messeage Type %d, %s",
+                     streamId, message.message.messageType, __FUNCTION__);
+    }
+    return streamId;
+}