FCA_MEC-5161: Oran commit for E2 UT+MT improvement
[ric-plt/e2.git] / RIC-E2-TERMINATION / sctpThread.cpp
index 2d7e76b..15f5ba4 100644 (file)
@@ -33,8 +33,6 @@
 #include <errno.h>
 #include <sys/stat.h>
 
-
-
 using namespace std;
 //using namespace std::placeholders;
 using namespace boost::filesystem;
@@ -886,7 +884,7 @@ void listener(sctp_params_t *params) {
             }
         }
 #ifdef UNIT_TEST
-                break;
+    break;
 #endif
     }
 }
@@ -1175,7 +1173,11 @@ void cleanHashEntry(ConnectedCU_t *val, Sctp_Map_t *m) {
  */
 int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_t *m) {
     auto loglevel = mdclog_level_get();
+#ifndef UNIT_TEST    
     int fd = peerInfo->fileDescriptor;
+#else
+    int fd = FILE_DESCRIPTOR;
+#endif    
     if (loglevel >= MDCLOG_DEBUG) {
         mdclog_write(MDCLOG_DEBUG, "Send SCTP message for CU %s, %s",
                      message.message.enodbName, __FUNCTION__);
@@ -1187,14 +1189,14 @@ int sendSctpMsg(ConnectedCU_t *peerInfo, ReportingMessages_t &message, Sctp_Map_
                 continue;
             }
             mdclog_write(MDCLOG_ERR, "error writing to CU a message, %s ", strerror(errno));
+#ifndef UNIT_TEST            
             if (!peerInfo->isConnected) {
                 mdclog_write(MDCLOG_ERR, "connection to CU %s is still in progress.", message.message.enodbName);
-#ifndef UNIT_TEST
                 return -1;
-#endif
             }
             cleanHashEntry(peerInfo, m);
             close(fd);
+#endif            
             char key[MAX_ENODB_NAME_SIZE * 2];
             snprintf(key, MAX_ENODB_NAME_SIZE * 2, "msg:%s|%d", message.message.enodbName,
                      message.message.messageType);
@@ -1278,8 +1280,12 @@ int receiveDataFromSctp(struct epoll_event *events,
         }
         // read the buffer directly to rmr payload
         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
+#ifndef UNIT_TEST        
         message.message.asnLength = rmrMessageBuffer.sendMessage->len =
                 read(message.peerInfo->fileDescriptor, rmrMessageBuffer.sendMessage->payload, RECEIVE_SCTP_BUFFER_SIZE);
+#else
+        message.message.asnLength = rmrMessageBuffer.sendMessage->len;
+#endif
 
         if (loglevel >= MDCLOG_DEBUG) {
             mdclog_write(MDCLOG_DEBUG, "Finish Read from SCTP %d fd message length = %ld",
@@ -1329,9 +1335,13 @@ int receiveDataFromSctp(struct epoll_event *events,
                          printBuffer);
             clock_gettime(CLOCK_MONOTONIC, &decodeStart);
         }
-
+#ifndef UNIT_TEST
         auto rval = asn_decode(nullptr, ATS_ALIGNED_BASIC_PER, &asn_DEF_E2AP_PDU, (void **) &pdu,
-                          message.message.asndata, message.message.asnLength);
+                        message.message.asndata, message.message.asnLength);
+#else
+        asn_dec_rval_t rval = {RC_OK, 0};
+        pdu = (E2AP_PDU_t*)rmrMessageBuffer.sendMessage->tp_buf;
+#endif
         if (rval.code != RC_OK) {
             mdclog_write(MDCLOG_ERR, "Error %d Decoding (unpack) E2AP PDU from RAN : %s", rval.code,
                          message.peerInfo->enodbName);
@@ -1382,6 +1392,10 @@ int receiveDataFromSctp(struct epoll_event *events,
             //ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, pdu);
             //pdu = nullptr;
         }
+#ifdef UNIT_TEST
+    done = 1;
+    break;
+#endif
     }
 
     if (done) {
@@ -1394,12 +1408,13 @@ int receiveDataFromSctp(struct epoll_event *events,
                          "%s|CU disconnected unexpectedly",
                          message.peerInfo->enodbName);
         message.message.asndata = rmrMessageBuffer.sendMessage->payload;
-
+#ifndef UNIT_TEST        
         if (sendRequestToXapp(message,
                               RIC_SCTP_CONNECTION_FAILURE,
                               rmrMessageBuffer) != 0) {
             mdclog_write(MDCLOG_ERR, "SCTP_CONNECTION_FAIL message failed to send to xAPP");
         }
+#endif        
 
         /* Closing descriptor make epoll remove it from the set of descriptors which are monitored. */
         close(message.peerInfo->fileDescriptor);
@@ -2095,7 +2110,7 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
 #ifndef UNIT_TEST
                         message.peerInfo->counters[IN_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
                         message.peerInfo->counters[IN_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment((double)message.message.asnLength);
-#endif                 
+#endif
                         sendRmrMessage(rmrMessageBuffer, message);
                         messageSent = true;
                     } else {
@@ -2137,9 +2152,9 @@ void asnUnSuccsesfulMsg(E2AP_PDU_t *pdu,
         default: {
             mdclog_write(MDCLOG_WARN, "Undefined or not supported message = %ld", procedureCode);
             message.message.messageType = 0; // no RMR message type yet
-
+#ifndef UNIT_TEST
             buildJsonMessage(message);
-
+#endif            
             break;
         }
     }
@@ -2295,7 +2310,7 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
     // get message payload
     //auto msgData = msg->payload;
 #ifdef UNIT_TEST
-        rmrMessageBuffer.rcvMessage->state = 0;
+    rmrMessageBuffer.rcvMessage->state = 0;
 #endif
     if (rmrMessageBuffer.rcvMessage->state != 0) {
         mdclog_write(MDCLOG_ERR, "RMR Receiving message with stat = %d", rmrMessageBuffer.rcvMessage->state);
@@ -2311,6 +2326,9 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             case RIC_HEALTH_CHECK_REQ:
                 break;
             default:
+#ifdef UNIT_TEST
+    break;
+#endif    
                 mdclog_write(MDCLOG_ERR, "Failed to send message no CU entry %s", message.message.enodbName);
                 return -1;
         }
@@ -2328,8 +2346,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_RESP");
                 return -6;
@@ -2343,8 +2363,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_E2setup]->Increment();
             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_E2setup]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_SETUP_FAILURE");
                 return -6;
@@ -2355,8 +2377,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "RIC_ERROR_INDICATION");
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_ErrorIndication]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_ErrorIndication]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_ERROR_INDICATION");
                 return -6;
