From 31797b49985822f1d402501f16ab2794838bebba Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Thu, 16 Jan 2020 14:05:01 +0200 Subject: [PATCH] RICPLT-3014 Subs multiple rmr endpoints Subscription to support multiple end points and to update routes for multiple endpoints Change-Id: I1bb3c5fba61e878c66723a5e340aba06ac17a962 Signed-off-by: Juha Hyttinen --- pkg/control/client.go | 65 ++++----- pkg/control/control.go | 306 ++++++++++++++---------------------------- pkg/control/main_test.go | 97 ++++++++----- pkg/control/messaging_test.go | 49 +++++-- pkg/control/registry.go | 38 ++---- pkg/control/subscription.go | 45 ++++--- pkg/control/tracker.go | 10 +- pkg/control/transaction.go | 34 +++-- pkg/control/types.go | 38 ++++-- 9 files changed, 333 insertions(+), 349 deletions(-) diff --git a/pkg/control/client.go b/pkg/control/client.go index 6ab20fb..b067f3b 100644 --- a/pkg/control/client.go +++ b/pkg/control/client.go @@ -20,7 +20,7 @@ package control import ( - "errors" + "fmt" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models" @@ -29,6 +29,22 @@ import ( "strings" ) +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type SubRouteInfo struct { + Command Action + EpList RmrEndpointList + SubID uint16 +} + +func (sri *SubRouteInfo) String() string { + return "routeinfo(" + sri.Command.String() + "/" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])" +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type RtmgrClient struct { rtClient *rtmgrclient.RoutingManager xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams @@ -36,48 +52,23 @@ type RtmgrClient struct { } func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error { - xapp.Logger.Debug("SubscriptionRequestUpdate() invoked") subID := int32(subRouteAction.SubID) - xapp.Logger.Debug("Subscription action details received. subRouteAction.Command: %v, Address %s, Port %v, subID %v", int16(subRouteAction.Command), subRouteAction.Address, subRouteAction.Port, subID) - xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.Address, &subRouteAction.Port, &subID} - + xapp.Logger.Debug("%s ongoing", subRouteAction.String()) + xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID} + var err error switch subRouteAction.Command { case CREATE: - _, postErr := rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq)) - if postErr != nil && !(strings.Contains(postErr.Error(), "status 200")) { - xapp.Logger.Error("Updating routing manager about subscription id = %d failed with error: %v", subID, postErr) - return postErr - } else { - xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID) - return nil - } + _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq)) case DELETE: - _, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq)) - if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200")) { - xapp.Logger.Error("Deleting subscription id = %d in routing manager, failed with error: %v", subID, deleteErr) - return deleteErr - } else { - xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID) - return nil - } + _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq)) default: - xapp.Logger.Debug("Unknown subRouteAction.Command: %v, Address %s, Port %v, subID: %v", subRouteAction.Command, subRouteAction.Address, subRouteAction.Port, subID) - return nil + return fmt.Errorf("%s unknown", subRouteAction.String()) } -} -func (rc *RtmgrClient) SplitSource(src string) (*string, *uint16, error) { - tcpSrc := strings.Split(src, ":") - if len(tcpSrc) != 2 { - err := errors.New("unable to get the source details of the xapp - check the source string received from the rmr") - return nil, nil, err + if err != nil && !(strings.Contains(err.Error(), "status 200")) { + return fmt.Errorf("%s failed with error: %s", subRouteAction.String(), err.Error()) } - srcAddr := tcpSrc[0] - xapp.Logger.Debug("Debugging Inside splitsource tcpsrc[0] = %s and tcpsrc[1]= %s ", tcpSrc[0], tcpSrc[1]) - srcPort, err := strconv.ParseUint(tcpSrc[1], 10, 16) - if err != nil { - return nil, nil, err - } - srcPortInt := uint16(srcPort) - return &srcAddr, &srcPortInt, nil + xapp.Logger.Debug("%s successful", subRouteAction.String()) + return nil + } diff --git a/pkg/control/control.go b/pkg/control/control.go index cc176c2..1bc2ee8 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -174,54 +174,86 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { return nil } +func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string { + var retval string = "" + var filler string = "" + if trans != nil { + retval += filler + trans.String() + filler = " " + } + if subs != nil { + retval += filler + subs.String() + filler = " " + } + if err != nil { + retval += filler + "err(" + err.Error() + ")" + filler = " " + } + return retval +} + +func (c *Control) findSubs(ids []int) (*Subscription, error) { + var subs *Subscription = nil + for _, id := range ids { + if id >= 0 { + subs = c.registry.GetSubscription(uint16(id)) + } + if subs != nil { + break + } + } + if subs == nil { + return nil, fmt.Errorf("No valid subscription found with ids %v", ids) + } + return subs, nil +} + +func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) { + subs, err := c.findSubs(ids) + if err != nil { + return nil, nil, err + } + trans := subs.GetTransaction() + if trans == nil { + return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil)) + } + return subs, trans, nil +} func (c *Control) handleSubscriptionRequest(params *RMRParams) { xapp.Logger.Info("SubReq from xapp: %s", params.String()) - // - // - // - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Xid, - params.Meid, - false, - true) - + SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err)) return } - // - // - // - trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload) + trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true) if err != nil { - xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans) - trans.Release() + xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err)) return } + trans.SubReqMsg = SubReqMsg - // - // - // - subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid) + subs, err := c.registry.ReserveSubscription(trans.Meid) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err)) trans.Release() return } err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err)) subs.Release() trans.Release() return } - trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId()) + xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil)) + // // TODO: subscription create is in fact owned by subscription and not transaction. // Transaction is toward xapp while Subscription is toward ran. @@ -232,59 +264,34 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) if err != nil { - xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubResp Drop: %s", idstring(trans, subs, err)) subs.Release() trans.Release() return } c.rmrSend("SubReq: SubReq to E2T", subs, trans) - c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) - xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) return } func (c *Control) handleSubscriptionResponse(params *RMRParams) { xapp.Logger.Info("SubResp from E2T: %s", params.String()) - // - // - // SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String()) - return - } - - // - // - // - subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String()) + xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err)) return } - xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId()) - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubResp: %s", idstring(params, nil, err)) return } - trans.SubRespMsg = SubRespMsg + xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -295,7 +302,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) if err != nil { - xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err)) trans.Release() return } @@ -309,49 +316,22 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { func (c *Control) handleSubscriptionFailure(params *RMRParams) { xapp.Logger.Info("SubFail from E2T: %s", params.String()) - // - // - // SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload) if err != nil { - xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err)) return } - // - // - // - subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String()) - return - } - xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId()) - - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubFail: %s", idstring(params, nil, err)) return } trans.SubFailMsg = SubFailMsg + xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId())) - responseReceived := trans.CheckResponseReceived() - if err != nil { - return - } - if responseReceived == true { // Subscription timer already received return @@ -363,7 +343,7 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { time.Sleep(3 * time.Second) } else { //TODO error handling improvement - xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans) + xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err)) } trans.Release() @@ -374,27 +354,21 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) { xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(uint16(nbrId)) - if subs == nil { - xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) - return - } - - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans(([]int{nbrId})) + if err != nil { + xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err)) return } + xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil)) responseReceived := trans.CheckResponseReceived() - if responseReceived == true { // Subscription Response or Failure already received return } if tryCount < maxSubReqTryCount { - xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans) + xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil)) trans.RetryTransaction() @@ -409,14 +383,9 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou trans.Release() // Create DELETE transaction (internal and no messages toward xapp) - deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, - trans.GetXid(), - trans.GetMeid(), - false, - false) - + deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false) if err != nil { - xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge subs.Release() return @@ -428,7 +397,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) if err != nil { - xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge deltrans.Release() subs.Release() @@ -437,7 +406,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou err = subs.SetTransaction(deltrans) if err != nil { - xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) + xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err)) //TODO improve error handling. Important at least in merge deltrans.Release() return @@ -451,52 +420,35 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { xapp.Logger.Info("SubDelReq from xapp: %s", params.String()) - // - // - // - trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Xid, - params.Meid, - false, - true) - + SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) if err != nil { - xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err)) return } - // - // - // - trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) + trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true) if err != nil { - xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans) - trans.Release() + xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err)) return } + trans.SubDelReqMsg = SubDelReqMsg - // - // - // - subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans) + subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err)) trans.Release() return } - xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans) err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans) + xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err)) trans.Release() return } + xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil)) + // // TODO: subscription delete is in fact owned by subscription and not transaction. // Transaction is toward xapp while Subscription is toward ran. @@ -507,7 +459,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) if err != nil { - xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans) + xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err)) trans.Release() return } @@ -523,43 +475,20 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) { xapp.Logger.Info("SubDelResp from E2T:%s", params.String()) - // - // - // SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String()) - return - } - - // - // - // - subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String()) + xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err)) return } - xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId()) - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err)) return } - trans.SubDelRespMsg = SubDelRespMsg + xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -575,42 +504,20 @@ func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) { xapp.Logger.Info("SubDelFail from E2T:%s", params.String()) - // - // - // SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload) if err != nil { - xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err)) return } - // - // - // - subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq)) - if subs == nil && params.SubId > 0 { - subs = c.registry.GetSubscription(uint16(params.SubId)) - } - - if subs == nil { - xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String()) - return - } - xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId()) - - // - // - // - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId}) + if err != nil { + xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err)) return } trans.SubDelFailMsg = SubDelFailMsg + xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil)) - // - // - // c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) responseReceived := trans.CheckResponseReceived() @@ -626,17 +533,12 @@ func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) { func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) { xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(uint16(nbrId)) - if subs == nil { - xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) - return - } - - trans := subs.GetTransaction() - if trans == nil { - xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) + subs, trans, err := c.findSubsAndTrans([]int{nbrId}) + if err != nil { + xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err)) return } + xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil)) responseReceived := trans.CheckResponseReceived() if responseReceived == true { @@ -673,7 +575,7 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction time.Sleep(3 * time.Second) } else { //TODO error handling improvement - xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans) + xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err)) } } diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go index ecf5a19..d3d7b57 100644 --- a/pkg/control/main_test.go +++ b/pkg/control/main_test.go @@ -29,6 +29,7 @@ import ( "os" "strconv" "strings" + "sync" "testing" "time" ) @@ -37,56 +38,92 @@ import ( // //----------------------------------------------------------------------------- -type httpRtmgrMsg struct { - msg *rtmgr_models.XappSubscriptionData - w http.ResponseWriter - r *http.Request +type httpEventWaiter struct { + resultChan chan bool + nextActionOk bool } -func (msg *httpRtmgrMsg) RetOk() { - msg.w.WriteHeader(200) +func (msg *httpEventWaiter) SetResult(res bool) { + msg.resultChan <- res } +func (msg *httpEventWaiter) WaitResult(t *testing.T) bool { + select { + case result := <-msg.resultChan: + return result + case <-time.After(15 * time.Second): + testError(t, "Waiter not received result status from case within 15 secs") + return false + } + testError(t, "Waiter error in default branch") + return false +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type testingHttpRtmgrControl struct { - desc string - port string - useChannel bool - msgChan chan *httpRtmgrMsg + sync.Mutex + desc string + port string + eventWaiter *httpEventWaiter } -func (hc *testingHttpRtmgrControl) UseChannel(flag bool) { - hc.useChannel = flag +func (hc *testingHttpRtmgrControl) NextEvent(eventWaiter *httpEventWaiter) { + hc.Lock() + defer hc.Unlock() + hc.eventWaiter = eventWaiter } -func (hc *testingHttpRtmgrControl) WaitReq(t *testing.T) *httpRtmgrMsg { - xapp.Logger.Info("(%s) handle_rtmgr_req", hc.desc) - select { - case msg := <-hc.msgChan: - return msg - case <-time.After(15 * time.Second): - testError(t, "(%s) Not Received RTMGR Subscription message within 15 secs", hc.desc) - return nil +func (hc *testingHttpRtmgrControl) AllocNextEvent(nextAction bool) *httpEventWaiter { + eventWaiter := &httpEventWaiter{ + resultChan: make(chan bool), + nextActionOk: nextAction, } - return nil + hc.NextEvent(eventWaiter) + return eventWaiter } func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) { + + hc.Lock() + defer hc.Unlock() + var req rtmgr_models.XappSubscriptionData err := json.NewDecoder(r.Body).Decode(&req) if err != nil { xapp.Logger.Error("%s", err.Error()) } xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID) - msg := &httpRtmgrMsg{ - msg: &req, - w: w, - r: r, + + var code int = 0 + switch r.Method { + case http.MethodPost: + code = 201 + if hc.eventWaiter != nil { + if hc.eventWaiter.nextActionOk == false { + code = 400 + } + } + case http.MethodDelete: + code = 200 + if hc.eventWaiter != nil { + if hc.eventWaiter.nextActionOk == false { + code = 400 + } + } + default: + code = 200 } - if hc.useChannel { - hc.msgChan <- msg - } else { - msg.RetOk() + + waiter := hc.eventWaiter + hc.eventWaiter = nil + if waiter != nil { + waiter.SetResult(true) } + xapp.Logger.Info("(%s) Method=%s Reply with code %d", hc.desc, r.Method, code) + w.WriteHeader(code) + } func (hc *testingHttpRtmgrControl) run() { @@ -98,8 +135,6 @@ func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrCont hc := &testingHttpRtmgrControl{} hc.desc = desc hc.port = port - hc.useChannel = false - hc.msgChan = make(chan *httpRtmgrMsg) return hc } diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go index 2f7bc4a..68118ea 100644 --- a/pkg/control/messaging_test.go +++ b/pkg/control/messaging_test.go @@ -586,6 +586,41 @@ func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64, return 0, false } +//----------------------------------------------------------------------------- +// TestSubReqAndRouteNok +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | rtmgr | +// +-------+ +---------+ +---------+ +// | | | +// | SubReq | | +// |------------->| | +// | | | +// | | RouteCreate | +// | |------------->| +// | | | +// | | RouteCreate | +// | | status:400 | +// | |<-------------| +// | | | +// | [SUBS INT DELETE] | +// | | | +// +//----------------------------------------------------------------------------- + +func TestSubReqAndRouteNok(t *testing.T) { + xapp.Logger.Info("TestSubReqAndRouteNok") + + waiter := rtmgrHttp.AllocNextEvent(false) + newSubsId := mainCtrl.get_subid(t) + xappConn1.handle_xapp_subs_req(t, nil) + waiter.WaitResult(t) + + //Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, int(newSubsId), 10) +} + //----------------------------------------------------------------------------- // TestSubReqAndSubDelOk // @@ -622,26 +657,24 @@ func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64, //----------------------------------------------------------------------------- func TestSubReqAndSubDelOk(t *testing.T) { xapp.Logger.Info("TestSubReqAndSubDelOk") - rtmgrHttp.UseChannel(true) + waiter := rtmgrHttp.AllocNextEvent(true) cretrans := xappConn1.handle_xapp_subs_req(t, nil) - msg := rtmgrHttp.WaitReq(t) - msg.RetOk() + waiter.WaitResult(t) + crereq, cremsg := e2termConn.handle_e2term_subs_req(t) e2termConn.handle_e2term_subs_resp(t, crereq, cremsg) e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans) - deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId) delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t) + + waiter = rtmgrHttp.AllocNextEvent(true) e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg) xappConn1.handle_xapp_subs_del_resp(t, deltrans) - - msg = rtmgrHttp.WaitReq(t) - msg.RetOk() + waiter.WaitResult(t) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId, 10) - rtmgrHttp.UseChannel(false) } //----------------------------------------------------------------------------- diff --git a/pkg/control/registry.go b/pkg/control/registry.go index c27e6a2..0fadabb 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -45,47 +45,32 @@ func (r *Registry) Initialize() { } // Reserves and returns the next free sequence number -func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) { - // Check is current SequenceNumber valid - // Allocate next SequenceNumber value and retry N times +func (r *Registry) ReserveSubscription(meid *xapp.RMRMeid) (*Subscription, error) { r.mutex.Lock() defer r.mutex.Unlock() - var subs *Subscription = nil - var retrytimes uint16 = 1000 - for ; subs == nil && retrytimes > 0; retrytimes-- { + if len(r.subIds) > 0 { sequenceNumber := r.subIds[0] r.subIds = r.subIds[1:] if _, ok := r.register[sequenceNumber]; ok == false { subs := &Subscription{ - registry: r, - Seq: sequenceNumber, - Active: false, - RmrEndpoint: *endPoint, - Meid: meid, - Trans: nil, + registry: r, + Seq: sequenceNumber, + Active: false, + Meid: meid, + Trans: nil, } r.register[sequenceNumber] = subs - - // Update routing - r.mutex.Unlock() - err := subs.UpdateRoute(CREATE) - r.mutex.Lock() - if err != nil { - if _, ok := r.register[sequenceNumber]; ok { - delete(r.register, sequenceNumber) - } - return nil, err - } + xapp.Logger.Info("Registry: Create %s", subs.String()) + xapp.Logger.Debug("Registry: substable=%v", r.register) return subs, nil } } - return nil, fmt.Errorf("Registry: Failed to reserves subscription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName) + return nil, fmt.Errorf("Registry: Failed to reserves subscription") } func (r *Registry) GetSubscription(sn uint16) *Subscription { r.mutex.Lock() defer r.mutex.Unlock() - xapp.Logger.Debug("Registry map: %v", r.register) if _, ok := r.register[sn]; ok { return r.register[sn] } @@ -96,8 +81,11 @@ func (r *Registry) DelSubscription(sn uint16) bool { r.mutex.Lock() defer r.mutex.Unlock() if _, ok := r.register[sn]; ok { + subs := r.register[sn] + xapp.Logger.Info("Registry: Delete %s", subs.String()) r.subIds = append(r.subIds, sn) delete(r.register, sn) + xapp.Logger.Debug("Registry: substable=%v", r.register) return true } return false diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 6a72f74..6fd806c 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -35,15 +35,19 @@ type Subscription struct { Seq uint16 Active bool // - Meid *xapp.RMRMeid - RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge - Trans *Transaction + Meid *xapp.RMRMeid + EpList RmrEndpointList + Trans *Transaction +} + +func (s *Subscription) stringImpl() string { + return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")" } func (s *Subscription) String() string { s.mutex.Lock() defer s.mutex.Unlock() - return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName + return s.stringImpl() } func (s *Subscription) GetSubId() uint16 { @@ -79,20 +83,29 @@ func (s *Subscription) IsConfirmed() bool { return s.Active } -func (s *Subscription) SetTransaction(trans *Transaction) error { +func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool { s.mutex.Lock() defer s.mutex.Unlock() + return s.EpList.HasEndpoint(ep) +} - subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName +func (s *Subscription) SetTransaction(trans *Transaction) error { + s.mutex.Lock() + defer s.mutex.Unlock() - if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) { - return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans) - } if s.Trans != nil { - return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans) + return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans) } trans.Subs = s s.Trans = trans + + if len(s.EpList.Endpoints) == 0 { + s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) + return s.updateRouteImpl(CREATE) + } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false { + s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint) + return s.updateRouteImpl(MERGE) + } return nil } @@ -113,11 +126,10 @@ func (s *Subscription) GetTransaction() *Transaction { } func (s *Subscription) updateRouteImpl(act Action) error { - xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) - subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq} + subRouteAction := SubRouteInfo{act, s.EpList, s.Seq} err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { - return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error()) } return nil } @@ -129,12 +141,9 @@ func (s *Subscription) UpdateRoute(act Action) error { } func (s *Subscription) Release() { - xapp.Logger.Info("Subscription: Releasing %s", s) - s.mutex.Lock() - defer s.mutex.Unlock() s.registry.DelSubscription(s.Seq) - err := s.updateRouteImpl(DELETE) + err := s.UpdateRoute(DELETE) if err != nil { - xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint) + xapp.Logger.Error("%s", err.Error()) } } diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 11d2cf7..20af9f0 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -63,13 +63,15 @@ func (t *Tracker) TrackTransaction( defer t.mutex.Unlock() xappkey := TransactionXappKey{*endpoint, xid} - if _, ok := t.transactionXappTable[xappkey]; ok { - err := fmt.Errorf("Tracker: Similar transaction with xappkey %s is ongoing, transaction %s not created ", xappkey, trans) + if othtrans, ok := t.transactionXappTable[xappkey]; ok { + err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans) return nil, err } trans.tracker = t t.transactionXappTable[xappkey] = trans + xapp.Logger.Info("Tracker: Create %s", trans.String()) + xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable) return trans, nil } @@ -77,8 +79,10 @@ func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction, t.mutex.Lock() defer t.mutex.Unlock() if trans, ok2 := t.transactionXappTable[xappKey]; ok2 { + xapp.Logger.Info("Tracker: Delete %s", trans.String()) delete(t.transactionXappTable, xappKey) + xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable) return trans, nil } - return nil, fmt.Errorf("Tracker: No record for xappkey %s", xappKey) + return nil, fmt.Errorf("Tracker: No record %s", xappKey) } diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go index 867b854..9adaeca 100644 --- a/pkg/control/transaction.go +++ b/pkg/control/transaction.go @@ -36,7 +36,7 @@ type TransactionXappKey struct { } func (key *TransactionXappKey) String() string { - return key.RmrEndpoint.String() + "/" + key.Xid + return "transkey(" + key.RmrEndpoint.String() + "/" + key.Xid + ")" } //----------------------------------------------------------------------------- @@ -61,14 +61,18 @@ type Transaction struct { ForwardRespToXapp bool } -func (t *Transaction) String() string { - t.mutex.Lock() - defer t.mutex.Unlock() +func (t *Transaction) StringImpl() string { var subId string = "?" if t.Subs != nil { subId = strconv.FormatUint(uint64(t.Subs.Seq), 10) } - return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid + return "trans(" + t.RmrEndpoint.String() + "/" + t.Xid + "/" + t.Meid.RanName + "/" + subId + ")" +} + +func (t *Transaction) String() string { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.StringImpl() } func (t *Transaction) GetXid() string { @@ -115,16 +119,18 @@ func (t *Transaction) RetryTransaction() { } func (t *Transaction) Release() { - xapp.Logger.Info("Transaction: Releasing %s", t) t.mutex.Lock() - defer t.mutex.Unlock() - if t.Subs != nil { - t.Subs.UnSetTransaction(t) - } - if t.tracker != nil { - xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid} - t.tracker.UnTrackTransaction(xappkey) - } + subs := t.Subs + tracker := t.tracker + xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid} t.Subs = nil t.tracker = nil + t.mutex.Unlock() + + if subs != nil { + subs.UnSetTransaction(t) + } + if tracker != nil { + tracker.UnTrackTransaction(xappkey) + } } diff --git a/pkg/control/types.go b/pkg/control/types.go index 00674c7..e740349 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -36,16 +36,6 @@ type RmrDatagram struct { Payload []byte } -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -type SubRouteInfo struct { - Command Action - Address string - Port uint16 - SubID uint16 -} - //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- @@ -84,6 +74,32 @@ func (endpoint *RmrEndpoint) Set(src string) bool { return false } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type RmrEndpointList struct { + Endpoints []RmrEndpoint +} + +func (eplist *RmrEndpointList) String() string { + valuesText := []string{} + for i := range eplist.Endpoints { + ep := eplist.Endpoints[i] + text := ep.String() + valuesText = append(valuesText, text) + } + return strings.Join(valuesText, ",") +} + +func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool { + for i := range eplist.Endpoints { + if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) { + return true + } + } + return false +} + func NewRmrEndpoint(src string) *RmrEndpoint { ep := &RmrEndpoint{} if ep.Set(src) == false { @@ -120,6 +136,6 @@ type RMRParams struct { func (params *RMRParams) String() string { var b bytes.Buffer - fmt.Fprintf(&b, "Src=%s Mtype=%s(%d) SubId=%v Xid=%s Meid=%v", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid) + fmt.Fprintf(&b, "params(Src=%s Mtype=%s(%d) SubId=%v Xid=%s Meid=%s)", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid.RanName) return b.String() } -- 2.16.6