X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=cc176c2b711c1c58370a648da6a39f721b50a6ea;hb=4721d4e64a1abaf189b3acfc8fa259f1b9f10ced;hp=614ac9b3e121b0458786c963712fc5fb466691d9;hpb=375c141ce21767f7bda94ec435fdf833b41fbbd2;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 614ac9b..cc176c2 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -28,7 +28,6 @@ import ( httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" "github.com/spf13/viper" - "math/rand" "sync" "time" ) @@ -57,8 +56,6 @@ type RMRMeid struct { RanName string } -var seedSN uint16 - const ( CREATE Action = 0 MERGE Action = 1 @@ -71,15 +68,6 @@ func init() { viper.AutomaticEnv() viper.SetEnvPrefix("submgr") viper.AllowEmptyEnv(true) - seedSN = uint16(viper.GetInt("seed_sn")) - if seedSN == 0 { - rand.Seed(time.Now().UnixNano()) - seedSN = uint16(rand.Intn(65535)) - } - if seedSN > 65535 { - seedSN = 0 - } - xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN) } func NewControl() *Control { @@ -91,7 +79,7 @@ func NewControl() *Control { rtmgrClient := RtmgrClient{client, handle, deleteHandle} registry := new(Registry) - registry.Initialize(seedSN) + registry.Initialize() registry.rtmgrClient = &rtmgrClient tracker := new(Tracker) @@ -134,29 +122,29 @@ func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { return } -func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) { +func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = trans.GetMtype() params.SubId = int(subs.GetSubId()) params.Xid = "" params.Meid = subs.GetMeid() params.Src = "" - params.PayloadLen = payloadLen - params.Payload = payload + params.PayloadLen = len(trans.Payload.Buf) + params.Payload = trans.Payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) } -func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) { +func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) { params := &RMRParams{&xapp.RMRParams{}} - params.Mtype = mType + params.Mtype = trans.GetMtype() params.SubId = int(subs.GetSubId()) params.Xid = trans.GetXid() params.Meid = trans.GetMeid() params.Src = "" - params.PayloadLen = payloadLen - params.Payload = payload + params.PayloadLen = len(trans.Payload.Buf) + params.Payload = trans.Payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) @@ -194,7 +182,6 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // // trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Mtype, params.Xid, params.Meid, false, @@ -243,7 +230,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) if err != nil { xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans) subs.Release() @@ -251,11 +238,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { return } - //Optimize and store packed message to be sent (for retransmission). Again owned by subscription? - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - - c.rmrSend("SubReq: SubReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubReq: SubReq to E2T", subs, trans) c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) @@ -270,7 +253,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { // SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String()) return } @@ -310,20 +293,16 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { return } - packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) if err != nil { xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans) trans.Release() return } - //Optimize and store packed message to be sent. - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - subs.Confirmed() trans.Release() - c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans) return } @@ -378,12 +357,9 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { return } - packedData, err := c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) if err == nil { - //Optimize and store packed message to be sent. - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans, 12012, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans) time.Sleep(3 * time.Second) } else { //TODO error handling improvement @@ -418,11 +394,11 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou } if tryCount < maxSubReqTryCount { - xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid()) + xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans) trans.RetryTransaction() - c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans) tryCount++ c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer) @@ -434,7 +410,6 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou // Create DELETE transaction (internal and no messages toward xapp) deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, - 12020, // RIC SUBSCRIPTION DELETE trans.GetXid(), trans.GetMeid(), false, @@ -451,7 +426,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId()) deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId - packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) + deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) //TODO improve error handling. Important at least in merge @@ -459,8 +434,6 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou subs.Release() return } - deltrans.PayloadLen = len(packedData.Buf) - deltrans.Payload = packedData.Buf err = subs.SetTransaction(deltrans) if err != nil { @@ -470,7 +443,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou return } - c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen) + c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } @@ -482,7 +455,6 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // // trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Mtype, params.Xid, params.Meid, false, @@ -533,20 +505,16 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans) trans.Release() return } - //Optimize and store packed message to be sent (for retransmission). Again owned by subscription? - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - subs.UnConfirmed() - c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return @@ -679,7 +647,7 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, if tryCount < maxSubDelReqTryCount { // Set possible to handle new response for the subId trans.RetryTransaction() - c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen) + c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans) tryCount++ c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) return @@ -698,11 +666,10 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId()) trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId - packedData, err := c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) + var err error + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) if err == nil { - trans.Payload = packedData.Buf - trans.PayloadLen = len(packedData.Buf) - c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans, 12021, trans.Payload, trans.PayloadLen) + c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans) time.Sleep(3 * time.Second) } else { //TODO error handling improvement