RICPLT-2985 Route update via registry/subscription entry 79/2179/1
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Thu, 9 Jan 2020 12:14:16 +0000 (14:14 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Thu, 9 Jan 2020 12:45:24 +0000 (14:45 +0200)
Change-Id: I5aa4b71290e5405b9bf995b8528e49e900375cbc
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/control.go
pkg/control/main_test.go
pkg/control/registry.go
pkg/control/subscription.go [new file with mode: 0644]
pkg/control/tracker.go
pkg/control/transaction.go [new file with mode: 0644]

index 5a38656..5dd8f05 100755 (executable)
@@ -79,6 +79,7 @@ func init() {
 }
 
 func NewControl() *Control {
+
        registry := new(Registry)
        registry.Initialize(seedSN)
 
@@ -94,9 +95,14 @@ func NewControl() *Control {
        deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
        rtmgrClient := RtmgrClient{client, handle, deleteHandle}
 
+       rtmgrClientPtr := &rtmgrClient
+
+       //TODO: to make this better. Now it is just a hack.
+       registry.rtmgrClient = rtmgrClientPtr
+
        return &Control{e2ap: new(E2ap),
                registry:    registry,
-               rtmgrClient: &rtmgrClient,
+               rtmgrClient: rtmgrClientPtr,
                tracker:     tracker,
                timerMap:    timerMap,
                msgCounter:  0,
@@ -163,10 +169,9 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
                return
        }
 
-       /* Reserve a sequence number and set it in the payload */
-       subs := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
-       if subs == nil {
-               xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+       subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
+       if err != nil {
+               xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
                return
        }
 
@@ -174,7 +179,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
        if err != nil {
                xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
-               c.registry.releaseSequenceNumber(subs.Seq)
+               c.registry.DelSubscription(subs.Seq)
                return
        }
 
@@ -184,18 +189,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        _, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
        if err != nil {
                xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
-               c.registry.releaseSequenceNumber(subs.Seq)
-               return
-       }
-
-       // Update routing manager about the new subscription
-       subRouteAction := subs.SubRouteInfo(CREATE)
-       xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-
-       err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(subs.Seq)
+               c.registry.DelSubscription(subs.Seq)
                return
        }
 
@@ -295,16 +289,9 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
 
        time.Sleep(3 * time.Second)
 
-       xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := subs.SubRouteInfo(DELETE)
-       err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-       }
-
        xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
        transaction.Release()
-       if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+       if !c.registry.DelSubscription(payloadSeqNum) {
                xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
        }
        return
@@ -500,16 +487,8 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := subs.SubRouteInfo(DELETE)
-       err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               return
-       }
-
        xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+       if !c.registry.DelSubscription(payloadSeqNum) {
                xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
                return
        }
@@ -574,17 +553,9 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := subs.SubRouteInfo(DELETE)
-       err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               return
-       }
-
        xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
        transaction.Release()
-       if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+       if !c.registry.DelSubscription(payloadSeqNum) {
                xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
                return
        }
@@ -656,17 +627,9 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int,
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid)
-       subRouteAction := subs.SubRouteInfo(DELETE)
-       err := c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
-               return
-       }
-
        xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid)
        transaction.Release()
-       if !c.registry.releaseSequenceNumber(subId) {
+       if !c.registry.DelSubscription(subId) {
                xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid)
        }
        return
