RICPLT-2962 Preparation for subs merge 97/2197/6
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Mon, 13 Jan 2020 11:02:26 +0000 (13:02 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 14 Jan 2020 10:34:39 +0000 (12:34 +0200)
Change-Id: I2b2f4ddf6887a83decb0a042e5215152bcd3d555
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
Dockerfile
pkg/control/control.go
pkg/control/main_test.go
pkg/control/messaging_test.go
pkg/control/subscription.go
pkg/control/tracker.go
pkg/control/transaction.go
pkg/control/types.go
pkg/control/types_test.go [new file with mode: 0644]

index b3ad890..1fa3ad7 100644 (file)
@@ -26,10 +26,11 @@ RUN apt update && apt install -y iputils-ping net-tools curl tcpdump gdb
 
 WORKDIR /tmp
 
+ARG RMRVERSION=1.13.1
 # Install RMr shared library
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.10.0_amd64.deb/download.deb && dpkg -i rmr_1.10.0_amd64.deb && rm -rf rmr_1.10.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
 # Install RMr development header files
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.10.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.10.0_amd64.deb && rm -rf rmr-dev_1.10.0_amd64.deb
+RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
 
 # "PULLING LOG and COMPILING LOG"
 #RUN git clone "https://gerrit.o-ran-sc.org/r/com/log" /opt/log && cd /opt/log && \
@@ -115,7 +116,10 @@ RUN /usr/local/go/bin/go mod tidy
 # unittest
 COPY test/config-file.json test/config-file.json
 ENV CFG_FILE=/opt/submgr/test/config-file.json
-RUN /usr/local/go/bin/go test -count=1 -v ./pkg/control
+
+RUN /usr/local/go/bin/go test -test.coverprofile /tmp/submgr_cover.out -count=1 -v ./pkg/control
+
+RUN /usr/local/go/bin/go tool cover -html=/tmp/submgr_cover.out -o /tmp/submgr_cover.html
 
 # test formating (not important)
 RUN test -z "$(/usr/local/go/bin/gofmt -l pkg/control/*.go)"
index a1b2328..18eeb4c 100755 (executable)
@@ -20,7 +20,8 @@
 package control
 
 import (
-       "errors"
+       "fmt"
+       //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
        rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
        rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -32,6 +33,10 @@ import (
        "time"
 )
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
 var subReqTime time.Duration = 5 * time.Second
 var subDelReqTime time.Duration = 5 * time.Second
 var maxSubReqTryCount uint64 = 2    // Initial try + retry
@@ -113,32 +118,64 @@ func (c *Control) Run() {
        xapp.Run(c)
 }
 
-func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
+func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
+
+       xapp.Logger.Info("%s: %s", desc, params.String())
        status := false
        i := 1
        for ; i <= 10 && status == false; i++ {
                c.rmrSendMutex.Lock()
-               status = xapp.Rmr.Send(params, false)
+               status = xapp.Rmr.Send(params.RMRParams, false)
                c.rmrSendMutex.Unlock()
                if status == false {
-                       xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
+                       xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
                        time.Sleep(500 * time.Millisecond)
                }
        }
        if status == false {
-               err = errors.New("rmr.Send() failed")
+               err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
+               xapp.Logger.Error("%s: %s", desc, err.Error())
                xapp.Rmr.Free(params.Mbuf)
        }
        return
 }
 
-func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
-       c.rmrSend(params)
-       return
+func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = trans.GetMtype()
+       params.SubId = int(subs.GetSubId())
+       params.Xid = trans.GetXid()
+       params.Meid = subs.GetMeid()
+       params.Src = ""
+       params.PayloadLen = payloadLen
+       params.Payload = payload
+       params.Mbuf = nil
+
+       return c.rmrSendRaw(desc, params)
 }
 
-func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = mType
+       params.SubId = int(subs.GetSubId())
+       params.Xid = trans.GetXid()
+       params.Meid = subs.GetMeid()
+       params.Src = ""
+       params.PayloadLen = payloadLen
+       params.Payload = payload
+       params.Mbuf = nil
+
+       return c.rmrSendRaw(desc, params)
+}
+
+func (c *Control) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+
+       msg := &RMRParams{params}
+
        c.msgCounter++
+
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
                go c.handleSubscriptionRequest(msg)
@@ -158,10 +195,8 @@ func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
        return nil
 }
 
