X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=b5c60ec554d890f2df4eb07575ab5b0ac694d8f2;hb=refs%2Fchanges%2F01%2F2701%2F4;hp=094e020c29fe3cd8625f61e482630ddb4de7a34d;hpb=e530286062bdc07625e54433bd64530ae7ae2118;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 094e020..b5c60ec 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -24,6 +24,7 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" @@ -66,7 +67,7 @@ type Control struct { e2ap *E2ap registry *Registry tracker *Tracker - timerMap *TimerMap + //subscriber *xapp.Subscriber } type RMRMeid struct { @@ -94,15 +95,16 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() - timerMap := new(TimerMap) - timerMap.Init() + //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) c := &Control{e2ap: new(E2ap), registry: registry, tracker: tracker, - timerMap: timerMap, + //subscriber: subscriber, } c.XappWrapper.Init("") + go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler) + //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) return c } @@ -117,6 +119,34 @@ func (c *Control) Run() { xapp.Run(c) } +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) { + /* + switch p := params.(type) { + case *models.ReportParams: + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid}) + if trans == nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) + return + } + defer trans.Release() + case *models.ControlParams: + case *models.PolicyParams: + } + */ + return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented") +} + +func (c *Control) QueryHandler() (models.SubscriptionList, error) { + return c.registry.QueryHandler() +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- + func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { params := xapptweaks.NewParams(nil) params.Mtype = trans.GetMtype() @@ -128,7 +158,7 @@ func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *Transacti params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params) + return c.RmrSend(params, 5) } func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { @@ -143,7 +173,7 @@ func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *Transact params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String()) - return c.RmrSend(params) + return c.RmrSend(params, 5) } func (c *Control) Consume(params *xapp.RMRParams) (err error) { @@ -158,17 +188,17 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { defer c.Rmr.Free(msg.Mbuf) switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: + case xapp.RIC_SUB_REQ: go c.handleXAPPSubscriptionRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_RESP"]: + case xapp.RIC_SUB_RESP: go c.handleE2TSubscriptionResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_FAILURE"]: + case xapp.RIC_SUB_FAILURE: go c.handleE2TSubscriptionFailure(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]: + case xapp.RIC_SUB_DEL_REQ: go c.handleXAPPSubscriptionDeleteRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]: + case xapp.RIC_SUB_DEL_RESP: go c.handleE2TSubscriptionDeleteResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]: + case xapp.RIC_SUB_DEL_FAILURE: go c.handleE2TSubscriptionDeleteFailure(msg) default: xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype) @@ -188,7 +218,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { return } - trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid) + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.Seq, params.Meid) if trans == nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return @@ -219,6 +249,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapptweaks.RMRParams) { case *e2ap.E2APSubscriptionResponse: trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg) if err == nil { + trans.Release() c.rmrSendToXapp("", subs, trans) return } @@ -247,7 +278,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRPara return } - trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid) + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.Seq, params.Meid) if trans == nil { xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params)) return @@ -260,7 +291,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRPara return } - subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq}) + subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()}) if err != nil { xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans)) return @@ -276,7 +307,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapptweaks.RMRPara // Whatever is received send ok delete response subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} - subDelRespMsg.RequestId = subs.SubReqMsg.RequestId + subDelRespMsg.RequestId = subs.GetReqId().RequestId subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg) if err == nil { @@ -341,10 +372,7 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran subs.mutex.Unlock() } - subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{} - subDelRespMsg.RequestId = subs.SubReqMsg.RequestId - subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId - parentTrans.SendEvent(subDelRespMsg, 0) + parentTrans.SendEvent(nil, 0) } //-------------------------------------------------------------------