RICPLT-2961 Drop retransmitted messages with same transaction id 29/2129/4
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Mon, 30 Dec 2019 13:49:41 +0000 (15:49 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Thu, 2 Jan 2020 07:00:48 +0000 (09:00 +0200)
Change-Id: Iaccff14cd9bbbf0b029a8153e9665209b68f65d3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/control.go
pkg/control/main_test.go
pkg/control/messaging_test.go
pkg/control/tracker.go
pkg/control/types.go

index b398daa..cb085de 100644 (file)
@@ -164,26 +164,27 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
        if err != nil {
                xapp.Logger.Error("SubReq: Unable to set Sequence Number in Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               c.registry.releaseSequenceNumber(newSubId)
                return
        }
 
        srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               c.registry.releaseSequenceNumber(newSubId)
                return
        }
 
        /* Create transatcion records for every subscription request */
-       xactKey := TransactionKey{newSubId, CREATE}
-       xactValue := Transaction{*srcAddr, *srcPort, params}
-       err = c.tracker.TrackTransaction(xactKey, xactValue)
+       transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params)
        if err != nil {
                xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               c.registry.releaseSequenceNumber(newSubId)
                return
        }
 
        /* Update routing manager about the new subscription*/
-       subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
+       subRouteAction := transaction.SubRouteInfo()
        xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
 
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
@@ -225,13 +226,12 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
        c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
        c.registry.setSubscriptionToConfirmed(payloadSeqNum)
-       var transaction Transaction
-       transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+       transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
        if err != nil {
                xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
                return
        }
-       xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        params.SubId = int(payloadSeqNum)
        params.Xid = transaction.OrigParams.Xid
@@ -242,7 +242,7 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
                xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
        }
 