-func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
-       xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+func (c *Control) handleSubscriptionRequest(params *RMRParams) {
+       xapp.Logger.Info("SubReq from xapp: %s", params.String())
 
        srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
        if err != nil {
@@ -175,10 +210,47 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
                return
        }
 
-       params.SubId = int(subs.Seq)
-       err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
+       //
+       // WIP RICPLT-2979
+       //
+       /*
+               e2SubReq := packerif.NewPackerSubscriptionRequest()
+               packedData := &packer.PackedData{}
+               packedData.Buf = params.Payload
+               err = e2SubReq.UnPack(packedData)
+               if err != nil {
+                       xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error())
+               }
+               getErr, subReq := e2SubReq.Get()
+               if getErr != nil {
+                       xapp.Logger.Error("SubReq: Get() failed: %s", err.Error())
+               }
+
+
+               subReq.RequestId.Seq = uint32(subs.GetSubId())
+
+               err = e2SubReq.Set(subReq)
+               if err != nil {
+                       xapp.Logger.Error("SubReq: Set() failed: %s", err.Error())
+                       return
+               }
+               err, packedData = e2SubReq.Pack(nil)
+               if err != nil {
+                       xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error())
+                       return
+               }
+
+               params.PayloadLen = len(packedData.Buf)
+               params.Payload = packedData.Buf
+       */
+       //
+       //
+       //
+
+       params.SubId = int(subs.GetSubId())
+       err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId())
        if err != nil {
-               xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
+               xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
                c.registry.DelSubscription(subs.Seq)
                return
        }
@@ -186,28 +258,30 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
        // Create transatcion record for every subscription request
        var forwardRespToXapp bool = true
        var responseReceived bool = false
-       _, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
+       trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
        if err != nil {
                xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
                c.registry.DelSubscription(subs.Seq)
                return
        }
 
-       // Setting new subscription ID in the RMR header
-       xapp.Logger.Info("SubReq: Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-       err = c.rmrSend(params)
+       err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+               c.registry.DelSubscription(subs.Seq)
+               trans.Release()
+               return
        }
+
+       c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen)
+
        c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
-       xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionXappTable)
+       xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
        return
 }
 
-func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
-       xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+func (c *Control) handleSubscriptionResponse(params *RMRParams) {
+       xapp.Logger.Info("SubResp from E2T: %s", params.String())
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
        if err != nil {
@@ -222,37 +296,25 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
                return
        }
 
-       transaction := subs.GetTransaction()
+       trans := subs.GetTransaction()
 
        c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
        if responseReceived == true {
                // Subscription timer already received
                return
        }
-       xapp.Logger.Info("SubResp: SubId: %v, from address: %s.", payloadSeqNum, transaction.RmrEndpoint)
 
        subs.Confirmed()
-       transaction.Release()
-
-       params.SubId = int(payloadSeqNum)
-       params.Xid = transaction.OrigParams.Xid
-
-       xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid)
-       err = c.rmrReplyToSender(params)
-       if err != nil {
-               xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-       }
-
-       xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting transaction record", payloadSeqNum, transaction.RmrEndpoint)
+       trans.Release()
+       c.rmrReplyToSender("SubResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
+       xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting trans record", payloadSeqNum, trans.RmrEndpoint)
        return
 }
 
-func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
-       xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+func (c *Control) handleSubscriptionFailure(params *RMRParams) {
+       xapp.Logger.Info("SubFail from E2T: %s", params.String())
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
        if err != nil {
@@ -267,15 +329,15 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
                return
        }
 
-       transaction := subs.GetTransaction()
-       if transaction == nil {
-               xapp.Logger.Error("SubFail: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
                return
        }
 
        c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
        if err != nil {
                xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
                return
@@ -285,21 +347,14 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
                // Subscription timer already received
                return
        }
-       xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint)
-
-       params.SubId = int(payloadSeqNum)
-       params.Xid = transaction.OrigParams.Xid
+       xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint)
 
