RICPLT-3014 Subs multiple rmr endpoints 52/2252/3
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Thu, 16 Jan 2020 12:05:01 +0000 (14:05 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Fri, 17 Jan 2020 07:13:11 +0000 (09:13 +0200)
Subscription to support multiple end points and to update routes for multiple endpoints

Change-Id: I1bb3c5fba61e878c66723a5e340aba06ac17a962
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/client.go
pkg/control/control.go
pkg/control/main_test.go
pkg/control/messaging_test.go
pkg/control/registry.go
pkg/control/subscription.go
pkg/control/tracker.go
pkg/control/transaction.go
pkg/control/types.go

index 6ab20fb..b067f3b 100644 (file)
@@ -20,7 +20,7 @@
 package control
 
 import (
-       "errors"
+       "fmt"
        rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
        rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
        "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
@@ -29,6 +29,22 @@ import (
        "strings"
 )
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type SubRouteInfo struct {
+       Command Action
+       EpList  RmrEndpointList
+       SubID   uint16
+}
+
+func (sri *SubRouteInfo) String() string {
+       return "routeinfo(" + sri.Command.String() + "/" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])"
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RtmgrClient struct {
        rtClient         *rtmgrclient.RoutingManager
        xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
@@ -36,48 +52,23 @@ type RtmgrClient struct {
 }
 
 func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
-       xapp.Logger.Debug("SubscriptionRequestUpdate() invoked")
        subID := int32(subRouteAction.SubID)
-       xapp.Logger.Debug("Subscription action details received. subRouteAction.Command: %v, Address %s, Port %v, subID %v", int16(subRouteAction.Command), subRouteAction.Address, subRouteAction.Port, subID)
-       xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.Address, &subRouteAction.Port, &subID}
-
+       xapp.Logger.Debug("%s ongoing", subRouteAction.String())
+       xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
+       var err error
        switch subRouteAction.Command {
        case CREATE:
-               _, postErr := rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq))
-               if postErr != nil && !(strings.Contains(postErr.Error(), "status 200")) {
-                       xapp.Logger.Error("Updating routing manager about subscription id = %d failed with error: %v", subID, postErr)
-                       return postErr
-               } else {
-                       xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
-                       return nil
-               }
+               _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq))
        case DELETE:
-               _, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
-               if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200")) {
-                       xapp.Logger.Error("Deleting subscription id = %d  in routing manager, failed with error: %v", subID, deleteErr)
-                       return deleteErr
-               } else {
-                       xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID)
-                       return nil
-               }
+               _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
        default:
-               xapp.Logger.Debug("Unknown subRouteAction.Command: %v, Address %s, Port %v, subID: %v", subRouteAction.Command, subRouteAction.Address, subRouteAction.Port, subID)
-               return nil
+               return fmt.Errorf("%s unknown", subRouteAction.String())
        }
-}
 
-func (rc *RtmgrClient) SplitSource(src string) (*string, *uint16, error) {
-       tcpSrc := strings.Split(src, ":")
-       if len(tcpSrc) != 2 {
-               err := errors.New("unable to get the source details of the xapp - check the source string received from the rmr")
-               return nil, nil, err
+       if err != nil && !(strings.Contains(err.Error(), "status 200")) {
+               return fmt.Errorf("%s failed with error: %s", subRouteAction.String(), err.Error())
        }
-       srcAddr := tcpSrc[0]
-       xapp.Logger.Debug("Debugging Inside splitsource tcpsrc[0] = %s and tcpsrc[1]= %s ", tcpSrc[0], tcpSrc[1])
-       srcPort, err := strconv.ParseUint(tcpSrc[1], 10, 16)
-       if err != nil {
-               return nil, nil, err
-       }
-       srcPortInt := uint16(srcPort)
-       return &srcAddr, &srcPortInt, nil
+       xapp.Logger.Debug("%s successful", subRouteAction.String())
+       return nil
+
 }
index cc176c2..1bc2ee8 100755 (executable)
@@ -174,54 +174,86 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) {
 
        return nil
 }
