RICPLT-2979 SubReq go asn into use 17/2217/2
authorJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 14 Jan 2020 10:49:09 +0000 (12:49 +0200)
committerJuha Hyttinen <juha.hyttinen@nokia.com>
Tue, 14 Jan 2020 12:09:49 +0000 (14:09 +0200)
Change-Id: I9a47ff45543fbb87172746449cc463bdf7d37ce8
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
pkg/control/control.go
pkg/control/e2ap.go
pkg/control/main_test.go
pkg/control/registry.go
pkg/control/tracker.go
pkg/control/transaction.go

index 18eeb4c..cfe3dfb 100755 (executable)
@@ -21,7 +21,7 @@ package control
 
 import (
        "fmt"
-       //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
+       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
        rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
        rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
        "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -144,7 +144,7 @@ func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, p
        params := &RMRParams{&xapp.RMRParams{}}
        params.Mtype = trans.GetMtype()
        params.SubId = int(subs.GetSubId())
-       params.Xid = trans.GetXid()
+       params.Xid = ""
        params.Meid = subs.GetMeid()
        params.Src = ""
        params.PayloadLen = payloadLen
@@ -159,7 +159,7 @@ func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Trans
        params.Mtype = mType
        params.SubId = int(subs.GetSubId())
        params.Xid = trans.GetXid()
-       params.Meid = subs.GetMeid()
+       params.Meid = trans.GetMeid()
        params.Src = ""
        params.PayloadLen = payloadLen
        params.Payload = payload
@@ -171,11 +171,8 @@ func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Trans
 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
        xapp.Rmr.Free(params.Mbuf)
        params.Mbuf = nil
-
        msg := &RMRParams{params}
-
        c.msgCounter++
-
        switch msg.Mtype {
        case xapp.RICMessageTypes["RIC_SUB_REQ"]:
                go c.handleSubscriptionRequest(msg)
@@ -192,88 +189,79 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) {
        default:
                xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
        }
+
        return nil
 }
 
 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
        xapp.Logger.Info("SubReq from xapp: %s", params.String())
 
-       srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
-       if err != nil {
-               xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-               return
-       }
+       //
+       //
+       //
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+               params.Mtype,
+               params.Xid,
+               params.Meid,
+               false,
+               true)
 
-       subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
                return
        }
 
        //
-       // WIP RICPLT-2979
        //
-       /*
-               e2SubReq := packerif.NewPackerSubscriptionRequest()
-               packedData := &packer.PackedData{}
-               packedData.Buf = params.Payload
-               err = e2SubReq.UnPack(packedData)
-               if err != nil {
-                       xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error())
-               }
-               getErr, subReq := e2SubReq.Get()
-               if getErr != nil {
-                       xapp.Logger.Error("SubReq: Get() failed: %s", err.Error())
-               }
-
-
-               subReq.RequestId.Seq = uint32(subs.GetSubId())
-
-               err = e2SubReq.Set(subReq)
-               if err != nil {
-                       xapp.Logger.Error("SubReq: Set() failed: %s", err.Error())
-                       return
-               }
-               err, packedData = e2SubReq.Pack(nil)
-               if err != nil {
-                       xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error())
-                       return
-               }
+       //
+       trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
+       if err != nil {
+               xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
+               trans.Release()
+               return
+       }
 
-               params.PayloadLen = len(packedData.Buf)
-               params.Payload = packedData.Buf
-       */
        //
        //
        //
-
-       params.SubId = int(subs.GetSubId())
-       err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId())
+       subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
        if err != nil {
-               xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
-               c.registry.DelSubscription(subs.Seq)
+               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+               trans.Release()
                return
        }
 
-       // Create transatcion record for every subscription request
-       var forwardRespToXapp bool = true
-       var responseReceived bool = false
-       trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
+       err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
                c.registry.DelSubscription(subs.Seq)
+               trans.Release()
                return
        }
 
-       err = subs.SetTransaction(trans)
+       trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+
+       //
+       // TODO: subscription create is in fact owned by subscription and not transaction.
+       //       Transaction is toward xapp while Subscription is toward ran.
+       //       In merge several xapps may wake transactions, while only one subscription
+       //       toward ran occurs -> subscription owns subscription creation toward ran
+       //
+       //       This is intermediate solution while improving message handling
+       //
+       packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
        if err != nil {
-               xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
                c.registry.DelSubscription(subs.Seq)
                trans.Release()
                return
        }
 
