RICPLT-3014 Subscription multiple endpoints 86/2286/4
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Fri, 17 Jan 2020 11:37:05 +0000 (13:37 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Wed, 22 Jan 2020 09:15:46 +0000 (11:15 +0200)
Change-Id: I2cc6ce087199f26fe4dd609497d2248ac9daf2cd
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/client.go
pkg/control/control.go
pkg/control/main_test.go
pkg/control/messaging_test.go
pkg/control/registry.go
pkg/control/subscription.go
pkg/control/tracker.go
pkg/control/transaction.go
pkg/control/types.go

index b067f3b..adc03d1 100644 (file)
@@ -39,7 +39,7 @@ type SubRouteInfo struct {
 }
 
 func (sri *SubRouteInfo) String() string {
-       return "routeinfo(" + sri.Command.String() + "/" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])"
+       return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/" + sri.Command.String() + "/[" + sri.EpList.String() + "])"
 }
 
 //-----------------------------------------------------------------------------
index 1bc2ee8..a2c8b0d 100755 (executable)
@@ -36,10 +36,12 @@ import (
 //
 //-----------------------------------------------------------------------------
 
-var subReqTime time.Duration = 5 * time.Second
-var subDelReqTime time.Duration = 5 * time.Second
-var maxSubReqTryCount uint64 = 2    // Initial try + retry
-var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
+var e2tSubReqTimeout time.Duration = 5 * time.Second
+var e2tSubDelReqTime time.Duration = 5 * time.Second
+var e2tMaxSubReqTryCount uint64 = 2    // Initial try + retry
+var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
+
+var e2tRecvMsgTimeout time.Duration = 5 * time.Second
 
 type Control struct {
        e2ap         *E2ap
@@ -157,17 +159,17 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) {
        c.msgCounter++
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
-               go c.handleSubscriptionRequest(msg)
+               go c.handleXAPPSubscriptionRequest(msg)
        case xapp.RICMessageTypes["RIC_SUB_RESP"]:
-               go c.handleSubscriptionResponse(msg)
+               go c.handleE2TSubscriptionResponse(msg)
        case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
-               go c.handleSubscriptionFailure(msg)
+               go c.handleE2TSubscriptionFailure(msg)
        case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
-               go c.handleSubscriptionDeleteRequest(msg)
+               go c.handleXAPPSubscriptionDeleteRequest(msg)
        case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
-               go c.handleSubscriptionDeleteResponse(msg)
+               go c.handleE2TSubscriptionDeleteResponse(msg)
        case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
-               go c.handleSubscriptionDeleteFailure(msg)
+               go c.handleE2TSubscriptionDeleteFailure(msg)
        default:
                xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
        }
@@ -188,397 +190,371 @@ func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
        if err != nil {
                retval += filler + "err(" + err.Error() + ")"
                filler = " "
-       }
-       return retval
-}
-
-func (c *Control) findSubs(ids []int) (*Subscription, error) {
-       var subs *Subscription = nil
-       for _, id := range ids {
-               if id >= 0 {
-                       subs = c.registry.GetSubscription(uint16(id))
-               }
-               if subs != nil {
-                       break
-               }
-       }
-       if subs == nil {
-               return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
-       }
-       return subs, nil
-}
 
-func (c *Control) findSubsAndTrans(ids []int) (*Subscription, *Transaction, error) {
-       subs, err := c.findSubs(ids)
-       if err != nil {
-               return nil, nil, err
-       }
-       trans := subs.GetTransaction()
-       if trans == nil {
-               return subs, nil, fmt.Errorf("No ongoing transaction found from %s", idstring(nil, subs, nil))
        }
-       return subs, trans, nil
+       return retval
 }
 
-func (c *Control) handleSubscriptionRequest(params *RMRParams) {
-       xapp.Logger.Info("SubReq from xapp: %s", params.String())
+//-------------------------------------------------------------------
+// handle from XAPP Subscription Request
+//------------------------------------------------------------------
+func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
+       xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
 
-       SubReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
+       subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
                return
        }
 
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
        if err != nil {
-               xapp.Logger.Error("SubReq Drop: %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
                return
        }
-       trans.SubReqMsg = SubReqMsg
+       defer trans.Release()
 
-       subs, err := c.registry.ReserveSubscription(trans.Meid)
+       subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubReq Drop: %s", idstring(trans, nil, err))
-               trans.Release()
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
                return
        }
 
-       err = subs.SetTransaction(trans)
-       if err != nil {
-               xapp.Logger.Error("SubReq Drop: %s", idstring(trans, subs, err))
-               subs.Release()
-               trans.Release()
+       if subs.IsTransactionReserved() {
+               err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
+               xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, subs, err))
                return
        }
-       trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
-
-       xapp.Logger.Debug("SubReq: Handling %s", idstring(trans, subs, nil))
 
        //
-       // TODO: subscription create is in fact owned by subscription and not transaction.
-       //       Transaction is toward xapp while Subscription is toward ran.
-       //       In merge several xapps may wake transactions, while only one subscription
-       //       toward ran occurs -> subscription owns subscription creation toward ran
+       // Wake subs request
        //