index 69dadff..155f92c 100644 (file)
@@ -114,9 +114,6 @@ func initTestingMessageChannel() testingMessageChannel {
        return mc
 }
 
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
index 1adb4f7..2c5bd8c 100644 (file)
 package control
 
 import (
+       "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-       "strconv"
        "sync"
 )
 
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Subscription struct {
-       mutex  sync.Mutex
-       Seq    uint16
-       Active bool
-       //
-       Meid        *xapp.RMRMeid
-       RmrEndpoint // xapp endpoint
-       Trans       *Transaction
-}
-
-func (s *Subscription) String() string {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
-}
-
-func (s *Subscription) Confirmed() {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       s.Active = true
-}
-
-func (s *Subscription) UnConfirmed() {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       s.Active = false
-}
-
-func (s *Subscription) IsConfirmed() bool {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return s.Active
-}
-
-func (s *Subscription) SetTransaction(trans *Transaction) bool {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       if s.Trans == nil {
-               s.Trans = trans
-               return true
-       }
-       return false
-}
-
-func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       if trans == nil || trans == s.Trans {
-               s.Trans = nil
-               return true
-       }
-       return false
-}
-
-func (s *Subscription) GetTransaction() *Transaction {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return s.Trans
-}
-
-func (s *Subscription) SubRouteInfo(act Action) SubRouteInfo {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
-}
-
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 type Registry struct {
-       register map[uint16]*Subscription
-       counter  uint16
-       mutex    sync.Mutex
+       mutex       sync.Mutex
+       register    map[uint16]*Subscription
+       counter     uint16
+       rtmgrClient *RtmgrClient
 }
 
 // This method should run as a constructor
@@ -110,7 +42,7 @@ func (r *Registry) Initialize(seedsn uint16) {
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) *Subscription {
+func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
        // Check is current SequenceNumber valid
        // Allocate next SequenceNumber value and retry N times
        r.mutex.Lock()
@@ -125,17 +57,29 @@ func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid)
                        r.counter++
                }
                if _, ok := r.register[sequenceNumber]; ok == false {
-                       r.register[sequenceNumber] = &Subscription{
+                       subs := &Subscription{
                                Seq:         sequenceNumber,
                                Active:      false,
                                RmrEndpoint: endPoint,
                                Meid:        meid,
                                Trans:       nil,
                        }
-                       return r.register[sequenceNumber]
+                       r.register[sequenceNumber] = subs
+
+                       // Update routing
+                       r.mutex.Unlock()
+                       err := subs.UpdateRoute(CREATE, r.rtmgrClient)
+                       r.mutex.Lock()
+                       if err != nil {
+                               if _, ok := r.register[sequenceNumber]; ok {
+                                       delete(r.register, sequenceNumber)
+                               }
+                               return nil, err
+                       }
+                       return subs, nil
                }
        }