-       c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen)
+       //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
+       trans.Payload = packedData.Buf
+       trans.PayloadLen = len(packedData.Buf)
+
+       c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
 
        c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
        xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
@@ -384,100 +372,109 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou
        }
 
        if tryCount < maxSubReqTryCount {
-               xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+               xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
 
                trans.RetryTransaction()
 
-               c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+               c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
 
                tryCount++
                c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
                return
        }
 
-       var subDelReqPayload []byte
-       subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId())
-       if err != nil {
-               xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
-               return
-       }
-
-       // Cancel failed subscription
-       params := &RMRParams{&xapp.RMRParams{}}
-       params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
-       params.SubId = int(subs.GetSubId())
-       params.Xid = trans.GetXid()
-       params.Meid = subs.GetMeid()
-       params.Src = trans.OrigParams.Src
-       params.PayloadLen = len(subDelReqPayload)
-       params.Payload = subDelReqPayload
-       params.Mbuf = nil
-
        // Delete CREATE transaction
        trans.Release()
 
        // Create DELETE transaction (internal and no messages toward xapp)
-       var forwardRespToXapp bool = false
-       var respReceived bool = false
-       deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp)
+       deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
+               12020, // RIC SUBSCRIPTION DELETE
+               trans.GetXid(),
+               trans.GetMeid(),
+               false,
+               false)
+
        if err != nil {
                xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+               //TODO improve error handling. Important at least in merge
+               c.registry.DelSubscription(subs.GetSubId())
                return
        }
 
+       deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
+       deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
+       deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+       deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
+       packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
+       if err != nil {
+               xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
+               //TODO improve error handling. Important at least in merge
+               deltrans.Release()
+               c.registry.DelSubscription(subs.GetSubId())
+               return
+       }
+       deltrans.PayloadLen = len(packedData.Buf)
+       deltrans.Payload = packedData.Buf
+
        err = subs.SetTransaction(deltrans)
        if err != nil {
                xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+               //TODO improve error handling. Important at least in merge
                deltrans.Release()
                return
        }
 
-       c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen)
+       c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
 
        c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
        return
 }
 
 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
+       var subs *Subscription
+
        xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
 
-       srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
+       trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+               params.Mtype,
+               params.Xid,
+               params.Meid,
+               false,
+               true)
+
        if err != nil {
-               xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+               xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
                return
        }
 
        payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
-       if err != nil {
-               xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
-               return
+       if err == nil {
+               subs = c.registry.GetSubscription(payloadSeqNum)
+       }
+       if subs == nil && params.SubId > 0 {
+               subs = c.registry.GetSubscription(uint16(params.SubId))
        }
-       xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
 
-       subs := c.registry.GetSubscription(payloadSeqNum)
        if subs == nil {
-               xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+               xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d. Dropping this msg. %s", payloadSeqNum, trans)
+               trans.Release()
                return
        }
+       xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d. %s", payloadSeqNum, trans)
 
-       var forwardRespToXapp bool = true
-       var respReceived bool = false
-       trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
-       if err != nil {
-               xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
-               return
-       }
+       trans.PayloadLen = params.PayloadLen
+       trans.Payload = params.Payload
 
        err = subs.SetTransaction(trans)
        if err != nil {
-               xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
+               xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
                trans.Release()
                return
        }
 
        subs.UnConfirmed()
 
-       c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+       c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
 
        c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
        return
@@ -559,7 +556,7 @@ func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
        }
        if trans.ForwardRespToXapp == true {
                var subDelRespPayload []byte
-               subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+               subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
                if err != nil {
                        xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
                        return
@@ -601,12 +598,12 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int,
        }
 
        if tryCount < maxSubDelReqTryCount {
-               xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+               xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
                // Set possible to handle new response for the subId
 
                trans.RetryTransaction()
 
-               c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+               c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
 
                tryCount++
                c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
@@ -615,9 +612,9 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int,
 
        if trans.ForwardRespToXapp == true {
                var subDelRespPayload []byte
-               subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+               subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
                if err != nil {
-                       xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload)
+                       xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
                        return
                }
 
index 084c7eb..5cabe6e 100644 (file)
@@ -41,27 +41,6 @@ type E2ap struct {
 
 /* RICsubscriptionRequest */
 
-// Used by e2t test stub
-func (c *E2ap) GetSubscriptionRequestSequenceNumber(payload []byte) (subId uint16, err error) {
-       cptr := unsafe.Pointer(&payload[0])
-       cret := C.e2ap_get_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)))
-       if cret < 0 {
-               return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret)
-       }
-       subId = uint16(cret)
-       return
-}
-
-// Used by submgr, xapp test stub
-func (c *E2ap) SetSubscriptionRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
-       cptr := unsafe.Pointer(&payload[0])
-       size := C.e2ap_set_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
-       if size < 0 {
-               return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", size)
-       }
-       return
-}
-
 // Used by submgr, xapp test stub
 func (c *E2ap) GetSubscriptionResponseSequenceNumber(payload []byte) (subId uint16, err error) {
        cptr := unsafe.Pointer(&payload[0])
@@ -176,61 +155,96 @@ func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSub
 }
 
 // Used by submgr
