RICPLT-2988 Unittest timing issues during retransmission case 70/2170/1
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Wed, 8 Jan 2020 11:01:52 +0000 (13:01 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Wed, 8 Jan 2020 12:20:46 +0000 (14:20 +0200)
Preparation also for RICPLT-2571

Change-Id: Ie98aface81a308022ea4015d88aab449564f932f
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/control.go
pkg/control/main_test.go
pkg/control/messaging_test.go
pkg/control/registry.go
pkg/control/tracker.go
pkg/control/types.go

index d5a92b6..9ce34a0 100755 (executable)
@@ -44,6 +44,7 @@ type Control struct {
        tracker      *Tracker
        timerMap     *TimerMap
        rmrSendMutex sync.Mutex
+       msgCounter   uint64
 }
 
 type RMRMeid struct {
@@ -98,6 +99,7 @@ func NewControl() *Control {
                rtmgrClient: &rtmgrClient,
                tracker:     tracker,
                timerMap:    timerMap,
+               msgCounter:  0,
        }
 }
 
@@ -130,6 +132,7 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
 }
 
 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+       c.msgCounter++
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
                go c.handleSubscriptionRequest(msg)
@@ -155,35 +158,34 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        params.Mbuf = nil
 
        /* Reserve a sequence number and set it in the payload */
-       newSubId, isIdValid := c.registry.ReserveSequenceNumber()
-       if isIdValid != true {
+       subs := c.registry.ReserveSubscription()
+       if subs == nil {
                xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
                return
        }
 
-       params.SubId = int(newSubId)
-       err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
+       params.SubId = int(subs.Seq)
+       err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
        if err != nil {
                xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
        srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
        // Create transatcion record for every subscription request
        var forwardRespToXapp bool = true
        var responseReceived bool = false
-       transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
+       transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
@@ -194,7 +196,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               c.registry.releaseSequenceNumber(newSubId)
+               c.registry.releaseSequenceNumber(subs.Seq)
                return
        }
 
@@ -204,7 +206,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
        }
-       c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
+       c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
        xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable)
        return
 }
@@ -421,13 +423,13 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
        xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
 
        if c.registry.IsValidSequenceNumber(payloadSeqNum) {
-               c.registry.deleteSubscription(payloadSeqNum)
                var forwardRespToXapp bool = true
                _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp)
                if err != nil {
                        xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
                        return
                }
