ISSUE ID:- (RICAPP-176). 44/8144/1 f-release
authorsandeepindia <kumar.sandeep3@hcl.com>
Wed, 4 May 2022 13:12:29 +0000 (18:42 +0530)
committersandeepindia <kumar.sandeep3@hcl.com>
Wed, 4 May 2022 13:12:29 +0000 (18:42 +0530)
Added the functionality of REST based subscription request and
REST based subscription delete request

Signed-off-by: sandeepindia <kumar.sandeep3@hcl.com>
Change-Id: I97a07fb95cacff702d0b7f9deeb847a113e07d83

Bouncer/Dockerfile
Bouncer/init/config-file.json
Bouncer/src/Makefile
Bouncer/src/b_xapp_main.cc
Bouncer/src/xapp-asn/e2ap/subscription_delete_request.cc
Bouncer/src/xapp-asn/e2ap/subscription_request.cc
Bouncer/src/xapp-mgmt/msgs_proc.cc
Bouncer/src/xapp-mgmt/subs_mgmt.cc
Bouncer/src/xapp-mgmt/subs_mgmt.hpp
Bouncer/src/xapp.cc
Bouncer/src/xapp.hpp

index 39f9319..0205975 100644 (file)
@@ -75,6 +75,10 @@ RUN apt-get install -y  libboost-all-dev
 RUN apt-get install -y libhiredis-dev
 #RUN apt-get install -y valgrind
 
+RUN mkdir /usr/local/include/nlohmann
+RUN git clone https://github.com/azadkuh/nlohmann_json_release.git
+RUN cp nlohmann_json_release/json.hpp /usr/local/include/nlohmann
+
 RUN git clone https://gerrit.o-ran-sc.org/r/ric-plt/dbaas
 RUN cd dbaas/redismodule && \
     ./autogen.sh && \
@@ -103,6 +107,20 @@ RUN git clone https://github.com/Tencent/rapidjson && \
     cd ${STAGE_DIR} && \
     rm -rf rapidjson
 
+WORKDIR ${STAGE_DIR}
+## Install CPPRESTSDK
+
+RUN apt-get install -y libcpprest-dev
+RUN apt-get install -y  g++ git libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev libwebsocketpp-dev openssl libssl-dev ninja-build zlib1g-dev
+RUN git clone https://github.com/Microsoft/cpprestsdk.git casablanca && \
+    cd casablanca && \
+    mkdir build && \
+    cd build && \
+    cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTS=OFF -DBUILD_SAMPLES=OFF -DCMAKE_INSTALL_PREFIX=/usr/local .. && \
+    ninja && \
+    ninja install && \
+    cd ${STAGE_DIR}
+    #rm -rf casablanca
 ##-----------------------------------
 # Now install the program
 #------------------------------------
@@ -135,10 +153,12 @@ COPY --from=ricbuild /usr/local/libexec/redismodule/libredis* /usr/local/libexec
 RUN dpkg -i /tmp/*.deb
 RUN apt-get update && \
     apt-get install -y libcurl3 python3 && \
-    apt-get install -y libboost-all-dev cpputest libhiredis-dev valgrind && \
+    apt-get install -y libboost-all-dev cpputest libcpprest-dev libhiredis-dev valgrind && \
     apt-get clean
 COPY --from=ricbuild /etc/xapp/* /etc/xapp/
 COPY --from=ricbuild /usr/local/bin/b_xapp_main /usr/local/bin/b_xapp_main
+COPY --from=ricbuild /usr/local/lib/libcpprest.so* /usr/local/bin/
+COPY --from=ricbuild /usr/local/lib/libcpprest.so* /usr/local/lib/
 COPY --from=ricbuild /usr/local/include/rnib/*.h /usr/local/include/rnib/
 COPY --from=ricbuild /usr/local/include/rnib/rnibreader.a /usr/local/include/rnib/
 
index 508e064..958136e 100644 (file)
         ],
         "messaging": {
             "ports": [
+                {
+                        "name": "http",
+                        "container": "bouncer-xapp",
+                        "port": 8080,
+                        "description": "http service"
+                },
                 {
                     "name": "rmr-data",
                     "container": "bouncer-xapp",
                     "port": 4560,
-                   
-                   "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION"],
-                    "txMessages": ["RIC_SUB_REQ"],
+
+                    "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION","RIC_SUB_DEL_RESP"],
+                    "txMessages": ["RIC_SUB_REQ","RIC_SUB_DEL_REQ"],
                     "policies": [1],
                     "description": "rmr receive data port for Bouncer xApp"
                 },
             "protPort": "tcp:4560",
             "maxSize": 2072,
             "numWorkers": 1,
-            "txMessages": ["RIC_SUB_REQ"],
-            "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION"],
-           "policies": [1]
+            "txMessages": ["RIC_SUB_REQ","RIC_SUB_DEL_REQ"],
+            "rxMessages": ["RIC_SUB_RESP", "RIC_INDICATION","RIC_SUB_DEL_RESP"],
+            "policies": [1]
+        },
+        "http":{
+                "protPort": "tcp:8080"
+
         }
   }
index 808bafe..f6c3946 100644 (file)
@@ -32,7 +32,7 @@ E2SMFLAGS = -I$(E2SMSRC)
 
 ########libs
 
-LIBS= -lsdl -lrmr_si  -lpthread -lm $(LOG_LIBS) $(CURL_LIBS) $(RNIB_LIBS)
+LIBS= -lsdl -lrmr_si -lpthread -lm -lboost_system -lcrypto -lssl -lcpprest $(LOG_LIBS) $(CURL_LIBS) $(RNIB_LIBS)
 COV_FLAGS= -fprofile-arcs -ftest-coverage
 
 #######
index 42d2d3e..0492e39 100644 (file)
 */
 
 #include "xapp.hpp"
