From 47b842bf6afc45313a0edadc78f87bff06ddf2b4 Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Wed, 8 Jan 2020 13:01:52 +0200 Subject: [PATCH] RICPLT-2988 Unittest timing issues during retransmission case Preparation also for RICPLT-2571 Change-Id: Ie98aface81a308022ea4015d88aab449564f932f Signed-off-by: Juha Hyttinen --- pkg/control/control.go | 26 ++--- pkg/control/main_test.go | 30 ++++-- pkg/control/messaging_test.go | 229 ++++++++++++++++++++++++++++++++---------- pkg/control/registry.go | 59 +++++++---- pkg/control/tracker.go | 10 +- pkg/control/types.go | 4 + 6 files changed, 263 insertions(+), 95 deletions(-) diff --git a/pkg/control/control.go b/pkg/control/control.go index d5a92b6..9ce34a0 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -44,6 +44,7 @@ type Control struct { tracker *Tracker timerMap *TimerMap rmrSendMutex sync.Mutex + msgCounter uint64 } type RMRMeid struct { @@ -98,6 +99,7 @@ func NewControl() *Control { rtmgrClient: &rtmgrClient, tracker: tracker, timerMap: timerMap, + msgCounter: 0, } } @@ -130,6 +132,7 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) { } func (c *Control) Consume(msg *xapp.RMRParams) (err error) { + c.msgCounter++ switch msg.Mtype { case xapp.RICMessageTypes["RIC_SUB_REQ"]: go c.handleSubscriptionRequest(msg) @@ -155,35 +158,34 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { params.Mbuf = nil /* Reserve a sequence number and set it in the payload */ - newSubId, isIdValid := c.registry.ReserveSequenceNumber() - if isIdValid != true { + subs := c.registry.ReserveSubscription() + if subs == nil { xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) return } - params.SubId = int(newSubId) - err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) + params.SubId = int(subs.Seq) + err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq) if err != nil { xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } // Create transatcion record for every subscription request var forwardRespToXapp bool = true var responseReceived bool = false - transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp) + transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } @@ -194,7 +196,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } @@ -204,7 +206,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { if err != nil { xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) + c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable) return } @@ -421,13 +423,13 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum) if c.registry.IsValidSequenceNumber(payloadSeqNum) { - c.registry.deleteSubscription(payloadSeqNum) var forwardRespToXapp bool = true _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } + c.registry.setSubscriptionToUnConfirmed(payloadSeqNum) } else { xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) return diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go index 1c69b01..dc62225 100644 --- a/pkg/control/main_test.go +++ b/pkg/control/main_test.go @@ -117,24 +117,43 @@ func initTestingMessageChannel() testingMessageChannel { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type xappTransaction struct { + tc *testingXappControl + xid string + meid *xapp.RMRMeid +} type testingXappControl struct { testingRmrControl testingMessageChannel meid *xapp.RMRMeid xid_seq uint64 - xid string } func (tc *testingXappControl) newXid() string { - tc.xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10) + var xid string + xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10) tc.xid_seq++ - return tc.xid + return xid +} + +func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction { + trans := &xappTransaction{} + trans.tc = tc + if xid == nil { + trans.xid = tc.newXid() + } else { + trans.xid = *xid + } + trans.meid = &xapp.RMRMeid{RanName: ranname} + return trans } func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) { - //if msg.Xid == tc.xid { if strings.Contains(msg.Xid, tc.desc) { xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid) tc.rmrConChan <- msg @@ -149,8 +168,7 @@ func createNewXappControl(desc string, rtfile string, port string, stat string, xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl) xappCtrl.testingMessageChannel = initTestingMessageChannel() xappCtrl.meid = &xapp.RMRMeid{RanName: ranname} - xappCtrl.xid_seq = 0 - xappCtrl.newXid() + xappCtrl.xid_seq = 1 return xappCtrl } diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go index b8ed6e6..2d45ede 100644 --- a/pkg/control/messaging_test.go +++ b/pkg/control/messaging_test.go @@ -31,27 +31,12 @@ import ( //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- - var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer() //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -type xappTransaction struct { - xappConn *testingXappControl - xid string -} - -func newXappTransaction(xappConn *testingXappControl) { - trans := &xappTransaction{} - trans.xappConn = xappConn - trans.xid = xappConn.newXid() -} - -//----------------------------------------------------------------------------- -// -//----------------------------------------------------------------------------- -func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bool) { +func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans *xappTransaction) *xappTransaction { xapp.Logger.Info("handle_xapp_subs_req") e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest() @@ -93,29 +78,34 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bo err, packedMsg := e2SubsReq.Pack(nil) if err != nil { testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error()) + return nil + } + + var trans *xappTransaction = oldTrans + if trans == nil { + trans = xappConn.newXappTransaction(nil, "RAN_NAME_1") } params := &xapp.RMRParams{} params.Mtype = xapp.RIC_SUB_REQ params.SubId = -1 params.Payload = packedMsg.Buf - params.Meid = xappConn.meid - if updseq { - xappConn.newXid() - } - params.Xid = xappConn.xid + params.Meid = trans.meid + params.Xid = trans.xid params.Mbuf = nil snderr := xappConn.RmrSend(params) if snderr != nil { testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error()) + return nil } + return trans } //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int { +func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int { xapp.Logger.Info("handle_xapp_subs_resp") e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() var e2SubsId int @@ -126,7 +116,10 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int { select { case msg := <-xappConn.rmrConChan: if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] { - testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype]) + testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype]) + return -1 + } else if msg.Xid != trans.xid { + testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid) return -1 } else { packedData := &packer.PackedData{} @@ -155,7 +148,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int { //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updseq bool, e2SubsId int) { +func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction { xapp.Logger.Info("handle_xapp_subs_del_req") e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() @@ -174,29 +167,34 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updse err, packedMsg := e2SubsDelReq.Pack(nil) if err != nil { testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error()) + return nil + } + + var trans *xappTransaction = oldTrans + if trans == nil { + trans = xappConn.newXappTransaction(nil, "RAN_NAME_1") } params := &xapp.RMRParams{} params.Mtype = xapp.RIC_SUB_DEL_REQ params.SubId = e2SubsId params.Payload = packedMsg.Buf - params.Meid = xappConn.meid - if updseq { - xappConn.newXid() - } - params.Xid = xappConn.xid + params.Meid = trans.meid + params.Xid = trans.xid params.Mbuf = nil snderr := xappConn.RmrSend(params) if snderr != nil { testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error()) + return nil } + return trans } //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) { +func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, trans *xappTransaction) { xapp.Logger.Info("handle_xapp_subs_del_resp") e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() @@ -206,7 +204,10 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) { select { case msg := <-xappConn.rmrConChan: if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] { - testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype]) + testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype]) + return + } else if msg.Xid != trans.xid { + testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid) return } else { packedData := &packer.PackedData{} @@ -403,6 +404,44 @@ func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs i return false } +func (mc *testingMainControl) get_seqcnt(t *testing.T) uint16 { + mc.c.registry.mutex.Lock() + defer mc.c.registry.mutex.Unlock() + return mc.c.registry.counter +} + +func (mc *testingMainControl) wait_seqcnt_change(t *testing.T, orig uint16, secs int) (uint16, bool) { + i := 1 + for ; i <= secs*2; i++ { + mc.c.registry.mutex.Lock() + curr := mc.c.registry.counter + mc.c.registry.mutex.Unlock() + if curr != orig { + return curr, true + } + time.Sleep(500 * time.Millisecond) + } + testError(t, "(general) no seq change within %d secs", secs) + return 0, false +} + +func (mc *testingMainControl) get_msgcounter(t *testing.T) uint64 { + return mc.c.msgCounter +} + +func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64, secs int) (uint64, bool) { + i := 1 + for ; i <= secs*2; i++ { + curr := mc.c.msgCounter + if curr != orig { + return curr, true + } + time.Sleep(500 * time.Millisecond) + } + testError(t, "(general) no msg counter change within %d secs", secs) + return 0, false +} + //----------------------------------------------------------------------------- // TestSubReqAndSubDelOk // @@ -440,15 +479,15 @@ func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs i func TestSubReqAndSubDelOk(t *testing.T) { xapp.Logger.Info("TestSubReqAndSubDelOk") - xappConn1.handle_xapp_subs_req(t, true) + cretrans := xappConn1.handle_xapp_subs_req(t, nil) crereq, cremsg := e2termConn.handle_e2term_subs_req(t) e2termConn.handle_e2term_subs_resp(t, crereq, cremsg) - e2SubsId := xappConn1.handle_xapp_subs_resp(t) + e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans) - xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId) + deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId) delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t) e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg) - xappConn1.handle_xapp_subs_del_resp(t) + xappConn1.handle_xapp_subs_del_resp(t, deltrans) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId, 10) @@ -486,19 +525,21 @@ func TestSubReqRetransmission(t *testing.T) { xapp.Logger.Info("TestSubReqRetransmission") //Subs Create - xappConn1.handle_xapp_subs_req(t, true) + cretrans := xappConn1.handle_xapp_subs_req(t, nil) crereq, cremsg := e2termConn.handle_e2term_subs_req(t) - xappConn1.handle_xapp_subs_req(t, false) - e2termConn.handle_e2term_subs_resp(t, crereq, cremsg) + seqBef := mainCtrl.get_msgcounter(t) + xappConn1.handle_xapp_subs_req(t, cretrans) //Retransmitted SubReq + mainCtrl.wait_msgcounter_change(t, seqBef, 10) - e2SubsId := xappConn1.handle_xapp_subs_resp(t) + e2termConn.handle_e2term_subs_resp(t, crereq, cremsg) + e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans) //Subs Delete - xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId) + deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId) delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t) e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg) - xappConn1.handle_xapp_subs_del_resp(t) + xappConn1.handle_xapp_subs_del_resp(t, deltrans) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId, 10) @@ -535,20 +576,21 @@ func TestSubDelReqRetransmission(t *testing.T) { xapp.Logger.Info("TestSubDelReqRetransmission") //Subs Create - xappConn1.handle_xapp_subs_req(t, true) + cretrans := xappConn1.handle_xapp_subs_req(t, nil) crereq, cremsg := e2termConn.handle_e2term_subs_req(t) e2termConn.handle_e2term_subs_resp(t, crereq, cremsg) - e2SubsId := xappConn1.handle_xapp_subs_resp(t) + e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans) //Subs Delete - xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId) + deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId) delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t) - <-time.After(2 * time.Second) - xappConn1.handle_xapp_subs_del_req(t, false, e2SubsId) + seqBef := mainCtrl.get_msgcounter(t) + xappConn1.handle_xapp_subs_del_req(t, deltrans, e2SubsId) //Retransmitted SubDelReq + mainCtrl.wait_msgcounter_change(t, seqBef, 10) e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg) - xappConn1.handle_xapp_subs_del_resp(t) + xappConn1.handle_xapp_subs_del_resp(t, deltrans) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId, 10) @@ -596,34 +638,111 @@ func TestSubReqAndSubDelOkTwoParallel(t *testing.T) { xapp.Logger.Info("TestSubReqAndSubDelOkTwoParallel") //Req1 - xappConn1.handle_xapp_subs_req(t, true) + cretrans1 := xappConn1.handle_xapp_subs_req(t, nil) crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t) //Req2 - xappConn2.handle_xapp_subs_req(t, true) + cretrans2 := xappConn2.handle_xapp_subs_req(t, nil) crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t) //Resp1 e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1) - e2SubsId1 := xappConn1.handle_xapp_subs_resp(t) + e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1) //Resp2 e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2) - e2SubsId2 := xappConn2.handle_xapp_subs_resp(t) + e2SubsId2 := xappConn2.handle_xapp_subs_resp(t, cretrans2) + + //Del1 + deltrans1 := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId1) + delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t) + e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1) + xappConn1.handle_xapp_subs_del_resp(t, deltrans1) + //Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, e2SubsId1, 10) + + //Del2 + deltrans2 := xappConn2.handle_xapp_subs_del_req(t, nil, e2SubsId2) + delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t) + e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2) + xappConn2.handle_xapp_subs_del_resp(t, deltrans2) + //Wait that subs is cleaned + mainCtrl.wait_subs_clean(t, e2SubsId2, 10) +} + +//----------------------------------------------------------------------------- +// TestSameSubsDiffRan +// Same subscription to different RANs +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | | | +// | | | +// | SubReq(r1) | | +// |------------->| | +// | | | +// | | SubReq(r1) | +// | |------------->| +// | | | +// | | SubResp(r1) | +// | |<-------------| +// | | | +// | SubResp(r1) | | +// |<-------------| | +// | | | +// | SubReq(r2) | | +// |------------->| | +// | | | +// | | SubReq(r2) | +// | |------------->| +// | | | +// | | SubResp(r2) | +// | |<-------------| +// | | | +// | SubResp(r2) | | +// |<-------------| | +// | | | +// | [SUBS r1 DELETE] | +// | | | +// | [SUBS r2 DELETE] | +// | | | +// +//----------------------------------------------------------------------------- +func TestSameSubsDiffRan(t *testing.T) { + xapp.Logger.Info("TestSameSubsDiffRan") + + //Req1 + cretrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1") + xappConn1.handle_xapp_subs_req(t, cretrans1) + crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t) + e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1) + e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1) + + //Req2 + cretrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2") + xappConn1.handle_xapp_subs_req(t, cretrans2) + crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t) + e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2) + e2SubsId2 := xappConn1.handle_xapp_subs_resp(t, cretrans2) //Del1 - xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId1) + deltrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1") + xappConn1.handle_xapp_subs_del_req(t, deltrans1, e2SubsId1) delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t) e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1) - xappConn1.handle_xapp_subs_del_resp(t) + xappConn1.handle_xapp_subs_del_resp(t, deltrans1) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId1, 10) //Del2 - xappConn2.handle_xapp_subs_del_req(t, true, e2SubsId2) + deltrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2") + xappConn1.handle_xapp_subs_del_req(t, deltrans2, e2SubsId2) delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t) e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2) - xappConn2.handle_xapp_subs_del_resp(t) + xappConn1.handle_xapp_subs_del_resp(t, deltrans2) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId2, 10) } diff --git a/pkg/control/registry.go b/pkg/control/registry.go index 3ac15f1..3d70d63 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -25,8 +25,20 @@ import ( ) type Subscription struct { - Seq uint16 - Confirmed bool + Seq uint16 + Active bool +} + +func (s *Subscription) Confirmed() { + s.Active = true +} + +func (s *Subscription) UnConfirmed() { + s.Active = false +} + +func (s *Subscription) IsConfirmed() bool { + return s.Active } type Registry struct { @@ -42,24 +54,37 @@ func (r *Registry) Initialize(seedsn uint16) { } // Reserves and returns the next free sequence number -func (r *Registry) ReserveSequenceNumber() (uint16, bool) { +func (r *Registry) ReserveSubscription() *Subscription { // Check is current SequenceNumber valid + // Allocate next SequenceNumber value and retry N times r.mutex.Lock() defer r.mutex.Unlock() - sequenceNumber := r.counter - if _, ok := r.register[sequenceNumber]; ok { - xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber) - return sequenceNumber, false + var subs *Subscription = nil + var retrytimes uint16 = 1000 + for ; subs == nil && retrytimes > 0; retrytimes-- { + sequenceNumber := r.counter + if r.counter == 65535 { + r.counter = 0 + } else { + r.counter++ + } + if _, ok := r.register[sequenceNumber]; ok == false { + r.register[sequenceNumber] = &Subscription{sequenceNumber, false} + return r.register[sequenceNumber] + } } - r.register[sequenceNumber] = &Subscription{sequenceNumber, false} + return nil +} - // Allocate next SequenceNumber value - if r.counter == 65535 { - r.counter = 0 - } else { - r.counter++ +// This function checks the validity of the given subscription id +func (r *Registry) GetSubscription(sn uint16) *Subscription { + r.mutex.Lock() + defer r.mutex.Unlock() + xapp.Logger.Debug("Registry map: %v", r.register) + if _, ok := r.register[sn]; ok { + return r.register[sn] } - return sequenceNumber, true + return nil } // This function checks the validity of the given subscription id @@ -77,14 +102,14 @@ func (r *Registry) IsValidSequenceNumber(sn uint16) bool { func (r *Registry) setSubscriptionToConfirmed(sn uint16) { r.mutex.Lock() defer r.mutex.Unlock() - r.register[sn].Confirmed = true + r.register[sn].Confirmed() } //This function sets the given id as unused in the register -func (r *Registry) deleteSubscription(sn uint16) { +func (r *Registry) setSubscriptionToUnConfirmed(sn uint16) { r.mutex.Lock() defer r.mutex.Unlock() - r.register[sn].Confirmed = false + r.register[sn].UnConfirmed() } //This function releases the given id as unused in the register diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 584b331..9287ea8 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -31,9 +31,8 @@ type TransactionKey struct { } type TransactionXappKey struct { - Addr string // xapp addr - Port uint16 // xapp port - Xid string // xapp xid in req + RmrEndpoint + Xid string // xapp xid in req } type Transaction struct { @@ -46,7 +45,7 @@ type Transaction struct { } func (t *Transaction) SubRouteInfo() SubRouteInfo { - return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID} + return SubRouteInfo{t.Key.TransType, t.Xappkey.RmrEndpoint.Addr, t.Xappkey.RmrEndpoint.Port, t.Key.SubID} } /* @@ -69,7 +68,8 @@ Returns error if there is similar transatcion ongoing. */ func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) { key := TransactionKey{subID, act} - xappkey := TransactionXappKey{addr, port, params.Xid} + endpoint := RmrEndpoint{addr, port} + xappkey := TransactionXappKey{endpoint, params.Xid} trans := &Transaction{t, key, xappkey, params, respReceived, forwardRespToXapp} t.mutex.Lock() defer t.mutex.Unlock() diff --git a/pkg/control/types.go b/pkg/control/types.go index 83312d8..febd41a 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -32,4 +32,8 @@ type SubRouteInfo struct { SubID uint16 } +type RmrEndpoint struct { + Addr string // xapp addr + Port uint16 // xapp port +} type Action int -- 2.16.6