RICPLT-2988 Unittest timing issues during retransmission case
[ric-plt/submgr.git] / pkg / control / control.go
index d5a92b6..9ce34a0 100755 (executable)
@@ -44,6 +44,7 @@ type Control struct {
        tracker      *Tracker
        timerMap     *TimerMap
        rmrSendMutex sync.Mutex
+       msgCounter   uint64
 }
 
 type RMRMeid struct {
@@ -98,6 +99,7 @@ func NewControl() *Control {
                rtmgrClient: &rtmgrClient,
                tracker:     tracker,
                timerMap:    timerMap,
+               msgCounter:  0,
        }
 }
 
@@ -130,6 +132,7 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
 }
 
 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+       c.msgCounter++
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
                go c.handleSubscriptionRequest(msg)
@@ -155,35 +158,34 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        params.Mbuf = nil
 
        /* Reserve a sequence number and set it in the payload */
-       newSubId, isIdValid := c.registry.ReserveSequenceNumber()
-       if isIdValid != true {
+       subs := c.registry.ReserveSubscription()
+       if subs == nil {
                xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
                return
        }
 
-       params.SubId = int(newSubId)
-       err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
+       params.SubId = int(subs.Seq)
+       err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
        if err != nil {
                xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
        srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
        // Create transatcion record for every subscription request
        var forwardRespToXapp bool = true
        var responseReceived bool = false
-       transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
+       transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
@@ -194,7 +196,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
@@ -204,7 +206,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
        }
-       c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
+       c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
        xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable)
        return
 }
@@ -421,13 +423,13 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
        xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
 
        if c.registry.IsValidSequenceNumber(payloadSeqNum) {
-               c.registry.deleteSubscription(payloadSeqNum)
                var forwardRespToXapp bool = true
                _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp)
                if err != nil {
                        xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
                        return
                }
+               c.registry.setSubscriptionToUnConfirmed(payloadSeqNum)
        } else {
                xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
                return