-func (c *E2ap) PackSubscriptionDeleteResponse(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
-       e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
-       packedData := &packer.PackedData{}
-       packedData.Buf = payload
-       err = e2SubDelReq.UnPack(packedData)
+func (c *E2ap) PackSubscriptionDeleteResponseFromSubDelReq(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+
+       subDelReq, err := c.UnpackSubscriptionDeleteRequest(payload)
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelResp: UnPack() failed: %s", err.Error())
-       }
-       getErr, subDelReq := e2SubDelReq.Get()
-       if getErr != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelResp: Get() failed: %s", getErr.Error())
+               return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelReq unpack failed: %s", err.Error())
        }
 
-       e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
-       subDelResp := e2ap.E2APSubscriptionDeleteResponse{}
+       subDelResp := &e2ap.E2APSubscriptionDeleteResponse{}
        subDelResp.RequestId.Id = subDelReq.RequestId.Id
        subDelResp.RequestId.Seq = uint32(newSubscriptionid)
        subDelResp.FunctionId = subDelReq.FunctionId
-       err = e2SubDelResp.Set(&subDelResp)
+
+       packedData, err := c.PackSubscriptionDeleteResponse(subDelResp)
+       if err != nil {
+               return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelResp pack failed: %s", err.Error())
+       }
+       return packedData.Buf, nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionRequest(payload []byte) (*e2ap.E2APSubscriptionRequest, error) {
+       e2SubReq := packerif.NewPackerSubscriptionRequest()
+       packedData := &packer.PackedData{}
+       packedData.Buf = payload
+       err := e2SubReq.UnPack(packedData)
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelResp: Set() failed: %s", err.Error())
+               return nil, err
        }
-       err, packedData = e2SubDelResp.Pack(nil)
+       err, subReq := e2SubReq.Get()
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelResp: Pack() failed: %s", err.Error())
+               return nil, err
        }
-       return packedData.Buf, nil
+       return subReq, nil
 }
 
