Seq uint16
Active bool
//
- Meid *xapp.RMRMeid
- RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
- Trans *Transaction
+ Meid *xapp.RMRMeid
+ EpList RmrEndpointList
+ Trans *Transaction
+}
+
+func (s *Subscription) stringImpl() string {
+ return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
}
func (s *Subscription) String() string {
s.mutex.Lock()
defer s.mutex.Unlock()
- return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+ return s.stringImpl()
}
func (s *Subscription) GetSubId() uint16 {
return s.Active
}
-func (s *Subscription) SetTransaction(trans *Transaction) error {
+func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
+ return s.EpList.HasEndpoint(ep)
+}
- subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+func (s *Subscription) SetTransaction(trans *Transaction) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
- if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
- return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans)
- }
if s.Trans != nil {
- return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans)
+ return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans)
}
trans.Subs = s
s.Trans = trans
+
+ if len(s.EpList.Endpoints) == 0 {
+ s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
+ return s.updateRouteImpl(CREATE)
+ } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false {
+ s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
+ return s.updateRouteImpl(MERGE)
+ }
return nil
}
}
func (s *Subscription) updateRouteImpl(act Action) error {
- xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
- subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
+ subRouteAction := SubRouteInfo{act, s.EpList, s.Seq}
err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
- return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+ return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error())
}
return nil
}
}
func (s *Subscription) Release() {
- xapp.Logger.Info("Subscription: Releasing %s", s)
- s.mutex.Lock()
- defer s.mutex.Unlock()
s.registry.DelSubscription(s.Seq)
- err := s.updateRouteImpl(DELETE)
+ err := s.UpdateRoute(DELETE)
if err != nil {
- xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+ xapp.Logger.Error("%s", err.Error())
}
}