From: Juha Hyttinen Date: Wed, 22 Jan 2020 10:59:01 +0000 (+0200) Subject: RICPLT-2989 Submgr routing manager client code to support multiple endpoints X-Git-Tag: 0.4.0~27 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=12d31af1cdfcbf5f634d9cf666e8e174c74ecb27;p=ric-plt%2Fsubmgr.git RICPLT-2989 Submgr routing manager client code to support multiple endpoints Change-Id: I0b4931090c06b4cf9a81b766a11162c8a6ebecf4 Signed-off-by: Juha Hyttinen --- diff --git a/pkg/control/client.go b/pkg/control/client.go index adc03d1..4146428 100644 --- a/pkg/control/client.go +++ b/pkg/control/client.go @@ -27,6 +27,7 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "strings" + "time" ) //----------------------------------------------------------------------------- @@ -46,21 +47,34 @@ func (sri *SubRouteInfo) String() string { // //----------------------------------------------------------------------------- type RtmgrClient struct { - rtClient *rtmgrclient.RoutingManager - xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams - xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams + rtClient *rtmgrclient.RoutingManager } func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error { subID := int32(subRouteAction.SubID) xapp.Logger.Debug("%s ongoing", subRouteAction.String()) - xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} var err error switch subRouteAction.Command { case CREATE: - _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq)) + createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} + createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + createHandle.WithXappSubscriptionData(&createData) + _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle) case DELETE: - _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq)) + deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} + deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + deleteHandle.WithXappSubscriptionData(&deleteData) + _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle) + case UPDATE: + updateData := rtmgr_models.XappList{} + for i := range subRouteAction.EpList.Endpoints { + updateData[i] = &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port} + } + updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second) + updateHandle.WithSubscriptionID(subRouteAction.SubID) + updateHandle.WithXappList(updateData) + _, err = rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle) + default: return fmt.Errorf("%s unknown", subRouteAction.String()) } diff --git a/pkg/control/control.go b/pkg/control/control.go index a2c8b0d..3c75523 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -23,7 +23,6 @@ import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" - rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" @@ -60,7 +59,7 @@ type RMRMeid struct { const ( CREATE Action = 0 - MERGE Action = 1 + UPDATE Action = 1 NONE Action = 2 DELETE Action = 3 ) @@ -75,10 +74,7 @@ func init() { func NewControl() *Control { transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) - client := rtmgrclient.New(transport, strfmt.Default) - handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) - rtmgrClient := RtmgrClient{client, handle, deleteHandle} + rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} registry := new(Registry) registry.Initialize() @@ -335,7 +331,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran parentTrans.SendEvent(nil, 0) } - subs.DelEndpoint(parentTrans.GetEndpoint()) + go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) } //------------------------------------------------------------------- @@ -353,7 +349,7 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) parentTrans.SendEvent(event, 0) - subs.DelEndpoint(parentTrans.GetEndpoint()) + go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second) } //------------------------------------------------------------------- diff --git a/pkg/control/registry.go b/pkg/control/registry.go index 5edc3eb..2750b78 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -24,6 +24,7 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "sync" + "time" ) //----------------------------------------------------------------------------- @@ -36,7 +37,6 @@ type Registry struct { rtmgrClient *RtmgrClient } -// This method should run as a constructor func (r *Registry) Initialize() { r.register = make(map[uint16]*Subscription) var i uint16 @@ -45,32 +45,125 @@ func (r *Registry) Initialize() { } } -// Reserves and returns the next free sequence number func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { r.mutex.Lock() defer r.mutex.Unlock() + + var sequenceNumber uint16 + + // + // Allocate subscription + // if len(r.subIds) > 0 { - sequenceNumber := r.subIds[0] + sequenceNumber = r.subIds[0] r.subIds = r.subIds[1:] - if _, ok := r.register[sequenceNumber]; ok == false { - subs := &Subscription{ - registry: r, - Seq: sequenceNumber, - Meid: trans.Meid, - } - err := subs.AddEndpoint(trans.GetEndpoint()) - if err != nil { - return nil, err - } - subs.SubReqMsg = subReqMsg - - r.register[sequenceNumber] = subs - xapp.Logger.Debug("Registry: Create %s", subs.String()) + if _, ok := r.register[sequenceNumber]; ok == true { + r.subIds = append(r.subIds, sequenceNumber) + return nil, fmt.Errorf("Registry: Failed to reserves subscription") + } + } else { + return nil, fmt.Errorf("Registry: Failed to reserves subscription no free ids") + } + subs := &Subscription{ + registry: r, + Seq: sequenceNumber, + Meid: trans.Meid, + } + + // + // Add to subscription + // + subs.mutex.Lock() + defer subs.mutex.Unlock() + + if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false { + r.subIds = append(r.subIds, sequenceNumber) + return nil, fmt.Errorf("Registry: Endpoint existing already in subscription") + } + epamount := subs.EpList.Size() + + r.mutex.Unlock() + // + // Subscription route updates + // + var err error + if epamount == 1 { + subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq} + err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + } else { + subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq} + err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + } + r.mutex.Lock() + + if err != nil { + r.subIds = append(r.subIds, sequenceNumber) + return nil, err + } + subs.SubReqMsg = subReqMsg + + r.register[sequenceNumber] = subs + xapp.Logger.Debug("Registry: Create %s", subs.String()) + xapp.Logger.Debug("Registry: substable=%v", r.register) + return subs, nil +} + +func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error { + r.mutex.Lock() + defer r.mutex.Unlock() + subs.mutex.Lock() + defer subs.mutex.Unlock() + + delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint()) + epamount := subs.EpList.Size() + + // + // If last endpoint remove from register map + // + if epamount == 0 { + if _, ok := r.register[subs.Seq]; ok { + xapp.Logger.Debug("Registry: Delete %s", subs.String()) + delete(r.register, subs.Seq) xapp.Logger.Debug("Registry: substable=%v", r.register) - return subs, nil } } - return nil, fmt.Errorf("Registry: Failed to reserves subscription") + r.mutex.Unlock() + + // + // Wait some time before really do route updates + // + if waitRouteClean > 0 { + subs.mutex.Unlock() + time.Sleep(waitRouteClean) + subs.mutex.Lock() + } + + xapp.Logger.Info("Registry: Cleaning %s", subs.String()) + + // + // Subscription route updates + // + if delStatus { + if epamount == 0 { + tmpList := RmrEndpointList{} + tmpList.AddEndpoint(trans.GetEndpoint()) + subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq} + r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + } else { + subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq} + r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + } + } + + r.mutex.Lock() + // + // If last endpoint free seq nro + // + if epamount == 0 { + r.subIds = append(r.subIds, subs.Seq) + } + + return nil } func (r *Registry) GetSubscription(sn uint16) *Subscription { @@ -92,17 +185,3 @@ func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error } return nil, fmt.Errorf("No valid subscription found with ids %v", ids) } - -func (r *Registry) DelSubscription(sn uint16) bool { - r.mutex.Lock() - defer r.mutex.Unlock() - if _, ok := r.register[sn]; ok { - subs := r.register[sn] - xapp.Logger.Debug("Registry: Delete %s", subs.String()) - r.subIds = append(r.subIds, sn) - delete(r.register, sn) - xapp.Logger.Debug("Registry: substable=%v", r.register) - return true - } - return false -} diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 4514d00..45c13ec 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -20,39 +20,29 @@ package control import ( - "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "sync" - "time" ) //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- type Subscription struct { - mutex sync.Mutex // Lock - registry *Registry // Registry - Seq uint16 // SubsId - Meid *xapp.RMRMeid // Meid/ RanName - EpList RmrEndpointList // Endpoints - DelEpList RmrEndpointList // Endpoints - DelSeq uint64 + mutex sync.Mutex // Lock + registry *Registry // Registry + Seq uint16 // SubsId + Meid *xapp.RMRMeid // Meid/ RanName + EpList RmrEndpointList // Endpoints TransLock sync.Mutex // Lock transactions, only one executed per time for subs TheTrans *Transaction // Ongoing transaction from xapp SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information } -func (s *Subscription) stringImpl() string { - return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" -} - func (s *Subscription) String() string { - s.mutex.Lock() - defer s.mutex.Unlock() - return s.stringImpl() + return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" } func (s *Subscription) GetSubId() uint16 { @@ -70,68 +60,6 @@ func (s *Subscription) GetMeid() *xapp.RMRMeid { return nil } -func (s *Subscription) AddEndpoint(ep *RmrEndpoint) error { - s.mutex.Lock() - defer s.mutex.Unlock() - if ep == nil { - return fmt.Errorf("AddEndpoint no endpoint given") - } - if s.EpList.AddEndpoint(ep) { - s.DelEpList.DelEndpoint(ep) - if s.EpList.Size() == 1 { - return s.updateRouteImpl(CREATE) - } - return s.updateRouteImpl(MERGE) - } - return nil -} - -func (s *Subscription) DelEndpoint(ep *RmrEndpoint) error { - s.mutex.Lock() - defer s.mutex.Unlock() - var err error - if ep == nil { - return fmt.Errorf("DelEndpoint no endpoint given") - } - if s.EpList.HasEndpoint(ep) == false { - return fmt.Errorf("DelEndpoint endpoint not found") - } - if s.DelEpList.HasEndpoint(ep) == true { - return fmt.Errorf("DelEndpoint endpoint already under del") - } - s.DelEpList.AddEndpoint(ep) - go s.CleanCheck() - return err -} - -func (s *Subscription) CleanCheck() { - s.mutex.Lock() - defer s.mutex.Unlock() - s.DelSeq++ - // Only one clean ongoing - if s.DelSeq > 1 { - return - } - var currSeq uint64 = 0 - // Make sure that routes to be deleted - // are not deleted too fast - for currSeq < s.DelSeq { - currSeq = s.DelSeq - s.mutex.Unlock() - time.Sleep(5 * time.Second) - s.mutex.Lock() - } - xapp.Logger.Info("DelEndpoint: delete cleaning %s", s.stringImpl()) - if s.EpList.Size() <= s.DelEpList.Size() { - s.updateRouteImpl(DELETE) - go s.registry.DelSubscription(s.Seq) - } else if s.EpList.DelEndpoints(&s.DelEpList) { - s.updateRouteImpl(MERGE) - } - s.DelSeq = 0 - -} - func (s *Subscription) IsTransactionReserved() bool { s.mutex.Lock() defer s.mutex.Unlock() @@ -163,12 +91,3 @@ func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) { s.mutex.Unlock() s.TransLock.Unlock() } - -func (s *Subscription) updateRouteImpl(act Action) error { - subRouteAction := SubRouteInfo{act, s.EpList, s.Seq} - err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - if err != nil { - return fmt.Errorf("%s %s", s.stringImpl(), err.Error()) - } - return nil -} diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go index a0a260f..735954e 100644 --- a/pkg/control/transaction.go +++ b/pkg/control/transaction.go @@ -109,7 +109,7 @@ type Transaction struct { XappKey *TransactionXappKey // } -func (t *Transaction) StringImpl() string { +func (t *Transaction) String() string { var transkey string = "transkey(N/A)" if t.XappKey != nil { transkey = t.XappKey.String() @@ -117,12 +117,6 @@ func (t *Transaction) StringImpl() string { return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + transkey + ")" } -func (t *Transaction) String() string { - t.mutex.Lock() - defer t.mutex.Unlock() - return t.StringImpl() -} - func (t *Transaction) GetEndpoint() *RmrEndpoint { t.mutex.Lock() defer t.mutex.Unlock() @@ -152,7 +146,7 @@ func (t *Transaction) GetSrc() string { func (t *Transaction) Release() { t.mutex.Lock() - xapp.Logger.Debug("Transaction: Release %s", t.StringImpl()) + xapp.Logger.Debug("Transaction: Release %s", t.String()) tracker := t.tracker xappkey := t.XappKey t.tracker = nil diff --git a/pkg/control/types.go b/pkg/control/types.go index d0c7fb8..164e801 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -152,7 +152,7 @@ type Action int func (act Action) String() string { actions := [...]string{ "CREATE", - "MERGE", + "UPDATE", "NONE", "DELETE", } diff --git a/pkg/control/types_test.go b/pkg/control/types_test.go index eade2aa..caeae7b 100644 --- a/pkg/control/types_test.go +++ b/pkg/control/types_test.go @@ -61,7 +61,7 @@ func TestAction(t *testing.T) { } testActionString(t, 0, "CREATE") - testActionString(t, 1, "MERGE") + testActionString(t, 1, "UPDATE") testActionString(t, 2, "NONE") testActionString(t, 3, "DELETE") testActionString(t, 5, "UNKNOWN")