-       xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
        transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
        if err != nil {
                xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -265,13 +265,12 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
 
        c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
-       var transaction Transaction
-       transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+       transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
        if err != nil {
                xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
                return
        }
-       xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        params.SubId = int(payloadSeqNum)
        params.Xid = transaction.OrigParams.Xid
@@ -285,7 +284,7 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
        time.Sleep(3 * time.Second)
 
        xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+       subRouteAction := transaction.SubRouteInfo()
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -348,10 +347,10 @@ func (c *Control) sendSubscriptionFailure(subId uint16, causeContent uint8, caus
 
        time.Sleep(3 * time.Second)
 
-       xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        xapp.Logger.Info("SubReqTimer: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+       subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SendSubFail: Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -409,7 +408,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
 
        if c.registry.IsValidSequenceNumber(payloadSeqNum) {
                c.registry.deleteSubscription(payloadSeqNum)
-               err = c.trackDeleteTransaction(params, payloadSeqNum)
+               _, err = c.trackDeleteTransaction(params, payloadSeqNum)
                if err != nil {
                        xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
                        return
@@ -429,14 +428,12 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
        return
 }
 
-func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (transaction *Transaction, err error) {
        srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
        if err != nil {
                xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
        }
-       xactKey := TransactionKey{payloadSeqNum, DELETE}
-       xactValue := Transaction{*srcAddr, *srcPort, params}
-       err = c.tracker.TrackTransaction(xactKey, xactValue)
+       transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params)
        return
 }
 
@@ -454,13 +451,12 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err
 
        c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
 
-       var transaction Transaction
-       transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+       transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
        if err != nil {
                xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
                return
        }
-       xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        params.SubId = int(payloadSeqNum)
        params.Xid = transaction.OrigParams.Xid
@@ -474,7 +470,7 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err
        time.Sleep(3 * time.Second)
 
        xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+       subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -509,13 +505,12 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
 
        c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
 
-       var transaction Transaction
-       transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+       transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
        if err != nil {
                xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId)
                return
        }
-       xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        params.SubId = int(payloadSeqNum)
        params.Xid = transaction.OrigParams.Xid
@@ -529,7 +524,7 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
        time.Sleep(3 * time.Second)
 
        xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+       subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
        c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -592,10 +587,10 @@ func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8
 
        time.Sleep(3 * time.Second)
 
-       xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+       xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
        xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+       subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
        err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
        if err != nil {
                xapp.Logger.Error("SendSubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
index f74339e..2185d53 100644 (file)
@@ -95,6 +95,25 @@ func createNewRmrControl(desc string, rtfile string, port string, stat string) *
        return newConn
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingMainControl struct {
+       testingControl
+       c *Control
+}
+
+func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool {
+       i := 1
+       for ; i <= secs*2; i++ {
+               if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
+                       return true
+               }
+               time.Sleep(500 * time.Millisecond)
+       }
+       return false
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
@@ -123,6 +142,7 @@ func testCreateTmpFile(str string) (string, error) {
 
 var xappConn *testingRmrControl
 var e2termConn *testingRmrControl
+var mainCtrl *testingMainControl
 
 func TestMain(m *testing.M) {
        xapp.Logger.Info("TestMain start")
@@ -187,16 +207,17 @@ newrt|end
        subrtfilename, _ := testCreateTmpFile(subsrt)
        defer os.Remove(subrtfilename)
        os.Setenv("RMR_SEED_RT", subrtfilename)
+       os.Setenv("RMR_SRC_ID", "localhost:14560")
        xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+       xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
 
-       mainCtrl := &testingControl{}
+       mainCtrl = &testingMainControl{}
        mainCtrl.desc = "main"
        mainCtrl.syncChan = make(chan struct{})
 
-       os.Setenv("RMR_SRC_ID", "localhost:14560")
-       c := NewControl()
+       mainCtrl.c = NewControl()
        xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
-       go xapp.RunWithParams(c, false)
+       go xapp.RunWithParams(mainCtrl.c, false)
        <-mainCtrl.syncChan
 
        //---------------------------------
index 60a5dc0..7308563 100644 (file)
@@ -34,9 +34,9 @@ import (
 
 var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsReq() *e2ap.E2APSubscriptionRequest {
        req := &e2ap.E2APSubscriptionRequest{}
 
@@ -69,9 +69,9 @@ func createSubsReq() *e2ap.E2APSubscriptionRequest {
        return req
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
 
        resp := &e2ap.E2APSubscriptionResponse{}
@@ -96,9 +96,9 @@ func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionRes
        return resp
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
        req := &e2ap.E2APSubscriptionDeleteRequest{}
        req.RequestId.Id = 1
@@ -107,9 +107,9 @@ func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
        return req
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse {
        resp := &e2ap.E2APSubscriptionDeleteResponse{}
        resp.RequestId.Id = req.RequestId.Id
@@ -119,85 +119,57 @@ func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscr
 }
 
 //-----------------------------------------------------------------------------
-// TestSubRequestSubResponseOk
-//
-//   stub                          stub
-// +-------+     +---------+    +---------+
-// | xapp  |     | submgr  |    | e2term  |
-// +-------+     +---------+    +---------+
-//     |              |              |
-//     | SubReq       |              |
-//     |------------->|              |
-//     |              |              |
-//     |              | SubReq       |
-//     |              |------------->|
-//     |              |              |
-//     |              |      SubResp |
-//     |              |<-------------|
-//     |              |              |
-//     |      SubResp |              |
-//     |<-------------|              |
-//     |              |              |
-//     |              |              |
-//     | SubDelReq    |              |
-//     |------------->|              |
-//     |              |              |
-//     |              | SubDelReq    |
-//     |              |------------->|
-//     |              |              |
-//     |              |   SubDelResp |
-//     |              |<-------------|
-//     |              |              |
-//     |   SubDelResp |              |
-//     |<-------------|              |
 //
 //-----------------------------------------------------------------------------
-func TestSubReqAndSubDelOk(t *testing.T) {
-
-       xapp.Logger.Info("TestSubReqAndSubDelOk start")
+func handle_xapp_subs_req(t *testing.T) {
+       xapp.Logger.Info("handle_xapp_subs_req start")
        e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-       e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
-       e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
-       e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
-       var e2SubsId int
 
        //---------------------------------
        // xapp activity: Send Subs Req
        //---------------------------------
-       select {
-       case <-time.After(5 * time.Second):
-               xapp.Logger.Info("(xappConn) Send Subs Req")
-               req := createSubsReq()
-               e2SubsReq.Set(req)
-               xapp.Logger.Debug("%s", e2SubsReq.String())
-               err, packedMsg := e2SubsReq.Pack(nil)
-               if err != nil {
-                       testError(t, "(xappConn) pack NOK %s", err.Error())
-               }
+       //select {
+       //case <-time.After(1 * time.Second):
+       xapp.Logger.Info("(xappConn) Send Subs Req")
+       req := createSubsReq()
+       e2SubsReq.Set(req)
+       xapp.Logger.Debug("%s", e2SubsReq.String())
+       err, packedMsg := e2SubsReq.Pack(nil)
+       if err != nil {
+               testError(t, "(xappConn) pack NOK %s", err.Error())
+       }
 
-               params := &xapp.RMRParams{}
-               params.Mtype = xapp.RIC_SUB_REQ
-               params.SubId = -1
-               params.Payload = packedMsg.Buf
-               params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-               params.Xid = "XID_1"
-               params.Mbuf = nil
-
-               snderr := xappConn.RmrSend(params)
-               if snderr != nil {
-                       testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-               }
+       params := &xapp.RMRParams{}
+       params.Mtype = xapp.RIC_SUB_REQ
+       params.SubId = -1
+       params.Payload = packedMsg.Buf
+       params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+       params.Xid = "XID_1"
+       params.Mbuf = nil
+
+       snderr := xappConn.RmrSend(params)
+       if snderr != nil {
+               testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
        }
+       //}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
+       xapp.Logger.Info("handle_e2term_subs_req start")
+       e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
 
        //---------------------------------
-       // e2term activity: Recv Subs Req & Send Subs Resp
+       // e2term activity: Recv Subs Req
        //---------------------------------
        select {
        case msg := <-e2termConn.rmrConChan:
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
                        testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
                } else {
-                       xapp.Logger.Info("(e2termConn) Recv Subs Req & Send Subs Resp")
+                       xapp.Logger.Info("(e2termConn) Recv Subs Req")
                        packedData := &packer.PackedData{}
                        packedData.Buf = msg.Payload
                        unpackerr := e2SubsReq.UnPack(packedData)
@@ -208,32 +180,56 @@ func TestSubReqAndSubDelOk(t *testing.T) {
                        if geterr != nil {
                                testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
                        }
-
-                       resp := createSubsResp(req)
-                       e2SubsResp.Set(resp)
-                       xapp.Logger.Debug("%s", e2SubsResp.String())
-                       packerr, packedMsg := e2SubsResp.Pack(nil)
-                       if packerr != nil {
-                               testError(t, "(e2termConn) pack NOK %s", packerr.Error())
-                       }
-
-                       params := &xapp.RMRParams{}
-                       params.Mtype = xapp.RIC_SUB_RESP
-                       params.SubId = msg.SubId
-                       params.Payload = packedMsg.Buf
-                       params.Meid = msg.Meid
-                       params.Xid = msg.Xid
-                       params.Mbuf = nil
-
-                       snderr := e2termConn.RmrSend(params)
-                       if snderr != nil {
-                               testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
-                       }
-
+                       return req, msg
                }
        case <-time.After(15 * time.Second):
                testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
        }
+       return nil, nil
+}
+
+func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
+       xapp.Logger.Info("handle_e2term_subs_resp start")
+       e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+       //---------------------------------
+       // e2term activity: Send Subs Resp
+       //---------------------------------
+       xapp.Logger.Info("(e2termConn) Send Subs Resp")
+       resp := createSubsResp(req)
+       e2SubsResp.Set(resp)
+       xapp.Logger.Debug("%s", e2SubsResp.String())
+       packerr, packedMsg := e2SubsResp.Pack(nil)
+       if packerr != nil {
+               testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+       }
+
+       params := &xapp.RMRParams{}
+       params.Mtype = xapp.RIC_SUB_RESP
+       params.SubId = msg.SubId
+       params.Payload = packedMsg.Buf
+       params.Meid = msg.Meid
+       params.Xid = msg.Xid
+       params.Mbuf = nil
+
+       snderr := e2termConn.RmrSend(params)
+       if snderr != nil {
+               testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+       }
+}
+
+func handle_e2term_subs_reqandresp(t *testing.T) {
+       req, msg := handle_e2term_subs_req(t)
+       handle_e2term_subs_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_resp(t *testing.T) int {
+       xapp.Logger.Info("handle_xapp_subs_resp start")
+       e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+       var e2SubsId int
 
        //---------------------------------
        // xapp activity: Recv Subs Resp
@@ -262,44 +258,61 @@ func TestSubReqAndSubDelOk(t *testing.T) {
        case <-time.After(15 * time.Second):
                testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
        }
+       return e2SubsId
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) {
+       xapp.Logger.Info("handle_xapp_subs_del_req start")
+       e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
        //---------------------------------
        // xapp activity: Send Subs Del Req
        //---------------------------------
-       select {
-       case <-time.After(2 * time.Second):
-               xapp.Logger.Info("(xappConn) Send Subs Del Req")
-               req := createSubsDelReq(uint32(e2SubsId))
-               e2SubsDelReq.Set(req)
-               xapp.Logger.Debug("%s", e2SubsDelReq.String())
-               err, packedMsg := e2SubsDelReq.Pack(nil)
-               if err != nil {
-                       testError(t, "(xappConn) pack NOK %s", err.Error())
-               }
+       //select {
+       //case <-time.After(1 * time.Second):
+       xapp.Logger.Info("(xappConn) Send Subs Del Req")
+       req := createSubsDelReq(uint32(e2SubsId))
+       e2SubsDelReq.Set(req)
+       xapp.Logger.Debug("%s", e2SubsDelReq.String())
+       err, packedMsg := e2SubsDelReq.Pack(nil)
+       if err != nil {
+               testError(t, "(xappConn) pack NOK %s", err.Error())
+       }
 
-               params := &xapp.RMRParams{}
-               params.Mtype = xapp.RIC_SUB_DEL_REQ
-               params.SubId = e2SubsId
-               params.Payload = packedMsg.Buf
-               params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-               params.Xid = "XID_1"
-               params.Mbuf = nil
-
-               snderr := xappConn.RmrSend(params)
-               if snderr != nil {
-                       testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-               }
+       params := &xapp.RMRParams{}
+       params.Mtype = xapp.RIC_SUB_DEL_REQ
+       params.SubId = e2SubsId
+       params.Payload = packedMsg.Buf
+       params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+       params.Xid = "XID_1"
+       params.Mbuf = nil
+
+       snderr := xappConn.RmrSend(params)
+       if snderr != nil {
+               testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
        }
+       //}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
+       xapp.Logger.Info("handle_e2term_subs_del_req start")
+       e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
        //---------------------------------
-       // e2term activity: Recv Subs Del Req & Send Subs Del Resp
+       // e2term activity: Recv Subs Del Req
        //---------------------------------
        select {
        case msg := <-e2termConn.rmrConChan:
                if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
                        testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
                } else {
-                       xapp.Logger.Info("(e2termConn) Recv Subs Del Req & Send Subs Del Resp")
+                       xapp.Logger.Info("(e2termConn) Recv Subs Del Req")
 
                        packedData := &packer.PackedData{}
                        packedData.Buf = msg.Payload
@@ -311,32 +324,56 @@ func TestSubReqAndSubDelOk(t *testing.T) {
                        if geterr != nil {
                                testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
                        }
-
-                       resp := createSubsDelResp(req)
-                       e2SubsDelResp.Set(resp)
-                       xapp.Logger.Debug("%s", e2SubsDelResp.String())
-                       packerr, packedMsg := e2SubsDelResp.Pack(nil)
-                       if packerr != nil {
-                               testError(t, "(e2termConn) pack NOK %s", packerr.Error())
-                       }
-
-                       params := &xapp.RMRParams{}
-                       params.Mtype = xapp.RIC_SUB_DEL_RESP
-                       params.SubId = msg.SubId
-                       params.Payload = packedMsg.Buf
-                       params.Meid = msg.Meid
-                       params.Xid = msg.Xid
-                       params.Mbuf = nil
-
-                       snderr := e2termConn.RmrSend(params)
-                       if snderr != nil {
-                               testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
-                       }
-
+                       return req, msg
                }
        case <-time.After(15 * time.Second):
                testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
        }
+       return nil, nil
+}
+
+func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
+       xapp.Logger.Info("handle_e2term_subs_del_resp start")
+       e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+       //---------------------------------
+       // e2term activity: Send Subs Del Resp
+       //---------------------------------
+       xapp.Logger.Info("(e2termConn) Send Subs Del Resp")
+       resp := createSubsDelResp(req)
+       e2SubsDelResp.Set(resp)
+       xapp.Logger.Debug("%s", e2SubsDelResp.String())
+       packerr, packedMsg := e2SubsDelResp.Pack(nil)
+       if packerr != nil {
+               testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+       }
+
+       params := &xapp.RMRParams{}
+       params.Mtype = xapp.RIC_SUB_DEL_RESP
+       params.SubId = msg.SubId
+       params.Payload = packedMsg.Buf
+       params.Meid = msg.Meid
+       params.Xid = msg.Xid
+       params.Mbuf = nil
+
+       snderr := e2termConn.RmrSend(params)
+       if snderr != nil {
+               testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+       }
+
+}
+
+func handle_e2term_subs_del_reqandresp(t *testing.T) {
+       req, msg := handle_e2term_subs_del_req(t)
+       handle_e2term_subs_del_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_resp(t *testing.T) {
+       xapp.Logger.Info("handle_xapp_subs_del_resp start")
+       e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
 
        //---------------------------------
        // xapp activity: Recv Subs Del Resp
@@ -363,5 +400,163 @@ func TestSubReqAndSubDelOk(t *testing.T) {
        case <-time.After(15 * time.Second):
                testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
        }
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool {
+       xapp.Logger.Info("handle_wait_subs_clean start")
+       if mainCtrl.wait_subs_clean(e2SubsId, 10) == false {
+               testError(t, "(general) no clean within 10 secs")
+               return false
+       }
+       return true
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOk
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     | SubReq       |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq       |
+//     |              |------------->|
+//     |              |              |
+//     |              |      SubResp |
+//     |              |<-------------|
+//     |              |              |
+//     |      SubResp |              |
+//     |<-------------|              |
+//     |              |              |
+//     |              |              |
+//     | SubDelReq    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubDelReq    |
+//     |              |------------->|
+//     |              |              |
+//     |              |   SubDelResp |
+//     |              |<-------------|
+//     |              |              |
+//     |   SubDelResp |              |
+//     |<-------------|              |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqAndSubDelOk(t *testing.T) {
+       xapp.Logger.Info("TestSubReqAndSubDelOk start")
+
+       handle_xapp_subs_req(t)
+       handle_e2term_subs_reqandresp(t)
+       e2SubsId := handle_xapp_subs_resp(t)
+
+       handle_xapp_subs_del_req(t, e2SubsId)
+       handle_e2term_subs_del_reqandresp(t)
+       handle_xapp_subs_del_resp(t)
+
+       //Wait that subs is cleaned
+       handle_wait_subs_clean(t, e2SubsId)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqRetransmission
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |  SubReq      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq       |
+//     |              |------------->|
+//     |              |              |
+//     |  SubReq      |              |
+//     | (retrans)    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              |      SubResp |
+//     |              |<-------------|
+//     |              |              |
+//     |      SubResp |              |
+//     |<-------------|              |
+//     |              |              |
+//     |         [SUBS DELETE]       |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqRetransmission(t *testing.T) {
+       xapp.Logger.Info("TestSubReqRetransmission start")
+
+       //Subs Create
+       handle_xapp_subs_req(t)
+       req, msg := handle_e2term_subs_req(t)
+       handle_xapp_subs_req(t)
+
+       handle_e2term_subs_resp(t, req, msg)
+
+       e2SubsId := handle_xapp_subs_resp(t)
+
+       //Subs Delete
+       handle_xapp_subs_del_req(t, e2SubsId)
+       handle_e2term_subs_del_reqandresp(t)
+       handle_xapp_subs_del_resp(t)
+
+       //Wait that subs is cleaned
+       handle_wait_subs_clean(t, e2SubsId)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubDelReqRetransmission
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |         [SUBS CREATE]       |
+//     |              |              |
+//     |              |              |
+//     | SubDelReq    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubDelReq    |
+//     |              |------------->|
+//     |              |              |
+//     | SubDelReq    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              |   SubDelResp |
+//     |              |<-------------|
+//     |              |              |
+//     |   SubDelResp |              |
+//     |<-------------|              |
+//
+//-----------------------------------------------------------------------------
+func TestSubDelReqRetransmission(t *testing.T) {
+       xapp.Logger.Info("TestSubDelReqRetransmission start")
+
+       //Subs Create
+       handle_xapp_subs_req(t)
+       handle_e2term_subs_reqandresp(t)
+       e2SubsId := handle_xapp_subs_resp(t)
+
+       //Subs Delete
+       handle_xapp_subs_del_req(t, e2SubsId)
+       req, msg := handle_e2term_subs_del_req(t)
+
+       <-time.After(2 * time.Second)
+
+       handle_xapp_subs_del_req(t, e2SubsId)
+
+       handle_e2term_subs_del_resp(t, req, msg)
 
+       //Wait that subs is cleaned
+       handle_wait_subs_clean(t, e2SubsId)
 }
index 1682ae7..65f816e 100644 (file)
@@ -21,83 +21,101 @@ package control
 
 import (
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "sync"
 )
 
+type TransactionKey struct {
+       SubID     uint16 // subscription id / sequence number
+       TransType Action // action ongoing (CREATE/DELETE etc)
+}
+
+type TransactionXappKey struct {
+       Addr string // xapp addr
+       Port uint16 // xapp port
+       Xid  string // xapp xid in req
+}
+
+type Transaction struct {
+       tracker    *Tracker           // tracker instance
+       Key        TransactionKey     // action key
+       Xappkey    TransactionXappKey // transaction key
+       OrigParams *xapp.RMRParams    // request orginal params
+}
+
+func (t *Transaction) SubRouteInfo() SubRouteInfo {
+       return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID}
+}
+
 /*
 Implements a record of ongoing transactions and helper functions to CRUD the records.
 */
 type Tracker struct {
-       transactionTable map[TransactionKey]Transaction
-       mutex            sync.Mutex
+       transactionTable     map[TransactionKey]*Transaction
+       transactionXappTable map[TransactionXappKey]*Transaction
+       mutex                sync.Mutex
 }
 
 func (t *Tracker) Init() {
-       t.transactionTable = make(map[TransactionKey]Transaction)
+       t.transactionTable = make(map[TransactionKey]*Transaction)
+       t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
 }
 
 /*
 Checks if a tranascation with similar type has been ongoing. If not then creates one.
 Returns error if there is similar transatcion ongoing.
 */
-func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error {
+func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams) (*Transaction, error) {
+       key := TransactionKey{subID, act}
+       xappkey := TransactionXappKey{addr, port, params.Xid}
+       trans := &Transaction{t, key, xappkey, params}
        t.mutex.Lock()
        defer t.mutex.Unlock()
        if _, ok := t.transactionTable[key]; ok {
                // TODO: Implement merge related check here. If the key is same but the value is different.
-               err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType)
-               return err
+               err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.TransType)
+               return nil, err
        }
-       t.transactionTable[key] = xact
-       return nil
-}
-
-/*
-Retreives the transaction table entry for the given request.
-Returns error in case the transaction cannot be found.
-*/
-func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error {
-       key := TransactionKey{SubID, transType}
-       t.mutex.Lock()
-       defer t.mutex.Unlock()
-       if _, ok := t.transactionTable[key]; ok {
+       if _, ok := t.transactionXappTable[xappkey]; ok {
                // TODO: Implement merge related check here. If the key is same but the value is different.
-               err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType)
-               return err
+               err := fmt.Errorf("transaction tracker: Similar transaction with xapp key %v is ongoing", xappkey)
+               return nil, err
        }
-       t.transactionTable[key] = xact
-       return nil
+       t.transactionTable[key] = trans
+       t.transactionXappTable[xappkey] = trans
+       return trans, nil
 }
 
 /*
 Retreives the transaction table entry for the given request.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) RetriveTransaction(subID uint16, act Action) (*Transaction, error) {
        key := TransactionKey{subID, act}
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       var xact Transaction
-       if xact, ok := t.transactionTable[key]; ok {
-               return xact, nil
+       if trans, ok := t.transactionTable[key]; ok {
+               return trans, nil
        }
        err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
-       return xact, err
+       return nil, err
 }
 
 /*
 Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) completeTransaction(subID uint16, act Action) (*Transaction, error) {
        key := TransactionKey{subID, act}
-       var emptyTransaction Transaction
        t.mutex.Lock()
        defer t.mutex.Unlock()
-       if xact, ok := t.transactionTable[key]; ok {
+       if trans, ok1 := t.transactionTable[key]; ok1 {
+               if _, ok2 := t.transactionXappTable[trans.Xappkey]; ok2 {
+                       delete(t.transactionXappTable, trans.Xappkey)
+               }
                delete(t.transactionTable, key)
-               return xact, nil
+               return trans, nil
        }
        err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
-       return emptyTransaction, err
+       return nil, err
 }
index 3a9587f..83312d8 100644 (file)
 
 package control
 
-import (
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-)
-
 type RmrDatagram struct {
        MessageType    int
        SubscriptionId uint16
@@ -37,14 +33,3 @@ type SubRouteInfo struct {
 }
 
 type Action int
-
-type TransactionKey struct {
-       SubID     uint16
-       transType Action
-}
-
-type Transaction struct {
-       XappInstanceAddress string
-       XappPort            uint16
-       OrigParams          *xapp.RMRParams
-}