2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring builds a space-separated identification string for log messages.
//
// Every element of entries contributes one part, in order: a non-nil entry
// contributes the result of its String method and a nil entry contributes the
// literal "(NIL)". When err is non-nil, "err(<message>)" is appended as the
// last part. An empty string is returned when there are no parts at all.
//
// Note that only a nil interface value is reported as "(NIL)"; an interface
// holding a nil pointer is not nil, so its String method is still invoked.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""

	// add appends one part and arms the separator for whatever follows, so
	// that every kind of part (including "(NIL)") is separated consistently.
	add := func(part string) {
		retval += filler + part
		filler = " "
	}

	for _, entry := range entries {
		if entry != nil {
			add(entry.String())
		} else {
			add("(NIL)")
		}
	}
	if err != nil {
		add("err(" + err.Error() + ")")
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
// Package-level configuration values. Each of them is assigned by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
var (
	// Timeouts and wait times.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of sending attempts.
	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// Switches kept as strings; the ones read in this file are compared
	// against the literal "true".
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 err := c.ReadE2Subscriptions()
175 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177 err = c.ReadRESTSubscriptions()
179 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
184 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193 subscriptions, err := c.registry.QueryHandler()
195 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
198 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205 xapp.Logger.Debug("RESTQueryHandler() called")
209 return c.registry.QueryHandler()
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
218 var register map[uint32]*Subscription
219 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220 xapp.Logger.Debug("Reading E2 subscriptions from db")
221 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223 xapp.Logger.Error("%v", err)
224 <-time.After(1 * time.Second)
226 c.registry.subIds = subIds
227 c.registry.register = register
228 go c.HandleUncompletedSubscriptions(register)
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
241 xapp.Logger.Debug("ReadRESTSubscriptions()")
243 var restSubscriptions map[string]*RESTSubscription
244 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245 xapp.Logger.Debug("Reading REST subscriptions from db")
246 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248 xapp.Logger.Error("%v", err)
249 <-time.After(1 * time.Second)
251 // Fix REST subscriptions ongoing status after restart
252 for restSubId, restSubscription := range restSubscriptions {
253 restSubscription.SubReqOngoing = false
254 restSubscription.SubDelReqOngoing = false
255 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
260 c.registry.restSubscriptions = restSubscriptions
264 xapp.Logger.Debug("Continuing without retring")
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
271 func (c *Control) ReadConfigParameters(f string) {
273 xapp.Logger.Debug("ReadConfigParameters")
275 c.LoggerLevel = int(xapp.Logger.GetLevel())
276 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279 // viper.GetDuration returns nanoseconds
280 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281 if e2tSubReqTimeout == 0 {
282 e2tSubReqTimeout = 2000 * 1000000
283 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288 if e2tSubDelReqTime == 0 {
289 e2tSubDelReqTime = 2000 * 1000000
290 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295 if e2tRecvMsgTimeout == 0 {
296 e2tRecvMsgTimeout = 2000 * 1000000
297 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302 if e2tMaxSubReqTryCount == 0 {
303 e2tMaxSubReqTryCount = 1
304 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309 if e2tMaxSubDelReqTryCount == 0 {
310 e2tMaxSubDelReqTryCount = 1
311 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
315 checkE2State = viper.GetString("controls.checkE2State")
316 if checkE2State == "" {
317 checkE2State = "true"
318 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320 xapp.Logger.Debug("checkE2State= %v", checkE2State)
322 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323 if readSubsFromDb == "" {
324 readSubsFromDb = "true"
325 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329 dbTryCount = viper.GetInt("controls.dbTryCount")
332 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336 dbRetryForever = viper.GetString("controls.dbRetryForever")
337 if dbRetryForever == "" {
338 dbRetryForever = "true"
339 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344 // value 100ms used currently only in unittests.
345 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346 if waitRouteCleanup_ms == 0 {
347 waitRouteCleanup_ms = 5000 * 1000000
348 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
353 //-------------------------------------------------------------------
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
358 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359 for subId, subs := range register {
360 if subs.SubRespRcvd == false {
361 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362 if subs.PolicyUpdate == false {
363 subs.NoRespToXapp = true
364 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365 c.SendSubscriptionDeleteReq(subs, false)
371 func (c *Control) ReadyCB(data interface{}) {
372 if c.RMRClient == nil {
373 c.RMRClient = xapp.Rmr
377 func (c *Control) Run() {
378 xapp.SetReadyCB(c.ReadyCB, nil)
379 xapp.AddConfigChangeListener(c.ReadConfigParameters)
383 //-------------------------------------------------------------------
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
389 var restSubscription *RESTSubscription
392 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if p.SubscriptionID == "" {
394 // Subscription does not contain REST subscription Id
396 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397 if restSubscription != nil {
398 // Subscription not found
399 restSubId = prevRestSubsId
401 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
403 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
406 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
411 if restSubscription == nil {
412 restSubId = ksuid.New().String()
413 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
416 // Subscription contains REST subscription Id
417 restSubId = p.SubscriptionID
419 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
422 // Subscription with id in REST request does not exist
423 xapp.Logger.Error("%s", err.Error())
424 c.UpdateCounter(cRestSubFailToXapp)
429 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
431 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
435 return restSubscription, restSubId, nil
438 //-------------------------------------------------------------------
440 //-------------------------------------------------------------------
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
444 c.UpdateCounter(cRestSubReqFromXapp)
446 subResp := models.SubscriptionResponse{}
447 p := params.(*models.SubscriptionParams)
449 if c.LoggerLevel > 2 {
450 c.PrintRESTSubscriptionRequest(p)
453 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
454 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
455 c.UpdateCounter(cRestReqRejDueE2Down)
456 return nil, common.SubscribeServiceUnavailableCode
459 if p.ClientEndpoint == nil {
460 err := fmt.Errorf("ClientEndpoint == nil")
461 xapp.Logger.Error("%v", err)
462 c.UpdateCounter(cRestSubFailToXapp)
463 return nil, common.SubscribeBadRequestCode
466 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
468 xapp.Logger.Error("%s", err)
469 c.UpdateCounter(cRestSubFailToXapp)
470 return nil, common.SubscribeBadRequestCode
472 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
474 xapp.Logger.Error("%s", err.Error())
475 c.UpdateCounter(cRestSubFailToXapp)
476 return nil, common.SubscribeBadRequestCode
479 md5sum, err := CalculateRequestMd5sum(params)
481 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
484 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
486 xapp.Logger.Error("Subscription with id in REST request does not exist")
487 return nil, common.SubscribeNotFoundCode
490 subResp.SubscriptionID = &restSubId
491 subReqList := e2ap.SubscriptionRequestList{}
492 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
494 xapp.Logger.Error("%s", err.Error())
495 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
496 c.registry.DeleteRESTSubscription(&restSubId)
497 c.UpdateCounter(cRestSubFailToXapp)
498 return nil, common.SubscribeBadRequestCode
501 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
503 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
504 xapp.Logger.Debug("%s", err)
505 c.registry.DeleteRESTSubscription(&restSubId)
506 c.UpdateCounter(cRestSubRespToXapp)
507 return &subResp, common.SubscribeCreatedCode
510 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
511 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
513 c.UpdateCounter(cRestSubRespToXapp)
514 return &subResp, common.SubscribeCreatedCode
517 //-------------------------------------------------------------------
519 //-------------------------------------------------------------------
520 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
522 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
523 if p == nil || p.E2SubscriptionDirectives == nil {
524 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
525 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
526 e2SubscriptionDirectives.CreateRMRRoute = true
527 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
529 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
530 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
532 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
534 if p.E2SubscriptionDirectives.E2RetryCount == nil {
535 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
536 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
538 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
539 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
541 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
544 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
546 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
547 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
548 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
549 return e2SubscriptionDirectives, nil
552 //-------------------------------------------------------------------
554 //-------------------------------------------------------------------
556 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
557 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
559 c.SubscriptionProcessingStartDelay()
560 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
562 var xAppEventInstanceID int64
563 var e2EventInstanceID int64
564 errorInfo := &ErrorInfo{}
566 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
568 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
569 subReqMsg := subReqList.E2APSubscriptionRequests[index]
570 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
572 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
574 // Send notification to xApp that prosessing of a Subscription Request has failed.
575 err := fmt.Errorf("Tracking failure")
576 errorInfo.ErrorCause = err.Error()
577 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
581 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
583 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
585 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
589 if err.Error() == "TEST: restart event received" {
590 // This is just for UT cases. Stop here subscription processing
593 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
595 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
596 restSubscription.AddMd5Sum(md5sum)
597 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
598 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
599 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
604 //-------------------------------------------------------------------
606 //------------------------------------------------------------------
607 func (c *Control) SubscriptionProcessingStartDelay() {
608 if c.UTTesting == true {
609 // This is temporary fix for the UT problem that notification arrives before subscription response
610 // Correct fix would be to allow notification come before response and process it correctly
611 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
612 <-time.After(time.Millisecond * 50)
613 xapp.Logger.Debug("Continuing after delay")
617 //-------------------------------------------------------------------
619 //------------------------------------------------------------------
620 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
621 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
623 errorInfo := ErrorInfo{}
625 err := c.tracker.Track(trans)
627 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
628 errorInfo.ErrorCause = err.Error()
629 err = fmt.Errorf("Tracking failure")
630 return nil, &errorInfo, err
633 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
635 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
636 return nil, &errorInfo, err
642 subs.OngoingReqCount++
643 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
644 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
645 subs.OngoingReqCount--
649 switch themsg := event.(type) {
650 case *e2ap.E2APSubscriptionResponse:
652 if c.e2IfState.IsE2ConnectionUp(meid) == true {
653 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
654 return themsg, &errorInfo, nil
656 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
657 c.RemoveSubscriptionFromDb(subs)
658 err = fmt.Errorf("E2 interface down")
659 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
661 case *e2ap.E2APSubscriptionFailure:
662 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
663 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
664 case *PackSubscriptionRequestErrortEvent:
665 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
666 errorInfo = themsg.ErrorInfo
667 case *SDLWriteErrortEvent:
668 err = fmt.Errorf("SDL write failure")
669 errorInfo = themsg.ErrorInfo
670 case *SubmgrRestartTestEvent:
671 err = fmt.Errorf("TEST: restart event received")
672 xapp.Logger.Debug("%s", err)
673 return nil, &errorInfo, err
675 err = fmt.Errorf("Unexpected E2 subscription response received")
676 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
681 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
682 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
683 if subs.PolicyUpdate == true {
684 return nil, &errorInfo, err
688 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
689 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
691 return nil, &errorInfo, err
694 //-------------------------------------------------------------------
696 //-------------------------------------------------------------------
697 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
698 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
700 // Send notification to xApp that prosessing of a Subscription Request has failed.
701 e2EventInstanceID := (int64)(0)
702 if errorInfo.ErrorSource == "" {
703 // Submgr is default source of error
704 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
706 resp := &models.SubscriptionResponse{
707 SubscriptionID: restSubId,
708 SubscriptionInstances: []*models.SubscriptionInstance{
709 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
710 ErrorCause: errorInfo.ErrorCause,
711 ErrorSource: errorInfo.ErrorSource,
712 TimeoutType: errorInfo.TimeoutType,
713 XappEventInstanceID: &xAppEventInstanceID},
716 // Mark REST subscription request processed.
717 restSubscription.SetProcessed(err)
718 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
720 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
721 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
723 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
724 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
727 c.UpdateCounter(cRestSubFailNotifToXapp)
728 err = xapp.Subscription.Notify(resp, *clientEndpoint)
730 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
733 // E2 is down. Delete completely processed request safely now
734 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
735 c.registry.DeleteRESTSubscription(restSubId)
736 c.RemoveRESTSubscriptionFromDb(*restSubId)
740 //-------------------------------------------------------------------
742 //-------------------------------------------------------------------
743 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
744 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
746 // Store successfully processed InstanceId for deletion
747 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
748 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
750 // Send notification to xApp that a Subscription Request has been processed.
751 resp := &models.SubscriptionResponse{
752 SubscriptionID: restSubId,
753 SubscriptionInstances: []*models.SubscriptionInstance{
754 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
755 ErrorCause: errorInfo.ErrorCause,
756 ErrorSource: errorInfo.ErrorSource,
757 XappEventInstanceID: &xAppEventInstanceID},
760 // Mark REST subscription request processesd.
761 restSubscription.SetProcessed(nil)
762 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
763 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
764 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
765 c.UpdateCounter(cRestSubNotifToXapp)
766 err := xapp.Subscription.Notify(resp, *clientEndpoint)
768 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
771 // E2 is down. Delete completely processed request safely now
772 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
773 c.registry.DeleteRESTSubscription(restSubId)
774 c.RemoveRESTSubscriptionFromDb(*restSubId)
778 //-------------------------------------------------------------------
780 //-------------------------------------------------------------------
781 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
784 c.UpdateCounter(cRestSubDelReqFromXapp)
786 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
788 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
790 xapp.Logger.Error("%s", err.Error())
791 if restSubscription == nil {
792 // Subscription was not found
793 c.UpdateCounter(cRestSubDelRespToXapp)
794 return common.UnsubscribeNoContentCode
796 if restSubscription.SubReqOngoing == true {
797 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
798 xapp.Logger.Error("%s", err.Error())
799 c.UpdateCounter(cRestSubDelFailToXapp)
800 return common.UnsubscribeBadRequestCode
801 } else if restSubscription.SubDelReqOngoing == true {
802 // Previous request for same restSubId still ongoing
803 c.UpdateCounter(cRestSubDelRespToXapp)
804 return common.UnsubscribeNoContentCode
809 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
811 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
812 for _, instanceId := range restSubscription.InstanceIds {
813 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
816 xapp.Logger.Error("%s", err.Error())
818 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
819 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
820 restSubscription.DeleteE2InstanceId(instanceId)
822 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
823 c.registry.DeleteRESTSubscription(&restSubId)
824 c.RemoveRESTSubscriptionFromDb(restSubId)
827 c.UpdateCounter(cRestSubDelRespToXapp)
828 return common.UnsubscribeNoContentCode
831 //-------------------------------------------------------------------
833 //-------------------------------------------------------------------
834 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
836 var xAppEventInstanceID int64
837 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
839 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
840 restSubId, instanceId, idstring(err, nil))
841 return xAppEventInstanceID, nil
844 xAppEventInstanceID = int64(subs.ReqId.Id)
845 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
847 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
848 xapp.Logger.Error("%s", err.Error())
850 defer trans.Release()
852 err = c.tracker.Track(trans)
854 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
855 xapp.Logger.Error("%s", err.Error())
856 return xAppEventInstanceID, &time.ParseError{}
861 subs.OngoingDelCount++
862 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
863 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
864 subs.OngoingDelCount--
866 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
868 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
870 return xAppEventInstanceID, nil
873 //-------------------------------------------------------------------
875 //-------------------------------------------------------------------
877 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
878 params := &xapp.RMRParams{}
879 params.Mtype = trans.GetMtype()
880 params.SubId = int(subs.GetReqId().InstanceId)
882 params.Meid = subs.GetMeid()
884 params.PayloadLen = len(trans.Payload.Buf)
885 params.Payload = trans.Payload.Buf
887 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
888 err = c.SendWithRetry(params, false, 5)
890 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
895 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
897 params := &xapp.RMRParams{}
898 params.Mtype = trans.GetMtype()
899 params.SubId = int(subs.GetReqId().InstanceId)
900 params.Xid = trans.GetXid()
901 params.Meid = trans.GetMeid()
903 params.PayloadLen = len(trans.Payload.Buf)
904 params.Payload = trans.Payload.Buf
906 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
907 err = c.SendWithRetry(params, false, 5)
909 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume handles one received RMR message.
//
// When c.RMRClient is nil an error naming the message is built and logged.
// Otherwise the payload is copied into a Go-owned slice and the message is
// handed to a handler goroutine selected by its message type; a message type
// without a matching case is logged at debug level and discarded.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), so only the statements visible here are described.
914 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
915 if c.RMRClient == nil {
916 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
917 xapp.Logger.Error("%s", err.Error())
// The RMR buffer is released when Consume returns, while the handler
// goroutines started below may still be running.
922 defer c.RMRClient.Free(msg.Mbuf)
924 // xapp-frame might use direct access to c buffer and
925 // when msg.Mbuf is freed, someone might take it into use
926 // and payload data might be invalid inside message handle function
928 // subscriptions won't load system a lot so there is no
929 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has length 0 and capacity 0, so appending any bytes to it
// forces a new backing array: cPay does not share memory with msg.Payload.
930 cPay := append(msg.Payload[:0:0], msg.Payload...)
932 msg.PayloadLen = len(cPay)
// Messages sent by xApps (subscription request / delete request).
935 case xapp.RIC_SUB_REQ:
936 go c.handleXAPPSubscriptionRequest(msg)
// Messages sent by E2T (responses, failures, delete required).
937 case xapp.RIC_SUB_RESP:
938 go c.handleE2TSubscriptionResponse(msg)
939 case xapp.RIC_SUB_FAILURE:
940 go c.handleE2TSubscriptionFailure(msg)
941 case xapp.RIC_SUB_DEL_REQ:
942 go c.handleXAPPSubscriptionDeleteRequest(msg)
943 case xapp.RIC_SUB_DEL_RESP:
944 go c.handleE2TSubscriptionDeleteResponse(msg)
945 case xapp.RIC_SUB_DEL_FAILURE:
946 go c.handleE2TSubscriptionDeleteFailure(msg)
947 case xapp.RIC_SUB_DEL_REQUIRED:
948 go c.handleE2TSubscriptionDeleteRequired(msg)
950 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
955 //-------------------------------------------------------------------
956 // handle from XAPP Subscription Request
957 //------------------------------------------------------------------
958 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
959 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
960 c.UpdateCounter(cSubReqFromXapp)
962 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
963 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
967 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
969 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
973 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
975 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
978 defer trans.Release()
980 if err = c.tracker.Track(trans); err != nil {
981 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
985 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
987 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
991 c.wakeSubscriptionRequest(subs, trans)
994 //-------------------------------------------------------------------
995 // Wake Subscription Request to E2node
996 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for subs in a new
// goroutine and blocks in trans.WaitEvent(0) until that side delivers an event.
//
// A subscription response or subscription failure event gets the xApp's own
// request id (trans.RequestId.Id) written into it, is packed into trans and is
// sent back to the xApp with rmrSendToXapp. subs.OngoingReqCount is incremented
// before the goroutine starts and decremented after the wait returns.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump); branch headers between the visible statements are not shown.
997 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Called with nil here; what directives that yields is decided by
// GetE2SubscriptionDirectives (not visible in this section).
999 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
1001 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1003 subs.OngoingReqCount++
1004 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
// The second result of WaitEvent is deliberately ignored; see the trailing comment.
1005 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1006 subs.OngoingReqCount--
1008 switch themsg := event.(type) {
1009 case *e2ap.E2APSubscriptionResponse:
// Restore the xApp's request id before packing the reply.
1010 themsg.RequestId.Id = trans.RequestId.Id
1011 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1014 c.UpdateCounter(cSubRespToXapp)
1015 err := c.rmrSendToXapp("", subs, trans)
1017 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1021 case *e2ap.E2APSubscriptionFailure:
// Restore the xApp's request id before packing the reply.
1022 themsg.RequestId.Id = trans.RequestId.Id
1023 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1025 c.UpdateCounter(cSubFailToXapp)
// NOTE(review): unlike the response case, the send result is not checked here.
1026 c.rmrSendToXapp("", subs, trans)
1032 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1035 //-------------------------------------------------------------------
1036 // handle from XAPP Subscription Delete Request
1037 //------------------------------------------------------------------
1038 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1039 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1040 c.UpdateCounter(cSubDelReqFromXapp)
1042 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1043 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1047 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1049 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1053 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1055 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1058 defer trans.Release()
1060 err = c.tracker.Track(trans)
1062 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1066 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1068 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1075 subs.OngoingDelCount++
1076 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1077 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1078 subs.OngoingDelCount--
1080 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1082 if subs.NoRespToXapp == true {
1083 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1084 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1088 // Whatever is received success, fail or timeout, send successful delete response
1089 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1090 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1091 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1092 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1093 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1095 c.UpdateCounter(cSubDelRespToXapp)
1096 err := c.rmrSendToXapp("", subs, trans)
1098 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1103 //-------------------------------------------------------------------
1104 // SUBS CREATE Handling
1105 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription-side part of a create request
// and reports the outcome to parentTrans.
//
// It creates a subscription transaction, waits for its turn on subs and, via
// defer, releases the turn and the transaction on return. When no response is
// cached for the subscription (subRfMsg == nil while valid is true) the request
// is sent to E2T with sendE2TSubscriptionRequest and the outcome is cached
// according to the type of the returned event. The result is finally passed to
// parentTrans with SendEvent.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), including branch headers; the comments below describe only
// the statements that are visible.
1106 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1108 var event interface{} = nil
1109 var removeSubscriptionFromDb bool = false
1110 trans := c.tracker.NewSubsTransaction(subs)
1111 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the transaction turn.
1112 defer subs.ReleaseTransactionTurn(trans)
1113 defer trans.Release()
1115 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1117 subRfMsg, valid := subs.GetCachedResponse()
1118 if subRfMsg == nil && valid == true {
1119 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1120 switch event.(type) {
// E2T accepted the subscription: cache the response as valid.
1121 case *e2ap.E2APSubscriptionResponse:
1122 subRfMsg, valid = subs.SetCachedResponse(event, true)
1123 subs.SubRespRcvd = true
// E2T rejected the subscription: cache the failure as not valid.
1124 case *e2ap.E2APSubscriptionFailure:
1125 subRfMsg, valid = subs.SetCachedResponse(event, false)
1126 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1127 case *SubmgrRestartTestEvent:
1128 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1129 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1130 subRfMsg, valid = subs.SetCachedResponse(event, false)
1131 parentTrans.SendEvent(subRfMsg, 0)
// Local errors raised before anything reached E2T (ASN.1 packing or DB write).
1133 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1134 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Remaining event types: for a non-policy-update subscription nothing is
// cached and a delete request is sent to E2T.
1137 if subs.PolicyUpdate == false {
1138 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1139 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1140 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1142 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1145 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1147 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1150 removeSubscriptionFromDb = true
// Removes the subscription from the DB or (re)writes it, depending on the flag.
1153 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1156 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1160 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1162 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// Hand the outcome to the xApp-side transaction waiting in WaitEvent.
1165 parentTrans.SendEvent(subRfMsg, 0)
1168 //-------------------------------------------------------------------
1169 // SUBS DELETE Handling
1170 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the subscription-side part of a delete request
// and signals parentTrans when it is done.
//
// It creates a subscription transaction, waits for its turn on subs and, via
// defer, releases the turn and the transaction on return. It then calls
// c.registry.RemoveFromSubscription for parentTrans and sends a nil event to
// parentTrans.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), notably between the condition below and the E2T delete call.
1172 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1174 trans := c.tracker.NewSubsTransaction(subs)
1175 subs.WaitTransactionTurn(trans)
1176 defer subs.ReleaseTransactionTurn(trans)
1177 defer trans.Release()
1179 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// True only when the subscription is valid and the requesting endpoint is the
// single endpoint on its endpoint list; presumably the E2T delete below is
// sent only in that case — confirm against the full source.
1183 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1186 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1191 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1192 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// A nil event wakes the caller blocked in parentTrans.WaitEvent.
1193 parentTrans.SendEvent(nil, 0)
1196 //-------------------------------------------------------------------
1197 // send to E2T Subscription Request
1198 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores
// the subscription in the DB and sends the request to E2T, trying up to
// e2SubscriptionDirectives.E2MaxTryCount times.
//
// Returns the event produced by the exchange: a
// *PackSubscriptionRequestErrortEvent when packing fails, a *SDLWriteErrortEvent
// when the DB write fails, a *SubmgrRestartTestEvent when subs.DoNotWaitSubResp
// is set, otherwise whatever trans.WaitEvent delivered.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), including the loop's control statements.
1199 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1201 var event interface{} = nil
1202 var timedOut bool = false
// Requestor id placed in every request sent towards E2T.
1203 const ricRequestorId = 123
// NOTE(review): if SubReqMsg is a pointer, the two assignments below modify
// the request stored in subs — confirm.
1205 subReqMsg := subs.SubReqMsg
1206 subReqMsg.RequestId = subs.GetReqId().RequestId
1207 subReqMsg.RequestId.Id = ricRequestorId
1208 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1210 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1211 return &PackSubscriptionRequestErrortEvent{
1213 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1214 ErrorCause: err.Error(),
1219 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1220 err = c.WriteSubscriptionToDb(subs)
1222 return &SDLWriteErrortEvent{
1224 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1225 ErrorCause: err.Error(),
1230 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1231 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters are used: one for requests and one for re-requests to E2T.
1233 c.UpdateCounter(cSubReqToE2)
1235 c.UpdateCounter(cSubReReqToE2)
1237 err := c.rmrSendToE2T(desc, subs, trans)
1239 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1242 if subs.DoNotWaitSubResp == false {
// Wait for the E2T reply for at most the configured timeout.
1243 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1245 c.UpdateCounter(cSubReqTimerExpiry)
1249 // Simulating case where subscrition request has been sent but response has not been received before restart
1250 event = &SubmgrRestartTestEvent{}
1251 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1255 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1259 //-------------------------------------------------------------------
1260 // send to E2T Subscription Delete Request
1261 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for
// subs and sends it to E2T, trying up to e2tMaxSubDelReqTryCount times and
// waiting up to e2tSubDelReqTime for an event after each send.
//
// Returns the event obtained from trans.WaitEvent as interface{}.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), including the loop's control statements.
1263 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1265 var event interface{}
// Requestor id placed in every request sent towards E2T.
1267 const ricRequestorId = 123
1269 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1270 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1271 subDelReqMsg.RequestId.Id = ricRequestorId
1272 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1273 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1275 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1279 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1280 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters are used: one for delete requests and one for re-requests to E2T.
1282 c.UpdateCounter(cSubDelReqToE2)
1284 c.UpdateCounter(cSubDelReReqToE2)
1286 err := c.rmrSendToE2T(desc, subs, trans)
1288 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1290 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1292 c.UpdateCounter(cSubDelReqTimerExpiry)
1297 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1301 //-------------------------------------------------------------------
1302 // handle from E2T Subscription Response
1303 //-------------------------------------------------------------------
1304 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1305 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1306 c.UpdateCounter(cSubRespFromE2)
1308 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1310 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1313 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1315 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1318 trans := subs.GetTransaction()
1320 err = fmt.Errorf("Ongoing transaction not found")
1321 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1324 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1325 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1326 if sendOk == false {
1327 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1328 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1333 //-------------------------------------------------------------------
1334 // handle from E2T Subscription Failure
1335 //-------------------------------------------------------------------
1336 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1337 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1338 c.UpdateCounter(cSubFailFromE2)
1339 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1341 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1344 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1346 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1349 trans := subs.GetTransaction()
1351 err = fmt.Errorf("Ongoing transaction not found")
1352 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1355 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1356 if sendOk == false {
1357 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1358 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1363 //-------------------------------------------------------------------
1364 // handle from E2T Subscription Delete Response
1365 //-------------------------------------------------------------------
1366 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1367 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1368 c.UpdateCounter(cSubDelRespFromE2)
1369 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1371 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1374 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1376 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1379 trans := subs.GetTransaction()
1381 err = fmt.Errorf("Ongoing transaction not found")
1382 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1385 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1386 if sendOk == false {
1387 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1388 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1393 //-------------------------------------------------------------------
1394 // handle from E2T Subscription Delete Failure
1395 //-------------------------------------------------------------------
1396 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1397 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1398 c.UpdateCounter(cSubDelFailFromE2)
1399 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1401 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1404 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1406 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1409 trans := subs.GetTransaction()
1411 err = fmt.Errorf("Ongoing transaction not found")
1412 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1415 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1416 if sendOk == false {
1417 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1418 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1423 //-------------------------------------------------------------------
1425 //-------------------------------------------------------------------
// typeofSubsMessage returns a string label for the dynamic type of v.
//
// Cases exist for subscription response, subscription failure, subscription
// delete response and subscription delete failure messages; the request and
// delete-request cases are commented out. Elsewhere in this file the result is
// used only as a "%s" argument in debug log lines.
//
// NOTE(review): the label strings returned by each case are not shown in this
// listing (line numbers jump) — check the full source before relying on them.
1426 func typeofSubsMessage(v interface{}) string {
1431 //case *e2ap.E2APSubscriptionRequest:
1433 case *e2ap.E2APSubscriptionResponse:
1435 case *e2ap.E2APSubscriptionFailure:
1437 //case *e2ap.E2APSubscriptionDeleteRequest:
1438 // return "SubDelReq"
1439 case *e2ap.E2APSubscriptionDeleteResponse:
1441 case *e2ap.E2APSubscriptionDeleteFailure:
1448 //-------------------------------------------------------------------
1450 //-------------------------------------------------------------------
1451 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1452 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1453 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1455 xapp.Logger.Error("%v", err)
1461 //-------------------------------------------------------------------
1463 //-------------------------------------------------------------------
1464 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1466 if removeSubscriptionFromDb == true {
1467 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1468 c.RemoveSubscriptionFromDb(subs)
1470 // Update is needed for successful response and merge case here
1471 if subs.RetryFromXapp == false {
1472 err := c.WriteSubscriptionToDb(subs)
1476 subs.RetryFromXapp = false
1480 //-------------------------------------------------------------------
1482 //-------------------------------------------------------------------
1483 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1484 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1485 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1487 xapp.Logger.Error("%v", err)
1491 //-------------------------------------------------------------------
1493 //-------------------------------------------------------------------
1494 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1495 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1496 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1498 xapp.Logger.Error("%v", err)
1502 //-------------------------------------------------------------------
1504 //-------------------------------------------------------------------
1505 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1507 if removeRestSubscriptionFromDb == true {
1508 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1509 c.RemoveRESTSubscriptionFromDb(restSubId)
1511 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1515 //-------------------------------------------------------------------
1517 //-------------------------------------------------------------------
1518 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1519 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1520 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1522 xapp.Logger.Error("%v", err)
1526 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription, e2SubsDelRequired bool) {
1528 if c.UTTesting == true {
1529 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1530 c.registry.mutex = new(sync.Mutex)
1533 const ricRequestorId = 123
1534 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1536 // Send delete for every endpoint in the subscription
1537 if subs.PolicyUpdate == false {
1538 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1539 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1540 subDelReqMsg.RequestId.Id = ricRequestorId
1541 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1542 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1544 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1547 for _, endPoint := range subs.EpList.Endpoints {
1548 params := &xapp.RMRParams{}
1549 params.Mtype = mType
1550 params.SubId = int(subs.GetReqId().InstanceId)
1552 params.Meid = subs.Meid
1553 params.Src = endPoint.String()
1554 params.PayloadLen = len(payload.Buf)
1555 params.Payload = payload.Buf
1557 subs.DeleteFromDb = true
1558 if !e2SubsDelRequired {
1559 c.handleXAPPSubscriptionDeleteRequest(params)
1561 c.SendSubscriptionDeleteReqToE2T(subs, params)
// PrintRESTSubscriptionRequest prints the fields of the REST subscription
// request p to standard output with fmt, one field per line. Optional
// (pointer) fields are reported as nil when unset.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), so guards that may exist before the visible statements are
// not shown.
1567 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1569 fmt.Println("CRESTSubscriptionRequest")
1575 if p.SubscriptionID != "" {
1576 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1578 fmt.Println(" SubscriptionID = ''")
// NOTE(review): p.ClientEndpoint is dereferenced here without a visible nil check — confirm it is always set.
1581 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1583 if p.ClientEndpoint.HTTPPort != nil {
1584 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1586 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1589 if p.ClientEndpoint.RMRPort != nil {
1590 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1592 fmt.Println(" ClientEndpoint.RMRPort = nil")
1596 fmt.Printf(" Meid = %s\n", *p.Meid)
1598 fmt.Println(" Meid = nil")
1601 if p.E2SubscriptionDirectives == nil {
1602 fmt.Println(" E2SubscriptionDirectives = nil")
1604 fmt.Println(" E2SubscriptionDirectives")
1605 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1606 fmt.Println(" E2RetryCount == nil")
1608 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1610 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1611 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1613 for _, subscriptionDetail := range p.SubscriptionDetails {
// RANFunctionID belongs to p, not to the detail, so it is printed once per
// subscription detail.
1614 if p.RANFunctionID != nil {
1615 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1617 fmt.Println(" RANFunctionID = nil")
// NOTE(review): XappEventInstanceID, ActionID and ActionType are dereferenced
// below without a visible nil check — confirm they are mandatory in the model.
1619 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1620 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1622 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1623 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1624 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1625 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1627 if actionToBeSetup.SubsequentAction != nil {
1628 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1629 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1631 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")
1637 //-------------------------------------------------------------------
1638 // handle from E2T Subscription Delete Required
1639 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteRequired processes a subscription delete required
// message received from E2T.
//
// The payload is unpacked and, for each delete-required entry, the matching
// subscription is looked up by instance id. Collected subscriptions are then
// passed one by one to SendSubscriptionDeleteReq with e2SubsDelRequired = true.
//
// NOTE(review): this listing omits some short lines of the function (line
// numbers jump), including the loop's control statements after the lookup
// failure and after the OngoingDelCount check.
1640 func (c *Control) handleE2TSubscriptionDeleteRequired(params *xapp.RMRParams) {
1641 xapp.Logger.Info("MSG from E2T: %s", params.String())
1642 c.UpdateCounter(cSubDelRequFromE2)
1643 subsDelRequMsg, err := c.e2ap.UnpackSubscriptionDeleteRequired(params.Payload)
1645 xapp.Logger.Error("MSG-SubDelRequired: %s", idstring(err, params))
1646 //c.sendE2TErrorIndication(nil)
// NOTE(review): this map is filled per endpoint address below but is not read
// anywhere in the visible lines.
1649 var subscriptions = map[string][]e2ap.E2APSubscriptionDeleteRequired{}
// Subscriptions for which a delete will be sent after the loop.
1650 var subDB = []*Subscription{}
1651 for _, subsTobeRemove := range subsDelRequMsg.E2APSubscriptionDeleteRequiredRequests {
1652 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subsTobeRemove.RequestId.InstanceId})
// NOTE(review): the log tag below says "MSG-SubDelFail" although this is the
// delete-required handler — looks copy-pasted; confirm.
1654 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1657 // Check if Delete Subscription Already triggered
1658 if subs.OngoingDelCount > 0 {
1661 subDB = append(subDB, subs)
1662 for _, endpoint := range subs.EpList.Endpoints {
1663 subscriptions[endpoint.Addr] = append(subscriptions[endpoint.Addr], subsTobeRemove)
1665 // Sending Subscription Delete Request to E2T
1666 // c.SendSubscriptionDeleteReq(subs, true)
1668 for _, subsTobeRemove := range subDB {
1669 // Sending Subscription Delete Request to E2T
1670 c.SendSubscriptionDeleteReq(subsTobeRemove, true)
1674 //-----------------------------------------------------------------
1675 // Initiate RIC Subscription Delete Request after receiving
1676 // RIC Subscription Delete Required from E2T
1677 //-----------------------------------------------------------------
1678 func (c *Control) SendSubscriptionDeleteReqToE2T(subs *Subscription, params *xapp.RMRParams) {
1679 xapp.Logger.Debug("MSG TO E2T: %s", params.String())
1680 c.UpdateCounter(cSubDelReqToE2)
1682 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1683 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1687 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subs.ReqId.RequestId, params.Meid)
1689 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1692 defer trans.Release()
1694 err := c.tracker.Track(trans)
1696 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1703 subs.OngoingDelCount++
1704 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1705 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1706 subs.OngoingDelCount--
1708 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1710 if subs.NoRespToXapp == true {
1711 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1712 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")