"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
e2ap *E2ap
registry *Registry
tracker *Tracker
- timerMap *TimerMap
+ //subscriber *xapp.Subscriber
}
type RMRMeid struct {
tracker := new(Tracker)
tracker.Init()
- timerMap := new(TimerMap)
- timerMap.Init()
+ //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
c := &Control{e2ap: new(E2ap),
registry: registry,
tracker: tracker,
- timerMap: timerMap,
+ //subscriber: subscriber,
}
c.XappWrapper.Init("")
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
+ //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
return c
}
xapp.Run(c)
}
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
+ /*
+ switch p := params.(type) {
+ case *models.ReportParams:
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
+ if trans == nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
+ return
+ }
+ defer trans.Release()
+ case *models.ControlParams:
+ case *models.PolicyParams:
+ }
+ */
+ return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
+}
+
+func (c *Control) QueryHandler() (models.SubscriptionList, error) {
+ return c.registry.QueryHandler()
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
params := xapptweaks.NewParams(nil)
params.Mtype = trans.GetMtype()
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params)
+ return c.RmrSend(params, 5)
}
func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
params.Payload = trans.Payload.Buf
params.Mbuf = nil
xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
- return c.RmrSend(params)
+ return c.RmrSend(params, 5)
}
func (c *Control) Consume(params *xapp.RMRParams) (err error) {
defer c.Rmr.Free(msg.Mbuf)
switch msg.Mtype {
- case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ case xapp.RIC_SUB_REQ:
go c.handleXAPPSubscriptionRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_RESP"]:
+ case xapp.RIC_SUB_RESP:
go c.handleE2TSubscriptionResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ case xapp.RIC_SUB_FAILURE:
go c.handleE2TSubscriptionFailure(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
+ case xapp.RIC_SUB_DEL_REQ:
go c.handleXAPPSubscriptionDeleteRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
+ case xapp.RIC_SUB_DEL_RESP:
go c.handleE2TSubscriptionDeleteResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
+ case xapp.RIC_SUB_DEL_FAILURE:
go c.handleE2TSubscriptionDeleteFailure(msg)
default:
xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
case *e2ap.E2APSubscriptionResponse:
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
if err == nil {
+ trans.Release()
c.rmrSendToXapp("", subs, trans)
return
}
}
}
xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
- go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
return
}
- trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid)
if trans == nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
if err != nil {
xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
return
// Whatever is received send ok delete response
subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
- subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
+ subDelRespMsg.RequestId = subs.GetReqId().RequestId
subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
if err == nil {
c.rmrSendToXapp("", subs, trans)
}
- go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
+ c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
subs.mutex.Unlock()
}
- subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
- subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
- subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
- parentTrans.SendEvent(subDelRespMsg, 0)
+ parentTrans.SendEvent(nil, 0)
}
//-------------------------------------------------------------------