+func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
+       var retval string = ""
+       var filler string = ""
+       if trans != nil {
+               retval += filler + trans.String()
+               filler = " "
+       }
+       if subs != nil {
+               retval += filler + subs.String()
+               filler = " "
+       }
+       if err != nil {
+               retval += filler + "err(" + err.Error() + ")"
+               filler = " "
+       }
+       return retval
+}
+
+func (c *Control) findSubs(ids []int) (*Subscription, error) {
+       var subs *Subscription = nil
+       for _, id := range ids {
+               if id >= 0 {
+                       subs = c.registry.GetSubscription(uint16(id))
+               }
+               if subs != nil {
+                       break
+               }
+       }
+       if subs == nil {
+               return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
+       }
+       return subs, nil
+}
+
+func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) {
+       subs, err := c.findSubs(ids)
+       if err != nil {
+               return nil, nil, err
+       }
+       trans := subs.GetTransaction()
+       if trans == nil {
+               return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil))
+       }
+       return subs, trans, nil
+}
 
 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
        xapp.Logger.Info("SubReq from xapp: %s", params.String())
 
-       //
-       //
-       //
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
-               params.Xid,
-               params.Meid,
-               false,
-               true)
-
+       SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
+               xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
                return
        }
 
-       //
-       //
-       //
-       trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
-               trans.Release()
+               xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
                return
        }
+       trans.SubReqMsg = SubReqMsg
 
-       //
-       //
-       //
-       subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
+       subs, err := c.registry.ReserveSubscription(trans.Meid)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+               xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err))
                trans.Release()
                return
        }
 
        err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+               xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err))
                subs.Release()
                trans.Release()
                return
        }
-
        trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
 
+       xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil))
+
        //
        // TODO: subscription create is in fact owned by subscription and not transaction.
        //       Transaction is toward xapp while Subscription is toward ran.
@@ -232,59 +264,34 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) {
        //
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
+               xapp.Logger.Error("SubResp Drop: %s", idstring(trans, subs, err))
                subs.Release()
                trans.Release()
                return
        }
 
        c.rmrSend("SubReq: SubReq to E2T", subs, trans)
-
        c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
-       xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
        return
 }
 
 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
        xapp.Logger.Info("SubResp from E2T: %s", params.String())
 
-       //
-       //
-       //
        SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String())
-               return
-       }
-
-       //
-       //
-       //
-       subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
-       if subs == nil && params.SubId > 0 {
-               subs = c.registry.GetSubscription(uint16(params.SubId))
-       }
-
-       if subs == nil {
-               xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
+               xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err))
                return
        }
-       xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
 
-       //
-       //
-       //
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId})
+       if err != nil {
+               xapp.Logger.Error("SubResp: %s", idstring(params, nil, err))
                return
        }
-
        trans.SubRespMsg = SubRespMsg
+       xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil))
 
-       //
-       //
-       //
        c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
 
        responseReceived := trans.CheckResponseReceived()
@@ -295,7 +302,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) {
 
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
        if err != nil {
-               xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
+               xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err))
                trans.Release()
                return
        }
@@ -309,49 +316,22 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) {
 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
        xapp.Logger.Info("SubFail from E2T: %s", params.String())
 
-       //
-       //
-       //
        SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
+               xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err))
                return
        }
 
-       //
-       //
-       //
-       subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
-       if subs == nil && params.SubId > 0 {
-               subs = c.registry.GetSubscription(uint16(params.SubId))
-       }
-
-       if subs == nil {
-               xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
-               return
-       }
-       xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
-
-       //
-       //
-       //
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId})
+       if err != nil {
+               xapp.Logger.Error("SubFail: %s", idstring(params, nil, err))
                return
        }
        trans.SubFailMsg = SubFailMsg
+       xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil))
 
-       //
-       //
-       //
        c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
-
        responseReceived := trans.CheckResponseReceived()
-       if err != nil {
-               return
-       }
-
        if responseReceived == true {
                // Subscription timer already received
                return
@@ -363,7 +343,7 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) {
                time.Sleep(3 * time.Second)
        } else {
                //TODO error handling improvement
-               xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans)
+               xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err))
        }
 
        trans.Release()
