Subscription release 22/2222/1
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 14 Jan 2020 15:08:43 +0000 (17:08 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 14 Jan 2020 15:08:58 +0000 (17:08 +0200)
Change-Id: I2f2e1343dbad64c38afc1ca1f9afa7c478c5c8bb
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/control.go
pkg/control/registry.go
pkg/control/subscription.go
pkg/control/transaction.go

index 1bd27eb..eb2852a 100755 (executable)
@@ -45,7 +45,6 @@ var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
 type Control struct {
        e2ap         *E2ap
        registry     *Registry
-       rtmgrClient  *RtmgrClient
        tracker      *Tracker
        timerMap     *TimerMap
        rmrSendMutex sync.Mutex
@@ -85,8 +84,15 @@ func init() {
 
 func NewControl() *Control {
 
+       transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
+       client := rtmgrclient.New(transport, strfmt.Default)
+       handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+       rtmgrClient := RtmgrClient{client, handle, deleteHandle}
+
        registry := new(Registry)
        registry.Initialize(seedSN)
+       registry.rtmgrClient = &rtmgrClient
 
        tracker := new(Tracker)
        tracker.Init()
@@ -94,23 +100,11 @@ func NewControl() *Control {
        timerMap := new(TimerMap)
        timerMap.Init()
 
-       transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
-       client := rtmgrclient.New(transport, strfmt.Default)
-       handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-       deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-       rtmgrClient := RtmgrClient{client, handle, deleteHandle}
-
-       rtmgrClientPtr := &rtmgrClient
-
-       //TODO: to make this better. Now it is just a hack.
-       registry.rtmgrClient = rtmgrClientPtr
-
        return &Control{e2ap: new(E2ap),
-               registry:    registry,
-               rtmgrClient: rtmgrClientPtr,
-               tracker:     tracker,
-               timerMap:    timerMap,
-               msgCounter:  0,
+               registry:   registry,
+               tracker:    tracker,
+               timerMap:   timerMap,
+               msgCounter: 0,
        }
 }
 
@@ -234,7 +228,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) {
        err = subs.SetTransaction(trans)
        if err != nil {
                xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
-               c.registry.DelSubscription(subs.Seq)
+               subs.Release()
                trans.Release()
                return
        }
@@ -252,7 +246,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) {
        packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
        if err != nil {
                xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
-               c.registry.DelSubscription(subs.Seq)
+               subs.Release()
                trans.Release()
                return
        }
@@ -397,9 +391,7 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) {
        }
 
        trans.Release()
-       if !c.registry.DelSubscription(subs.GetSubId()) {
-               xapp.Logger.Error("SubFail: Failed to release sequency number. %s", subs)
-       }
+       subs.Release()
        return
 }
 
@@ -430,7 +422,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
 
                trans.RetryTransaction()
 
-               c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
+               c.rmrSend("SubReq(SubReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
 
                tryCount++
                c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
@@ -451,7 +443,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
        if err != nil {
                xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
                //TODO improve error handling. Important at least in merge
-               c.registry.DelSubscription(subs.GetSubId())
+               subs.Release()
                return
        }
 
@@ -464,7 +456,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
                xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
                //TODO improve error handling. Important at least in merge
                deltrans.Release()
-               c.registry.DelSubscription(subs.GetSubId())
+               subs.Release()
                return
        }
        deltrans.PayloadLen = len(packedData.Buf)
@@ -591,18 +583,13 @@ func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error
                return
        }
 
-       trans.Release()
-
        if trans.ForwardRespToXapp == true {
                c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
-       if !c.registry.DelSubscription(subs.GetSubId()) {
-               xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
-               return
-       }
+       trans.Release()
+       subs.Release()
        return
 }
 
@@ -648,12 +635,8 @@ func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
        trans.Release()
-       if !c.registry.DelSubscription(subs.GetSubId()) {
-               xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
-               return
-       }
+       subs.Release()
        return
 }
 
@@ -679,13 +662,9 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int,
        }
 
        if tryCount < maxSubDelReqTryCount {
-               xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
                // Set possible to handle new response for the subId
-
                trans.RetryTransaction()
-
-               c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
-
+               c.rmrSend("SubDelReq(SubDelReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
                tryCount++
                c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
                return
@@ -705,11 +684,7 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int,
                time.Sleep(3 * time.Second)
 
        }
-
-       xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
        trans.Release()
-       if !c.registry.DelSubscription(subs.GetSubId()) {
-               xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
-       }
+       subs.Release()
        return
 }
index 0970a3a..a1fe4b8 100644 (file)
@@ -58,6 +58,7 @@ func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid
                }
                if _, ok := r.register[sequenceNumber]; ok == false {
                        subs := &Subscription{
+                               registry:    r,
                                Seq:         sequenceNumber,
                                Active:      false,
                                RmrEndpoint: *endPoint,
@@ -68,7 +69,7 @@ func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid
 
                        // Update routing
                        r.mutex.Unlock()
-                       err := subs.UpdateRoute(CREATE, r.rtmgrClient)
+                       err := subs.UpdateRoute(CREATE)
                        r.mutex.Lock()
                        if err != nil {
                                if _, ok := r.register[sequenceNumber]; ok {
@@ -96,16 +97,7 @@ func (r *Registry) DelSubscription(sn uint16) bool {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if _, ok := r.register[sn]; ok {
-               subs := r.register[sn]
                delete(r.register, sn)
-
-               // Update routing
-               r.mutex.Unlock()
-               err := subs.UpdateRoute(DELETE, r.rtmgrClient)
-               r.mutex.Lock()
-               if err != nil {
-                       xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", subs.Seq, subs.RmrEndpoint)
-               }
                return true
        }
        return false
index 6ed3d32..6a72f74 100644 (file)
@@ -30,9 +30,10 @@ import (
 //
 //-----------------------------------------------------------------------------
 type Subscription struct {
-       mutex  sync.Mutex
-       Seq    uint16
-       Active bool
+       mutex    sync.Mutex
+       registry *Registry
+       Seq      uint16
+       Active   bool
        //
        Meid        *xapp.RMRMeid
        RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
@@ -111,14 +112,29 @@ func (s *Subscription) GetTransaction() *Transaction {
        return s.Trans
 }
 
-func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
+func (s *Subscription) updateRouteImpl(act Action) error {
        xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
        subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
-       err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+       err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
        }
        return nil
 }
+
+func (s *Subscription) UpdateRoute(act Action) error {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       return s.updateRouteImpl(act)
+}
+
+func (s *Subscription) Release() {
+       xapp.Logger.Info("Subscription: Releasing %s", s)
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       s.registry.DelSubscription(s.Seq)
+       err := s.updateRouteImpl(DELETE)
+       if err != nil {
+               xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+       }
+}
index 2cc68b1..5d13d8c 100644 (file)
@@ -113,6 +113,7 @@ func (t *Transaction) RetryTransaction() {
 }
 
 func (t *Transaction) Release() {
+       xapp.Logger.Info("Transaction: Releasing %s", t)
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if t.Subs != nil {