X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fcontrol%2Fcontrol.go;h=ec6419e3794bfc149c199bfe50424a4c9a1fbbe8;hb=e00186861608731e2390055a0e1b1cf455670508;hp=f6cd771e7e20a5e58af049cd5ad5dbbfec76dc7f;hpb=e9608cd4ebb86e569dbc74978689fb8104a0d493;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index f6cd771..ec6419e 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -53,17 +53,6 @@ type RMRMeid struct { EnbID string } -type RMRParams struct { - Mtype int - Payload []byte - PayloadLen int - Meid *RMRMeid - Xid string - SubId int - Src string - Mbuf *C.rmr_mbuf_t -} - var SEEDSN uint16 var SubscriptionReqChan = make(chan subRouteInfo, 10) @@ -98,7 +87,8 @@ func NewControl() Control { transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) client := rtmgrclient.New(transport, strfmt.Default) handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - rtmgrClient := RtmgrClient{client, handle} + delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + rtmgrClient := RtmgrClient{client, handle, delete_handle} return Control{new(E2ap), registry, &rtmgrClient, tracker} } @@ -115,6 +105,8 @@ func (c *Control) Consume(rp *xapp.RMRParams) (err error) { err = c.handleSubscriptionResponse(rp) case C.RIC_SUB_DEL_REQ: err = c.handleSubscriptionDeleteRequest(rp) + case C.RIC_SUB_DEL_RESP: + err = c.handleSubscriptionDeleteResponse(rp) default: err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded") } @@ -128,6 +120,13 @@ func (c *Control) rmrSend(params *xapp.RMRParams) (err error) { return } +func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) { + if !xapp.Rmr.Send(params, true) { + err = errors.New("rmr.Send() failed") + } + return +} + func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) { payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload) if err != nil { @@ -153,7 +152,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) /* Create transatcion records for every subscription request */ xact_key := Transaction_key{new_sub_id, CREATE} - xact_value := Transaction{*src_addr, *src_port, params.Payload} + xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf} err = c.tracker.Track_transaction(xact_key, xact_value) if err != nil { xapp.Logger.Error("Failed to create a transaction record due to %v", err) @@ -170,6 +169,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id)) c.rmrSend(params) + xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table) return } @@ -186,7 +186,14 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) } c.registry.setSubscriptionToConfirmed(payload_seq_num) xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...") - c.rmrSend(params) + transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE) + if err != nil { + xapp.Logger.Error("Failed to create a transaction record due to %v", err) + return + } + xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port) + params.Mbuf = transaction.Mbuf + c.rmrReplyToSender(params) return } @@ -221,8 +228,47 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err e xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num) if c.registry.IsValidSequenceNumber(payload_seq_num) { c.registry.deleteSubscription(payload_seq_num) + trackErr := c.trackDeleteTransaction(params, payload_seq_num) + if trackErr != nil { + xapp.Logger.Error("Failed to create a transaction record due to %v", err) + return trackErr + } } xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num)) c.rmrSend(params) return } + +func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) { + src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src) + xact_key := Transaction_key{payload_seq_num, DELETE} + xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf} + err = c.tracker.Track_transaction(xact_key, xact_value) + return +} + +func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) { + payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload) + if err != nil { + err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + return + } + var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE) + sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num } + go c.rtmgrClient.SubscriptionRequestUpdate() + SubscriptionReqChan <- sub_route_action + + xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num) + if c.registry.releaseSequenceNumber(payload_seq_num) { + transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE) + if err != nil { + xapp.Logger.Error("Failed to create a transaction record due to %v", err) + return + } + xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port) + //params.Src = xAddress + ":" + strconv.Itoa(int(xPort)) + params.Mbuf = transaction.Mbuf + c.rmrReplyToSender(params) + } + return +}