-       //       This is intermediate solution while improving message handling
-       //
-       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
-       if err != nil {
-               xapp.Logger.Error("SubResp Drop: %s", idstring(trans, subs, err))
-               subs.Release()
-               trans.Release()
-               return
+       go c.handleSubscriptionCreate(subs, trans)
+       event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+       err = nil
+       if event != nil {
+               switch themsg := event.(type) {
+               case *e2ap.E2APSubscriptionResponse:
+                       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
+                       if err == nil {
+                               c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
+                               return
+                       }
+               case *e2ap.E2APSubscriptionFailure:
+                       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
+                       if err == nil {
+                               c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
+                       }
+                       return
+               default:
+                       break
+               }
        }
-
-       c.rmrSend("SubReq: SubReq to E2T", subs, trans)
-       c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
-       return
+       xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
 }
 
-func (c *Control) handleSubscriptionResponse(params *RMRParams) {
-       xapp.Logger.Info("SubResp from E2T: %s", params.String())
+//-------------------------------------------------------------------
+// handle from XAPP Subscription Delete Request
+//------------------------------------------------------------------
+func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
+       xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
 
-       SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
+       subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubResp Drop %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
                return
        }
 
-       subs, trans, err := c.findSubsAndTrans([]int{int(SubRespMsg.RequestId.Seq), params.SubId})
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
        if err != nil {
-               xapp.Logger.Error("SubResp: %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
                return
        }
-       trans.SubRespMsg = SubRespMsg
-       xapp.Logger.Debug("SubResp: Handling %s", idstring(trans, subs, nil))
+       defer trans.Release()
 
-       c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
-
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription timer already received
-               return
-       }
-
-       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
-       if err != nil {
-               xapp.Logger.Error("SubResp: %s", idstring(trans, subs, err))
-               trans.Release()
-               return
-       }
-
-       subs.Confirmed()
-       trans.Release()
-       c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans)
-       return
-}
-
-func (c *Control) handleSubscriptionFailure(params *RMRParams) {
-       xapp.Logger.Info("SubFail from E2T: %s", params.String())
-
-       SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
        if err != nil {
-               xapp.Logger.Error("SubFail Drop %s", idstring(params, nil, err))
+               xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
                return
        }
 
-       subs, trans, err := c.findSubsAndTrans([]int{int(SubFailMsg.RequestId.Seq), params.SubId})
-       if err != nil {
-               xapp.Logger.Error("SubFail: %s", idstring(params, nil, err))
+       if subs.IsTransactionReserved() {
+               err := fmt.Errorf("Currently parallel or queued transactions are not allowed")
+               xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, subs, err))
                return
        }
-       trans.SubFailMsg = SubFailMsg
-       xapp.Logger.Debug("SubFail: Handling %s", idstring(trans, subs, nil))
 
-       c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription timer already received
-               return
-       }
-
-       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
+       //
+       // Wake subs delete
+       //
+       go c.handleSubscriptionDelete(subs, trans)
+       trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+       // Whatever is received send ok delete response
+       subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
+       subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
+       subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
+       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
        if err == nil {
-               c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans)
-               time.Sleep(3 * time.Second)
-       } else {
-               //TODO error handling improvement
-               xapp.Logger.Error("SubFail: (continue cleaning) %s", idstring(trans, subs, err))
+               c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
        }
-
-       trans.Release()
-       subs.Release()
-       return
 }
 
-func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
-       xapp.Logger.Info("SubReq timeout: subId: %v,  tryCount: %v", nbrId, tryCount)
+//-------------------------------------------------------------------
+// SUBS CREATE Handling
+//-------------------------------------------------------------------
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
 
-       subs, trans, err := c.findSubsAndTrans(([]int{nbrId}))
-       if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s", idstring(nil, nil, err))
+       trans := c.tracker.NewTransaction(subs.GetMeid())
+       subs.WaitTransactionTurn(trans)
+       defer subs.ReleaseTransactionTurn(trans)
+       defer trans.Release()
+
+       xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+
+       if subs.SubRespMsg != nil {
+               xapp.Logger.Debug("SUBS-SubReq: Handling (immediate response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
+               parentTrans.SendEvent(subs.SubRespMsg, 0)
                return
        }
-       xapp.Logger.Debug("SubReq timeout: Handling %s", idstring(trans, subs, nil))
 
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription Response or Failure already received
+       event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
+       switch themsg := event.(type) {
+       case *e2ap.E2APSubscriptionResponse:
+               subs.SubRespMsg = themsg
+               parentTrans.SendEvent(event, 0)
                return
+       case *e2ap.E2APSubscriptionFailure:
+               //TODO: Possible delete and one retry for subs req
+               parentTrans.SendEvent(event, 0)
+       default:
+               xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
+               c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+               parentTrans.SendEvent(nil, 0)
        }
 
-       if tryCount < maxSubReqTryCount {
-               xapp.Logger.Info("SubReq timeout: %s", idstring(trans, subs, nil))
+       subs.DelEndpoint(parentTrans.GetEndpoint())
+}
 
-               trans.RetryTransaction()
+//-------------------------------------------------------------------
+// SUBS DELETE Handling
+//-------------------------------------------------------------------
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
 
-               c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans)
+       trans := c.tracker.NewTransaction(subs.GetMeid())
+       subs.WaitTransactionTurn(trans)
+       defer subs.ReleaseTransactionTurn(trans)
+       defer trans.Release()
 