+               c.registry.setSubscriptionToUnConfirmed(payloadSeqNum)
        } else {
                xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
                return
index 1c69b01..dc62225 100644 (file)
@@ -117,24 +117,43 @@ func initTestingMessageChannel() testingMessageChannel {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type xappTransaction struct {
+       tc   *testingXappControl
+       xid  string
+       meid *xapp.RMRMeid
+}
 
 type testingXappControl struct {
        testingRmrControl
        testingMessageChannel
        meid    *xapp.RMRMeid
        xid_seq uint64
-       xid     string
 }
 
 func (tc *testingXappControl) newXid() string {
-       tc.xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
+       var xid string
+       xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
        tc.xid_seq++
-       return tc.xid
+       return xid
+}
+
+func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction {
+       trans := &xappTransaction{}
+       trans.tc = tc
+       if xid == nil {
+               trans.xid = tc.newXid()
+       } else {
+               trans.xid = *xid
+       }
+       trans.meid = &xapp.RMRMeid{RanName: ranname}
+       return trans
 }
 
 func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
 
-       //if msg.Xid == tc.xid {
        if strings.Contains(msg.Xid, tc.desc) {
                xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
                tc.rmrConChan <- msg
@@ -149,8 +168,7 @@ func createNewXappControl(desc string, rtfile string, port string, stat string,
        xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
        xappCtrl.testingMessageChannel = initTestingMessageChannel()
        xappCtrl.meid = &xapp.RMRMeid{RanName: ranname}
-       xappCtrl.xid_seq = 0
-       xappCtrl.newXid()
+       xappCtrl.xid_seq = 1
        return xappCtrl
 }
 
index b8ed6e6..2d45ede 100644 (file)
@@ -31,27 +31,12 @@ import (
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-
 var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type xappTransaction struct {
-       xappConn *testingXappControl
-       xid      string
-}
-
-func newXappTransaction(xappConn *testingXappControl) {
-       trans := &xappTransaction{}
-       trans.xappConn = xappConn
-       trans.xid = xappConn.newXid()
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bool) {
+func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans *xappTransaction) *xappTransaction {
        xapp.Logger.Info("handle_xapp_subs_req")
        e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
 
@@ -93,29 +78,34 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bo
        err, packedMsg := e2SubsReq.Pack(nil)
        if err != nil {
                testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+               return nil
+       }
+
+       var trans *xappTransaction = oldTrans
+       if trans == nil {
+               trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
        }
 
        params := &xapp.RMRParams{}
        params.Mtype = xapp.RIC_SUB_REQ
        params.SubId = -1
        params.Payload = packedMsg.Buf
-       params.Meid = xappConn.meid
-       if updseq {
-               xappConn.newXid()
-       }
-       params.Xid = xappConn.xid
+       params.Meid = trans.meid
+       params.Xid = trans.xid
        params.Mbuf = nil
 
        snderr := xappConn.RmrSend(params)
        if snderr != nil {
                testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+               return nil
        }
+       return trans
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
+func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int {
        xapp.Logger.Info("handle_xapp_subs_resp")
        e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
        var e2SubsId int
@@ -126,7 +116,10 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
        select {
        case msg := <-xappConn.rmrConChan:
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
-                       testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+                       testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+                       return -1
+               } else if msg.Xid != trans.xid {
+                       testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
                        return -1
                } else {
                        packedData := &packer.PackedData{}
@@ -155,7 +148,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updseq bool, e2SubsId int) {
+func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction {
        xapp.Logger.Info("handle_xapp_subs_del_req")
        e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
@@ -174,29 +167,34 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updse
        err, packedMsg := e2SubsDelReq.Pack(nil)
        if err != nil {
                testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+               return nil
+       }
+
+       var trans *xappTransaction = oldTrans
+       if trans == nil {
+               trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
        }
 
        params := &xapp.RMRParams{}
        params.Mtype = xapp.RIC_SUB_DEL_REQ
        params.SubId = e2SubsId
        params.Payload = packedMsg.Buf
-       params.Meid = xappConn.meid
-       if updseq {
-               xappConn.newXid()
-       }
-       params.Xid = xappConn.xid
+       params.Meid = trans.meid
+       params.Xid = trans.xid
        params.Mbuf = nil
 
        snderr := xappConn.RmrSend(params)
        if snderr != nil {
                testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+               return nil
        }
+       return trans
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) {
+func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, trans *xappTransaction) {
        xapp.Logger.Info("handle_xapp_subs_del_resp")
        e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
 
@@ -206,7 +204,10 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) {
        select {
        case msg := <-xappConn.rmrConChan:
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
-                       testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+                       testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+                       return
+               } else if msg.Xid != trans.xid {
+                       testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
                        return
                } else {
                        packedData := &packer.PackedData{}
@@ -403,6 +404,44 @@ func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs i
        return false
 }
 
+func (mc *testingMainControl) get_seqcnt(t *testing.T) uint16 {
+       mc.c.registry.mutex.Lock()
+       defer mc.c.registry.mutex.Unlock()
+       return mc.c.registry.counter
+}
+
+func (mc *testingMainControl) wait_seqcnt_change(t *testing.T, orig uint16, secs int) (uint16, bool) {
+       i := 1
+       for ; i <= secs*2; i++ {
+               mc.c.registry.mutex.Lock()
+               curr := mc.c.registry.counter
+               mc.c.registry.mutex.Unlock()
+               if curr != orig {
+                       return curr, true
+               }
+               time.Sleep(500 * time.Millisecond)
+       }
+       testError(t, "(general) no seq change within %d secs", secs)
+       return 0, false
+}
+
+func (mc *testingMainControl) get_msgcounter(t *testing.T) uint64 {
+       return mc.c.msgCounter
+}
+
+func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64, secs int) (uint64, bool) {
+       i := 1
+       for ; i <= secs*2; i++ {
+               curr := mc.c.msgCounter
+               if curr != orig {
+                       return curr, true
+               }
+               time.Sleep(500 * time.Millisecond)
+       }
+       testError(t, "(general) no msg counter change within %d secs", secs)
+       return 0, false
+}
+
 //-----------------------------------------------------------------------------
 // TestSubReqAndSubDelOk
 //
@@ -440,15 +479,15 @@ func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs i
 func TestSubReqAndSubDelOk(t *testing.T) {
        xapp.Logger.Info("TestSubReqAndSubDelOk")
 
-       xappConn1.handle_xapp_subs_req(t, true)
+       cretrans := xappConn1.handle_xapp_subs_req(t, nil)
        crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
        e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
-       e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+       e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
 
-       xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+       deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
        delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
        e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
-       xappConn1.handle_xapp_subs_del_resp(t)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans)
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -486,19 +525,21 @@ func TestSubReqRetransmission(t *testing.T) {
        xapp.Logger.Info("TestSubReqRetransmission")
 
        //Subs Create
-       xappConn1.handle_xapp_subs_req(t, true)
+       cretrans := xappConn1.handle_xapp_subs_req(t, nil)
        crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
-       xappConn1.handle_xapp_subs_req(t, false)
 
-       e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+       seqBef := mainCtrl.get_msgcounter(t)
+       xappConn1.handle_xapp_subs_req(t, cretrans) //Retransmitted SubReq
+       mainCtrl.wait_msgcounter_change(t, seqBef, 10)
 
-       e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+       e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+       e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
 
        //Subs Delete
-       xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+       deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
        delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
        e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
-       xappConn1.handle_xapp_subs_del_resp(t)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans)
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -535,20 +576,21 @@ func TestSubDelReqRetransmission(t *testing.T) {
        xapp.Logger.Info("TestSubDelReqRetransmission")
 
        //Subs Create
-       xappConn1.handle_xapp_subs_req(t, true)
+       cretrans := xappConn1.handle_xapp_subs_req(t, nil)
        crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
        e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
-       e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+       e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
 
        //Subs Delete
-       xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+       deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
        delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
 
-       <-time.After(2 * time.Second)
-       xappConn1.handle_xapp_subs_del_req(t, false, e2SubsId)
+       seqBef := mainCtrl.get_msgcounter(t)
+       xappConn1.handle_xapp_subs_del_req(t, deltrans, e2SubsId) //Retransmitted SubDelReq
+       mainCtrl.wait_msgcounter_change(t, seqBef, 10)
 
        e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
-       xappConn1.handle_xapp_subs_del_resp(t)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans)
 
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -596,34 +638,111 @@ func TestSubReqAndSubDelOkTwoParallel(t *testing.T) {
        xapp.Logger.Info("TestSubReqAndSubDelOkTwoParallel")
 
        //Req1
-       xappConn1.handle_xapp_subs_req(t, true)
+       cretrans1 := xappConn1.handle_xapp_subs_req(t, nil)
        crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
 
        //Req2
-       xappConn2.handle_xapp_subs_req(t, true)
+       cretrans2 := xappConn2.handle_xapp_subs_req(t, nil)
        crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
 
        //Resp1
        e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
-       e2SubsId1 := xappConn1.handle_xapp_subs_resp(t)
+       e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1)
 
        //Resp2
        e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
-       e2SubsId2 := xappConn2.handle_xapp_subs_resp(t)
+       e2SubsId2 := xappConn2.handle_xapp_subs_resp(t, cretrans2)
+
+       //Del1
+       deltrans1 := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId1)
+       delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
+       e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans1)
+       //Wait that subs is cleaned
+       mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
+
+       //Del2
+       deltrans2 := xappConn2.handle_xapp_subs_del_req(t, nil, e2SubsId2)
+       delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
+       e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
+       xappConn2.handle_xapp_subs_del_resp(t, deltrans2)
+       //Wait that subs is cleaned
+       mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSameSubsDiffRan
+// Same subscription to different RANs
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |              |              |
+//     |              |              |
+//     | SubReq(r1)   |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq(r1)   |
+//     |              |------------->|
+//     |              |              |
+//     |              | SubResp(r1)  |
+//     |              |<-------------|
+//     |              |              |
+//     | SubResp(r1)  |              |
+//     |<-------------|              |
+//     |              |              |
+//     | SubReq(r2)   |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq(r2)   |
+//     |              |------------->|
+//     |              |              |
+//     |              | SubResp(r2)  |
+//     |              |<-------------|
+//     |              |              |
+//     | SubResp(r2)  |              |
+//     |<-------------|              |
+//     |              |              |
+//     |       [SUBS r1 DELETE]      |
+//     |              |              |
+//     |       [SUBS r2 DELETE]      |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+func TestSameSubsDiffRan(t *testing.T) {
+       xapp.Logger.Info("TestSameSubsDiffRan")
+
+       //Req1
+       cretrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1")
+       xappConn1.handle_xapp_subs_req(t, cretrans1)
+       crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
+       e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
+       e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1)
+
+       //Req2
+       cretrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2")
+       xappConn1.handle_xapp_subs_req(t, cretrans2)
+       crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
+       e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
+       e2SubsId2 := xappConn1.handle_xapp_subs_resp(t, cretrans2)
 
        //Del1
-       xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId1)
+       deltrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1")
+       xappConn1.handle_xapp_subs_del_req(t, deltrans1, e2SubsId1)
        delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
        e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
-       xappConn1.handle_xapp_subs_del_resp(t)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans1)
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
 
        //Del2
-       xappConn2.handle_xapp_subs_del_req(t, true, e2SubsId2)
+       deltrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2")
+       xappConn1.handle_xapp_subs_del_req(t, deltrans2, e2SubsId2)
        delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
        e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
-       xappConn2.handle_xapp_subs_del_resp(t)
+       xappConn1.handle_xapp_subs_del_resp(t, deltrans2)
        //Wait that subs is cleaned
        mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
 }
index 3ac15f1..3d70d63 100644 (file)
@@ -25,8 +25,20 @@ import (
 )
 
 type Subscription struct {
-       Seq       uint16
-       Confirmed bool
+       Seq    uint16
+       Active bool
+}
+
+func (s *Subscription) Confirmed() {
+       s.Active = true
+}
+
+func (s *Subscription) UnConfirmed() {
+       s.Active = false
+}
+
+func (s *Subscription) IsConfirmed() bool {
+       return s.Active
 }
 
 type Registry struct {
@@ -42,24 +54,37 @@ func (r *Registry) Initialize(seedsn uint16) {
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSequenceNumber() (uint16, bool) {
+func (r *Registry) ReserveSubscription() *Subscription {
        // Check is current SequenceNumber valid
+       // Allocate next SequenceNumber value and retry N times
        r.mutex.Lock()
        defer r.mutex.Unlock()
-       sequenceNumber := r.counter
-       if _, ok := r.register[sequenceNumber]; ok {
-               xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
-               return sequenceNumber, false
+       var subs *Subscription = nil
+       var retrytimes uint16 = 1000
+       for ; subs == nil && retrytimes > 0; retrytimes-- {
+               sequenceNumber := r.counter
+               if r.counter == 65535 {
+                       r.counter = 0
+               } else {
+                       r.counter++
+               }
+               if _, ok := r.register[sequenceNumber]; ok == false {
+                       r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
+                       return r.register[sequenceNumber]
+               }
        }
-       r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
+       return nil
+}
 
-       // Allocate next SequenceNumber value
-       if r.counter == 65535 {
-               r.counter = 0
-       } else {
-               r.counter++
+// This function checks the validity of the given subscription id
+func (r *Registry) GetSubscription(sn uint16) *Subscription {
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
+       xapp.Logger.Debug("Registry map: %v", r.register)
+       if _, ok := r.register[sn]; ok {
+               return r.register[sn]
        }
-       return sequenceNumber, true
+       return nil
 }
 
 // This function checks the validity of the given subscription id
@@ -77,14 +102,14 @@ func (r *Registry) IsValidSequenceNumber(sn uint16) bool {
 func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
-       r.register[sn].Confirmed = true
+       r.register[sn].Confirmed()
 }
 
 //This function sets the given id as unused in the register
-func (r *Registry) deleteSubscription(sn uint16) {
+func (r *Registry) setSubscriptionToUnConfirmed(sn uint16) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
-       r.register[sn].Confirmed = false
+       r.register[sn].UnConfirmed()
 }
 
 //This function releases the given id as unused in the register
index 584b331..9287ea8 100644 (file)
@@ -31,9 +31,8 @@ type TransactionKey struct {
 }
 
 type TransactionXappKey struct {
-       Addr string // xapp addr
-       Port uint16 // xapp port
-       Xid  string // xapp xid in req
+       RmrEndpoint
+       Xid string // xapp xid in req
 }
 
 type Transaction struct {
@@ -46,7 +45,7 @@ type Transaction struct {
 }
 
 func (t *Transaction) SubRouteInfo() SubRouteInfo {
-       return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID}
+       return SubRouteInfo{t.Key.TransType, t.Xappkey.RmrEndpoint.Addr, t.Xappkey.RmrEndpoint.Port, t.Key.SubID}
 }
 
 /*
@@ -69,7 +68,8 @@ Returns error if there is similar transatcion ongoing.
 */
 func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
        key := TransactionKey{subID, act}
-       xappkey := TransactionXappKey{addr, port, params.Xid}
+       endpoint := RmrEndpoint{addr, port}
+       xappkey := TransactionXappKey{endpoint, params.Xid}
        trans := &Transaction{t, key, xappkey, params, respReceived, forwardRespToXapp}
        t.mutex.Lock()
        defer t.mutex.Unlock()
index 83312d8..febd41a 100644 (file)
@@ -32,4 +32,8 @@ type SubRouteInfo struct {
        SubID   uint16
 }
 
+type RmrEndpoint struct {
+       Addr string // xapp addr
+       Port uint16 // xapp port
+}
 type Action int