X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=3dacc5916c8e76b2caa3898cb74d026bf7e3f373;hb=fc67b036d8944c372715b7306f892d2ddfb4684f;hp=49b7968428661dce37f445fb8804481bf465a843;hpb=9c4697fa22fae79ac923e72f417ecbebf1c1e4d6;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 49b7968..3dacc59 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -180,7 +180,12 @@ func NewControl() *Control { } } - go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + go func() { + err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + if err != nil { + xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error()) + } + }() return c } @@ -357,7 +362,7 @@ func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscripti if subs.PolicyUpdate == false { subs.NoRespToXapp = true xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId) - c.SendSubscriptionDeleteReq(subs) + c.SendSubscriptionDeleteReq(subs, false) } } } @@ -681,10 +686,8 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e } xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs)) - err2 := c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) - if err2 != nil { - xapp.Logger.Error("RemoveFromSubscription failed: %s", err2.Error()) - } + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) + return nil, &errorInfo, err } @@ -862,10 +865,7 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) - err = c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) - if err != nil { - xapp.Logger.Error("XAPP-SubDelReq %s:", idstring(fmt.Errorf("RemoveFromSubscription faliled"), trans, subs)) - } + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) return xAppEventInstanceID, nil } @@ -944,6 +944,8 @@ func (c *Control) Consume(msg *xapp.RMRParams) (err error) { go c.handleE2TSubscriptionDeleteResponse(msg) case xapp.RIC_SUB_DEL_FAILURE: go c.handleE2TSubscriptionDeleteFailure(msg) + case xapp.RIC_SUB_DEL_REQUIRED: + go c.handleE2TSubscriptionDeleteRequired(msg) default: xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype) } @@ -994,12 +996,14 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { //------------------------------------------------------------------ func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) { - e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil) + e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil) + if err != nil { + xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error()) + } subs.OngoingReqCount++ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingReqCount-- - var err error if event != nil { switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: @@ -1155,10 +1159,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - err = c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) - if err != nil { - xapp.Logger.Error("RemoveFromSubscription() failed:%s", err.Error()) - } + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) } parentTrans.SendEvent(subRfMsg, 0) @@ -1522,7 +1523,7 @@ func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) { } } -func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { +func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) { if c.UTTesting == true { // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests @@ -1554,7 +1555,11 @@ func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { params.Payload = payload.Buf params.Mbuf = nil subs.DeleteFromDb = true - c.handleXAPPSubscriptionDeleteRequest(params) + if !e2SubsDelRequired { + c.handleXAPPSubscriptionDeleteRequest(params) + } else { + c.SendSubscriptionDeleteReqToE2T(subs, params) + } } } } @@ -1628,3 +1633,83 @@ func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) { } } } + +//------------------------------------------------------------------- +// handle from E2T Subscription Delete Required +//------------------------------------------------------------------- +func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) { + xapp.Logger.Info("MSG from E2T: %s", params.String()) + c.UpdateCounter(cSubDelRequFromE2) + subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload) + if err != nil { + xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params)) + //c.sendE2TErrorIndication(nil) + return + } + var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{} + var subDB = []*Subscription{} + for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests { + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId}) + if err != nil { + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params)) + continue + } + // Check if Delete Subscription Already triggered + if subs.OngoingDelCount > 0 { + continue + } + subDB = append(subDB, subs) + for _, endpoint := range subs.EpList.Endpoints { + subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove) + } + // Sending Subscription Delete Request to E2T + // c.SendSubscriptionDeleteReq(subs, true) + } + for _, subsTobeRemove := range subDB { + // Sending Subscription Delete Request to E2T + c.SendSubscriptionDeleteReq(subsTobeRemove, true) + } +} + +//----------------------------------------------------------------- +// Initiate RIC Subscription Delete Request after receiving +// RIC Subscription Delete Required from E2T +//----------------------------------------------------------------- +func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) { + xapp.Logger.Debug("MSG TO E2T: %s", params.String()) + c.UpdateCounter(cSubDelReqToE2) + + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + + trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid) + if trans == nil { + xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params)) + return + } + defer trans.Release() + + err := c.tracker.Track(trans) + if err != nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) + return + } + + // + // Wake subs delete + // + subs.OngoingDelCount++ + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms) + trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- + + xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) + + if subs.NoRespToXapp == true { + // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions + xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true") + return + } +}