-// Used by submgr
-func (c *E2ap) PackSubscriptionDeleteRequest(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+func (c *E2ap) PackSubscriptionRequest(req *e2ap.E2APSubscriptionRequest) (*packer.PackedData, error) {
        e2SubReq := packerif.NewPackerSubscriptionRequest()
+       err := e2SubReq.Set(req)
+       if err != nil {
+               return nil, err
+       }
+       err, packedData := e2SubReq.Pack(nil)
+       if err != nil {
+               return nil, err
+       }
+       return packedData, nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionDeleteRequest(payload []byte) (*e2ap.E2APSubscriptionDeleteRequest, error) {
+       e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
        packedData := &packer.PackedData{}
        packedData.Buf = payload
-       err = e2SubReq.UnPack(packedData)
+       err := e2SubDelReq.UnPack(packedData)
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelReq: UnPack() failed: %s", err.Error())
+               return nil, err
        }
-       getErr, subReq := e2SubReq.Get()
-       if getErr != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelReq: Get() failed: %s", getErr.Error())
+       err, subReq := e2SubDelReq.Get()
+       if err != nil {
+               return nil, err
        }
+       return subReq, nil
+}
 
-       e2SubDel := packerif.NewPackerSubscriptionDeleteRequest()
-       subDelReq := e2ap.E2APSubscriptionDeleteRequest{}
-       subDelReq.RequestId.Id = subReq.RequestId.Id
-       subDelReq.RequestId.Seq = uint32(newSubscriptionid)
-       subDelReq.FunctionId = subReq.FunctionId
-       err = e2SubDel.Set(&subDelReq)
+func (c *E2ap) PackSubscriptionDeleteRequest(req *e2ap.E2APSubscriptionDeleteRequest) (*packer.PackedData, error) {
+       e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
+       err := e2SubDelReq.Set(req)
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelReq: Set() failed: %s", err.Error())
+               return nil, err
        }
-       err, packedData = e2SubDel.Pack(nil)
+       err, packedData := e2SubDelReq.Pack(nil)
        if err != nil {
-               return make([]byte, 0), fmt.Errorf("PackSubDelReq: Pack() failed: %s", err.Error())
+               return nil, err
        }
-       return packedData.Buf, nil
+       return packedData, nil
+}
+
+func (c *E2ap) PackSubscriptionDeleteResponse(req *e2ap.E2APSubscriptionDeleteResponse) (*packer.PackedData, error) {
+       e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
+       err := e2SubDelResp.Set(req)
+       if err != nil {
+               return nil, err
+       }
+       err, packedData := e2SubDelResp.Pack(nil)
+       if err != nil {
+               return nil, err
+       }
+       return packedData, nil
 }
