From: Juha Hyttinen Date: Wed, 15 Jan 2020 07:25:13 +0000 (+0200) Subject: Use packed data to store cache encoded message X-Git-Tag: 0.4.0~40 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=9340072742db74b6f93ca43d73841c388dd80e02;p=ric-plt%2Fsubmgr.git Use packed data to store cache encoded message Change-Id: I687dbe101da773b6a069f9909e0eb6ca9c410fc3 Signed-off-by: Juha Hyttinen --- diff --git a/pkg/control/control.go b/pkg/control/control.go index 614ac9b..b189071 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -22,6 +22,7 @@ package control import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" @@ -134,29 +135,29 @@ func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { return } -func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) { +func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload *packer.PackedData) (err error) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = trans.GetMtype() params.SubId = int(subs.GetSubId()) params.Xid = "" params.Meid = subs.GetMeid() params.Src = "" - params.PayloadLen = payloadLen - params.Payload = payload + params.PayloadLen = len(payload.Buf) + params.Payload = payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) } -func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) { +func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload *packer.PackedData) (err error) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = mType params.SubId = int(subs.GetSubId()) params.Xid = trans.GetXid() params.Meid = trans.GetMeid() params.Src = "" - params.PayloadLen = payloadLen - params.Payload = payload + params.PayloadLen = len(payload.Buf) + params.Payload = payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) @@ -243,7 +244,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) + trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) if err != nil { xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans) subs.Release() @@ -251,11 +252,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { return } - //Optimize and store packed message to be sent (for retransmission). Again owned by subscription? - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - - c.rmrSend("SubReq: SubReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubReq: SubReq to E2T", subs, trans, trans.Payload) c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) @@ -310,20 +307,16 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { return } - packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) + trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) if err != nil { xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans) trans.Release() return } - //Optimize and store packed message to be sent. - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - subs.Confirmed() trans.Release() - c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans, 12011, trans.Payload) return } @@ -378,12 +371,9 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { return } - packedData, err := c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) + trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) if err == nil { - //Optimize and store packed message to be sent. - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans, 12012, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans, 12012, trans.Payload) time.Sleep(3 * time.Second) } else { //TODO error handling improvement @@ -422,7 +412,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou trans.RetryTransaction() - c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans, trans.Payload) tryCount++ c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer) @@ -451,7 +441,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId()) deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId - packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) + deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) //TODO improve error handling. Important at least in merge @@ -459,8 +449,6 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou subs.Release() return } - deltrans.PayloadLen = len(packedData.Buf) - deltrans.Payload = packedData.Buf err = subs.SetTransaction(deltrans) if err != nil { @@ -470,7 +458,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou return } - c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen) + c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans, deltrans.Payload) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } @@ -533,20 +521,16 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) + trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans) trans.Release() return } - //Optimize and store packed message to be sent (for retransmission). Again owned by subscription? - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - subs.UnConfirmed() - c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans, trans.Payload) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return @@ -679,7 +663,7 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, if tryCount < maxSubDelReqTryCount { // Set possible to handle new response for the subId trans.RetryTransaction() - c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans, trans.Payload) tryCount++ c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) return @@ -698,11 +682,10 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId()) trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId - packedData, err := c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) + var err error + trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) if err == nil { - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans, 12021, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans, 12021, trans.Payload) time.Sleep(3 * time.Second) } else { //TODO error handling improvement diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go index cdfdcfb..b0da077 100644 --- a/pkg/control/transaction.go +++ b/pkg/control/transaction.go @@ -21,6 +21,7 @@ package control import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "sync" @@ -55,8 +56,7 @@ type Transaction struct { SubDelReqMsg *e2ap.E2APSubscriptionDeleteRequest //SubDelReq TODO: maybe own transactions per type SubDelRespMsg *e2ap.E2APSubscriptionDeleteResponse //SubDelResp TODO: maybe own transactions per type SubDelFailMsg *e2ap.E2APSubscriptionDeleteFailure //SubDelFail TODO: maybe own transactions per type - Payload []byte //packed message to optimize retransmissions - PayloadLen int //packed message len to optimize retransmissions + Payload *packer.PackedData //Encoded message to be send. Optimized RespReceived bool ForwardRespToXapp bool }