-               tryCount++
-               c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
-               return
-       }
+       xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
 
-       // Release CREATE transaction
-       trans.Release()
+       event := c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
 
-       // Create DELETE transaction (internal and no messages toward xapp)
-       deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, trans.GetXid(), trans.GetMeid(), false, false)
-       if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
-               //TODO improve error handling. Important at least in merge
-               subs.Release()
-               return
-       }
+       parentTrans.SendEvent(event, 0)
+       subs.DelEndpoint(parentTrans.GetEndpoint())
+}
 
-       deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
-       deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
-       deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
-       deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
-       deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
+//-------------------------------------------------------------------
+// send to E2T Subscription Request
+//-------------------------------------------------------------------
+func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+       var err error
+       var event interface{} = nil
+       var timedOut bool = false
+
+       subReqMsg := subs.SubReqMsg
+       subReqMsg.RequestId.Id = 123
+       subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
-               //TODO improve error handling. Important at least in merge
-               deltrans.Release()
-               subs.Release()
-               return
+               xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+               return event
        }
 
-       err = subs.SetTransaction(deltrans)
-       if err != nil {
-               xapp.Logger.Error("SubReq timeout: %s", idstring(trans, subs, err))
-               //TODO improve error handling. Important at least in merge
-               deltrans.Release()
-               return
+       for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
+               desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
+               c.rmrSend(desc, subs, trans)
+               event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
+               if timedOut {
+                       continue
+               }
+               break
        }
-
-       c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans)
-       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
-       return
+       xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+       return event
 }
 
-func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
-       xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
+//-------------------------------------------------------------------
+// send to E2T Subscription Delete Request
+//-------------------------------------------------------------------
 
-       SubDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
+func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+       var err error
+       var event interface{}
+       var timedOut bool
+
+       subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
+       subDelReqMsg.RequestId.Id = 123
+       subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+       subDelReqMsg.FunctionId = 0
+       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
-               return
+               xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+               return event
        }
 
-       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid, false, true)
-       if err != nil {
-               xapp.Logger.Error("SubDelReq Drop %s", idstring(params, nil, err))
-               return
+       for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
+               desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
+               c.rmrSend(desc, subs, trans)
+               event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
+               if timedOut {
+                       continue
+               }
+               break
        }
-       trans.SubDelReqMsg = SubDelReqMsg
+       xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+       return event
+}
 
-       subs, err := c.findSubs([]int{int(trans.SubDelReqMsg.RequestId.Seq), params.SubId})
+//-------------------------------------------------------------------
+// handle from E2T Subscription Reponse
+//-------------------------------------------------------------------
+func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
+       xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
+       subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s", idstring(params, nil, err))
-               trans.Release()
+               xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
                return
        }
-
-       err = subs.SetTransaction(trans)
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
-               trans.Release()
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
                return
        }
-
-       xapp.Logger.Debug("SubDelReq: Handling %s", idstring(trans, subs, nil))
-
-       //
-       // TODO: subscription delete is in fact owned by subscription and not transaction.
-       //       Transaction is toward xapp while Subscription is toward ran.
-       //       In merge several xapps may wake transactions, while only one subscription
-       //       toward ran occurs -> subscription owns subscription creation toward ran
-       //
-       //       This is intermediate solution while improving message handling
-       //
-       trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
-       if err != nil {
-               xapp.Logger.Error("SubDelReq: %s", idstring(trans, subs, err))
-               trans.Release()
+       trans := subs.GetTransaction()
+       if trans == nil {
+               err = fmt.Errorf("Ongoing transaction not found")
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
                return
        }
-
-       subs.UnConfirmed()
-
-       c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans)
-
-       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
+       sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
+       if sendOk == false {
+               err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
+               xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
+       }
        return
 }
 
-func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
-       xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
-
-       SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
+//-------------------------------------------------------------------
+// handle from E2T Subscription Failure
+//-------------------------------------------------------------------
+func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
+       xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
+       subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelResp: Dropping this msg. %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
                return
        }
-
-       subs, trans, err := c.findSubsAndTrans([]int{int(SubDelRespMsg.RequestId.Seq), params.SubId})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
        if err != nil {
-               xapp.Logger.Error("SubDelResp: %s", idstring(params, nil, err))
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
                return
        }
-       trans.SubDelRespMsg = SubDelRespMsg
-       xapp.Logger.Debug("SubDelResp: Handling %s", idstring(trans, subs, nil))
-
-       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
-
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription Delete timer already received
+       trans := subs.GetTransaction()
+       if trans == nil {
+               err = fmt.Errorf("Ongoing transaction not found")
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
                return
        }
-
-       c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
+       sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
+       if sendOk == false {
+               err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
+               xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
+       }
        return
 }
 
-func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
-       xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
-
-       SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
+//-------------------------------------------------------------------
+// handle from E2T Subscription Delete Response
+//-------------------------------------------------------------------
+func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
+       xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
+       subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelFail: Dropping this msg. %s", idstring(params, nil, err))
+               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
                return
        }
-
-       subs, trans, err := c.findSubsAndTrans([]int{int(SubDelFailMsg.RequestId.Seq), params.SubId})
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
        if err != nil {
-               xapp.Logger.Error("SubDelFail: %s", idstring(params, nil, err))
+               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
                return
        }
