X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=7de724ec09bf05a3970b2dd1c7c10886012bd498;hb=3d80b72cb374aec809740f3bec895d4b37a4fc2f;hp=1bcffc6d8f843839557b4c737429cfc75c6ccee5;hpb=08c8fbd6d2d26e17b87aac8438d035813170bc91;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index 1bcffc6..7de724e 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -71,23 +71,26 @@ var e2tRecvMsgTimeout time.Duration var waitRouteCleanup_ms time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry +var checkE2State string var readSubsFromDb string -var restDuplicateCtrl duplicateCtrl var dbRetryForever string var dbTryCount int type Control struct { *xapp.RMRClient - e2ap *E2ap - registry *Registry - tracker *Tracker - e2SubsDb Sdlnterface - restSubsDb Sdlnterface - CntRecvMsg uint64 - ResetTestFlag bool - Counters map[string]xapp.Counter - LoggerLevel int - UTTesting bool + e2ap *E2ap + registry *Registry + tracker *Tracker + restDuplicateCtrl *DuplicateCtrl + e2IfState *E2IfState + e2IfStateDb XappRnibInterface + e2SubsDb Sdlnterface + restSubsDb Sdlnterface + CntRecvMsg uint64 + ResetTestFlag bool + Counters map[string]xapp.Counter + LoggerLevel int + UTTesting bool } type RMRMeid struct { @@ -133,14 +136,24 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() + restDuplicateCtrl := new(DuplicateCtrl) + restDuplicateCtrl.Init() + + e2IfState := new(E2IfState) + c := &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - e2SubsDb: CreateSdl(), - restSubsDb: CreateRESTSdl(), - Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), - LoggerLevel: 3, - } + registry: registry, + tracker: tracker, + restDuplicateCtrl: restDuplicateCtrl, + e2IfState: e2IfState, + e2IfStateDb: CreateXappRnibIfInstance(), + e2SubsDb: CreateSdl(), + restSubsDb: CreateRESTSdl(), + Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), + LoggerLevel: 1, + } + + e2IfState.Init(c) c.ReadConfigParameters("") // Register REST handler for testing support @@ -148,17 +161,13 @@ func NewControl() *Control { xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET") xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") - go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) - - if readSubsFromDb == "false" { - return c + if readSubsFromDb == "true" { + // Read subscriptions from db + c.ReadE2Subscriptions() + c.ReadRESTSubscriptions() } - restDuplicateCtrl.Init() - - // Read subscriptions from db - c.ReadE2Subscriptions() - c.ReadRESTSubscriptions() + go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) return c } @@ -226,64 +235,84 @@ func (c *Control) ReadRESTSubscriptions() error { //------------------------------------------------------------------- func (c *Control) ReadConfigParameters(f string) { + xapp.Logger.Debug("ReadConfigParameters") + c.LoggerLevel = int(xapp.Logger.GetLevel()) - xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel) + xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel) + c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel) // viper.GetDuration returns nanoseconds e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000 if e2tSubReqTimeout == 0 { e2tSubReqTimeout = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout") } - xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout) + xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout) e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000 if e2tSubDelReqTime == 0 { e2tSubDelReqTime = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime") } - xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime) + xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime) + e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000 if e2tRecvMsgTimeout == 0 { e2tRecvMsgTimeout = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout") } - xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout) + xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout) e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount") if e2tMaxSubReqTryCount == 0 { e2tMaxSubReqTryCount = 1 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount") } - xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount) + xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount) e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount") if e2tMaxSubDelReqTryCount == 0 { e2tMaxSubDelReqTryCount = 1 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount") + } + xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount) + + checkE2State = viper.GetString("controls.checkE2State") + if checkE2State == "" { + checkE2State = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State") } - xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount) + xapp.Logger.Debug("checkE2State= %v", checkE2State) readSubsFromDb = viper.GetString("controls.readSubsFromDb") if readSubsFromDb == "" { readSubsFromDb = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb") } - xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb) + xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb) dbTryCount = viper.GetInt("controls.dbTryCount") if dbTryCount == 0 { dbTryCount = 200 + xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount") } - xapp.Logger.Debug("dbTryCount %v", dbTryCount) + xapp.Logger.Debug("dbTryCount= %v", dbTryCount) dbRetryForever = viper.GetString("controls.dbRetryForever") if dbRetryForever == "" { dbRetryForever = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever") } - xapp.Logger.Debug("dbRetryForever %v", dbRetryForever) + xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever) // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default // value 100ms used currently only in unittests. waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000 if waitRouteCleanup_ms == 0 { waitRouteCleanup_ms = 5000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms") } - xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms) + xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms) } //------------------------------------------------------------------- @@ -325,7 +354,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s var restSubscription *RESTSubscription var err error - prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) + prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) if p.SubscriptionID == "" { // Subscription does not contain REST subscription Id if exists { @@ -340,7 +369,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s } } else { xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) } } @@ -386,6 +415,12 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript c.PrintRESTSubscriptionRequest(p) } + if c.e2IfState.IsE2ConnectionUp(p.Meid) == false { + xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid) + c.UpdateCounter(cRestReqRejDueE2Down) + return nil, common.SubscribeServiceUnavailableCode + } + if p.ClientEndpoint == nil { err := fmt.Errorf("ClientEndpoint == nil") xapp.Logger.Error("%v", err) @@ -416,13 +451,13 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription) if err != nil { xapp.Logger.Error("%s", err.Error()) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) c.registry.DeleteRESTSubscription(&restSubId) c.UpdateCounter(cRestSubFailToXapp) return nil, common.SubscribeBadRequestCode } - duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) + duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) if duplicate { err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum) xapp.Logger.Debug("%s", err) @@ -491,7 +526,7 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription var e2EventInstanceID int64 errorInfo := &ErrorInfo{} - defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) + defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ { subReqMsg := subReqList.E2APSubscriptionRequests[index] @@ -511,6 +546,7 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives) xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + trans.Release() if err != nil { c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo) @@ -521,7 +557,6 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans) } - trans.Release() } } @@ -563,15 +598,25 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e // // Wake subs request // + subs.OngoingReqCount++ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingReqCount-- err = nil if event != nil { switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: trans.Release() - return themsg, &errorInfo, nil + if c.e2IfState.IsE2ConnectionUp(meid) == true { + return themsg, &errorInfo, nil + } else { + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) + c.RemoveSubscriptionFromDb(subs) + err = fmt.Errorf("E2 interface down") + errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") + return nil, &errorInfo, err + } case *e2ap.E2APSubscriptionFailure: err = fmt.Errorf("E2 SubscriptionFailure received") errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") @@ -588,6 +633,7 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e break } } else { + // Timer expiry err = fmt.Errorf("E2 subscription response timeout") errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout) if subs.PolicyUpdate == true { @@ -596,6 +642,7 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e } xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs)) + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) return nil, &errorInfo, err } @@ -635,6 +682,11 @@ func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSu c.UpdateCounter(cRestSubFailNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + + if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { + c.registry.DeleteRESTSubscription(restSubId) + c.RemoveRESTSubscriptionFromDb(*restSubId) + } } //------------------------------------------------------------------- @@ -664,6 +716,11 @@ func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubs c.UpdateCounter(cRestSubNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + + if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { + c.registry.DeleteRESTSubscription(restSubId) + c.RemoveRESTSubscriptionFromDb(*restSubId) + } } //------------------------------------------------------------------- @@ -681,15 +738,18 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { xapp.Logger.Error("%s", err.Error()) if restSubscription == nil { // Subscription was not found + c.UpdateCounter(cRestSubDelRespToXapp) return common.UnsubscribeNoContentCode } else { if restSubscription.SubReqOngoing == true { err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId) xapp.Logger.Error("%s", err.Error()) + c.UpdateCounter(cRestSubDelFailToXapp) return common.UnsubscribeBadRequestCode } else if restSubscription.SubDelReqOngoing == true { // Previous request for same restSubId still ongoing - return common.UnsubscribeBadRequestCode + c.UpdateCounter(cRestSubDelRespToXapp) + return common.UnsubscribeNoContentCode } } } @@ -707,13 +767,12 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID) restSubscription.DeleteE2InstanceId(instanceId) } - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) c.registry.DeleteRESTSubscription(&restSubId) c.RemoveRESTSubscriptionFromDb(restSubId) }() c.UpdateCounter(cRestSubDelRespToXapp) - return common.UnsubscribeNoContentCode } @@ -747,8 +806,10 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, // // Wake subs delete // + subs.OngoingDelCount++ go c.handleSubscriptionDelete(subs, trans) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) @@ -888,6 +949,11 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { xapp.Logger.Debug("MSG from XAPP: %s", params.String()) c.UpdateCounter(cSubReqFromXapp) + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) if err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params)) @@ -922,8 +988,10 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) { e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil) + subs.OngoingReqCount++ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingReqCount-- var err error if event != nil { switch themsg := event.(type) { @@ -958,6 +1026,11 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Debug("MSG from XAPP: %s", params.String()) c.UpdateCounter(cSubDelReqFromXapp) + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) if err != nil { xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params)) @@ -986,8 +1059,10 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { // // Wake subs delete // + subs.OngoingDelCount++ go c.handleSubscriptionDelete(subs, trans) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) @@ -1037,13 +1112,13 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) - c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) case *SubmgrRestartTestEvent: // This simulates that no response has been received and after restart subscriptions are restored from db xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case") case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent: subRfMsg, valid = subs.SetCachedResponse(event, false) default: + // Timer expiry if subs.PolicyUpdate == false { xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) removeSubscriptionFromDb = true