X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=1f769e1229af9560582ae63bd4ded958a98e95b3;hb=42aee7e5d6235c7a6ffe18d1ddb0aa89ad8f135d;hp=49b7968428661dce37f445fb8804481bf465a843;hpb=9c4697fa22fae79ac923e72f417ecbebf1c1e4d6;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 49b7968..1f769e1 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -72,6 +72,7 @@ var checkE2State string var readSubsFromDb string var dbRetryForever string var dbTryCount int +var e2IEOrderCheckValue uint8 type Control struct { *xapp.RMRClient @@ -180,7 +181,12 @@ func NewControl() *Control { } } - go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + go func() { + err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + if err != nil { + xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error()) + } + }() return c } @@ -343,6 +349,11 @@ func (c *Control) ReadConfigParameters(f string) { xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms") } xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms) + + viper.SetDefault("controls.checkE2IEOrder", 1) + e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder")) + c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue) + xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue) } //------------------------------------------------------------------- @@ -357,7 +368,7 @@ func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscripti if subs.PolicyUpdate == false { subs.NoRespToXapp = true xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId) - c.SendSubscriptionDeleteReq(subs) + c.SendSubscriptionDeleteReq(subs, false) } } } @@ -445,8 +456,12 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript c.PrintRESTSubscriptionRequest(p) } - if c.e2IfState.IsE2ConnectionUp(p.Meid) == false { - xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid) + if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true { + if c.e2IfState.IsE2ConnectionUp(p.Meid) == false { + xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid) + } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true { + xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid) + } c.UpdateCounter(cRestReqRejDueE2Down) return nil, common.SubscribeServiceUnavailableCode } @@ -681,10 +696,12 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e } xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs)) - err2 := c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) - if err2 != nil { - xapp.Logger.Error("RemoveFromSubscription failed: %s", err2.Error()) + // If policy type subscription fails we cannot remove it only internally. Once subscription has been created + // successfully, it must be deleted on both sides. + if subs.PolicyUpdate == false { + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) } + return nil, &errorInfo, err } @@ -862,10 +879,7 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) - err = c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) - if err != nil { - xapp.Logger.Error("XAPP-SubDelReq %s:", idstring(fmt.Errorf("RemoveFromSubscription faliled"), trans, subs)) - } + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) return xAppEventInstanceID, nil } @@ -944,6 +958,8 @@ func (c *Control) Consume(msg *xapp.RMRParams) (err error) { go c.handleE2TSubscriptionDeleteResponse(msg) case xapp.RIC_SUB_DEL_FAILURE: go c.handleE2TSubscriptionDeleteFailure(msg) + case xapp.RIC_SUB_DEL_REQUIRED: + go c.handleE2TSubscriptionDeleteRequired(msg) default: xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype) } @@ -994,12 +1010,14 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { //------------------------------------------------------------------ func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) { - e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil) + e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil) + if err != nil { + xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error()) + } subs.OngoingReqCount++ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side subs.OngoingReqCount-- - var err error if event != nil { switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: @@ -1118,7 +1136,13 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran subRfMsg, valid = subs.SetCachedResponse(event, true) subs.SubRespRcvd = true case *e2ap.E2APSubscriptionFailure: - subRfMsg, valid = subs.SetCachedResponse(event, false) + if subs.PolicyUpdate == false { + subRfMsg, valid = subs.SetCachedResponse(event, false) + } else { + // In policy update case where subscription has already been created successfully in Gnb + // we cannot delete subscription internally in submgr + subRfMsg, valid = subs.SetCachedResponse(event, true) + } xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) case *SubmgrRestartTestEvent: // This is used to simulate that no response has been received and after restart, subscriptions are restored from db @@ -1142,6 +1166,9 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran } else { xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } + xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate) + xapp.Logger.Debug("subs: %v", subs) + if valid == false { removeSubscriptionFromDb = true } @@ -1155,10 +1182,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - err = c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) - if err != nil { - xapp.Logger.Error("RemoveFromSubscription() failed:%s", err.Error()) - } + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) } parentTrans.SendEvent(subRfMsg, 0) @@ -1522,7 +1546,7 @@ func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) { } } -func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { +func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) { if c.UTTesting == true { // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests @@ -1554,7 +1578,11 @@ func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { params.Payload = payload.Buf params.Mbuf = nil subs.DeleteFromDb = true - c.handleXAPPSubscriptionDeleteRequest(params) + if !e2SubsDelRequired { + c.handleXAPPSubscriptionDeleteRequest(params) + } else { + c.SendSubscriptionDeleteReqToE2T(subs, params) + } } } } @@ -1628,3 +1656,83 @@ func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) { } } } + +//------------------------------------------------------------------- +// handle from E2T Subscription Delete Required +//------------------------------------------------------------------- +func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) { + xapp.Logger.Info("MSG from E2T: %s", params.String()) + c.UpdateCounter(cSubDelRequFromE2) + subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload) + if err != nil { + xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params)) + //c.sendE2TErrorIndication(nil) + return + } + var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{} + var subDB = []*Subscription{} + for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests { + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId}) + if err != nil { + xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params)) + continue + } + // Check if Delete Subscription Already triggered + if subs.OngoingDelCount > 0 { + continue + } + subDB = append(subDB, subs) + for _, endpoint := range subs.EpList.Endpoints { + subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove) + } + // Sending Subscription Delete Request to E2T + // c.SendSubscriptionDeleteReq(subs, true) + } + for _, subsTobeRemove := range subDB { + // Sending Subscription Delete Request to E2T + c.SendSubscriptionDeleteReq(subsTobeRemove, true) + } +} + +//----------------------------------------------------------------- +// Initiate RIC Subscription Delete Request after receiving +// RIC Subscription Delete Required from E2T +//----------------------------------------------------------------- +func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) { + xapp.Logger.Debug("MSG TO E2T: %s", params.String()) + c.UpdateCounter(cSubDelReqToE2) + + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + + trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid) + if trans == nil { + xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params)) + return + } + defer trans.Release() + + err := c.tracker.Track(trans) + if err != nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) + return + } + + // + // Wake subs delete + // + subs.OngoingDelCount++ + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms) + trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- + + xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) + + if subs.NoRespToXapp == true { + // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions + xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true") + return + } +}