X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fsubscription.go;h=c3e1c205cecaa85e03995deed140ceec8287ebd4;hb=d708a43badb0742684b22866977f14cc1c03a1ba;hp=6a72f7423f64e3bda3ed34132dd72ccfb0770e5e;hpb=56e0383cad5307302f547a95755c3bcdd9e3251d;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 6a72f74..c3e1c20 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -20,9 +20,9 @@ package control import ( - "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" "sync" ) @@ -30,26 +30,40 @@ import ( // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex - registry *Registry - Seq uint16 - Active bool - // - Meid *xapp.RMRMeid - RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge - Trans *Transaction + mutex sync.Mutex // Lock + valid bool // valid + registry *Registry // Registry + ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid) + Meid *xapp.RMRMeid // Meid/ RanName + EpList RmrEndpointList // Endpoints + TransLock sync.Mutex // Lock transactions, only one executed per time for subs + TheTrans TransactionIf // Ongoing transaction + SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information + SubRFMsg interface{} // Subscription information } func (s *Subscription) String() string { + return "subs(" + s.ReqId.String() + "/" + (&xapptweaks.RMRMeid{s.Meid}).String() + "/" + s.EpList.String() + ")" +} + +func (s *Subscription) GetCachedResponse() (interface{}, bool) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.SubRFMsg, s.valid +} + +func (s *Subscription) SetCachedResponse(subRFMsg interface{}, valid bool) (interface{}, bool) { s.mutex.Lock() defer s.mutex.Unlock() - return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName + s.SubRFMsg = subRFMsg + s.valid = valid + return s.SubRFMsg, s.valid } -func (s *Subscription) GetSubId() uint16 { +func (s *Subscription) GetReqId() *RequestId { s.mutex.Lock() defer s.mutex.Unlock() - return s.Seq + return &s.ReqId } func (s *Subscription) GetMeid() *xapp.RMRMeid { @@ -61,80 +75,93 @@ func (s *Subscription) GetMeid() *xapp.RMRMeid { return nil } -func (s *Subscription) Confirmed() { +func (s *Subscription) GetTransaction() TransactionIf { s.mutex.Lock() defer s.mutex.Unlock() - s.Active = true + return s.TheTrans } -func (s *Subscription) UnConfirmed() { +func (s *Subscription) WaitTransactionTurn(trans TransactionIf) { + s.TransLock.Lock() s.mutex.Lock() - defer s.mutex.Unlock() - s.Active = false + s.TheTrans = trans + s.mutex.Unlock() } -func (s *Subscription) IsConfirmed() bool { +func (s *Subscription) ReleaseTransactionTurn(trans TransactionIf) { s.mutex.Lock() - defer s.mutex.Unlock() - return s.Active + if trans != nil && trans == s.TheTrans { + s.TheTrans = nil + } + s.mutex.Unlock() + s.TransLock.Unlock() } -func (s *Subscription) SetTransaction(trans *Transaction) error { +func (s *Subscription) IsMergeable(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) bool { s.mutex.Lock() defer s.mutex.Unlock() - subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName + if s.valid == false { + return false + } - if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) { - return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans) + if s.SubReqMsg == nil { + return false } - if s.Trans != nil { - return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans) + + if s.Meid.RanName != trans.Meid.RanName { + return false } - trans.Subs = s - s.Trans = trans - return nil -} -func (s *Subscription) UnSetTransaction(trans *Transaction) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - if trans == nil || trans == s.Trans { - s.Trans = nil - return true + // EventTrigger check + if s.SubReqMsg.EventTriggerDefinition.InterfaceDirection != subReqMsg.EventTriggerDefinition.InterfaceDirection || + s.SubReqMsg.EventTriggerDefinition.ProcedureCode != subReqMsg.EventTriggerDefinition.ProcedureCode || + s.SubReqMsg.EventTriggerDefinition.TypeOfMessage != subReqMsg.EventTriggerDefinition.TypeOfMessage { + return false } - return false -} -func (s *Subscription) GetTransaction() *Transaction { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.Trans -} + if s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.Present != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.Present || + s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.NodeId != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.NodeId || + s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.PlmnIdentity.String() != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalEnbId.PlmnIdentity.String() { + return false + } -func (s *Subscription) updateRouteImpl(act Action) error { - xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) - subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq} - err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + if s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.Present != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.Present || + s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.NodeId != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.NodeId || + s.SubReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.PlmnIdentity.String() != subReqMsg.EventTriggerDefinition.InterfaceId.GlobalGnbId.PlmnIdentity.String() { + return false } - return nil -} -func (s *Subscription) UpdateRoute(act Action) error { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.updateRouteImpl(act) -} + // Actions check + if len(s.SubReqMsg.ActionSetups) != len(subReqMsg.ActionSetups) { + return false + } -func (s *Subscription) Release() { - xapp.Logger.Info("Subscription: Releasing %s", s) - s.mutex.Lock() - defer s.mutex.Unlock() - s.registry.DelSubscription(s.Seq) - err := s.updateRouteImpl(DELETE) - if err != nil { - xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + for _, acts := range s.SubReqMsg.ActionSetups { + for _, actt := range subReqMsg.ActionSetups { + if acts.ActionId != actt.ActionId { + return false + } + if acts.ActionType != actt.ActionType { + return false + } + + if acts.ActionType != e2ap.E2AP_ActionTypeReport { + return false + } + + if acts.ActionDefinition.Present != actt.ActionDefinition.Present || + acts.ActionDefinition.StyleId != actt.ActionDefinition.StyleId || + acts.ActionDefinition.ParamId != actt.ActionDefinition.ParamId { + return false + } + if acts.SubsequentAction.Present != actt.SubsequentAction.Present || + acts.SubsequentAction.Type != actt.SubsequentAction.Type || + acts.SubsequentAction.TimetoWait != actt.SubsequentAction.TimetoWait { + return false + } + } } + + return true }