2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring builds a single-line identifier string for log messages.
//
// Each entry contributes its String() value, or the literal "(NIL)" when the
// entry is a nil interface. A non-nil err is appended last as "err(<message>)".
// Parts are separated by exactly one space. The result is empty when there are
// no entries and err is nil.
func idstring(err error, entries ...fmt.Stringer) string {
	var retval string
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			// Calling String() on a nil interface would panic.
			retval += filler + "(NIL)"
		}
		// Every part, including "(NIL)", is followed by a separator.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
// Package-level configuration values. Each of them is assigned in
// Control.ReadConfigParameters from a "controls.*" viper key.

// Read from controls.e2tSubReqTimeout_ms; set to 2000 ms when the value is zero.
// Also used as the default E2TimeoutTimerValue in GetE2SubscriptionDirectives.
65 var e2tSubReqTimeout time.Duration
// Read from controls.e2tSubDelReqTime_ms; set to 2000 ms when the value is zero.
66 var e2tSubDelReqTime time.Duration
// Read from controls.e2tRecvMsgTimeout_ms; set to 2000 ms when the value is zero.
67 var e2tRecvMsgTimeout time.Duration
// Read from controls.waitRouteCleanup_ms; set to 5000 ms when the value is zero.
// Passed as an argument to registry.RemoveFromSubscription.
68 var waitRouteCleanup_ms time.Duration
// Read from controls.e2tMaxSubReqTryCount; set to 1 when the value is zero.
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
// Read from controls.e2tMaxSubDelReqTryCount; set to 1 when the value is zero.
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// Read from controls.checkE2State; set to "true" when the value is empty.
71 var checkE2State string
// Read from controls.readSubsFromDb; set to "true" when the value is empty.
// NewControl reads stored subscriptions only when this equals "true".
72 var readSubsFromDb string
// Read from controls.dbRetryForever; set to "true" when the value is empty.
// While it equals "true" the database read loops do not stop at dbTryCount.
73 var dbRetryForever string
// NOTE(review): the enclosing struct header is not visible in this excerpt; the
// composite literal in NewControl shows these names as fields of Control.

