X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fsubscription.go;h=4514d007fc2a786248c72fef0e7d084a0540d52e;hb=refs%2Fchanges%2F86%2F2286%2F4;hp=6fd806c610ce7c3761a9cd8d593c4e25e800a6fe;hpb=a93ec3c226130e8a300b107cd582dc57004e0c40;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 6fd806c..4514d00 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -21,23 +21,28 @@ package control import ( "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "sync" + "time" ) //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex - registry *Registry - Seq uint16 - Active bool - // - Meid *xapp.RMRMeid - EpList RmrEndpointList - Trans *Transaction + mutex sync.Mutex // Lock + registry *Registry // Registry + Seq uint16 // SubsId + Meid *xapp.RMRMeid // Meid/ RanName + EpList RmrEndpointList // Endpoints + DelEpList RmrEndpointList // Endpoints + DelSeq uint64 + TransLock sync.Mutex // Lock transactions, only one executed per time for subs + TheTrans *Transaction // Ongoing transaction from xapp + SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information + SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information } func (s *Subscription) stringImpl() string { @@ -65,85 +70,105 @@ func (s *Subscription) GetMeid() *xapp.RMRMeid { return nil } -func (s *Subscription) Confirmed() { +func (s *Subscription) AddEndpoint(ep *RmrEndpoint) error { s.mutex.Lock() defer s.mutex.Unlock() - s.Active = true -} - -func (s *Subscription) UnConfirmed() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.Active = false -} - -func (s *Subscription) IsConfirmed() bool { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.Active + if ep == nil { + return fmt.Errorf("AddEndpoint no endpoint given") + } + if s.EpList.AddEndpoint(ep) { + s.DelEpList.DelEndpoint(ep) + if s.EpList.Size() == 1 { + return s.updateRouteImpl(CREATE) + } + return s.updateRouteImpl(MERGE) + } + return nil } -func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool { +func (s *Subscription) DelEndpoint(ep *RmrEndpoint) error { s.mutex.Lock() defer s.mutex.Unlock() - return s.EpList.HasEndpoint(ep) + var err error + if ep == nil { + return fmt.Errorf("DelEndpoint no endpoint given") + } + if s.EpList.HasEndpoint(ep) == false { + return fmt.Errorf("DelEndpoint endpoint not found") + } + if s.DelEpList.HasEndpoint(ep) == true { + return fmt.Errorf("DelEndpoint endpoint already under del") + } + s.DelEpList.AddEndpoint(ep) + go s.CleanCheck() + return err } -func (s *Subscription) SetTransaction(trans *Transaction) error { +func (s *Subscription) CleanCheck() { s.mutex.Lock() defer s.mutex.Unlock() - - if s.Trans != nil { - return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans) + s.DelSeq++ + // Only one clean ongoing + if s.DelSeq > 1 { + return } - trans.Subs = s - s.Trans = trans - - if len(s.EpList.Endpoints) == 0 { - s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) - return s.updateRouteImpl(CREATE) - } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false { - s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) - return s.updateRouteImpl(MERGE) + var currSeq uint64 = 0 + // Make sure that routes to be deleted + // are not deleted too fast + for currSeq < s.DelSeq { + currSeq = s.DelSeq + s.mutex.Unlock() + time.Sleep(5 * time.Second) + s.mutex.Lock() } - return nil + xapp.Logger.Info("DelEndpoint: delete cleaning %s", s.stringImpl()) + if s.EpList.Size() <= s.DelEpList.Size() { + s.updateRouteImpl(DELETE) + go s.registry.DelSubscription(s.Seq) + } else if s.EpList.DelEndpoints(&s.DelEpList) { + s.updateRouteImpl(MERGE) + } + s.DelSeq = 0 + } -func (s *Subscription) UnSetTransaction(trans *Transaction) bool { +func (s *Subscription) IsTransactionReserved() bool { s.mutex.Lock() defer s.mutex.Unlock() - if trans == nil || trans == s.Trans { - s.Trans = nil + if s.TheTrans != nil { return true } return false + } func (s *Subscription) GetTransaction() *Transaction { s.mutex.Lock() defer s.mutex.Unlock() - return s.Trans + return s.TheTrans } -func (s *Subscription) updateRouteImpl(act Action) error { - subRouteAction := SubRouteInfo{act, s.EpList, s.Seq} - err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error()) - } - return nil +func (s *Subscription) WaitTransactionTurn(trans *Transaction) { + s.TransLock.Lock() + s.mutex.Lock() + s.TheTrans = trans + s.mutex.Unlock() } -func (s *Subscription) UpdateRoute(act Action) error { +func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) { s.mutex.Lock() - defer s.mutex.Unlock() - return s.updateRouteImpl(act) + if trans != nil && trans == s.TheTrans { + s.TheTrans = nil + } + s.mutex.Unlock() + s.TransLock.Unlock() } -func (s *Subscription) Release() { - s.registry.DelSubscription(s.Seq) - err := s.UpdateRoute(DELETE) +func (s *Subscription) updateRouteImpl(act Action) error { + subRouteAction := SubRouteInfo{act, s.EpList, s.Seq} + err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { - xapp.Logger.Error("%s", err.Error()) + return fmt.Errorf("%s %s", s.stringImpl(), err.Error()) } + return nil }