2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
43 func idstring(err error, entries ...fmt.Stringer) string {
44 var retval string = ""
45 var filler string = ""
46 for _, entry := range entries {
48 retval += filler + entry.String()
51 retval += filler + "(NIL)"
55 retval += filler + "err(" + err.Error() + ")"
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
75 var e2IEOrderCheckValue uint8
82 restDuplicateCtrl *DuplicateCtrl
84 e2IfStateDb XappRnibInterface
86 restSubsDb Sdlnterface
89 Counters map[string]xapp.Counter
100 type SubmgrRestartTestEvent struct{}
101 type SubmgrRestartUpEvent struct{}
102 type PackSubscriptionRequestErrortEvent struct {
106 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
107 p.ErrorInfo = *errorInfo
110 type SDLWriteErrortEvent struct {
114 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
115 s.ErrorInfo = *errorInfo
119 xapp.Logger.Debug("SUBMGR")
121 viper.SetEnvPrefix("submgr")
122 viper.AllowEmptyEnv(true)
125 func NewControl() *Control {
127 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
128 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
130 registry := new(Registry)
131 registry.Initialize()
132 registry.rtmgrClient = &rtmgrClient
134 tracker := new(Tracker)
137 restDuplicateCtrl := new(DuplicateCtrl)
138 restDuplicateCtrl.Init()
140 e2IfState := new(E2IfState)
142 c := &Control{e2ap: new(E2ap),
145 restDuplicateCtrl: restDuplicateCtrl,
146 e2IfState: e2IfState,
147 e2IfStateDb: CreateXappRnibIfInstance(),
148 e2SubsDb: CreateSdl(),
149 restSubsDb: CreateRESTSdl(),
150 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
155 c.ReadConfigParameters("")
157 // Register REST handler for testing support
158 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
159 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
160 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
163 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
167 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
170 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
172 if readSubsFromDb == "true" {
173 // Read subscriptions from db
174 err := c.ReadE2Subscriptions()
176 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
178 err = c.ReadRESTSubscriptions()
180 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
185 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
187 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
193 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
194 subscriptions, err := c.registry.QueryHandler()
196 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
199 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
202 //-------------------------------------------------------------------
204 //-------------------------------------------------------------------
205 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
206 xapp.Logger.Debug("RESTQueryHandler() called")
210 return c.registry.QueryHandler()
213 //-------------------------------------------------------------------
215 //-------------------------------------------------------------------
216 func (c *Control) ReadE2Subscriptions() error {
219 var register map[uint32]*Subscription
220 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
221 xapp.Logger.Debug("Reading E2 subscriptions from db")
222 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
224 xapp.Logger.Error("%v", err)
225 <-time.After(1 * time.Second)
227 c.registry.subIds = subIds
228 c.registry.register = register
229 go c.HandleUncompletedSubscriptions(register)
233 xapp.Logger.Debug("Continuing without retring")
237 //-------------------------------------------------------------------
239 //-------------------------------------------------------------------
240 func (c *Control) ReadRESTSubscriptions() error {
242 xapp.Logger.Debug("ReadRESTSubscriptions()")
244 var restSubscriptions map[string]*RESTSubscription
245 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
246 xapp.Logger.Debug("Reading REST subscriptions from db")
247 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
249 xapp.Logger.Error("%v", err)
250 <-time.After(1 * time.Second)
252 // Fix REST subscriptions ongoing status after restart
253 for restSubId, restSubscription := range restSubscriptions {
254 restSubscription.SubReqOngoing = false
255 restSubscription.SubDelReqOngoing = false
256 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
258 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
261 c.registry.restSubscriptions = restSubscriptions
265 xapp.Logger.Debug("Continuing without retring")
269 //-------------------------------------------------------------------
271 //-------------------------------------------------------------------
272 func (c *Control) ReadConfigParameters(f string) {
274 xapp.Logger.Debug("ReadConfigParameters")
276 c.LoggerLevel = int(xapp.Logger.GetLevel())
277 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
278 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
280 // viper.GetDuration returns nanoseconds
281 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
282 if e2tSubReqTimeout == 0 {
283 e2tSubReqTimeout = 2000 * 1000000
284 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
286 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
288 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
289 if e2tSubDelReqTime == 0 {
290 e2tSubDelReqTime = 2000 * 1000000
291 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
293 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
295 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
296 if e2tRecvMsgTimeout == 0 {
297 e2tRecvMsgTimeout = 2000 * 1000000
298 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
300 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
302 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
303 if e2tMaxSubReqTryCount == 0 {
304 e2tMaxSubReqTryCount = 1
305 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
307 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
309 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
310 if e2tMaxSubDelReqTryCount == 0 {
311 e2tMaxSubDelReqTryCount = 1
312 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
314 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
316 checkE2State = viper.GetString("controls.checkE2State")
317 if checkE2State == "" {
318 checkE2State = "true"
319 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
321 xapp.Logger.Debug("checkE2State= %v", checkE2State)
323 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
324 if readSubsFromDb == "" {
325 readSubsFromDb = "true"
326 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
328 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
330 dbTryCount = viper.GetInt("controls.dbTryCount")
333 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
335 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
337 dbRetryForever = viper.GetString("controls.dbRetryForever")
338 if dbRetryForever == "" {
339 dbRetryForever = "true"
340 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
342 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
344 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
345 // value 100ms used currently only in unittests.
346 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
347 if waitRouteCleanup_ms == 0 {
348 waitRouteCleanup_ms = 5000 * 1000000
349 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
351 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
353 viper.SetDefault("controls.checkE2IEOrder", 1)
354 e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder"))
355 c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue)
356 xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue)
359 //-------------------------------------------------------------------
361 //-------------------------------------------------------------------
362 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
364 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
365 for subId, subs := range register {
366 if subs.SubRespRcvd == false {
367 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
368 if subs.PolicyUpdate == false {
369 subs.NoRespToXapp = true
370 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
371 c.SendSubscriptionDeleteReq(subs, false)
377 func (c *Control) ReadyCB(data interface{}) {
378 if c.RMRClient == nil {
379 c.RMRClient = xapp.Rmr
383 func (c *Control) Run() {
384 xapp.SetReadyCB(c.ReadyCB, nil)
385 xapp.AddConfigChangeListener(c.ReadConfigParameters)
389 //-------------------------------------------------------------------
391 //-------------------------------------------------------------------
392 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
395 var restSubscription *RESTSubscription
398 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
399 if p.SubscriptionID == "" {
400 // Subscription does not contain REST subscription Id
402 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
403 if restSubscription != nil {
404 // Subscription not found
405 restSubId = prevRestSubsId
407 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
409 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
412 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
413 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
417 if restSubscription == nil {
418 restSubId = ksuid.New().String()
419 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
422 // Subscription contains REST subscription Id
423 restSubId = p.SubscriptionID
425 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
426 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
428 // Subscription with id in REST request does not exist
429 xapp.Logger.Error("%s", err.Error())
430 c.UpdateCounter(cRestSubFailToXapp)
435 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
437 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
441 return restSubscription, restSubId, nil
444 //-------------------------------------------------------------------
446 //-------------------------------------------------------------------
447 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
450 c.UpdateCounter(cRestSubReqFromXapp)
452 subResp := models.SubscriptionResponse{}
453 p := params.(*models.SubscriptionParams)
455 if c.LoggerLevel > 2 {
456 c.PrintRESTSubscriptionRequest(p)
459 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
460 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
461 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
462 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
463 xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
465 c.UpdateCounter(cRestReqRejDueE2Down)
466 return nil, common.SubscribeServiceUnavailableCode
469 if p.ClientEndpoint == nil {
470 err := fmt.Errorf("ClientEndpoint == nil")
471 xapp.Logger.Error("%v", err)
472 c.UpdateCounter(cRestSubFailToXapp)
473 return nil, common.SubscribeBadRequestCode
476 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
478 xapp.Logger.Error("%s", err)
479 c.UpdateCounter(cRestSubFailToXapp)
480 return nil, common.SubscribeBadRequestCode
482 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
484 xapp.Logger.Error("%s", err.Error())
485 c.UpdateCounter(cRestSubFailToXapp)
486 return nil, common.SubscribeBadRequestCode
489 md5sum, err := CalculateRequestMd5sum(params)
491 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
494 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
496 xapp.Logger.Error("Subscription with id in REST request does not exist")
497 return nil, common.SubscribeNotFoundCode
500 subResp.SubscriptionID = &restSubId
501 subReqList := e2ap.SubscriptionRequestList{}
502 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
504 xapp.Logger.Error("%s", err.Error())
505 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
506 c.registry.DeleteRESTSubscription(&restSubId)
507 c.UpdateCounter(cRestSubFailToXapp)
508 return nil, common.SubscribeBadRequestCode
511 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
513 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
514 xapp.Logger.Debug("%s", err)
515 c.registry.DeleteRESTSubscription(&restSubId)
516 c.UpdateCounter(cRestSubRespToXapp)
517 return &subResp, common.SubscribeCreatedCode
520 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
521 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
523 c.UpdateCounter(cRestSubRespToXapp)
524 return &subResp, common.SubscribeCreatedCode
527 //-------------------------------------------------------------------
529 //-------------------------------------------------------------------
530 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
532 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
533 if p == nil || p.E2SubscriptionDirectives == nil {
534 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
535 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
536 e2SubscriptionDirectives.CreateRMRRoute = true
537 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
539 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
540 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
542 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
544 if p.E2SubscriptionDirectives.E2RetryCount == nil {
545 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
546 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
548 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
549 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
551 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
554 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
556 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
557 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
558 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
559 return e2SubscriptionDirectives, nil
562 //-------------------------------------------------------------------
564 //-------------------------------------------------------------------
566 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
567 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
569 c.SubscriptionProcessingStartDelay()
570 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
572 var xAppEventInstanceID int64
573 var e2EventInstanceID int64
574 errorInfo := &ErrorInfo{}
576 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
578 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
579 subReqMsg := subReqList.E2APSubscriptionRequests[index]
580 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
582 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
584 // Send notification to xApp that prosessing of a Subscription Request has failed.
585 err := fmt.Errorf("Tracking failure")
586 errorInfo.ErrorCause = err.Error()
587 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
591 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
593 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
595 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
599 if err.Error() == "TEST: restart event received" {
600 // This is just for UT cases. Stop here subscription processing
603 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
605 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
606 restSubscription.AddMd5Sum(md5sum)
607 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
608 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
609 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
614 //-------------------------------------------------------------------
616 //------------------------------------------------------------------
617 func (c *Control) SubscriptionProcessingStartDelay() {
618 if c.UTTesting == true {
619 // This is temporary fix for the UT problem that notification arrives before subscription response
620 // Correct fix would be to allow notification come before response and process it correctly
621 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
622 <-time.After(time.Millisecond * 50)
623 xapp.Logger.Debug("Continuing after delay")
627 //-------------------------------------------------------------------
629 //------------------------------------------------------------------
630 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
631 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
633 errorInfo := ErrorInfo{}
635 err := c.tracker.Track(trans)
637 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
638 errorInfo.ErrorCause = err.Error()
639 err = fmt.Errorf("Tracking failure")
640 return nil, &errorInfo, err
643 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
645 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
646 return nil, &errorInfo, err
652 subs.OngoingReqCount++
653 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
654 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
655 subs.OngoingReqCount--
659 switch themsg := event.(type) {
660 case *e2ap.E2APSubscriptionResponse:
662 if c.e2IfState.IsE2ConnectionUp(meid) == true {
663 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
664 return themsg, &errorInfo, nil
666 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
667 c.RemoveSubscriptionFromDb(subs)
668 err = fmt.Errorf("E2 interface down")
669 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
671 case *e2ap.E2APSubscriptionFailure:
672 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
673 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
674 case *PackSubscriptionRequestErrortEvent:
675 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
676 errorInfo = themsg.ErrorInfo
677 case *SDLWriteErrortEvent:
678 err = fmt.Errorf("SDL write failure")
679 errorInfo = themsg.ErrorInfo
680 case *SubmgrRestartTestEvent:
681 err = fmt.Errorf("TEST: restart event received")
682 xapp.Logger.Debug("%s", err)
683 return nil, &errorInfo, err
685 err = fmt.Errorf("Unexpected E2 subscription response received")
686 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
691 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
692 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
693 if subs.PolicyUpdate == true {
694 return nil, &errorInfo, err
698 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
699 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
701 return nil, &errorInfo, err
704 //-------------------------------------------------------------------
706 //-------------------------------------------------------------------
707 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
708 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
710 // Send notification to xApp that prosessing of a Subscription Request has failed.
711 e2EventInstanceID := (int64)(0)
712 if errorInfo.ErrorSource == "" {
713 // Submgr is default source of error
714 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
716 resp := &models.SubscriptionResponse{
717 SubscriptionID: restSubId,
718 SubscriptionInstances: []*models.SubscriptionInstance{
719 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
720 ErrorCause: errorInfo.ErrorCause,
721 ErrorSource: errorInfo.ErrorSource,
722 TimeoutType: errorInfo.TimeoutType,
723 XappEventInstanceID: &xAppEventInstanceID},
726 // Mark REST subscription request processed.
727 restSubscription.SetProcessed(err)
728 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
730 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
731 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
733 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
734 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
737 c.UpdateCounter(cRestSubFailNotifToXapp)
738 err = xapp.Subscription.Notify(resp, *clientEndpoint)
740 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
743 // E2 is down. Delete completely processed request safely now
744 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
745 c.registry.DeleteRESTSubscription(restSubId)
746 c.RemoveRESTSubscriptionFromDb(*restSubId)
750 //-------------------------------------------------------------------
752 //-------------------------------------------------------------------
753 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
754 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
756 // Store successfully processed InstanceId for deletion
757 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
758 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
760 // Send notification to xApp that a Subscription Request has been processed.
761 resp := &models.SubscriptionResponse{
762 SubscriptionID: restSubId,
763 SubscriptionInstances: []*models.SubscriptionInstance{
764 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
765 ErrorCause: errorInfo.ErrorCause,
766 ErrorSource: errorInfo.ErrorSource,
767 XappEventInstanceID: &xAppEventInstanceID},
770 // Mark REST subscription request processesd.
771 restSubscription.SetProcessed(nil)
772 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
773 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
774 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
775 c.UpdateCounter(cRestSubNotifToXapp)
776 err := xapp.Subscription.Notify(resp, *clientEndpoint)
778 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
781 // E2 is down. Delete completely processed request safely now
782 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
783 c.registry.DeleteRESTSubscription(restSubId)
784 c.RemoveRESTSubscriptionFromDb(*restSubId)
788 //-------------------------------------------------------------------
790 //-------------------------------------------------------------------
791 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
794 c.UpdateCounter(cRestSubDelReqFromXapp)
796 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
798 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
800 xapp.Logger.Error("%s", err.Error())
801 if restSubscription == nil {
802 // Subscription was not found
803 c.UpdateCounter(cRestSubDelRespToXapp)
804 return common.UnsubscribeNoContentCode
806 if restSubscription.SubReqOngoing == true {
807 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
808 xapp.Logger.Error("%s", err.Error())
809 c.UpdateCounter(cRestSubDelFailToXapp)
810 return common.UnsubscribeBadRequestCode
811 } else if restSubscription.SubDelReqOngoing == true {
812 // Previous request for same restSubId still ongoing
813 c.UpdateCounter(cRestSubDelRespToXapp)
814 return common.UnsubscribeNoContentCode
819 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
821 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
822 for _, instanceId := range restSubscription.InstanceIds {
823 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
826 xapp.Logger.Error("%s", err.Error())
828 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
829 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
830 restSubscription.DeleteE2InstanceId(instanceId)
832 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
833 c.registry.DeleteRESTSubscription(&restSubId)
834 c.RemoveRESTSubscriptionFromDb(restSubId)
837 c.UpdateCounter(cRestSubDelRespToXapp)
838 return common.UnsubscribeNoContentCode
841 //-------------------------------------------------------------------
843 //-------------------------------------------------------------------
844 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
846 var xAppEventInstanceID int64
847 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
849 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
850 restSubId, instanceId, idstring(err, nil))
851 return xAppEventInstanceID, nil
854 xAppEventInstanceID = int64(subs.ReqId.Id)
855 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
857 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
858 xapp.Logger.Error("%s", err.Error())
860 defer trans.Release()
862 err = c.tracker.Track(trans)
864 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
865 xapp.Logger.Error("%s", err.Error())
866 return xAppEventInstanceID, &time.ParseError{}
871 subs.OngoingDelCount++
872 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
873 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
874 subs.OngoingDelCount--
876 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
878 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
880 return xAppEventInstanceID, nil
883 //-------------------------------------------------------------------
885 //-------------------------------------------------------------------
887 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
888 params := &xapp.RMRParams{}
889 params.Mtype = trans.GetMtype()
890 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Meid = subs.GetMeid()
894 params.PayloadLen = len(trans.Payload.Buf)
895 params.Payload = trans.Payload.Buf
897 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
898 err = c.SendWithRetry(params, false, 5)
900 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
905 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
907 params := &xapp.RMRParams{}
908 params.Mtype = trans.GetMtype()
909 params.SubId = int(subs.GetReqId().InstanceId)
910 params.Xid = trans.GetXid()
911 params.Meid = trans.GetMeid()
913 params.PayloadLen = len(trans.Payload.Buf)
914 params.Payload = trans.Payload.Buf
916 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
917 err = c.SendWithRetry(params, false, 5)
919 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
924 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
925 if c.RMRClient == nil {
926 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
927 xapp.Logger.Error("%s", err.Error())
932 defer c.RMRClient.Free(msg.Mbuf)
934 // xapp-frame might use direct access to c buffer and
935 // when msg.Mbuf is freed, someone might take it into use
936 // and payload data might be invalid inside message handle function
938 // subscriptions won't load system a lot so there is no
939 // real performance hit by cloning buffer into new go byte slice
940 cPay := append(msg.Payload[:0:0], msg.Payload...)
942 msg.PayloadLen = len(cPay)
945 case xapp.RIC_SUB_REQ:
946 go c.handleXAPPSubscriptionRequest(msg)
947 case xapp.RIC_SUB_RESP:
948 go c.handleE2TSubscriptionResponse(msg)
949 case xapp.RIC_SUB_FAILURE:
950 go c.handleE2TSubscriptionFailure(msg)
951 case xapp.RIC_SUB_DEL_REQ:
952 go c.handleXAPPSubscriptionDeleteRequest(msg)
953 case xapp.RIC_SUB_DEL_RESP:
954 go c.handleE2TSubscriptionDeleteResponse(msg)
955 case xapp.RIC_SUB_DEL_FAILURE:
956 go c.handleE2TSubscriptionDeleteFailure(msg)
957 case xapp.RIC_SUB_DEL_REQUIRED:
958 go c.handleE2TSubscriptionDeleteRequired(msg)
960 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
965 //-------------------------------------------------------------------
966 // handle from XAPP Subscription Request
967 //------------------------------------------------------------------
968 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
969 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
970 c.UpdateCounter(cSubReqFromXapp)
972 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
973 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
977 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
979 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
983 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
985 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
988 defer trans.Release()
990 if err = c.tracker.Track(trans); err != nil {
991 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
995 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
997 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1001 c.wakeSubscriptionRequest(subs, trans)
1004 //-------------------------------------------------------------------
1005 // Wake Subscription Request to E2node
1006 //------------------------------------------------------------------
1007 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1009 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1011 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1013 subs.OngoingReqCount++
1014 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1015 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1016 subs.OngoingReqCount--
1018 switch themsg := event.(type) {
1019 case *e2ap.E2APSubscriptionResponse:
1020 themsg.RequestId.Id = trans.RequestId.Id
1021 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1024 c.UpdateCounter(cSubRespToXapp)
1025 err := c.rmrSendToXapp("", subs, trans)
1027 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1031 case *e2ap.E2APSubscriptionFailure:
1032 themsg.RequestId.Id = trans.RequestId.Id
1033 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1035 c.UpdateCounter(cSubFailToXapp)
1036 c.rmrSendToXapp("", subs, trans)
1042 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1045 //-------------------------------------------------------------------
1046 // handle from XAPP Subscription Delete Request
1047 //------------------------------------------------------------------
1048 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1049 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1050 c.UpdateCounter(cSubDelReqFromXapp)
1052 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1053 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1057 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1059 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1063 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1065 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1068 defer trans.Release()
1070 err = c.tracker.Track(trans)
1072 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1076 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1078 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1085 subs.OngoingDelCount++
1086 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1087 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1088 subs.OngoingDelCount--
1090 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1092 if subs.NoRespToXapp == true {
1093 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1094 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1098 // Whatever is received success, fail or timeout, send successful delete response
1099 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1100 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1101 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1102 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1103 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1105 c.UpdateCounter(cSubDelRespToXapp)
1106 err := c.rmrSendToXapp("", subs, trans)
1108 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1113 //-------------------------------------------------------------------
1114 // SUBS CREATE Handling
1115 //-------------------------------------------------------------------
1116 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1118 var event interface{} = nil
1119 var removeSubscriptionFromDb bool = false
1120 trans := c.tracker.NewSubsTransaction(subs)
1121 subs.WaitTransactionTurn(trans)
1122 defer subs.ReleaseTransactionTurn(trans)
1123 defer trans.Release()
1125 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1127 subRfMsg, valid := subs.GetCachedResponse()
1128 if subRfMsg == nil && valid == true {
1129 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1130 switch event.(type) {
1131 case *e2ap.E2APSubscriptionResponse:
1132 subRfMsg, valid = subs.SetCachedResponse(event, true)
1133 subs.SubRespRcvd = true
1134 case *e2ap.E2APSubscriptionFailure:
1135 subRfMsg, valid = subs.SetCachedResponse(event, false)
1136 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1137 case *SubmgrRestartTestEvent:
1138 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1139 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1140 subRfMsg, valid = subs.SetCachedResponse(event, false)
1141 parentTrans.SendEvent(subRfMsg, 0)
1143 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1144 subRfMsg, valid = subs.SetCachedResponse(event, false)
1147 if subs.PolicyUpdate == false {
1148 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1149 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1150 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1152 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1155 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1157 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1160 removeSubscriptionFromDb = true
1163 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1166 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1170 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1172 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1175 parentTrans.SendEvent(subRfMsg, 0)
1178 //-------------------------------------------------------------------
1179 // SUBS DELETE Handling
1180 //-------------------------------------------------------------------
1182 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1184 trans := c.tracker.NewSubsTransaction(subs)
1185 subs.WaitTransactionTurn(trans)
1186 defer subs.ReleaseTransactionTurn(trans)
1187 defer trans.Release()
1189 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1193 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1196 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1201 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1202 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1203 parentTrans.SendEvent(nil, 0)
1206 //-------------------------------------------------------------------
1207 // send to E2T Subscription Request
1208 //-------------------------------------------------------------------
1209 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1211 var event interface{} = nil
1212 var timedOut bool = false
1213 const ricRequestorId = 123
1215 subReqMsg := subs.SubReqMsg
1216 subReqMsg.RequestId = subs.GetReqId().RequestId
1217 subReqMsg.RequestId.Id = ricRequestorId
1218 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1220 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1221 return &PackSubscriptionRequestErrortEvent{
1223 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1224 ErrorCause: err.Error(),
1229 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1230 err = c.WriteSubscriptionToDb(subs)
1232 return &SDLWriteErrortEvent{
1234 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1235 ErrorCause: err.Error(),
1240 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1241 desc := fmt.Sprintf("(retry %d)", retries)
1243 c.UpdateCounter(cSubReqToE2)
1245 c.UpdateCounter(cSubReReqToE2)
1247 err := c.rmrSendToE2T(desc, subs, trans)
1249 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1252 if subs.DoNotWaitSubResp == false {
1253 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1255 c.UpdateCounter(cSubReqTimerExpiry)
1259 // Simulating case where subscrition request has been sent but response has not been received before restart
1260 event = &SubmgrRestartTestEvent{}
1261 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1265 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1269 //-------------------------------------------------------------------
1270 // send to E2T Subscription Delete Request
1271 //-------------------------------------------------------------------
1273 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1275 var event interface{}
1277 const ricRequestorId = 123
1279 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1280 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1281 subDelReqMsg.RequestId.Id = ricRequestorId
1282 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1283 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1285 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1289 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1290 desc := fmt.Sprintf("(retry %d)", retries)
1292 c.UpdateCounter(cSubDelReqToE2)
1294 c.UpdateCounter(cSubDelReReqToE2)
1296 err := c.rmrSendToE2T(desc, subs, trans)
1298 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1300 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1302 c.UpdateCounter(cSubDelReqTimerExpiry)
1307 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1311 //-------------------------------------------------------------------
1312 // handle from E2T Subscription Response
1313 //-------------------------------------------------------------------
1314 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1315 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1316 c.UpdateCounter(cSubRespFromE2)
1318 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1320 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1323 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1325 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1328 trans := subs.GetTransaction()
1330 err = fmt.Errorf("Ongoing transaction not found")
1331 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1334 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1335 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1336 if sendOk == false {
1337 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1338 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1343 //-------------------------------------------------------------------
1344 // handle from E2T Subscription Failure
1345 //-------------------------------------------------------------------
1346 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1347 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1348 c.UpdateCounter(cSubFailFromE2)
1349 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1351 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1354 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1356 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1359 trans := subs.GetTransaction()
1361 err = fmt.Errorf("Ongoing transaction not found")
1362 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1365 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1366 if sendOk == false {
1367 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1368 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1373 //-------------------------------------------------------------------
1374 // handle from E2T Subscription Delete Response
1375 //-------------------------------------------------------------------
1376 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1377 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1378 c.UpdateCounter(cSubDelRespFromE2)
1379 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1381 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1384 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1386 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1389 trans := subs.GetTransaction()
1391 err = fmt.Errorf("Ongoing transaction not found")
1392 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1395 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1396 if sendOk == false {
1397 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1398 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1403 //-------------------------------------------------------------------
1404 // handle from E2T Subscription Delete Failure
1405 //-------------------------------------------------------------------
1406 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1407 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1408 c.UpdateCounter(cSubDelFailFromE2)
1409 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1411 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1414 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1416 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1419 trans := subs.GetTransaction()
1421 err = fmt.Errorf("Ongoing transaction not found")
1422 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1425 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1426 if sendOk == false {
1427 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1428 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1433 //-------------------------------------------------------------------
1435 //-------------------------------------------------------------------
1436 func typeofSubsMessage(v interface{}) string {
1441 //case *e2ap.E2APSubscriptionRequest:
1443 case *e2ap.E2APSubscriptionResponse:
1445 case *e2ap.E2APSubscriptionFailure:
1447 //case *e2ap.E2APSubscriptionDeleteRequest:
1448 // return "SubDelReq"
1449 case *e2ap.E2APSubscriptionDeleteResponse:
1451 case *e2ap.E2APSubscriptionDeleteFailure:
1458 //-------------------------------------------------------------------
1460 //-------------------------------------------------------------------
1461 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1462 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1463 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1465 xapp.Logger.Error("%v", err)
1471 //-------------------------------------------------------------------
1473 //-------------------------------------------------------------------
1474 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1476 if removeSubscriptionFromDb == true {
1477 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1478 c.RemoveSubscriptionFromDb(subs)
1480 // Update is needed for successful response and merge case here
1481 if subs.RetryFromXapp == false {
1482 err := c.WriteSubscriptionToDb(subs)
1486 subs.RetryFromXapp = false
1490 //-------------------------------------------------------------------
1492 //-------------------------------------------------------------------
1493 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1494 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1495 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1497 xapp.Logger.Error("%v", err)
1501 //-------------------------------------------------------------------
1503 //-------------------------------------------------------------------
1504 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1505 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1506 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1508 xapp.Logger.Error("%v", err)
1512 //-------------------------------------------------------------------
1514 //-------------------------------------------------------------------
1515 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1517 if removeRestSubscriptionFromDb == true {
1518 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1519 c.RemoveRESTSubscriptionFromDb(restSubId)
1521 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1525 //-------------------------------------------------------------------
1527 //-------------------------------------------------------------------
1528 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1529 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1530 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1532 xapp.Logger.Error("%v", err)
1536 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1538 if c.UTTesting == true {
1539 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1540 c.registry.mutex = new(sync.Mutex)
1543 const ricRequestorId = 123
1544 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1546 // Send delete for every endpoint in the subscription
1547 if subs.PolicyUpdate == false {
1548 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1549 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1550 subDelReqMsg.RequestId.Id = ricRequestorId
1551 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1552 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1554 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1557 for _, endPoint := range subs.EpList.Endpoints {
1558 params := &xapp.RMRParams{}
1559 params.Mtype = mType
1560 params.SubId = int(subs.GetReqId().InstanceId)
1562 params.Meid = subs.Meid
1563 params.Src = endPoint.String()
1564 params.PayloadLen = len(payload.Buf)
1565 params.Payload = payload.Buf
1567 subs.DeleteFromDb = true
1568 if !e2SubsDelRequired {
1569 c.handleXAPPSubscriptionDeleteRequest(params)
1571 c.SendSubscriptionDeleteReqToE2T(subs, params)
1577 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1579 fmt.Println("CRESTSubscriptionRequest")
1585 if p.SubscriptionID != "" {
1586 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1588 fmt.Println(" SubscriptionID = ''")
1591 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1593 if p.ClientEndpoint.HTTPPort != nil {
1594 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1596 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1599 if p.ClientEndpoint.RMRPort != nil {
1600 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1602 fmt.Println(" ClientEndpoint.RMRPort = nil")
1606 fmt.Printf(" Meid = %s\n", *p.Meid)
1608 fmt.Println(" Meid = nil")
1611 if p.E2SubscriptionDirectives == nil {
1612 fmt.Println(" E2SubscriptionDirectives = nil")
1614 fmt.Println(" E2SubscriptionDirectives")
1615 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1616 fmt.Println(" E2RetryCount == nil")
1618 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1620 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1621 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1623 for _, subscriptionDetail := range p.SubscriptionDetails {
1624 if p.RANFunctionID != nil {
1625 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1627 fmt.Println(" RANFunctionID = nil")
1629 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1630 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1632 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1633 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1634 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1635 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1637 if actionToBeSetup.SubsequentAction != nil {
1638 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1639 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1641 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1647 //-------------------------------------------------------------------
1648 // handle from E2T Subscription Delete Required
1649 //-------------------------------------------------------------------
1650 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1651 xapp.Logger.Info("MSG from E2T: %s", params.String())
1652 c.UpdateCounter(cSubDelRequFromE2)
1653 subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1655 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1656 //c.sendE2TErrorIndication(nil)
1659 var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1660 var subDB = []*Subscription{}
1661 for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1662 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1664 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1667 // Check if Delete Subscription Already triggered
1668 if subs.OngoingDelCount > 0 {
1671 subDB = append(subDB, subs)
1672 for _, endpoint := range subs.EpList.Endpoints {
1673 subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1675 // Sending Subscription Delete Request to E2T
1676 // c.SendSubscriptionDeleteReq(subs, true)
1678 for _, subsTobeRemove := range subDB {
1679 // Sending Subscription Delete Request to E2T
1680 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1684 //-----------------------------------------------------------------
1685 // Initiate RIC Subscription Delete Request after receiving
1686 // RIC Subscription Delete Required from E2T
1687 //-----------------------------------------------------------------
1688 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1689 xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1690 c.UpdateCounter(cSubDelReqToE2)
1692 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1693 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1697 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1699 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1702 defer trans.Release()
1704 err := c.tracker.Track(trans)
1706 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1713 subs.OngoingDelCount++
1714 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1715 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1716 subs.OngoingDelCount--
1718 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1720 if subs.NoRespToXapp == true {
1721 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1722 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")