index dffff77..32ea09f 100644 (file)
@@ -68,6 +68,7 @@ func initTestingControl(desc string, rtfile string, port string) testingControl
 type testingRmrControl struct {
        testingControl
        rmrClientTest *xapp.RMRClient
+       active        bool
 }
 
 func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) {
@@ -93,6 +94,7 @@ func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) {
 
 func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
        tc := testingRmrControl{}
+       tc.active = false
        tc.testingControl = initTestingControl(desc, rtfile, port)
        tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
        tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
@@ -153,6 +155,12 @@ func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) {
        params.Mbuf = nil
        msg := &RMRParams{params}
 
+       if params.Mtype == 55555 {
+               xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+               tc.active = true
+               return
+       }
+
        if strings.Contains(msg.Xid, tc.desc) {
                xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
                tc.rmrConChan <- msg
@@ -182,6 +190,13 @@ func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) {
        xapp.Rmr.Free(params.Mbuf)
        params.Mbuf = nil
        msg := &RMRParams{params}
+
+       if params.Mtype == 55555 {
+               xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+               tc.active = true
+               return
+       }
+
        xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
        tc.rmrConChan <- msg
        return
@@ -338,6 +353,7 @@ mse|12021,localhost:15560|-1|localhost:14560
 mse|12022,localhost:15560|-1|localhost:14560
 mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
 mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
+mse|55555|-1|localhost:13660;localhost:13560,localhost:15560
 newrt|end
 `
 
@@ -437,10 +453,36 @@ newrt|end
        e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
 
        //---------------------------------
-       // Stupid sleep to try improve robustness
-       // due: http handler and rmr routes init delays
+       // Testing message sending
        //---------------------------------
-       <-time.After(2 * time.Second)
+       var dummyBuf []byte = make([]byte, 100)
+
+       params := &RMRParams{&xapp.RMRParams{}}
+       params.Mtype = 55555
+       params.SubId = -1
+       params.Payload = dummyBuf
+       params.PayloadLen = 100
+       params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"}
+       params.Xid = "THISISTESTFORSTUBS"
+       params.Mbuf = nil
+
+       status := false
+       i := 1
+       for ; i <= 10 && status == false; i++ {
+               xapp.Rmr.Send(params.RMRParams, false)
+               if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true {
+                       status = true
+                       break
+               } else {
+                       xapp.Logger.Info("Sleep 0.5 secs and try routes again")
+                       time.Sleep(500 * time.Millisecond)
+               }
+       }
+
+       if status == false {
+               xapp.Logger.Error("Could not initialize routes")
+               os.Exit(1)
+       }
 
        //---------------------------------
        //
index 2c5bd8c..0970a3a 100644 (file)
@@ -42,7 +42,7 @@ func (r *Registry) Initialize(seedsn uint16) {
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
+func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
        // Check is current SequenceNumber valid
        // Allocate next SequenceNumber value and retry N times
        r.mutex.Lock()
@@ -60,7 +60,7 @@ func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid)
                        subs := &Subscription{
                                Seq:         sequenceNumber,
                                Active:      false,
-                               RmrEndpoint: endPoint,
+                               RmrEndpoint: *endPoint,
                                Meid:        meid,
                                Trans:       nil,
                        }
index 869e8ca..75127a7 100644 (file)
@@ -21,6 +21,7 @@ package control
 
 import (
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "sync"
 )
 
@@ -36,15 +37,26 @@ func (t *Tracker) Init() {
        t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
 }
 
-func (t *Tracker) TrackTransaction(endpoint RmrEndpoint, params *RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
+func (t *Tracker) TrackTransaction(
+       endpoint *RmrEndpoint,
+       mtype int,
+       xid string,
+       meid *xapp.RMRMeid,
+       respReceived bool,
+       forwardRespToXapp bool) (*Transaction, error) {
+
+       if endpoint == nil {
+               err := fmt.Errorf("Tracker: No valid endpoint given")
+               return nil, err
+       }
 
        trans := &Transaction{
                tracker:           nil,
                Subs:              nil,
-               RmrEndpoint:       endpoint,
-               Mtype:             params.Mtype,
-               Xid:               params.Xid,
-               OrigParams:        params,
+               RmrEndpoint:       *endpoint,
+               Mtype:             mtype,
+               Xid:               xid,
+               Meid:              meid,
                RespReceived:      respReceived,
                ForwardRespToXapp: forwardRespToXapp,
        }
@@ -52,7 +64,7 @@ func (t *Tracker) TrackTransaction(endpoint RmrEndpoint, params *RMRParams, resp
        t.mutex.Lock()
        defer t.mutex.Unlock()
 
-       xappkey := TransactionXappKey{endpoint, params.Xid}
+       xappkey := TransactionXappKey{*endpoint, xid}
        if _, ok := t.transactionXappTable[xappkey]; ok {
                err := fmt.Errorf("Tracker: Similar transaction with xappkey %s is ongoing, transaction %s not created ", xappkey, trans)
                return nil, err
index 2f4acab..40c9e4d 100644 (file)
@@ -20,6 +20,8 @@
 package control
 
 import (
+       "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+       "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
        "strconv"
        "sync"
 )
@@ -41,12 +43,16 @@ func (key *TransactionXappKey) String() string {
 //-----------------------------------------------------------------------------
 type Transaction struct {
        mutex             sync.Mutex
-       tracker           *Tracker // tracker instance
-       Subs              *Subscription
-       RmrEndpoint       RmrEndpoint
-       Mtype             int
-       Xid               string     // xapp xid in req
-       OrigParams        *RMRParams // request orginal params
+       tracker           *Tracker                            //tracker instance
+       Subs              *Subscription                       //related subscription
+       RmrEndpoint       RmrEndpoint                         //xapp endpoint
+       Mtype             int                                 //type of initiating message
+       Xid               string                              //xapp xid in req
+       Meid              *xapp.RMRMeid                       //meid transaction related
+       SubReqMsg         *e2ap.E2APSubscriptionRequest       //SubReq TODO: maybe own transactions per type
+       SubDelReqMsg      *e2ap.E2APSubscriptionDeleteRequest //SubDelReq TODO: maybe own transactions per type
+       Payload           []byte                              //packed message to optimize retransmissions
+       PayloadLen        int                                 //packed message len to optimize  retransmissions
        RespReceived      bool
        ForwardRespToXapp bool
 }
@@ -73,6 +79,15 @@ func (t *Transaction) GetMtype() int {
        return t.Mtype
 }
 
+func (t *Transaction) GetMeid() *xapp.RMRMeid {
+       t.mutex.Lock()
+       defer t.mutex.Unlock()
+       if t.Meid != nil {
+               return t.Meid
+       }
+       return nil
+}
+
 func (t *Transaction) GetSrc() string {
        t.mutex.Lock()
        defer t.mutex.Unlock()