-       xapp.Logger.Info("SubFail: Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-       err = c.rmrReplyToSender(params)
-       if err != nil {
-               xapp.Logger.Error("SubFail: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-       }
+       c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
 
        time.Sleep(3 * time.Second)
 
-       xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       transaction.Release()
+       xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid)
+       trans.Release()
        if !c.registry.DelSubscription(payloadSeqNum) {
                xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
        }
@@ -307,22 +362,21 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
 }
 
 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
-       subId := uint16(nbrId)
-       xapp.Logger.Info("handleSubTimer: SubReq timer expired. subId: %v,  tryCount: %v", subId, tryCount)
+       xapp.Logger.Info("SubReq timeout: subId: %v,  tryCount: %v", nbrId, tryCount)
 
-       subs := c.registry.GetSubscription(subId)
+       subs := c.registry.GetSubscription(uint16(nbrId))
        if subs == nil {
-               xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. SubId: %v", subId)
+               xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
                return
        }
 
-       transaction := subs.GetTransaction()
-       if transaction == nil {
-               xapp.Logger.Error("SubFail: Unknown transaction. Dropping this msg. SubId: %v", subId)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
                return
        }
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
 
        if responseReceived == true {
                // Subscription Response or Failure already received
@@ -330,84 +384,68 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
        }
 
        if tryCount < maxSubReqTryCount {
-               xapp.Logger.Info("handleSubTimer: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+               xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
 
-               transaction.RetryTransaction()
+               trans.RetryTransaction()
 
-               err := c.rmrSend(transaction.OrigParams)
-               if err != nil {
-                       xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid)
-               }
+               c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
 
                tryCount++
-               c.timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
+               c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
                return
        }
 
        var subDelReqPayload []byte
-       subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(transaction.OrigParams.Payload, subId)
+       subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId())
        if err != nil {
-               xapp.Logger.Error("handleSubTimer: Packing SubDelReq failed. Err: %v", err)
+               xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
                return
        }
 
        // Cancel failed subscription
-       var params xapp.RMRParams
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
-       params.SubId = int(subId)
-       params.Xid = transaction.OrigParams.Xid
-       params.Meid = transaction.OrigParams.Meid
-       params.Src = transaction.OrigParams.Src
+       params.SubId = int(subs.GetSubId())
+       params.Xid = trans.GetXid()
+       params.Meid = subs.GetMeid()
+       params.Src = trans.OrigParams.Src
        params.PayloadLen = len(subDelReqPayload)
        params.Payload = subDelReqPayload
        params.Mbuf = nil
 
        // Delete CREATE transaction
-       transaction.Release()
+       trans.Release()
 
-       // Create DELETE transaction
-       _, err = c.trackDeleteTransaction(subs, &params, subId, false)
+       // Create DELETE transaction (internal and no messages toward xapp)
+       var forwardRespToXapp bool = false
+       var respReceived bool = false
+       deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp)
        if err != nil {
-               xapp.Logger.Error("handleSubTimer: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
                return
        }
 
-       xapp.Logger.Info("handleSubTimer: Sending SubDelReq to E2T: Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid)
-       c.rmrSend(&params)
+       err = subs.SetTransaction(deltrans)
        if err != nil {
-               xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+               deltrans.Release()
+               return
        }
-       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
-       return
-}
 
-func (act Action) String() string {
-       actions := [...]string{
-               "CREATE",
-               "MERGE",
-               "NONE",
-               "DELETE",
-       }
+       c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen)
 
-       if act < CREATE || act > DELETE {
-               return "Unknown"
-       }
-       return actions[act]
+       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
+       return
 }
 
-func (act Action) valid() bool {
-       switch act {
-       case CREATE, MERGE, DELETE:
-               return true
-       default:
-               return false
-       }
-}
+func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
+       xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
 
-func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
-       xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+       srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
+       if err != nil {
+               xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               return
+       }
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
        if err != nil {
@@ -417,42 +455,36 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
        xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
 
        subs := c.registry.GetSubscription(payloadSeqNum)
-       if subs != nil {
-               var forwardRespToXapp bool = true
-               _, err = c.trackDeleteTransaction(subs, params, payloadSeqNum, forwardRespToXapp)
-               if err != nil {
-                       xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
-                       return
-               }
-               subs.UnConfirmed()
-       } else {
+       if subs == nil {
                xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
                return
        }
 
-       xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-       c.rmrSend(params)
+       var forwardRespToXapp bool = true
+       var respReceived bool = false
+       trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
+               return
        }
-       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
-       return
-}
 
-func (c *Control) trackDeleteTransaction(subs *Subscription, params *xapp.RMRParams, payloadSeqNum uint16, forwardRespToXapp bool) (transaction *Transaction, err error) {
-       srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
+       err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("Failed to split source address. Err: %s, SubId: %v, Xid: %s", err, payloadSeqNum, params.Xid)
+               xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
+               trans.Release()
+               return
        }
-       var respReceived bool = false
-       transaction, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
+
+       subs.UnConfirmed()
+
+       c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+
+       c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
        return
 }
 
-func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
-       xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
+       xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
        if err != nil {
@@ -467,51 +499,41 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err
                return
        }
 
