2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring builds a one-line identification string for log messages.
//
// Every entry is rendered with its String method; a nil entry is rendered as
// the placeholder "(NIL)". When err is non-nil it is appended last in the
// form "err(<message>)". All parts are separated by exactly one space, and an
// empty string is returned when there is nothing to render.
//
// NOTE(review): an entry holding a typed nil pointer is a non-nil interface
// value, so its String method is still called — confirm that the Stringer
// implementations passed here tolerate a nil receiver.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Advance the separator after every part, including the nil
		// placeholder, so consecutive parts never run together.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
// Package-level configuration values. They are (re)loaded by
// Control.ReadConfigParameters from the "controls.*" configuration keys,
// which also substitutes a hard-coded default when a key is unset.
var (
	// Timeouts read from "controls.e2tSubReqTimeout_ms",
	// "controls.e2tSubDelReqTime_ms" and "controls.e2tRecvMsgTimeout_ms".
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	// Wait time for RMR route clean-up, read from "controls.waitRouteCleanup_ms".
	waitRouteCleanup_ms time.Duration

	// Maximum number of send attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// Boolean-like switches kept as the strings "true"/"false" exactly as
	// they are read from the configuration.
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 err := c.ReadE2Subscriptions()
175 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177 err = c.ReadRESTSubscriptions()
179 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
183 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
187 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
188 subscriptions, err := c.registry.QueryHandler()
190 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
193 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
196 //-------------------------------------------------------------------
198 //-------------------------------------------------------------------
199 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
200 xapp.Logger.Debug("RESTQueryHandler() called")
204 return c.registry.QueryHandler()
207 //-------------------------------------------------------------------
209 //-------------------------------------------------------------------
210 func (c *Control) ReadE2Subscriptions() error {
213 var register map[uint32]*Subscription
214 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
215 xapp.Logger.Debug("Reading E2 subscriptions from db")
216 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
218 xapp.Logger.Error("%v", err)
219 <-time.After(1 * time.Second)
221 c.registry.subIds = subIds
222 c.registry.register = register
223 go c.HandleUncompletedSubscriptions(register)
227 xapp.Logger.Debug("Continuing without retring")
231 //-------------------------------------------------------------------
233 //-------------------------------------------------------------------
234 func (c *Control) ReadRESTSubscriptions() error {
236 xapp.Logger.Debug("ReadRESTSubscriptions()")
238 var restSubscriptions map[string]*RESTSubscription
239 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
240 xapp.Logger.Debug("Reading REST subscriptions from db")
241 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
243 xapp.Logger.Error("%v", err)
244 <-time.After(1 * time.Second)
246 // Fix REST subscriptions ongoing status after restart
247 for restSubId, restSubscription := range restSubscriptions {
248 restSubscription.SubReqOngoing = false
249 restSubscription.SubDelReqOngoing = false
250 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
252 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
255 c.registry.restSubscriptions = restSubscriptions
259 xapp.Logger.Debug("Continuing without retring")
263 //-------------------------------------------------------------------
265 //-------------------------------------------------------------------
266 func (c *Control) ReadConfigParameters(f string) {
268 xapp.Logger.Debug("ReadConfigParameters")
270 c.LoggerLevel = int(xapp.Logger.GetLevel())
271 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
272 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
274 // viper.GetDuration returns nanoseconds
275 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
276 if e2tSubReqTimeout == 0 {
277 e2tSubReqTimeout = 2000 * 1000000
278 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
280 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
282 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
283 if e2tSubDelReqTime == 0 {
284 e2tSubDelReqTime = 2000 * 1000000
285 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
287 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
289 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
290 if e2tRecvMsgTimeout == 0 {
291 e2tRecvMsgTimeout = 2000 * 1000000
292 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
294 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
296 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
297 if e2tMaxSubReqTryCount == 0 {
298 e2tMaxSubReqTryCount = 1
299 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
301 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
303 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
304 if e2tMaxSubDelReqTryCount == 0 {
305 e2tMaxSubDelReqTryCount = 1
306 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
308 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
310 checkE2State = viper.GetString("controls.checkE2State")
311 if checkE2State == "" {
312 checkE2State = "true"
313 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
315 xapp.Logger.Debug("checkE2State= %v", checkE2State)
317 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
318 if readSubsFromDb == "" {
319 readSubsFromDb = "true"
320 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
322 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
324 dbTryCount = viper.GetInt("controls.dbTryCount")
327 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
329 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
331 dbRetryForever = viper.GetString("controls.dbRetryForever")
332 if dbRetryForever == "" {
333 dbRetryForever = "true"
334 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
336 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
338 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
339 // value of 100 ms is currently used only in unit tests.
340 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
341 if waitRouteCleanup_ms == 0 {
342 waitRouteCleanup_ms = 5000 * 1000000
343 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
345 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
348 //-------------------------------------------------------------------
350 //-------------------------------------------------------------------
351 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
353 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
354 for subId, subs := range register {
355 if subs.SubRespRcvd == false {
356 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
357 if subs.PolicyUpdate == false {
358 subs.NoRespToXapp = true
359 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
360 c.SendSubscriptionDeleteReq(subs)
366 func (c *Control) ReadyCB(data interface{}) {
367 if c.RMRClient == nil {
368 c.RMRClient = xapp.Rmr
372 func (c *Control) Run() {
373 xapp.SetReadyCB(c.ReadyCB, nil)
374 xapp.AddConfigChangeListener(c.ReadConfigParameters)
378 //-------------------------------------------------------------------
380 //-------------------------------------------------------------------
381 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
384 var restSubscription *RESTSubscription
387 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
388 if p.SubscriptionID == "" {
389 // Subscription does not contain REST subscription Id
391 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
392 if restSubscription != nil {
393 // Subscription not found
394 restSubId = prevRestSubsId
396 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
398 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
401 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
402 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
406 if restSubscription == nil {
407 restSubId = ksuid.New().String()
408 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
411 // Subscription contains REST subscription Id
412 restSubId = p.SubscriptionID
414 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
415 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
417 // Subscription with id in REST request does not exist
418 xapp.Logger.Error("%s", err.Error())
419 c.UpdateCounter(cRestSubFailToXapp)
424 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
426 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
430 return restSubscription, restSubId, nil
433 //-------------------------------------------------------------------
435 //-------------------------------------------------------------------
436 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
439 c.UpdateCounter(cRestSubReqFromXapp)
441 subResp := models.SubscriptionResponse{}
442 p := params.(*models.SubscriptionParams)
444 if c.LoggerLevel > 2 {
445 c.PrintRESTSubscriptionRequest(p)
448 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
449 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
450 c.UpdateCounter(cRestReqRejDueE2Down)
451 return nil, common.SubscribeServiceUnavailableCode
454 if p.ClientEndpoint == nil {
455 err := fmt.Errorf("ClientEndpoint == nil")
456 xapp.Logger.Error("%v", err)
457 c.UpdateCounter(cRestSubFailToXapp)
458 return nil, common.SubscribeBadRequestCode
461 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
463 xapp.Logger.Error("%s", err)
464 c.UpdateCounter(cRestSubFailToXapp)
465 return nil, common.SubscribeBadRequestCode
467 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
469 xapp.Logger.Error("%s", err.Error())
470 c.UpdateCounter(cRestSubFailToXapp)
471 return nil, common.SubscribeBadRequestCode
474 md5sum, err := CalculateRequestMd5sum(params)
476 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
479 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
481 xapp.Logger.Error("Subscription with id in REST request does not exist")
482 return nil, common.SubscribeNotFoundCode
485 subResp.SubscriptionID = &restSubId
486 subReqList := e2ap.SubscriptionRequestList{}
487 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
489 xapp.Logger.Error("%s", err.Error())
490 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
491 c.registry.DeleteRESTSubscription(&restSubId)
492 c.UpdateCounter(cRestSubFailToXapp)
493 return nil, common.SubscribeBadRequestCode
496 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
498 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
499 xapp.Logger.Debug("%s", err)
500 c.registry.DeleteRESTSubscription(&restSubId)
501 c.UpdateCounter(cRestSubRespToXapp)
502 return &subResp, common.SubscribeCreatedCode
505 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
506 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
508 c.UpdateCounter(cRestSubRespToXapp)
509 return &subResp, common.SubscribeCreatedCode
512 //-------------------------------------------------------------------
514 //-------------------------------------------------------------------
515 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
517 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
518 if p == nil || p.E2SubscriptionDirectives == nil {
519 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
520 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
521 e2SubscriptionDirectives.CreateRMRRoute = true
522 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
524 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
525 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
527 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
529 if p.E2SubscriptionDirectives.E2RetryCount == nil {
530 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
531 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
533 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
534 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
536 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
539 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
541 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
542 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
543 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
544 return e2SubscriptionDirectives, nil
547 //-------------------------------------------------------------------
549 //-------------------------------------------------------------------
551 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
552 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
554 c.SubscriptionProcessingStartDelay()
555 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
557 var xAppEventInstanceID int64
558 var e2EventInstanceID int64
559 errorInfo := &ErrorInfo{}
561 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
563 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
564 subReqMsg := subReqList.E2APSubscriptionRequests[index]
565 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
567 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
569 // Send notification to xApp that processing of a Subscription Request has failed.
570 err := fmt.Errorf("Tracking failure")
571 errorInfo.ErrorCause = err.Error()
572 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
576 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
578 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
580 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
584 if err.Error() == "TEST: restart event received" {
585 // This is just for UT cases. Stop here subscription processing
588 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
590 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
591 restSubscription.AddMd5Sum(md5sum)
592 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
593 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
594 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
599 //-------------------------------------------------------------------
601 //------------------------------------------------------------------
602 func (c *Control) SubscriptionProcessingStartDelay() {
603 if c.UTTesting == true {
604 // This is temporary fix for the UT problem that notification arrives before subscription response
605 // Correct fix would be to allow notification come before response and process it correctly
606 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
607 <-time.After(time.Millisecond * 50)
608 xapp.Logger.Debug("Continuing after delay")
612 //-------------------------------------------------------------------
614 //------------------------------------------------------------------
615 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
616 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
618 errorInfo := ErrorInfo{}
620 err := c.tracker.Track(trans)
622 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
623 errorInfo.ErrorCause = err.Error()
624 err = fmt.Errorf("Tracking failure")
625 return nil, &errorInfo, err
628 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
630 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
631 return nil, &errorInfo, err
637 subs.OngoingReqCount++
638 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
639 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
640 subs.OngoingReqCount--
644 switch themsg := event.(type) {
645 case *e2ap.E2APSubscriptionResponse:
647 if c.e2IfState.IsE2ConnectionUp(meid) == true {
648 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
649 return themsg, &errorInfo, nil
651 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
652 c.RemoveSubscriptionFromDb(subs)
653 err = fmt.Errorf("E2 interface down")
654 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
656 case *e2ap.E2APSubscriptionFailure:
657 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
658 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
659 case *PackSubscriptionRequestErrortEvent:
660 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
661 errorInfo = themsg.ErrorInfo
662 case *SDLWriteErrortEvent:
663 err = fmt.Errorf("SDL write failure")
664 errorInfo = themsg.ErrorInfo
665 case *SubmgrRestartTestEvent:
666 err = fmt.Errorf("TEST: restart event received")
667 xapp.Logger.Debug("%s", err)
668 return nil, &errorInfo, err
670 err = fmt.Errorf("Unexpected E2 subscription response received")
671 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
676 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
677 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
678 if subs.PolicyUpdate == true {
679 return nil, &errorInfo, err
683 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
684 err2 := c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
686 xapp.Logger.Error("RemoveFromSubscription failed: %s", err2.Error())
688 return nil, &errorInfo, err
691 //-------------------------------------------------------------------
693 //-------------------------------------------------------------------
694 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
695 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
697 // Send notification to xApp that prosessing of a Subscription Request has failed.
698 e2EventInstanceID := (int64)(0)
699 if errorInfo.ErrorSource == "" {
700 // Submgr is default source of error
701 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
703 resp := &models.SubscriptionResponse{
704 SubscriptionID: restSubId,
705 SubscriptionInstances: []*models.SubscriptionInstance{
706 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
707 ErrorCause: errorInfo.ErrorCause,
708 ErrorSource: errorInfo.ErrorSource,
709 TimeoutType: errorInfo.TimeoutType,
710 XappEventInstanceID: &xAppEventInstanceID},
713 // Mark REST subscription request processed.
714 restSubscription.SetProcessed(err)
715 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
717 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
718 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
720 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
721 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
724 c.UpdateCounter(cRestSubFailNotifToXapp)
725 err = xapp.Subscription.Notify(resp, *clientEndpoint)
727 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
730 // E2 is down. Delete completely processed request safely now
731 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
732 c.registry.DeleteRESTSubscription(restSubId)
733 c.RemoveRESTSubscriptionFromDb(*restSubId)
737 //-------------------------------------------------------------------
739 //-------------------------------------------------------------------
740 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
741 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
743 // Store successfully processed InstanceId for deletion
744 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
745 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
747 // Send notification to xApp that a Subscription Request has been processed.
748 resp := &models.SubscriptionResponse{
749 SubscriptionID: restSubId,
750 SubscriptionInstances: []*models.SubscriptionInstance{
751 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
752 ErrorCause: errorInfo.ErrorCause,
753 ErrorSource: errorInfo.ErrorSource,
754 XappEventInstanceID: &xAppEventInstanceID},
757 // Mark REST subscription request processesd.
758 restSubscription.SetProcessed(nil)
759 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
760 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
761 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
762 c.UpdateCounter(cRestSubNotifToXapp)
763 err := xapp.Subscription.Notify(resp, *clientEndpoint)
765 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
768 // E2 is down. Delete completely processed request safely now
769 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
770 c.registry.DeleteRESTSubscription(restSubId)
771 c.RemoveRESTSubscriptionFromDb(*restSubId)
775 //-------------------------------------------------------------------
777 //-------------------------------------------------------------------
778 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
781 c.UpdateCounter(cRestSubDelReqFromXapp)
783 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
785 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
787 xapp.Logger.Error("%s", err.Error())
788 if restSubscription == nil {
789 // Subscription was not found
790 c.UpdateCounter(cRestSubDelRespToXapp)
791 return common.UnsubscribeNoContentCode
793 if restSubscription.SubReqOngoing == true {
794 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
795 xapp.Logger.Error("%s", err.Error())
796 c.UpdateCounter(cRestSubDelFailToXapp)
797 return common.UnsubscribeBadRequestCode
798 } else if restSubscription.SubDelReqOngoing == true {
799 // Previous request for same restSubId still ongoing
800 c.UpdateCounter(cRestSubDelRespToXapp)
801 return common.UnsubscribeNoContentCode
806 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
808 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
809 for _, instanceId := range restSubscription.InstanceIds {
810 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
813 xapp.Logger.Error("%s", err.Error())
815 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
816 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
817 restSubscription.DeleteE2InstanceId(instanceId)
819 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
820 c.registry.DeleteRESTSubscription(&restSubId)
821 c.RemoveRESTSubscriptionFromDb(restSubId)
824 c.UpdateCounter(cRestSubDelRespToXapp)
825 return common.UnsubscribeNoContentCode
828 //-------------------------------------------------------------------
830 //-------------------------------------------------------------------
831 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
833 var xAppEventInstanceID int64
834 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
836 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
837 restSubId, instanceId, idstring(err, nil))
838 return xAppEventInstanceID, nil
841 xAppEventInstanceID = int64(subs.ReqId.Id)
842 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
844 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
845 xapp.Logger.Error("%s", err.Error())
847 defer trans.Release()
849 err = c.tracker.Track(trans)
851 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
852 xapp.Logger.Error("%s", err.Error())
853 return xAppEventInstanceID, &time.ParseError{}
858 subs.OngoingDelCount++
859 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
860 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
861 subs.OngoingDelCount--
863 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
865 err = c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
867 xapp.Logger.Error("XAPP-SubDelReq %s:", idstring(fmt.Errorf("RemoveFromSubscription faliled"), trans, subs))
870 return xAppEventInstanceID, nil
873 //-------------------------------------------------------------------
875 //-------------------------------------------------------------------
877 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
878 params := &xapp.RMRParams{}
879 params.Mtype = trans.GetMtype()
880 params.SubId = int(subs.GetReqId().InstanceId)
882 params.Meid = subs.GetMeid()
884 params.PayloadLen = len(trans.Payload.Buf)
885 params.Payload = trans.Payload.Buf
887 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
888 err = c.SendWithRetry(params, false, 5)
890 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
895 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
897 params := &xapp.RMRParams{}
898 params.Mtype = trans.GetMtype()
899 params.SubId = int(subs.GetReqId().InstanceId)
900 params.Xid = trans.GetXid()
901 params.Meid = trans.GetMeid()
903 params.PayloadLen = len(trans.Payload.Buf)
904 params.Payload = trans.Payload.Buf
906 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
907 err = c.SendWithRetry(params, false, 5)
909 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
914 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
915 if c.RMRClient == nil {
916 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
917 xapp.Logger.Error("%s", err.Error())
922 defer c.RMRClient.Free(msg.Mbuf)
924 // xapp-frame might use direct access to c buffer and
925 // when msg.Mbuf is freed, someone might take it into use
926 // and payload data might be invalid inside message handle function
928 // subscriptions won't load system a lot so there is no
929 // real performance hit by cloning buffer into new go byte slice
930 cPay := append(msg.Payload[:0:0], msg.Payload...)
932 msg.PayloadLen = len(cPay)
935 case xapp.RIC_SUB_REQ:
936 go c.handleXAPPSubscriptionRequest(msg)
937 case xapp.RIC_SUB_RESP:
938 go c.handleE2TSubscriptionResponse(msg)
939 case xapp.RIC_SUB_FAILURE:
940 go c.handleE2TSubscriptionFailure(msg)
941 case xapp.RIC_SUB_DEL_REQ:
942 go c.handleXAPPSubscriptionDeleteRequest(msg)
943 case xapp.RIC_SUB_DEL_RESP:
944 go c.handleE2TSubscriptionDeleteResponse(msg)
945 case xapp.RIC_SUB_DEL_FAILURE:
946 go c.handleE2TSubscriptionDeleteFailure(msg)
948 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
953 //-------------------------------------------------------------------
954 // handle from XAPP Subscription Request
955 //------------------------------------------------------------------
956 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
957 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
958 c.UpdateCounter(cSubReqFromXapp)
960 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
961 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
965 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
967 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
971 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
973 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
976 defer trans.Release()
978 if err = c.tracker.Track(trans); err != nil {
979 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
983 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
985 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
989 c.wakeSubscriptionRequest(subs, trans)
992 //-------------------------------------------------------------------
993 // Wake Subscription Request to E2node
994 //------------------------------------------------------------------
995 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
997 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
998 subs.OngoingReqCount++
999 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1000 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1001 subs.OngoingReqCount--
1004 switch themsg := event.(type) {
1005 case *e2ap.E2APSubscriptionResponse:
1006 themsg.RequestId.Id = trans.RequestId.Id
1007 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1010 c.UpdateCounter(cSubRespToXapp)
1011 err := c.rmrSendToXapp("", subs, trans)
1013 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1017 case *e2ap.E2APSubscriptionFailure:
1018 themsg.RequestId.Id = trans.RequestId.Id
1019 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1021 c.UpdateCounter(cSubFailToXapp)
1022 c.rmrSendToXapp("", subs, trans)
1028 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1031 //-------------------------------------------------------------------
1032 // handle from XAPP Subscription Delete Request
1033 //------------------------------------------------------------------
1034 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1035 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1036 c.UpdateCounter(cSubDelReqFromXapp)
1038 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1039 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1043 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1045 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1049 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1051 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1054 defer trans.Release()
1056 err = c.tracker.Track(trans)
1058 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1062 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1064 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1071 subs.OngoingDelCount++
1072 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1073 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1074 subs.OngoingDelCount--
1076 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1078 if subs.NoRespToXapp == true {
1079 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1080 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1084 // Whatever is received success, fail or timeout, send successful delete response
1085 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1086 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1087 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1088 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1089 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1091 c.UpdateCounter(cSubDelRespToXapp)
1092 err := c.rmrSendToXapp("", subs, trans)
1094 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1099 //-------------------------------------------------------------------
1100 // SUBS CREATE Handling
1101 //-------------------------------------------------------------------
1102 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1104 var event interface{} = nil
1105 var removeSubscriptionFromDb bool = false
1106 trans := c.tracker.NewSubsTransaction(subs)
1107 subs.WaitTransactionTurn(trans)
1108 defer subs.ReleaseTransactionTurn(trans)
1109 defer trans.Release()
1111 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1113 subRfMsg, valid := subs.GetCachedResponse()
1114 if subRfMsg == nil && valid == true {
1115 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1116 switch event.(type) {
1117 case *e2ap.E2APSubscriptionResponse:
1118 subRfMsg, valid = subs.SetCachedResponse(event, true)
1119 subs.SubRespRcvd = true
1120 case *e2ap.E2APSubscriptionFailure:
1121 subRfMsg, valid = subs.SetCachedResponse(event, false)
1122 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1123 case *SubmgrRestartTestEvent:
1124 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1125 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1126 subRfMsg, valid = subs.SetCachedResponse(event, false)
1127 parentTrans.SendEvent(subRfMsg, 0)
1129 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1130 subRfMsg, valid = subs.SetCachedResponse(event, false)
1133 if subs.PolicyUpdate == false {
1134 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1135 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1136 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1138 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1141 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1143 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1146 removeSubscriptionFromDb = true
1149 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1152 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1156 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1158 err = c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1160 xapp.Logger.Error("RemoveFromSubscription() failed:%s", err.Error())
1164 parentTrans.SendEvent(subRfMsg, 0)
1167 //-------------------------------------------------------------------
1168 // SUBS DELETE Handling
1169 //-------------------------------------------------------------------
1171 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1173 trans := c.tracker.NewSubsTransaction(subs)
1174 subs.WaitTransactionTurn(trans)
1175 defer subs.ReleaseTransactionTurn(trans)
1176 defer trans.Release()
1178 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1182 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1185 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1190 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1191 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1192 parentTrans.SendEvent(nil, 0)
1195 //-------------------------------------------------------------------
1196 // send to E2T Subscription Request
1197 //-------------------------------------------------------------------
1198 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1200 var event interface{} = nil
1201 var timedOut bool = false
1202 const ricRequestorId = 123
1204 subReqMsg := subs.SubReqMsg
1205 subReqMsg.RequestId = subs.GetReqId().RequestId
1206 subReqMsg.RequestId.Id = ricRequestorId
1207 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1209 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1210 return &PackSubscriptionRequestErrortEvent{
1212 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1213 ErrorCause: err.Error(),
1218 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1219 err = c.WriteSubscriptionToDb(subs)
1221 return &SDLWriteErrortEvent{
1223 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1224 ErrorCause: err.Error(),
1229 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1230 desc := fmt.Sprintf("(retry %d)", retries)
1232 c.UpdateCounter(cSubReqToE2)
1234 c.UpdateCounter(cSubReReqToE2)
1236 err := c.rmrSendToE2T(desc, subs, trans)
1238 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1241 if subs.DoNotWaitSubResp == false {
1242 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1244 c.UpdateCounter(cSubReqTimerExpiry)
1248 // Simulating case where subscrition request has been sent but response has not been received before restart
1249 event = &SubmgrRestartTestEvent{}
1250 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1254 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1258 //-------------------------------------------------------------------
1259 // send to E2T Subscription Delete Request
1260 //-------------------------------------------------------------------
1262 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1264 var event interface{}
1266 const ricRequestorId = 123
1268 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1269 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1270 subDelReqMsg.RequestId.Id = ricRequestorId
1271 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1272 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1274 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1278 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1279 desc := fmt.Sprintf("(retry %d)", retries)
1281 c.UpdateCounter(cSubDelReqToE2)
1283 c.UpdateCounter(cSubDelReReqToE2)
1285 err := c.rmrSendToE2T(desc, subs, trans)
1287 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1289 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1291 c.UpdateCounter(cSubDelReqTimerExpiry)
1296 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1300 //-------------------------------------------------------------------
1301 // handle from E2T Subscription Response
1302 //-------------------------------------------------------------------
1303 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1304 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1305 c.UpdateCounter(cSubRespFromE2)
1307 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1309 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1312 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1314 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1317 trans := subs.GetTransaction()
1319 err = fmt.Errorf("Ongoing transaction not found")
1320 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1323 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1324 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1325 if sendOk == false {
1326 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1327 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1332 //-------------------------------------------------------------------
1333 // handle from E2T Subscription Failure
1334 //-------------------------------------------------------------------
1335 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1336 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1337 c.UpdateCounter(cSubFailFromE2)
1338 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1340 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1343 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1345 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1348 trans := subs.GetTransaction()
1350 err = fmt.Errorf("Ongoing transaction not found")
1351 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1354 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1355 if sendOk == false {
1356 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1357 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1362 //-------------------------------------------------------------------
1363 // handle from E2T Subscription Delete Response
1364 //-------------------------------------------------------------------
1365 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1366 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1367 c.UpdateCounter(cSubDelRespFromE2)
1368 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1370 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1373 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1375 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1378 trans := subs.GetTransaction()
1380 err = fmt.Errorf("Ongoing transaction not found")
1381 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1384 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1385 if sendOk == false {
1386 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1387 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1392 //-------------------------------------------------------------------
1393 // handle from E2T Subscription Delete Failure
1394 //-------------------------------------------------------------------
1395 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1396 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1397 c.UpdateCounter(cSubDelFailFromE2)
1398 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1400 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1403 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1405 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1408 trans := subs.GetTransaction()
1410 err = fmt.Errorf("Ongoing transaction not found")
1411 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1414 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1415 if sendOk == false {
1416 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1417 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1422 //-------------------------------------------------------------------
1424 //-------------------------------------------------------------------
1425 func typeofSubsMessage(v interface{}) string {
1430 //case *e2ap.E2APSubscriptionRequest:
1432 case *e2ap.E2APSubscriptionResponse:
1434 case *e2ap.E2APSubscriptionFailure:
1436 //case *e2ap.E2APSubscriptionDeleteRequest:
1437 // return "SubDelReq"
1438 case *e2ap.E2APSubscriptionDeleteResponse:
1440 case *e2ap.E2APSubscriptionDeleteFailure:
1447 //-------------------------------------------------------------------
1449 //-------------------------------------------------------------------
1450 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1451 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1452 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1454 xapp.Logger.Error("%v", err)
1460 //-------------------------------------------------------------------
1462 //-------------------------------------------------------------------
1463 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1465 if removeSubscriptionFromDb == true {
1466 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1467 c.RemoveSubscriptionFromDb(subs)
1469 // Update is needed for successful response and merge case here
1470 if subs.RetryFromXapp == false {
1471 err := c.WriteSubscriptionToDb(subs)
1475 subs.RetryFromXapp = false
1479 //-------------------------------------------------------------------
1481 //-------------------------------------------------------------------
1482 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1483 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1484 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1486 xapp.Logger.Error("%v", err)
1490 //-------------------------------------------------------------------
1492 //-------------------------------------------------------------------
1493 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1494 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1495 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1497 xapp.Logger.Error("%v", err)
1501 //-------------------------------------------------------------------
1503 //-------------------------------------------------------------------
1504 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1506 if removeRestSubscriptionFromDb == true {
1507 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1508 c.RemoveRESTSubscriptionFromDb(restSubId)
1510 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1514 //-------------------------------------------------------------------
1516 //-------------------------------------------------------------------
1517 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1518 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1519 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1521 xapp.Logger.Error("%v", err)
1525 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1527 if c.UTTesting == true {
1528 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1529 c.registry.mutex = new(sync.Mutex)
1532 const ricRequestorId = 123
1533 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1535 // Send delete for every endpoint in the subscription
1536 if subs.PolicyUpdate == false {
1537 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1538 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1539 subDelReqMsg.RequestId.Id = ricRequestorId
1540 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1541 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1543 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1546 for _, endPoint := range subs.EpList.Endpoints {
1547 params := &xapp.RMRParams{}
1548 params.Mtype = mType
1549 params.SubId = int(subs.GetReqId().InstanceId)
1551 params.Meid = subs.Meid
1552 params.Src = endPoint.String()
1553 params.PayloadLen = len(payload.Buf)
1554 params.Payload = payload.Buf
1556 subs.DeleteFromDb = true
1557 c.handleXAPPSubscriptionDeleteRequest(params)
1562 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1564 fmt.Println("CRESTSubscriptionRequest")
1570 if p.SubscriptionID != "" {
1571 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1573 fmt.Println(" SubscriptionID = ''")
1576 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1578 if p.ClientEndpoint.HTTPPort != nil {
1579 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1581 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1584 if p.ClientEndpoint.RMRPort != nil {
1585 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1587 fmt.Println(" ClientEndpoint.RMRPort = nil")
1591 fmt.Printf(" Meid = %s\n", *p.Meid)
1593 fmt.Println(" Meid = nil")
1596 if p.E2SubscriptionDirectives == nil {
1597 fmt.Println(" E2SubscriptionDirectives = nil")
1599 fmt.Println(" E2SubscriptionDirectives")
1600 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1601 fmt.Println(" E2RetryCount == nil")
1603 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1605 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1606 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1608 for _, subscriptionDetail := range p.SubscriptionDetails {
1609 if p.RANFunctionID != nil {
1610 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1612 fmt.Println(" RANFunctionID = nil")
1614 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1615 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1617 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1618 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1619 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1620 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1622 if actionToBeSetup.SubsequentAction != nil {
1623 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1624 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1626 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")