@@ -2367,8 +2391,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_REQ");
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscription]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscription]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_REQ");
                 return -6;
@@ -2379,8 +2405,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "RIC_SUB_DEL_REQ");
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICsubscriptionDelete]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SUB_DEL_REQ");
                 return -6;
@@ -2391,8 +2419,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "RIC_CONTROL_REQ");
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICcontrol]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICcontrol]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_CONTROL_REQ");
                 return -6;
@@ -2406,8 +2436,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_RICserviceQuery]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_QUERY");
                 return -6;
@@ -2422,8 +2454,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
                 mdclog_write(MDCLOG_ERR, "error in PER_FromXML");
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (loglevel >= MDCLOG_DEBUG) {
                 mdclog_write(MDCLOG_DEBUG, "Before sending to CU");
             }
@@ -2440,8 +2474,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_UN_SUCC][MSG_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment();
             message.peerInfo->counters[OUT_UN_SUCC][BYTES_COUNTER][ProcedureCode_id_RICserviceUpdate]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_SERVICE_UPDATE_FAILURE");
                 return -6;
@@ -2455,8 +2491,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_INITI][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
             message.peerInfo->counters[OUT_INITI][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET");
                 return -6;
@@ -2470,8 +2508,10 @@ int receiveXappMessages(Sctp_Map_t *sctpMap,
             if (PER_FromXML(message, rmrMessageBuffer) != 0) {
                 break;
             }
+#ifndef UNIT_TEST            
             message.peerInfo->counters[OUT_SUCC][MSG_COUNTER][ProcedureCode_id_Reset]->Increment();
             message.peerInfo->counters[OUT_SUCC][BYTES_COUNTER][ProcedureCode_id_Reset]->Increment(rmrMessageBuffer.rcvMessage->len);
+#endif            
             if (sendDirectionalSctpMsg(rmrMessageBuffer, message, 0, sctpMap) != 0) {
                 mdclog_write(MDCLOG_ERR, "Failed to send RIC_E2_RESET_RESP");
                 return -6;
@@ -2723,7 +2763,7 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m
 #ifndef UNIT_TEST
     rmrMessageBuffer.sendMessage = rmr_send_msg(rmrMessageBuffer.rmrCtx, rmrMessageBuffer.sendMessage);
 #else
-        rmrMessageBuffer.sendMessage->state = RMR_ERR_RETRY;
+    rmrMessageBuffer.sendMessage->state = RMR_ERR_RETRY;
 #endif
     if (rmrMessageBuffer.sendMessage == nullptr) {
         rmrMessageBuffer.sendMessage = rmr_alloc_msg(rmrMessageBuffer.rmrCtx, RECEIVE_XAPP_BUFFER_SIZE);
@@ -2768,7 +2808,7 @@ int sendRmrMessage(RmrMessagesBuffer_t &rmrMessageBuffer, ReportingMessages_t &m
 
 void buildJsonMessage(ReportingMessages_t &message) {
 #ifdef UNIT_TEST
-        jsonTrace = true;
+    jsonTrace = true;
 #endif
     if (jsonTrace) {
         message.outLen = sizeof(message.base64Data);