-       trans.SubDelFailMsg = SubDelFailMsg
-       xapp.Logger.Debug("SubDelFail: Handling %s", idstring(trans, subs, nil))
-
-       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
-
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription Delete timer already received
+       trans := subs.GetTransaction()
+       if trans == nil {
+               err = fmt.Errorf("Ongoing transaction not found")
+               xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
                return
        }
-
-       c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
+       sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
+       if sendOk == false {
+               err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
+               xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
+       }
        return
 }
 
-func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
-       xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
-
-       subs, trans, err := c.findSubsAndTrans([]int{nbrId})
+//-------------------------------------------------------------------
+// handle from E2T Subscription Delete Failure
+//-------------------------------------------------------------------
+func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
+       xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
+       subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelReq timeout: %s", idstring(nil, nil, err))
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
                return
        }
-       xapp.Logger.Debug("SubDelReq timeout: Handling %s", idstring(trans, subs, nil))
-
-       responseReceived := trans.CheckResponseReceived()
-       if responseReceived == true {
-               // Subscription Delete Response or Failure already received
+       subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
+       if err != nil {
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
                return
        }
-
-       if tryCount < maxSubDelReqTryCount {
-               // Set possible to handle new response for the subId
-               trans.RetryTransaction()
-               c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans)
-               tryCount++
-               c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               err = fmt.Errorf("Ongoing transaction not found")
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
                return
        }
-
-       c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
+       sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
+       if sendOk == false {
+               err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
+               xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
+       }
        return
 }
 
-func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
-
-       if trans.ForwardRespToXapp == true {
-               //Always generate SubDelResp
-               trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
-               trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
-               trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
-               trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
-
-               var err error
-               trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
-               if err == nil {
-                       c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans)
-                       time.Sleep(3 * time.Second)
-               } else {
-                       //TODO error handling improvement
-                       xapp.Logger.Error("%s: (continue cleaning) %s", desc, idstring(trans, subs, err))
-               }
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func typeofSubsMessage(v interface{}) string {
+       if v == nil {
+               return "NIL"
+       }
+       switch v.(type) {
+       case *e2ap.E2APSubscriptionRequest:
+               return "SubReq"
+       case *e2ap.E2APSubscriptionResponse:
+               return "SubResp"
+       case *e2ap.E2APSubscriptionFailure:
+               return "SubFail"
+       case *e2ap.E2APSubscriptionDeleteRequest:
+               return "SubDelReq"
+       case *e2ap.E2APSubscriptionDeleteResponse:
+               return "SubDelResp"
+       case *e2ap.E2APSubscriptionDeleteFailure:
+               return "SubDelFail"
+       default:
+               return "Unknown"
        }
-
-       trans.Release()
-       subs.Release()
 }
