X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=094e020c29fe3cd8625f61e482630ddb4de7a34d;hb=refs%2Fchanges%2F02%2F2502%2F1;hp=1d64f3c64bbf17c77fd2be1235800275b732bbd2;hpb=83ada00338d2c9fa47d48c406b4a46b9d7888aff;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 1d64f3c..094e020 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -23,11 +23,11 @@ import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" + "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" "github.com/spf13/viper" - "sync" "time" ) @@ -35,6 +35,25 @@ import ( // //----------------------------------------------------------------------------- +func idstring(err error, entries ...fmt.Stringer) string { + var retval string = "" + var filler string = "" + for _, entry := range entries { + retval += filler + entry.String() + filler = " " + } + if err != nil { + retval += filler + "err(" + err.Error() + ")" + filler = " " + + } + return retval +} + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- + var e2tSubReqTimeout time.Duration = 5 * time.Second var e2tSubDelReqTime time.Duration = 5 * time.Second var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry @@ -43,12 +62,11 @@ var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry var e2tRecvMsgTimeout time.Duration = 5 * time.Second type Control struct { - e2ap *E2ap - registry *Registry - tracker *Tracker - timerMap *TimerMap - rmrSendMutex sync.Mutex - msgCounter uint64 + xapptweaks.XappWrapper + e2ap *E2ap + registry *Registry + tracker *Tracker + timerMap *TimerMap } type RMRMeid struct { @@ -79,42 +97,28 @@ func NewControl() *Control { timerMap := new(TimerMap) timerMap.Init() - return &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - timerMap: timerMap, - msgCounter: 0, + c := &Control{e2ap: new(E2ap), + registry: registry, + tracker: tracker, + timerMap: timerMap, } + c.XappWrapper.Init("") + return c } -func (c *Control) Run() { - xapp.Run(c) +func (c *Control) ReadyCB(data interface{}) { + if c.Rmr == nil { + c.Rmr = xapp.Rmr + } } -func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { - - xapp.Logger.Info("%s: %s", desc, params.String()) - status := false - i := 1 - for ; i <= 10 && status == false; i++ { - c.rmrSendMutex.Lock() - status = xapp.Rmr.Send(params.RMRParams, false) - c.rmrSendMutex.Unlock() - if status == false { - xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String()) - time.Sleep(500 * time.Millisecond) - } - } - if status == false { - err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String()) - xapp.Logger.Error("%s: %s", desc, err.Error()) - xapp.Rmr.Free(params.Mbuf) - } - return +func (c *Control) Run() { + xapp.SetReadyCB(c.ReadyCB, nil) + xapp.Run(c) } func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { - params := &RMRParams{&xapp.RMRParams{}} + params := xapptweaks.NewParams(nil) params.Mtype = trans.GetMtype() params.SubId = int(subs.GetReqId().Seq) params.Xid = "" @@ -123,13 +127,13 @@ func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *Transacti params.PayloadLen = len(trans.Payload.Buf) params.Payload = trans.Payload.Buf params.Mbuf = nil - - return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params) + xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String()) + return c.RmrSend(params) } func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { - params := &RMRParams{&xapp.RMRParams{}} + params := xapptweaks.NewParams(nil) params.Mtype = trans.GetMtype() params.SubId = int(subs.GetReqId().Seq) params.Xid = trans.GetXid() @@ -138,15 +142,20 @@ func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *Transact params.PayloadLen = len(trans.Payload.Buf) params.Payload = trans.Payload.Buf params.Mbuf = nil - - return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params) + xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String()) + return c.RmrSend(params) } func (c *Control) Consume(params *xapp.RMRParams) (err error) { - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil - msg := &RMRParams{params} - c.msgCounter++ + msg := xapptweaks.NewParams(params) + if c.Rmr == nil { + err = fmt.Errorf("Rmr object nil can handle %s", msg.String()) + xapp.Logger.Error("%s", err.Error()) + return + } + c.CntRecvMsg++ + + defer c.Rmr.Free(msg.Mbuf) switch msg.Mtype { case xapp.RICMessageTypes["RIC_SUB_REQ"]: @@ -164,29 +173,13 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { default: xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype) } - - return nil -} - -func idstring(err error, entries ...fmt.Stringer) string { - var retval string = "" - var filler string = "" - for _, entry := range entries { - retval += filler + entry.String() - filler = " " - } - if err != nil { - retval += filler + "err(" + err.Error() + ")" - filler = " " - - } - return retval + return } //------------------------------------------------------------------- // handle from XAPP Subscription Request //------------------------------------------------------------------ -func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) { +func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { xapp.Logger.Info("MSG from XAPP: %s", params.String()) subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) @@ -239,13 +232,13 @@ func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) { } } xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs)) - go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- // handle from XAPP Subscription Delete Request //------------------------------------------------------------------ -func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) { +func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) { xapp.Logger.Info("MSG from XAPP: %s", params.String()) subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) @@ -290,7 +283,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) { c.rmrSendToXapp("", subs, trans) } - go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) + c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- @@ -417,7 +410,7 @@ func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Tr //------------------------------------------------------------------- // handle from E2T Subscription Reponse //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) { +func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { @@ -446,7 +439,7 @@ func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) { //------------------------------------------------------------------- // handle from E2T Subscription Failure //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) { +func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload) if err != nil { @@ -475,7 +468,7 @@ func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) { //------------------------------------------------------------------- // handle from E2T Subscription Delete Response //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) { +func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload) if err != nil { @@ -504,7 +497,7 @@ func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err er //------------------------------------------------------------------- // handle from E2T Subscription Delete Failure //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) { +func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload) if err != nil {