// Used for md5sum-based lookups of previously seen REST requests.
81 restDuplicateCtrl *DuplicateCtrl
// Initialised in NewControl with CreateXappRnibIfInstance().
83 e2IfStateDb XappRnibInterface
// Initialised in NewControl with CreateRESTSdl().
85 restSubsDb Sdlnterface
// Counter group registered in NewControl under the name "SUBMGR".
88 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty event type. handleSubscriptionRequest
// turns it into the error "TEST: restart event received".
99 type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is an empty event type; no use of it is visible in this
// part of the file.
100 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is an event that carries an ErrorInfo.
// handleSubscriptionRequest maps it to the error
// "E2 RICSubscriptionRequest pack failure" and forwards its ErrorInfo.
101 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo is dereferenced
// unconditionally, so it must not be nil.
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is an event that carries an ErrorInfo.
// handleSubscriptionRequest maps it to the error "SDL write failure" and
// forwards its ErrorInfo.
109 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo is dereferenced
// unconditionally, so it must not be nil.
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
// NOTE(review): the header of the function enclosing these statements is not
// visible in this excerpt.
118 xapp.Logger.Debug("SUBMGR")
// Environment variables consulted by viper use the prefix "submgr".
120 viper.SetEnvPrefix("submgr")
// An environment variable that is set but empty is treated as a valid value.
121 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance. It creates the route manager client,
// the registry, the tracker, the REST duplicate controller and the E2 interface
// state, reads the configuration, registers the REST routes, reads stored
// subscriptions when readSubsFromDb is "true", and calls
// xapp.Subscription.Listen with the REST subscription handlers.
124 func NewControl() *Control {
// Route manager HTTP client; host, port and base URL come from the rtmgr.* viper keys.
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry keeps a pointer to the route manager client.
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the controls.* settings into the package-level variables.
154 c.ReadConfigParameters("")
156 // Register REST handlers for symptom data, test support, queries and bulk deletes
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
// Restoring state from the database is controlled by configuration.
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 err := c.ReadE2Subscriptions()
175 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177 err = c.ReadRESTSubscriptions()
179 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
// Hand the REST subscribe, query and delete callbacks to the xapp framework.
184 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
// SymptomDataHandler is the HTTP handler registered for GET /ric/v1/symptomdata.
// It queries the registry for the subscriptions and sends the result as symptom
// data under the name "platform/subscriptions.json".
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193 subscriptions, err := c.registry.QueryHandler()
// A failed registry query is logged.
195 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
198 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
// RESTQueryHandler is the query callback handed to xapp.Subscription.Listen in
// NewControl. It returns the result of the registry's QueryHandler unchanged.
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205 xapp.Logger.Debug("RESTQueryHandler() called")
209 return c.registry.QueryHandler()
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
// ReadE2Subscriptions loads the E2 subscriptions from the database with
// ReadAllSubscriptionsFromSdl and installs them in the registry.
//
// The read is attempted while dbRetryForever is "true" or fewer than dbTryCount
// attempts have been made. After a failed attempt the error is logged and the
// loop waits one second. After a successful read the subscription ids and the
// register are stored in c.registry and HandleUncompletedSubscriptions is
// started in a goroutine.
215 func (c *Control) ReadE2Subscriptions() error {
218 var register map[uint32]*Subscription
219 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220 xapp.Logger.Debug("Reading E2 subscriptions from db")
221 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
// Failed read: log and pause before the next attempt.
223 xapp.Logger.Error("%v", err)
224 <-time.After(1 * time.Second)
// Successful read: hand the loaded state to the registry.
226 c.registry.subIds = subIds
227 c.registry.register = register
228 go c.HandleUncompletedSubscriptions(register)
// NOTE(review): "retring" in the message below is a typo for "retrying"; it is a
// runtime string and is left unchanged here.
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
// ReadRESTSubscriptions loads the REST subscriptions from the database with
// ReadAllRESTSubscriptionsFromSdl and installs them in the registry.
//
// The read is attempted while dbRetryForever is "true" or fewer than dbTryCount
// attempts have been made. After a failed attempt the error is logged and the
// loop waits one second. After a successful read both "ongoing" flags of every
// loaded subscription are cleared and the subscription is written back.
239 func (c *Control) ReadRESTSubscriptions() error {
241 xapp.Logger.Debug("ReadRESTSubscriptions()")
243 var restSubscriptions map[string]*RESTSubscription
244 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245 xapp.Logger.Debug("Reading REST subscriptions from db")
246 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
// Failed read: log and pause before the next attempt.
248 xapp.Logger.Error("%v", err)
249 <-time.After(1 * time.Second)
251 // Fix REST subscriptions ongoing status after restart
252 for restSubId, restSubscription := range restSubscriptions {
253 restSubscription.SubReqOngoing = false
254 restSubscription.SubDelReqOngoing = false
// This err is local to the inner loop and shadows the function-level err.
255 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
260 c.registry.restSubscriptions = restSubscriptions
// NOTE(review): "retring" in the message below is a typo for "retrying"; it is a
// runtime string and is left unchanged here.
264 xapp.Logger.Debug("Continuing without retring")
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
// ReadConfigParameters refreshes the logger level stored in c and loads the
// "controls.*" settings from viper into the package-level variables. A numeric
// setting that reads as zero, or a string setting that reads as empty, is
// replaced by a hard-coded default and a debug message is logged.
//
// f is not referenced in the visible body. Run registers this method with
// xapp.AddConfigChangeListener.
271 func (c *Control) ReadConfigParameters(f string) {
273 xapp.Logger.Debug("ReadConfigParameters")
// Mirror the current logger level into c and pass it to the ASN.1 debug print setting.
275 c.LoggerLevel = int(xapp.Logger.GetLevel())
276 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279 // viper.GetDuration yields the configured number as nanoseconds; the *_ms
280 // settings are multiplied by 1000000 so that the number counts milliseconds.
280 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281 if e2tSubReqTimeout == 0 {
282 e2tSubReqTimeout = 2000 * 1000000
283 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288 if e2tSubDelReqTime == 0 {
289 e2tSubDelReqTime = 2000 * 1000000
290 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295 if e2tRecvMsgTimeout == 0 {
296 e2tRecvMsgTimeout = 2000 * 1000000
297 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts default to a single attempt.
301 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302 if e2tMaxSubReqTryCount == 0 {
303 e2tMaxSubReqTryCount = 1
304 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309 if e2tMaxSubDelReqTryCount == 0 {
310 e2tMaxSubDelReqTryCount = 1
311 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// Boolean-like switches are kept as strings and default to "true".
315 checkE2State = viper.GetString("controls.checkE2State")
316 if checkE2State == "" {
317 checkE2State = "true"
318 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320 xapp.Logger.Debug("checkE2State= %v", checkE2State)
322 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323 if readSubsFromDb == "" {
324 readSubsFromDb = "true"
325 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// NOTE(review): the default assigned to dbTryCount is not visible in this excerpt.
329 dbTryCount = viper.GetInt("controls.dbTryCount")
332 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336 dbRetryForever = viper.GetString("controls.dbRetryForever")
337 if dbRetryForever == "" {
338 dbRetryForever = "true"
339 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
344 // value of 100 ms is currently used only in unit tests; the default applied below is 5000 ms.
345 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346 if waitRouteCleanup_ms == 0 {
347 waitRouteCleanup_ms = 5000 * 1000000
348 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
353 //-------------------------------------------------------------------
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
358 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359 for subId, subs := range register {
360 if subs.SubRespRcvd == false {
361 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362 if subs.PolicyUpdate == false {
363 subs.NoRespToXapp = true
364 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365 c.SendSubscriptionDeleteReq(subs, false)
371 func (c *Control) ReadyCB(data interface{}) {
372 if c.RMRClient == nil {
373 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters as
// a configuration change listener.
// NOTE(review): the remainder of the body is not visible in this excerpt.
377 func (c *Control) Run() {
378 xapp.SetReadyCB(c.ReadyCB, nil)
379 xapp.AddConfigChangeListener(c.ReadConfigParameters)
383 //-------------------------------------------------------------------
385 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription a request refers to.
//
// When p.SubscriptionID is empty, the REST subscription id last recorded for
// md5sum is looked up in the registry and reused if it is still known. A stale
// cache entry is deleted. If no subscription results, a new id is generated with
// ksuid and a new REST subscription is created.
//
// When p.SubscriptionID is set, that id is looked up in the registry. A failed
// lookup is logged and counted with cRestSubFailToXapp.
//
// On the success path it returns the subscription, its id and a nil error.
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
389 var restSubscription *RESTSubscription
// Id of the REST subscription last associated with an identical request, if any.
392 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if p.SubscriptionID == "" {
394 // Subscription does not contain REST subscription Id
396 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397 if restSubscription != nil {
398 // Subscription found: reuse the previously assigned id
399 restSubId = prevRestSubsId
401 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
403 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached id no longer refers to a known subscription: drop the cache entry.
406 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: create a new REST subscription under a fresh ksuid.
411 if restSubscription == nil {
412 restSubId = ksuid.New().String()
413 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
416 // Subscription contains REST subscription Id
417 restSubId = p.SubscriptionID
419 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
422 // Subscription with id in REST request does not exist
423 xapp.Logger.Error("%s", err.Error())
424 c.UpdateCounter(cRestSubFailToXapp)
// Log whether the request looks like a new request or a re-transmission.
429 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
431 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
435 return restSubscription, restSubId, nil
438 //-------------------------------------------------------------------
440 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
// params is type-asserted to *models.SubscriptionParams without a check, so any
// other dynamic type panics.
//
// Visible outcomes (response, status code):
//   - nil, SubscribeServiceUnavailableCode: the E2 connection for p.Meid is
//     down or under reset
//   - nil, SubscribeBadRequestCode: ClientEndpoint is nil, or the subscription
//     directives, the endpoint addresses or the E2 request messages cannot be built
//   - nil, SubscribeNotFoundCode: GetOrCreateRestSubscription failed
//   - response, SubscribeCreatedCode: the request was accepted, or it was
//     recognised as a duplicate of an ongoing transaction
//
// An accepted request is written to the database and then processed
// asynchronously by processSubscriptionRequests.
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
444 c.UpdateCounter(cRestSubReqFromXapp)
446 subResp := models.SubscriptionResponse{}
447 p := params.(*models.SubscriptionParams)
// Print the request only at logger levels above 2.
449 if c.LoggerLevel > 2 {
450 c.PrintRESTSubscriptionRequest(p)
// Reject the request while the E2 connection is down or under reset.
453 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
454 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
455 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
456 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
457 xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
459 c.UpdateCounter(cRestReqRejDueE2Down)
460 return nil, common.SubscribeServiceUnavailableCode
// ClientEndpoint is dereferenced below, so it is mandatory.
463 if p.ClientEndpoint == nil {
464 err := fmt.Errorf("ClientEndpoint == nil")
465 xapp.Logger.Error("%v", err)
466 c.UpdateCounter(cRestSubFailToXapp)
467 return nil, common.SubscribeBadRequestCode
470 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
472 xapp.Logger.Error("%s", err)
473 c.UpdateCounter(cRestSubFailToXapp)
474 return nil, common.SubscribeBadRequestCode
476 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
478 xapp.Logger.Error("%s", err.Error())
479 c.UpdateCounter(cRestSubFailToXapp)
480 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is used for duplicate and re-transmission detection.
483 md5sum, err := CalculateRequestMd5sum(params)
485 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
488 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
490 xapp.Logger.Error("Subscription with id in REST request does not exist")
491 return nil, common.SubscribeNotFoundCode
494 subResp.SubscriptionID = &restSubId
495 subReqList := e2ap.SubscriptionRequestList{}
496 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Building the E2 messages failed: undo the md5sum cache entry and the REST subscription.
498 xapp.Logger.Error("%s", err.Error())
499 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
500 c.registry.DeleteRESTSubscription(&restSubId)
501 c.UpdateCounter(cRestSubFailToXapp)
502 return nil, common.SubscribeBadRequestCode
// A duplicate of an ongoing transaction is acknowledged directly.
505 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
507 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
508 xapp.Logger.Debug("%s", err)
509 c.registry.DeleteRESTSubscription(&restSubId)
510 c.UpdateCounter(cRestSubRespToXapp)
511 return &subResp, common.SubscribeCreatedCode
// Persist the REST subscription, then process the E2 requests in the background.
514 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
515 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
517 c.UpdateCounter(cRestSubRespToXapp)
518 return &subResp, common.SubscribeCreatedCode
521 //-------------------------------------------------------------------
523 //-------------------------------------------------------------------
524 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
526 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
527 if p == nil || p.E2SubscriptionDirectives == nil {
528 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
529 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
530 e2SubscriptionDirectives.CreateRMRRoute = true
531 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
533 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
534 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
536 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
538 if p.E2SubscriptionDirectives.E2RetryCount == nil {
539 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
540 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
542 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
543 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
545 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
548 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
550 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
551 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
552 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
553 return e2SubscriptionDirectives, nil
556 //-------------------------------------------------------------------
558 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2 subscription request in
// subReqList for one REST subscription. RESTSubscriptionHandler starts it in a
// goroutine.
//
// For each request it creates an xApp transaction, calls
// handleSubscriptionRequest, and sends the xApp either a successful or an
// unsuccessful REST notification. When the function returns, md5sum is recorded
// for *restSubId through the deferred SetMd5sumFromLastOkRequest call.
//
// restSubId and meid are dereferenced, so they must not be nil.
560 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
561 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
// Apply the start delay, which SubscriptionProcessingStartDelay uses only when UTTesting is set.
563 c.SubscriptionProcessingStartDelay()
564 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
566 var xAppEventInstanceID int64
567 var e2EventInstanceID int64
568 errorInfo := &ErrorInfo{}
570 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
572 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
573 subReqMsg := subReqList.E2APSubscriptionRequests[index]
574 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
576 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
578 // Send notification to xApp that processing of a Subscription Request has failed.
579 err := fmt.Errorf("Tracking failure")
580 errorInfo.ErrorCause = err.Error()
581 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
585 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// The := below declares a loop-local errorInfo that shadows the function-level one.
587 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
589 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
593 if err.Error() == "TEST: restart event received" {
594 // This is just for UT cases. Stop here subscription processing
597 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success: remember the E2 instance id and the md5sum, then notify the xApp.
599 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
600 restSubscription.AddMd5Sum(md5sum)
601 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
602 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
603 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
608 //-------------------------------------------------------------------
610 //------------------------------------------------------------------
611 func (c *Control) SubscriptionProcessingStartDelay() {
612 if c.UTTesting == true {
613 // This is temporary fix for the UT problem that notification arrives before subscription response
614 // Correct fix would be to allow notification come before response and process it correctly
615 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
616 <-time.After(time.Millisecond * 50)
617 xapp.Logger.Debug("Continuing after delay")
621 //-------------------------------------------------------------------
623 //------------------------------------------------------------------
// handleSubscriptionRequest processes one E2 subscription request for an xApp
// transaction.
//
// It tracks the transaction, assigns it to a subscription in the registry,
// starts handleSubscriptionCreate in a goroutine and blocks until an event is
// posted to the transaction. The event is then translated:
//   - *e2ap.E2APSubscriptionResponse: returned as the result while the E2
//     connection is up; otherwise the transaction is removed from the
//     subscription, the subscription is removed from the database and the error
//     is "E2 interface down"
//   - *e2ap.E2APSubscriptionFailure: error describing the E2 node cause
//   - *PackSubscriptionRequestErrortEvent: "E2 RICSubscriptionRequest pack failure"
//   - *SDLWriteErrortEvent: "SDL write failure"
//   - *SubmgrRestartTestEvent: "TEST: restart event received"
//
// Other outcomes visible below are "Unexpected E2 subscription response
// received" and "E2 RICSubscriptionResponse timeout".
//
// Returns the response and a nil error on success; otherwise a nil response, a
// non-nil *ErrorInfo and the error.
624 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
625 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
627 errorInfo := ErrorInfo{}
// Register the transaction with the tracker.
629 err := c.tracker.Track(trans)
// The tracker's message is kept as ErrorCause; the returned error is the generic "Tracking failure".
631 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
632 errorInfo.ErrorCause = err.Error()
633 err = fmt.Errorf("Tracking failure")
634 return nil, &errorInfo, err
// Assign the transaction to a subscription in the registry.
637 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
639 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
640 return nil, &errorInfo, err
// Start subscription creation in the background and wait for its event;
// OngoingReqCount is raised for the duration of the wait.
646 subs.OngoingReqCount++
647 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
648 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
649 subs.OngoingReqCount--
// Translate the received event into a response or an error.
653 switch themsg := event.(type) {
654 case *e2ap.E2APSubscriptionResponse:
// A response is passed on only while the E2 connection for meid is still up.
656 if c.e2IfState.IsE2ConnectionUp(meid) == true {
657 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
658 return themsg, &errorInfo, nil
660 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
661 c.RemoveSubscriptionFromDb(subs)
662 err = fmt.Errorf("E2 interface down")
663 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
665 case *e2ap.E2APSubscriptionFailure:
666 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
667 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
668 case *PackSubscriptionRequestErrortEvent:
669 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
670 errorInfo = themsg.ErrorInfo
671 case *SDLWriteErrortEvent:
672 err = fmt.Errorf("SDL write failure")
673 errorInfo = themsg.ErrorInfo
674 case *SubmgrRestartTestEvent:
675 err = fmt.Errorf("TEST: restart event received")
676 xapp.Logger.Debug("%s", err)
677 return nil, &errorInfo, err
679 err = fmt.Errorf("Unexpected E2 subscription response received")
680 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// NOTE(review): the condition selecting the timeout branch is not visible in this excerpt.
685 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
686 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// A policy update that timed out returns here, before the removal further down.
687 if subs.PolicyUpdate == true {
688 return nil, &errorInfo, err
692 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
693 // If policy type subscription fails we cannot remove it only internally. Once subscription has been created
694 // successfully, it must be deleted on both sides.
695 if subs.PolicyUpdate == false {
696 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
699 return nil, &errorInfo, err
702 //-------------------------------------------------------------------
704 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp at clientEndpoint that
// processing of a subscription request failed.
//
// The notification carries E2EventInstanceID 0, the given xAppEventInstanceID
// and the cause, source and timeout type from errorInfo. An empty ErrorSource is
// replaced in place by SubscriptionInstanceErrorSourceSUBMGR. The REST
// subscription is marked processed with err and updated in the database before
// the notification is sent.
//
// restSubId, clientEndpoint, clientEndpoint.HTTPPort and errorInfo are
// dereferenced, so they must not be nil.
705 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
706 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
708 // Send notification to xApp that processing of a Subscription Request has failed.
709 e2EventInstanceID := (int64)(0)
710 if errorInfo.ErrorSource == "" {
711 // Submgr is default source of error
712 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
714 resp := &models.SubscriptionResponse{
715 SubscriptionID: restSubId,
716 SubscriptionInstances: []*models.SubscriptionInstance{
717 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
718 ErrorCause: errorInfo.ErrorCause,
719 ErrorSource: errorInfo.ErrorSource,
720 TimeoutType: errorInfo.TimeoutType,
721 XappEventInstanceID: &xAppEventInstanceID},
724 // Mark REST subscription request processed.
725 restSubscription.SetProcessed(err)
726 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two variants of the same debug message, with and without the transaction id string.
// NOTE(review): presumably selected by whether trans is nil; the condition is not visible here.
728 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
729 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
731 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
732 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
// The counter is updated before Notify is called; the err parameter is reused for the Notify result.
735 c.UpdateCounter(cRestSubFailNotifToXapp)
736 err = xapp.Subscription.Notify(resp, *clientEndpoint)
738 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
741 // E2 is down. Delete completely processed request safely now
742 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
743 c.registry.DeleteRESTSubscription(restSubId)
744 c.RemoveRESTSubscriptionFromDb(*restSubId)
748 //-------------------------------------------------------------------
750 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp at clientEndpoint that a
// subscription request has been processed.
//
// It records e2EventInstanceID and the xApp-to-E2 id mapping on the REST
// subscription, marks the subscription processed with a nil error, updates it in
// the database and sends a notification carrying both instance ids together
// with the cause and source from errorInfo.
//
// restSubId, clientEndpoint, clientEndpoint.HTTPPort and errorInfo are
// dereferenced, so they must not be nil.
751 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
752 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
754 // Store successfully processed InstanceId for deletion
755 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
756 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
758 // Send notification to xApp that a Subscription Request has been processed.
759 resp := &models.SubscriptionResponse{
760 SubscriptionID: restSubId,
761 SubscriptionInstances: []*models.SubscriptionInstance{
762 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
763 ErrorCause: errorInfo.ErrorCause,
764 ErrorSource: errorInfo.ErrorSource,
765 XappEventInstanceID: &xAppEventInstanceID},
768 // Mark REST subscription request processed.
769 restSubscription.SetProcessed(nil)
770 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
771 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
772 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
// The counter is updated before Notify is called.
773 c.UpdateCounter(cRestSubNotifToXapp)
774 err := xapp.Subscription.Notify(resp, *clientEndpoint)
776 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
779 // E2 is down. Delete completely processed request safely now
780 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
781 c.registry.DeleteRESTSubscription(restSubId)
782 c.RemoveRESTSubscriptionFromDb(*restSubId)
786 //-------------------------------------------------------------------
788 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request for
// restSubId and returns the HTTP status code for the reply.
//
// Visible outcomes:
//   - UnsubscribeNoContentCode: the subscription was not found, a delete for the
//     same id is already ongoing, or the delete was carried out
//   - UnsubscribeBadRequestCode: the subscription request for this id is still
//     being handled
//
// The delete itself calls SubscriptionDeleteHandler for every instance id of
// the REST subscription, then removes the md5sum cache entry, the registry
// entry and the database record.
789 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
792 c.UpdateCounter(cRestSubDelReqFromXapp)
794 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
796 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
798 xapp.Logger.Error("%s", err.Error())
799 if restSubscription == nil {
800 // Subscription was not found
801 c.UpdateCounter(cRestSubDelRespToXapp)
802 return common.UnsubscribeNoContentCode
// A subscription whose creation is still in progress cannot be deleted yet.
804 if restSubscription.SubReqOngoing == true {
805 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
806 xapp.Logger.Error("%s", err.Error())
807 c.UpdateCounter(cRestSubDelFailToXapp)
808 return common.UnsubscribeBadRequestCode
809 } else if restSubscription.SubDelReqOngoing == true {
810 // Previous request for same restSubId still ongoing
811 c.UpdateCounter(cRestSubDelRespToXapp)
812 return common.UnsubscribeNoContentCode
817 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete every E2 instance of the REST subscription; a failure is logged.
819 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
820 for _, instanceId := range restSubscription.InstanceIds {
821 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
824 xapp.Logger.Error("%s", err.Error())
826 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
827 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
828 restSubscription.DeleteE2InstanceId(instanceId)
// Remove the md5sum cache entry, the registry entry and the database record.
830 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
831 c.registry.DeleteRESTSubscription(&restSubId)
832 c.RemoveRESTSubscriptionFromDb(restSubId)
835 c.UpdateCounter(cRestSubDelRespToXapp)
836 return common.UnsubscribeNoContentCode
839 //-------------------------------------------------------------------
841 //-------------------------------------------------------------------
842 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
844 var xAppEventInstanceID int64
845 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
847 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
848 restSubId, instanceId, idstring(err, nil))
849 return xAppEventInstanceID, nil
852 xAppEventInstanceID = int64(subs.ReqId.Id)
853 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
855 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
856 xapp.Logger.Error("%s", err.Error())
858 defer trans.Release()
860 err = c.tracker.Track(trans)
862 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
863 xapp.Logger.Error("%s", err.Error())
864 return xAppEventInstanceID, &time.ParseError{}
869 subs.OngoingDelCount++
870 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
871 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
872 subs.OngoingDelCount--
874 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
876 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
878 return xAppEventInstanceID, nil
881 //-------------------------------------------------------------------
883 //-------------------------------------------------------------------
885 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
886 params := &xapp.RMRParams{}
887 params.Mtype = trans.GetMtype()
888 params.SubId = int(subs.GetReqId().InstanceId)
890 params.Meid = subs.GetMeid()
892 params.PayloadLen = len(trans.Payload.Buf)
893 params.Payload = trans.Payload.Buf
895 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
896 err = c.SendWithRetry(params, false, 5)
898 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
903 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
905 params := &xapp.RMRParams{}
906 params.Mtype = trans.GetMtype()
907 params.SubId = int(subs.GetReqId().InstanceId)
908 params.Xid = trans.GetXid()
909 params.Meid = trans.GetMeid()
911 params.PayloadLen = len(trans.Payload.Buf)
912 params.Payload = trans.Payload.Buf
914 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
915 err = c.SendWithRetry(params, false, 5)
917 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
922 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
923 if c.RMRClient == nil {
924 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
925 xapp.Logger.Error("%s", err.Error())
930 defer c.RMRClient.Free(msg.Mbuf)
932 // xapp-frame might use direct access to c buffer and
933 // when msg.Mbuf is freed, someone might take it into use
934 // and payload data might be invalid inside message handle function
936 // subscriptions won't load system a lot so there is no
937 // real performance hit by cloning buffer into new go byte slice
938 cPay := append(msg.Payload[:0:0], msg.Payload...)
940 msg.PayloadLen = len(cPay)
943 case xapp.RIC_SUB_REQ:
944 go c.handleXAPPSubscriptionRequest(msg)
945 case xapp.RIC_SUB_RESP:
946 go c.handleE2TSubscriptionResponse(msg)
947 case xapp.RIC_SUB_FAILURE:
948 go c.handleE2TSubscriptionFailure(msg)
949 case xapp.RIC_SUB_DEL_REQ:
950 go c.handleXAPPSubscriptionDeleteRequest(msg)
951 case xapp.RIC_SUB_DEL_RESP:
952 go c.handleE2TSubscriptionDeleteResponse(msg)
953 case xapp.RIC_SUB_DEL_FAILURE:
954 go c.handleE2TSubscriptionDeleteFailure(msg)
955 case xapp.RIC_SUB_DEL_REQUIRED:
956 go c.handleE2TSubscriptionDeleteRequired(msg)
958 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
963 //-------------------------------------------------------------------
964 // handle from XAPP Subscription Request
965 //------------------------------------------------------------------
966 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
967 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
968 c.UpdateCounter(cSubReqFromXapp)
970 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
971 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
975 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
977 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
981 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
983 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
986 defer trans.Release()
988 if err = c.tracker.Track(trans); err != nil {
989 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
993 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
995 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
999 c.wakeSubscriptionRequest(subs, trans)
1002 //-------------------------------------------------------------------
1003 // Wake Subscription Request to E2node
1004 //------------------------------------------------------------------
1005 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1007 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1009 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1011 subs.OngoingReqCount++
1012 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1013 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1014 subs.OngoingReqCount--
1016 switch themsg := event.(type) {
1017 case *e2ap.E2APSubscriptionResponse:
1018 themsg.RequestId.Id = trans.RequestId.Id
1019 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1022 c.UpdateCounter(cSubRespToXapp)
1023 err := c.rmrSendToXapp("", subs, trans)
1025 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1029 case *e2ap.E2APSubscriptionFailure:
1030 themsg.RequestId.Id = trans.RequestId.Id
1031 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1033 c.UpdateCounter(cSubFailToXapp)
1034 c.rmrSendToXapp("", subs, trans)
1040 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1043 //-------------------------------------------------------------------
1044 // handle from XAPP Subscription Delete Request
1045 //------------------------------------------------------------------
1046 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1047 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1048 c.UpdateCounter(cSubDelReqFromXapp)
1050 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1051 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1055 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1057 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1061 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1063 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1066 defer trans.Release()
1068 err = c.tracker.Track(trans)
1070 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1074 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1076 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1083 subs.OngoingDelCount++
1084 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1085 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1086 subs.OngoingDelCount--
1088 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1090 if subs.NoRespToXapp == true {
1091 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1092 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1096 // Whatever is received success, fail or timeout, send successful delete response
1097 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1098 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1099 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1100 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1101 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1103 c.UpdateCounter(cSubDelRespToXapp)
1104 err := c.rmrSendToXapp("", subs, trans)
1106 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1111 //-------------------------------------------------------------------
1112 // SUBS CREATE Handling
1113 //-------------------------------------------------------------------
1114 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1116 var event interface{} = nil
1117 var removeSubscriptionFromDb bool = false
1118 trans := c.tracker.NewSubsTransaction(subs)
1119 subs.WaitTransactionTurn(trans)
1120 defer subs.ReleaseTransactionTurn(trans)
1121 defer trans.Release()
1123 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1125 subRfMsg, valid := subs.GetCachedResponse()
1126 if subRfMsg == nil && valid == true {
1127 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1128 switch event.(type) {
1129 case *e2ap.E2APSubscriptionResponse:
1130 subRfMsg, valid = subs.SetCachedResponse(event, true)
1131 subs.SubRespRcvd = true
1132 case *e2ap.E2APSubscriptionFailure:
1133 if subs.PolicyUpdate == false {
1134 subRfMsg, valid = subs.SetCachedResponse(event, false)
1136 // In policy update case where subscription has already been created successfully in Gnb
1137 // we cannot delete subscription internally in submgr
1138 subRfMsg, valid = subs.SetCachedResponse(event, true)
1140 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1141 case *SubmgrRestartTestEvent:
1142 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1143 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1144 subRfMsg, valid = subs.SetCachedResponse(event, false)
1145 parentTrans.SendEvent(subRfMsg, 0)
1147 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1148 subRfMsg, valid = subs.SetCachedResponse(event, false)
1151 if subs.PolicyUpdate == false {
1152 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1153 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1154 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1156 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1159 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1161 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1163 xapp.Logger.Debug("subs.PolicyUpdate: %v", subs.PolicyUpdate)
1164 xapp.Logger.Debug("subs: %v", subs)
1167 removeSubscriptionFromDb = true
1170 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1173 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1177 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1179 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1182 parentTrans.SendEvent(subRfMsg, 0)
1185 //-------------------------------------------------------------------
1186 // SUBS DELETE Handling
1187 //-------------------------------------------------------------------
1189 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1191 trans := c.tracker.NewSubsTransaction(subs)
1192 subs.WaitTransactionTurn(trans)
1193 defer subs.ReleaseTransactionTurn(trans)
1194 defer trans.Release()
1196 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1200 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1203 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1208 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1209 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1210 parentTrans.SendEvent(nil, 0)
1213 //-------------------------------------------------------------------
1214 // send to E2T Subscription Request
1215 //-------------------------------------------------------------------
1216 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1218 var event interface{} = nil
1219 var timedOut bool = false
1220 const ricRequestorId = 123
1222 subReqMsg := subs.SubReqMsg
1223 subReqMsg.RequestId = subs.GetReqId().RequestId
1224 subReqMsg.RequestId.Id = ricRequestorId
1225 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1227 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1228 return &PackSubscriptionRequestErrortEvent{
1230 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1231 ErrorCause: err.Error(),
1236 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1237 err = c.WriteSubscriptionToDb(subs)
1239 return &SDLWriteErrortEvent{
1241 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1242 ErrorCause: err.Error(),
1247 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1248 desc := fmt.Sprintf("(retry %d)", retries)
1250 c.UpdateCounter(cSubReqToE2)
1252 c.UpdateCounter(cSubReReqToE2)
1254 err := c.rmrSendToE2T(desc, subs, trans)
1256 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1259 if subs.DoNotWaitSubResp == false {
1260 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1262 c.UpdateCounter(cSubReqTimerExpiry)
1266 // Simulating case where subscrition request has been sent but response has not been received before restart
1267 event = &SubmgrRestartTestEvent{}
1268 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1272 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1276 //-------------------------------------------------------------------
1277 // send to E2T Subscription Delete Request
1278 //-------------------------------------------------------------------
1280 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1282 var event interface{}
1284 const ricRequestorId = 123
1286 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1287 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1288 subDelReqMsg.RequestId.Id = ricRequestorId
1289 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1290 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1292 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1296 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1297 desc := fmt.Sprintf("(retry %d)", retries)
1299 c.UpdateCounter(cSubDelReqToE2)
1301 c.UpdateCounter(cSubDelReReqToE2)
1303 err := c.rmrSendToE2T(desc, subs, trans)
1305 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1307 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1309 c.UpdateCounter(cSubDelReqTimerExpiry)
1314 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1318 //-------------------------------------------------------------------
1319 // handle from E2T Subscription Response
1320 //-------------------------------------------------------------------
1321 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1322 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1323 c.UpdateCounter(cSubRespFromE2)
1325 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1327 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1330 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1332 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1335 trans := subs.GetTransaction()
1337 err = fmt.Errorf("Ongoing transaction not found")
1338 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1341 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1342 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1343 if sendOk == false {
1344 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1345 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1350 //-------------------------------------------------------------------
1351 // handle from E2T Subscription Failure
1352 //-------------------------------------------------------------------
1353 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1354 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1355 c.UpdateCounter(cSubFailFromE2)
1356 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1358 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1361 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1363 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1366 trans := subs.GetTransaction()
1368 err = fmt.Errorf("Ongoing transaction not found")
1369 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1372 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1373 if sendOk == false {
1374 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1375 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1380 //-------------------------------------------------------------------
1381 // handle from E2T Subscription Delete Response
1382 //-------------------------------------------------------------------
1383 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1384 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1385 c.UpdateCounter(cSubDelRespFromE2)
1386 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1388 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1391 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1393 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1396 trans := subs.GetTransaction()
1398 err = fmt.Errorf("Ongoing transaction not found")
1399 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1402 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1403 if sendOk == false {
1404 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1405 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1410 //-------------------------------------------------------------------
1411 // handle from E2T Subscription Delete Failure
1412 //-------------------------------------------------------------------
1413 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1414 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1415 c.UpdateCounter(cSubDelFailFromE2)
1416 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1418 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1421 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1423 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1426 trans := subs.GetTransaction()
1428 err = fmt.Errorf("Ongoing transaction not found")
1429 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1432 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1433 if sendOk == false {
1434 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1435 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1440 //-------------------------------------------------------------------
1442 //-------------------------------------------------------------------
1443 func typeofSubsMessage(v interface{}) string {
1448 //case *e2ap.E2APSubscriptionRequest:
1450 case *e2ap.E2APSubscriptionResponse:
1452 case *e2ap.E2APSubscriptionFailure:
1454 //case *e2ap.E2APSubscriptionDeleteRequest:
1455 // return "SubDelReq"
1456 case *e2ap.E2APSubscriptionDeleteResponse:
1458 case *e2ap.E2APSubscriptionDeleteFailure:
1465 //-------------------------------------------------------------------
1467 //-------------------------------------------------------------------
1468 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1469 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1470 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1472 xapp.Logger.Error("%v", err)
1478 //-------------------------------------------------------------------
1480 //-------------------------------------------------------------------
1481 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1483 if removeSubscriptionFromDb == true {
1484 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1485 c.RemoveSubscriptionFromDb(subs)
1487 // Update is needed for successful response and merge case here
1488 if subs.RetryFromXapp == false {
1489 err := c.WriteSubscriptionToDb(subs)
1493 subs.RetryFromXapp = false
1497 //-------------------------------------------------------------------
1499 //-------------------------------------------------------------------
1500 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1501 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1502 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1504 xapp.Logger.Error("%v", err)
1508 //-------------------------------------------------------------------
1510 //-------------------------------------------------------------------
1511 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1512 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1513 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1515 xapp.Logger.Error("%v", err)
1519 //-------------------------------------------------------------------
1521 //-------------------------------------------------------------------
1522 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1524 if removeRestSubscriptionFromDb == true {
1525 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1526 c.RemoveRESTSubscriptionFromDb(restSubId)
1528 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1532 //-------------------------------------------------------------------
1534 //-------------------------------------------------------------------
1535 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1536 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1537 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1539 xapp.Logger.Error("%v", err)
1543 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1545 if c.UTTesting == true {
1546 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1547 c.registry.mutex = new(sync.Mutex)
1550 const ricRequestorId = 123
1551 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1553 // Send delete for every endpoint in the subscription
1554 if subs.PolicyUpdate == false {
1555 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1556 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1557 subDelReqMsg.RequestId.Id = ricRequestorId
1558 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1559 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1561 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1564 for _, endPoint := range subs.EpList.Endpoints {
1565 params := &xapp.RMRParams{}
1566 params.Mtype = mType
1567 params.SubId = int(subs.GetReqId().InstanceId)
1569 params.Meid = subs.Meid
1570 params.Src = endPoint.String()
1571 params.PayloadLen = len(payload.Buf)
1572 params.Payload = payload.Buf
1574 subs.DeleteFromDb = true
1575 if !e2SubsDelRequired {
1576 c.handleXAPPSubscriptionDeleteRequest(params)
1578 c.SendSubscriptionDeleteReqToE2T(subs, params)
1584 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1586 fmt.Println("CRESTSubscriptionRequest")
1592 if p.SubscriptionID != "" {
1593 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1595 fmt.Println(" SubscriptionID = ''")
1598 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1600 if p.ClientEndpoint.HTTPPort != nil {
1601 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1603 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1606 if p.ClientEndpoint.RMRPort != nil {
1607 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1609 fmt.Println(" ClientEndpoint.RMRPort = nil")
1613 fmt.Printf(" Meid = %s\n", *p.Meid)
1615 fmt.Println(" Meid = nil")
1618 if p.E2SubscriptionDirectives == nil {
1619 fmt.Println(" E2SubscriptionDirectives = nil")
1621 fmt.Println(" E2SubscriptionDirectives")
1622 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1623 fmt.Println(" E2RetryCount == nil")
1625 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1627 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1628 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1630 for _, subscriptionDetail := range p.SubscriptionDetails {
1631 if p.RANFunctionID != nil {
1632 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1634 fmt.Println(" RANFunctionID = nil")
1636 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1637 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1639 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1640 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1641 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1642 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1644 if actionToBeSetup.SubsequentAction != nil {
1645 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1646 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1648 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1654 //-------------------------------------------------------------------
1655 // handle from E2T Subscription Delete Required
1656 //-------------------------------------------------------------------
1657 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1658 xapp.Logger.Info("MSG from E2T: %s", params.String())
1659 c.UpdateCounter(cSubDelRequFromE2)
1660 subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1662 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1663 //c.sendE2TErrorIndication(nil)
1666 var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1667 var subDB = []*Subscription{}
1668 for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1669 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
1671 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1674 // Check if Delete Subscription Already triggered
1675 if subs.OngoingDelCount > 0 {
1678 subDB = append(subDB, subs)
1679 for _, endpoint := range subs.EpList.Endpoints {
1680 subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1682 // Sending Subscription Delete Request to E2T
1683 // c.SendSubscriptionDeleteReq(subs, true)
1685 for _, subsTobeRemove := range subDB {
1686 // Sending Subscription Delete Request to E2T
1687 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1691 //-----------------------------------------------------------------
1692 // Initiate RIC Subscription Delete Request after receiving
1693 // RIC Subscription Delete Required from E2T
1694 //-----------------------------------------------------------------
1695 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1696 xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1697 c.UpdateCounter(cSubDelReqToE2)
1699 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1700 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1704 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1706 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1709 defer trans.Release()
1711 err := c.tracker.Track(trans)
1713 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1720 subs.OngoingDelCount++
1721 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1722 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1723 subs.OngoingDelCount--
1725 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1727 if subs.NoRespToXapp == true {
1728 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1729 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")