import (
"fmt"
- //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
params := &RMRParams{&xapp.RMRParams{}}
params.Mtype = trans.GetMtype()
params.SubId = int(subs.GetSubId())
- params.Xid = trans.GetXid()
+ params.Xid = ""
params.Meid = subs.GetMeid()
params.Src = ""
params.PayloadLen = payloadLen
params.Mtype = mType
params.SubId = int(subs.GetSubId())
params.Xid = trans.GetXid()
- params.Meid = subs.GetMeid()
+ params.Meid = trans.GetMeid()
params.Src = ""
params.PayloadLen = payloadLen
params.Payload = payload
func (c *Control) Consume(params *xapp.RMRParams) (err error) {
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
-
msg := &RMRParams{params}
-
c.msgCounter++
-
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
go c.handleSubscriptionRequest(msg)
default:
xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
}
+
return nil
}
func (c *Control) handleSubscriptionRequest(params *RMRParams) {
xapp.Logger.Info("SubReq from xapp: %s", params.String())
- srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
- if err != nil {
- xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
- }
+ //
+ //
+ //
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Mtype,
+ params.Xid,
+ params.Meid,
+ false,
+ true)
- subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
return
}
//
- // WIP RICPLT-2979
//
- /*
- e2SubReq := packerif.NewPackerSubscriptionRequest()
- packedData := &packer.PackedData{}
- packedData.Buf = params.Payload
- err = e2SubReq.UnPack(packedData)
- if err != nil {
- xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error())
- }
- getErr, subReq := e2SubReq.Get()
- if getErr != nil {
- xapp.Logger.Error("SubReq: Get() failed: %s", err.Error())
- }
-
-
- subReq.RequestId.Seq = uint32(subs.GetSubId())
-
- err = e2SubReq.Set(subReq)
- if err != nil {
- xapp.Logger.Error("SubReq: Set() failed: %s", err.Error())
- return
- }
- err, packedData = e2SubReq.Pack(nil)
- if err != nil {
- xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error())
- return
- }
+ //
+ trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
+ return
+ }
- params.PayloadLen = len(packedData.Buf)
- params.Payload = packedData.Buf
- */
//
//
//
-
- params.SubId = int(subs.GetSubId())
- err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId())
+ subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
if err != nil {
- xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
- c.registry.DelSubscription(subs.Seq)
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
return
}
- // Create transatcion record for every subscription request
- var forwardRespToXapp bool = true
- var responseReceived bool = false
- trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
+ err = subs.SetTransaction(trans)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
c.registry.DelSubscription(subs.Seq)
+ trans.Release()
return
}
- err = subs.SetTransaction(trans)
+ trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+
+ //
+ // TODO: subscription create is in fact owned by subscription and not transaction.
+ // Transaction is toward xapp while Subscription is toward ran.
+ // In merge several xapps may wake transactions, while only one subscription
+ // toward ran occurs -> subscription owns subscription creation toward ran
+ //
+ // This is intermediate solution while improving message handling
+ //
+ packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
c.registry.DelSubscription(subs.Seq)
trans.Release()
return
}
- c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen)
+ //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
+ trans.Payload = packedData.Buf
+ trans.PayloadLen = len(packedData.Buf)
+
+ c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
}
if tryCount < maxSubReqTryCount {
- xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+ xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
trans.RetryTransaction()
- c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
tryCount++
c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
return
}
- var subDelReqPayload []byte
- subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId())
- if err != nil {
- xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
- return
- }
-
- // Cancel failed subscription
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
- params.SubId = int(subs.GetSubId())
- params.Xid = trans.GetXid()
- params.Meid = subs.GetMeid()
- params.Src = trans.OrigParams.Src
- params.PayloadLen = len(subDelReqPayload)
- params.Payload = subDelReqPayload
- params.Mbuf = nil
-
// Delete CREATE transaction
trans.Release()
// Create DELETE transaction (internal and no messages toward xapp)
- var forwardRespToXapp bool = false
- var respReceived bool = false
- deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp)
+ deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
+ 12020, // RIC SUBSCRIPTION DELETE
+ trans.GetXid(),
+ trans.GetMeid(),
+ false,
+ false)
+
if err != nil {
xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+ //TODO improve error handling. Important at least in merge
+ c.registry.DelSubscription(subs.GetSubId())
return
}
+ deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
+ deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
+ deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
+ packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
+ //TODO improve error handling. Important at least in merge
+ deltrans.Release()
+ c.registry.DelSubscription(subs.GetSubId())
+ return
+ }
+ deltrans.PayloadLen = len(packedData.Buf)
+ deltrans.Payload = packedData.Buf
+
err = subs.SetTransaction(deltrans)
if err != nil {
xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+ //TODO improve error handling. Important at least in merge
deltrans.Release()
return
}
- c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
return
}
func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
+ var subs *Subscription
+
xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
- srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Mtype,
+ params.Xid,
+ params.Meid,
+ false,
+ true)
+
if err != nil {
- xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
return
}
payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
- if err != nil {
- xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
- return
+ if err == nil {
+ subs = c.registry.GetSubscription(payloadSeqNum)
+ }
+ if subs == nil && params.SubId > 0 {
+ subs = c.registry.GetSubscription(uint16(params.SubId))
}
- xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
- subs := c.registry.GetSubscription(payloadSeqNum)
if subs == nil {
- xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d. Dropping this msg. %s", payloadSeqNum, trans)
+ trans.Release()
return
}
+ xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d. %s", payloadSeqNum, trans)
- var forwardRespToXapp bool = true
- var respReceived bool = false
- trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
- if err != nil {
- xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
- return
- }
+ trans.PayloadLen = params.PayloadLen
+ trans.Payload = params.Payload
err = subs.SetTransaction(trans)
if err != nil {
- xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
trans.Release()
return
}
subs.UnConfirmed()
- c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
return
}
if trans.ForwardRespToXapp == true {
var subDelRespPayload []byte
- subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+ subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
if err != nil {
xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
return
}
if tryCount < maxSubDelReqTryCount {
- xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+ xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
// Set possible to handle new response for the subId
trans.RetryTransaction()
- c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
tryCount++
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
if trans.ForwardRespToXapp == true {
var subDelRespPayload []byte
- subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+ subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
if err != nil {
- xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload)
+ xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
return
}
/* RICsubscriptionRequest */
-// Used by e2t test stub
-func (c *E2ap) GetSubscriptionRequestSequenceNumber(payload []byte) (subId uint16, err error) {
- cptr := unsafe.Pointer(&payload[0])
- cret := C.e2ap_get_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)))
- if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret)
- }
- subId = uint16(cret)
- return
-}
-
-// Used by submgr, xapp test stub
-func (c *E2ap) SetSubscriptionRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
- cptr := unsafe.Pointer(&payload[0])
- size := C.e2ap_set_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
- if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", size)
- }
- return
-}
-
// Used by submgr, xapp test stub
func (c *E2ap) GetSubscriptionResponseSequenceNumber(payload []byte) (subId uint16, err error) {
cptr := unsafe.Pointer(&payload[0])
}
// Used by submgr
-func (c *E2ap) PackSubscriptionDeleteResponse(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
- e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
- packedData := &packer.PackedData{}
- packedData.Buf = payload
- err = e2SubDelReq.UnPack(packedData)
+func (c *E2ap) PackSubscriptionDeleteResponseFromSubDelReq(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+
+ subDelReq, err := c.UnpackSubscriptionDeleteRequest(payload)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: UnPack() failed: %s", err.Error())
- }
- getErr, subDelReq := e2SubDelReq.Get()
- if getErr != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Get() failed: %s", getErr.Error())
+ return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelReq unpack failed: %s", err.Error())
}
- e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
- subDelResp := e2ap.E2APSubscriptionDeleteResponse{}
+ subDelResp := &e2ap.E2APSubscriptionDeleteResponse{}
subDelResp.RequestId.Id = subDelReq.RequestId.Id
subDelResp.RequestId.Seq = uint32(newSubscriptionid)
subDelResp.FunctionId = subDelReq.FunctionId
- err = e2SubDelResp.Set(&subDelResp)
+
+ packedData, err := c.PackSubscriptionDeleteResponse(subDelResp)
+ if err != nil {
+ return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelResp pack failed: %s", err.Error())
+ }
+ return packedData.Buf, nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionRequest(payload []byte) (*e2ap.E2APSubscriptionRequest, error) {
+ e2SubReq := packerif.NewPackerSubscriptionRequest()
+ packedData := &packer.PackedData{}
+ packedData.Buf = payload
+ err := e2SubReq.UnPack(packedData)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Set() failed: %s", err.Error())
+ return nil, err
}
- err, packedData = e2SubDelResp.Pack(nil)
+ err, subReq := e2SubReq.Get()
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Pack() failed: %s", err.Error())
+ return nil, err
}
- return packedData.Buf, nil
+ return subReq, nil
}
-// Used by submgr
-func (c *E2ap) PackSubscriptionDeleteRequest(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+func (c *E2ap) PackSubscriptionRequest(req *e2ap.E2APSubscriptionRequest) (*packer.PackedData, error) {
e2SubReq := packerif.NewPackerSubscriptionRequest()
+ err := e2SubReq.Set(req)
+ if err != nil {
+ return nil, err
+ }
+ err, packedData := e2SubReq.Pack(nil)
+ if err != nil {
+ return nil, err
+ }
+ return packedData, nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionDeleteRequest(payload []byte) (*e2ap.E2APSubscriptionDeleteRequest, error) {
+ e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
packedData := &packer.PackedData{}
packedData.Buf = payload
- err = e2SubReq.UnPack(packedData)
+ err := e2SubDelReq.UnPack(packedData)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: UnPack() failed: %s", err.Error())
+ return nil, err
}
- getErr, subReq := e2SubReq.Get()
- if getErr != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Get() failed: %s", getErr.Error())
+ err, subReq := e2SubDelReq.Get()
+ if err != nil {
+ return nil, err
}
+ return subReq, nil
+}
- e2SubDel := packerif.NewPackerSubscriptionDeleteRequest()
- subDelReq := e2ap.E2APSubscriptionDeleteRequest{}
- subDelReq.RequestId.Id = subReq.RequestId.Id
- subDelReq.RequestId.Seq = uint32(newSubscriptionid)
- subDelReq.FunctionId = subReq.FunctionId
- err = e2SubDel.Set(&subDelReq)
+func (c *E2ap) PackSubscriptionDeleteRequest(req *e2ap.E2APSubscriptionDeleteRequest) (*packer.PackedData, error) {
+ e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
+ err := e2SubDelReq.Set(req)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Set() failed: %s", err.Error())
+ return nil, err
}
- err, packedData = e2SubDel.Pack(nil)
+ err, packedData := e2SubDelReq.Pack(nil)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Pack() failed: %s", err.Error())
+ return nil, err
}
- return packedData.Buf, nil
+ return packedData, nil
+}
+
+func (c *E2ap) PackSubscriptionDeleteResponse(req *e2ap.E2APSubscriptionDeleteResponse) (*packer.PackedData, error) {
+ e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
+ err := e2SubDelResp.Set(req)
+ if err != nil {
+ return nil, err
+ }
+ err, packedData := e2SubDelResp.Pack(nil)
+ if err != nil {
+ return nil, err
+ }
+ return packedData, nil
}