From 379ff082446038f3dee0f22d4f79c0965e9da25a Mon Sep 17 00:00:00 2001 From: Juha Hyttinen Date: Mon, 30 Dec 2019 15:49:41 +0200 Subject: [PATCH] RICPLT-2961 Drop retransmitted messages with same transaction id Change-Id: Iaccff14cd9bbbf0b029a8153e9665209b68f65d3 Signed-off-by: Juha Hyttinen --- pkg/control/control.go | 53 +++-- pkg/control/main_test.go | 29 ++- pkg/control/messaging_test.go | 471 +++++++++++++++++++++++++++++------------- pkg/control/tracker.go | 84 +++++--- pkg/control/types.go | 15 -- 5 files changed, 433 insertions(+), 219 deletions(-) diff --git a/pkg/control/control.go b/pkg/control/control.go index b398daa..cb085de 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -164,26 +164,27 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) if err != nil { xapp.Logger.Error("SubReq: Unable to set Sequence Number in Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + c.registry.releaseSequenceNumber(newSubId) return } srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + c.registry.releaseSequenceNumber(newSubId) return } /* Create transatcion records for every subscription request */ - xactKey := TransactionKey{newSubId, CREATE} - xactValue := Transaction{*srcAddr, *srcPort, params} - err = c.tracker.TrackTransaction(xactKey, xactValue) + transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params) if err != nil { xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid) + c.registry.releaseSequenceNumber(newSubId) return } /* Update routing manager about the new subscription*/ - subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId} + subRouteAction := transaction.SubRouteInfo() xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) @@ -225,13 +226,12 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) c.registry.setSubscriptionToConfirmed(payloadSeqNum) - var transaction Transaction - transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) + transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE) if err != nil { xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } - xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid @@ -242,7 +242,7 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE) if err != nil { xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -265,13 +265,12 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) - var transaction Transaction - transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) + transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE) if err != nil { xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } - xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid @@ -285,7 +284,7 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { time.Sleep(3 * time.Second) xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} + subRouteAction := transaction.SubRouteInfo() err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -348,10 +347,10 @@ func (c *Control) sendSubscriptionFailure(subId uint16, causeContent uint8, caus time.Sleep(3 * time.Second) - xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port) xapp.Logger.Info("SubReqTimer: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId} + subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId} err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SendSubFail: Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -409,7 +408,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { if c.registry.IsValidSequenceNumber(payloadSeqNum) { c.registry.deleteSubscription(payloadSeqNum) - err = c.trackDeleteTransaction(params, payloadSeqNum) + _, err = c.trackDeleteTransaction(params, payloadSeqNum) if err != nil { xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -429,14 +428,12 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { return } -func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) { +func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (transaction *Transaction, err error) { srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - xactKey := TransactionKey{payloadSeqNum, DELETE} - xactValue := Transaction{*srcAddr, *srcPort, params} - err = c.tracker.TrackTransaction(xactKey, xactValue) + transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params) return } @@ -454,13 +451,12 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) - var transaction Transaction - transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) + transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE) if err != nil { xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) return } - xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid @@ -474,7 +470,7 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err time.Sleep(3 * time.Second) xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} + subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum} err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -509,13 +505,12 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) - var transaction Transaction - transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) + transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE) if err != nil { xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId) return } - xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid @@ -529,7 +524,7 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { time.Sleep(3 * time.Second) xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} + subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum} c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) @@ -592,10 +587,10 @@ func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8 time.Sleep(3 * time.Second) - xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort) + xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port) xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId} + subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId} err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SendSubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go index f74339e..2185d53 100644 --- a/pkg/control/main_test.go +++ b/pkg/control/main_test.go @@ -95,6 +95,25 @@ func createNewRmrControl(desc string, rtfile string, port string, stat string) * return newConn } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +type testingMainControl struct { + testingControl + c *Control +} + +func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool { + i := 1 + for ; i <= secs*2; i++ { + if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false { + return true + } + time.Sleep(500 * time.Millisecond) + } + return false +} + //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- @@ -123,6 +142,7 @@ func testCreateTmpFile(str string) (string, error) { var xappConn *testingRmrControl var e2termConn *testingRmrControl +var mainCtrl *testingMainControl func TestMain(m *testing.M) { xapp.Logger.Info("TestMain start") @@ -187,16 +207,17 @@ newrt|end subrtfilename, _ := testCreateTmpFile(subsrt) defer os.Remove(subrtfilename) os.Setenv("RMR_SEED_RT", subrtfilename) + os.Setenv("RMR_SRC_ID", "localhost:14560") xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT")) + xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID")) - mainCtrl := &testingControl{} + mainCtrl = &testingMainControl{} mainCtrl.desc = "main" mainCtrl.syncChan = make(chan struct{}) - os.Setenv("RMR_SRC_ID", "localhost:14560") - c := NewControl() + mainCtrl.c = NewControl() xapp.SetReadyCB(mainCtrl.ReadyCB, nil) - go xapp.RunWithParams(c, false) + go xapp.RunWithParams(mainCtrl.c, false) <-mainCtrl.syncChan //--------------------------------- diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go index 60a5dc0..7308563 100644 --- a/pkg/control/messaging_test.go +++ b/pkg/control/messaging_test.go @@ -34,9 +34,9 @@ import ( var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer() +//----------------------------------------------------------------------------- // -// -// +//----------------------------------------------------------------------------- func createSubsReq() *e2ap.E2APSubscriptionRequest { req := &e2ap.E2APSubscriptionRequest{} @@ -69,9 +69,9 @@ func createSubsReq() *e2ap.E2APSubscriptionRequest { return req } +//----------------------------------------------------------------------------- // -// -// +//----------------------------------------------------------------------------- func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse { resp := &e2ap.E2APSubscriptionResponse{} @@ -96,9 +96,9 @@ func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionRes return resp } +//----------------------------------------------------------------------------- // -// -// +//----------------------------------------------------------------------------- func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest { req := &e2ap.E2APSubscriptionDeleteRequest{} req.RequestId.Id = 1 @@ -107,9 +107,9 @@ func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest { return req } +//----------------------------------------------------------------------------- // -// -// +//----------------------------------------------------------------------------- func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse { resp := &e2ap.E2APSubscriptionDeleteResponse{} resp.RequestId.Id = req.RequestId.Id @@ -119,85 +119,57 @@ func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscr } //----------------------------------------------------------------------------- -// TestSubRequestSubResponseOk -// -// stub stub -// +-------+ +---------+ +---------+ -// | xapp | | submgr | | e2term | -// +-------+ +---------+ +---------+ -// | | | -// | SubReq | | -// |------------->| | -// | | | -// | | SubReq | -// | |------------->| -// | | | -// | | SubResp | -// | |<-------------| -// | | | -// | SubResp | | -// |<-------------| | -// | | | -// | | | -// | SubDelReq | | -// |------------->| | -// | | | -// | | SubDelReq | -// | |------------->| -// | | | -// | | SubDelResp | -// | |<-------------| -// | | | -// | SubDelResp | | -// |<-------------| | // //----------------------------------------------------------------------------- -func TestSubReqAndSubDelOk(t *testing.T) { - - xapp.Logger.Info("TestSubReqAndSubDelOk start") +func handle_xapp_subs_req(t *testing.T) { + xapp.Logger.Info("handle_xapp_subs_req start") e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest() - e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() - e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() - e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() - var e2SubsId int //--------------------------------- // xapp activity: Send Subs Req //--------------------------------- - select { - case <-time.After(5 * time.Second): - xapp.Logger.Info("(xappConn) Send Subs Req") - req := createSubsReq() - e2SubsReq.Set(req) - xapp.Logger.Debug("%s", e2SubsReq.String()) - err, packedMsg := e2SubsReq.Pack(nil) - if err != nil { - testError(t, "(xappConn) pack NOK %s", err.Error()) - } + //select { + //case <-time.After(1 * time.Second): + xapp.Logger.Info("(xappConn) Send Subs Req") + req := createSubsReq() + e2SubsReq.Set(req) + xapp.Logger.Debug("%s", e2SubsReq.String()) + err, packedMsg := e2SubsReq.Pack(nil) + if err != nil { + testError(t, "(xappConn) pack NOK %s", err.Error()) + } - params := &xapp.RMRParams{} - params.Mtype = xapp.RIC_SUB_REQ - params.SubId = -1 - params.Payload = packedMsg.Buf - params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"} - params.Xid = "XID_1" - params.Mbuf = nil - - snderr := xappConn.RmrSend(params) - if snderr != nil { - testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error()) - } + params := &xapp.RMRParams{} + params.Mtype = xapp.RIC_SUB_REQ + params.SubId = -1 + params.Payload = packedMsg.Buf + params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"} + params.Xid = "XID_1" + params.Mbuf = nil + + snderr := xappConn.RmrSend(params) + if snderr != nil { + testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error()) } + //} +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) { + xapp.Logger.Info("handle_e2term_subs_req start") + e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest() //--------------------------------- - // e2term activity: Recv Subs Req & Send Subs Resp + // e2term activity: Recv Subs Req //--------------------------------- select { case msg := <-e2termConn.rmrConChan: if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] { testError(t, "(e2termConn) Received non RIC_SUB_REQ message") } else { - xapp.Logger.Info("(e2termConn) Recv Subs Req & Send Subs Resp") + xapp.Logger.Info("(e2termConn) Recv Subs Req") packedData := &packer.PackedData{} packedData.Buf = msg.Payload unpackerr := e2SubsReq.UnPack(packedData) @@ -208,32 +180,56 @@ func TestSubReqAndSubDelOk(t *testing.T) { if geterr != nil { testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error()) } - - resp := createSubsResp(req) - e2SubsResp.Set(resp) - xapp.Logger.Debug("%s", e2SubsResp.String()) - packerr, packedMsg := e2SubsResp.Pack(nil) - if packerr != nil { - testError(t, "(e2termConn) pack NOK %s", packerr.Error()) - } - - params := &xapp.RMRParams{} - params.Mtype = xapp.RIC_SUB_RESP - params.SubId = msg.SubId - params.Payload = packedMsg.Buf - params.Meid = msg.Meid - params.Xid = msg.Xid - params.Mbuf = nil - - snderr := e2termConn.RmrSend(params) - if snderr != nil { - testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error()) - } - + return req, msg } case <-time.After(15 * time.Second): testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs") } + return nil, nil +} + +func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) { + xapp.Logger.Info("handle_e2term_subs_resp start") + e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() + + //--------------------------------- + // e2term activity: Send Subs Resp + //--------------------------------- + xapp.Logger.Info("(e2termConn) Send Subs Resp") + resp := createSubsResp(req) + e2SubsResp.Set(resp) + xapp.Logger.Debug("%s", e2SubsResp.String()) + packerr, packedMsg := e2SubsResp.Pack(nil) + if packerr != nil { + testError(t, "(e2termConn) pack NOK %s", packerr.Error()) + } + + params := &xapp.RMRParams{} + params.Mtype = xapp.RIC_SUB_RESP + params.SubId = msg.SubId + params.Payload = packedMsg.Buf + params.Meid = msg.Meid + params.Xid = msg.Xid + params.Mbuf = nil + + snderr := e2termConn.RmrSend(params) + if snderr != nil { + testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error()) + } +} + +func handle_e2term_subs_reqandresp(t *testing.T) { + req, msg := handle_e2term_subs_req(t) + handle_e2term_subs_resp(t, req, msg) +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_xapp_subs_resp(t *testing.T) int { + xapp.Logger.Info("handle_xapp_subs_resp start") + e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() + var e2SubsId int //--------------------------------- // xapp activity: Recv Subs Resp @@ -262,44 +258,61 @@ func TestSubReqAndSubDelOk(t *testing.T) { case <-time.After(15 * time.Second): testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs") } + return e2SubsId +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) { + xapp.Logger.Info("handle_xapp_subs_del_req start") + e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() //--------------------------------- // xapp activity: Send Subs Del Req //--------------------------------- - select { - case <-time.After(2 * time.Second): - xapp.Logger.Info("(xappConn) Send Subs Del Req") - req := createSubsDelReq(uint32(e2SubsId)) - e2SubsDelReq.Set(req) - xapp.Logger.Debug("%s", e2SubsDelReq.String()) - err, packedMsg := e2SubsDelReq.Pack(nil) - if err != nil { - testError(t, "(xappConn) pack NOK %s", err.Error()) - } + //select { + //case <-time.After(1 * time.Second): + xapp.Logger.Info("(xappConn) Send Subs Del Req") + req := createSubsDelReq(uint32(e2SubsId)) + e2SubsDelReq.Set(req) + xapp.Logger.Debug("%s", e2SubsDelReq.String()) + err, packedMsg := e2SubsDelReq.Pack(nil) + if err != nil { + testError(t, "(xappConn) pack NOK %s", err.Error()) + } - params := &xapp.RMRParams{} - params.Mtype = xapp.RIC_SUB_DEL_REQ - params.SubId = e2SubsId - params.Payload = packedMsg.Buf - params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"} - params.Xid = "XID_1" - params.Mbuf = nil - - snderr := xappConn.RmrSend(params) - if snderr != nil { - testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error()) - } + params := &xapp.RMRParams{} + params.Mtype = xapp.RIC_SUB_DEL_REQ + params.SubId = e2SubsId + params.Payload = packedMsg.Buf + params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"} + params.Xid = "XID_1" + params.Mbuf = nil + + snderr := xappConn.RmrSend(params) + if snderr != nil { + testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error()) } + //} +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) { + xapp.Logger.Info("handle_e2term_subs_del_req start") + e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() //--------------------------------- - // e2term activity: Recv Subs Del Req & Send Subs Del Resp + // e2term activity: Recv Subs Del Req //--------------------------------- select { case msg := <-e2termConn.rmrConChan: if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] { testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message") } else { - xapp.Logger.Info("(e2termConn) Recv Subs Del Req & Send Subs Del Resp") + xapp.Logger.Info("(e2termConn) Recv Subs Del Req") packedData := &packer.PackedData{} packedData.Buf = msg.Payload @@ -311,32 +324,56 @@ func TestSubReqAndSubDelOk(t *testing.T) { if geterr != nil { testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error()) } - - resp := createSubsDelResp(req) - e2SubsDelResp.Set(resp) - xapp.Logger.Debug("%s", e2SubsDelResp.String()) - packerr, packedMsg := e2SubsDelResp.Pack(nil) - if packerr != nil { - testError(t, "(e2termConn) pack NOK %s", packerr.Error()) - } - - params := &xapp.RMRParams{} - params.Mtype = xapp.RIC_SUB_DEL_RESP - params.SubId = msg.SubId - params.Payload = packedMsg.Buf - params.Meid = msg.Meid - params.Xid = msg.Xid - params.Mbuf = nil - - snderr := e2termConn.RmrSend(params) - if snderr != nil { - testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error()) - } - + return req, msg } case <-time.After(15 * time.Second): testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs") } + return nil, nil +} + +func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) { + xapp.Logger.Info("handle_e2term_subs_del_resp start") + e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() + + //--------------------------------- + // e2term activity: Send Subs Del Resp + //--------------------------------- + xapp.Logger.Info("(e2termConn) Send Subs Del Resp") + resp := createSubsDelResp(req) + e2SubsDelResp.Set(resp) + xapp.Logger.Debug("%s", e2SubsDelResp.String()) + packerr, packedMsg := e2SubsDelResp.Pack(nil) + if packerr != nil { + testError(t, "(e2termConn) pack NOK %s", packerr.Error()) + } + + params := &xapp.RMRParams{} + params.Mtype = xapp.RIC_SUB_DEL_RESP + params.SubId = msg.SubId + params.Payload = packedMsg.Buf + params.Meid = msg.Meid + params.Xid = msg.Xid + params.Mbuf = nil + + snderr := e2termConn.RmrSend(params) + if snderr != nil { + testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error()) + } + +} + +func handle_e2term_subs_del_reqandresp(t *testing.T) { + req, msg := handle_e2term_subs_del_req(t) + handle_e2term_subs_del_resp(t, req, msg) +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_xapp_subs_del_resp(t *testing.T) { + xapp.Logger.Info("handle_xapp_subs_del_resp start") + e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() //--------------------------------- // xapp activity: Recv Subs Del Resp @@ -363,5 +400,163 @@ func TestSubReqAndSubDelOk(t *testing.T) { case <-time.After(15 * time.Second): testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs") } +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool { + xapp.Logger.Info("handle_wait_subs_clean start") + if mainCtrl.wait_subs_clean(e2SubsId, 10) == false { + testError(t, "(general) no clean within 10 secs") + return false + } + return true +} + +//----------------------------------------------------------------------------- +// TestSubReqAndSubDelOk +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | SubReq | | +// |------------->| | +// | | | +// | | SubReq | +// | |------------->| +// | | | +// | | SubResp | +// | |<-------------| +// | | | +// | SubResp | | +// |<-------------| | +// | | | +// | | | +// | SubDelReq | | +// |------------->| | +// | | | +// | | SubDelReq | +// | |------------->| +// | | | +// | | SubDelResp | +// | |<-------------| +// | | | +// | SubDelResp | | +// |<-------------| | +// +//----------------------------------------------------------------------------- +func TestSubReqAndSubDelOk(t *testing.T) { + xapp.Logger.Info("TestSubReqAndSubDelOk start") + + handle_xapp_subs_req(t) + handle_e2term_subs_reqandresp(t) + e2SubsId := handle_xapp_subs_resp(t) + + handle_xapp_subs_del_req(t, e2SubsId) + handle_e2term_subs_del_reqandresp(t) + handle_xapp_subs_del_resp(t) + + //Wait that subs is cleaned + handle_wait_subs_clean(t, e2SubsId) +} + +//----------------------------------------------------------------------------- +// TestSubReqRetransmission +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | SubReq | | +// |------------->| | +// | | | +// | | SubReq | +// | |------------->| +// | | | +// | SubReq | | +// | (retrans) | | +// |------------->| | +// | | | +// | | SubResp | +// | |<-------------| +// | | | +// | SubResp | | +// |<-------------| | +// | | | +// | [SUBS DELETE] | +// | | | +// +//----------------------------------------------------------------------------- +func TestSubReqRetransmission(t *testing.T) { + xapp.Logger.Info("TestSubReqRetransmission start") + + //Subs Create + handle_xapp_subs_req(t) + req, msg := handle_e2term_subs_req(t) + handle_xapp_subs_req(t) + + handle_e2term_subs_resp(t, req, msg) + + e2SubsId := handle_xapp_subs_resp(t) + + //Subs Delete + handle_xapp_subs_del_req(t, e2SubsId) + handle_e2term_subs_del_reqandresp(t) + handle_xapp_subs_del_resp(t) + + //Wait that subs is cleaned + handle_wait_subs_clean(t, e2SubsId) +} + +//----------------------------------------------------------------------------- +// TestSubDelReqRetransmission +// +// stub stub +// +-------+ +---------+ +---------+ +// | xapp | | submgr | | e2term | +// +-------+ +---------+ +---------+ +// | | | +// | [SUBS CREATE] | +// | | | +// | | | +// | SubDelReq | | +// |------------->| | +// | | | +// | | SubDelReq | +// | |------------->| +// | | | +// | SubDelReq | | +// |------------->| | +// | | | +// | | SubDelResp | +// | |<-------------| +// | | | +// | SubDelResp | | +// |<-------------| | +// +//----------------------------------------------------------------------------- +func TestSubDelReqRetransmission(t *testing.T) { + xapp.Logger.Info("TestSubDelReqRetransmission start") + + //Subs Create + handle_xapp_subs_req(t) + handle_e2term_subs_reqandresp(t) + e2SubsId := handle_xapp_subs_resp(t) + + //Subs Delete + handle_xapp_subs_del_req(t, e2SubsId) + req, msg := handle_e2term_subs_del_req(t) + + <-time.After(2 * time.Second) + + handle_xapp_subs_del_req(t, e2SubsId) + + handle_e2term_subs_del_resp(t, req, msg) + //Wait that subs is cleaned + handle_wait_subs_clean(t, e2SubsId) } diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 1682ae7..65f816e 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -21,83 +21,101 @@ package control import ( "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "sync" ) +type TransactionKey struct { + SubID uint16 // subscription id / sequence number + TransType Action // action ongoing (CREATE/DELETE etc) +} + +type TransactionXappKey struct { + Addr string // xapp addr + Port uint16 // xapp port + Xid string // xapp xid in req +} + +type Transaction struct { + tracker *Tracker // tracker instance + Key TransactionKey // action key + Xappkey TransactionXappKey // transaction key + OrigParams *xapp.RMRParams // request orginal params +} + +func (t *Transaction) SubRouteInfo() SubRouteInfo { + return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID} +} + /* Implements a record of ongoing transactions and helper functions to CRUD the records. */ type Tracker struct { - transactionTable map[TransactionKey]Transaction - mutex sync.Mutex + transactionTable map[TransactionKey]*Transaction + transactionXappTable map[TransactionXappKey]*Transaction + mutex sync.Mutex } func (t *Tracker) Init() { - t.transactionTable = make(map[TransactionKey]Transaction) + t.transactionTable = make(map[TransactionKey]*Transaction) + t.transactionXappTable = make(map[TransactionXappKey]*Transaction) } /* Checks if a tranascation with similar type has been ongoing. If not then creates one. Returns error if there is similar transatcion ongoing. */ -func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error { +func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams) (*Transaction, error) { + key := TransactionKey{subID, act} + xappkey := TransactionXappKey{addr, port, params.Xid} + trans := &Transaction{t, key, xappkey, params} t.mutex.Lock() defer t.mutex.Unlock() if _, ok := t.transactionTable[key]; ok { // TODO: Implement merge related check here. If the key is same but the value is different. - err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType) - return err + err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.TransType) + return nil, err } - t.transactionTable[key] = xact - return nil -} - -/* -Retreives the transaction table entry for the given request. -Returns error in case the transaction cannot be found. -*/ -func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error { - key := TransactionKey{SubID, transType} - t.mutex.Lock() - defer t.mutex.Unlock() - if _, ok := t.transactionTable[key]; ok { + if _, ok := t.transactionXappTable[xappkey]; ok { // TODO: Implement merge related check here. If the key is same but the value is different. - err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType) - return err + err := fmt.Errorf("transaction tracker: Similar transaction with xapp key %v is ongoing", xappkey) + return nil, err } - t.transactionTable[key] = xact - return nil + t.transactionTable[key] = trans + t.transactionXappTable[xappkey] = trans + return trans, nil } /* Retreives the transaction table entry for the given request. Returns error in case the transaction cannot be found. */ -func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) { +func (t *Tracker) RetriveTransaction(subID uint16, act Action) (*Transaction, error) { key := TransactionKey{subID, act} t.mutex.Lock() defer t.mutex.Unlock() - var xact Transaction - if xact, ok := t.transactionTable[key]; ok { - return xact, nil + if trans, ok := t.transactionTable[key]; ok { + return trans, nil } err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act) - return xact, err + return nil, err } /* Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference. Returns error in case the transaction cannot be found. */ -func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) { +func (t *Tracker) completeTransaction(subID uint16, act Action) (*Transaction, error) { key := TransactionKey{subID, act} - var emptyTransaction Transaction t.mutex.Lock() defer t.mutex.Unlock() - if xact, ok := t.transactionTable[key]; ok { + if trans, ok1 := t.transactionTable[key]; ok1 { + if _, ok2 := t.transactionXappTable[trans.Xappkey]; ok2 { + delete(t.transactionXappTable, trans.Xappkey) + } delete(t.transactionTable, key) - return xact, nil + return trans, nil } err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act) - return emptyTransaction, err + return nil, err } diff --git a/pkg/control/types.go b/pkg/control/types.go index 3a9587f..83312d8 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -19,10 +19,6 @@ package control -import ( - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" -) - type RmrDatagram struct { MessageType int SubscriptionId uint16 @@ -37,14 +33,3 @@ type SubRouteInfo struct { } type Action int - -type TransactionKey struct { - SubID uint16 - transType Action -} - -type Transaction struct { - XappInstanceAddress string - XappPort uint16 - OrigParams *xapp.RMRParams -} -- 2.16.6