2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring builds a single, space separated identification string for log
// messages.
//
// Each entry is rendered with its String() method, a nil entry is rendered as
// "(NIL)", and a non-nil err is appended last as "err(<message>)". With no
// entries and a nil err the result is the empty string.
//
// Note that only an untyped nil entry is detected: a nil pointer wrapped in a
// fmt.Stringer is a non-nil interface value, so its String() method is called.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Every rendered entry, including "(NIL)", is followed by a separator so
		// that the next item is never glued to it.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
// Package level configuration values. All of them are (re)loaded by
// Control.ReadConfigParameters from the "controls.*" configuration keys.

// e2tSubReqTimeout is the default E2 subscription request timeout
// (controls.e2tSubReqTimeout_ms, falls back to 2000 ms).
65 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is read from controls.e2tSubDelReqTime_ms (falls back to 2000 ms).
66 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is read from controls.e2tRecvMsgTimeout_ms (falls back to 2000 ms).
67 var e2tRecvMsgTimeout time.Duration
// waitRouteCleanup_ms is the wait time passed to Registry.RemoveFromSubscription
// (controls.waitRouteCleanup_ms, falls back to 5000 ms).
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// checkE2State holds the string value of controls.checkE2State (defaults to "true").
71 var checkE2State string
// readSubsFromDb, when "true", makes NewControl restore subscriptions from the database.
72 var readSubsFromDb string
// dbRetryForever, when "true", makes the database read loops retry without a try limit.
73 var dbRetryForever string
// e2IEOrderCheckValue is the controls.checkE2IEOrder value handed to E2ap.SetE2IEOrderCheck.
75 var e2IEOrderCheckValue uint8
// restDuplicateCtrl tracks REST requests by md5sum so that retransmissions can be recognised.
82 restDuplicateCtrl *DuplicateCtrl
// e2IfStateDb is created with CreateXappRnibIfInstance() in NewControl.
84 e2IfStateDb XappRnibInterface
// restSubsDb is created with CreateRESTSdl() in NewControl.
86 restSubsDb Sdlnterface
// Counters is the "SUBMGR" counter group registered through xapp.Metric.
89 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event that handleSubscriptionRequest reports as
// "TEST: restart event received".
100 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event type.
101 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent carries the ErrorInfo of a failed
// E2 RICSubscriptionRequest pack operation.
102 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil
// because it is dereferenced unconditionally.
106 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
107 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent carries the ErrorInfo of a failed SDL write.
110 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil
// because it is dereferenced unconditionally.
114 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
115 s.ErrorInfo = *errorInfo
// NOTE(review): the enclosing function header is not visible here; presumably
// this is the package init() - confirm.
119 xapp.Logger.Debug("SUBMGR")
// Environment variables consulted by viper use the "submgr" prefix.
121 viper.SetEnvPrefix("submgr")
// Environment variables that are set but empty are accepted as valid values.
122 viper.AllowEmptyEnv(true)
// NewControl creates the subscription manager Control instance.
//
// It builds the routing manager client, the registry, the tracker, the REST
// duplicate controller and the E2 interface state, reads the configuration
// parameters, registers the REST routes, restores subscriptions from the
// database when readSubsFromDb is "true" and starts listening for REST
// subscription requests.
125 func NewControl() *Control {
// Routing manager client: host, port and base URL come from the "rtmgr.*" configuration keys.
127 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
128 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry gets its own reference to the routing manager client.
130 registry := new(Registry)
131 registry.Initialize()
132 registry.rtmgrClient = &rtmgrClient
134 tracker := new(Tracker)
137 restDuplicateCtrl := new(DuplicateCtrl)
138 restDuplicateCtrl.Init()
140 e2IfState := new(E2IfState)
142 c := &Control{e2ap: new(E2ap),
145 restDuplicateCtrl: restDuplicateCtrl,
146 e2IfState: e2IfState,
147 e2IfStateDb: CreateXappRnibIfInstance(),
148 e2SubsDb: CreateSdl(),
149 restSubsDb: CreateRESTSdl(),
150 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the "controls.*" configuration before anything depends on it
// (readSubsFromDb below is one of the loaded values).
155 c.ReadConfigParameters("")
157 // Register REST handler for testing support
158 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
159 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
160 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
163 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
167 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
170 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
172 if readSubsFromDb == "true" {
173 // Read subscriptions from db
174 err := c.ReadE2Subscriptions()
176 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
178 err = c.ReadRESTSubscriptions()
180 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
// Serve the REST subscription API with the subscribe, query and delete handlers.
// A Listen failure is only logged.
185 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
187 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
// SymptomDataHandler serves the symptom data REST request: it queries the
// registry for the current subscriptions and sends them to the client as
// "platform/subscriptions.json". A query failure is logged.
193 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
194 subscriptions, err := c.registry.QueryHandler()
196 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
199 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
202 //-------------------------------------------------------------------
204 //-------------------------------------------------------------------
// RESTQueryHandler is the query callback given to xapp.Subscription.Listen.
// It returns the subscription list and error produced by the registry's
// QueryHandler.
205 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
206 xapp.Logger.Debug("RESTQueryHandler() called")
210 return c.registry.QueryHandler()
213 //-------------------------------------------------------------------
215 //-------------------------------------------------------------------
// ReadE2Subscriptions restores the E2 subscriptions and the subscription id
// pool from the database into the registry.
//
// The read is attempted in a loop that is bounded by dbTryCount unless
// dbRetryForever is "true"; a failed attempt is logged and followed by a one
// second pause. After the registry has been populated, the subscriptions are
// handed to HandleUncompletedSubscriptions in a separate goroutine.
216 func (c *Control) ReadE2Subscriptions() error {
219 var register map[uint32]*Subscription
220 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
221 xapp.Logger.Debug("Reading E2 subscriptions from db")
222 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
224 xapp.Logger.Error("%v", err)
// Pause before the next attempt.
225 <-time.After(1 * time.Second)
227 c.registry.subIds = subIds
228 c.registry.register = register
// Clean up subscriptions that had not been completed; done asynchronously.
229 go c.HandleUncompletedSubscriptions(register)
233 xapp.Logger.Debug("Continuing without retring")
237 //-------------------------------------------------------------------
239 //-------------------------------------------------------------------
// ReadRESTSubscriptions restores the REST subscriptions from the database into
// the registry.
//
// The read is attempted in a loop that is bounded by dbTryCount unless
// dbRetryForever is "true"; a failed attempt is logged and followed by a one
// second pause. Every restored subscription gets its "ongoing" flags cleared
// and is written back to the database before the registry is updated.
240 func (c *Control) ReadRESTSubscriptions() error {
242 xapp.Logger.Debug("ReadRESTSubscriptions()")
244 var restSubscriptions map[string]*RESTSubscription
245 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
246 xapp.Logger.Debug("Reading REST subscriptions from db")
247 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
249 xapp.Logger.Error("%v", err)
// Pause before the next attempt.
250 <-time.After(1 * time.Second)
252 // Fix REST subscriptions ongoing status after restart
253 for restSubId, restSubscription := range restSubscriptions {
254 restSubscription.SubReqOngoing = false
255 restSubscription.SubDelReqOngoing = false
// Persist the cleared flags; a write failure is only logged.
256 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
258 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
261 c.registry.restSubscriptions = restSubscriptions
265 xapp.Logger.Debug("Continuing without retring")
269 //-------------------------------------------------------------------
271 //-------------------------------------------------------------------
// ReadConfigParameters (re)loads the logger level and all "controls.*"
// configuration values into the Control instance and the package level
// configuration variables. A value that is missing (zero or empty) is replaced
// by a hard coded default.
//
// The function is called from NewControl and is also registered as a
// configuration change listener in Run. The parameter f is not used by the
// statements shown here.
272 func (c *Control) ReadConfigParameters(f string) {
274 xapp.Logger.Debug("ReadConfigParameters")
// The current logger level also controls the ASN.1 debug printing.
276 c.LoggerLevel = int(xapp.Logger.GetLevel())
277 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
278 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
280 // viper.GetDuration returns nanoseconds
// The "_ms" keys hold milliseconds, hence the multiplication by 1000000
// (nanoseconds per millisecond). Default: 2000 ms.
281 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
282 if e2tSubReqTimeout == 0 {
283 e2tSubReqTimeout = 2000 * 1000000
284 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
286 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
// Default: 2000 ms.
288 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
289 if e2tSubDelReqTime == 0 {
290 e2tSubDelReqTime = 2000 * 1000000
291 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
293 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
// Default: 2000 ms.
295 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
296 if e2tRecvMsgTimeout == 0 {
297 e2tRecvMsgTimeout = 2000 * 1000000
298 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
300 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts include the initial try. Default: 1.
302 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
303 if e2tMaxSubReqTryCount == 0 {
304 e2tMaxSubReqTryCount = 1
305 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
307 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
309 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
310 if e2tMaxSubDelReqTryCount == 0 {
311 e2tMaxSubDelReqTryCount = 1
312 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
314 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// The boolean style switches are kept as strings. Default: "true".
316 checkE2State = viper.GetString("controls.checkE2State")
317 if checkE2State == "" {
318 checkE2State = "true"
319 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
321 xapp.Logger.Debug("checkE2State= %v", checkE2State)
323 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
324 if readSubsFromDb == "" {
325 readSubsFromDb = "true"
326 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
328 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// Upper bound for the database read attempts when dbRetryForever is not "true".
330 dbTryCount = viper.GetInt("controls.dbTryCount")
333 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
335 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
337 dbRetryForever = viper.GetString("controls.dbRetryForever")
338 if dbRetryForever == "" {
339 dbRetryForever = "true"
340 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
342 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
344 // Internal cfg parameter, used to define a wait time for RMR route clean-up. Non-default
345 // value 100ms used currently only in unittests.
346 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
347 if waitRouteCleanup_ms == 0 {
348 waitRouteCleanup_ms = 5000 * 1000000
349 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
351 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
// The E2 IE order check defaults to 1 through viper and is pushed to the E2AP layer.
353 viper.SetDefault("controls.checkE2IEOrder", 1)
354 e2IEOrderCheckValue = uint8(viper.GetUint("controls.checkE2IEOrder"))
355 c.e2ap.SetE2IEOrderCheck(e2IEOrderCheckValue)
356 xapp.Logger.Debug("e2IEOrderCheck= %v", e2IEOrderCheckValue)
359 //-------------------------------------------------------------------
361 //-------------------------------------------------------------------
362 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
364 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
365 for subId, subs := range register {
366 if subs.SubRespRcvd == false {
367 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
368 if subs.PolicyUpdate == false {
369 subs.NoRespToXapp = true
370 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
371 c.SendSubscriptionDeleteReq(subs, false)
377 func (c *Control) ReadyCB(data interface{}) {
378 if c.RMRClient == nil {
379 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the framework ready callback and
// ReadConfigParameters as a configuration change listener.
383 func (c *Control) Run() {
384 xapp.SetReadyCB(c.ReadyCB, nil)
385 xapp.AddConfigChangeListener(c.ReadConfigParameters)
389 //-------------------------------------------------------------------
391 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that an incoming
// request refers to and returns it together with its REST subscription id.
//
// When the request carries no SubscriptionID, the md5sum of the request is
// used to look up a previously known REST subscription id; if no usable
// subscription is found, a new id is generated with ksuid and a new REST
// subscription is created in the registry. When the request carries a
// SubscriptionID, the subscription is fetched from the registry with that id.
392 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
395 var restSubscription *RESTSubscription
// Last REST subscription id that was seen with the same request md5sum, if any.
398 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
399 if p.SubscriptionID == "" {
400 // Subscription does not contain REST subscription Id
402 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
403 if restSubscription != nil {
404 // Subscription found: reuse the id cached for this md5sum
405 restSubId = prevRestSubsId
407 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
409 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached id no longer refers to a subscription: drop the stale md5sum entry.
412 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
413 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: allocate a fresh id and create the REST subscription.
417 if restSubscription == nil {
418 restSubId = ksuid.New().String()
419 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
422 // Subscription contains REST subscription Id
423 restSubId = p.SubscriptionID
425 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
426 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
428 // Subscription with id in REST request does not exist
429 xapp.Logger.Error("%s", err.Error())
430 c.UpdateCounter(cRestSubFailToXapp)
435 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
437 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
441 return restSubscription, restSubId, nil
444 //-------------------------------------------------------------------
446 //-------------------------------------------------------------------
// RESTSubscriptionHandler is the subscribe callback given to
// xapp.Subscription.Listen. params must be a *models.SubscriptionParams (the
// type assertion panics otherwise).
//
// The request is rejected with SubscribeServiceUnavailableCode when the E2
// connection of the target node is down or under reset, with
// SubscribeBadRequestCode when the request is invalid and with
// SubscribeNotFoundCode when GetOrCreateRestSubscription fails. Otherwise the
// response carries the REST subscription id with SubscribeCreatedCode and the
// E2 subscription requests are processed asynchronously by
// processSubscriptionRequests.
447 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
450 c.UpdateCounter(cRestSubReqFromXapp)
452 subResp := models.SubscriptionResponse{}
453 p := params.(*models.SubscriptionParams)
// Dump the request only when the logger level is above 2.
455 if c.LoggerLevel > 2 {
456 c.PrintRESTSubscriptionRequest(p)
// Reject the request while the E2 connection is unavailable.
459 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
460 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
461 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
462 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
463 xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
465 c.UpdateCounter(cRestReqRejDueE2Down)
466 return nil, common.SubscribeServiceUnavailableCode
// The client endpoint is mandatory: it is dereferenced below.
469 if p.ClientEndpoint == nil {
470 err := fmt.Errorf("ClientEndpoint == nil")
471 xapp.Logger.Error("%v", err)
472 c.UpdateCounter(cRestSubFailToXapp)
473 return nil, common.SubscribeBadRequestCode
// Validate and resolve the timeout, retry count and routing directives.
476 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
478 xapp.Logger.Error("%s", err)
479 c.UpdateCounter(cRestSubFailToXapp)
480 return nil, common.SubscribeBadRequestCode
482 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
484 xapp.Logger.Error("%s", err.Error())
485 c.UpdateCounter(cRestSubFailToXapp)
486 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is used for retransmission detection.
489 md5sum, err := CalculateRequestMd5sum(params)
491 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
494 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
496 xapp.Logger.Error("Subscription with id in REST request does not exist")
497 return nil, common.SubscribeNotFoundCode
500 subResp.SubscriptionID = &restSubId
501 subReqList := e2ap.SubscriptionRequestList{}
502 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// The E2 requests could not be built: forget the md5sum and the REST subscription.
504 xapp.Logger.Error("%s", err.Error())
505 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
506 c.registry.DeleteRESTSubscription(&restSubId)
507 c.UpdateCounter(cRestSubFailToXapp)
508 return nil, common.SubscribeBadRequestCode
// A retransmission of a request that is still being processed is acknowledged
// directly, without starting new processing.
511 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
513 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
514 xapp.Logger.Debug("%s", err)
515 c.registry.DeleteRESTSubscription(&restSubId)
516 c.UpdateCounter(cRestSubRespToXapp)
517 return &subResp, common.SubscribeCreatedCode
// Persist the REST subscription, then process the E2 requests in the background.
520 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
521 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
523 c.UpdateCounter(cRestSubRespToXapp)
524 return &subResp, common.SubscribeCreatedCode
527 //-------------------------------------------------------------------
529 //-------------------------------------------------------------------
530 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
532 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
533 if p == nil || p.E2SubscriptionDirectives == nil {
534 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
535 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
536 e2SubscriptionDirectives.CreateRMRRoute = true
537 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
539 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
540 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
542 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
544 if p.E2SubscriptionDirectives.E2RetryCount == nil {
545 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
546 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
548 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
549 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
551 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
554 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
556 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
557 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
558 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
559 return e2SubscriptionDirectives, nil
562 //-------------------------------------------------------------------
564 //-------------------------------------------------------------------
// processSubscriptionRequests handles, one by one, the E2 subscription
// requests that belong to a single REST subscription request and notifies the
// xApp about the outcome of each of them.
//
// For every request an xApp transaction is created and the request is passed
// to handleSubscriptionRequest. A failure results in an unsuccessful response
// notification, a success in a successful response notification. The md5sum
// is recorded with SetMd5sumFromLastOkRequest when the function returns.
566 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
567 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
// Delays the start only when c.UTTesting is set.
569 c.SubscriptionProcessingStartDelay()
570 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
572 var xAppEventInstanceID int64
573 var e2EventInstanceID int64
574 errorInfo := &ErrorInfo{}
// Deferred: runs on every return path of this function.
576 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
578 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
579 subReqMsg := subReqList.E2APSubscriptionRequests[index]
580 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
582 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
584 // Send notification to xApp that processing of a Subscription Request has failed.
585 err := fmt.Errorf("Tracking failure")
586 errorInfo.ErrorCause = err.Error()
587 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
591 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// Note: errorInfo declared here shadows the function level errorInfo for the rest of the iteration.
593 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
595 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
599 if err.Error() == "TEST: restart event received" {
600 // This is just for UT cases. Stop here subscription processing
603 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success: remember the md5sum on the REST subscription and notify the xApp.
605 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
606 restSubscription.AddMd5Sum(md5sum)
607 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
608 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
609 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
614 //-------------------------------------------------------------------
616 //------------------------------------------------------------------
617 func (c *Control) SubscriptionProcessingStartDelay() {
618 if c.UTTesting == true {
619 // This is temporary fix for the UT problem that notification arrives before subscription response
620 // Correct fix would be to allow notification come before response and process it correctly
621 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
622 <-time.After(time.Millisecond * 50)
623 xapp.Logger.Debug("Continuing after delay")
627 //-------------------------------------------------------------------
629 //------------------------------------------------------------------
// handleSubscriptionRequest processes one E2 subscription request of an xApp
// transaction and waits for its outcome.
//
// The transaction is tracked and assigned to a subscription, the subscription
// creation is started in a separate goroutine and the function then blocks on
// the transaction event. A subscription response is returned to the caller
// when the E2 connection is up. Every other event is translated into an error
// and an ErrorInfo describing the cause, source and timeout type.
//
// Returns the subscription response (nil on failure), the ErrorInfo and the
// error.
630 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
631 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
633 errorInfo := ErrorInfo{}
635 err := c.tracker.Track(trans)
// The original tracking error is kept as the error cause; the returned error is generic.
637 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
638 errorInfo.ErrorCause = err.Error()
639 err = fmt.Errorf("Tracking failure")
640 return nil, &errorInfo, err
643 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
645 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
646 return nil, &errorInfo, err
// Start the subscription creation and wait for the event it posts to the
// transaction. OngoingReqCount is raised for the duration of the wait.
652 subs.OngoingReqCount++
653 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
654 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
655 subs.OngoingReqCount--
659 switch themsg := event.(type) {
660 case *e2ap.E2APSubscriptionResponse:
// A response counts as success only while the E2 connection is still up.
662 if c.e2IfState.IsE2ConnectionUp(meid) == true {
663 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
664 return themsg, &errorInfo, nil
// E2 connection is down: drop the subscription from the registry and the database.
666 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
667 c.RemoveSubscriptionFromDb(subs)
668 err = fmt.Errorf("E2 interface down")
669 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
671 case *e2ap.E2APSubscriptionFailure:
672 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
673 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
674 case *e2ap.E2APErrorIndication:
675 err = fmt.Errorf("RICE2RanErrorIndication. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
676 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
677 case *PackSubscriptionRequestErrortEvent:
// The event carries its own ErrorInfo.
678 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
679 errorInfo = themsg.ErrorInfo
680 case *SDLWriteErrortEvent:
681 err = fmt.Errorf("SDL write failure")
682 errorInfo = themsg.ErrorInfo
683 case *SubmgrRestartTestEvent:
// Test event: returned as is, without the failure handling below.
684 err = fmt.Errorf("TEST: restart event received")
685 xapp.Logger.Debug("%s", err)
686 return nil, &errorInfo, err
688 err = fmt.Errorf("Unexpected E2 subscription response received")
689 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
694 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
695 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
696 if subs.PolicyUpdate == true {
697 return nil, &errorInfo, err
701 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
702 // If policy type subscription fails we cannot remove it only internally. Once subscription has been created
703 // successfully, it must be deleted on both sides.
704 if subs.PolicyUpdate == false {
705 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
708 return nil, &errorInfo, err
711 //-------------------------------------------------------------------
713 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp that the processing
// of a subscription request has failed.
//
// The notification carries the REST subscription id, the xApp event instance
// id, an E2 event instance id of 0 and the error cause, source and timeout
// type from errorInfo. An empty error source is replaced by SUBMGR (errorInfo
// is modified in place). The REST subscription is marked processed with err
// and updated in the database before the notification is sent.
714 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
715 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
717 // Send notification to xApp that processing of a Subscription Request has failed.
718 e2EventInstanceID := (int64)(0)
719 if errorInfo.ErrorSource == "" {
720 // Submgr is default source of error
721 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
723 resp := &models.SubscriptionResponse{
724 SubscriptionID: restSubId,
725 SubscriptionInstances: []*models.SubscriptionInstance{
726 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
727 ErrorCause: errorInfo.ErrorCause,
728 ErrorSource: errorInfo.ErrorSource,
729 TimeoutType: errorInfo.TimeoutType,
730 XappEventInstanceID: &xAppEventInstanceID},
733 // Mark REST subscription request processed.
734 restSubscription.SetProcessed(err)
735 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two log variants: the first one additionally prints the transaction.
737 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
738 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
740 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
741 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
// The err parameter is reused for the Notify result; a Notify failure is only logged.
744 c.UpdateCounter(cRestSubFailNotifToXapp)
745 err = xapp.Subscription.Notify(resp, *clientEndpoint)
747 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
750 // E2 is down. Delete completely processed request safely now
751 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
752 c.registry.DeleteRESTSubscription(restSubId)
753 c.RemoveRESTSubscriptionFromDb(*restSubId)
757 //-------------------------------------------------------------------
759 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp that a subscription
// request has been processed successfully.
//
// The E2 instance id and the xApp-id-to-E2-id mapping are stored in the REST
// subscription, the subscription is marked processed and updated in the
// database, and a notification with the REST subscription id, both event
// instance ids and the error cause/source from errorInfo is sent to the
// client endpoint.
760 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
761 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
763 // Store successfully processed InstanceId for deletion
764 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
765 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
767 // Send notification to xApp that a Subscription Request has been processed.
768 resp := &models.SubscriptionResponse{
769 SubscriptionID: restSubId,
770 SubscriptionInstances: []*models.SubscriptionInstance{
771 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
772 ErrorCause: errorInfo.ErrorCause,
773 ErrorSource: errorInfo.ErrorSource,
774 XappEventInstanceID: &xAppEventInstanceID},
777 // Mark REST subscription request processed.
778 restSubscription.SetProcessed(nil)
779 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
780 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
781 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
// A Notify failure is only logged.
782 c.UpdateCounter(cRestSubNotifToXapp)
783 err := xapp.Subscription.Notify(resp, *clientEndpoint)
785 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
788 // E2 is down. Delete completely processed request safely now
789 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
790 c.registry.DeleteRESTSubscription(restSubId)
791 c.RemoveRESTSubscriptionFromDb(*restSubId)
795 //-------------------------------------------------------------------
797 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler is the delete callback given to
// xapp.Subscription.Listen.
//
// It returns UnsubscribeNoContentCode when the REST subscription is not found,
// when a delete for the same id is already ongoing and after the delete
// processing of all E2 instances has been started. It returns
// UnsubscribeBadRequestCode while the subscription request with the same id is
// still being handled.
798 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
801 c.UpdateCounter(cRestSubDelReqFromXapp)
803 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
805 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
807 xapp.Logger.Error("%s", err.Error())
808 if restSubscription == nil {
809 // Subscription was not found
810 c.UpdateCounter(cRestSubDelRespToXapp)
811 return common.UnsubscribeNoContentCode
813 if restSubscription.SubReqOngoing == true {
814 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
815 xapp.Logger.Error("%s", err.Error())
816 c.UpdateCounter(cRestSubDelFailToXapp)
817 return common.UnsubscribeBadRequestCode
818 } else if restSubscription.SubDelReqOngoing == true {
819 // Previous request for same restSubId still ongoing
820 c.UpdateCounter(cRestSubDelRespToXapp)
821 return common.UnsubscribeNoContentCode
826 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete every E2 instance of the REST subscription, then the REST
// subscription itself from the duplicate control, the registry and the database.
828 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
829 for _, instanceId := range restSubscription.InstanceIds {
830 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
833 xapp.Logger.Error("%s", err.Error())
835 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
836 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
837 restSubscription.DeleteE2InstanceId(instanceId)
839 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
840 c.registry.DeleteRESTSubscription(&restSubId)
841 c.RemoveRESTSubscriptionFromDb(restSubId)
844 c.UpdateCounter(cRestSubDelRespToXapp)
845 return common.UnsubscribeNoContentCode
848 //-------------------------------------------------------------------
850 //-------------------------------------------------------------------
851 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
853 var xAppEventInstanceID int64
854 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
856 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
857 restSubId, instanceId, idstring(err, nil))
858 return xAppEventInstanceID, nil
861 xAppEventInstanceID = int64(subs.ReqId.Id)
862 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
864 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
865 xapp.Logger.Error("%s", err.Error())
867 defer trans.Release()
869 err = c.tracker.Track(trans)
871 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
872 xapp.Logger.Error("%s", err.Error())
873 return xAppEventInstanceID, &time.ParseError{}
878 subs.OngoingDelCount++
879 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
880 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
881 subs.OngoingDelCount--
883 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
885 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
887 return xAppEventInstanceID, nil
890 //-------------------------------------------------------------------
892 //-------------------------------------------------------------------
894 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
895 params := &xapp.RMRParams{}
896 params.Mtype = trans.GetMtype()
897 params.SubId = int(subs.GetReqId().InstanceId)
899 params.Meid = subs.GetMeid()
901 params.PayloadLen = len(trans.Payload.Buf)
902 params.Payload = trans.Payload.Buf
904 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
905 err = c.SendWithRetry(params, false, 5)
907 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
912 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
914 params := &xapp.RMRParams{}
915 params.Mtype = trans.GetMtype()
916 params.SubId = int(subs.GetReqId().InstanceId)
917 params.Xid = trans.GetXid()
918 params.Meid = trans.GetMeid()
920 params.PayloadLen = len(trans.Payload.Buf)
921 params.Payload = trans.Payload.Buf
923 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
924 err = c.SendWithRetry(params, false, 5)
926 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
931 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
932 if c.RMRClient == nil {
933 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
934 xapp.Logger.Error("%s", err.Error())
939 defer c.RMRClient.Free(msg.Mbuf)
941 // xapp-frame might use direct access to c buffer and
942 // when msg.Mbuf is freed, someone might take it into use
943 // and payload data might be invalid inside message handle function
945 // subscriptions won't load system a lot so there is no
946 // real performance hit by cloning buffer into new go byte slice
947 cPay := append(msg.Payload[:0:0], msg.Payload...)
949 msg.PayloadLen = len(cPay)
952 case xapp.RIC_SUB_REQ:
953 go c.handleXAPPSubscriptionRequest(msg)
954 case xapp.RIC_SUB_RESP:
955 go c.handleE2TSubscriptionResponse(msg)
956 case xapp.RIC_SUB_FAILURE:
957 go c.handleE2TSubscriptionFailure(msg)
958 case xapp.RIC_SUB_DEL_REQ:
959 go c.handleXAPPSubscriptionDeleteRequest(msg)
960 case xapp.RIC_SUB_DEL_RESP:
961 go c.handleE2TSubscriptionDeleteResponse(msg)
962 case xapp.RIC_SUB_DEL_FAILURE:
963 go c.handleE2TSubscriptionDeleteFailure(msg)
964 case xapp.RIC_SUB_DEL_REQUIRED:
965 go c.handleE2TSubscriptionDeleteRequired(msg)
966 case xapp.RIC_E2_RAN_ERROR_INDICATION:
967 go c.handleE2RanErrorIndication(msg)
969 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
974 //-------------------------------------------------------------------
975 // handle from XAPP Subscription Request
976 //------------------------------------------------------------------
977 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
978 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
979 c.UpdateCounter(cSubReqFromXapp)
981 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
982 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
986 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
988 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
992 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
994 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
997 defer trans.Release()
999 if err = c.tracker.Track(trans); err != nil {
1000 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1004 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
1006 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1010 c.wakeSubscriptionRequest(subs, trans)
1013 //-------------------------------------------------------------------
1014 // Wake Subscription Request to E2node
1015 //------------------------------------------------------------------
1016 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1018 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1020 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1022 subs.OngoingReqCount++
1023 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1024 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1025 subs.OngoingReqCount--
1027 switch themsg := event.(type) {
1028 case *e2ap.E2APSubscriptionResponse:
1029 themsg.RequestId.Id = trans.RequestId.Id
1030 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1033 c.UpdateCounter(cSubRespToXapp)
1034 err := c.rmrSendToXapp("", subs, trans)
1036 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1040 case *e2ap.E2APSubscriptionFailure:
1041 themsg.RequestId.Id = trans.RequestId.Id
1042 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1044 c.UpdateCounter(cSubFailToXapp)
1045 c.rmrSendToXapp("", subs, trans)
1051 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1054 //-------------------------------------------------------------------
1055 // handle from XAPP Subscription Delete Request
1056 //------------------------------------------------------------------
1057 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1058 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1059 c.UpdateCounter(cSubDelReqFromXapp)
1061 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1062 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1066 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1068 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1072 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1074 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1077 defer trans.Release()
1079 err = c.tracker.Track(trans)
1081 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1085 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1087 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1094 subs.OngoingDelCount++
1095 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1096 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1097 subs.OngoingDelCount--
1099 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1101 if subs.NoRespToXapp == true {
1102 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1103 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1107 // Whatever is received success, fail or timeout, send successful delete response
1108 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1109 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1110 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1111 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1112 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1114 c.UpdateCounter(cSubDelRespToXapp)
1115 err := c.rmrSendToXapp("", subs, trans)
1117 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1122 //-------------------------------------------------------------------
1123 // SUBS CREATE Handling
1124 //-------------------------------------------------------------------
1125 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1127 var event interface{} = nil
1128 var removeSubscriptionFromDb bool = false
1129 trans := c.tracker.NewSubsTransaction(subs)
1130 subs.WaitTransactionTurn(trans)
1131 defer subs.ReleaseTransactionTurn(trans)
1132 defer trans.Release()
1134 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1136 subRfMsg, valid := subs.GetCachedResponse()
1137 if subRfMsg == nil && valid == true {
1138 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1139 switch event.(type) {
1140 case *e2ap.E2APSubscriptionResponse:
1141 subRfMsg, valid = subs.SetCachedResponse(event, true)
1142 subs.SubRespRcvd = true
1143 case *e2ap.E2APSubscriptionFailure:
1144 if subs.PolicyUpdate == false {
1145 subRfMsg, valid = subs.SetCachedResponse(event, false)
1147 // In policy update case where subscription has already been created successfully in Gnb
1148 // we cannot delete subscription internally in submgr
1149 subRfMsg, valid = subs.SetCachedResponse(event, true)
1151 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1152 case *e2ap.E2APErrorIndication:
1153 subRfMsg, valid = subs.SetCachedResponse(event, false)
1154 case *SubmgrRestartTestEvent:
1155 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1156 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1157 subRfMsg, valid = subs.SetCachedResponse(event, false)
1158 parentTrans.SendEvent(subRfMsg, 0)
1160 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1161 subRfMsg, valid = subs.SetCachedResponse(event, false)
1164 if subs.PolicyUpdate == false {
1165 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1166 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1167 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1169 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1172 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1174 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1176 xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate)
1177 xapp.Logger.Debug("subs: %v", subs)
1180 removeSubscriptionFromDb = true
1183 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1186 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1190 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1192 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1195 parentTrans.SendEvent(subRfMsg, 0)
1198 //-------------------------------------------------------------------
1199 // SUBS DELETE Handling
1200 //-------------------------------------------------------------------
1202 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1204 trans := c.tracker.NewSubsTransaction(subs)
1205 subs.WaitTransactionTurn(trans)
1206 defer subs.ReleaseTransactionTurn(trans)
1207 defer trans.Release()
1209 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1213 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1216 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1221 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1222 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1223 parentTrans.SendEvent(nil, 0)
1226 //-------------------------------------------------------------------
1227 // send to E2T Subscription Request
1228 //-------------------------------------------------------------------
1229 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1231 var event interface{} = nil
1232 var timedOut bool = false
1233 const ricRequestorId = 123
1235 subReqMsg := subs.SubReqMsg
1236 subReqMsg.RequestId = subs.GetReqId().RequestId
1237 subReqMsg.RequestId.Id = ricRequestorId
1238 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1240 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1241 return &PackSubscriptionRequestErrortEvent{
1243 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1244 ErrorCause: err.Error(),
1249 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1250 err = c.WriteSubscriptionToDb(subs)
1252 return &SDLWriteErrortEvent{
1254 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1255 ErrorCause: err.Error(),
1260 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1261 desc := fmt.Sprintf("(retry %d)", retries)
1263 c.UpdateCounter(cSubReqToE2)
1265 c.UpdateCounter(cSubReReqToE2)
1267 err := c.rmrSendToE2T(desc, subs, trans)
1269 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1272 if subs.DoNotWaitSubResp == false {
1273 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1275 c.UpdateCounter(cSubReqTimerExpiry)
1279 // Simulating case where subscrition request has been sent but response has not been received before restart
1280 event = &SubmgrRestartTestEvent{}
1281 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1285 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1289 //-------------------------------------------------------------------
1290 // send to E2T Subscription Delete Request
1291 //-------------------------------------------------------------------
1293 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1295 var event interface{}
1297 const ricRequestorId = 123
1299 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1300 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1301 subDelReqMsg.RequestId.Id = ricRequestorId
1302 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1303 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1305 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1309 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1310 desc := fmt.Sprintf("(retry %d)", retries)
1312 c.UpdateCounter(cSubDelReqToE2)
1314 c.UpdateCounter(cSubDelReReqToE2)
1316 err := c.rmrSendToE2T(desc, subs, trans)
1318 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1320 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1322 c.UpdateCounter(cSubDelReqTimerExpiry)
1327 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1331 //-------------------------------------------------------------------
1332 // handle from E2T Subscription Response
1333 //-------------------------------------------------------------------
1334 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1335 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1336 c.UpdateCounter(cSubRespFromE2)
1338 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1340 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1343 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1345 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1348 trans := subs.GetTransaction()
1350 err = fmt.Errorf("Ongoing transaction not found")
1351 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1354 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1355 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1356 if sendOk == false {
1357 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1358 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1363 //-------------------------------------------------------------------
1364 // handle from E2T Subscription Failure
1365 //-------------------------------------------------------------------
1366 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1367 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1368 c.UpdateCounter(cSubFailFromE2)
1369 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1371 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1374 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1376 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1379 trans := subs.GetTransaction()
1381 err = fmt.Errorf("Ongoing transaction not found")
1382 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1385 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1386 if sendOk == false {
1387 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1388 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1393 //-------------------------------------------------------------------
1394 // handle from E2T Subscription Delete Response
1395 //-------------------------------------------------------------------
1396 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1397 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1398 c.UpdateCounter(cSubDelRespFromE2)
1399 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1401 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1404 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1406 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1409 trans := subs.GetTransaction()
1411 err = fmt.Errorf("Ongoing transaction not found")
1412 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1415 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1416 if sendOk == false {
1417 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1418 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1423 //-------------------------------------------------------------------
1424 // handle from E2T Subscription Delete Failure
1425 //-------------------------------------------------------------------
1426 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1427 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1428 c.UpdateCounter(cSubDelFailFromE2)
1429 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1431 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1434 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1436 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1439 trans := subs.GetTransaction()
1441 err = fmt.Errorf("Ongoing transaction not found")
1442 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1445 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1446 if sendOk == false {
1447 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1448 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1453 //-------------------------------------------------------------------
1455 //-------------------------------------------------------------------
1456 func typeofSubsMessage(v interface{}) string {
1461 //case *e2ap.E2APSubscriptionRequest:
1463 case *e2ap.E2APSubscriptionResponse:
1465 case *e2ap.E2APSubscriptionFailure:
1467 //case *e2ap.E2APSubscriptionDeleteRequest:
1468 // return "SubDelReq"
1469 case *e2ap.E2APSubscriptionDeleteResponse:
1471 case *e2ap.E2APSubscriptionDeleteFailure:
1473 case *e2ap.E2APErrorIndication:
1474 return "RicE2RanErrorIndication"
1480 //-------------------------------------------------------------------
1482 //-------------------------------------------------------------------
1483 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1484 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1485 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1487 xapp.Logger.Error("%v", err)
1493 //-------------------------------------------------------------------
1495 //-------------------------------------------------------------------
1496 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1498 if removeSubscriptionFromDb == true {
1499 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1500 c.RemoveSubscriptionFromDb(subs)
1502 // Update is needed for successful response and merge case here
1503 if subs.RetryFromXapp == false {
1504 err := c.WriteSubscriptionToDb(subs)
1508 subs.RetryFromXapp = false
1512 //-------------------------------------------------------------------
1514 //-------------------------------------------------------------------
1515 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1516 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1517 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1519 xapp.Logger.Error("%v", err)
1523 //-------------------------------------------------------------------
1525 //-------------------------------------------------------------------
1526 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1527 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1528 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1530 xapp.Logger.Error("%v", err)
1534 //-------------------------------------------------------------------
1536 //-------------------------------------------------------------------
1537 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1539 if removeRestSubscriptionFromDb == true {
1540 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1541 c.RemoveRESTSubscriptionFromDb(restSubId)
1543 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1547 //-------------------------------------------------------------------
1549 //-------------------------------------------------------------------
1550 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1551 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1552 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1554 xapp.Logger.Error("%v", err)
1558 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1560 if c.UTTesting == true {
1561 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1562 c.registry.mutex = new(sync.Mutex)
1565 const ricRequestorId = 123
1566 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1568 // Send delete for every endpoint in the subscription
1569 if subs.PolicyUpdate == false {
1570 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1571 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1572 subDelReqMsg.RequestId.Id = ricRequestorId
1573 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1574 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1576 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1579 for _, endPoint := range subs.EpList.Endpoints {
1580 params := &xapp.RMRParams{}
1581 params.Mtype = mType
1582 params.SubId = int(subs.GetReqId().InstanceId)
1584 params.Meid = subs.Meid
1585 params.Src = endPoint.String()
1586 params.PayloadLen = len(payload.Buf)
1587 params.Payload = payload.Buf
1589 subs.DeleteFromDb = true
1590 if !e2SubsDelRequired {
1591 c.handleXAPPSubscriptionDeleteRequest(params)
1593 c.SendSubscriptionDeleteReqToE2T(subs, params)
1599 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1601 fmt.Println("CRESTSubscriptionRequest")
1607 if p.SubscriptionID != "" {
1608 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1610 fmt.Println(" SubscriptionID = ''")
1613 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1615 if p.ClientEndpoint.HTTPPort != nil {
1616 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1618 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1621 if p.ClientEndpoint.RMRPort != nil {
1622 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1624 fmt.Println(" ClientEndpoint.RMRPort = nil")
1628 fmt.Printf(" Meid = %s\n", *p.Meid)
1630 fmt.Println(" Meid = nil")
1633 if p.E2SubscriptionDirectives == nil {
1634 fmt.Println(" E2SubscriptionDirectives = nil")
1636 fmt.Println(" E2SubscriptionDirectives")
1637 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1638 fmt.Println(" E2RetryCount == nil")
1640 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1642 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1643 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1645 for _, subscriptionDetail := range p.SubscriptionDetails {
1646 if p.RANFunctionID != nil {
1647 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1649 fmt.Println(" RANFunctionID = nil")
1651 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1652 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1654 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1655 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1656 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1657 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1659 if actionToBeSetup.SubsequentAction != nil {
1660 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1661 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1663 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1669 //-------------------------------------------------------------------
1670 // handle from E2T Subscription Delete Required
1671 //-------------------------------------------------------------------
1672 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1673 xapp.Logger.Info("MSG from E2T: %s", params.String())
1674 c.UpdateCounter(cSubDelRequFromE2)
1675 subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1677 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1678 //c.sendE2TErrorIndication(nil)
1681 var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1682 var subDB = []*Subscription{}
1683 for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1684 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1686 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1689 // Check if Delete Subscription Already triggered
1690 if subs.OngoingDelCount > 0 {
1693 subDB = append(subDB, subs)
1694 for _, endpoint := range subs.EpList.Endpoints {
1695 subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1697 // Sending Subscription Delete Request to E2T
1698 // c.SendSubscriptionDeleteReq(subs, true)
1700 for _, subsTobeRemove := range subDB {
1701 // Sending Subscription Delete Request to E2T
1702 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1706 //-------------------------------------------------------------------
1707 // handle from E2T Error Indication
1708 //-------------------------------------------------------------------
1709 func (c *Control) handleE2RanErrorIndication(params *xapp.RMRParams) {
1710 xapp.Logger.Debug("Received ErrorIndication from E2T")
1711 xapp.Logger.Error("MSG from E2T: %s", params.String())
1713 c.UpdateCounter(cErrorIndicationFromE2Node)
1715 errorIndication, err := c.e2ap.UnpackErrorIndicationFromE2Node(params.Payload)
1717 xapp.Logger.Error("MSG-ErrorIndication From E2Node %s", idstring(err, params))
1721 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{errorIndication.RequestId.InstanceId})
1723 xapp.Logger.Error("Unknown/Invalid InstanceId from E2Node. Dropping ErrorIndication from E2Node.")
1724 xapp.Logger.Error("MSG-ErrorIndication From E2Node: %s", idstring(err, params))
1725 if errorIndication.IsCausePresent == true {
1726 xapp.Logger.Debug("Cause is present in received ErrorIndication Message - Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1731 trans := subs.GetTransaction()
1733 err = fmt.Errorf("Ongoing transaction not found")
1734 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1735 if errorIndication.IsCausePresent == true {
1736 xapp.Logger.Debug("Cause is present in received ErrorIndication Message - Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1741 if errorIndication.IsCausePresent == true {
1742 xapp.Logger.Debug("Cause present in ErrorIndication is: Content: %v, Value: %v", errorIndication.Cause.Content, errorIndication.Cause.Value)
1743 if (errorIndication.Cause.Content == e2ap.E2AP_CauseContent_Misc && (errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseMisc_hardware_failure ||
1744 errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseMisc_om_intervention)) ||
1745 (errorIndication.Cause.Content == e2ap.E2AP_CauseContent_E2node && errorIndication.Cause.Value == e2ap.E2AP_CauseValue_CauseE2node_e2node_component_unknown) {
1747 sendOk, timedOut := trans.SendEvent(errorIndication, e2tRecvMsgTimeout)
1748 if sendOk == false {
1749 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1750 xapp.Logger.Error("MSG-ErrorIndication: %s", idstring(err, trans, subs))
1753 xapp.Logger.Debug("Cause present in ErrorIndication is not serious problem. Retrying SubReq Procedure")
1759 //-----------------------------------------------------------------
1760 // Initiate RIC Subscription Delete Request after receiving
1761 // RIC Subscription Delete Required from E2T
1762 //-----------------------------------------------------------------
1763 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1764 xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1765 c.UpdateCounter(cSubDelReqToE2)
1767 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1768 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1772 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1774 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1777 defer trans.Release()
1779 err := c.tracker.Track(trans)
1781 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1788 subs.OngoingDelCount++
1789 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1790 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1791 subs.OngoingDelCount--
1793 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1795 if subs.NoRespToXapp == true {
1796 // Do not send delete responses to xapps, because a submgr restart is deleting uncompleted subscriptions
1797 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")