X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=cc176c2b711c1c58370a648da6a39f721b50a6ea;hb=7e9c5e52615fef7cbc23de28505e1a7292cdf14f;hp=b189071fe6e16b787eb8bb333aa416dea98becc5;hpb=9340072742db74b6f93ca43d73841c388dd80e02;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index b189071..cc176c2 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -22,14 +22,12 @@ package control import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" - "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" "github.com/spf13/viper" - "math/rand" "sync" "time" ) @@ -58,8 +56,6 @@ type RMRMeid struct { RanName string } -var seedSN uint16 - const ( CREATE Action = 0 MERGE Action = 1 @@ -72,15 +68,6 @@ func init() { viper.AutomaticEnv() viper.SetEnvPrefix("submgr") viper.AllowEmptyEnv(true) - seedSN = uint16(viper.GetInt("seed_sn")) - if seedSN == 0 { - rand.Seed(time.Now().UnixNano()) - seedSN = uint16(rand.Intn(65535)) - } - if seedSN > 65535 { - seedSN = 0 - } - xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN) } func NewControl() *Control { @@ -92,7 +79,7 @@ func NewControl() *Control { rtmgrClient := RtmgrClient{client, handle, deleteHandle} registry := new(Registry) - registry.Initialize(seedSN) + registry.Initialize() registry.rtmgrClient = &rtmgrClient tracker := new(Tracker) @@ -135,29 +122,29 @@ func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { return } -func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload *packer.PackedData) (err error) { +func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) { params := &RMRParams{&xapp.RMRParams{}} params.Mtype = trans.GetMtype() params.SubId = int(subs.GetSubId()) params.Xid = "" params.Meid = subs.GetMeid() params.Src = "" - params.PayloadLen = len(payload.Buf) - params.Payload = payload.Buf + params.PayloadLen = len(trans.Payload.Buf) + params.Payload = trans.Payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) } -func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload *packer.PackedData) (err error) { +func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) { params := &RMRParams{&xapp.RMRParams{}} - params.Mtype = mType + params.Mtype = trans.GetMtype() params.SubId = int(subs.GetSubId()) params.Xid = trans.GetXid() params.Meid = trans.GetMeid() params.Src = "" - params.PayloadLen = len(payload.Buf) - params.Payload = payload.Buf + params.PayloadLen = len(trans.Payload.Buf) + params.Payload = trans.Payload.Buf params.Mbuf = nil return c.rmrSendRaw(desc, params) @@ -195,7 +182,6 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // // trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Mtype, params.Xid, params.Meid, false, @@ -244,7 +230,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg) if err != nil { xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans) subs.Release() @@ -252,7 +238,7 @@ func (c *Control) handleSubscriptionRequest(params *RMRParams) { return } - c.rmrSend("SubReq: SubReq to E2T", subs, trans, trans.Payload) + c.rmrSend("SubReq: SubReq to E2T", subs, trans) c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) @@ -267,7 +253,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { // SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { - xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String()) + xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String()) return } @@ -307,7 +293,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { return } - trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg) if err != nil { xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans) trans.Release() @@ -316,7 +302,7 @@ func (c *Control) handleSubscriptionResponse(params *RMRParams) { subs.Confirmed() trans.Release() - c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans, 12011, trans.Payload) + c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans) return } @@ -371,9 +357,9 @@ func (c *Control) handleSubscriptionFailure(params *RMRParams) { return } - trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg) if err == nil { - c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans, 12012, trans.Payload) + c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans) time.Sleep(3 * time.Second) } else { //TODO error handling improvement @@ -408,11 +394,11 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou } if tryCount < maxSubReqTryCount { - xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid()) + xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans) trans.RetryTransaction() - c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans, trans.Payload) + c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans) tryCount++ c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer) @@ -424,7 +410,6 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou // Create DELETE transaction (internal and no messages toward xapp) deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint, - 12020, // RIC SUBSCRIPTION DELETE trans.GetXid(), trans.GetMeid(), false, @@ -441,7 +426,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId()) deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId - deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) + deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) //TODO improve error handling. Important at least in merge @@ -458,7 +443,7 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou return } - c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans, deltrans.Payload) + c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } @@ -470,7 +455,6 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // // trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), - params.Mtype, params.Xid, params.Meid, false, @@ -521,7 +505,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { // // This is intermediate solution while improving message handling // - trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg) if err != nil { xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans) trans.Release() @@ -530,7 +514,7 @@ func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { subs.UnConfirmed() - c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans, trans.Payload) + c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans) c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return @@ -663,7 +647,7 @@ func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, if tryCount < maxSubDelReqTryCount { // Set possible to handle new response for the subId trans.RetryTransaction() - c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans, trans.Payload) + c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans) tryCount++ c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) return @@ -683,9 +667,9 @@ func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId var err error - trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) + trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg) if err == nil { - c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans, 12021, trans.Payload) + c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans) time.Sleep(3 * time.Second) } else { //TODO error handling improvement