X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=51284581a707ddc0193db3eea46f5cb7b7450edf;hb=9dc5adc78d459157a42ef8997eeced82a3616f01;hp=35e39f4c0497cd45f3d35d4e0a9a6bc5507c430f;hpb=f3f4efb9be78f25d3417ecbbce3ea988550e5381;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 35e39f4..5128458 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -23,7 +23,6 @@ import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" - "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" @@ -63,11 +62,12 @@ var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry var e2tRecvMsgTimeout time.Duration = 5 * time.Second type Control struct { - xapptweaks.XappWrapper + *xapp.RMRClient e2ap *E2ap registry *Registry tracker *Tracker //subscriber *xapp.Subscriber + CntRecvMsg uint64 } type RMRMeid struct { @@ -102,15 +102,14 @@ func NewControl() *Control { tracker: tracker, //subscriber: subscriber, } - c.XappWrapper.Init("") go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) return c } func (c *Control) ReadyCB(data interface{}) { - if c.Rmr == nil { - c.Rmr = xapp.Rmr + if c.RMRClient == nil { + c.RMRClient = xapp.Rmr } } @@ -152,7 +151,7 @@ func (c *Control) QueryHandler() (models.SubscriptionList, error) { //------------------------------------------------------------------- func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { - params := xapptweaks.NewParams(nil) + params := &xapp.RMRParams{} params.Mtype = trans.GetMtype() params.SubId = int(subs.GetReqId().InstanceId) params.Xid = "" @@ -162,12 +161,12 @@ func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *Transacti params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params, 5) + return c.SendWithRetry(params, false, 5) } func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { - params := xapptweaks.NewParams(nil) + params := &xapp.RMRParams{} params.Mtype = trans.GetMtype() params.SubId = int(subs.GetReqId().InstanceId) params.Xid = trans.GetXid() @@ -177,19 +176,18 @@ func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *Transact params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params, 5) + return c.SendWithRetry(params, false, 5) } -func (c *Control) Consume(params *xapp.RMRParams) (err error) { - msg := xapptweaks.NewParams(params) - if c.Rmr == nil { +func (c *Control) Consume(msg *xapp.RMRParams) (err error) { + if c.RMRClient == nil { err = fmt.Errorf("Rmr object nil can handle %s", msg.String()) xapp.Logger.Error("%s", err.Error()) return } c.CntRecvMsg++ - defer c.Rmr.Free(msg.Mbuf) + defer c.RMRClient.Free(msg.Mbuf) // xapp-frame might use direct access to c buffer and // when msg.Mbuf is freed, someone might take it into use @@ -223,7 +221,7 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { //------------------------------------------------------------------- // handle from XAPP Subscription Request //------------------------------------------------------------------ -func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { +func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { xapp.Logger.Info("MSG from XAPP: %s", params.String()) subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) @@ -232,7 +230,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { return } - trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid) + trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid) if trans == nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return @@ -284,7 +282,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { //------------------------------------------------------------------- // handle from XAPP Subscription Delete Request //------------------------------------------------------------------ -func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) { +func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Info("MSG from XAPP: %s", params.String()) subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) @@ -293,7 +291,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRPara return } - trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid) + trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid) if trans == nil { xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return @@ -483,7 +481,7 @@ func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Tr //------------------------------------------------------------------- // handle from E2T Subscription Reponse //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) { +func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload) if err != nil { @@ -512,7 +510,7 @@ func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) { //------------------------------------------------------------------- // handle from E2T Subscription Failure //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) { +func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload) if err != nil { @@ -541,7 +539,7 @@ func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) { //------------------------------------------------------------------- // handle from E2T Subscription Delete Response //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) { +func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload) if err != nil { @@ -570,7 +568,7 @@ func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRPara //------------------------------------------------------------------- // handle from E2T Subscription Delete Failure //------------------------------------------------------------------- -func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) { +func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) { xapp.Logger.Info("MSG from E2T: %s", params.String()) subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload) if err != nil {