var e2tMaxSubReqTryCount uint64 // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
var readSubsFromDb string
-var restDuplicateCtrl duplicateCtrl
var dbRetryForever string
var dbTryCount int
type Control struct {
*xapp.RMRClient
- e2ap *E2ap
- registry *Registry
- tracker *Tracker
- e2SubsDb Sdlnterface
- restSubsDb Sdlnterface
- CntRecvMsg uint64
- ResetTestFlag bool
- Counters map[string]xapp.Counter
- LoggerLevel int
+ e2ap *E2ap
+ registry *Registry
+ tracker *Tracker
+ restDuplicateCtrl *DuplicateCtrl
+ e2SubsDb Sdlnterface
+ restSubsDb Sdlnterface
+ CntRecvMsg uint64
+ ResetTestFlag bool
+ Counters map[string]xapp.Counter
+ LoggerLevel int
+ UTTesting bool
}
type RMRMeid struct {
tracker := new(Tracker)
tracker.Init()
+ restDuplicateCtrl := new(DuplicateCtrl)
+ restDuplicateCtrl.Init()
+
c := &Control{e2ap: new(E2ap),
- registry: registry,
- tracker: tracker,
- e2SubsDb: CreateSdl(),
- restSubsDb: CreateRESTSdl(),
- Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
- LoggerLevel: 3,
+ registry: registry,
+ tracker: tracker,
+ restDuplicateCtrl: restDuplicateCtrl,
+ e2SubsDb: CreateSdl(),
+ restSubsDb: CreateRESTSdl(),
+ Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
+ LoggerLevel: 4,
}
c.ReadConfigParameters("")
xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
- go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
-
if readSubsFromDb == "false" {
return c
}
- restDuplicateCtrl.Init()
-
// Read subscriptions from db
c.ReadE2Subscriptions()
c.ReadRESTSubscriptions()
+
+ go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
+
return c
}
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, string, error) {
+func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
var restSubId string
var restSubscription *RESTSubscription
var err error
- prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
if p.SubscriptionID == "" {
// Subscription does not contain REST subscription Id
if exists {
}
} else {
xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
- restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
}
}
// Subscription with id in REST request does not exist
xapp.Logger.Error("%s", err.Error())
c.UpdateCounter(cRestSubFailToXapp)
- return nil, "", models.SubscriptionInstanceRejectCauseRESTSubscriptionWithGivenIDDoesNotExist, err
+ return nil, "", err
}
if !exists {
}
}
- return restSubscription, restSubId, "", nil
+ return restSubscription, restSubId, nil
}
//-------------------------------------------------------------------
err := fmt.Errorf("ClientEndpoint == nil")
xapp.Logger.Error("%v", err)
c.UpdateCounter(cRestSubFailToXapp)
- return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
+ return nil, common.SubscribeBadRequestCode
}
_, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
if err != nil {
xapp.Logger.Error("%s", err.Error())
c.UpdateCounter(cRestSubFailToXapp)
- return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
+ return nil, common.SubscribeBadRequestCode
}
md5sum, err := CalculateRequestMd5sum(params)
xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
}
- restSubscription, restSubId, rejectCause, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
+ restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
if err != nil {
- xapp.Logger.Error("Failed to get/allocate REST subscription")
- return c.GetSubscriptionResponse(rejectCause, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
+ xapp.Logger.Error("Subscription with id in REST request does not exist")
+ return nil, common.SubscribeNotFoundCode
}
subResp.SubscriptionID = &restSubId
err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
if err != nil {
xapp.Logger.Error("%s", err.Error())
- restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
+ c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
c.registry.DeleteRESTSubscription(&restSubId)
c.UpdateCounter(cRestSubFailToXapp)
- return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
+ return nil, common.SubscribeBadRequestCode
}
- duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
+ duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
if duplicate {
err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
xapp.Logger.Debug("%s", err)
e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
if err != nil {
xapp.Logger.Error("%s", err)
- return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
+ return nil, common.SubscribeBadRequestCode
}
go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
}
}
- if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil {
- xapp.Logger.Error("p.E2SubscriptionDirectives.RMRRoutingNeeded == nil")
- e2SubscriptionDirectives.CreateRMRRoute = true
- } else {
- e2SubscriptionDirectives.CreateRMRRoute = *p.E2SubscriptionDirectives.RMRRoutingNeeded
- }
+ e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
}
xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
return e2SubscriptionDirectives, nil
}
-//-------------------------------------------------------------------
-//
-//-------------------------------------------------------------------
-func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, errorSource string, timeoutType string) *models.SubscriptionResponse {
- subResp := models.SubscriptionResponse{}
- subscriptionInstance := models.SubscriptionInstance{}
- subscriptionInstance.RejectCause = &rejectCause
- subscriptionInstance.ErrorCause = &errorCause
- subscriptionInstance.ErrorSource = &errorSource
- if timeoutType != "" {
- subscriptionInstance.TimeoutType = &timeoutType
- }
- subResp.SubscriptionInstances = append(subResp.SubscriptionInstances, &subscriptionInstance)
- xapp.Logger.Error("etSubscriptionResponse() %+v", subscriptionInstance)
-
- return &subResp
-}
-
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
+ c.SubscriptionProcessingStartDelay()
xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
var xAppEventInstanceID int64
var e2EventInstanceID int64
errorInfo := &ErrorInfo{}
- defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
+ defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
subReqMsg := subReqList.E2APSubscriptionRequests[index]
subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+ trans.Release()
if err != nil {
c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
}
- trans.Release()
+ }
+}
+
+//-------------------------------------------------------------------
+//
+//------------------------------------------------------------------
+func (c *Control) SubscriptionProcessingStartDelay() {
+ if c.UTTesting == true {
+ // This is temporary fix for the UT problem that notification arrives before subscription response
+ // Correct fix would be to allow notification come before response and process it correctly
+ xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
+ <-time.After(time.Millisecond * 50)
+ xapp.Logger.Debug("Continuing after delay")
}
}
SubscriptionID: restSubId,
SubscriptionInstances: []*models.SubscriptionInstance{
&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
- ErrorCause: &errorInfo.ErrorCause,
- ErrorSource: &errorInfo.ErrorSource,
- TimeoutType: &errorInfo.TimeoutType,
+ ErrorCause: errorInfo.ErrorCause,
+ ErrorSource: errorInfo.ErrorSource,
+ TimeoutType: errorInfo.TimeoutType,
XappEventInstanceID: &xAppEventInstanceID},
},
}
SubscriptionID: restSubId,
SubscriptionInstances: []*models.SubscriptionInstance{
&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
- ErrorCause: nil,
+ ErrorCause: "",
XappEventInstanceID: &xAppEventInstanceID},
},
}
restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
restSubscription.DeleteE2InstanceId(instanceId)
}
- restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
+ c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
c.registry.DeleteRESTSubscription(&restSubId)
c.RemoveRESTSubscriptionFromDb(restSubId)
}()
fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
}
fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
- if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil {
- fmt.Println(" RMRRoutingNeeded == nil")
- } else {
- fmt.Printf(" RMRRoutingNeeded = %v\n", *p.E2SubscriptionDirectives.RMRRoutingNeeded)
- }
+ fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
}
for _, subscriptionDetail := range p.SubscriptionDetails {
if p.RANFunctionID != nil {