index d3d7b57..ab0bff6 100644 (file)
@@ -174,6 +174,27 @@ type testingRmrStubControl struct {
        testingRmrControl
        rmrClientTest *xapp.RMRClient
        active        bool
+       msgCnt        uint64
+}
+
+func (tc *testingRmrStubControl) GetMsgCnt() uint64 {
+       return tc.msgCnt
+}
+
+func (tc *testingRmrStubControl) IncMsgCnt() {
+       tc.msgCnt++
+}
+
+func (tc *testingRmrStubControl) DecMsgCnt() {
+       if tc.msgCnt > 0 {
+               tc.msgCnt--
+       }
+}
+
+func (tc *testingRmrStubControl) TestMsgCnt(t *testing.T) {
+       if tc.GetMsgCnt() > 0 {
+               testError(t, "(%s) message count expected 0 but is %d", tc.desc, tc.GetMsgCnt())
+       }
 }
 
 func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) {
@@ -268,6 +289,7 @@ func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
 
        if strings.Contains(msg.Xid, tc.desc) {
                xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
+               tc.IncMsgCnt()
                tc.rmrConChan <- msg
        } else {
                xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
index 68118ea..007a432 100644 (file)
@@ -115,6 +115,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *x
        //---------------------------------
        select {
        case msg := <-xappConn.rmrConChan:
+               xappConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
                        testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
                        return -1
@@ -158,6 +159,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_fail(t *testing.T, trans *x
        //-------------------------------
        select {
        case msg := <-xappConn.rmrConChan:
+               xappConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_FAILURE"] {
                        testError(t, "(%s) Received RIC_SUB_FAILURE wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_FAILURE", xapp.RicMessageTypeToName[msg.Mtype])
                        return -1
@@ -246,6 +248,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, tran
        //---------------------------------
        select {
        case msg := <-xappConn.rmrConChan:
+               xappConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
                        testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
                        return
@@ -283,6 +286,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e
        //---------------------------------
        select {
        case msg := <-e2termConn.rmrConChan:
+               e2termConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
                        testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_REQ", xapp.RicMessageTypeToName[msg.Mtype])
                } else {
@@ -342,10 +346,11 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, re
 
        params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_RESP
-       params.SubId = msg.SubId
+       //params.SubId = msg.SubId
+       params.SubId = -1
        params.Payload = packedMsg.Buf
        params.Meid = msg.Meid
-       params.Xid = msg.Xid
+       //params.Xid = msg.Xid
        params.Mbuf = nil
 
        snderr := e2termConn.RmrSend(params)
@@ -411,6 +416,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T)
        //---------------------------------
        select {
        case msg := <-e2termConn.rmrConChan:
+               e2termConn.DecMsgCnt()
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
                        testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_DEL_REQ", xapp.RicMessageTypeToName[msg.Mtype])
                } else {
@@ -520,31 +526,42 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T
 //
 //-----------------------------------------------------------------------------
 func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool {
+       var subs *Subscription
        i := 1
        for ; i <= secs*2; i++ {
-               if mc.c.registry.GetSubscription(uint16(e2SubsId)) == nil {
+               subs = mc.c.registry.GetSubscription(uint16(e2SubsId))
+               if subs == nil {
                        return true
                }
                time.Sleep(500 * time.Millisecond)
        }
-       testError(t, "(general) no clean within %d secs", secs)
+       if subs != nil {
+               testError(t, "(general) no clean within %d secs: %s", secs, subs.String())
+       } else {
+               testError(t, "(general) no clean within %d secs: subs(N/A)", secs)
+       }
        return false
 }
 
 func (mc *testingMainControl) wait_subs_trans_clean(t *testing.T, e2SubsId int, secs int) bool {
+       var trans *Transaction
        i := 1
        for ; i <= secs*2; i++ {
                subs := mc.c.registry.GetSubscription(uint16(e2SubsId))
                if subs == nil {
                        return true
                }
-               trans := subs.GetTransaction()
+               trans = subs.GetTransaction()
                if trans == nil {
                        return true
                }
                time.Sleep(500 * time.Millisecond)
        }
-       testError(t, "(general) no clean within %d secs", secs)
+       if trans != nil {
+               testError(t, "(general) no clean within %d secs: %s", secs, trans.String())
+       } else {
+               testError(t, "(general) no clean within %d secs: trans(N/A)", secs)
+       }
        return false
 }
 
@@ -619,6 +636,10 @@ func TestSubReqAndRouteNok(t *testing.T) {
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, int(newSubsId), 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -675,6 +696,10 @@ func TestSubReqAndSubDelOk(t *testing.T) {
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -727,6 +752,10 @@ func TestSubReqRetransmission(t *testing.T) {
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -780,6 +809,10 @@ func TestSubDelReqRetransmission(t *testing.T) {
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -834,6 +867,10 @@ func TestSubDelReqCollision(t *testing.T) {
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -908,6 +945,10 @@ func TestSubReqAndSubDelOkTwoParallel(t *testing.T) {
        xappConn2.handle_xapp_subs_del_resp(t, deltrans2)
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -985,6 +1026,10 @@ func TestSameSubsDiffRan(t *testing.T) {
        xappConn1.handle_xapp_subs_del_resp(t, deltrans2)
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1040,6 +1085,10 @@ func TestSubReqRetryInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1093,6 +1142,10 @@ func TestSubReqRetryNoRespSubDelRespInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1144,7 +1197,11 @@ func TestSubReqTwoRetriesNoRespAtAllInSubmgr(t *testing.T) {
        delreq, _ := e2termConn.handle_e2term_subs_del_req(t)
 
        // Wait that subs is cleaned
-       mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 10)
+       mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 15)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1186,6 +1243,10 @@ func TestSubReqSubFailRespInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1242,6 +1303,10 @@ func TestSubDelReqRetryInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1295,6 +1360,10 @@ func TestSubDelReqTwoRetriesNoRespInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
 
 //-----------------------------------------------------------------------------
@@ -1345,4 +1414,8 @@ func TestSubDelReqSubDelFailRespInSubmgr(t *testing.T) {
 
        // Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+       xappConn1.TestMsgCnt(t)
+       xappConn2.TestMsgCnt(t)
+       e2termConn.TestMsgCnt(t)
 }
index 0fadabb..5edc3eb 100644 (file)
@@ -21,6 +21,7 @@ package control
 
 import (
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "sync"
 )
@@ -45,7 +46,7 @@ func (r *Registry) Initialize() {
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(meid *xapp.RMRMeid) (*Subscription, error) {
+func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if len(r.subIds) > 0 {
@@ -55,12 +56,16 @@ func (r *Registry) ReserveSubscription(meid *xapp.RMRMeid) (*Subscription, error
                        subs := &Subscription{
                                registry: r,
                                Seq:      sequenceNumber,
-                               Active:   false,
-                               Meid:     meid,
-                               Trans:    nil,
+                               Meid:     trans.Meid,
                        }
+                       err := subs.AddEndpoint(trans.GetEndpoint())
+                       if err != nil {
+                               return nil, err
+                       }
+                       subs.SubReqMsg = subReqMsg
+
                        r.register[sequenceNumber] = subs
-                       xapp.Logger.Info("Registry: Create %s", subs.String())
+                       xapp.Logger.Debug("Registry: Create %s", subs.String())
                        xapp.Logger.Debug("Registry: substable=%v", r.register)
                        return subs, nil
                }
@@ -77,12 +82,23 @@ func (r *Registry) GetSubscription(sn uint16) *Subscription {
        return nil
 }
 
+func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
+       for _, id := range ids {
+               if _, ok := r.register[id]; ok {
+                       return r.register[id], nil
+               }
+       }
+       return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
+}
+
 func (r *Registry) DelSubscription(sn uint16) bool {
        r.mutex.Lock()
        defer r.mutex.Unlock()
        if _, ok := r.register[sn]; ok {
                subs := r.register[sn]
-               xapp.Logger.Info("Registry: Delete %s", subs.String())
+               xapp.Logger.Debug("Registry: Delete %s", subs.String())
                r.subIds = append(r.subIds, sn)
                delete(r.register, sn)
                xapp.Logger.Debug("Registry: substable=%v", r.register)
index 6fd806c..4514d00 100644 (file)
@@ -21,23 +21,28 @@ package control
 
 import (
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
        "sync"
+       "time"
 )
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 type Subscription struct {
-       mutex    sync.Mutex
-       registry *Registry
-       Seq      uint16
-       Active   bool
-       //
-       Meid   *xapp.RMRMeid
-       EpList RmrEndpointList
-       Trans  *Transaction
+       mutex      sync.Mutex      // Lock
+       registry   *Registry       // Registry
+       Seq        uint16          // SubsId
+       Meid       *xapp.RMRMeid   // Meid/ RanName
+       EpList     RmrEndpointList // Endpoints
+       DelEpList  RmrEndpointList // Endpoints
+       DelSeq     uint64
+       TransLock  sync.Mutex                     // Lock transactions, only one executed per time for subs
+       TheTrans   *Transaction                   // Ongoing transaction from xapp
+       SubReqMsg  *e2ap.E2APSubscriptionRequest  // Subscription information
+       SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information
 }
 
 func (s *Subscription) stringImpl() string {
@@ -65,85 +70,105 @@ func (s *Subscription) GetMeid() *xapp.RMRMeid {
        return nil
 }
 
-func (s *Subscription) Confirmed() {
+func (s *Subscription) AddEndpoint(ep *RmrEndpoint) error {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       s.Active = true
-}
-
-func (s *Subscription) UnConfirmed() {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       s.Active = false
-}
-
-func (s *Subscription) IsConfirmed() bool {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return s.Active
+       if ep == nil {
+               return fmt.Errorf("AddEndpoint no endpoint given")
+       }
+       if s.EpList.AddEndpoint(ep) {
+               s.DelEpList.DelEndpoint(ep)
+               if s.EpList.Size() == 1 {
+                       return s.updateRouteImpl(CREATE)
+               }
+               return s.updateRouteImpl(MERGE)
+       }
+       return nil
 }
 
-func (s *Subscription) IsEndpoint(ep *RmrEndpoint) bool {
+func (s *Subscription) DelEndpoint(ep *RmrEndpoint) error {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       return s.EpList.HasEndpoint(ep)
+       var err error
+       if ep == nil {
+               return fmt.Errorf("DelEndpoint no endpoint given")
+       }
+       if s.EpList.HasEndpoint(ep) == false {
+               return fmt.Errorf("DelEndpoint endpoint not found")
+       }
+       if s.DelEpList.HasEndpoint(ep) == true {
+               return fmt.Errorf("DelEndpoint endpoint already under del")
+       }
+       s.DelEpList.AddEndpoint(ep)
+       go s.CleanCheck()
+       return err
 }
 
-func (s *Subscription) SetTransaction(trans *Transaction) error {
+func (s *Subscription) CleanCheck() {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-
-       if s.Trans != nil {
-               return fmt.Errorf("subs(%s) trans(%s) exist, can not register trans(%s)", s.stringImpl(), s.Trans, trans)
+       s.DelSeq++
+       // Only one clean ongoing
+       if s.DelSeq > 1 {
+               return
        }
-       trans.Subs = s
-       s.Trans = trans
-
-       if len(s.EpList.Endpoints) == 0 {
-               s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
-               return s.updateRouteImpl(CREATE)
-       } else if s.EpList.HasEndpoint(&trans.RmrEndpoint) == false {
-               s.EpList.Endpoints = append(s.EpList.Endpoints, trans.RmrEndpoint)
-               return s.updateRouteImpl(MERGE)
+       var currSeq uint64 = 0
+       // Make sure that routes to be deleted
+       // are not deleted too fast
+       for currSeq < s.DelSeq {
+               currSeq = s.DelSeq
+               s.mutex.Unlock()
+               time.Sleep(5 * time.Second)
+               s.mutex.Lock()
        }
-       return nil
+       xapp.Logger.Info("DelEndpoint: delete cleaning %s", s.stringImpl())
+       if s.EpList.Size() <= s.DelEpList.Size() {
+               s.updateRouteImpl(DELETE)
+               go s.registry.DelSubscription(s.Seq)
+       } else if s.EpList.DelEndpoints(&s.DelEpList) {
+               s.updateRouteImpl(MERGE)
+       }
+       s.DelSeq = 0
+
 }
 
-func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
+func (s *Subscription) IsTransactionReserved() bool {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       if trans == nil || trans == s.Trans {
-               s.Trans = nil
+       if s.TheTrans != nil {
                return true
        }
        return false
+
 }
 
 func (s *Subscription) GetTransaction() *Transaction {
        s.mutex.Lock()
        defer s.mutex.Unlock()
-       return s.Trans
+       return s.TheTrans
 }
 
-func (s *Subscription) updateRouteImpl(act Action) error {
-       subRouteAction := SubRouteInfo{act, s.EpList, s.Seq}
-       err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-       if err != nil {
-               return fmt.Errorf("subs(%s) %s", s.stringImpl(), err.Error())
-       }
-       return nil
+func (s *Subscription) WaitTransactionTurn(trans *Transaction) {
+       s.TransLock.Lock()
+       s.mutex.Lock()
+       s.TheTrans = trans
+       s.mutex.Unlock()
 }
 
-func (s *Subscription) UpdateRoute(act Action) error {
+func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) {
        s.mutex.Lock()
-       defer s.mutex.Unlock()
-       return s.updateRouteImpl(act)
+       if trans != nil && trans == s.TheTrans {
+               s.TheTrans = nil
+       }
+       s.mutex.Unlock()
+       s.TransLock.Unlock()
 }
 
-func (s *Subscription) Release() {
-       s.registry.DelSubscription(s.Seq)
-       err := s.UpdateRoute(DELETE)
+func (s *Subscription) updateRouteImpl(act Action) error {
+       subRouteAction := SubRouteInfo{act, s.EpList, s.Seq}
+       err := s.registry.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
-               xapp.Logger.Error("%s", err.Error())
+               return fmt.Errorf("%s %s", s.stringImpl(), err.Error())
        }
+       return nil
 }
index 20af9f0..c16a76a 100644 (file)
@@ -31,47 +31,62 @@ import (
 type Tracker struct {
        mutex                sync.Mutex
        transactionXappTable map[TransactionXappKey]*Transaction
+       transSeq             uint64
 }
 
 func (t *Tracker) Init() {
        t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
 }
 
+func (t *Tracker) NewTransactionFromSkel(transSkel *Transaction) *Transaction {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       trans := transSkel
+       if trans == nil {
+               trans = &Transaction{}
+       }
+       trans.EventChan = make(chan interface{})
+       trans.tracker = t
+       trans.Seq = t.transSeq
+       t.transSeq++
+       xapp.Logger.Debug("Transaction: Create %s", trans.String())
+       return trans
+}
+
+func (t *Tracker) NewTransaction(meid *xapp.RMRMeid) *Transaction {
+       trans := &Transaction{}
+       trans.Meid = meid
+       trans = t.NewTransactionFromSkel(trans)
+       return trans
+}
+
 func (t *Tracker) TrackTransaction(
        endpoint *RmrEndpoint,
        xid string,
-       meid *xapp.RMRMeid,
-       respReceived bool,
-       forwardRespToXapp bool) (*Transaction, error) {
+       meid *xapp.RMRMeid) (*Transaction, error) {
 
        if endpoint == nil {
                err := fmt.Errorf("Tracker: No valid endpoint given")
                return nil, err
        }
 
-       trans := &Transaction{
-               tracker:           nil,
-               Subs:              nil,
-               RmrEndpoint:       *endpoint,
-               Xid:               xid,
-               Meid:              meid,
-               RespReceived:      respReceived,
-               ForwardRespToXapp: forwardRespToXapp,
-       }
+       trans := &Transaction{}
+       trans.XappKey = &TransactionXappKey{*endpoint, xid}
+       trans.Meid = meid
+       trans = t.NewTransactionFromSkel(trans)
 
        t.mutex.Lock()
        defer t.mutex.Unlock()
 
-       xappkey := TransactionXappKey{*endpoint, xid}
-       if othtrans, ok := t.transactionXappTable[xappkey]; ok {
+       if othtrans, ok := t.transactionXappTable[*trans.XappKey]; ok {
                err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans)
                return nil, err
        }
 
        trans.tracker = t
-       t.transactionXappTable[xappkey] = trans
-       xapp.Logger.Info("Tracker: Create %s", trans.String())
-       xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
+       t.transactionXappTable[*trans.XappKey] = trans
+       xapp.Logger.Debug("Tracker: Add %s", trans.String())
+       //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
        return trans, nil
 }
 
@@ -79,9 +94,9 @@ func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction,
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if trans, ok2 := t.transactionXappTable[xappKey]; ok2 {
-               xapp.Logger.Info("Tracker: Delete %s", trans.String())
+               xapp.Logger.Debug("Tracker: Delete %s", trans.String())
                delete(t.transactionXappTable, xappKey)
-               xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
+               //xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
                return trans, nil
        }
        return nil, fmt.Errorf("Tracker: No record %s", xappKey)
index 9adaeca..a0a260f 100644 (file)
 package control
 
 import (
-       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
        "sync"
+       "time"
 )
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+type TransactionBase struct {
+       mutex     sync.Mutex         //
+       Seq       uint64             //
+       tracker   *Tracker           //tracker instance
+       Meid      *xapp.RMRMeid      //meid transaction related
+       Mtype     int                //Encoded message type to be send
+       Payload   *packer.PackedData //Encoded message to be send
+       EventChan chan interface{}
+}
+
+func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) (bool, bool) {
+       if waittime > 0 {
+               select {
+               case t.EventChan <- event:
+                       return true, false
+               case <-time.After(waittime):
+                       return false, true
+               }
+               return false, false
+       }
+       t.EventChan <- event
+       return true, false
+}
+
+func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool) {
+       if waittime > 0 {
+               select {
+               case event := <-t.EventChan:
+                       return event, false
+               case <-time.After(waittime):
+                       return nil, true
+               }
+       }
+       event := <-t.EventChan
+       return event, false
+}
+
+func (t *TransactionBase) GetMtype() int {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.Mtype
+}
+
+func (t *TransactionBase) GetMeid() *xapp.RMRMeid {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       if t.Meid != nil {
+               return t.Meid
+       }
+       return nil
+}
+
+func (t *TransactionBase) GetPayload() *packer.PackedData {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.Payload
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
@@ -43,30 +105,16 @@ func (key *TransactionXappKey) String() string {
 //
 //-----------------------------------------------------------------------------
 type Transaction struct {
-       mutex             sync.Mutex
-       tracker           *Tracker                             //tracker instance
-       Subs              *Subscription                        //related subscription
-       RmrEndpoint       RmrEndpoint                          //xapp endpoint
-       Xid               string                               //xapp xid in req
-       Meid              *xapp.RMRMeid                        //meid transaction related
-       SubReqMsg         *e2ap.E2APSubscriptionRequest        //SubReq TODO: maybe own transactions per type
-       SubRespMsg        *e2ap.E2APSubscriptionResponse       //SubResp TODO: maybe own transactions per type
-       SubFailMsg        *e2ap.E2APSubscriptionFailure        //SubFail TODO: maybe own transactions per type
-       SubDelReqMsg      *e2ap.E2APSubscriptionDeleteRequest  //SubDelReq TODO: maybe own transactions per type
-       SubDelRespMsg     *e2ap.E2APSubscriptionDeleteResponse //SubDelResp TODO: maybe own transactions per type
-       SubDelFailMsg     *e2ap.E2APSubscriptionDeleteFailure  //SubDelFail TODO: maybe own transactions per type
-       Mtype             int                                  //Encoded message type to be send
-       Payload           *packer.PackedData                   //Encoded message to be send
-       RespReceived      bool
-       ForwardRespToXapp bool
+       TransactionBase                     //
+       XappKey         *TransactionXappKey //
 }
 
 func (t *Transaction) StringImpl() string {
-       var subId string = "?"
-       if t.Subs != nil {
-               subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
+       var transkey string = "transkey(N/A)"
+       if t.XappKey != nil {
+               transkey = t.XappKey.String()
        }
-       return "trans(" + t.RmrEndpoint.String() + "/" + t.Xid + "/" + t.Meid.RanName + "/" + subId + ")"
+       return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + transkey + ")"
 }
 
 func (t *Transaction) String() string {
@@ -75,62 +123,42 @@ func (t *Transaction) String() string {
        return t.StringImpl()
 }
 
-func (t *Transaction) GetXid() string {
+func (t *Transaction) GetEndpoint() *RmrEndpoint {
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       return t.Xid
-}
-
-func (t *Transaction) GetMtype() int {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       return t.Mtype
-}
-
-func (t *Transaction) GetMeid() *xapp.RMRMeid {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if t.Meid != nil {
-               return t.Meid
+       if t.XappKey != nil {
+               return &t.XappKey.RmrEndpoint
        }
        return nil
 }
 
-func (t *Transaction) GetSrc() string {
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       return t.RmrEndpoint.String()
-}
-
-func (t *Transaction) CheckResponseReceived() bool {
+func (t *Transaction) GetXid() string {
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       if t.RespReceived == false {
-               t.RespReceived = true
-               return false
+       if t.XappKey != nil {
+               return t.XappKey.Xid
        }
-       return true
+       return ""
 }
 
-func (t *Transaction) RetryTransaction() {
+func (t *Transaction) GetSrc() string {
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       t.RespReceived = false
+       if t.XappKey != nil {
+               return t.XappKey.RmrEndpoint.String()
+       }
+       return ""
 }
 
 func (t *Transaction) Release() {
        t.mutex.Lock()
-       subs := t.Subs
+       xapp.Logger.Debug("Transaction: Release %s", t.StringImpl())
        tracker := t.tracker
-       xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
-       t.Subs = nil
+       xappkey := t.XappKey
        t.tracker = nil
        t.mutex.Unlock()
 
-       if subs != nil {
-               subs.UnSetTransaction(t)
-       }
-       if tracker != nil {
-               tracker.UnTrackTransaction(xappkey)
+       if tracker != nil && xappkey != nil {
+               tracker.UnTrackTransaction(*xappkey)
        }
 }
index e740349..d0c7fb8 100644 (file)
@@ -91,6 +91,42 @@ func (eplist *RmrEndpointList) String() string {
        return strings.Join(valuesText, ",")
 }
 
+func (eplist *RmrEndpointList) Size() int {
+       return len(eplist.Endpoints)
+}
+
+func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+                       return false
+               }
+       }
+       eplist.Endpoints = append(eplist.Endpoints, *ep)
+       return true
+}
+
+func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
+       for i := range eplist.Endpoints {
+               if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+                       eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
+                       eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
+                       eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
+                       return true
+               }
+       }
+       return false
+}
+
+func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
+       var retval bool = false
+       for i := range otheplist.Endpoints {
+               if eplist.DelEndpoint(&eplist.Endpoints[i]) {
+                       retval = true
+               }
+       }
+       return retval
+}
+
 func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
        for i := range eplist.Endpoints {
                if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {