"fmt"
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
- "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
httptransport "github.com/go-openapi/runtime/client"
//
//-----------------------------------------------------------------------------
-var e2tSubReqTimeout time.Duration = 5 * time.Second
-var e2tSubDelReqTime time.Duration = 5 * time.Second
-var e2tMaxSubReqTryCount uint64 = 2 // Initial try + retry
-var e2tMaxSubDelReqTryCount uint64 = 2 // Initial try + retry
-
-var e2tRecvMsgTimeout time.Duration = 5 * time.Second
+var e2tSubReqTimeout time.Duration
+var e2tSubDelReqTime time.Duration
+var e2tRecvMsgTimeout time.Duration
+var e2tMaxSubReqTryCount uint64 // Initial try + retry
+var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
type Control struct {
- xapptweaks.XappWrapper
+ *xapp.RMRClient
e2ap *E2ap
registry *Registry
tracker *Tracker
//subscriber *xapp.Subscriber
+ CntRecvMsg uint64
}
type RMRMeid struct {
func NewControl() *Control {
+ // viper.GetDuration returns nanoseconds
+ e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
+ if e2tSubReqTimeout == 0 {
+ e2tSubReqTimeout = 2000 * 1000000
+ }
+ xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
+ e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
+ if e2tSubDelReqTime == 0 {
+ e2tSubDelReqTime = 2000 * 1000000
+ }
+ xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
+ e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
+ if e2tRecvMsgTimeout == 0 {
+ e2tRecvMsgTimeout = 2000 * 1000000
+ }
+ xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
+ e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
+ if e2tMaxSubReqTryCount == 0 {
+ e2tMaxSubReqTryCount = 1
+ }
+ xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
+ e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
+ if e2tMaxSubDelReqTryCount == 0 {
+ e2tMaxSubDelReqTryCount = 1
+ }
+ xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
+
transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
tracker: tracker,
//subscriber: subscriber,
}
- c.XappWrapper.Init("")
go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
//go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
return c
}
func (c *Control) ReadyCB(data interface{}) {
- if c.Rmr == nil {
- c.Rmr = xapp.Rmr
+ if c.RMRClient == nil {
+ c.RMRClient = xapp.Rmr
}
}
//-------------------------------------------------------------------
func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
- params := xapptweaks.NewParams(nil)
+ params := &xapp.RMRParams{}
params.Mtype = trans.GetMtype()
params.SubId = int(subs.GetReqId().InstanceId)
params.Xid = ""
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params, 5)
+ return c.SendWithRetry(params, false, 5)
}
func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
- params := xapptweaks.NewParams(nil)
+ params := &xapp.RMRParams{}
params.Mtype = trans.GetMtype()
params.SubId = int(subs.GetReqId().InstanceId)
params.Xid = trans.GetXid()
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params, 5)
+ return c.SendWithRetry(params, false, 5)
}
-func (c *Control) Consume(params *xapp.RMRParams) (err error) {
- msg := xapptweaks.NewParams(params)
- if c.Rmr == nil {
+func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+ if c.RMRClient == nil {
err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
xapp.Logger.Error("%s", err.Error())
return
}
c.CntRecvMsg++
- defer c.Rmr.Free(msg.Mbuf)
+ defer c.RMRClient.Free(msg.Mbuf)
// xapp-frame might use direct access to c buffer and
// when msg.Mbuf is freed, someone might take it into use
//-------------------------------------------------------------------
// handle from XAPP Subscription Request
//------------------------------------------------------------------
-func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) {
+func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from XAPP: %s", params.String())
subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
return
}
- trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
//-------------------------------------------------------------------
// handle from XAPP Subscription Delete Request
//------------------------------------------------------------------
-func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRParams) {
+func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from XAPP: %s", params.String())
subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
return
}
- trans := c.tracker.NewXappTransaction(xapptweaks.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
+ trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
//-------------------------------------------------------------------
// handle from E2T Subscription Reponse
//-------------------------------------------------------------------
-func (c *Control) handleE2TSubscriptionResponse(params *xapptweaks.RMRParams) {
+func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
if err != nil {
//-------------------------------------------------------------------
// handle from E2T Subscription Failure
//-------------------------------------------------------------------
-func (c *Control) handleE2TSubscriptionFailure(params *xapptweaks.RMRParams) {
+func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
if err != nil {
//-------------------------------------------------------------------
// handle from E2T Subscription Delete Response
//-------------------------------------------------------------------
-func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapptweaks.RMRParams) (err error) {
+func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
if err != nil {
//-------------------------------------------------------------------
// handle from E2T Subscription Delete Failure
//-------------------------------------------------------------------
-func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapptweaks.RMRParams) {
+func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
xapp.Logger.Info("MSG from E2T: %s", params.String())
subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
if err != nil {