-       return nil
+       return nil, fmt.Errorf("Registry: Failed to reserves subcription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName)
 }
 
 func (r *Registry) GetSubscription(sn uint16) *Subscription {
@@ -148,14 +92,21 @@ func (r *Registry) GetSubscription(sn uint16) *Subscription {
        return nil
 }
 
-//This function releases the given id as unused in the register
-func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+func (r *Registry) DelSubscription(sn uint16) bool {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if _, ok := r.register[sn]; ok {
+               subs := r.register[sn]
                delete(r.register, sn)
+
+               // Update routing
+               r.mutex.Unlock()
+               err := subs.UpdateRoute(DELETE, r.rtmgrClient)
+               r.mutex.Lock()
+               if err != nil {
+                       xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", subs.Seq, subs.RmrEndpoint)
+               }
                return true
-       } else {
-               return false
        }
+       return false
 }
diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go
new file mode 100644 (file)
index 0000000..9bbe3d4
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+       "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "strconv"
+       "sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Subscription struct {
+       mutex  sync.Mutex
+       Seq    uint16
+       Active bool
+       //
+       Meid        *xapp.RMRMeid
+       RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
+       Trans       *Transaction
+}
+
+func (s *Subscription) String() string {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+}
+
+func (s *Subscription) Confirmed() {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       s.Active = true
+}
+
+func (s *Subscription) UnConfirmed() {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       s.Active = false
+}
+
+func (s *Subscription) IsConfirmed() bool {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       return s.Active
+}
+
+func (s *Subscription) SetTransaction(trans *Transaction) error {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+
+       subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+
+       if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
+               return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans)
+       }
+       if s.Trans != nil {
+               return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans)
+       }
+       trans.Subs = s
+       s.Trans = trans
+       return nil
+}
+
+func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       if trans == nil || trans == s.Trans {
+               s.Trans = nil
+               return true
+       }
+       return false
+}
+
+func (s *Subscription) GetTransaction() *Transaction {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       return s.Trans
+}
+
+func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+       subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
+       err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+       if err != nil {
+               return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+       }
+       return nil
+}
index 0e6941d..087b781 100644 (file)
@@ -22,82 +22,15 @@ package control
 import (
        "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-       "strconv"
        "sync"
 )
 
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type TransactionXappKey struct {
-       RmrEndpoint
-       Xid string // xapp xid in req
-}
-
-func (key *TransactionXappKey) String() string {
-       return key.RmrEndpoint.String() + "/" + key.Xid
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Transaction struct {
-       tracker           *Tracker // tracker instance
-       Subs              *Subscription
-       RmrEndpoint       RmrEndpoint
-       Xid               string          // xapp xid in req
-       OrigParams        *xapp.RMRParams // request orginal params
-       RespReceived      bool
-       ForwardRespToXapp bool
-       mutex             sync.Mutex
-}
-
-func (t *Transaction) String() string {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       var subId string = "?"
-       if t.Subs != nil {
-               subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
-       }
-       return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
-}
-
-func (t *Transaction) CheckResponseReceived() bool {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.RespReceived == false {
-               t.RespReceived = true
-               return false
-       }
-       return true
-}
-
-func (t *Transaction) RetryTransaction() {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       t.RespReceived = false
-}
-
-func (t *Transaction) Release() {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.Subs != nil {
-               t.Subs.UnSetTransaction(t)
-       }
-       if t.tracker != nil {
-               xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
-               t.tracker.UnTrackTransaction(xappkey)
-       }
-       t.Subs = nil
-       t.tracker = nil
-}
-
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 type Tracker struct {
-       transactionXappTable map[TransactionXappKey]*Transaction
        mutex                sync.Mutex
+       transactionXappTable map[TransactionXappKey]*Transaction
 }
 
 func (t *Tracker) Init() {
@@ -125,18 +58,10 @@ func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, par
                return nil, err
        }
 
-       if subs.SetTransaction(trans) == false {
-               othTrans := subs.GetTransaction()
-               err := fmt.Errorf("Tracker: Subscription %s got already transaction ongoing: %s, transaction %s not created", subs, othTrans, trans)
-               return nil, err
-       }
-       trans.Subs = subs
-       if (trans.Subs.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (trans.Subs.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
-               err := fmt.Errorf("Tracker: Subscription endpoint %s mismatch with trans: %s", subs, trans)
-               trans.Subs.UnSetTransaction(nil)
+       err := subs.SetTransaction(trans)
+       if err != nil {
                return nil, err
        }
-
        trans.tracker = t
        t.transactionXappTable[xappkey] = trans
        return trans, nil
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
new file mode 100644 (file)
index 0000000..f686b44
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+       "strconv"
+       "sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type TransactionXappKey struct {
+       RmrEndpoint
+       Xid string // xapp xid in req
+}
+
+func (key *TransactionXappKey) String() string {
+       return key.RmrEndpoint.String() + "/" + key.Xid
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Transaction struct {
+       mutex             sync.Mutex
+       tracker           *Tracker // tracker instance
+       Subs              *Subscription
+       RmrEndpoint       RmrEndpoint
+       Xid               string          // xapp xid in req
+       OrigParams        *xapp.RMRParams // request orginal params
+       RespReceived      bool
+       ForwardRespToXapp bool
+}
+
+func (t *Transaction) String() string {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       var subId string = "?"
+       if t.Subs != nil {
+               subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
+       }
+       return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
+}
+
+func (t *Transaction) CheckResponseReceived() bool {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       if t.RespReceived == false {
+               t.RespReceived = true
+               return false
+       }
+       return true
+}
+
+func (t *Transaction) RetryTransaction() {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       t.RespReceived = false
+}
+
+func (t *Transaction) Release() {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       if t.Subs != nil {
+               t.Subs.UnSetTransaction(t)
+       }
+       if t.tracker != nil {
+               xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
+               t.tracker.UnTrackTransaction(xappkey)
+       }
+       t.Subs = nil
+       t.tracker = nil
+}