X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=5dedb1e37919c8c99e53e74d2c61092ba2a42046;hb=568d260371fd14454e5a3e41133e65bc7099a1b9;hp=4df76768dfc1cfac66607cdc853025eb26835737;hpb=42723e2593926f1cfa144b503bf043a0fe36e657;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 4df7676..5dedb1e 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -72,6 +72,8 @@ var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry var readSubsFromDb string var restDuplicateCtrl duplicateCtrl +var dbRetryForever string +var dbTryCount int type Control struct { *xapp.RMRClient @@ -84,6 +86,7 @@ type Control struct { ResetTestFlag bool Counters map[string]xapp.Counter LoggerLevel uint32 + UTTesting bool } type RMRMeid struct { @@ -126,6 +129,7 @@ func NewControl() *Control { // Register REST handler for testing support xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST") + xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET") xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB) @@ -137,22 +141,8 @@ func NewControl() *Control { restDuplicateCtrl.Init() // Read subscriptions from db - xapp.Logger.Info("Reading subscriptions from db") - subIds, register, err := c.ReadAllSubscriptionsFromSdl() - if err != nil { - xapp.Logger.Error("%v", err) - } else { - c.registry.subIds = subIds - c.registry.register = register - c.HandleUncompletedSubscriptions(register) - } - - restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl() - if err != nil { - xapp.Logger.Error("%v", err) - } else { - c.registry.restSubscriptions = restSubscriptions - } + c.ReadE2Subscriptions() + c.ReadRESTSubscriptions() return c } @@ -161,6 +151,60 @@ func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) { xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json") } +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) { + xapp.Logger.Info("GetAllRestSubscriptions() called") + response := c.registry.GetAllRestSubscriptions() + w.Write(response) +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) ReadE2Subscriptions() error { + var err error + var subIds []uint32 + var register map[uint32]*Subscription + for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ { + xapp.Logger.Info("Reading E2 subscriptions from db") + subIds, register, err = c.ReadAllSubscriptionsFromSdl() + if err != nil { + xapp.Logger.Error("%v", err) + <-time.After(1 * time.Second) + } else { + c.registry.subIds = subIds + c.registry.register = register + c.HandleUncompletedSubscriptions(register) + return nil + } + } + xapp.Logger.Info("Continuing without retring") + return err +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) ReadRESTSubscriptions() error { + var err error + var restSubscriptions map[string]*RESTSubscription + for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ { + xapp.Logger.Info("Reading REST subscriptions from db") + restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl() + if err != nil { + xapp.Logger.Error("%v", err) + <-time.After(1 * time.Second) + } else { + c.registry.restSubscriptions = restSubscriptions + return nil + } + } + xapp.Logger.Info("Continuing without retring") + return err +} + //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -183,14 +227,6 @@ func (c *Control) ReadConfigParameters(f string) { } xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout) - // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default - // value 100ms used currently only in unittests. - waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000 - if waitRouteCleanup_ms == 0 { - waitRouteCleanup_ms = 5000 * 1000000 - } - xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms) - e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount") if e2tMaxSubReqTryCount == 0 { e2tMaxSubReqTryCount = 1 @@ -208,10 +244,32 @@ func (c *Control) ReadConfigParameters(f string) { readSubsFromDb = "true" } xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb) + + dbTryCount = viper.GetInt("controls.dbTryCount") + if dbTryCount == 0 { + dbTryCount = 200 + } + xapp.Logger.Info("dbTryCount %v", dbTryCount) + + dbRetryForever = viper.GetString("controls.dbRetryForever") + if dbRetryForever == "" { + dbRetryForever = "true" + } + xapp.Logger.Info("dbRetryForever %v", dbRetryForever) + c.LoggerLevel = viper.GetUint32("logger.level") if c.LoggerLevel == 0 { c.LoggerLevel = 3 } + xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel) + + // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default + // value 100ms used currently only in unittests. + waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000 + if waitRouteCleanup_ms == 0 { + waitRouteCleanup_ms = 5000 * 1000000 + } + xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms) } //------------------------------------------------------------------- @@ -355,6 +413,8 @@ func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionR return &subResp, nil } + c.WriteRESTSubscriptionToDb(restSubId, restSubscription) + go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum) c.UpdateCounter(cRestSubRespToXapp) @@ -368,7 +428,8 @@ func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionR func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList, clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) { - xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) + c.SubscriptionProcessingStartDelay() + xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) var xAppEventInstanceID int64 var e2EventInstanceID int64 @@ -391,7 +452,8 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId) - xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + trans.Release() if err != nil { c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans) @@ -402,7 +464,19 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans) } - trans.Release() + } +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------ +func (c *Control) SubscriptionProcessingStartDelay() { + if c.UTTesting == true { + // This is temporary fix for the UT problem that notification arrives before subscription response + // Correct fix would be to allow notification come before response and process it correctly + xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions") + <-time.After(time.Millisecond * 50) + xapp.Logger.Debug("Continuing after delay") } } @@ -544,7 +618,7 @@ func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error { xAppRmrEndPoint := restSubscription.xAppRmrEndPoint go func() { - xapp.Logger.Info("Deleteting instances = %v", restSubscription.InstanceIds) + xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds) for _, instanceId := range restSubscription.InstanceIds { xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId) @@ -1276,6 +1350,10 @@ func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) { fmt.Println("CRESTSubscriptionRequest") + if p == nil { + return + } + if p.SubscriptionID != "" { fmt.Println(" SubscriptionID = ", p.SubscriptionID) } else {