From 0388dd945789dae802aaa93c5062e3ae4c45ddf1 Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Thu, 9 Jan 2020 14:14:16 +0200 Subject: [PATCH] RICPLT-2985 Route update via registry/subscription entry Change-Id: I5aa4b71290e5405b9bf995b8528e49e900375cbc Signed-off-by: Juha Hyttinen --- pkg/control/control.go | 69 +++++++-------------------- pkg/control/main_test.go | 3 -- pkg/control/registry.go | 113 +++++++++++++------------------------------- pkg/control/subscription.go | 109 ++++++++++++++++++++++++++++++++++++++++++ pkg/control/tracker.go | 81 ++----------------------------- pkg/control/transaction.go | 92 ++++++++++++++++++++++++++++++++++++ 6 files changed, 252 insertions(+), 215 deletions(-) create mode 100644 pkg/control/subscription.go create mode 100644 pkg/control/transaction.go diff --git a/pkg/control/control.go b/pkg/control/control.go index 5a38656..5dd8f05 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -79,6 +79,7 @@ func init() { } func NewControl() *Control { + registry := new(Registry) registry.Initialize(seedSN) @@ -94,9 +95,14 @@ func NewControl() *Control { deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) rtmgrClient := RtmgrClient{client, handle, deleteHandle} + rtmgrClientPtr := &rtmgrClient + + //TODO: to make this better. Now it is just a hack. + registry.rtmgrClient = rtmgrClientPtr + return &Control{e2ap: new(E2ap), registry: registry, - rtmgrClient: &rtmgrClient, + rtmgrClient: rtmgrClientPtr, tracker: tracker, timerMap: timerMap, msgCounter: 0, @@ -163,10 +169,9 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { return } - /* Reserve a sequence number and set it in the payload */ - subs := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid) - if subs == nil { - xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) + subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid) + if err != nil { + xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error()) return } @@ -174,7 +179,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq) if err != nil { xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) - c.registry.releaseSequenceNumber(subs.Seq) + c.registry.DelSubscription(subs.Seq) return } @@ -184,18 +189,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { _, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error()) - c.registry.releaseSequenceNumber(subs.Seq) - return - } - - // Update routing manager about the new subscription - subRouteAction := subs.SubRouteInfo(CREATE) - xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - - err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(subs.Seq) + c.registry.DelSubscription(subs.Seq) return } @@ -295,16 +289,9 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { time.Sleep(3 * time.Second) - xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := subs.SubRouteInfo(DELETE) - err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } - xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) transaction.Release() - if !c.registry.releaseSequenceNumber(payloadSeqNum) { + if !c.registry.DelSubscription(payloadSeqNum) { xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) } return @@ -500,16 +487,8 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err time.Sleep(3 * time.Second) } - xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := subs.SubRouteInfo(DELETE) - err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - return - } - xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) - if !c.registry.releaseSequenceNumber(payloadSeqNum) { + if !c.registry.DelSubscription(payloadSeqNum) { xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) return } @@ -574,17 +553,9 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { time.Sleep(3 * time.Second) } - xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := subs.SubRouteInfo(DELETE) - err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - return - } - xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) transaction.Release() - if !c.registry.releaseSequenceNumber(payloadSeqNum) { + if !c.registry.DelSubscription(payloadSeqNum) { xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } @@ -656,17 +627,9 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, time.Sleep(3 * time.Second) } - xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid) - subRouteAction := subs.SubRouteInfo(DELETE) - err := c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid) - return - } - xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid) transaction.Release() - if !c.registry.releaseSequenceNumber(subId) { + if !c.registry.DelSubscription(subId) { xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid) } return diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go index 69dadff..155f92c 100644 --- a/pkg/control/main_test.go +++ b/pkg/control/main_test.go @@ -114,9 +114,6 @@ func initTestingMessageChannel() testingMessageChannel { return mc } -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- diff --git a/pkg/control/registry.go b/pkg/control/registry.go index 1adb4f7..2c5bd8c 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -20,87 +20,19 @@ package control import ( + "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" "sync" ) -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -type Subscription struct { - mutex sync.Mutex - Seq uint16 - Active bool - // - Meid *xapp.RMRMeid - RmrEndpoint // xapp endpoint - Trans *Transaction -} - -func (s *Subscription) String() string { - s.mutex.Lock() - defer s.mutex.Unlock() - return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName -} - -func (s *Subscription) Confirmed() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.Active = true -} - -func (s *Subscription) UnConfirmed() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.Active = false -} - -func (s *Subscription) IsConfirmed() bool { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.Active -} - -func (s *Subscription) SetTransaction(trans *Transaction) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - if s.Trans == nil { - s.Trans = trans - return true - } - return false -} - -func (s *Subscription) UnSetTransaction(trans *Transaction) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - if trans == nil || trans == s.Trans { - s.Trans = nil - return true - } - return false -} - -func (s *Subscription) GetTransaction() *Transaction { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.Trans -} - -func (s *Subscription) SubRouteInfo(act Action) SubRouteInfo { - s.mutex.Lock() - defer s.mutex.Unlock() - return SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq} -} - //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- type Registry struct { - register map[uint16]*Subscription - counter uint16 - mutex sync.Mutex + mutex sync.Mutex + register map[uint16]*Subscription + counter uint16 + rtmgrClient *RtmgrClient } // This method should run as a constructor @@ -110,7 +42,7 @@ func (r *Registry) Initialize(seedsn uint16) { } // Reserves and returns the next free sequence number -func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) *Subscription { +func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) { // Check is current SequenceNumber valid // Allocate next SequenceNumber value and retry N times r.mutex.Lock() @@ -125,17 +57,29 @@ func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) r.counter++ } if _, ok := r.register[sequenceNumber]; ok == false { - r.register[sequenceNumber] = &Subscription{ + subs := &Subscription{ Seq: sequenceNumber, Active: false, RmrEndpoint: endPoint, Meid: meid, Trans: nil, } - return r.register[sequenceNumber] + r.register[sequenceNumber] = subs + + // Update routing + r.mutex.Unlock() + err := subs.UpdateRoute(CREATE, r.rtmgrClient) + r.mutex.Lock() + if err != nil { + if _, ok := r.register[sequenceNumber]; ok { + delete(r.register, sequenceNumber) + } + return nil, err + } + return subs, nil } } - return nil + return nil, fmt.Errorf("Registry: Failed to reserves subcription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName) } func (r *Registry) GetSubscription(sn uint16) *Subscription { @@ -148,14 +92,21 @@ func (r *Registry) GetSubscription(sn uint16) *Subscription { return nil } -//This function releases the given id as unused in the register -func (r *Registry) releaseSequenceNumber(sn uint16) bool { +func (r *Registry) DelSubscription(sn uint16) bool { r.mutex.Lock() defer r.mutex.Unlock() if _, ok := r.register[sn]; ok { + subs := r.register[sn] delete(r.register, sn) + + // Update routing + r.mutex.Unlock() + err := subs.UpdateRoute(DELETE, r.rtmgrClient) + r.mutex.Lock() + if err != nil { + xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", subs.Seq, subs.RmrEndpoint) + } return true - } else { - return false } + return false } diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go new file mode 100644 index 0000000..9bbe3d4 --- /dev/null +++ b/pkg/control/subscription.go @@ -0,0 +1,109 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" + "sync" +) + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type Subscription struct { + mutex sync.Mutex + Seq uint16 + Active bool + // + Meid *xapp.RMRMeid + RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge + Trans *Transaction +} + +func (s *Subscription) String() string { + s.mutex.Lock() + defer s.mutex.Unlock() + return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName +} + +func (s *Subscription) Confirmed() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Active = true +} + +func (s *Subscription) UnConfirmed() { + s.mutex.Lock() + defer s.mutex.Unlock() + s.Active = false +} + +func (s *Subscription) IsConfirmed() bool { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.Active +} + +func (s *Subscription) SetTransaction(trans *Transaction) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName + + if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) { + return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans) + } + if s.Trans != nil { + return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans) + } + trans.Subs = s + s.Trans = trans + return nil +} + +func (s *Subscription) UnSetTransaction(trans *Transaction) bool { + s.mutex.Lock() + defer s.mutex.Unlock() + if trans == nil || trans == s.Trans { + s.Trans = nil + return true + } + return false +} + +func (s *Subscription) GetTransaction() *Transaction { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.Trans +} + +func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error { + s.mutex.Lock() + defer s.mutex.Unlock() + xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq} + err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + } + return nil +} diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 0e6941d..087b781 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -22,82 +22,15 @@ package control import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "strconv" "sync" ) -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -type TransactionXappKey struct { - RmrEndpoint - Xid string // xapp xid in req -} - -func (key *TransactionXappKey) String() string { - return key.RmrEndpoint.String() + "/" + key.Xid -} - -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -type Transaction struct { - tracker *Tracker // tracker instance - Subs *Subscription - RmrEndpoint RmrEndpoint - Xid string // xapp xid in req - OrigParams *xapp.RMRParams // request orginal params - RespReceived bool - ForwardRespToXapp bool - mutex sync.Mutex -} - -func (t *Transaction) String() string { - t.mutex.Lock() - defer t.mutex.Unlock() - var subId string = "?" - if t.Subs != nil { - subId = strconv.FormatUint(uint64(t.Subs.Seq), 10) - } - return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid -} - -func (t *Transaction) CheckResponseReceived() bool { - t.mutex.Lock() - defer t.mutex.Unlock() - if t.RespReceived == false { - t.RespReceived = true - return false - } - return true -} - -func (t *Transaction) RetryTransaction() { - t.mutex.Lock() - defer t.mutex.Unlock() - t.RespReceived = false -} - -func (t *Transaction) Release() { - t.mutex.Lock() - defer t.mutex.Unlock() - if t.Subs != nil { - t.Subs.UnSetTransaction(t) - } - if t.tracker != nil { - xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid} - t.tracker.UnTrackTransaction(xappkey) - } - t.Subs = nil - t.tracker = nil -} - //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- type Tracker struct { - transactionXappTable map[TransactionXappKey]*Transaction mutex sync.Mutex + transactionXappTable map[TransactionXappKey]*Transaction } func (t *Tracker) Init() { @@ -125,18 +58,10 @@ func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, par return nil, err } - if subs.SetTransaction(trans) == false { - othTrans := subs.GetTransaction() - err := fmt.Errorf("Tracker: Subscription %s got already transaction ongoing: %s, transaction %s not created", subs, othTrans, trans) - return nil, err - } - trans.Subs = subs - if (trans.Subs.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (trans.Subs.RmrEndpoint.Port != trans.RmrEndpoint.Port) { - err := fmt.Errorf("Tracker: Subscription endpoint %s mismatch with trans: %s", subs, trans) - trans.Subs.UnSetTransaction(nil) + err := subs.SetTransaction(trans) + if err != nil { return nil, err } - trans.tracker = t t.transactionXappTable[xappkey] = trans return trans, nil diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go new file mode 100644 index 0000000..f686b44 --- /dev/null +++ b/pkg/control/transaction.go @@ -0,0 +1,92 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "strconv" + "sync" +) + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type TransactionXappKey struct { + RmrEndpoint + Xid string // xapp xid in req +} + +func (key *TransactionXappKey) String() string { + return key.RmrEndpoint.String() + "/" + key.Xid +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type Transaction struct { + mutex sync.Mutex + tracker *Tracker // tracker instance + Subs *Subscription + RmrEndpoint RmrEndpoint + Xid string // xapp xid in req + OrigParams *xapp.RMRParams // request orginal params + RespReceived bool + ForwardRespToXapp bool +} + +func (t *Transaction) String() string { + t.mutex.Lock() + defer t.mutex.Unlock() + var subId string = "?" + if t.Subs != nil { + subId = strconv.FormatUint(uint64(t.Subs.Seq), 10) + } + return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid +} + +func (t *Transaction) CheckResponseReceived() bool { + t.mutex.Lock() + defer t.mutex.Unlock() + if t.RespReceived == false { + t.RespReceived = true + return false + } + return true +} + +func (t *Transaction) RetryTransaction() { + t.mutex.Lock() + defer t.mutex.Unlock() + t.RespReceived = false +} + +func (t *Transaction) Release() { + t.mutex.Lock() + defer t.mutex.Unlock() + if t.Subs != nil { + t.Subs.UnSetTransaction(t) + } + if t.tracker != nil { + xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid} + t.tracker.UnTrackTransaction(xappkey) + } + t.Subs = nil + t.tracker = nil +} -- 2.16.6