-       transaction := subs.GetTransaction()
-       if transaction == nil {
-               xapp.Logger.Error("SubDelResp: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
                return
        }
 
-       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
+       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
        if responseReceived == true {
                // Subscription Delete timer already received
                return
        }
 
-       transaction.Release()
-
-       xapp.Logger.Info("SubDelResp: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint)
-       if transaction.ForwardRespToXapp == true {
-               params.SubId = int(payloadSeqNum)
-               params.Xid = transaction.OrigParams.Xid
-               xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-               err = c.rmrReplyToSender(params)
-               if err != nil {
-                       xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               }
+       trans.Release()
 
+       if trans.ForwardRespToXapp == true {
+               c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       if !c.registry.DelSubscription(payloadSeqNum) {
-               xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
+       xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
+       if !c.registry.DelSubscription(subs.GetSubId()) {
+               xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
                return
        }
        return
 }
 
-func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
-       xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
-       xapp.Rmr.Free(params.Mbuf)
-       params.Mbuf = nil
+func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
+       xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
        if err != nil {
-               xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
+               xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
                return
        }
        xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
@@ -522,124 +544,94 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
                return
        }
 
-       transaction := subs.GetTransaction()
-       if transaction == nil {
-               xapp.Logger.Error("SubDelFail: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
                return
        }
 
-       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
+       c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
        if responseReceived == true {
                // Subscription Delete timer already received
                return
        }
-       xapp.Logger.Info("SubDelFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint)
-
-       if transaction.ForwardRespToXapp == true {
+       if trans.ForwardRespToXapp == true {
                var subDelRespPayload []byte
-               subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, payloadSeqNum)
+               subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
                if err != nil {
                        xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
                        return
                }
 
-               params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE
-               params.SubId = int(payloadSeqNum)
-               params.Xid = transaction.OrigParams.Xid
-               params.Meid = transaction.OrigParams.Meid
-               params.Src = transaction.OrigParams.Src
-               params.PayloadLen = len(subDelRespPayload)
-               params.Payload = subDelRespPayload
-               params.Mbuf = nil
-               xapp.Logger.Info("SubDelFail: Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-               err = c.rmrReplyToSender(params)
-               if err != nil {
-                       xapp.Logger.Error("SubDelFail: Failed to send SubDelResp to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               }
-
+               // RIC SUBSCRIPTION DELETE RESPONSE
+               c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
                time.Sleep(3 * time.Second)
        }
 
-       xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
-       transaction.Release()
-       if !c.registry.DelSubscription(payloadSeqNum) {
-               xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+       xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
+       trans.Release()
+       if !c.registry.DelSubscription(subs.GetSubId()) {
+               xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
                return
        }
        return
 }
 
 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
-       subId := uint16(nbrId)
-       xapp.Logger.Info("handleSubDelTimer: SubDelReq timer expired. subId: %v, tryCount: %v", subId, tryCount)
+       xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
 
-       subs := c.registry.GetSubscription(subId)
+       subs := c.registry.GetSubscription(uint16(nbrId))
        if subs == nil {
-               xapp.Logger.Error("handleSubDelTimer: Unknown payloadSeqNum. Dropping this msg. SubId: %v", subId)
+               xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
                return
        }
 
-       transaction := subs.GetTransaction()
-       if transaction == nil {
-               xapp.Logger.Error("handleSubDelTimer: Unknown transaction. Dropping this msg. SubId: %v", subId)
+       trans := subs.GetTransaction()
+       if trans == nil {
+               xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
                return
        }
 
-       responseReceived := transaction.CheckResponseReceived()
+       responseReceived := trans.CheckResponseReceived()
        if responseReceived == true {
                // Subscription Delete Response or Failure already received
                return
        }
 
        if tryCount < maxSubDelReqTryCount {
-               xapp.Logger.Info("handleSubDelTimer: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+               xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
                // Set possible to handle new response for the subId
 
-               transaction.RetryTransaction()
+               trans.RetryTransaction()
 
-               err := c.rmrSend(transaction.OrigParams)
-               if err != nil {
-                       xapp.Logger.Error("handleSubDelTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid)
-               }
+               c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
 
                tryCount++
-               c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
+               c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
                return
        }
 
-       var params xapp.RMRParams
-       if transaction.ForwardRespToXapp == true {
+       if trans.ForwardRespToXapp == true {
                var subDelRespPayload []byte
-               subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, subId)
+               subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
                if err != nil {
-                       xapp.Logger.Error("handleSubDelTimer: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subId, transaction.OrigParams.Xid, transaction.OrigParams.Payload)
+                       xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload)
                        return
                }
 
-               params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE
-               params.SubId = int(subId)
-               params.Meid = transaction.OrigParams.Meid
-               params.Xid = transaction.OrigParams.Xid
-               params.Src = transaction.OrigParams.Src
-               params.PayloadLen = len(subDelRespPayload)
-               params.Payload = subDelRespPayload
-               params.Mbuf = nil
-
-               xapp.Logger.Info("handleSubDelTimer: Sending SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
-               err = c.rmrReplyToSender(&params)
-               if err != nil {
-                       xapp.Logger.Error("handleSubDelTimer: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               }
+               // RIC SUBSCRIPTION DELETE RESPONSE
+               c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
 
                time.Sleep(3 * time.Second)
+
        }
 
-       xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid)
-       transaction.Release()
-       if !c.registry.DelSubscription(subId) {
-               xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid)
+       xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
+       trans.Release()
+       if !c.registry.DelSubscription(subs.GetSubId()) {
+               xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
        }
        return
 }
index 155f92c..dffff77 100644 (file)
@@ -21,7 +21,6 @@ package control
 
 import (
        "encoding/json"
-       "errors"
        "fmt"
        "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -71,22 +70,23 @@ type testingRmrControl struct {
        rmrClientTest *xapp.RMRClient
 }
 
-func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
+func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) {
        //
        //NOTE: Do this way until xapp-frame sending is improved
        //
+       xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String())
        status := false
        i := 1
        for ; i <= 10 && status == false; i++ {
-               status = tc.rmrClientTest.SendMsg(params)
+               status = tc.rmrClientTest.SendMsg(params.RMRParams)
                if status == false {
-                       xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
+                       xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
                        time.Sleep(500 * time.Millisecond)
                }
        }
        if status == false {
-               err = errors.New("rmr.Send() failed")
-               tc.rmrClientTest.Free(params.Mbuf)
+               err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String())
+               xapp.Rmr.Free(params.Mbuf)
        }
        return
 }
@@ -105,12 +105,12 @@ func initTestingRmrControl(desc string, rtfile string, port string, stat string,
 //
 //-----------------------------------------------------------------------------
 type testingMessageChannel struct {
-       rmrConChan chan *xapp.RMRParams
+       rmrConChan chan *RMRParams
 }
 
 func initTestingMessageChannel() testingMessageChannel {
        mc := testingMessageChannel{}
-       mc.rmrConChan = make(chan *xapp.RMRParams)
+       mc.rmrConChan = make(chan *RMRParams)
        return mc
 }
 
@@ -148,13 +148,16 @@ func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *x
        return trans
 }
 
-func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
+func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
 
        if strings.Contains(msg.Xid, tc.desc) {
-               xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+               xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
                tc.rmrConChan <- msg
        } else {
-               xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc)
+               xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String())
        }
        return
 }
@@ -175,8 +178,11 @@ type testingE2termControl struct {
        testingMessageChannel
 }
 
-func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
-       xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
+       xapp.Rmr.Free(params.Mbuf)
+       params.Mbuf = nil
+       msg := &RMRParams{params}
+       xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
        tc.rmrConChan <- msg
        return
 }
@@ -245,6 +251,29 @@ var mainCtrl *testingMainControl
 func TestMain(m *testing.M) {
        xapp.Logger.Info("TestMain start")
 
+       //---------------------------------
+       //
+       //---------------------------------
+       http_handler := func(w http.ResponseWriter, r *http.Request) {
+               var req rtmgr_models.XappSubscriptionData
+               err := json.NewDecoder(r.Body).Decode(&req)
+               if err != nil {
+                       xapp.Logger.Error("%s", err.Error())
+               }
+               xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
+
+               w.WriteHeader(200)
+       }
+
+       go func() {
+               http.HandleFunc("/", http_handler)
+               http.ListenAndServe("localhost:8989", nil)
+       }()
+
+       //---------------------------------
+       //
+       //---------------------------------
+
        //
        //Cfg creation won't work like this as xapp-frame reads it during init.
        //
@@ -282,28 +311,60 @@ func TestMain(m *testing.M) {
        xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
 
        //---------------------------------
+       // Static routetable for rmr
+       //
        // NOTE: Routing table is configured so, that responses
        //       are duplicated to xapp1 and xapp2 instances.
        //       If XID is not matching xapp stub will just
        //       drop message. (Messages 12011, 12012, 12021, 12022)
+       //
+       // 14560 submgr
+       // 15560 e2term stub
+       // 13560 xapp1 stub
+       // 13660 xapp2 stub
+       //
        //---------------------------------
-       xapp.Logger.Info("### submgr main run ###")
 
-       subsrt := `newrt|start
+       allrt := `newrt|start
 mse|12010|-1|localhost:14560
 mse|12010,localhost:14560|-1|localhost:15560
 mse|12011,localhost:15560|-1|localhost:14560
-mse|12011|-1|localhost:13560;localhost:13660
 mse|12012,localhost:15560|-1|localhost:14560
-mse|12012|-1|localhost:13560;localhost:13660
+mse|12011,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12012,localhost:14560|-1|localhost:13660;localhost:13560
 mse|12020|-1|localhost:14560
 mse|12020,localhost:14560|-1|localhost:15560
 mse|12021,localhost:15560|-1|localhost:14560
-mse|12021|-1|localhost:13560;localhost:13660
 mse|12022,localhost:15560|-1|localhost:14560
-mse|12022|-1|localhost:13560;localhost:13660
+mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
+mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
 newrt|end
 `
+
+       //---------------------------------
+       //
+       //---------------------------------
+       xapp.Logger.Info("### submgr main run ###")
+
+       subsrt := allrt
+       /*
+               subsrt := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12010,localhost:14560|-1|localhost:15560
+          mse|12011,localhost:15560|-1|localhost:14560
+          mse|12011|-1|localhost:13560;localhost:13660
+          mse|12012,localhost:15560|-1|localhost:14560
+          mse|12012|-1|localhost:13560;localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12020,localhost:14560|-1|localhost:15560
+          mse|12021,localhost:15560|-1|localhost:14560
+          mse|12021|-1|localhost:13560;localhost:13660
+          mse|12022,localhost:15560|-1|localhost:14560
+          mse|12022|-1|localhost:13560;localhost:13660
+          newrt|end
+          `
+       */
+
        subrtfilename, _ := testCreateTmpFile(subsrt)
        defer os.Remove(subrtfilename)
        mainCtrl = createNewMainControl("main", subrtfilename, "14560")
@@ -313,15 +374,18 @@ newrt|end
        //---------------------------------
        xapp.Logger.Info("### xapp1 rmr run ###")
 
-       xapprt1 := `newrt|start
-mse|12010|-1|localhost:14560
-mse|12011|-1|localhost:13560
-mse|12012|-1|localhost:13560
-mse|12020|-1|localhost:14560
-mse|12021|-1|localhost:13560
-mse|12022|-1|localhost:13560
-newrt|end
-`
+       xapprt1 := allrt
+       /*
+               xapprt1 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13560
+          mse|12012|-1|localhost:13560
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13560
+          mse|12022|-1|localhost:13560
+          newrt|end
+          `
+       */
 
        xapprtfilename1, _ := testCreateTmpFile(xapprt1)
        defer os.Remove(xapprtfilename1)
@@ -333,15 +397,18 @@ newrt|end
 
        xapp.Logger.Info("### xapp2 rmr run ###")
 
-       xapprt2 := `newrt|start
-mse|12010|-1|localhost:14560
-mse|12011|-1|localhost:13660
-mse|12012|-1|localhost:13660
-mse|12020|-1|localhost:14560
-mse|12021|-1|localhost:13660
-mse|12022|-1|localhost:13660
-newrt|end
-`
+       xapprt2 := allrt
+       /*
+               xapprt2 := `newrt|start
+          mse|12010|-1|localhost:14560
+          mse|12011|-1|localhost:13660
+          mse|12012|-1|localhost:13660
+          mse|12020|-1|localhost:14560
+          mse|12021|-1|localhost:13660
+          mse|12022|-1|localhost:13660
+          newrt|end
+          `
+       */
 
        xapprtfilename2, _ := testCreateTmpFile(xapprt2)
        defer os.Remove(xapprtfilename2)
@@ -352,38 +419,28 @@ newrt|end
        //---------------------------------
        xapp.Logger.Info("### e2term rmr run ###")
 
-       e2termrt := `newrt|start
-mse|12010|-1|localhost:15560
-mse|12011|-1|localhost:14560
-mse|12012|-1|localhost:14560
-mse|12020|-1|localhost:15560
-mse|12021|-1|localhost:14560
-mse|12022|-1|localhost:14560
-newrt|end
-`
+       e2termrt := allrt
+       /*
+               e2termrt := `newrt|start
+          mse|12010|-1|localhost:15560
+          mse|12011|-1|localhost:14560
+          mse|12012|-1|localhost:14560
+          mse|12020|-1|localhost:15560
+          mse|12021|-1|localhost:14560
+          mse|12022|-1|localhost:14560
+          newrt|end
+          `
+       */
 
        e2termrtfilename, _ := testCreateTmpFile(e2termrt)
        defer os.Remove(e2termrtfilename)
        e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
 
        //---------------------------------
-       //
+       // Stupid sleep to try improve robustness
+       // due: http handler and rmr routes init delays
        //---------------------------------
-       http_handler := func(w http.ResponseWriter, r *http.Request) {
-               var req rtmgr_models.XappSubscriptionData
-               err := json.NewDecoder(r.Body).Decode(&req)
-               if err != nil {
-                       xapp.Logger.Error("%s", err.Error())
-               }
-               xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
-
-               w.WriteHeader(200)
-       }
-
-       go func() {
-               http.HandleFunc("/", http_handler)
-               http.ListenAndServe("localhost:8989", nil)
-       }()
+       <-time.After(2 * time.Second)
 
        //---------------------------------
        //
index 2becce3..1b50c8b 100644 (file)
@@ -37,7 +37,7 @@ var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
 //
 //-----------------------------------------------------------------------------
 func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans *xappTransaction) *xappTransaction {
-       xapp.Logger.Info("handle_xapp_subs_req")
+       xapp.Logger.Info("(%s) handle_xapp_subs_req", xappConn.desc)
        e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
 
        //---------------------------------
@@ -86,7 +86,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans
                trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_REQ
        params.SubId = -1
        params.Payload = packedMsg.Buf
@@ -106,7 +106,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans
 //
 //-----------------------------------------------------------------------------
 func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int {
-       xapp.Logger.Info("handle_xapp_subs_resp")
+       xapp.Logger.Info("(%s) handle_xapp_subs_resp", xappConn.desc)
        e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
        var e2SubsId int
 
@@ -149,7 +149,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *x
 //
 //-----------------------------------------------------------------------------
 func (xappConn *testingXappControl) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) int {
-       xapp.Logger.Info("handle_xapp_subs_fail")
+       xapp.Logger.Info("(%s) handle_xapp_subs_fail", xappConn.desc)
        e2SubsFail := e2asnpacker.NewPackerSubscriptionFailure()
        var e2SubsId int
 
@@ -192,7 +192,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_fail(t *testing.T, trans *x
 //
 //-----------------------------------------------------------------------------
 func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction {
-       xapp.Logger.Info("handle_xapp_subs_del_req")
+       xapp.Logger.Info("(%s) handle_xapp_subs_del_req", xappConn.desc)
        e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
        //---------------------------------
@@ -218,7 +218,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTr
                trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_DEL_REQ
        params.SubId = e2SubsId
        params.Payload = packedMsg.Buf
@@ -238,7 +238,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTr
 //
 //-----------------------------------------------------------------------------
 func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, trans *xappTransaction) {
-       xapp.Logger.Info("handle_xapp_subs_del_resp")
+       xapp.Logger.Info("(%s) handle_xapp_subs_del_resp", xappConn.desc)
        e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
 
        //---------------------------------
@@ -274,8 +274,8 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, tran
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_subs_req")
+func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_subs_req", e2termConn.desc)
        e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
 
        //---------------------------------
@@ -305,8 +305,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e
        return nil, nil
 }
 
-func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_subs_resp")
+func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_subs_resp", e2termConn.desc)
        e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
 
        //---------------------------------
@@ -340,7 +340,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, re
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_RESP
        params.SubId = msg.SubId
        params.Payload = packedMsg.Buf
@@ -357,8 +357,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, re
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_subs_fail")
+func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_subs_fail", e2termConn.desc)
        e2SubsFail := e2asnpacker.NewPackerSubscriptionFailure()
 
        //---------------------------------
@@ -385,7 +385,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, re
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_FAILURE
        params.SubId = msg.SubId
        params.Payload = packedMsg.Buf
@@ -402,8 +402,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, re
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_subs_del_req")
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_subs_del_req", e2termConn.desc)
        e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
        //---------------------------------
@@ -441,8 +441,8 @@ func handle_e2term_recv_empty() bool {
        return true
 }
 
-func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_subs_del_resp")
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_subs_del_resp", e2termConn.desc)
        e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
 
        //---------------------------------
@@ -462,7 +462,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_DEL_RESP
        params.SubId = msg.SubId
        params.Payload = packedMsg.Buf
@@ -479,8 +479,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
-       xapp.Logger.Info("handle_e2term_del_subs_fail")
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *RMRParams) {
+       xapp.Logger.Info("(%s) handle_e2term_del_subs_fail", e2termConn.desc)
        e2SubsDelFail := e2asnpacker.NewPackerSubscriptionDeleteFailure()
 
        //---------------------------------
@@ -502,7 +502,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T
                testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
        }
 
-       params := &xapp.RMRParams{}
+       params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = xapp.RIC_SUB_DEL_FAILURE
        params.SubId = msg.SubId
        params.Payload = packedMsg.Buf
index 9bbe3d4..6ed3d32 100644 (file)
@@ -45,6 +45,21 @@ func (s *Subscription) String() string {
        return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
 }
 
+func (s *Subscription) GetSubId() uint16 {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       return s.Seq
+}
+
+func (s *Subscription) GetMeid() *xapp.RMRMeid {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+       if s.Meid != nil {
+               return s.Meid
+       }
+       return nil
+}
+
 func (s *Subscription) Confirmed() {
        s.mutex.Lock()
        defer s.mutex.Unlock()
index 087b781..869e8ca 100644 (file)
@@ -21,7 +21,6 @@ package control
 
 import (
        "fmt"
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "sync"
 )
 
@@ -37,12 +36,13 @@ func (t *Tracker) Init() {
        t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
 }
 
-func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
+func (t *Tracker) TrackTransaction(endpoint RmrEndpoint, params *RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
 
        trans := &Transaction{
                tracker:           nil,
                Subs:              nil,
                RmrEndpoint:       endpoint,
+               Mtype:             params.Mtype,
                Xid:               params.Xid,
                OrigParams:        params,
                RespReceived:      respReceived,
@@ -58,10 +58,6 @@ func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, par
                return nil, err
        }
 
-       err := subs.SetTransaction(trans)
-       if err != nil {
-               return nil, err
-       }
        trans.tracker = t
        t.transactionXappTable[xappkey] = trans
        return trans, nil
index f686b44..2f4acab 100644 (file)
@@ -20,7 +20,6 @@
 package control
 
 import (
-       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
        "sync"
 )
@@ -45,8 +44,9 @@ type Transaction struct {
        tracker           *Tracker // tracker instance
        Subs              *Subscription
        RmrEndpoint       RmrEndpoint
-       Xid               string          // xapp xid in req
-       OrigParams        *xapp.RMRParams // request orginal params
+       Mtype             int
+       Xid               string     // xapp xid in req
+       OrigParams        *RMRParams // request orginal params
        RespReceived      bool
        ForwardRespToXapp bool
 }
@@ -61,6 +61,24 @@ func (t *Transaction) String() string {
        return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
 }
 
+func (t *Transaction) GetXid() string {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.Xid
+}
+
+func (t *Transaction) GetMtype() int {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.Mtype
+}
+
+func (t *Transaction) GetSrc() string {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       return t.RmrEndpoint.String()
+}
+
 func (t *Transaction) CheckResponseReceived() bool {
        t.mutex.Lock()
        defer t.mutex.Unlock()
index 64fd15f..22c44b3 100644 (file)
 package control
 
 import (
+       "bytes"
+       "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
 )
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RmrDatagram struct {
        MessageType    int
        SubscriptionId uint16
        Payload        []byte
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type SubRouteInfo struct {
        Command Action
        Address string
@@ -36,6 +45,9 @@ type SubRouteInfo struct {
        SubID   uint16
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type RmrEndpoint struct {
        Addr string // xapp addr
        Port uint16 // xapp port
@@ -45,4 +57,34 @@ func (endpoint RmrEndpoint) String() string {
        return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
 }
 
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 type Action int
+
+func (act Action) String() string {
+       actions := [...]string{
+               "CREATE",
+               "MERGE",
+               "NONE",
+               "DELETE",
+       }
+
+       if act < CREATE || act > DELETE {
+               return "UNKNOWN"
+       }
+       return actions[act]
+}
+
+//-----------------------------------------------------------------------------
+// To add own method for rmrparams
+//-----------------------------------------------------------------------------
+type RMRParams struct {
+       *xapp.RMRParams
+}
+
+func (params *RMRParams) String() string {
+       var b bytes.Buffer
+       fmt.Fprintf(&b, "Src: %s, Mtype: %s(%d), SubId: %v, Xid: %s, Meid: %v", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid)
+       return b.String()
+}
diff --git a/pkg/control/types_test.go b/pkg/control/types_test.go
new file mode 100644 (file)
index 0000000..f29b3db
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+       "testing"
+)
+
+func TestAction(t *testing.T) {
+
+       testActionString := func(t *testing.T, val int, str string) {
+               if Action(val).String() != str {
+                       testError(t, "String for value %d expected %s got %s", val, str, Action(val).String())
+               }
+       }
+
+       testActionString(t, 0, "CREATE")
+       testActionString(t, 1, "MERGE")
+       testActionString(t, 2, "NONE")
+       testActionString(t, 3, "DELETE")
+       testActionString(t, 5, "UNKNOWN")
+       testActionString(t, 6, "UNKNOWN")
+       testActionString(t, 7, "UNKNOWN")
+       testActionString(t, 10, "UNKNOWN")
+}