X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=b5c60ec554d890f2df4eb07575ab5b0ac694d8f2;hb=refs%2Fchanges%2F01%2F2701%2F4;hp=2d7d535e7e099937667a25334fef95c1143bebe5;hpb=47942b4fd845472cd4f669bdcc5320be71f1570c;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 2d7d535..b5c60ec 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -24,6 +24,7 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" @@ -66,7 +67,7 @@ type Control struct { e2ap *E2ap registry *Registry tracker *Tracker - timerMap *TimerMap + //subscriber *xapp.Subscriber } type RMRMeid struct { @@ -94,15 +95,16 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() - timerMap := new(TimerMap) - timerMap.Init() + //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) c := &Control{e2ap: new(E2ap), registry: registry, tracker: tracker, - timerMap: timerMap, + //subscriber: subscriber, } c.XappWrapper.Init("") + go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler) + //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) return c } @@ -117,6 +119,34 @@ func (c *Control) Run() { xapp.Run(c) } +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) { + /* + switch p := params.(type) { + case *models.ReportParams: + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid}) + if trans == nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) + return + } + defer trans.Release() + case *models.ControlParams: + case *models.PolicyParams: + } + */ + return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented") +} + +func (c *Control) QueryHandler() (models.SubscriptionList, error) { + return c.registry.QueryHandler() +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- + func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { params := xapptweaks.NewParams(nil) params.Mtype = trans.GetMtype() @@ -128,7 +158,7 @@ func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *Transacti params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params) + return c.RmrSend(params, 5) } func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { @@ -143,7 +173,7 @@ func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *Transact params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params) + return c.RmrSend(params, 5) } func (c *Control) Consume(params *xapp.RMRParams) (err error) { @@ -158,17 +188,17 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { defer c.Rmr.Free(msg.Mbuf) switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: + case xapp.RIC_SUB_REQ: go c.handleXAPPSubscriptionRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_RESP"]: + case xapp.RIC_SUB_RESP: go c.handleE2TSubscriptionResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_FAILURE"]: + case xapp.RIC_SUB_FAILURE: go c.handleE2TSubscriptionFailure(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]: + case xapp.RIC_SUB_DEL_REQ: go c.handleXAPPSubscriptionDeleteRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]: + case xapp.RIC_SUB_DEL_RESP: go c.handleE2TSubscriptionDeleteResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]: + case xapp.RIC_SUB_DEL_FAILURE: go c.handleE2TSubscriptionDeleteFailure(msg) default: xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)