2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
43 func idstring(err error, entries ...fmt.Stringer) string {
44 var retval string = ""
45 var filler string = ""
46 for _, entry := range entries {
48 retval += filler + entry.String()
51 retval += filler + "(NIL)"
55 retval += filler + "err(" + err.Error() + ")"
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 err := c.ReadE2Subscriptions()
175 xapp.Logger.Error("ReadE2Subscriptions() failed %s", err.Error())
177 err = c.ReadRESTSubscriptions()
179 xapp.Logger.Error("ReadRESTSubscriptions() failed %s", err.Error())
184 err := xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
186 xapp.Logger.Error("xapp.Subscription.Listen failure: %s", err.Error())
192 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
193 subscriptions, err := c.registry.QueryHandler()
195 xapp.Logger.Error("QueryHandler() failed %s", err.Error())
198 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
204 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
205 xapp.Logger.Debug("RESTQueryHandler() called")
209 return c.registry.QueryHandler()
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) ReadE2Subscriptions() error {
218 var register map[uint32]*Subscription
219 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
220 xapp.Logger.Debug("Reading E2 subscriptions from db")
221 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
223 xapp.Logger.Error("%v", err)
224 <-time.After(1 * time.Second)
226 c.registry.subIds = subIds
227 c.registry.register = register
228 go c.HandleUncompletedSubscriptions(register)
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
239 func (c *Control) ReadRESTSubscriptions() error {
241 xapp.Logger.Debug("ReadRESTSubscriptions()")
243 var restSubscriptions map[string]*RESTSubscription
244 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
245 xapp.Logger.Debug("Reading REST subscriptions from db")
246 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
248 xapp.Logger.Error("%v", err)
249 <-time.After(1 * time.Second)
251 // Fix REST subscriptions ongoing status after restart
252 for restSubId, restSubscription := range restSubscriptions {
253 restSubscription.SubReqOngoing = false
254 restSubscription.SubDelReqOngoing = false
255 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
257 xapp.Logger.Error("WriteRESTSubscriptionToSdl() failed:%s", err.Error())
260 c.registry.restSubscriptions = restSubscriptions
264 xapp.Logger.Debug("Continuing without retring")
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
271 func (c *Control) ReadConfigParameters(f string) {
273 xapp.Logger.Debug("ReadConfigParameters")
275 c.LoggerLevel = int(xapp.Logger.GetLevel())
276 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
277 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
279 // viper.GetDuration returns nanoseconds
280 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
281 if e2tSubReqTimeout == 0 {
282 e2tSubReqTimeout = 2000 * 1000000
283 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
285 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
287 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
288 if e2tSubDelReqTime == 0 {
289 e2tSubDelReqTime = 2000 * 1000000
290 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
292 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
294 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
295 if e2tRecvMsgTimeout == 0 {
296 e2tRecvMsgTimeout = 2000 * 1000000
297 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
299 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
301 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
302 if e2tMaxSubReqTryCount == 0 {
303 e2tMaxSubReqTryCount = 1
304 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
306 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
308 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
309 if e2tMaxSubDelReqTryCount == 0 {
310 e2tMaxSubDelReqTryCount = 1
311 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
313 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
315 checkE2State = viper.GetString("controls.checkE2State")
316 if checkE2State == "" {
317 checkE2State = "true"
318 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
320 xapp.Logger.Debug("checkE2State= %v", checkE2State)
322 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
323 if readSubsFromDb == "" {
324 readSubsFromDb = "true"
325 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
327 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
329 dbTryCount = viper.GetInt("controls.dbTryCount")
332 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
334 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
336 dbRetryForever = viper.GetString("controls.dbRetryForever")
337 if dbRetryForever == "" {
338 dbRetryForever = "true"
339 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
341 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
343 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
344 // value 100ms used currently only in unittests.
345 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
346 if waitRouteCleanup_ms == 0 {
347 waitRouteCleanup_ms = 5000 * 1000000
348 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
350 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
353 //-------------------------------------------------------------------
355 //-------------------------------------------------------------------
356 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
358 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
359 for subId, subs := range register {
360 if subs.SubRespRcvd == false {
361 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
362 if subs.PolicyUpdate == false {
363 subs.NoRespToXapp = true
364 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
365 c.SendSubscriptionDeleteReq(subs)
371 func (c *Control) ReadyCB(data interface{}) {
372 if c.RMRClient == nil {
373 c.RMRClient = xapp.Rmr
377 func (c *Control) Run() {
378 xapp.SetReadyCB(c.ReadyCB, nil)
379 xapp.AddConfigChangeListener(c.ReadConfigParameters)
383 //-------------------------------------------------------------------
385 //-------------------------------------------------------------------
386 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
389 var restSubscription *RESTSubscription
392 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if p.SubscriptionID == "" {
394 // Subscription does not contain REST subscription Id
396 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
397 if restSubscription != nil {
398 // Subscription not found
399 restSubId = prevRestSubsId
401 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
403 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
406 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
407 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
411 if restSubscription == nil {
412 restSubId = ksuid.New().String()
413 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
416 // Subscription contains REST subscription Id
417 restSubId = p.SubscriptionID
419 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
420 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
422 // Subscription with id in REST request does not exist
423 xapp.Logger.Error("%s", err.Error())
424 c.UpdateCounter(cRestSubFailToXapp)
429 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
431 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
435 return restSubscription, restSubId, nil
438 //-------------------------------------------------------------------
440 //-------------------------------------------------------------------
441 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
444 c.UpdateCounter(cRestSubReqFromXapp)
446 subResp := models.SubscriptionResponse{}
447 p := params.(*models.SubscriptionParams)
449 if c.LoggerLevel > 2 {
450 c.PrintRESTSubscriptionRequest(p)
453 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
454 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
455 c.UpdateCounter(cRestReqRejDueE2Down)
456 return nil, common.SubscribeServiceUnavailableCode
459 if p.ClientEndpoint == nil {
460 err := fmt.Errorf("ClientEndpoint == nil")
461 xapp.Logger.Error("%v", err)
462 c.UpdateCounter(cRestSubFailToXapp)
463 return nil, common.SubscribeBadRequestCode
466 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
468 xapp.Logger.Error("%s", err)
469 c.UpdateCounter(cRestSubFailToXapp)
470 return nil, common.SubscribeBadRequestCode
472 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
474 xapp.Logger.Error("%s", err.Error())
475 c.UpdateCounter(cRestSubFailToXapp)
476 return nil, common.SubscribeBadRequestCode
479 md5sum, err := CalculateRequestMd5sum(params)
481 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
484 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
486 xapp.Logger.Error("Subscription with id in REST request does not exist")
487 return nil, common.SubscribeNotFoundCode
490 subResp.SubscriptionID = &restSubId
491 subReqList := e2ap.SubscriptionRequestList{}
492 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
494 xapp.Logger.Error("%s", err.Error())
495 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
496 c.registry.DeleteRESTSubscription(&restSubId)
497 c.UpdateCounter(cRestSubFailToXapp)
498 return nil, common.SubscribeBadRequestCode
501 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
503 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
504 xapp.Logger.Debug("%s", err)
505 c.registry.DeleteRESTSubscription(&restSubId)
506 c.UpdateCounter(cRestSubRespToXapp)
507 return &subResp, common.SubscribeCreatedCode
510 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
511 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
513 c.UpdateCounter(cRestSubRespToXapp)
514 return &subResp, common.SubscribeCreatedCode
517 //-------------------------------------------------------------------
519 //-------------------------------------------------------------------
520 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
522 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
523 if p == nil || p.E2SubscriptionDirectives == nil {
524 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
525 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
526 e2SubscriptionDirectives.CreateRMRRoute = true
527 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
529 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
530 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
532 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
534 if p.E2SubscriptionDirectives.E2RetryCount == nil {
535 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
536 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
538 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
539 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
541 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
544 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
546 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
547 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
548 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
549 return e2SubscriptionDirectives, nil
552 //-------------------------------------------------------------------
554 //-------------------------------------------------------------------
556 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
557 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
559 c.SubscriptionProcessingStartDelay()
560 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
562 var xAppEventInstanceID int64
563 var e2EventInstanceID int64
564 errorInfo := &ErrorInfo{}
566 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
568 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
569 subReqMsg := subReqList.E2APSubscriptionRequests[index]
570 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
572 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
574 // Send notification to xApp that prosessing of a Subscription Request has failed.
575 err := fmt.Errorf("Tracking failure")
576 errorInfo.ErrorCause = err.Error()
577 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
581 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
583 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
585 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
589 if err.Error() == "TEST: restart event received" {
590 // This is just for UT cases. Stop here subscription processing
593 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
595 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
596 restSubscription.AddMd5Sum(md5sum)
597 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
598 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
599 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
604 //-------------------------------------------------------------------
606 //------------------------------------------------------------------
607 func (c *Control) SubscriptionProcessingStartDelay() {
608 if c.UTTesting == true {
609 // This is temporary fix for the UT problem that notification arrives before subscription response
610 // Correct fix would be to allow notification come before response and process it correctly
611 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
612 <-time.After(time.Millisecond * 50)
613 xapp.Logger.Debug("Continuing after delay")
617 //-------------------------------------------------------------------
619 //------------------------------------------------------------------
620 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
621 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
623 errorInfo := ErrorInfo{}
625 err := c.tracker.Track(trans)
627 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
628 errorInfo.ErrorCause = err.Error()
629 err = fmt.Errorf("Tracking failure")
630 return nil, &errorInfo, err
633 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
635 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
636 return nil, &errorInfo, err
642 subs.OngoingReqCount++
643 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
644 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
645 subs.OngoingReqCount--
649 switch themsg := event.(type) {
650 case *e2ap.E2APSubscriptionResponse:
652 if c.e2IfState.IsE2ConnectionUp(meid) == true {
653 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
654 return themsg, &errorInfo, nil
656 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
657 c.RemoveSubscriptionFromDb(subs)
658 err = fmt.Errorf("E2 interface down")
659 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
661 case *e2ap.E2APSubscriptionFailure:
662 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
663 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
664 case *PackSubscriptionRequestErrortEvent:
665 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
666 errorInfo = themsg.ErrorInfo
667 case *SDLWriteErrortEvent:
668 err = fmt.Errorf("SDL write failure")
669 errorInfo = themsg.ErrorInfo
670 case *SubmgrRestartTestEvent:
671 err = fmt.Errorf("TEST: restart event received")
672 xapp.Logger.Debug("%s", err)
673 return nil, &errorInfo, err
675 err = fmt.Errorf("Unexpected E2 subscription response received")
676 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
681 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
682 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
683 if subs.PolicyUpdate == true {
684 return nil, &errorInfo, err
688 xapp.Logger.Error("XAPP-SubReq E2 subscription failed: %s", idstring(err, trans, subs))
689 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
691 return nil, &errorInfo, err
694 //-------------------------------------------------------------------
696 //-------------------------------------------------------------------
697 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
698 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
700 // Send notification to xApp that prosessing of a Subscription Request has failed.
701 e2EventInstanceID := (int64)(0)
702 if errorInfo.ErrorSource == "" {
703 // Submgr is default source of error
704 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
706 resp := &models.SubscriptionResponse{
707 SubscriptionID: restSubId,
708 SubscriptionInstances: []*models.SubscriptionInstance{
709 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
710 ErrorCause: errorInfo.ErrorCause,
711 ErrorSource: errorInfo.ErrorSource,
712 TimeoutType: errorInfo.TimeoutType,
713 XappEventInstanceID: &xAppEventInstanceID},
716 // Mark REST subscription request processed.
717 restSubscription.SetProcessed(err)
718 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
720 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
721 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
723 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
724 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
727 c.UpdateCounter(cRestSubFailNotifToXapp)
728 err = xapp.Subscription.Notify(resp, *clientEndpoint)
730 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
733 // E2 is down. Delete completely processed request safely now
734 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
735 c.registry.DeleteRESTSubscription(restSubId)
736 c.RemoveRESTSubscriptionFromDb(*restSubId)
740 //-------------------------------------------------------------------
742 //-------------------------------------------------------------------
743 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
744 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
746 // Store successfully processed InstanceId for deletion
747 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
748 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
750 // Send notification to xApp that a Subscription Request has been processed.
751 resp := &models.SubscriptionResponse{
752 SubscriptionID: restSubId,
753 SubscriptionInstances: []*models.SubscriptionInstance{
754 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
755 ErrorCause: errorInfo.ErrorCause,
756 ErrorSource: errorInfo.ErrorSource,
757 XappEventInstanceID: &xAppEventInstanceID},
760 // Mark REST subscription request processesd.
761 restSubscription.SetProcessed(nil)
762 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
763 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
764 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
765 c.UpdateCounter(cRestSubNotifToXapp)
766 err := xapp.Subscription.Notify(resp, *clientEndpoint)
768 xapp.Logger.Error("xapp.Subscription.Notify failed %s", err.Error())
771 // E2 is down. Delete completely processed request safely now
772 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
773 c.registry.DeleteRESTSubscription(restSubId)
774 c.RemoveRESTSubscriptionFromDb(*restSubId)
778 //-------------------------------------------------------------------
780 //-------------------------------------------------------------------
781 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
784 c.UpdateCounter(cRestSubDelReqFromXapp)
786 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
788 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
790 xapp.Logger.Error("%s", err.Error())
791 if restSubscription == nil {
792 // Subscription was not found
793 c.UpdateCounter(cRestSubDelRespToXapp)
794 return common.UnsubscribeNoContentCode
796 if restSubscription.SubReqOngoing == true {
797 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
798 xapp.Logger.Error("%s", err.Error())
799 c.UpdateCounter(cRestSubDelFailToXapp)
800 return common.UnsubscribeBadRequestCode
801 } else if restSubscription.SubDelReqOngoing == true {
802 // Previous request for same restSubId still ongoing
803 c.UpdateCounter(cRestSubDelRespToXapp)
804 return common.UnsubscribeNoContentCode
809 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
811 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
812 for _, instanceId := range restSubscription.InstanceIds {
813 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
816 xapp.Logger.Error("%s", err.Error())
818 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
819 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
820 restSubscription.DeleteE2InstanceId(instanceId)
822 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
823 c.registry.DeleteRESTSubscription(&restSubId)
824 c.RemoveRESTSubscriptionFromDb(restSubId)
827 c.UpdateCounter(cRestSubDelRespToXapp)
828 return common.UnsubscribeNoContentCode
831 //-------------------------------------------------------------------
833 //-------------------------------------------------------------------
834 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
836 var xAppEventInstanceID int64
837 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
839 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
840 restSubId, instanceId, idstring(err, nil))
841 return xAppEventInstanceID, nil
844 xAppEventInstanceID = int64(subs.ReqId.Id)
845 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
847 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
848 xapp.Logger.Error("%s", err.Error())
850 defer trans.Release()
852 err = c.tracker.Track(trans)
854 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
855 xapp.Logger.Error("%s", err.Error())
856 return xAppEventInstanceID, &time.ParseError{}
861 subs.OngoingDelCount++
862 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
863 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
864 subs.OngoingDelCount--
866 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
868 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
870 return xAppEventInstanceID, nil
873 //-------------------------------------------------------------------
875 //-------------------------------------------------------------------
877 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
878 params := &xapp.RMRParams{}
879 params.Mtype = trans.GetMtype()
880 params.SubId = int(subs.GetReqId().InstanceId)
882 params.Meid = subs.GetMeid()
884 params.PayloadLen = len(trans.Payload.Buf)
885 params.Payload = trans.Payload.Buf
887 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
888 err = c.SendWithRetry(params, false, 5)
890 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
895 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
897 params := &xapp.RMRParams{}
898 params.Mtype = trans.GetMtype()
899 params.SubId = int(subs.GetReqId().InstanceId)
900 params.Xid = trans.GetXid()
901 params.Meid = trans.GetMeid()
903 params.PayloadLen = len(trans.Payload.Buf)
904 params.Payload = trans.Payload.Buf
906 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
907 err = c.SendWithRetry(params, false, 5)
909 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
914 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
915 if c.RMRClient == nil {
916 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
917 xapp.Logger.Error("%s", err.Error())
922 defer c.RMRClient.Free(msg.Mbuf)
924 // xapp-frame might use direct access to c buffer and
925 // when msg.Mbuf is freed, someone might take it into use
926 // and payload data might be invalid inside message handle function
928 // subscriptions won't load system a lot so there is no
929 // real performance hit by cloning buffer into new go byte slice
930 cPay := append(msg.Payload[:0:0], msg.Payload...)
932 msg.PayloadLen = len(cPay)
935 case xapp.RIC_SUB_REQ:
936 go c.handleXAPPSubscriptionRequest(msg)
937 case xapp.RIC_SUB_RESP:
938 go c.handleE2TSubscriptionResponse(msg)
939 case xapp.RIC_SUB_FAILURE:
940 go c.handleE2TSubscriptionFailure(msg)
941 case xapp.RIC_SUB_DEL_REQ:
942 go c.handleXAPPSubscriptionDeleteRequest(msg)
943 case xapp.RIC_SUB_DEL_RESP:
944 go c.handleE2TSubscriptionDeleteResponse(msg)
945 case xapp.RIC_SUB_DEL_FAILURE:
946 go c.handleE2TSubscriptionDeleteFailure(msg)
948 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
953 //-------------------------------------------------------------------
954 // handle from XAPP Subscription Request
955 //------------------------------------------------------------------
956 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
957 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
958 c.UpdateCounter(cSubReqFromXapp)
960 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
961 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
965 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
967 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
971 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
973 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
976 defer trans.Release()
978 if err = c.tracker.Track(trans); err != nil {
979 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
983 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
985 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
989 c.wakeSubscriptionRequest(subs, trans)
992 //-------------------------------------------------------------------
993 // Wake Subscription Request to E2node
994 //------------------------------------------------------------------
995 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
997 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(nil)
999 xapp.Logger.Error("c.GetE2SubscriptionDirectives failure: %s", err.Error())
1001 subs.OngoingReqCount++
1002 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1003 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1004 subs.OngoingReqCount--
1006 switch themsg := event.(type) {
1007 case *e2ap.E2APSubscriptionResponse:
1008 themsg.RequestId.Id = trans.RequestId.Id
1009 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1012 c.UpdateCounter(cSubRespToXapp)
1013 err := c.rmrSendToXapp("", subs, trans)
1015 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1019 case *e2ap.E2APSubscriptionFailure:
1020 themsg.RequestId.Id = trans.RequestId.Id
1021 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1023 c.UpdateCounter(cSubFailToXapp)
1024 c.rmrSendToXapp("", subs, trans)
1030 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1033 //-------------------------------------------------------------------
1034 // handle from XAPP Subscription Delete Request
1035 //------------------------------------------------------------------
1036 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1037 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1038 c.UpdateCounter(cSubDelReqFromXapp)
1040 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1041 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1045 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1047 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1051 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1053 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1056 defer trans.Release()
1058 err = c.tracker.Track(trans)
1060 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1064 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1066 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1073 subs.OngoingDelCount++
1074 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1075 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1076 subs.OngoingDelCount--
1078 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1080 if subs.NoRespToXapp == true {
1081 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1082 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1086 // Whatever is received success, fail or timeout, send successful delete response
1087 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1088 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1089 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1090 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1091 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1093 c.UpdateCounter(cSubDelRespToXapp)
1094 err := c.rmrSendToXapp("", subs, trans)
1096 xapp.Logger.Error("rmrSendToXapp() failed:%s", err.Error())
1101 //-------------------------------------------------------------------
1102 // SUBS CREATE Handling
1103 //-------------------------------------------------------------------
1104 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1106 var event interface{} = nil
1107 var removeSubscriptionFromDb bool = false
1108 trans := c.tracker.NewSubsTransaction(subs)
1109 subs.WaitTransactionTurn(trans)
1110 defer subs.ReleaseTransactionTurn(trans)
1111 defer trans.Release()
1113 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1115 subRfMsg, valid := subs.GetCachedResponse()
1116 if subRfMsg == nil && valid == true {
1117 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1118 switch event.(type) {
1119 case *e2ap.E2APSubscriptionResponse:
1120 subRfMsg, valid = subs.SetCachedResponse(event, true)
1121 subs.SubRespRcvd = true
1122 case *e2ap.E2APSubscriptionFailure:
1123 subRfMsg, valid = subs.SetCachedResponse(event, false)
1124 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1125 case *SubmgrRestartTestEvent:
1126 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1127 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1128 subRfMsg, valid = subs.SetCachedResponse(event, false)
1129 parentTrans.SendEvent(subRfMsg, 0)
1131 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1132 subRfMsg, valid = subs.SetCachedResponse(event, false)
1135 if subs.PolicyUpdate == false {
1136 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1137 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1138 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1140 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1143 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1145 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1148 removeSubscriptionFromDb = true
1151 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1154 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1158 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1160 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1163 parentTrans.SendEvent(subRfMsg, 0)
1166 //-------------------------------------------------------------------
1167 // SUBS DELETE Handling
1168 //-------------------------------------------------------------------
1170 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1172 trans := c.tracker.NewSubsTransaction(subs)
1173 subs.WaitTransactionTurn(trans)
1174 defer subs.ReleaseTransactionTurn(trans)
1175 defer trans.Release()
1177 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1181 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1184 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1189 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1190 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1191 parentTrans.SendEvent(nil, 0)
1194 //-------------------------------------------------------------------
1195 // send to E2T Subscription Request
1196 //-------------------------------------------------------------------
1197 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1199 var event interface{} = nil
1200 var timedOut bool = false
1201 const ricRequestorId = 123
1203 subReqMsg := subs.SubReqMsg
1204 subReqMsg.RequestId = subs.GetReqId().RequestId
1205 subReqMsg.RequestId.Id = ricRequestorId
1206 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1208 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1209 return &PackSubscriptionRequestErrortEvent{
1211 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1212 ErrorCause: err.Error(),
1217 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1218 err = c.WriteSubscriptionToDb(subs)
1220 return &SDLWriteErrortEvent{
1222 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1223 ErrorCause: err.Error(),
1228 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1229 desc := fmt.Sprintf("(retry %d)", retries)
1231 c.UpdateCounter(cSubReqToE2)
1233 c.UpdateCounter(cSubReReqToE2)
1235 err := c.rmrSendToE2T(desc, subs, trans)
1237 xapp.Logger.Error("rmrSendToE2T() failed:%s", err.Error())
1240 if subs.DoNotWaitSubResp == false {
1241 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1243 c.UpdateCounter(cSubReqTimerExpiry)
1247 // Simulating case where subscrition request has been sent but response has not been received before restart
1248 event = &SubmgrRestartTestEvent{}
1249 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1253 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1257 //-------------------------------------------------------------------
1258 // send to E2T Subscription Delete Request
1259 //-------------------------------------------------------------------
1261 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1263 var event interface{}
1265 const ricRequestorId = 123
1267 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1268 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1269 subDelReqMsg.RequestId.Id = ricRequestorId
1270 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1271 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1273 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1277 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1278 desc := fmt.Sprintf("(retry %d)", retries)
1280 c.UpdateCounter(cSubDelReqToE2)
1282 c.UpdateCounter(cSubDelReReqToE2)
1284 err := c.rmrSendToE2T(desc, subs, trans)
1286 xapp.Logger.Error("SUBS-SubDelReq: rmrSendToE2T failure: %s", idstring(err, trans, subs, parentTrans))
1288 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1290 c.UpdateCounter(cSubDelReqTimerExpiry)
1295 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1299 //-------------------------------------------------------------------
1300 // handle from E2T Subscription Response
1301 //-------------------------------------------------------------------
1302 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1303 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1304 c.UpdateCounter(cSubRespFromE2)
1306 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1308 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1311 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1313 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1316 trans := subs.GetTransaction()
1318 err = fmt.Errorf("Ongoing transaction not found")
1319 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1322 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1323 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1324 if sendOk == false {
1325 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1326 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1331 //-------------------------------------------------------------------
1332 // handle from E2T Subscription Failure
1333 //-------------------------------------------------------------------
1334 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1335 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1336 c.UpdateCounter(cSubFailFromE2)
1337 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1339 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1342 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1344 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1347 trans := subs.GetTransaction()
1349 err = fmt.Errorf("Ongoing transaction not found")
1350 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1353 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1354 if sendOk == false {
1355 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1356 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1361 //-------------------------------------------------------------------
1362 // handle from E2T Subscription Delete Response
1363 //-------------------------------------------------------------------
1364 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) {
1365 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1366 c.UpdateCounter(cSubDelRespFromE2)
1367 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1369 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1372 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1374 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1377 trans := subs.GetTransaction()
1379 err = fmt.Errorf("Ongoing transaction not found")
1380 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1383 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1384 if sendOk == false {
1385 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1386 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1391 //-------------------------------------------------------------------
1392 // handle from E2T Subscription Delete Failure
1393 //-------------------------------------------------------------------
1394 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1395 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1396 c.UpdateCounter(cSubDelFailFromE2)
1397 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1399 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1402 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1404 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1407 trans := subs.GetTransaction()
1409 err = fmt.Errorf("Ongoing transaction not found")
1410 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1413 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1414 if sendOk == false {
1415 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1416 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1421 //-------------------------------------------------------------------
1423 //-------------------------------------------------------------------
1424 func typeofSubsMessage(v interface{}) string {
1429 //case *e2ap.E2APSubscriptionRequest:
1431 case *e2ap.E2APSubscriptionResponse:
1433 case *e2ap.E2APSubscriptionFailure:
1435 //case *e2ap.E2APSubscriptionDeleteRequest:
1436 // return "SubDelReq"
1437 case *e2ap.E2APSubscriptionDeleteResponse:
1439 case *e2ap.E2APSubscriptionDeleteFailure:
1446 //-------------------------------------------------------------------
1448 //-------------------------------------------------------------------
1449 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1450 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1451 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1453 xapp.Logger.Error("%v", err)
1459 //-------------------------------------------------------------------
1461 //-------------------------------------------------------------------
1462 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1464 if removeSubscriptionFromDb == true {
1465 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1466 c.RemoveSubscriptionFromDb(subs)
1468 // Update is needed for successful response and merge case here
1469 if subs.RetryFromXapp == false {
1470 err := c.WriteSubscriptionToDb(subs)
1474 subs.RetryFromXapp = false
1478 //-------------------------------------------------------------------
1480 //-------------------------------------------------------------------
1481 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1482 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1483 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1485 xapp.Logger.Error("%v", err)
1489 //-------------------------------------------------------------------
1491 //-------------------------------------------------------------------
1492 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1493 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1494 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1496 xapp.Logger.Error("%v", err)
1500 //-------------------------------------------------------------------
1502 //-------------------------------------------------------------------
1503 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1505 if removeRestSubscriptionFromDb == true {
1506 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1507 c.RemoveRESTSubscriptionFromDb(restSubId)
1509 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1513 //-------------------------------------------------------------------
1515 //-------------------------------------------------------------------
1516 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1517 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1518 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1520 xapp.Logger.Error("%v", err)
1524 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1526 if c.UTTesting == true {
1527 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1528 c.registry.mutex = new(sync.Mutex)
1531 const ricRequestorId = 123
1532 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1534 // Send delete for every endpoint in the subscription
1535 if subs.PolicyUpdate == false {
1536 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1537 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1538 subDelReqMsg.RequestId.Id = ricRequestorId
1539 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1540 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1542 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1545 for _, endPoint := range subs.EpList.Endpoints {
1546 params := &xapp.RMRParams{}
1547 params.Mtype = mType
1548 params.SubId = int(subs.GetReqId().InstanceId)
1550 params.Meid = subs.Meid
1551 params.Src = endPoint.String()
1552 params.PayloadLen = len(payload.Buf)
1553 params.Payload = payload.Buf
1555 subs.DeleteFromDb = true
1556 c.handleXAPPSubscriptionDeleteRequest(params)
1561 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1563 fmt.Println("CRESTSubscriptionRequest")
1569 if p.SubscriptionID != "" {
1570 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1572 fmt.Println(" SubscriptionID = ''")
1575 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1577 if p.ClientEndpoint.HTTPPort != nil {
1578 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1580 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1583 if p.ClientEndpoint.RMRPort != nil {
1584 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1586 fmt.Println(" ClientEndpoint.RMRPort = nil")
1590 fmt.Printf(" Meid = %s\n", *p.Meid)
1592 fmt.Println(" Meid = nil")
1595 if p.E2SubscriptionDirectives == nil {
1596 fmt.Println(" E2SubscriptionDirectives = nil")
1598 fmt.Println(" E2SubscriptionDirectives")
1599 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1600 fmt.Println(" E2RetryCount == nil")
1602 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1604 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1605 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1607 for _, subscriptionDetail := range p.SubscriptionDetails {
1608 if p.RANFunctionID != nil {
1609 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1611 fmt.Println(" RANFunctionID = nil")
1613 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1614 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1616 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1617 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1618 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1619 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1621 if actionToBeSetup.SubsequentAction != nil {
1622 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1623 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1625 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")