X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=0202d7ebe25228a7336fde6da9abe551fea6badb;hb=dce97d040839d06292cb0bdf7e62e3d7ac942854;hp=6111c636f4118931036bdfe58662da982fecae00;hpb=4c626a2869ab103ac837910518f8e71cb0541b27;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 6111c63..0202d7e 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -51,7 +51,6 @@ func idstring(err error, entries ...fmt.Stringer) string { if err != nil { retval += filler + "err(" + err.Error() + ")" filler = " " - } return retval } @@ -97,7 +96,6 @@ func init() { func NewControl() *Control { - ReadConfigParameters() transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)} @@ -108,8 +106,6 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() - //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) - c := &Control{e2ap: new(E2ap), registry: registry, tracker: tracker, @@ -117,12 +113,13 @@ func NewControl() *Control { //subscriber: subscriber, Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), } + c.ReadConfigParameters("") // Register REST handler for testing support xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST") + xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler) - //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) if readSubsFromDb == "false" { return c @@ -141,10 +138,15 @@ func NewControl() *Control { return c } +func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) { + subscriptions, _ := c.registry.QueryHandler() + xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json") +} + //------------------------------------------------------------------- // //------------------------------------------------------------------- -func ReadConfigParameters() { +func (c *Control) ReadConfigParameters(f string) { // viper.GetDuration returns nanoseconds e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000 @@ -203,13 +205,14 @@ func (c *Control) ReadyCB(data interface{}) { func (c *Control) Run() { xapp.SetReadyCB(c.ReadyCB, nil) + xapp.AddConfigChangeListener(c.ReadConfigParameters) xapp.Run(c) } //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) { +func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) { /* switch p := params.(type) { case *models.ReportParams: @@ -283,7 +286,11 @@ func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *Transacti params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String()) - return c.SendWithRetry(params, false, 5) + err = c.SendWithRetry(params, false, 5) + if err != nil { + xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err) + } + return err } func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) { @@ -298,7 +305,11 @@ func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *Transact params.Payload = trans.Payload.Buf params.Mbuf = nil xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String()) - return c.SendWithRetry(params, false, 5) + err = c.SendWithRetry(params, false, 5) + if err != nil { + xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err) + } + return err } func (c *Control) Consume(msg *xapp.RMRParams) (err error) { @@ -360,8 +371,7 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { } defer trans.Release() - err = c.tracker.Track(trans) - if err != nil { + if err = c.tracker.Track(trans); err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) return } @@ -373,12 +383,17 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { return } - // - // Wake subs request - // + c.wakeSubscriptionRequest(subs, trans) +} + +//------------------------------------------------------------------- +// Wake Subscription Request to E2node +//------------------------------------------------------------------ +func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) { + go c.handleSubscriptionCreate(subs, trans) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side - err = nil + var err error if event != nil { switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: @@ -488,8 +503,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran case *e2ap.E2APSubscriptionFailure: removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) - xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) - c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) case *SubmgrRestartTestEvent: // This simulates that no response has been received and after restart subscriptions are restored from db xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")