@@ -374,27 +354,21 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) {
 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
        xapp.Logger.Info("SubReq timeout: subId: %v,  tryCount: %v", nbrId, tryCount)
 
-       subs := c.registry.GetSubscription(uint16(nbrId))
-       if subs == nil {
-               xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
-               return
-       }
-
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans(([]int{nbrId}))
+       if err != nil {
+               xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err))
                return
        }
+       xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil))
 
        responseReceived := trans.CheckResponseReceived()
-
        if responseReceived == true {
                // Subscription Response or Failure already received
                return
        }
 
        if tryCount < maxSubReqTryCount {
-               xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans)
+               xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil))
 
                trans.RetryTransaction()
 
@@ -409,14 +383,9 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
        trans.Release()
 
        // Create DELETE transaction (internal and no messages toward xapp)
-       deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
-               trans.GetXid(),
-               trans.GetMeid(),
-               false,
-               false)
-
+       deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false)
        if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
                //TODO improve error handling. Important at least in merge
                subs.Release()
                return
@@ -428,7 +397,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
        deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
        deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
+               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
                //TODO improve error handling. Important at least in merge
                deltrans.Release()
                subs.Release()
@@ -437,7 +406,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
 
        err = subs.SetTransaction(deltrans)
        if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
                //TODO improve error handling. Important at least in merge
                deltrans.Release()
                return
@@ -451,52 +420,35 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
        xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
 
-       //
-       //
-       //
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
-               params.Xid,
-               params.Meid,
-               false,
-               true)
-
+       SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
+               xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
                return
        }
 
-       //
-       //
-       //
-       trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
-               trans.Release()
+               xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
                return
        }
+       trans.SubDelReqMsg = SubDelReqMsg
 
-       //
-       //
-       //
-       subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
-       if subs == nil && params.SubId > 0 {
-               subs = c.registry.GetSubscription(uint16(params.SubId))
-       }
-
-       if subs == nil {
-               xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
+       subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId})
+       if err != nil {
+               xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err))
                trans.Release()
                return
        }
-       xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
 
        err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
+               xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
                trans.Release()
                return
        }
 
+       xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil))
+
        //
        // TODO: subscription delete is in fact owned by subscription and not transaction.
        //       Transaction is toward xapp while Subscription is toward ran.
@@ -507,7 +459,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
        //
        trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
+               xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
                trans.Release()
                return
        }
@@ -523,43 +475,20 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
        xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
 
-       //
-       //
-       //
        SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String())
-               return
-       }
-
-       //
-       //
-       //
-       subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq))
-       if subs == nil && params.SubId > 0 {
-               subs = c.registry.GetSubscription(uint16(params.SubId))
-       }
-
-       if subs == nil {
-               xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String())
+               xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err))
                return
        }
-       xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId())
 
-       //
-       //
-       //
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId})
+       if err != nil {
+               xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err))
                return
        }
-
        trans.SubDelRespMsg = SubDelRespMsg
+       xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil))
 
-       //
-       //
-       //
        c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
 
        responseReceived := trans.CheckResponseReceived()
@@ -575,42 +504,20 @@ func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error
 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
        xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
 
-       //
-       //
-       //
        SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String())
+               xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err))
                return
        }
 
-       //
-       //
-       //
-       subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq))
-       if subs == nil && params.SubId > 0 {
-               subs = c.registry.GetSubscription(uint16(params.SubId))
-       }
-
-       if subs == nil {
-               xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String())
-               return
-       }
-       xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId())
-
-       //
-       //
-       //
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId})
+       if err != nil {
+               xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err))
                return
        }
        trans.SubDelFailMsg = SubDelFailMsg
+       xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil))
 
-       //
-       //
-       //
        c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
 
        responseReceived := trans.CheckResponseReceived()
@@ -626,17 +533,12 @@ func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
        xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
 
