X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=pkg%2Fcontrol%2Fregistry.go;h=33d480a9c6b708597745ebfb72431f68bc559941;hb=f6b780489e1d769ed56def6b8ff874ffca3198b7;hp=e07116c5c67eda60515a51d592783845abe94515;hpb=c92b421ec9f89e77df36422987e478ed8db85299;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/registry.go b/pkg/control/registry.go index e07116c..33d480a 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -128,7 +128,7 @@ func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2AP return nil, false } -func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) { +func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool, c *Control) (*Subscription, error) { var err error var newAlloc bool r.mutex.Lock() @@ -158,8 +158,7 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. subs, endPointFound := r.findExistingSubs(trans, subReqMsg) if subs == nil { - subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag) - if err != nil { + if subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag); err != nil { return nil, err } newAlloc = true @@ -167,7 +166,6 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. // Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted. subs.RetryFromXapp = true xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String()) - //xapp.Logger.Debug("Registry: substable=%v", r.register) return subs, nil } @@ -178,17 +176,16 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. defer subs.mutex.Unlock() epamount := subs.EpList.Size() + xapp.Logger.Info("AssignToSubscription subs.EpList.Size() = %v", subs.EpList.Size()) r.mutex.Unlock() // // Subscription route updates // if epamount == 1 { - subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)} - err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction) + err = r.RouteCreate(subs, c) } else { - subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)} - err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + err = r.RouteCreateUpdate(subs, c) } r.mutex.Lock() @@ -196,6 +193,8 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. if newAlloc { r.subIds = append(r.subIds, subs.ReqId.InstanceId) } + // Delete already added endpoint for the request + subs.EpList.DelEndpoint(trans.GetEndpoint()) return nil, err } @@ -207,6 +206,26 @@ func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap. return subs, nil } +func (r *Registry) RouteCreate(subs *Subscription, c *Control) error { + subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)} + err := r.rtmgrClient.SubscriptionRequestCreate(subRouteAction) + if err != nil { + c.UpdateCounter(cRouteCreateFail) + } + return err +} + +func (r *Registry) RouteCreateUpdate(subs *Subscription, c *Control) error { + subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)} + err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + c.UpdateCounter(cRouteCreateUpdateFail) + return err + } + c.UpdateCounter(cMergedSubscriptions) + return err +} + func (r *Registry) CheckActionTypes(subReqMsg *e2ap.E2APSubscriptionRequest) (uint64, error) { var reportFound bool = false var policyFound bool = false @@ -267,10 +286,7 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction // // Subscription route delete // - tmpList := xapp.RmrEndpointList{} - tmpList.AddEndpoint(trans.GetEndpoint()) - subRouteAction := SubRouteInfo{tmpList, uint16(subId)} - r.rtmgrClient.SubscriptionRequestDelete(subRouteAction) + r.RouteDelete(subs, trans, c) // // Subscription release @@ -286,16 +302,31 @@ func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction r.subIds = append(r.subIds, subId) } else if subs.EpList.Size() > 0 { // - // Subscription route updates + // Subscription route update // - subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)} - r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + r.RouteDeleteUpdate(subs, c) } }() return nil } +func (r *Registry) RouteDelete(subs *Subscription, trans *TransactionXapp, c *Control) { + tmpList := xapp.RmrEndpointList{} + tmpList.AddEndpoint(trans.GetEndpoint()) + subRouteAction := SubRouteInfo{tmpList, uint16(subs.ReqId.InstanceId)} + if err := r.rtmgrClient.SubscriptionRequestDelete(subRouteAction); err != nil { + c.UpdateCounter(cRouteDeleteFail) + } +} + +func (r *Registry) RouteDeleteUpdate(subs *Subscription, c *Control) { + subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.InstanceId)} + if err := r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction); err != nil { + c.UpdateCounter(cRouteDeleteUpdateFail) + } +} + func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) { r.mutex.Lock() defer r.mutex.Unlock() @@ -312,6 +343,7 @@ func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) { } else if subs.EpList.Size() > 0 { // Endpoint of merged subscription is being deleted c.WriteSubscriptionToDb(subs) + c.UpdateCounter(cUnmergedSubscriptions) } }