X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=4c0cf9a460b7342c532ca143a0c34a86327380e4;hb=6bd579175aaa85f1dd864ad10fe4209ed6b450ea;hp=aaad62565598bca33f81ff264033762396dee813;hpb=f682ace08a827bd260e4905b5ee1bddacf01c6e0;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index aaad625..4c0cf9a 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -22,9 +22,7 @@ package control import ( "fmt" "net/http" - "os" - "strconv" - "strings" + "sync" "time" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" @@ -34,7 +32,6 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" - "github.com/gorilla/mux" "github.com/segmentio/ksuid" "github.com/spf13/viper" ) @@ -71,22 +68,26 @@ var e2tRecvMsgTimeout time.Duration var waitRouteCleanup_ms time.Duration var e2tMaxSubReqTryCount uint64 // Initial try + retry var e2tMaxSubDelReqTryCount uint64 // Initial try + retry +var checkE2State string var readSubsFromDb string -var restDuplicateCtrl duplicateCtrl var dbRetryForever string var dbTryCount int type Control struct { *xapp.RMRClient - e2ap *E2ap - registry *Registry - tracker *Tracker - e2SubsDb Sdlnterface - restSubsDb Sdlnterface - CntRecvMsg uint64 - ResetTestFlag bool - Counters map[string]xapp.Counter - LoggerLevel int + e2ap *E2ap + registry *Registry + tracker *Tracker + restDuplicateCtrl *DuplicateCtrl + e2IfState *E2IfState + e2IfStateDb XappRnibInterface + e2SubsDb Sdlnterface + restSubsDb Sdlnterface + CntRecvMsg uint64 + ResetTestFlag bool + Counters map[string]xapp.Counter + LoggerLevel int + UTTesting bool } type RMRMeid struct { @@ -132,32 +133,48 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() + restDuplicateCtrl := new(DuplicateCtrl) + restDuplicateCtrl.Init() + + e2IfState := new(E2IfState) + c := &Control{e2ap: new(E2ap), - registry: registry, - tracker: tracker, - e2SubsDb: CreateSdl(), - restSubsDb: CreateRESTSdl(), - Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), - LoggerLevel: 3, - } + registry: registry, + tracker: tracker, + restDuplicateCtrl: restDuplicateCtrl, + e2IfState: e2IfState, + e2IfStateDb: CreateXappRnibIfInstance(), + e2SubsDb: CreateSdl(), + restSubsDb: CreateRESTSdl(), + Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"), + LoggerLevel: 1, + } + + e2IfState.Init(c) c.ReadConfigParameters("") // Register REST handler for testing support + xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST") xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET") - xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET") - go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) + xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET") + xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET") - if readSubsFromDb == "false" { - return c - } + xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET") + xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET") + xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET") - restDuplicateCtrl.Init() + xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE") + xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE") + + if readSubsFromDb == "true" { + // Read subscriptions from db + c.ReadE2Subscriptions() + c.ReadRESTSubscriptions() + } - // Read subscriptions from db - c.ReadE2Subscriptions() - c.ReadRESTSubscriptions() + go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler) return c } @@ -169,10 +186,12 @@ func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) { //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) { - xapp.Logger.Debug("GetAllRestSubscriptions() called") - response := c.registry.GetAllRestSubscriptions() - w.Write(response) +func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) { + xapp.Logger.Debug("RESTQueryHandler() called") + + c.CntRecvMsg++ + + return c.registry.QueryHandler() } //------------------------------------------------------------------- @@ -191,7 +210,7 @@ func (c *Control) ReadE2Subscriptions() error { } else { c.registry.subIds = subIds c.registry.register = register - c.HandleUncompletedSubscriptions(register) + go c.HandleUncompletedSubscriptions(register) return nil } } @@ -203,6 +222,8 @@ func (c *Control) ReadE2Subscriptions() error { // //------------------------------------------------------------------- func (c *Control) ReadRESTSubscriptions() error { + + xapp.Logger.Debug("ReadRESTSubscriptions()") var err error var restSubscriptions map[string]*RESTSubscription for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ { @@ -212,6 +233,12 @@ func (c *Control) ReadRESTSubscriptions() error { xapp.Logger.Error("%v", err) <-time.After(1 * time.Second) } else { + // Fix REST subscriptions ongoing status after restart + for restSubId, restSubscription := range restSubscriptions { + restSubscription.SubReqOngoing = false + restSubscription.SubDelReqOngoing = false + c.WriteRESTSubscriptionToSdl(restSubId, restSubscription) + } c.registry.restSubscriptions = restSubscriptions return nil } @@ -225,64 +252,84 @@ func (c *Control) ReadRESTSubscriptions() error { //------------------------------------------------------------------- func (c *Control) ReadConfigParameters(f string) { + xapp.Logger.Debug("ReadConfigParameters") + c.LoggerLevel = int(xapp.Logger.GetLevel()) - xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel) + xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel) + c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel) // viper.GetDuration returns nanoseconds e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000 if e2tSubReqTimeout == 0 { e2tSubReqTimeout = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout") } - xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout) + xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout) e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000 if e2tSubDelReqTime == 0 { e2tSubDelReqTime = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime") } - xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime) + xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime) + e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000 if e2tRecvMsgTimeout == 0 { e2tRecvMsgTimeout = 2000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout") } - xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout) + xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout) e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount") if e2tMaxSubReqTryCount == 0 { e2tMaxSubReqTryCount = 1 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount") } - xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount) + xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount) e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount") if e2tMaxSubDelReqTryCount == 0 { e2tMaxSubDelReqTryCount = 1 + xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount") + } + xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount) + + checkE2State = viper.GetString("controls.checkE2State") + if checkE2State == "" { + checkE2State = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State") } - xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount) + xapp.Logger.Debug("checkE2State= %v", checkE2State) readSubsFromDb = viper.GetString("controls.readSubsFromDb") if readSubsFromDb == "" { readSubsFromDb = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb") } - xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb) + xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb) dbTryCount = viper.GetInt("controls.dbTryCount") if dbTryCount == 0 { dbTryCount = 200 + xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount") } - xapp.Logger.Debug("dbTryCount %v", dbTryCount) + xapp.Logger.Debug("dbTryCount= %v", dbTryCount) dbRetryForever = viper.GetString("controls.dbRetryForever") if dbRetryForever == "" { dbRetryForever = "true" + xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever") } - xapp.Logger.Debug("dbRetryForever %v", dbRetryForever) + xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever) // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default // value 100ms used currently only in unittests. waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000 if waitRouteCleanup_ms == 0 { waitRouteCleanup_ms = 5000 * 1000000 + xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms") } - xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms) + xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms) } //------------------------------------------------------------------- @@ -318,13 +365,13 @@ func (c *Control) Run() { //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, string, error) { +func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) { var restSubId string var restSubscription *RESTSubscription var err error - prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) + prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum) if p.SubscriptionID == "" { // Subscription does not contain REST subscription Id if exists { @@ -339,13 +386,13 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s } } else { xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) } } if restSubscription == nil { restSubId = ksuid.New().String() - restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid) + restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid) } } else { // Subscription contains REST subscription Id @@ -357,7 +404,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s // Subscription with id in REST request does not exist xapp.Logger.Error("%s", err.Error()) c.UpdateCounter(cRestSubFailToXapp) - return nil, "", models.SubscriptionInstanceRejectCauseRESTSubscriptionWithGivenIDDoesNotExist, err + return nil, "", err } if !exists { @@ -367,7 +414,7 @@ func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5s } } - return restSubscription, restSubId, "", nil + return restSubscription, restSubId, nil } //------------------------------------------------------------------- @@ -385,18 +432,30 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript c.PrintRESTSubscriptionRequest(p) } + if c.e2IfState.IsE2ConnectionUp(p.Meid) == false { + xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid) + c.UpdateCounter(cRestReqRejDueE2Down) + return nil, common.SubscribeServiceUnavailableCode + } + if p.ClientEndpoint == nil { err := fmt.Errorf("ClientEndpoint == nil") xapp.Logger.Error("%v", err) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } + e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p) + if err != nil { + xapp.Logger.Error("%s", err) + c.UpdateCounter(cRestSubFailToXapp) + return nil, common.SubscribeBadRequestCode + } _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint) if err != nil { xapp.Logger.Error("%s", err.Error()) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } md5sum, err := CalculateRequestMd5sum(params) @@ -404,10 +463,10 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error()) } - restSubscription, restSubId, rejectCause, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint) + restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host) if err != nil { - xapp.Logger.Error("Failed to get/allocate REST subscription") - return c.GetSubscriptionResponse(rejectCause, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + xapp.Logger.Error("Subscription with id in REST request does not exist") + return nil, common.SubscribeNotFoundCode } subResp.SubscriptionID = &restSubId @@ -415,26 +474,22 @@ func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.Subscript err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription) if err != nil { xapp.Logger.Error("%s", err.Error()) - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum) c.registry.DeleteRESTSubscription(&restSubId) c.UpdateCounter(cRestSubFailToXapp) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode + return nil, common.SubscribeBadRequestCode } - duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) + duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum) if duplicate { err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum) xapp.Logger.Debug("%s", err) + c.registry.DeleteRESTSubscription(&restSubId) c.UpdateCounter(cRestSubRespToXapp) return &subResp, common.SubscribeCreatedCode } c.WriteRESTSubscriptionToDb(restSubId, restSubscription) - e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p) - if err != nil { - xapp.Logger.Error("%s", err) - return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode - } go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives) c.UpdateCounter(cRestSubRespToXapp) @@ -468,12 +523,7 @@ func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount) } } - if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil { - xapp.Logger.Error("p.E2SubscriptionDirectives.RMRRoutingNeeded == nil") - e2SubscriptionDirectives.CreateRMRRoute = true - } else { - e2SubscriptionDirectives.CreateRMRRoute = *p.E2SubscriptionDirectives.RMRRoutingNeeded - } + e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded } xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue) xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount) @@ -481,24 +531,6 @@ func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2 return e2SubscriptionDirectives, nil } -//------------------------------------------------------------------- -// -//------------------------------------------------------------------- -func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, errorSource string, timeoutType string) *models.SubscriptionResponse { - subResp := models.SubscriptionResponse{} - subscriptionInstance := models.SubscriptionInstance{} - subscriptionInstance.RejectCause = &rejectCause - subscriptionInstance.ErrorCause = &errorCause - subscriptionInstance.ErrorSource = &errorSource - if timeoutType != "" { - subscriptionInstance.TimeoutType = &timeoutType - } - subResp.SubscriptionInstances = append(subResp.SubscriptionInstances, &subscriptionInstance) - xapp.Logger.Error("etSubscriptionResponse() %+v", subscriptionInstance) - - return &subResp -} - //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -506,13 +538,14 @@ func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList, clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) { - xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests)) + c.SubscriptionProcessingStartDelay() + xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests)) var xAppEventInstanceID int64 var e2EventInstanceID int64 errorInfo := &ErrorInfo{} - defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) + defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ { subReqMsg := subReqList.E2APSubscriptionRequests[index] @@ -532,17 +565,34 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives) xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans)) + trans.Release() if err != nil { + if err.Error() == "TEST: restart event received" { + // This is just for UT cases. Stop here subscription processing + return + } c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo) } else { e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId) restSubscription.AddMd5Sum(md5sum) xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) - c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans) + c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo) } - trans.Release() + } +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------ +func (c *Control) SubscriptionProcessingStartDelay() { + if c.UTTesting == true { + // This is temporary fix for the UT problem that notification arrives before subscription response + // Correct fix would be to allow notification come before response and process it correctly + xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions") + <-time.After(time.Millisecond * 50) + xapp.Logger.Debug("Continuing after delay") } } @@ -571,32 +621,46 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e // // Wake subs request // - go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) + subs.OngoingReqCount++ + go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingReqCount-- err = nil if event != nil { switch themsg := event.(type) { case *e2ap.E2APSubscriptionResponse: trans.Release() - return themsg, &errorInfo, nil + if c.e2IfState.IsE2ConnectionUp(meid) == true { + errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c) + return themsg, &errorInfo, nil + } else { + c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c) + c.RemoveSubscriptionFromDb(subs) + err = fmt.Errorf("E2 interface down") + errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") + } case *e2ap.E2APSubscriptionFailure: - err = fmt.Errorf("E2 SubscriptionFailure received") + err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value) errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") - return nil, &errorInfo, err case *PackSubscriptionRequestErrortEvent: - err = fmt.Errorf("E2 SubscriptionRequest pack failure") - return nil, &themsg.ErrorInfo, err + err = fmt.Errorf("E2 RICSubscriptionRequest pack failure") + errorInfo = themsg.ErrorInfo case *SDLWriteErrortEvent: err = fmt.Errorf("SDL write failure") - return nil, &themsg.ErrorInfo, err + errorInfo = themsg.ErrorInfo + case *SubmgrRestartTestEvent: + err = fmt.Errorf("TEST: restart event received") + xapp.Logger.Debug("%s", err) + return nil, &errorInfo, err default: err = fmt.Errorf("Unexpected E2 subscription response received") errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "") break } } else { - err = fmt.Errorf("E2 subscription response timeout") + // Timer expiry + err = fmt.Errorf("E2 RICSubscriptionResponse timeout") errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout) if subs.PolicyUpdate == true { return nil, &errorInfo, err @@ -624,9 +688,9 @@ func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSu SubscriptionID: restSubId, SubscriptionInstances: []*models.SubscriptionInstance{ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: &errorInfo.ErrorCause, - ErrorSource: &errorInfo.ErrorSource, - TimeoutType: &errorInfo.TimeoutType, + ErrorCause: errorInfo.ErrorCause, + ErrorSource: errorInfo.ErrorSource, + TimeoutType: errorInfo.TimeoutType, XappEventInstanceID: &xAppEventInstanceID}, }, } @@ -634,22 +698,28 @@ func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSu restSubscription.SetProcessed(err) c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false) if trans != nil { - xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", - errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) + xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", + errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) } else { - xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v", - errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID) + xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v", + errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID) } c.UpdateCounter(cRestSubFailNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + + // E2 is down. Delete completely processed request safely now + if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { + c.registry.DeleteRESTSubscription(restSubId) + c.RemoveRESTSubscriptionFromDb(*restSubId) + } } //------------------------------------------------------------------- // //------------------------------------------------------------------- func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64, - clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) { + clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) { // Store successfully processed InstanceId for deletion restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID)) @@ -660,18 +730,24 @@ func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubs SubscriptionID: restSubId, SubscriptionInstances: []*models.SubscriptionInstance{ &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID, - ErrorCause: nil, + ErrorCause: errorInfo.ErrorCause, + ErrorSource: errorInfo.ErrorSource, XappEventInstanceID: &xAppEventInstanceID}, }, } // Mark REST subscription request processesd. restSubscription.SetProcessed(nil) c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false) - xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", - clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) - + xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s", + errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans)) c.UpdateCounter(cRestSubNotifToXapp) xapp.Subscription.Notify(resp, *clientEndpoint) + + // E2 is down. Delete completely processed request safely now + if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false { + c.registry.DeleteRESTSubscription(restSubId) + c.RemoveRESTSubscriptionFromDb(*restSubId) + } } //------------------------------------------------------------------- @@ -689,15 +765,18 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { xapp.Logger.Error("%s", err.Error()) if restSubscription == nil { // Subscription was not found + c.UpdateCounter(cRestSubDelRespToXapp) return common.UnsubscribeNoContentCode } else { if restSubscription.SubReqOngoing == true { err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId) xapp.Logger.Error("%s", err.Error()) + c.UpdateCounter(cRestSubDelFailToXapp) return common.UnsubscribeBadRequestCode } else if restSubscription.SubDelReqOngoing == true { // Previous request for same restSubId still ongoing - return common.UnsubscribeBadRequestCode + c.UpdateCounter(cRestSubDelRespToXapp) + return common.UnsubscribeNoContentCode } } } @@ -706,7 +785,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { go func() { xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds) for _, instanceId := range restSubscription.InstanceIds { - xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId) + xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0) if err != nil { xapp.Logger.Error("%s", err.Error()) @@ -715,20 +794,19 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int { restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID) restSubscription.DeleteE2InstanceId(instanceId) } - restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) + c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum) c.registry.DeleteRESTSubscription(&restSubId) c.RemoveRESTSubscriptionFromDb(restSubId) }() c.UpdateCounter(cRestSubDelRespToXapp) - return common.UnsubscribeNoContentCode } //------------------------------------------------------------------- // //------------------------------------------------------------------- -func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) { +func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) { var xAppEventInstanceID int64 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId}) @@ -755,8 +833,10 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, // // Wake subs delete // - go c.handleSubscriptionDelete(subs, trans) + subs.OngoingDelCount++ + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) @@ -765,50 +845,6 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, return xAppEventInstanceID, nil } -//------------------------------------------------------------------- -// -//------------------------------------------------------------------- -func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) { - xapp.Logger.Debug("RESTQueryHandler() called") - - c.CntRecvMsg++ - - return c.registry.QueryHandler() -} - -func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) { - xapp.Logger.Debug("RESTTestRestHandler() called") - - pathParams := mux.Vars(r) - s := pathParams["testId"] - - // This can be used to delete single subscription from db - if contains := strings.Contains(s, "deletesubid="); contains == true { - var splits = strings.Split(s, "=") - if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil { - xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId) - c.RemoveSubscriptionFromSdl(uint32(subId)) - return - } - } - - // This can be used to remove all subscriptions db from - if s == "emptydb" { - xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called") - c.RemoveAllSubscriptionsFromSdl() - c.RemoveAllRESTSubscriptionsFromSdl() - return - } - - // This is meant to cause submgr's restart in testing - if s == "restart" { - xapp.Logger.Debug("os.Exit(1) called") - os.Exit(1) - } - - xapp.Logger.Debug("Unsupported rest command received %s", s) -} - //------------------------------------------------------------------- // //------------------------------------------------------------------- @@ -896,6 +932,11 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { xapp.Logger.Debug("MSG from XAPP: %s", params.String()) c.UpdateCounter(cSubReqFromXapp) + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload) if err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params)) @@ -914,7 +955,6 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { return } - //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it? subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true) if err != nil { xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans)) @@ -930,8 +970,10 @@ func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) { func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) { e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil) - go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives) + subs.OngoingReqCount++ + go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms) event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingReqCount-- var err error if event != nil { switch themsg := event.(type) { @@ -956,7 +998,6 @@ func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *Transaction } } xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs)) - //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- @@ -966,6 +1007,11 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Debug("MSG from XAPP: %s", params.String()) c.UpdateCounter(cSubDelReqFromXapp) + if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false { + xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName) + return + } + subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload) if err != nil { xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params)) @@ -994,8 +1040,10 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { // // Wake subs delete // - go c.handleSubscriptionDelete(subs, trans) + subs.OngoingDelCount++ + go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms) trans.WaitEvent(0) //blocked wait as timeout is handled in subs side + subs.OngoingDelCount-- xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs)) @@ -1015,15 +1063,12 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) { c.UpdateCounter(cSubDelRespToXapp) c.rmrSendToXapp("", subs, trans) } - - //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it? - //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second) } //------------------------------------------------------------------- // SUBS CREATE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) { +func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) { var event interface{} = nil var removeSubscriptionFromDb bool = false @@ -1042,37 +1087,43 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran subRfMsg, valid = subs.SetCachedResponse(event, true) subs.SubRespRcvd = true case *e2ap.E2APSubscriptionFailure: - removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(event, false) xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) - c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) case *SubmgrRestartTestEvent: - // This simulates that no response has been received and after restart subscriptions are restored from db + // This is used to simulate that no response has been received and after restart, subscriptions are restored from db xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case") + subRfMsg, valid = subs.SetCachedResponse(event, false) + parentTrans.SendEvent(subRfMsg, 0) + return case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent: subRfMsg, valid = subs.SetCachedResponse(event, false) default: + // Timer expiry if subs.PolicyUpdate == false { xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans)) - removeSubscriptionFromDb = true subRfMsg, valid = subs.SetCachedResponse(nil, false) c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) + } else { + subRfMsg, valid = subs.SetCachedResponse(nil, true) } } xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } else { xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans)) } + if valid == false { + removeSubscriptionFromDb = true + } err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb) if err != nil { - subRfMsg, valid = subs.SetCachedResponse(event, false) + valid = false c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans) } - //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) + // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) if valid == false { - c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c) + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) } parentTrans.SendEvent(subRfMsg, 0) @@ -1082,7 +1133,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran // SUBS DELETE Handling //------------------------------------------------------------------- -func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) { +func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) { trans := c.tracker.NewSubsTransaction(subs) subs.WaitTransactionTurn(trans) @@ -1100,11 +1151,9 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran } else { subs.mutex.Unlock() } - //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) - // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if - // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...)) - c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c) - c.registry.UpdateSubscriptionToDb(subs, c) + + // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete) + c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c) parentTrans.SendEvent(nil, 0) } @@ -1122,7 +1171,7 @@ func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transact subReqMsg.RequestId.Id = ricRequestorId trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg) if err != nil { - xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans)) + xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans)) return &PackSubscriptionRequestErrortEvent{ ErrorInfo{ ErrorSource: models.SubscriptionInstanceErrorSourceASN1, @@ -1229,6 +1278,7 @@ func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs)) return } + xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans) sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout) if sendOk == false { err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut) @@ -1432,6 +1482,11 @@ func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) { func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) { + if c.UTTesting == true { + // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests + c.registry.mutex = new(sync.Mutex) + } + const ricRequestorId = 123 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId) @@ -1506,11 +1561,7 @@ func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) { fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount) } fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue) - if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil { - fmt.Println(" RMRRoutingNeeded == nil") - } else { - fmt.Printf(" RMRRoutingNeeded = %v\n", *p.E2SubscriptionDirectives.RMRRoutingNeeded) - } + fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded) } for _, subscriptionDetail := range p.SubscriptionDetails { if p.RANFunctionID != nil {