-       subs := c.registry.GetSubscription(uint16(nbrId))
-       if subs == nil {
-               xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
-               return
-       }
-
-       trans := subs.GetTransaction()
-       if trans == nil {
-               xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
+       subs, trans, err := c.findSubsAndTrans([]int{nbrId})
+       if err != nil {
+               xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err))
                return
        }
+       xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil))
 
        responseReceived := trans.CheckResponseReceived()
        if responseReceived == true {
@@ -673,7 +575,7 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction
                        time.Sleep(3 * time.Second)
                } else {
                        //TODO error handling improvement
-                       xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans)
+                       xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err))
                }
        }
 
index ecf5a19..d3d7b57 100644 (file)
@@ -29,6 +29,7 @@ import (
        "os"
        "strconv"
        "strings"
+       "sync"
        "testing"
        "time"
 )
@@ -37,56 +38,92 @@ import (
 //
 //-----------------------------------------------------------------------------
 
-type httpRtmgrMsg struct {
-       msg *rtmgr_models.XappSubscriptionData
-       w   http.ResponseWriter
-       r   *http.Request
+type httpEventWaiter struct {
+       resultChan   chan bool
+       nextActionOk bool
 }
 
-func (msg *httpRtmgrMsg) RetOk() {
-       msg.w.WriteHeader(200)
+func (msg *httpEventWaiter) SetResult(res bool) {
+       msg.resultChan <- res
 }
 
+func (msg *httpEventWaiter) WaitResult(t *testing.T) bool {
+       select {
+       case result := <-msg.resultChan:
+               return result
+       case <-time.After(15 * time.Second):
+               testError(t, "Waiter not received result status from case within 15 secs")
+               return false
+       }
+       testError(t, "Waiter error in default branch")
+       return false
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type testingHttpRtmgrControl struct {
-       desc       string
-       port       string
-       useChannel bool
-       msgChan    chan *httpRtmgrMsg
+       sync.Mutex
+       desc        string
+       port        string
+       eventWaiter *httpEventWaiter
 }
 
-func (hc *testingHttpRtmgrControl) UseChannel(flag bool) {
-       hc.useChannel = flag
+func (hc *testingHttpRtmgrControl) NextEvent(eventWaiter *httpEventWaiter) {
+       hc.Lock()
+       defer hc.Unlock()
+       hc.eventWaiter = eventWaiter
 }
 
-func (hc *testingHttpRtmgrControl) WaitReq(t *testing.T) *httpRtmgrMsg {
-       xapp.Logger.Info("(%s) handle_rtmgr_req", hc.desc)
-       select {
-       case msg := <-hc.msgChan:
-               return msg
-       case <-time.After(15 * time.Second):
-               testError(t, "(%s) Not Received RTMGR Subscription message within 15 secs", hc.desc)
-               return nil
+func (hc *testingHttpRtmgrControl) AllocNextEvent(nextAction bool) *httpEventWaiter {
+       eventWaiter := &httpEventWaiter{
+               resultChan:   make(chan bool),
+               nextActionOk: nextAction,
        }
-       return nil
+       hc.NextEvent(eventWaiter)
+       return eventWaiter
 }
 
 func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) {
+
+       hc.Lock()
+       defer hc.Unlock()
+
        var req rtmgr_models.XappSubscriptionData
        err := json.NewDecoder(r.Body).Decode(&req)
        if err != nil {
                xapp.Logger.Error("%s", err.Error())
        }
        xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID)
-       msg := &httpRtmgrMsg{
-               msg: &req,
-               w:   w,
-               r:   r,
+
+       var code int = 0
+       switch r.Method {
+       case http.MethodPost:
+               code = 201
+               if hc.eventWaiter != nil {
+                       if hc.eventWaiter.nextActionOk == false {
+                               code = 400
+                       }
+               }
+       case http.MethodDelete:
+               code = 200
+               if hc.eventWaiter != nil {
+                       if hc.eventWaiter.nextActionOk == false {
+                               code = 400
+                       }
+               }
+       default:
+               code = 200
        }
-       if hc.useChannel {
-               hc.msgChan <- msg
-       } else {
-               msg.RetOk()
+
+       waiter := hc.eventWaiter
+       hc.eventWaiter = nil
+       if waiter != nil {
+               waiter.SetResult(true)
        }
+       xapp.Logger.Info("(%s) Method=%s Reply with code %d", hc.desc, r.Method, code)
+       w.WriteHeader(code)
+
 }
 
 func (hc *testingHttpRtmgrControl) run() {
@@ -98,8 +135,6 @@ func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrCont
        hc := &testingHttpRtmgrControl{}
        hc.desc = desc
        hc.port = port
-       hc.useChannel = false
-       hc.msgChan = make(chan *httpRtmgrMsg)
        return hc
 }
 
index 2f7bc4a..68118ea 100644 (file)
@@ -586,6 +586,41 @@ func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64,
        return 0, false
 }
 
+//-----------------------------------------------------------------------------
+// TestSubReqAndRouteNok
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | rtmgr   |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     | SubReq       |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | RouteCreate  |
+//     |              |------------->|
+//     |              |              |
+//     |              | RouteCreate  |
+//     |              |  status:400  |
+//     |              |<-------------|
+//     |              |              |
+//     |       [SUBS INT DELETE]     |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+
+func TestSubReqAndRouteNok(t *testing.T) {
+       xapp.Logger.Info("TestSubReqAndRouteNok")
+
+       waiter := rtmgrHttp.AllocNextEvent(false)
+       newSubsId := mainCtrl.get_subid(t)
+       xappConn1.handle_xapp_subs_req(t, nil)
+       waiter.WaitResult(t)
+
+       //Wait that subs is cleaned
+       mainCtrl.wait_subs_clean(t, int(newSubsId), 10)
+}
+
 //-----------------------------------------------------------------------------
 // TestSubReqAndSubDelOk
 //
@@ -622,26 +657,24 @@ func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64,
 //-----------------------------------------------------------------------------
 func TestSubReqAndSubDelOk(t *testing.T) {
        xapp.Logger.Info("TestSubReqAndSubDelOk")
-       rtmgrHttp.UseChannel(true)
 
+       waiter := rtmgrHttp.AllocNextEvent(true)
        cretrans := xappConn1.handle_xapp_subs_req(t, nil)
-       msg := rtmgrHttp.WaitReq(t)
-       msg.RetOk()
+       waiter.WaitResult(t)
+
        crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
        e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
        e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
-
        deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
        delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
+
+       waiter = rtmgrHttp.AllocNextEvent(true)
        e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
        xappConn1.handle_xapp_subs_del_resp(t, deltrans)
-
-       msg = rtmgrHttp.WaitReq(t)
-       msg.RetOk()
+       waiter.WaitResult(t)
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
-       rtmgrHttp.UseChannel(false)
 }
 
 //-----------------------------------------------------------------------------
index c27e6a2..0fadabb 100644 (file)
@@ -45,47 +45,32 @@ func (r *Registry) Initialize() {
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
-       // Check is current SequenceNumber valid
-       // Allocate next SequenceNumber value and retry N times
+func (r *Registry) ReserveSubscription(meid *xapp.RMRMeid) (*Subscription, error) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
-       var subs *Subscription = nil
-       var retrytimes uint16 = 1000
-       for ; subs == nil && retrytimes > 0; retrytimes-- {
+       if len(r.subIds) > 0 {
                sequenceNumber := r.subIds[0]
                r.subIds = r.subIds[1:]
                if _, ok := r.register[sequenceNumber]; ok == false {
                        subs := &Subscription{
-                               registry:    r,
-                               Seq:         sequenceNumber,
-                               Active:      false,
-                               RmrEndpoint: *endPoint,
-                               Meid:        meid,
-                               Trans:       nil,
+                               registry: r,
+                               Seq:      sequenceNumber,
+                               Active:   false,
+                               Meid:     meid,
+                               Trans:    nil,
                        }
                        r.register[sequenceNumber] = subs
-
-                       // Update routing
-                       r.mutex.Unlock()
-                       err := subs.UpdateRoute(CREATE)
-                       r.mutex.Lock()
-                       if err != nil {
-                               if _, ok := r.register[sequenceNumber]; ok {
-                                       delete(r.register, sequenceNumber)
-                               }
-                               return nil, err
-                       }
+                       xapp.Logger.Info("Registry: Create %s", subs.String())
+                       xapp.Logger.Debug("Registry: substable=%v", r.register)
                        return subs, nil
                }
        }
-       return nil, fmt.Errorf("Registry: Failed to reserves subscription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName)
+       return nil, fmt.Errorf("Registry: Failed to reserves subscription")
 }
 
 func (r *Registry) GetSubscription(sn uint16) *Subscription {
        r.mutex.Lock()
        defer r.mutex.Unlock()
-       xapp.Logger.Debug("Registry map: %v", r.register)
        if _, ok := r.register[sn]; ok {
                return r.register[sn]
        }
@@ -96,8 +81,11 @@ func (r *Registry) DelSubscription(sn uint16) bool {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if _, ok := r.register[sn]; ok {
+               subs := r.register[sn]
+               xapp.Logger.Info("Registry: Delete %s", subs.String())
                r.subIds = append(r.subIds, sn)
                delete(r.register, sn)
+               xapp.Logger.Debug("Registry: substable=%v", r.register)
                return true
        }
        return false
index 6a72f74..6fd806c 100644 (file)
@@ -35,15 +35,19 @@ type Subscription struct {
        Seq      uint16
        Active   bool
        //
-       Meid        *xapp.RMRMeid
-       RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
-       Trans       *Transaction
+       Meid   *xapp.RMRMeid
+       EpList RmrEndpointList
+       Trans  *Transaction
+}
+
+func (s *Subscription) stringImpl() string {
+       return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
 }
 
 func (s *Subscription) String() string {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+       return s.stringImpl()
 }
 
 func (s *Subscription) GetSubId() uint16 {
@@ -79,20 +83,29 @@ func (s *Subscription) IsConfirmed() bool {
        return s.Active
 }
 
-func (s *Subscription) SetTransaction(trans *Transaction) error {
+func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool {
        s.mutex.Lock()
        defer s.mutex.Unlock()
+       return s.EpList.HasEndpoint(ep)
+}
 
-       subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+func (s *Subscription) SetTransaction(trans *Transaction) error {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
 
-       if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
-               return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans)
-       }
        if s.Trans != nil {
-               return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans)
+               return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans)
        }
        trans.Subs = s
        s.Trans = trans
+
+       if len(s.EpList.Endpoints) == 0 {
+               s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
+               return s.updateRouteImpl(CREATE)
+       } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false {
+               s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
+               return s.updateRouteImpl(MERGE)
+       }
        return nil
 }
 
@@ -113,11 +126,10 @@ func (s *Subscription) GetTransaction() *Transaction {
 }
 
 func (s *Subscription) updateRouteImpl(act Action) error {
-       xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
-       subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
+       subRouteAction := SubRouteInfo{act, s.EpList, s.Seq}
        err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
-               return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+               return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error())
        }
        return nil
 }
@@ -129,12 +141,9 @@ func (s *Subscription) UpdateRoute(act Action) error {
 }
 
 func (s *Subscription) Release() {
-       xapp.Logger.Info("Subscription: Releasing %s", s)
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
        s.registry.DelSubscription(s.Seq)
-       err := s.updateRouteImpl(DELETE)
+       err := s.UpdateRoute(DELETE)
        if err != nil {
-               xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+               xapp.Logger.Error("%s", err.Error())
        }
 }
index 11d2cf7..20af9f0 100644 (file)
@@ -63,13 +63,15 @@ func (t *Tracker) TrackTransaction(
        defer t.mutex.Unlock()
 
        xappkey := TransactionXappKey{*endpoint, xid}
-       if _, ok := t.transactionXappTable[xappkey]; ok {
-               err := fmt.Errorf("Tracker: Similar transaction with xappkey %s is ongoing, transaction %s not created ", xappkey, trans)
+       if othtrans, ok := t.transactionXappTable[xappkey]; ok {
+               err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans)
                return nil, err
        }
 
        trans.tracker = t
        t.transactionXappTable[xappkey] = trans
+       xapp.Logger.Info("Tracker: Create %s", trans.String())
+       xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
        return trans, nil
 }
 
@@ -77,8 +79,10 @@ func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction,
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if trans, ok2 := t.transactionXappTable[xappKey]; ok2 {
+               xapp.Logger.Info("Tracker: Delete %s", trans.String())
                delete(t.transactionXappTable, xappKey)
+               xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
                return trans, nil
        }
-       return nil, fmt.Errorf("Tracker: No record for xappkey %s", xappKey)
+       return nil, fmt.Errorf("Tracker: No record %s", xappKey)
 }
index 867b854..9adaeca 100644 (file)
@@ -36,7 +36,7 @@ type TransactionXappKey struct {
 }
 
 func (key *TransactionXappKey) String() string {
-       return key.RmrEndpoint.String() + "/" + key.Xid
+       return "transkey(" + key.RmrEndpoint.String() + "/" + key.Xid + ")"
 }
 
 //-----------------------------------------------------------------------------
@@ -61,14 +61,18 @@ type Transaction struct {
        ForwardRespToXapp bool
 }
 
-func (t *Transaction) String() string {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
+func (t *Transaction) StringImpl() string {
        var subId string = "?"
        if t.Subs != nil {
                subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
        }
-       return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
+       return "trans(" + t.RmrEndpoint.String() + "/" + t.Xid + "/" + t.Meid.RanName + "/" + subId + ")"
+}
+
+func (t *Transaction) String() string {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.StringImpl()
 }
 
 func (t *Transaction) GetXid() string {
@@ -115,16 +119,18 @@ func (t *Transaction) RetryTransaction() {
 }
 
 func (t *Transaction) Release() {
-       xapp.Logger.Info("Transaction: Releasing %s", t)
        t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.Subs != nil {
-               t.Subs.UnSetTransaction(t)
-       }
-       if t.tracker != nil {
-               xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
-               t.tracker.UnTrackTransaction(xappkey)
-       }
+       subs := t.Subs
+       tracker := t.tracker
+       xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
        t.Subs = nil
        t.tracker = nil
+       t.mutex.Unlock()
+
+       if subs != nil {
+               subs.UnSetTransaction(t)
+       }
+       if tracker != nil {
+               tracker.UnTrackTransaction(xappkey)
+       }
 }
index 00674c7..e740349 100644 (file)
@@ -36,16 +36,6 @@ type RmrDatagram struct {
        Payload        []byte
 }
 
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type SubRouteInfo struct {
-       Command Action
-       Address string
-       Port    uint16
-       SubID   uint16
-}
-
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
@@ -84,6 +74,32 @@ func (endpoint *RmrEndpoint) Set(src string) bool {
        return false
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type RmrEndpointList struct {
+       Endpoints []RmrEndpoint
+}
+
+func (eplist *RmrEndpointList) String() string {
+       valuesText := []string{}
+       for i := range eplist.Endpoints {
+               ep := eplist.Endpoints[i]
+               text := ep.String()
+               valuesText = append(valuesText, text)
+       }
+       return strings.Join(valuesText, ",")
+}
+
+func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+                       return true
+               }
+       }
+       return false
+}
+
 func NewRmrEndpoint(src string) *RmrEndpoint {
        ep := &RmrEndpoint{}
        if ep.Set(src) == false {
@@ -120,6 +136,6 @@ type RMRParams struct {
 
 func (params *RMRParams) String() string {
        var b bytes.Buffer
-       fmt.Fprintf(&b, "Src=%s Mtype=%s(%d) SubId=%v Xid=%s Meid=%v", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid)
+       fmt.Fprintf(&b, "params(Src=%s Mtype=%s(%d) SubId=%v Xid=%s Meid=%s)", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid.RanName)
        return b.String()
 }