2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring renders the given entries, followed by err when it is non-nil, as
// a single space-separated string intended for log messages.
//
// A nil entry is rendered as "(NIL)" and a non-nil err as "err(<message>)".
// Every element is separated from the previous one by exactly one space,
// including an element that follows a "(NIL)" placeholder.
//
// NOTE: an interface value holding a typed nil pointer does not compare equal
// to nil, so for such an entry String() is invoked on the nil receiver.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// The separator is needed after every rendered element, the
		// "(NIL)" placeholder included.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
// NewControl creates the Control instance of the subscription manager.
//
// From the statements visible here it: builds the routing manager HTTP client
// from the rtmgr.HostAddr, rtmgr.port and rtmgr.baseUrl settings, creates the
// registry, tracker, REST duplicate controller and E2 interface state
// objects, reads the configuration parameters, registers the REST routes,
// reads E2 and REST subscriptions from the database when readSubsFromDb is
// "true", and calls xapp.Subscription.Listen with the REST subscribe, query
// and delete handlers.
//
// NOTE(review): this listing omits short lines (closing braces, error checks,
// some struct literal fields and the return statement); the description above
// covers the visible statements only.
124 func NewControl() *Control {
// Routing manager client, handed to the registry below.
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Configuration is read before readSubsFromDb is evaluated further down.
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 err := c.ReadE2Subscriptions()
175 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177 err = c.ReadRESTSubscriptions()
179 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
// Hand the REST subscribe, query and delete handlers to the xapp framework.
184 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193 subscriptions, err := c.registry.QueryHandler()
195 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
198 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
// RESTQueryHandler is the REST query handler passed to
// xapp.Subscription.Listen. It logs the call and returns the subscription
// list and error produced by the registry's QueryHandler.
//
// NOTE(review): lines between the log call and the return are elided in this
// listing.
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205 xapp.Logger.Debug("RESTQueryHandler() called")
209 return c.registry.QueryHandler()
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
218 var register map[uint32]*Subscription
219 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220 xapp.Logger.Debug("Reading E2 subscriptions from db")
221 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223 xapp.Logger.Error("%v", err)
224 <-time.After(1 * time.Second)
226 c.registry.subIds = subIds
227 c.registry.register = register
228 go c.HandleUncompletedSubscriptions(register)
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
241 xapp.Logger.Debug("ReadRESTSubscriptions()")
243 var restSubscriptions map[string]*RESTSubscription
244 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245 xapp.Logger.Debug("Reading REST subscriptions from db")
246 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248 xapp.Logger.Error("%v", err)
249 <-time.After(1 * time.Second)
251 // Fix REST subscriptions ongoing status after restart
252 for restSubId, restSubscription := range restSubscriptions {
253 restSubscription.SubReqOngoing = false
254 restSubscription.SubDelReqOngoing = false
255 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
260 c.registry.restSubscriptions = restSubscriptions
264 xapp.Logger.Debug("Continuing without retring")
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings through viper into the
// package-level configuration variables and logs each resulting value. For
// the settings whose zero/empty check is visible here, a hard coded default
// is used instead and a "WARNING" debug message is logged.
//
// It also copies the current logger level into c.LoggerLevel and passes it to
// the E2AP ASN.1 debug print setting.
//
// The function is called from NewControl and registered as a configuration
// change listener in Run. The parameter f is not referenced in the lines
// visible here.
//
// NOTE(review): the *_ms settings are read with viper.GetDuration and then
// multiplied by 1000000; this presumably expects plain integer millisecond
// values in the configuration - confirm against the deployed config file.
// NOTE(review): the check and default value for dbTryCount are on lines that
// are elided in this listing.
271 func (c *Control) ReadConfigParameters(f string) {
273 xapp.Logger.Debug("ReadConfigParameters")
275 c.LoggerLevel = int(xapp.Logger.GetLevel())
276 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279 // viper.GetDuration returns nanoseconds
280 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281 if e2tSubReqTimeout == 0 {
// Default: 2000 * 1000000 ns = 2 s.
282 e2tSubReqTimeout = 2000 * 1000000
283 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288 if e2tSubDelReqTime == 0 {
// Default: 2000 * 1000000 ns = 2 s.
289 e2tSubDelReqTime = 2000 * 1000000
290 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295 if e2tRecvMsgTimeout == 0 {
// Default: 2000 * 1000000 ns = 2 s.
296 e2tRecvMsgTimeout = 2000 * 1000000
297 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302 if e2tMaxSubReqTryCount == 0 {
303 e2tMaxSubReqTryCount = 1
304 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309 if e2tMaxSubDelReqTryCount == 0 {
310 e2tMaxSubDelReqTryCount = 1
311 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// The switches below are kept as strings; an empty value selects "true".
315 checkE2State = viper.GetString("controls.checkE2State")
316 if checkE2State == "" {
317 checkE2State = "true"
318 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320 xapp.Logger.Debug("checkE2State= %v", checkE2State)
322 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323 if readSubsFromDb == "" {
324 readSubsFromDb = "true"
325 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329 dbTryCount = viper.GetInt("controls.dbTryCount")
332 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336 dbRetryForever = viper.GetString("controls.dbRetryForever")
337 if dbRetryForever == "" {
338 dbRetryForever = "true"
339 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344 // value 100ms used currently only in unittests.
345 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346 if waitRouteCleanup_ms == 0 {
// Default: 5000 * 1000000 ns = 5 s.
347 waitRouteCleanup_ms = 5000 * 1000000
348 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
353 //-------------------------------------------------------------------
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
358 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359 for subId, subs := range register {
360 if subs.SubRespRcvd == false {
361 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362 if subs.PolicyUpdate == false {
363 subs.NoRespToXapp = true
364 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365 c.SendSubscriptionDeleteReq(subs, false)
371 func (c *Control) ReadyCB(data interface{}) {
372 if c.RMRClient == nil {
373 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as the configuration change listener.
//
// NOTE(review): the remaining lines of this function are elided in this
// listing.
377 func (c *Control) Run() {
378 xapp.SetReadyCB(c.ReadyCB, nil)
379 xapp.AddConfigChangeListener(c.ReadConfigParameters)
383 //-------------------------------------------------------------------
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
389 var restSubscription *RESTSubscription
392 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if p.SubscriptionID == "" {
394 // Subscription does not contain REST subscription Id
396 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397 if restSubscription != nil {
398 // Subscription not found
399 restSubId = prevRestSubsId
401 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
403 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
406 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
411 if restSubscription == nil {
412 restSubId = ksuid.New().String()
413 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
416 // Subscription contains REST subscription Id
417 restSubId = p.SubscriptionID
419 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
422 // Subscription with id in REST request does not exist
423 xapp.Logger.Error("%s", err.Error())
424 c.UpdateCounter(cRestSubFailToXapp)
429 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
431 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
435 return restSubscription, restSubId, nil
438 //-------------------------------------------------------------------
440 //-------------------------------------------------------------------
// RESTSubscriptionHandler is the REST subscribe handler passed to
// xapp.Subscription.Listen. It validates a subscription request from an xApp
// and starts its asynchronous processing.
//
// params is type-asserted to *models.SubscriptionParams. From the visible
// statements the outcomes are:
//   - SubscribeServiceUnavailableCode when the E2 connection of p.Meid is not
//     up or is under reset;
//   - SubscribeBadRequestCode when p.ClientEndpoint is nil, or when the E2
//     subscription directives, the endpoint addresses or the E2 subscription
//     request messages cannot be built;
//   - SubscribeNotFoundCode when GetOrCreateRestSubscription fails;
//   - SubscribeCreatedCode with the REST subscription id otherwise, after the
//     REST subscription has been written to the database and
//     processSubscriptionRequests has been started in its own goroutine.
//
// NOTE(review): the type assertion on params is unchecked, so a value of any
// other type panics.
// NOTE(review): short lines (error checks, closing braces and possibly other
// statements) are elided in this listing.
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
444 c.UpdateCounter(cRestSubReqFromXapp)
446 subResp := models.SubscriptionResponse{}
447 p := params.(*models.SubscriptionParams)
449 if c.LoggerLevel > 2 {
450 c.PrintRESTSubscriptionRequest(p)
// Reject the request while the E2 node is unreachable or being reset.
453 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false || c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
454 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
455 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
456 } else if c.e2IfState.IsE2ConnectionUnderReset(p.Meid) == true {
457 xapp.Logger.Error("E2 Node for ranName %v UNDER RESET", *p.Meid)
459 c.UpdateCounter(cRestReqRejDueE2Down)
460 return nil, common.SubscribeServiceUnavailableCode
463 if p.ClientEndpoint == nil {
464 err := fmt.Errorf("ClientEndpoint == nil")
465 xapp.Logger.Error("%v", err)
466 c.UpdateCounter(cRestSubFailToXapp)
467 return nil, common.SubscribeBadRequestCode
470 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
472 xapp.Logger.Error("%s", err)
473 c.UpdateCounter(cRestSubFailToXapp)
474 return nil, common.SubscribeBadRequestCode
476 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
478 xapp.Logger.Error("%s", err.Error())
479 c.UpdateCounter(cRestSubFailToXapp)
480 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is used below to find an earlier identical
// request and to detect retransmissions.
483 md5sum, err := CalculateRequestMd5sum(params)
485 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
488 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
490 xapp.Logger.Error("Subscription with id in REST request does not exist")
491 return nil, common.SubscribeNotFoundCode
494 subResp.SubscriptionID = &restSubId
495 subReqList := e2ap.SubscriptionRequestList{}
496 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
498 xapp.Logger.Error("%s", err.Error())
499 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
500 c.registry.DeleteRESTSubscription(&restSubId)
501 c.UpdateCounter(cRestSubFailToXapp)
502 return nil, common.SubscribeBadRequestCode
// NOTE(review): the statements below that acknowledge directly with
// SubscribeCreatedCode presumably run only when duplicate is true; the
// condition line is elided here - confirm.
505 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
507 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
508 xapp.Logger.Debug("%s", err)
509 c.registry.DeleteRESTSubscription(&restSubId)
510 c.UpdateCounter(cRestSubRespToXapp)
511 return &subResp, common.SubscribeCreatedCode
// Persist the REST subscription, then process the E2 requests in the
// background; results reach the xApp through notifications.
514 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
515 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
517 c.UpdateCounter(cRestSubRespToXapp)
518 return &subResp, common.SubscribeCreatedCode
521 //-------------------------------------------------------------------
523 //-------------------------------------------------------------------
524 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
526 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
527 if p == nil || p.E2SubscriptionDirectives == nil {
528 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
529 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
530 e2SubscriptionDirectives.CreateRMRRoute = true
531 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
533 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
534 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
536 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
538 if p.E2SubscriptionDirectives.E2RetryCount == nil {
539 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
540 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
542 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
543 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
545 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
548 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
550 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
551 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
552 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
553 return e2SubscriptionDirectives, nil
556 //-------------------------------------------------------------------
558 //-------------------------------------------------------------------
// processSubscriptionRequests processes the E2 subscription requests in
// subReqList for one REST subscription and notifies the xApp of the results
// through sendSuccesfullResponseNotification or
// sendUnsuccesfullResponseNotification.
//
// It is started in a goroutine by RESTSubscriptionHandler. It first calls
// SubscriptionProcessingStartDelay and defers SetMd5sumFromLastOkRequest for
// restSubId and md5sum, so that call runs when this function returns. For
// each request a new xApp transaction is created and passed to
// handleSubscriptionRequest.
//
// NOTE(review): inside the loop, the := on the handleSubscriptionRequest call
// declares a new errorInfo that shadows the one declared before the loop.
// NOTE(review): short lines (conditions, closing braces, loop control and
// possibly other statements) are elided in this listing.
560 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
561 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
563 c.SubscriptionProcessingStartDelay()
564 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
566 var xAppEventInstanceID int64
567 var e2EventInstanceID int64
568 errorInfo := &ErrorInfo{}
570 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
572 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
573 subReqMsg := subReqList.E2APSubscriptionRequests[index]
574 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
576 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
578 // Send notification to xApp that prosessing of a Subscription Request has failed.
579 err := fmt.Errorf("Tracking failure")
580 errorInfo.ErrorCause = err.Error()
581 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
585 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
587 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
589 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// The error text compared here is the one handleSubscriptionRequest creates
// for a SubmgrRestartTestEvent.
593 if err.Error() == "TEST: restart event received" {
594 // This is just for UT cases. Stop here subscription processing
597 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
599 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
600 restSubscription.AddMd5Sum(md5sum)
601 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
602 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
603 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
608 //-------------------------------------------------------------------
610 //------------------------------------------------------------------
611 func (c *Control) SubscriptionProcessingStartDelay() {
612 if c.UTTesting == true {
613 // This is temporary fix for the UT problem that notification arrives before subscription response
614 // Correct fix would be to allow notification come before response and process it correctly
615 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
616 <-time.After(time.Millisecond * 50)
617 xapp.Logger.Debug("Continuing after delay")
621 //-------------------------------------------------------------------
623 //------------------------------------------------------------------
// handleSubscriptionRequest drives one E2 subscription request for the given
// xApp transaction and waits for its outcome.
//
// The transaction is tracked and assigned to a subscription. Then
// handleSubscriptionCreate is started in a goroutine and this function blocks
// in trans.WaitEvent(0) until an event arrives. OngoingReqCount of the
// subscription is incremented for the duration of the wait.
//
// On a subscription response received while the E2 connection is up, the
// response is returned together with the ErrorInfo produced by
// CheckActionNotAdmittedList. Every other outcome visible here (E2 interface
// down, subscription failure, pack failure, SDL write failure, restart test
// event, unexpected event, response timeout) returns a nil response, an
// ErrorInfo describing the problem and an error.
//
// NOTE(review): short lines (conditions, closing braces, the default case
// label and possibly other statements) are elided in this listing.
624 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
625 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
627 errorInfo := ErrorInfo{}
629 err := c.tracker.Track(trans)
631 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
// The original tracking error is kept as the cause; the returned error is
// the fixed "Tracking failure" text.
632 errorInfo.ErrorCause = err.Error()
633 err = fmt.Errorf("Tracking failure")
634 return nil, &errorInfo, err
637 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
639 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
640 return nil, &errorInfo, err
646 subs.OngoingReqCount++
647 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
648 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
649 subs.OngoingReqCount--
653 switch themsg := event.(type) {
654 case *e2ap.E2APSubscriptionResponse:
656 if c.e2IfState.IsE2ConnectionUp(meid) == true {
657 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
658 return themsg, &errorInfo, nil
// Response arrived while the E2 connection is not up: the transaction is
// removed from the subscription and the subscription from the database.
660 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
661 c.RemoveSubscriptionFromDb(subs)
662 err = fmt.Errorf("E2 interface down")
663 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
665 case *e2ap.E2APSubscriptionFailure:
666 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
667 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
668 case *PackSubscriptionRequestErrortEvent:
669 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
670 errorInfo = themsg.ErrorInfo
671 case *SDLWriteErrortEvent:
672 err = fmt.Errorf("SDL write failure")
673 errorInfo = themsg.ErrorInfo
674 case *SubmgrRestartTestEvent:
675 err = fmt.Errorf("TEST: restart event received")
676 xapp.Logger.Debug("%s", err)
677 return nil, &errorInfo, err
679 err = fmt.Errorf("Unexpected E2 subscription response received")
680 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// NOTE(review): presumably reached when no event was received; the branch
// line is elided here - confirm.
685 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
686 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
687 if subs.PolicyUpdate == true {
688 return nil, &errorInfo, err
// Common failure exit: log and detach the transaction from the subscription.
692 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
693 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
695 return nil, &errorInfo, err
698 //-------------------------------------------------------------------
700 //-------------------------------------------------------------------
701 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
702 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
704 // Send notification to xApp that prosessing of a Subscription Request has failed.
705 e2EventInstanceID := (int64)(0)
706 if errorInfo.ErrorSource == "" {
707 // Submgr is default source of error
708 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
710 resp := &models.SubscriptionResponse{
711 SubscriptionID: restSubId,
712 SubscriptionInstances: []*models.SubscriptionInstance{
713 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
714 ErrorCause: errorInfo.ErrorCause,
715 ErrorSource: errorInfo.ErrorSource,
716 TimeoutType: errorInfo.TimeoutType,
717 XappEventInstanceID: &xAppEventInstanceID},
720 // Mark REST subscription request processed.
721 restSubscription.SetProcessed(err)
722 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
724 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
725 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
727 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
728 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
731 c.UpdateCounter(cRestSubFailNotifToXapp)
732 err = xapp.Subscription.Notify(resp, *clientEndpoint)
734 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
737 // E2 is down. Delete completely processed request safely now
738 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
739 c.registry.DeleteRESTSubscription(restSubId)
740 c.RemoveRESTSubscriptionFromDb(*restSubId)
744 //-------------------------------------------------------------------
746 //-------------------------------------------------------------------
747 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
748 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
750 // Store successfully processed InstanceId for deletion
751 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
752 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
754 // Send notification to xApp that a Subscription Request has been processed.
755 resp := &models.SubscriptionResponse{
756 SubscriptionID: restSubId,
757 SubscriptionInstances: []*models.SubscriptionInstance{
758 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
759 ErrorCause: errorInfo.ErrorCause,
760 ErrorSource: errorInfo.ErrorSource,
761 XappEventInstanceID: &xAppEventInstanceID},
764 // Mark REST subscription request processesd.
765 restSubscription.SetProcessed(nil)
766 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
767 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
768 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
769 c.UpdateCounter(cRestSubNotifToXapp)
770 err := xapp.Subscription.Notify(resp, *clientEndpoint)
772 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
775 // E2 is down. Delete completely processed request safely now
776 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
777 c.registry.DeleteRESTSubscription(restSubId)
778 c.RemoveRESTSubscriptionFromDb(*restSubId)
782 //-------------------------------------------------------------------
784 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler is the REST delete handler passed to
// xapp.Subscription.Listen. It handles the deletion of the REST subscription
// identified by restSubId.
//
// From the visible statements the outcomes are:
//   - UnsubscribeNoContentCode when the REST subscription is not found, or
//     when a delete request for it is already ongoing;
//   - UnsubscribeBadRequestCode when the subscription request for it is still
//     being processed;
//   - UnsubscribeNoContentCode otherwise. In that case every E2 instance of
//     the REST subscription is passed to SubscriptionDeleteHandler and its id
//     mappings are removed, then the cached md5sum entry, the registry entry
//     and the database entry of the REST subscription are deleted.
//
// NOTE(review): short lines (conditions, closing braces and possibly other
// statements, e.g. around the deletion loop) are elided in this listing, so
// it cannot be told from here whether the deletion runs before the final
// return or concurrently with it.
785 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
788 c.UpdateCounter(cRestSubDelReqFromXapp)
790 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
792 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
794 xapp.Logger.Error("%s", err.Error())
795 if restSubscription == nil {
796 // Subscription was not found
797 c.UpdateCounter(cRestSubDelRespToXapp)
798 return common.UnsubscribeNoContentCode
800 if restSubscription.SubReqOngoing == true {
801 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
802 xapp.Logger.Error("%s", err.Error())
803 c.UpdateCounter(cRestSubDelFailToXapp)
804 return common.UnsubscribeBadRequestCode
805 } else if restSubscription.SubDelReqOngoing == true {
806 // Previous request for same restSubId still ongoing
807 c.UpdateCounter(cRestSubDelRespToXapp)
808 return common.UnsubscribeNoContentCode
813 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
815 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
816 for _, instanceId := range restSubscription.InstanceIds {
817 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
820 xapp.Logger.Error("%s", err.Error())
822 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
823 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
824 restSubscription.DeleteE2InstanceId(instanceId)
// After the instances: drop the duplicate-detection entry and the REST
// subscription itself from the registry and the database.
826 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
827 c.registry.DeleteRESTSubscription(&restSubId)
828 c.RemoveRESTSubscriptionFromDb(restSubId)
831 c.UpdateCounter(cRestSubDelRespToXapp)
832 return common.UnsubscribeNoContentCode
835 //-------------------------------------------------------------------
837 //-------------------------------------------------------------------
838 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
840 var xAppEventInstanceID int64
841 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
843 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
844 restSubId, instanceId, idstring(err, nil))
845 return xAppEventInstanceID, nil
848 xAppEventInstanceID = int64(subs.ReqId.Id)
849 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
851 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
852 xapp.Logger.Error("%s", err.Error())
854 defer trans.Release()
856 err = c.tracker.Track(trans)
858 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
859 xapp.Logger.Error("%s", err.Error())
860 return xAppEventInstanceID, &time.ParseError{}
865 subs.OngoingDelCount++
866 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
867 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
868 subs.OngoingDelCount--
870 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
872 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
874 return xAppEventInstanceID, nil
877 //-------------------------------------------------------------------
879 //-------------------------------------------------------------------
881 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
882 params := &xapp.RMRParams{}
883 params.Mtype = trans.GetMtype()
884 params.SubId = int(subs.GetReqId().InstanceId)
886 params.Meid = subs.GetMeid()
888 params.PayloadLen = len(trans.Payload.Buf)
889 params.Payload = trans.Payload.Buf
891 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
892 err = c.SendWithRetry(params, false, 5)
894 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
899 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
901 params := &xapp.RMRParams{}
902 params.Mtype = trans.GetMtype()
903 params.SubId = int(subs.GetReqId().InstanceId)
904 params.Xid = trans.GetXid()
905 params.Meid = trans.GetMeid()
907 params.PayloadLen = len(trans.Payload.Buf)
908 params.Payload = trans.Payload.Buf
910 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
911 err = c.SendWithRetry(params, false, 5)
913 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and dispatches it by message type.
//
// The payload is first cloned into a fresh Go byte slice, then every known
// subscription message type is handed to its handler in a new goroutine.
// Any other message type is logged at debug level and discarded. When
// c.RMRClient is nil an error is built and logged instead.
// NOTE(review): this listing omits some short lines (for example the switch
// header and the bare returns), so the exact return paths are not visible
// here — confirm against the full file.
918 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
919 if c.RMRClient == nil {
920 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
921 xapp.Logger.Error("%s", err.Error())
// Hand the RMR message buffer back once Consume returns.
926 defer c.RMRClient.Free(msg.Mbuf)
928 // xapp-frame might use direct access to c buffer and
929 // when msg.Mbuf is freed, someone might take it into use
930 // and payload data might be invalid inside message handle function
932 // subscriptions won't load system a lot so there is no
933 // real performance hit by cloning buffer into new go byte slice
// The [:0:0] slice has zero capacity, so append must allocate a new array.
934 cPay := append(msg.Payload[:0:0], msg.Payload...)
936 msg.PayloadLen = len(cPay)
// Each handler runs in its own goroutine and receives the same msg pointer.
939 case xapp.RIC_SUB_REQ:
940 go c.handleXAPPSubscriptionRequest(msg)
941 case xapp.RIC_SUB_RESP:
942 go c.handleE2TSubscriptionResponse(msg)
943 case xapp.RIC_SUB_FAILURE:
944 go c.handleE2TSubscriptionFailure(msg)
945 case xapp.RIC_SUB_DEL_REQ:
946 go c.handleXAPPSubscriptionDeleteRequest(msg)
947 case xapp.RIC_SUB_DEL_RESP:
948 go c.handleE2TSubscriptionDeleteResponse(msg)
949 case xapp.RIC_SUB_DEL_FAILURE:
950 go c.handleE2TSubscriptionDeleteFailure(msg)
951 case xapp.RIC_SUB_DEL_REQUIRED:
952 go c.handleE2TSubscriptionDeleteRequired(msg)
954 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
959 //-------------------------------------------------------------------
960 // handle from XAPP Subscription Request
961 //------------------------------------------------------------------
962 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
963 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
964 c.UpdateCounter(cSubReqFromXapp)
966 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
967 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
971 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
973 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
977 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
979 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
982 defer trans.Release()
984 if err = c.tracker.Track(trans); err != nil {
985 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
989 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
991 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
995 c.wakeSubscriptionRequest(subs, trans)
998 //-------------------------------------------------------------------
999 // Wake Subscription Request to E2node
1000 //------------------------------------------------------------------
// wakeSubscriptionRequest drives one subscription request to completion and
// answers the xApp.
//
// It starts handleSubscriptionCreate in a goroutine, blocks on the
// transaction's event, and then, depending on the event type, packs a
// subscription response or failure and sends it with rmrSendToXapp.
// NOTE(review): this listing omits short lines (guards, returns, a possible
// default branch), so the exact conditions around the statements below are not
// visible here — confirm against the full file.
1001 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are requested with a nil argument; a failure is logged.
1003 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1005 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
// OngoingReqCount is raised for the duration of the blocking wait.
1007 subs.OngoingReqCount++
1008 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1009 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1010 subs.OngoingReqCount--
1012 switch themsg := event.(type) {
1013 case *e2ap.E2APSubscriptionResponse:
// The outgoing message carries the request id of the xApp transaction.
1014 themsg.RequestId.Id = trans.RequestId.Id
1015 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1018 c.UpdateCounter(cSubRespToXapp)
1019 err := c.rmrSendToXapp("", subs, trans)
1021 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1025 case *e2ap.E2APSubscriptionFailure:
1026 themsg.RequestId.Id = trans.RequestId.Id
1027 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1029 c.UpdateCounter(cSubFailToXapp)
// The send result is not checked on the failure path.
1030 c.rmrSendToXapp("", subs, trans)
1036 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1039 //-------------------------------------------------------------------
1040 // handle from XAPP Subscription Delete Request
1041 //------------------------------------------------------------------
1042 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1043 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1044 c.UpdateCounter(cSubDelReqFromXapp)
1046 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1047 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1051 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1053 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1057 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1059 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1062 defer trans.Release()
1064 err = c.tracker.Track(trans)
1066 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1070 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1072 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1079 subs.OngoingDelCount++
1080 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1081 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1082 subs.OngoingDelCount--
1084 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1086 if subs.NoRespToXapp == true {
1087 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1088 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1092 // Whatever is received success, fail or timeout, send successful delete response
1093 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1094 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1095 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1096 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1097 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1099 c.UpdateCounter(cSubDelRespToXapp)
1100 err := c.rmrSendToXapp("", subs, trans)
1102 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1107 //-------------------------------------------------------------------
1108 // SUBS CREATE Handling
1109 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side handling of a
// create request and reports the outcome to parentTrans as an event.
//
// It creates a subscription transaction, waits for its turn on the
// subscription, and either uses the subscription's cached response or sends a
// subscription request towards E2T and caches the result. The subscription is
// then updated in the database and the resulting message is sent to
// parentTrans with SendEvent.
// NOTE(review): this listing omits short lines (else branches, guards such as
// the conditions around lines 1154 and 1166, and bare returns); the comments
// below describe only the visible statements — confirm against the full file.
1110 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1112 var event interface{} = nil
1113 var removeSubscriptionFromDb bool = false
1114 trans := c.tracker.NewSubsTransaction(subs)
// The turn and the transaction are both released via defer on return.
1115 subs.WaitTransactionTurn(trans)
1116 defer subs.ReleaseTransactionTurn(trans)
1117 defer trans.Release()
1119 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1121 subRfMsg, valid := subs.GetCachedResponse()
// No cached response yet: talk to E2T and cache whatever comes back.
1122 if subRfMsg == nil && valid == true {
1123 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1124 switch event.(type) {
1125 case *e2ap.E2APSubscriptionResponse:
1126 subRfMsg, valid = subs.SetCachedResponse(event, true)
1127 subs.SubRespRcvd = true
1128 case *e2ap.E2APSubscriptionFailure:
1129 subRfMsg, valid = subs.SetCachedResponse(event, false)
1130 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1131 case *SubmgrRestartTestEvent:
1132 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1133 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1134 subRfMsg, valid = subs.SetCachedResponse(event, false)
1135 parentTrans.SendEvent(subRfMsg, 0)
1137 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1138 subRfMsg, valid = subs.SetCachedResponse(event, false)
// For non-policy-update subscriptions a delete request is sent to E2T here.
1141 if subs.PolicyUpdate == false {
1142 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1143 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1144 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1146 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1149 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1151 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1154 removeSubscriptionFromDb = true
1157 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1160 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1164 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1166 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// Wake the waiting xApp-side handler with the final message.
1169 parentTrans.SendEvent(subRfMsg, 0)
1172 //-------------------------------------------------------------------
1173 // SUBS DELETE Handling
1174 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side handling of a
// delete request and signals parentTrans when done.
//
// It creates a subscription transaction and waits for its turn on the
// subscription. A delete request is sent towards E2T under the condition on
// line 1187: the subscription is valid and parentTrans's endpoint is the only
// endpoint in the subscription's endpoint list. The endpoint is then removed
// from the subscription and a nil event is sent to parentTrans.
// NOTE(review): short lines around the condition (1185-1193) are missing from
// this listing, so any statements there are not described — confirm against
// the full file.
1176 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1178 trans := c.tracker.NewSubsTransaction(subs)
// The turn and the transaction are both released via defer on return.
1179 subs.WaitTransactionTurn(trans)
1180 defer subs.ReleaseTransactionTurn(trans)
1181 defer trans.Release()
1183 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1187 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1190 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1195 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1196 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// A nil event wakes the caller blocked in WaitEvent on parentTrans.
1197 parentTrans.SendEvent(nil, 0)
1200 //-------------------------------------------------------------------
1201 // send to E2T Subscription Request
1202 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request, stores the
// subscription in the database and sends the request towards E2T, trying up
// to e2SubscriptionDirectives.E2MaxTryCount times.
//
// A pack failure yields a *PackSubscriptionRequestErrortEvent and a database
// write failure a *SDLWriteErrortEvent. When subs.DoNotWaitSubResp is false
// the function waits for an event on trans for
// e2SubscriptionDirectives.E2TimeoutTimerValue after each send.
// NOTE(review): this listing omits short lines (variable declarations, the
// retry/else branches and loop control, the final return), so the loop's exit
// conditions are not visible here — confirm against the full file.
1203 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1205 var event interface{} = nil
1206 var timedOut bool = false
1207 const ricRequestorId = 123
// The request towards E2T uses the subscription's request id with the fixed
// requestor id above.
1209 subReqMsg := subs.SubReqMsg
1210 subReqMsg.RequestId = subs.GetReqId().RequestId
1211 subReqMsg.RequestId.Id = ricRequestorId
1212 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1214 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1215 return &PackSubscriptionRequestErrortEvent{
1217 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1218 ErrorCause: err.Error(),
1223 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1224 err = c.WriteSubscriptionToDb(subs)
1226 return &SDLWriteErrortEvent{
1228 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1229 ErrorCause: err.Error(),
1234 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1235 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters exist: one for the request and one for a re-request.
1237 c.UpdateCounter(cSubReqToE2)
1239 c.UpdateCounter(cSubReReqToE2)
1241 err := c.rmrSendToE2T(desc, subs, trans)
1243 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1246 if subs.DoNotWaitSubResp == false {
1247 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1249 c.UpdateCounter(cSubReqTimerExpiry)
1253 // Simulating case where subscrition request has been sent but response has not been received before restart
1254 event = &SubmgrRestartTestEvent{}
1255 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1259 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1263 //-------------------------------------------------------------------
1264 // send to E2T Subscription Delete Request
1265 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for
// subs and sends it towards E2T, trying up to e2tMaxSubDelReqTryCount times
// and waiting e2tSubDelReqTime for an event on trans after each send.
//
// The delete request reuses the subscription's request id with the fixed
// requestor id below and the function id of the original subscription request.
// NOTE(review): this listing omits short lines (variable declarations, the
// retry/else branches and loop control, the returns), so the loop's exit
// conditions and the returned value are not visible here — confirm against the
// full file.
1267 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1269 var event interface{}
1271 const ricRequestorId = 123
1273 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1274 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1275 subDelReqMsg.RequestId.Id = ricRequestorId
1276 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1277 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1279 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1283 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1284 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters exist: one for the request and one for a re-request.
1286 c.UpdateCounter(cSubDelReqToE2)
1288 c.UpdateCounter(cSubDelReReqToE2)
1290 err := c.rmrSendToE2T(desc, subs, trans)
1292 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1294 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1296 c.UpdateCounter(cSubDelReqTimerExpiry)
1301 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1305 //-------------------------------------------------------------------
1306 // handle from E2T Subscription Response
1307 //-------------------------------------------------------------------
1308 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1309 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1310 c.UpdateCounter(cSubRespFromE2)
1312 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1314 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1317 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1319 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1322 trans := subs.GetTransaction()
1324 err = fmt.Errorf("Ongoing transaction not found")
1325 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1328 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1329 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1330 if sendOk == false {
1331 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1332 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1337 //-------------------------------------------------------------------
1338 // handle from E2T Subscription Failure
1339 //-------------------------------------------------------------------
1340 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1341 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1342 c.UpdateCounter(cSubFailFromE2)
1343 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1345 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1348 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1350 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1353 trans := subs.GetTransaction()
1355 err = fmt.Errorf("Ongoing transaction not found")
1356 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1359 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1360 if sendOk == false {
1361 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1362 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1367 //-------------------------------------------------------------------
1368 // handle from E2T Subscription Delete Response
1369 //-------------------------------------------------------------------
1370 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1371 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1372 c.UpdateCounter(cSubDelRespFromE2)
1373 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1375 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1378 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1380 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1383 trans := subs.GetTransaction()
1385 err = fmt.Errorf("Ongoing transaction not found")
1386 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1389 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1390 if sendOk == false {
1391 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1392 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1397 //-------------------------------------------------------------------
1398 // handle from E2T Subscription Delete Failure
1399 //-------------------------------------------------------------------
1400 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1401 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1402 c.UpdateCounter(cSubDelFailFromE2)
1403 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1405 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1408 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1410 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1413 trans := subs.GetTransaction()
1415 err = fmt.Errorf("Ongoing transaction not found")
1416 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1419 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1420 if sendOk == false {
1421 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1422 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1427 //-------------------------------------------------------------------
1429 //-------------------------------------------------------------------
// typeofSubsMessage maps a subscription message value to a string based on
// its dynamic type; the visible cases cover subscription response, failure,
// delete response and delete failure. The result is used in log messages
// elsewhere in this file.
// NOTE(review): the returned strings and any nil/default handling are on
// lines missing from this listing — confirm against the full file.
1430 func typeofSubsMessage(v interface{}) string {
// Request types are intentionally left commented out.
1435 //case *e2ap.E2APSubscriptionRequest:
1437 case *e2ap.E2APSubscriptionResponse:
1439 case *e2ap.E2APSubscriptionFailure:
1441 //case *e2ap.E2APSubscriptionDeleteRequest:
1442 // return "SubDelReq"
1443 case *e2ap.E2APSubscriptionDeleteResponse:
1445 case *e2ap.E2APSubscriptionDeleteFailure:
1452 //-------------------------------------------------------------------
1454 //-------------------------------------------------------------------
1455 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1456 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1457 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1459 xapp.Logger.Error("%v", err)
1465 //-------------------------------------------------------------------
1467 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes subs from the database (when
// removeSubscriptionFromDb is true) or writes it, the write being attempted
// only while subs.RetryFromXapp is false. subs.RetryFromXapp is reset to
// false on line 1480.
// NOTE(review): the return statements and closing braces are missing from
// this listing, so which paths reach line 1480 and how the write error is
// returned is not visible here — confirm against the full file.
1468 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1470 if removeSubscriptionFromDb == true {
1471 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1472 c.RemoveSubscriptionFromDb(subs)
1474 // Update is needed for successful response and merge case here
1475 if subs.RetryFromXapp == false {
1476 err := c.WriteSubscriptionToDb(subs)
1480 subs.RetryFromXapp = false
1484 //-------------------------------------------------------------------
1486 //-------------------------------------------------------------------
1487 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1488 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1489 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1491 xapp.Logger.Error("%v", err)
1495 //-------------------------------------------------------------------
1497 //-------------------------------------------------------------------
1498 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1499 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1500 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1502 xapp.Logger.Error("%v", err)
1506 //-------------------------------------------------------------------
1508 //-------------------------------------------------------------------
1509 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1511 if removeRestSubscriptionFromDb == true {
1512 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1513 c.RemoveRESTSubscriptionFromDb(restSubId)
1515 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1519 //-------------------------------------------------------------------
1521 //-------------------------------------------------------------------
1522 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1523 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1524 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1526 xapp.Logger.Error("%v", err)
1530 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1532 if c.UTTesting == true {
1533 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1534 c.registry.mutex = new(sync.Mutex)
1537 const ricRequestorId = 123
1538 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1540 // Send delete for every endpoint in the subscription
1541 if subs.PolicyUpdate == false {
1542 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1543 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1544 subDelReqMsg.RequestId.Id = ricRequestorId
1545 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1546 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1548 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1551 for _, endPoint := range subs.EpList.Endpoints {
1552 params := &xapp.RMRParams{}
1553 params.Mtype = mType
1554 params.SubId = int(subs.GetReqId().InstanceId)
1556 params.Meid = subs.Meid
1557 params.Src = endPoint.String()
1558 params.PayloadLen = len(payload.Buf)
1559 params.Payload = payload.Buf
1561 subs.DeleteFromDb = true
1562 if !e2SubsDelRequired {
1563 c.handleXAPPSubscriptionDeleteRequest(params)
1565 c.SendSubscriptionDeleteReqToE2T(subs, params)
// PrintRESTSubscriptionRequest prints the fields of a REST subscription
// request to standard output with fmt, one field per line. Optional pointer
// fields whose nil checks are visible below are printed as "nil" when unset.
// NOTE(review): some guard lines are missing from this listing (for example
// around the header line and the Meid check), and several pointer fields are
// dereferenced without a visible nil check (XappEventInstanceID, ActionID,
// ActionType, SubsequentActionType, TimeToWait) — confirm against the full
// file.
1571 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1573 fmt.Println("CRESTSubscriptionRequest")
1579 if p.SubscriptionID != "" {
1580 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1582 fmt.Println(" SubscriptionID = ''")
1585 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1587 if p.ClientEndpoint.HTTPPort != nil {
1588 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1590 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1593 if p.ClientEndpoint.RMRPort != nil {
1594 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1596 fmt.Println(" ClientEndpoint.RMRPort = nil")
1600 fmt.Printf(" Meid = %s\n", *p.Meid)
1602 fmt.Println(" Meid = nil")
1605 if p.E2SubscriptionDirectives == nil {
1606 fmt.Println(" E2SubscriptionDirectives = nil")
1608 fmt.Println(" E2SubscriptionDirectives")
1609 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1610 fmt.Println(" E2RetryCount == nil")
1612 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1614 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1615 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
// RANFunctionID belongs to the request as a whole but is printed once per
// subscription detail.
1617 for _, subscriptionDetail := range p.SubscriptionDetails {
1618 if p.RANFunctionID != nil {
1619 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1621 fmt.Println(" RANFunctionID = nil")
1623 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1624 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1626 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1627 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1628 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1629 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1631 if actionToBeSetup.SubsequentAction != nil {
1632 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
// NOTE(review): the label below contains a doubled dot ("ActionToBeSetup..").
1633 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1635 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1641 //-------------------------------------------------------------------
1642 // handle from E2T Subscription Delete Required
1643 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteRequired processes a RIC Subscription Delete
// Required message received from E2T.
//
// The payload is unpacked and each listed subscription is looked up in the
// registry by its request instance id. Subscriptions collected in subDB are
// then passed to SendSubscriptionDeleteReq with e2SubsDelRequired set to true.
// NOTE(review): the loop-control statements after the lookup failure and the
// OngoingDelCount check are missing from this listing — confirm against the
// full file.
1644 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1645 xapp.Logger.Info("MSG from E2T: %s", params.String())
1646 c.UpdateCounter(cSubDelRequFromE2)
1647 subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1649 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1650 //c.sendE2TErrorIndication(nil)
// NOTE(review): subscriptions is filled per endpoint address below but is not
// read anywhere in the visible code.
1653 var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
1654 var subDB = []*Subscription{}
1655 for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1656 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
// NOTE(review): this log uses the "MSG-SubDelFail" label inside the Delete
// Required handler.
1658 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1661 // Check if Delete Subscription Already triggered
1662 if subs.OngoingDelCount > 0 {
1665 subDB = append(subDB, subs)
1666 for _, endpoint := range subs.EpList.Endpoints {
1667 subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1669 // Sending Subscription Delete Request to E2T
1670 // c.SendSubscriptionDeleteReq(subs, true)
// Deletes are sent only after the whole message has been scanned.
1672 for _, subsTobeRemove := range subDB {
1673 // Sending Subscription Delete Request to E2T
1674 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1678 //-----------------------------------------------------------------
1679 // Initiate RIC Subscription Delete Request after receiving
1680 // RIC Subscription Delete Required from E2T
1681 //-----------------------------------------------------------------
1682 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1683 xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1684 c.UpdateCounter(cSubDelReqToE2)
1686 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1687 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1691 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1693 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1696 defer trans.Release()
1698 err := c.tracker.Track(trans)
1700 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1707 subs.OngoingDelCount++
1708 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1709 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1710 subs.OngoingDelCount--
1712 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1714 if subs.NoRespToXapp == true {
1715 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1716 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")