X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fsubscription.go;h=6fd806c610ce7c3761a9cd8d593c4e25e800a6fe;hb=31797b49985822f1d402501f16ab2794838bebba;hp=6ed3d322643a0be4808c4912c35a9eedd4618235;hpb=e406a34d5547107533e65ddfbb2074e96d77b4b3;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 6ed3d32..6fd806c 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -30,19 +30,24 @@ import ( // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex - Seq uint16 - Active bool + mutex sync.Mutex + registry *Registry + Seq uint16 + Active bool // - Meid *xapp.RMRMeid - RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge - Trans *Transaction + Meid *xapp.RMRMeid + EpList RmrEndpointList + Trans *Transaction +} + +func (s *Subscription) stringImpl() string { + return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" } func (s *Subscription) String() string { s.mutex.Lock() defer s.mutex.Unlock() - return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName + return s.stringImpl() } func (s *Subscription) GetSubId() uint16 { @@ -78,20 +83,29 @@ func (s *Subscription) IsConfirmed() bool { return s.Active } -func (s *Subscription) SetTransaction(trans *Transaction) error { +func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool { s.mutex.Lock() defer s.mutex.Unlock() + return s.EpList.HasEndpoint(ep) +} - subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName +func (s *Subscription) SetTransaction(trans *Transaction) error { + s.mutex.Lock() + defer s.mutex.Unlock() - if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) { - return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans) - } if s.Trans != nil { - return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans) + return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans) } trans.Subs = s s.Trans = trans + + if len(s.EpList.Endpoints) == 0 { + s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) + return s.updateRouteImpl(CREATE) + } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false { + s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) + return s.updateRouteImpl(MERGE) + } return nil } @@ -111,14 +125,25 @@ func (s *Subscription) GetTransaction() *Transaction { return s.Trans } -func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error { +func (s *Subscription) updateRouteImpl(act Action) error { + subRouteAction := SubRouteInfo{act, s.EpList, s.Seq} + err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error()) + } + return nil +} + +func (s *Subscription) UpdateRoute(act Action) error { s.mutex.Lock() defer s.mutex.Unlock() - xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) - subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq} - err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + return s.updateRouteImpl(act) +} + +func (s *Subscription) Release() { + s.registry.DelSubscription(s.Seq) + err := s.UpdateRoute(DELETE) if err != nil { - return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + xapp.Logger.Error("%s", err.Error()) } - return nil }