+#include <cpprest/http_listener.h>
+#include <cpprest/json.h>
+#include <cpprest/uri.h>
+using namespace web;
+using namespace web::http;
+using namespace web::http::experimental::listener;
+using namespace utility;
+std::vector<std::string>SubscriptionIds;
+#define TRACE(msg)            wcout << msg
+
+
+void display_json(
+   json::value const & jvalue)
+{
+        cout<<"\ndisplaying REST Notification\n";
+   wcout << jvalue.serialize().c_str() << endl;
+}
+
+
+void handle_request(http_request request)
+{
+auto answer = json::value::object();
+cout<<"\nPrinting POST request content\n";
+cout<<request.to_string()<<"\n";
+   request
+      .extract_json()
+      .then([&answer](pplx::task<json::value> task) {
+         try
+         {
+            answer = task.get();
+            display_json(answer);
+            }
+         catch (http_exception const & e)
+         {
+         cout<<"\ninside catch block";
+            wcout << e.what() << endl;
+         }
+
+      })
+      .wait();
+
+   request.reply(status_codes::OK, answer);
+}
+
+void handle_post(http_request request)
+{
+   TRACE("\nhandle POST\n");
+
+   handle_request(request);
+}
+
+void handle_put(http_request request)
+{
+   TRACE("\nhandle PUT\n");
+
+   handle_request(request);
+}
+
+void start_server()
+{
+        
+         utility::string_t port = U("8080");
+         utility::string_t address = U("http://0.0.0.0:");
+         address.append(port);
+        address.append(U("/ric/v1/subscriptions/response"));
+       uri_builder uri(address);
+
+         auto addr = uri.to_uri().to_string();
+         http_listener listener(addr);
+       //http_listener listener("http://localhost:8080/ric");
+       cout<<"validated uri = "<<uri::validate(addr)<<"\n";
+         ucout << utility::string_t(U("Listening for REST Notification at: ")) << addr << std::endl;
+       listener.support(methods::POST,[](http_request request) { handle_post(request);});
+       listener.support(methods::PUT,[](http_request request){  handle_put(request);});
+       try
+       {
+       listener
+               .open()
+               .then([&listener]() { })
+               .wait();
+
+       while (true);
+       }
+       catch (exception const & e)
+       {
+       wcout << e.what() << endl;
+       }
+
+}
 
 void signalHandler( int signum ) {
    cout << "Interrupt signal (" << signum << ") received.\n";
@@ -65,6 +154,8 @@ int main(int argc, char *argv[]){
 
        mdclog_write(MDCLOG_INFO, "Created Bouncer Xapp Instance");
        //Startup E2 subscription
+       std::thread t1(std::ref(start_server));
+       t1.detach();
        b_xapp->startup(sub_handler);
 
        sleep(10);
@@ -78,10 +169,10 @@ int main(int argc, char *argv[]){
 
        b_xapp->start_xapp_receiver(std::ref(*mp_handler));
 
-       sleep(1);
-
+       sleep(20);//waiting for some time before sending delete.
 
 
+       b_xapp->shutdown();//will start the sending delete procedure.
        while(1){
                                sleep(1);
                         }
index 6a1ffc8..f63376e 100644 (file)
@@ -113,30 +113,35 @@ bool subscription_delete::encode_e2ap_subscription(unsigned char *buf, size_t *s
 }
 
 
-bool  subscription_delete::set_fields( subscription_helper &helper){
-  unsigned int ie_index;
-  
-  ie_index = 0;
-  RICsubscriptionDeleteRequest_IEs_t *ies_ricreq = &IE_array[ie_index];
-  ies_ricreq->criticality = Criticality_reject;
-  ies_ricreq->id = ProtocolIE_ID_id_RICrequestID;
-  ies_ricreq->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RICrequestID;
-  RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
-  ricrequest_ie->ricRequestorID = helper.get_request_id();
-  //ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
+bool  subscription_delete::set_fields( subscription_helper &helper)
+{
+       static long update_instance=0;//static variable to update ricInstaceID for sending delete req
+       unsigned int ie_index;
+       ie_index = 0;
+       RICsubscriptionDeleteRequest_IEs_t *ies_ricreq = &IE_array[ie_index];
+       ies_ricreq->criticality = Criticality_reject;
+       ies_ricreq->id = ProtocolIE_ID_id_RICrequestID;
+       ies_ricreq->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RICrequestID;
+       RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
+       ricrequest_ie->ricRequestorID = helper.get_request_id();
+       update_instance++;//incrementing ricInstanceID by one, each time the bouncer send delete req
+       ricrequest_ie->ricInstanceID = update_instance;
+       mdclog_write(MDCLOG_INFO,"instance id for subsdelreq  = %d", update_instance);
+       //ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
 
 
   
-  ie_index = 1;
-  RICsubscriptionDeleteRequest_IEs_t *ies_ranfunc = &IE_array[ie_index];
-  ies_ranfunc->criticality = Criticality_reject;
-  ies_ranfunc->id = ProtocolIE_ID_id_RANfunctionID;
-  ies_ranfunc->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RANfunctionID;
-  RANfunctionID_t *ranfunction_ie = &ies_ranfunc->value.choice.RANfunctionID;
-  *ranfunction_ie = helper.get_function_id();
-
+       ie_index = 1;
+       RICsubscriptionDeleteRequest_IEs_t *ies_ranfunc = &IE_array[ie_index];
+       ies_ranfunc->criticality = Criticality_reject;
+       ies_ranfunc->id = ProtocolIE_ID_id_RANfunctionID;
+       ies_ranfunc->value.present = RICsubscriptionDeleteRequest_IEs__value_PR_RANfunctionID;
+       RANfunctionID_t *ranfunction_ie = &ies_ranfunc->value.choice.RANfunctionID;
+       *ranfunction_ie = helper.get_function_id();
+       mdclog_write(MDCLOG_INFO,"ran function  id for subsdelreq  = %d", helper.get_function_id());
+       //*ranfunction_ie =1;
   
-  return true;
+       return true;
 };
 
 
index 91071b3..4d23bd9 100644 (file)
@@ -175,6 +175,7 @@ bool subscription_request::set_fields( InitiatingMessage_t * init_msg, subscript
   ies_ricreq->value.present = RICsubscriptionRequest_IEs__value_PR_RICrequestID;
   RICrequestID_t *ricrequest_ie = &ies_ricreq->value.choice.RICrequestID;
   ricrequest_ie->ricRequestorID = helper.get_request_id();
+ mdclog_write(MDCLOG_INFO,"instance id for subsreq  = %d", ricrequest_ie->ricInstanceID);
   //ricrequest_ie->ricRequestSequenceNumber = helper.get_req_seq();
   result = ASN_SEQUENCE_ADD(&(ric_subscription->protocolIEs), &IE_array[ie_index]);
   assert(result == 0);
index d66f0c8..49b413a 100644 (file)
@@ -136,102 +136,150 @@ bool XappMsgHandler::decode_subscription_response(unsigned char* data_buf, size_
 
 
 //For processing received messages.XappMsgHandler should mention if resend is required or not.
-void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend){
+void XappMsgHandler::operator()(rmr_mbuf_t *message, bool *resend)
+{
 
-       if (message->len > MAX_RMR_RECV_SIZE){
+       if (message->len > MAX_RMR_RECV_SIZE)
+       {
                mdclog_write(MDCLOG_ERR, "Error : %s, %d, RMR message larger than %d. Ignoring ...", __FILE__, __LINE__, MAX_RMR_RECV_SIZE);
                return;
        }
-      //a1_policy_helper helper;
+               //a1_policy_helper helper;
        bool res=false;
        E2AP_PDU_t* e2pdu = (E2AP_PDU_t*)calloc(1, sizeof(E2AP_PDU));
        int num = 0;
        
-       switch(message->mtype){
+       switch(message->mtype)
+       {
                //need to fix the health check.
                case (RIC_HEALTH_CHECK_REQ):
-                               message->mtype = RIC_HEALTH_CHECK_RESP;        // if we're here we are running and all is ok
-                               message->sub_id = -1;
-                               strncpy( (char*)message->payload, "Bouncer OK\n", rmr_payload_size( message) );
-                               *resend = true;
-                               break;
+                       message->mtype = RIC_HEALTH_CHECK_RESP;        // if we're here we are running and all is ok
+                       message->sub_id = -1;
+                       strncpy( (char*)message->payload, "Bouncer OK\n", rmr_payload_size( message) );
+                       *resend = true;
+                       break;
 
                case (RIC_SUB_RESP):
                        mdclog_write(MDCLOG_INFO, "Received subscription message of type = %d", message->mtype);
-                               unsigned char *me_id;
-                               if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL ) {
-                                       mdclog_write(MDCLOG_ERR, "Error :  %s, %d : malloc failed for me_id", __FILE__, __LINE__);
-                                       me_id = rmr_get_meid(message, NULL);
-                               } else {
-                                       rmr_get_meid(message, me_id);
-                               }
-                               if(me_id == NULL){
-                                       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
-                                       break;
-                               }
-                               mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
-                               if(_ref_sub_handler !=NULL){
-                                       _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
-                               } else {
-                                       mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
-                               }
-                               *resend = false;
-                               if (me_id != NULL) {
-                                       mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
-                                       free(me_id);
-                               }
+                       unsigned char *me_id;
+                       if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL ) 
+                       {
+                               mdclog_write(MDCLOG_ERR, "Error :  %s, %d : malloc failed for me_id", __FILE__, __LINE__);
+                               me_id = rmr_get_meid(message, NULL);
+                       } 
+                       else 
+                       {
+                               rmr_get_meid(message, me_id);
+                       }
+                       if(me_id == NULL)
+                       {
+                               mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
+                               break;
+                       }
+                       mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
+                       if(_ref_sub_handler !=NULL)
+                       {
+                               _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
+                       } 
+                       else 
+                       {
+                               mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
+                       }
+                       *resend = false;
+                       if (me_id != NULL) 
+                       {
+                               mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
+                               free(me_id);
+                       }
+                       break;
+
+               case (RIC_SUB_DEL_RESP):
+                       mdclog_write(MDCLOG_INFO, "Received subscription delete message of type = %d", message->mtype);
+                       //unsigned char *me_id;
+                       if( (me_id = (unsigned char *) malloc( sizeof( unsigned char ) * RMR_MAX_MEID )) == NULL ) 
+                       {
+                               mdclog_write(MDCLOG_ERR, "Error :  %s, %d : malloc failed for me_id", __FILE__, __LINE__);
+                               me_id = rmr_get_meid(message, NULL);
+                       } 
+                       else 
+                               
+                       {
+                               rmr_get_meid(message, me_id);
+                       }
+                       if(me_id == NULL)
+                       {
+                               mdclog_write(MDCLOG_ERR, " Error :: %s, %d : rmr_get_meid failed me_id is NULL", __FILE__, __LINE__);
                                break;
+                       }
+                       mdclog_write(MDCLOG_INFO,"RMR Received MEID: %s",me_id);
+                       if(_ref_sub_handler !=NULL)
+                       {
+                               _ref_sub_handler->manage_subscription_response(message->mtype, reinterpret_cast< char const* >(me_id));
+                       } 
+                       else 
+                       {
+                               mdclog_write(MDCLOG_ERR, " Error :: %s, %d : Subscription handler not assigned in message processor !", __FILE__, __LINE__);
+                       }
+                       *resend = false;
+                       if (me_id != NULL) 
+                       {
+                               mdclog_write(MDCLOG_INFO, "Free RMR Received MEID memory: %s(0x%x)", me_id, me_id);
+                               free(me_id);
+                       }
+                       break;
 
                case (RIC_INDICATION):
                        
                        if(message->mtype == 12050)
-                         {
-                           mdclog_write(MDCLOG_INFO, "Decoding indication for msg = %d", message->mtype);
-
-                           ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, e2pdu);
-                           asn_transfer_syntax syntax;
-                           syntax = ATS_ALIGNED_BASIC_PER;
-
-                           mdclog_write(MDCLOG_INFO, "Data_size = %d",  message->len);
-
-                           auto rval =  asn_decode(nullptr, syntax, &asn_DEF_E2AP_PDU, (void**)&e2pdu, message->payload, message->len);
-
-                           if(rval.code == RC_OK)
-                           {
-                                 mdclog_write(MDCLOG_INFO, "rval.code = %d ", rval.code);
-                           }
-                           else{
-                                 mdclog_write(MDCLOG_ERR, " rval.code = %d ", rval.code);
-                                 break;
-                           }
-
-                           asn_fprint(stdout, &asn_DEF_E2AP_PDU, e2pdu);
-                          mdclog_write(MDCLOG_INFO, "Received indication message of type = %d", message->mtype);
-                           num++;
-                          message->mtype = RIC_CONTROL_REQ;        // if we're here we are running and all is ok
-                           message->sub_id = -1;
-                           strncpy((char*)message->payload, "Bouncer Control OK\n", rmr_payload_size(message));
-                           *resend = true;
-                           ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2pdu);
+                        {
+                               mdclog_write(MDCLOG_INFO, "Decoding indication for msg = %d", message->mtype);
+
+                               ASN_STRUCT_RESET(asn_DEF_E2AP_PDU, e2pdu);
+                               asn_transfer_syntax syntax;
+                               syntax = ATS_ALIGNED_BASIC_PER;
+
+                               mdclog_write(MDCLOG_INFO, "Data_size = %d",  message->len);
+
+                               auto rval =  asn_decode(nullptr, syntax, &asn_DEF_E2AP_PDU, (void**)&e2pdu, message->payload, message->len);
+
+                               if(rval.code == RC_OK)
+                               {
+                                       mdclog_write(MDCLOG_INFO, "rval.code = %d ", rval.code);
+                               }
+                                       else
+                               {
+                                       mdclog_write(MDCLOG_ERR, " rval.code = %d ", rval.code);
+                                       break;
+                               }
+
+                                       asn_fprint(stdout, &asn_DEF_E2AP_PDU, e2pdu);
+                               mdclog_write(MDCLOG_INFO, "Received indication message of type = %d", message->mtype);
+                               num++;
+                               message->mtype = RIC_CONTROL_REQ;        // if we're here we are running and all is ok
+                                       message->sub_id = -1;
+                                       strncpy((char*)message->payload, "Bouncer Control OK\n", rmr_payload_size(message));
+                                       *resend = true;
+                                       ASN_STRUCT_FREE(asn_DEF_E2AP_PDU, e2pdu);
 
                         } 
-                        mdclog_write(MDCLOG_INFO, "Number of Indications Received = %d", num);
-                        break;
+                       mdclog_write(MDCLOG_INFO, "Number of Indications Received = %d", num);
+                               break;
 
-       /*case A1_POLICY_REQ:
+               /*case A1_POLICY_REQ:
 
-                   mdclog_write(MDCLOG_INFO, "In Message Handler: Received A1_POLICY_REQ.");
+                       mdclog_write(MDCLOG_INFO, "In Message Handler: Received A1_POLICY_REQ.");
                        helper.handler_id = xapp_id;
 
                        res = a1_policy_handler((char*)message->payload, &message->len, helper);
-                       if(res){
+                       if(res)
+                       {
                                message->mtype = A1_POLICY_RESP;        // if we're here we are running and all is ok
                                message->sub_id = -1;
                                *resend = true;
                        }
                        break;*/
 
-       default:
+               default:
                {
                        mdclog_write(MDCLOG_ERR, "Error :: Unknown message type %d received from RMR", message->mtype);
                        *resend = false;
index 931453f..dc6416e 100644 (file)
@@ -116,26 +116,33 @@ bool SubscriptionHandler::is_request_entry(transaction_identifier id){
 
 
 // Handles subscription responses
-void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id){
+void SubscriptionHandler::manage_subscription_response(int message_type, transaction_identifier id)
+{
        // Make This Thread sleep for 1 Second
-  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-  {
-         std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
-         mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s WAS: %d",id.c_str(),this->get_request_status(id));
-
-         //from the message type we can know if its a success/failure etc.
-         if(message_type==RIC_SUB_RESP)
-         this->set_request_status(id, request_success);
-
-         if(message_type==RIC_SUB_FAILURE)
-         this->set_request_status(id,request_failed);
-
-         mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s IS: %d",id.c_str(),this->get_request_status(id));
-
-
-         //this->print_subscription_status();
-   }
-  //_cv.get()->notify_all();
+       std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+       {
+               std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+               mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s WAS: %d",id.c_str(),this->get_request_status(id));
+
+               //from the message type we can know if its a success/failure etc.
+               if(message_type==RIC_SUB_RESP)
+                       this->set_request_status(id, request_success);
+        
+               if(message_type==RIC_SUB_DEL_RESP)
+                       this->set_request_status(id, request_success);
+
+               if(message_type==RIC_SUB_FAILURE)
+                       this->set_request_status(id,request_failed);
+         
+               if(message_type==RIC_SUB_DEL_FAILURE)
+                       this->set_request_status(id,request_failed);
+         
+               mdclog_write(MDCLOG_INFO,"Subscription Handler: Status for meid %s IS: %d",id.c_str(),this->get_request_status(id));
+
+
+               //this->print_subscription_status();
+       }
+       //_cv.get()->notify_all();
 
 }
 
index 28cf293..1da8adc 100644 (file)
@@ -49,7 +49,7 @@
 #define SUBSCR_ERR_FAIL -3
 #define SUBSCR_ERR_UNKNOWN -4
 #define SUBSCR_ERR_DUPLICATE -5
-
+#define SUBSCR_ERR_NOT_FOUND -6
 using namespace std;
 
 class TransmitterBase
@@ -165,7 +165,6 @@ int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_
   // put entry in request table
   {
     std::lock_guard<std::mutex> lock(*(_data_lock.get()));
-
     res = add_request_entry(rmr_trans_id, request_pending);
     if(! res){
                
@@ -178,10 +177,8 @@ int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_
 
   // acquire lock ...
   std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
-
   // Send the message
   bool flg = tx();
-
   if (!flg){
     // clear state
     delete_request_entry(rmr_trans_id);
@@ -205,4 +202,52 @@ int SubscriptionHandler::manage_subscription_request(transaction_identifier rmr_
    return res;
 };
 
+template<typename AppTransmitter>
+int SubscriptionHandler:: manage_subscription_delete_request(transaction_identifier rmr_trans_id, AppTransmitter && tx)
+{
+       int res;
+       // delete  entry in request table
+       {
+               std::lock_guard<std::mutex> lock(*(_data_lock.get()));
+               res = delete_request_entry(rmr_trans_id);
+               mdclog_write(MDCLOG_INFO,"res=%d",res);
+               if(! res)
+               {
+                       mdclog_write(MDCLOG_ERR, "%s : Error deleting new subscription request %s from queue because request with key doesn't present",  __FILE__, __LINE__);
+
+                       return SUBSCR_ERR_NOT_FOUND;
+               }
+
+       }
+
+
+       // acquire lock ...
+       std::unique_lock<std::mutex> _local_lock(*(_data_lock.get()));
+       // Send the message
+       bool flg = tx();
+
+       if (!flg)
+       {
+               // add state
+               res = add_request_entry(rmr_trans_id, request_pending);
+               mdclog_write(MDCLOG_ERR, "%s, %d :: Error transmitting subscription delete request %s", __FILE__, __LINE__, rmr_trans_id.c_str());
+               return SUBSCR_ERR_TX;
+       } 
+       else 
+       {
+               mdclog_write(MDCLOG_INFO, "%s, %d :: Transmitted subscription delete request for trans_id  %s", __FILE__, __LINE__, rmr_trans_id.c_str());
+
+       }
+
+       // record time stamp ..
+       auto start = std::chrono::system_clock::now();
+       std::chrono::milliseconds t_out(_time_out);
+
+       //the wait functionality has been removed.
+
+
+       _local_lock.unlock();
+       // std::cout <<"Returning  res = " << res << " for request = " << rmr_trans_id  << std::endl;
+       return res;
+};
 #endif
index 94d190e..2a07ff4 100644 (file)
 */
 
 #include "xapp.hpp"
+#include <nlohmann/json.hpp>
+#include <iostream>
+#include<string>
+#include <cpprest/http_client.h>
+#include <cpprest/filestream.h>
+#include <cpprest/uri.h>
+#include <cpprest/json.h>
+using namespace utility;
+using namespace web;
+using namespace web::http;
+using namespace web::http::client;
+using namespace concurrency::streams;
+using jsonn = nlohmann::json;
 #define BUFFER_SIZE 1024
-
+extern std::vector<std::string>SubscriptionIds;
  Xapp::Xapp(XappSettings &config, XappRmr &rmr){
 
          rmr_ref = &rmr;
@@ -105,35 +118,247 @@ void Xapp::start_xapp_receiver(XappMsgHandler& mp_handler){
 }
 
 void Xapp::shutdown(){
-       return;
+       
+        sleep(70);
+        //send subscriptions delete.
+        shutdown_subscribe_deletes();
+        return;
 }
 
+void Xapp::shutdown_subscribe_deletes(void )
+{
+
+       bool res;
+       size_t data_size = ASN_BUFF_MAX_SIZE;
+       unsigned char   data[data_size];
+       //unsigned char meid[RMR_MAX_MEID];
+       char meid[RMR_MAX_MEID];
+       std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
+
+       mdclog_write(MDCLOG_INFO,"Preparing to send subscription Delete in file= %s, line=%d",__FILE__,__LINE__);
+
+       auto gnblist = get_rnib_gnblist();
+
+       int sz = gnblist.size();
+        mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
+
+       if(sz <= 0)
+               mdclog_write(MDCLOG_INFO,"Subscriptions Delete cannot be sent as GNBList in RNIB is NULL");
+
+       for(int i = 0; i<sz; i++)
+       {
+               sleep(15);
+               //give the message to subscription handler, along with the transmitter.
+               strcpy((char*)meid,gnblist[i].c_str());
+               mdclog_write(MDCLOG_INFO,"sending %d subscription delete request out of : %d",i+1, sz);
+               mdclog_write(MDCLOG_INFO,"sending subscription delete to ,meid = %s", meid);
+               
+               if (SubscriptionIds.size()>0)
+               {
+               auto delJson = pplx::create_task([i,meid]() {
+               utility::string_t port = U("8088");
+                utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
+                address.append(port);
+                address.append(U("/ric/v1/subscriptions/"));
+               address.append( utility::string_t(SubscriptionIds.back()));
+                               SubscriptionIds.pop_back();
+                uri_builder uri(address);
+                auto addr = uri.to_uri().to_string();
+                http_client client(addr);
+                ucout << utility::string_t(U("making requests at: ")) << addr <<std::endl;
+                return client.request(methods::DEL);
+
+                 
+                        })
+
+                        // Get the response.
+                                .then([](http_response response) {
+                                // Check the status code.
+                                if (response.status_code() != 204) {
+                                        throw std::runtime_error("Returned " + std::to_string(response.status_code()));
+                                }
+
+                                // Convert the response body to JSON object.
+                                       std::wcout << "Deleted: " << std::boolalpha << (response.status_code() == 204) << std::endl;
+                                        });
+
+                                // serailize the user details.
+                      
+
+                                        try {
+                                                delJson.wait();
+                                        }
+                                        catch (const std::exception& e) {
+                                                printf("Error exception:%s\n", e.what());
+                                        }
+                                                                               
+               }
+               
+               else{
+                mdclog_write(MDCLOG_ERR,"Subscription delete cannot send in file= %s, line=%d for MEID %s as no valid subIDS",__FILE__,__LINE__, meid);
+               }
+
+               /*
+
+
+               subscription_helper  din;
+               subscription_helper  dout;
+
+               subscription_delete sub_del;
+               subscription_delete sub_recv;
+
+               unsigned char buf[BUFFER_SIZE];
+               size_t buf_size = BUFFER_SIZE;
+               bool res;
+
+
+               //Random Data  for request
+               int request_id = 1;
+               int function_id = 1;
+
+               din.set_request(request_id);
+               din.set_function_id(function_id);
+
+               res = sub_del.encode_e2ap_subscription(&buf[0], &buf_size, din);
+
+               mdclog_write(MDCLOG_INFO,"Sending subscription delete  in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+
+               xapp_rmr_header rmr_header; 
+               rmr_header.message_type = RIC_SUB_DEL_REQ;
+               rmr_header.payload_length = buf_size; //data_size
+
+               strcpy((char*)rmr_header.meid,gnblist[i].c_str());
+               auto transmitter = std::bind(&XappRmr::xapp_rmr_send,rmr_ref, &rmr_header, (void*)buf); //(void*)data)
+               if (subhandler_ref)
+               {
+                       mdclog_write(MDCLOG_INFO,"subhandler_ref is valid pointer");
+               }
+               else
+               {
+                       mdclog_write(MDCLOG_INFO,"subhandler_ref is invalid pointer");
+               }
+               int result = subhandler_ref->manage_subscription_delete_request(gnblist[i], transmitter);
+
+                       if(result==SUBSCR_SUCCESS)
+               {
+
+                       mdclog_write(MDCLOG_INFO,"Subscription Delete SUCCESSFUL in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+               }
+               else 
+               {
+                       mdclog_write(MDCLOG_ERR,"Subscription Delete FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
+               }
+               */
+       }
+}
 
 void Xapp::startup_subscribe_requests(void ){
-
    bool res;
    size_t data_size = ASN_BUFF_MAX_SIZE;
    unsigned char       data[data_size];
-   unsigned char meid[RMR_MAX_MEID];
+   char meid[RMR_MAX_MEID];
    std::string xapp_id = config_ref->operator [](XappSettings::SettingName::XAPP_ID);
-
+   //int a =std::stoi(xapp_id);
    mdclog_write(MDCLOG_INFO,"Preparing to send subscription in file= %s, line=%d",__FILE__,__LINE__);
 
    auto gnblist = get_rnib_gnblist();
 
    int sz = gnblist.size();
-
+       mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
    if(sz <= 0)
           mdclog_write(MDCLOG_INFO,"Subscriptions cannot be sent as GNBList in RNIB is NULL");
 
    for(int i = 0; i<sz; i++)
    {
-        sleep(15); 
+        sleep(15);
+        strcpy((char*)meid,gnblist[i].c_str());
+        mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
+
+        //mdclog_write(MDCLOG_INFO,"GNBList,gnblist[i] = %s and ith val = %d", gnblist[i], i);
+        mdclog_write(MDCLOG_INFO,"sending subscription to ,meid = %s", meid);
+
+auto postJson = pplx::create_task([meid,xapp_id]() {
+
+
+                jsonn jsonObject;
+                 jsonObject =
+    {
+
+
+
+        {"SubscriptionId",""},
+        {"ClientEndpoint",{{"Host","service-ricxapp-bouncer-xapp-http.ricxapp"},{"HTTPPort",8080},{"RMRPort",4560}}},
+        {"Meid",meid},
+        {"RANFunctionID",0},
+        {"SubscriptionDetails",
+                {
+                        {
+                            {"XappEventInstanceId",12345},{"EventTriggers",{0}},
+                            {"ActionToBeSetupList",
+                                    {
+                                        {
+                                                {"ActionID",1},{"ActionType","report"},{"ActionDefinition",{0}},{"SubsequentAction",{{"SubsequentActionType","continue"},{"TimeToWait","zero"}}}
+                                        }
+                                    }
+                            }
+                        }
+                }
+        }
+
+    };
+                        std::cout <<jsonObject.dump(4) << "\n";
+                        utility::stringstream_t s;
+                        s << jsonObject.dump().c_str();
+                        web::json::value ret = json::value::parse(s);
+                       // std::wcout << ret.serialize().c_str() << std::endl;
+                utility::string_t port = U("8088");
+                 utility::string_t address = U("http://service-ricplt-submgr-http.ricplt.svc.cluster.local:");
+                  address.append(port);
+                  address.append(U("/ric/v1/subscriptions"));
+                uri_builder uri(address);
+                auto addr = uri.to_uri().to_string();
+                http_client client(addr);
+                //std::cout<<uri::validate(addr)<<" validation \n";
+                ucout << utility::string_t(U("making requests at: ")) << addr << "\n";
+                return client.request(methods::POST,U("/"),ret.serialize(),U("application/json"));
+                        })
+
+                        // Get the response.
+                                .then([](http_response response) {
+                                // Check the status code.
+                                if (response.status_code() != 201) {
+                                        throw std::runtime_error("Returned " + std::to_string(response.status_code()));
+                                }
+
+                                // Convert the response body to JSON object.
+                                return response.extract_json();
+                                        })
+
+                                // serailize the user details.
+                                                .then([](json::value jsonObject) {
+                                               std::cout<<"\nRecieved REST subscription response\n";
+                                                std::wcout << jsonObject.serialize().c_str() << "\n";
+                                               std::string tmp;
+                                               tmp=jsonObject[U("SubscriptionId")].as_string();
+                                               SubscriptionIds.push_back(tmp);
+
+                                                        });
+
+                                        try {
+                                                postJson.wait();
+                                        }
+                                        catch (const std::exception& e) {
+                                                printf("Error exception:%s\n", e.what());
+                                        }
+
+       
+       /*
         //give the message to subscription handler, along with the transmitter.
         strcpy((char*)meid,gnblist[i].c_str());
 
          mdclog_write(MDCLOG_INFO,"GNBList size : %d", sz);
-
+       mdclog_write(MDCLOG_INFO,"sending %d subscription request out of : %d",i+1, sz);
         subscription_helper  din;
         subscription_helper  dout;
 
@@ -160,7 +385,7 @@ void Xapp::startup_subscribe_requests(void ){
 
         res = sub_req.encode_e2ap_subscription(&buf[0], &buf_size, din);
 
-        //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
+        //mdclog_write(MDCLOG_INFO,"GNBList = %s and ith val = %d", gnblist[i], i);
 
         mdclog_write(MDCLOG_INFO,"Sending subscription in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
         
@@ -180,8 +405,10 @@ void Xapp::startup_subscribe_requests(void ){
           }
           else {
                 mdclog_write(MDCLOG_ERR,"Subscription FAILED in file= %s, line=%d for MEID %s",__FILE__,__LINE__, meid);
-              }  
-       } 
+              }
+           */  
+       }
+   std::cout<<"\n SubscriptionIds vector size= "<<SubscriptionIds.size()<<"\n"; 
 }
 
 void Xapp::startup_get_policies(void){
index bfb62cc..e436f98 100644 (file)
@@ -34,7 +34,7 @@
 #include "rapidjson/writer.h"
 #include "rapidjson/document.h"
 #include "rapidjson/error/error.h"
-
+#include<thread>
 #include "msgs_proc.hpp"
 #include "subs_mgmt.hpp"
 #include "xapp_config.hpp"