2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring renders a set of loggable objects, followed by an optional error,
// as one space-separated string for log messages.
//
// Each non-nil entry contributes entry.String(); a nil entry contributes the
// literal "(NIL)". A non-nil err is appended last as "err(<message>)". With no
// entries and a nil err the result is the empty string.
//
// Note: only a nil interface value is reported as "(NIL)". An interface that
// wraps a nil pointer is not nil, so String() is still invoked on it and must
// tolerate a nil receiver.
func idstring(err error, entries ...fmt.Stringer) string {
	// One slot per entry plus one for the optional error.
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Package-level settings. Unless noted otherwise they are (re)loaded by
// Control.ReadConfigParameters from the "controls.*" configuration keys.

// Read from controls.e2tSubReqTimeout_ms; replaced by 2000 * 1000000 when the
// configured value is 0. Used as the default E2TimeoutTimerValue.
68 var e2tSubReqTimeout time.Duration
// Read from controls.e2tSubDelReqTime_ms; same 2000 * 1000000 fallback.
69 var e2tSubDelReqTime time.Duration
// Read from controls.e2tRecvMsgTimeout_ms; same 2000 * 1000000 fallback.
70 var e2tRecvMsgTimeout time.Duration
// Read from controls.waitRouteCleanup_ms (fallback 5000 * 1000000); passed to
// registry.RemoveFromSubscription.
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String flag from controls.readSubsFromDb; an empty value is replaced by
// "true". NewControl compares it against "false".
74 var readSubsFromDb string
// Md5sum-keyed bookkeeping of REST subscription requests, used to recognise
// repeated requests; initialised with Init() in NewControl.
75 var restDuplicateCtrl duplicateCtrl
// String flag from controls.dbRetryForever (empty value becomes "true"). When
// it is "true" the database read loops are not bounded by dbTryCount.
76 var dbRetryForever string
// NOTE(review): the struct header and the other fields are not visible in this
// view; only these two fields of what NewControl builds as Control are shown.
// Set from CreateRESTSdl() in NewControl; presumably the store for REST
// subscriptions — confirm against the Sdl helpers.
85 restSubsDb Sdlnterface
// Counter group registered in NewControl through
// xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR").
88 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker type.
// NOTE(review): where it is sent or received is not visible in this part of
// the file.
99 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type.
// NOTE(review): where it is sent or received is not visible in this part of
// the file.
100 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is the event that handleSubscriptionRequest
// maps to the error "E2 SubscriptionRequest pack failure". It carries an
// ErrorInfo that is handed back to the caller.
// NOTE(review): the field declarations are not visible in this view.
101 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil,
// it is dereferenced unconditionally.
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is the event that handleSubscriptionRequest maps to the
// error "SDL write failure". It carries an ErrorInfo that is handed back to the
// caller.
// NOTE(review): the field declarations are not visible in this view.
109 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must not be nil,
// it is dereferenced unconditionally.
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
// NOTE(review): the enclosing function header is not visible in this view;
// these statements look like package start-up configuration.
118 xapp.Logger.Debug("SUBMGR")
// Configure viper's environment handling: "submgr" is the prefix for
// environment variable names, and variables that are set but empty are allowed.
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance: it creates the routing manager
// client, the Registry and the Tracker, loads the configuration, registers the
// extra REST routes, starts the REST subscription listener and reads stored
// subscriptions from the database.
// NOTE(review): several lines of this function are missing from this view,
// including the body of the readSubsFromDb == "false" branch and the return
// statement; the comments below cover only the statements shown.
124 func NewControl() *Control {
// HTTP transport towards rtmgr, addressed by the rtmgr.HostAddr, rtmgr.port
// and rtmgr.baseUrl settings.
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry keeps a pointer to the rtmgr client created above.
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 c := &Control{e2ap: new(E2ap),
139 e2SubsDb: CreateSdl(),
140 restSubsDb: CreateRESTSdl(),
141 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load configuration once; the argument is not used by the visible part of
// ReadConfigParameters.
144 c.ReadConfigParameters("")
146 // Register REST handler for testing support
147 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
148 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
149 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// The REST subscription listener runs in its own goroutine.
151 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
// NOTE(review): what happens when reading from the db is disabled is not
// visible here; presumably the function returns early — confirm.
153 if readSubsFromDb == "false" {
157 restDuplicateCtrl.Init()
159 // Read subscriptions from db
// The error results of both read calls are ignored here.
160 c.ReadE2Subscriptions()
161 c.ReadRESTSubscriptions()
// SymptomDataHandler serves the "/ric/v1/symptomdata" route registered in
// NewControl. It sends the registry's subscription query result as symptom
// data under the name "platform/subscriptions.json".
165 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// The error returned by QueryHandler is discarded.
166 subscriptions, _ := c.registry.QueryHandler()
167 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
170 //-------------------------------------------------------------------
172 //-------------------------------------------------------------------
// GetAllRestSubscriptions serves the "/ric/v1/restsubscriptions" route
// registered in NewControl, based on registry.GetAllRestSubscriptions().
// NOTE(review): the statement that writes the response to w is not visible in
// this view.
173 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
174 xapp.Logger.Debug("GetAllRestSubscriptions() called")
175 response := c.registry.GetAllRestSubscriptions()
179 //-------------------------------------------------------------------
181 //-------------------------------------------------------------------
// ReadE2Subscriptions loads the E2 subscriptions from the database into the
// registry (subIds and register) and then calls HandleUncompletedSubscriptions
// on the loaded register.
// NOTE(review): the condition lines and return statements are missing from this
// view. The error log and the one second wait presumably run when the read
// fails, and the registry update when it succeeds — confirm.
182 func (c *Control) ReadE2Subscriptions() error {
185 var register map[uint32]*Subscription
// The loop is bounded by dbTryCount unless dbRetryForever is "true", in which
// case the bound is ignored.
186 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
187 xapp.Logger.Debug("Reading E2 subscriptions from db")
188 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
190 xapp.Logger.Error("%v", err)
// Pause for one second before the next attempt.
191 <-time.After(1 * time.Second)
193 c.registry.subIds = subIds
194 c.registry.register = register
195 c.HandleUncompletedSubscriptions(register)
199 xapp.Logger.Debug("Continuing without retring")
203 //-------------------------------------------------------------------
205 //-------------------------------------------------------------------
// ReadRESTSubscriptions loads the REST subscriptions from the database into
// registry.restSubscriptions.
// NOTE(review): the condition lines and return statements are missing from this
// view. The error log and the one second wait presumably run when the read
// fails, and the registry update when it succeeds — confirm.
206 func (c *Control) ReadRESTSubscriptions() error {
208 var restSubscriptions map[string]*RESTSubscription
// The loop is bounded by dbTryCount unless dbRetryForever is "true", in which
// case the bound is ignored.
209 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
210 xapp.Logger.Debug("Reading REST subscriptions from db")
211 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
213 xapp.Logger.Error("%v", err)
// Pause for one second before the next attempt.
214 <-time.After(1 * time.Second)
216 c.registry.restSubscriptions = restSubscriptions
220 xapp.Logger.Debug("Continuing without retring")
224 //-------------------------------------------------------------------
226 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings from viper into the
// package-level configuration variables and logs each resulting value. A value
// that reads as zero or as the empty string is replaced by a built-in default.
// It is called once from NewControl and is registered as a configuration change
// listener in Run. The parameter f is not used by the statements visible here.
227 func (c *Control) ReadConfigParameters(f string) {
// Cache the current logger level; RESTSubscriptionHandler checks it.
229 c.LoggerLevel = int(xapp.Logger.GetLevel())
230 xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
232 // viper.GetDuration returns nanoseconds
// The *_ms settings are therefore scaled by 1000000 to become milliseconds.
233 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
234 if e2tSubReqTimeout == 0 {
235 e2tSubReqTimeout = 2000 * 1000000
237 xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
239 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
240 if e2tSubDelReqTime == 0 {
241 e2tSubDelReqTime = 2000 * 1000000
243 xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
244 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
245 if e2tRecvMsgTimeout == 0 {
246 e2tRecvMsgTimeout = 2000 * 1000000
248 xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
// Try counts include the first attempt, so the minimum useful value is 1.
250 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
251 if e2tMaxSubReqTryCount == 0 {
252 e2tMaxSubReqTryCount = 1
254 xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
256 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
257 if e2tMaxSubDelReqTryCount == 0 {
258 e2tMaxSubDelReqTryCount = 1
260 xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// readSubsFromDb and dbRetryForever are kept as the strings "true"/"false".
262 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
263 if readSubsFromDb == "" {
264 readSubsFromDb = "true"
266 xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
// NOTE(review): any default handling for dbTryCount is not visible in this view.
268 dbTryCount = viper.GetInt("controls.dbTryCount")
272 xapp.Logger.Debug("dbTryCount %v", dbTryCount)
274 dbRetryForever = viper.GetString("controls.dbRetryForever")
275 if dbRetryForever == "" {
276 dbRetryForever = "true"
278 xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
280 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
281 // value of 100ms is currently used only in unit tests.
282 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
283 if waitRouteCleanup_ms == 0 {
284 waitRouteCleanup_ms = 5000 * 1000000
286 xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
289 //-------------------------------------------------------------------
291 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register (subscription id to
// Subscription) and, for every subscription that has not received a response
// (SubRespRcvd == false) and is not a policy update (PolicyUpdate == false),
// sets NoRespToXapp and sends a subscription delete request.
// It is called from ReadE2Subscriptions with the register loaded from the
// database.
292 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
294 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
295 for subId, subs := range register {
296 if subs.SubRespRcvd == false {
297 // If a policy subscription has already been created successfully, an unsuccessful update must not cause it to be deleted.
298 if subs.PolicyUpdate == false {
// Flag the subscription so that no response is sent to the xApp for this delete.
299 subs.NoRespToXapp = true
300 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
301 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It binds
// c.RMRClient to xapp.Rmr unless a client has already been set. The data
// argument is not used by the statements visible here.
307 func (c *Control) ReadyCB(data interface{}) {
308 if c.RMRClient == nil {
309 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters as
// a configuration change listener.
// NOTE(review): the remainder of the function is missing from this view;
// presumably it starts the xapp framework — confirm.
313 func (c *Control) Run() {
314 xapp.SetReadyCB(c.ReadyCB, nil)
315 xapp.AddConfigChangeListener(c.ReadConfigParameters)
319 //-------------------------------------------------------------------
321 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that an incoming
// request refers to, creating a new one when needed.
//
// Parameters:
//   p               - the REST subscription request; p.SubscriptionID may be empty.
//   md5sum          - checksum of the request, used to look up the REST
//                     subscription id last seen for an identical request.
//   xAppRmrEndpoint - RMR endpoint of the requesting xApp, stored in a newly
//                     created subscription.
//
// Returns the REST subscription, its id and nil on the final path shown.
// NOTE(review): several condition lines and an error return are missing from
// this view, so the branch structure described below is partly inferred from
// the log messages — confirm against the complete file.
322 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
325 var restSubscription *RESTSubscription
// Look up the REST subscription id remembered for this md5sum, if any.
328 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
329 if p.SubscriptionID == "" {
330 // Subscription does not contain REST subscription Id
332 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333 if restSubscription != nil {
334 // A subscription was returned for the remembered id: reuse that id
335 restSubId = prevRestSubsId
337 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
339 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The remembered id no longer resolves to a subscription: drop the cache entry.
342 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
343 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing reusable was found: create a subscription under a fresh ksuid.
347 if restSubscription == nil {
348 restSubId = ksuid.New().String()
349 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
352 // Subscription contains REST subscription Id
353 restSubId = p.SubscriptionID
355 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
356 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
358 // Subscription with id in REST request does not exist
359 xapp.Logger.Error("%s", err.Error())
360 c.UpdateCounter(cRestSubFailToXapp)
365 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
367 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
371 return restSubscription, restSubId, nil
374 //-------------------------------------------------------------------
376 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp. It
// is passed to xapp.Subscription.Listen in NewControl.
//
// params must hold a *models.SubscriptionParams; the type assertion is
// unchecked and panics otherwise.
//
// Returns the response carrying the REST subscription id together with
// common.SubscribeCreatedCode when the request is accepted (or recognised as a
// retransmission), otherwise nil with SubscribeBadRequestCode or
// SubscribeNotFoundCode. The E2 level work is done asynchronously in
// processSubscriptionRequests.
// NOTE(review): the "if err != nil" style condition lines are missing from this
// view; the error paths below presumably sit inside them.
377 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
380 c.UpdateCounter(cRestSubReqFromXapp)
382 subResp := models.SubscriptionResponse{}
383 p := params.(*models.SubscriptionParams)
// Print the request only when the cached logger level is above 2.
385 if c.LoggerLevel > 2 {
386 c.PrintRESTSubscriptionRequest(p)
// A client endpoint is mandatory; it is dereferenced below.
389 if p.ClientEndpoint == nil {
390 err := fmt.Errorf("ClientEndpoint == nil")
391 xapp.Logger.Error("%v", err)
392 c.UpdateCounter(cRestSubFailToXapp)
393 return nil, common.SubscribeBadRequestCode
396 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
398 xapp.Logger.Error("%s", err.Error())
399 c.UpdateCounter(cRestSubFailToXapp)
400 return nil, common.SubscribeBadRequestCode
// NOTE(review): an md5sum failure appears to be logged only; no return is
// visible after the log line.
403 md5sum, err := CalculateRequestMd5sum(params)
405 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
408 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
410 xapp.Logger.Error("Subscription with id in REST request does not exist")
411 return nil, common.SubscribeNotFoundCode
414 subResp.SubscriptionID = &restSubId
415 subReqList := e2ap.SubscriptionRequestList{}
416 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// On this failure path the md5sum cache entry and the REST subscription are
// removed again.
418 xapp.Logger.Error("%s", err.Error())
419 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
420 c.registry.DeleteRESTSubscription(&restSubId)
421 c.UpdateCounter(cRestSubFailToXapp)
422 return nil, common.SubscribeBadRequestCode
// A retransmission of a request that is still being processed is acknowledged
// directly, without starting new processing.
425 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
427 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
428 xapp.Logger.Debug("%s", err)
429 c.UpdateCounter(cRestSubRespToXapp)
430 return &subResp, common.SubscribeCreatedCode
433 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
// NOTE(review): this error path returns after the subscription was written to
// the db, and no cleanup or counter update is visible here.
434 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
436 xapp.Logger.Error("%s", err)
437 return nil, common.SubscribeBadRequestCode
// Process the E2 subscription requests in the background; the REST response
// below is returned without waiting for them.
439 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
441 c.UpdateCounter(cRestSubRespToXapp)
442 return &subResp, common.SubscribeCreatedCode
445 //-------------------------------------------------------------------
447 //-------------------------------------------------------------------
// GetE2SubscriptionDirectives derives the E2 timeout, maximum try count and
// RMR route creation flag for a subscription request.
//
// When p or p.E2SubscriptionDirectives is nil the configured defaults are used
// (e2tSubReqTimeout, e2tMaxSubReqTryCount, CreateRMRRoute = true). Otherwise
// the values from the request are validated: E2TimeoutTimerValue must be 1-10
// (seconds) and E2RetryCount, when present, must be 0-10. A value out of range
// yields a nil result and an error.
// NOTE(review): the "else" lines are missing from this view; the statements
// after the default-value log presumably belong to the else branch.
448 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
450 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
451 if p == nil || p.E2SubscriptionDirectives == nil {
452 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
453 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
454 e2SubscriptionDirectives.CreateRMRRoute = true
455 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
457 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
458 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
460 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
// A missing retry count is not an error; the configured default is used.
462 if p.E2SubscriptionDirectives.E2RetryCount == nil {
463 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
464 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
466 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
467 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = first sending plus E2RetryCount retries
469 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
472 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
474 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
475 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
476 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
477 return e2SubscriptionDirectives, nil
480 //-------------------------------------------------------------------
482 //-------------------------------------------------------------------
// processSubscriptionRequests handles every E2 subscription request of one
// REST subscription request, in order, and notifies the xApp about each
// result. RESTSubscriptionHandler starts it in its own goroutine.
//
// For each request a transaction is created and handleSubscriptionRequest is
// called. A success or failure notification is then sent to clientEndpoint.
// restSubId, meid and clientEndpoint.HTTPPort are dereferenced without nil
// checks.
// NOTE(review): the condition lines (transaction nil check, error check) and
// any per-iteration transaction release are missing from this view.
484 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
485 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
487 c.SubscriptionProcessingStartDelay()
488 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
490 var xAppEventInstanceID int64
491 var e2EventInstanceID int64
492 errorInfo := &ErrorInfo{}
// Runs when this function returns; the arguments are evaluated here, at the
// defer statement.
494 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
496 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
497 subReqMsg := subReqList.E2APSubscriptionRequests[index]
498 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
500 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
502 // Send notification to xApp that processing of a Subscription Request has failed.
503 err := fmt.Errorf("Tracking failure")
504 errorInfo.ErrorCause = err.Error()
505 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
509 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// This := declares a loop-local errorInfo that shadows the one declared before
// the loop.
511 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
513 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
517 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success path: record the md5sum on the REST subscription and notify the xApp.
519 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
520 restSubscription.AddMd5Sum(md5sum)
521 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
522 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
523 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
528 //-------------------------------------------------------------------
530 //------------------------------------------------------------------
// SubscriptionProcessingStartDelay blocks for 50 ms when c.UTTesting is set
// and returns immediately otherwise. processSubscriptionRequests calls it
// before it starts handling requests.
531 func (c *Control) SubscriptionProcessingStartDelay() {
532 if c.UTTesting == true {
533 // This is temporary fix for the UT problem that notification arrives before subscription response
534 // Correct fix would be to allow notification come before response and process it correctly
535 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
536 <-time.After(time.Millisecond * 50)
537 xapp.Logger.Debug("Continuing after delay")
541 //-------------------------------------------------------------------
543 //------------------------------------------------------------------
// handleSubscriptionRequest performs one E2 subscription on behalf of a REST
// request: it tracks the transaction, assigns it to a subscription, starts
// handleSubscriptionCreate in a goroutine and waits for the resulting event.
//
// Returns the E2 subscription response on success. On failure it returns nil,
// an ErrorInfo describing cause/source/timeout type, and an error. Except for
// the early returns shown, the failure path logs the error and removes the
// transaction from the subscription.
// The parameters meid and restSubId are not used by the statements visible
// here.
// NOTE(review): the condition lines, the default case and the nil-event
// handling are missing from this view; the comments describe only what is shown.
544 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
545 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
547 errorInfo := ErrorInfo{}
549 err := c.tracker.Track(trans)
// The tracker's message goes into ErrorCause; the returned error is replaced by
// the generic "Tracking failure".
551 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
552 errorInfo.ErrorCause = err.Error()
553 err = fmt.Errorf("Tracking failure")
554 return nil, &errorInfo, err
// Only subs is new here; errorInfo and err declared above are reassigned.
557 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
559 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
560 return nil, &errorInfo, err
// Start the subscription creation and wait for its outcome.
566 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
567 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
// Map the received event type to a result or to a specific error.
571 switch themsg := event.(type) {
572 case *e2ap.E2APSubscriptionResponse:
574 return themsg, &errorInfo, nil
575 case *e2ap.E2APSubscriptionFailure:
576 err = fmt.Errorf("E2 SubscriptionFailure received")
577 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
578 return nil, &errorInfo, err
// These two event types carry their own ErrorInfo, which is returned as is.
579 case *PackSubscriptionRequestErrortEvent:
580 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
581 return nil, &themsg.ErrorInfo, err
582 case *SDLWriteErrortEvent:
583 err = fmt.Errorf("SDL write failure")
584 return nil, &themsg.ErrorInfo, err
586 err = fmt.Errorf("Unexpected E2 subscription response received")
587 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
591 err = fmt.Errorf("E2 subscription response timeout")
592 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// For a policy update this returns before the RemoveFromSubscription call below.
593 if subs.PolicyUpdate == true {
594 return nil, &errorInfo, err
598 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
599 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
600 return nil, &errorInfo, err
603 //-------------------------------------------------------------------
605 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification tells the xApp at clientEndpoint that
// processing of one subscription request failed.
//
// It fills in SUBMGR as the error source when errorInfo carries none (this
// mutates errorInfo), marks the REST subscription as processed with err,
// updates it in the database, bumps the failure notification counter and sends
// the notification. The E2 event instance id in the notification is always 0.
// restSubId, clientEndpoint and clientEndpoint.HTTPPort are dereferenced
// without nil checks.
// NOTE(review): the two debug logs differ only in whether trans is printed; the
// condition choosing between them is missing from this view.
606 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
607 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
609 // Send notification to xApp that processing of a Subscription Request has failed.
610 e2EventInstanceID := (int64)(0)
611 if errorInfo.ErrorSource == "" {
612 // Submgr is default source of error
613 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
615 resp := &models.SubscriptionResponse{
616 SubscriptionID: restSubId,
617 SubscriptionInstances: []*models.SubscriptionInstance{
618 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
619 ErrorCause: errorInfo.ErrorCause,
620 ErrorSource: errorInfo.ErrorSource,
621 TimeoutType: errorInfo.TimeoutType,
622 XappEventInstanceID: &xAppEventInstanceID},
625 // Mark REST subscription request processed.
626 restSubscription.SetProcessed(err)
627 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
629 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
630 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
632 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
633 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
636 c.UpdateCounter(cRestSubFailNotifToXapp)
637 xapp.Subscription.Notify(resp, *clientEndpoint)
640 //-------------------------------------------------------------------
642 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification tells the xApp at clientEndpoint that one
// subscription request was processed successfully.
//
// It records the E2 instance id and the xApp-id to E2-id mapping on the REST
// subscription, marks the subscription as processed without error, updates it
// in the database, bumps the notification counter and sends the notification.
// restSubId, clientEndpoint and clientEndpoint.HTTPPort are dereferenced
// without nil checks.
643 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
644 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
646 // Store successfully processed InstanceId for deletion
647 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
648 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
650 // Send notification to xApp that a Subscription Request has been processed.
651 resp := &models.SubscriptionResponse{
652 SubscriptionID: restSubId,
653 SubscriptionInstances: []*models.SubscriptionInstance{
654 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
656 XappEventInstanceID: &xAppEventInstanceID},
659 // Mark REST subscription request processed.
660 restSubscription.SetProcessed(nil)
661 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
662 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
663 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
665 c.UpdateCounter(cRestSubNotifToXapp)
666 xapp.Subscription.Notify(resp, *clientEndpoint)
669 //-------------------------------------------------------------------
671 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request for
// restSubId. It is passed to xapp.Subscription.Listen in NewControl.
//
// Returns common.UnsubscribeNoContentCode when the subscription is unknown or
// the delete was accepted, and common.UnsubscribeBadRequestCode while a
// subscription request or an earlier delete for the same id is still ongoing.
//
// The deletion calls SubscriptionDeleteHandler for every E2 instance id of the
// REST subscription, then removes the md5sum cache entry, the registry entry
// and the database record.
// NOTE(review): several condition lines are missing from this view, and it is
// not visible whether the deletion loop runs inline or in a goroutine.
672 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
675 c.UpdateCounter(cRestSubDelReqFromXapp)
677 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
679 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
681 xapp.Logger.Error("%s", err.Error())
682 if restSubscription == nil {
683 // Subscription was not found
684 return common.UnsubscribeNoContentCode
686 if restSubscription.SubReqOngoing == true {
687 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
688 xapp.Logger.Error("%s", err.Error())
689 return common.UnsubscribeBadRequestCode
690 } else if restSubscription.SubDelReqOngoing == true {
691 // Previous request for same restSubId still ongoing
692 return common.UnsubscribeBadRequestCode
697 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
699 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
// NOTE(review): DeleteE2InstanceId is called while ranging over InstanceIds;
// verify that it does not modify the slice being iterated.
700 for _, instanceId := range restSubscription.InstanceIds {
701 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
704 xapp.Logger.Error("%s", err.Error())
706 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
707 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
708 restSubscription.DeleteE2InstanceId(instanceId)
// Forget the subscription in the duplicate cache, the registry and the database.
710 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
711 c.registry.DeleteRESTSubscription(&restSubId)
712 c.RemoveRESTSubscriptionFromDb(restSubId)
715 c.UpdateCounter(cRestSubDelRespToXapp)
717 return common.UnsubscribeNoContentCode
720 //-------------------------------------------------------------------
722 //-------------------------------------------------------------------
723 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
725 var xAppEventInstanceID int64
726 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
728 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
729 restSubId, instanceId, idstring(err, nil))
730 return xAppEventInstanceID, nil
733 xAppEventInstanceID = int64(subs.ReqId.Id)
734 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
736 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
737 xapp.Logger.Error("%s", err.Error())
739 defer trans.Release()
741 err = c.tracker.Track(trans)
743 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
744 xapp.Logger.Error("%s", err.Error())
745 return xAppEventInstanceID, &time.ParseError{}
750 go c.handleSubscriptionDelete(subs, trans)
751 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
753 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
755 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
757 return xAppEventInstanceID, nil
760 //-------------------------------------------------------------------
762 //-------------------------------------------------------------------
// RESTQueryHandler returns the subscription list produced by
// registry.QueryHandler(), together with its error. It is passed to
// xapp.Subscription.Listen in NewControl.
763 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
764 xapp.Logger.Debug("RESTQueryHandler() called")
768 return c.registry.QueryHandler()
// TestRestHandler serves the testing route "/ric/v1/test/{testId}" registered
// in NewControl. The {testId} path variable selects the command.
// "deletesubid=<n>" removes one subscription from the db. Other commands remove
// all subscriptions from the db or make submgr exit. Anything else is logged as
// unsupported.
// NOTE(review): the condition lines that select the last two commands, and the
// exit call itself, are missing from this view.
771 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
772 xapp.Logger.Debug("RESTTestRestHandler() called")
774 pathParams := mux.Vars(r)
775 s := pathParams["testId"]
777 // This can be used to delete single subscription from db
778 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so splitting on "=" yields at least two elements.
779 var splits = strings.Split(s, "=")
// The id is parsed as a 64-bit integer and then converted to uint32.
780 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
781 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
782 c.RemoveSubscriptionFromSdl(uint32(subId))
787 // This can be used to remove all subscriptions from db
789 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
790 c.RemoveAllSubscriptionsFromSdl()
791 c.RemoveAllRESTSubscriptionsFromSdl()
795 // This is meant to cause submgr's restart in testing
797 xapp.Logger.Debug("os.Exit(1) called")
801 xapp.Logger.Debug("Unsupported rest command received %s", s)
804 //-------------------------------------------------------------------
806 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload of trans towards E2T over RMR.
//
// The message type comes from trans, the RMR subscription id from the
// subscription's request InstanceId and the Meid from subs. desc is only used
// in the debug log. The send goes through SendWithRetry(params, false, 5);
// its error is logged and returned through the named result.
// NOTE(review): some parameter assignments appear to be missing from this view.
808 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
809 params := &xapp.RMRParams{}
810 params.Mtype = trans.GetMtype()
811 params.SubId = int(subs.GetReqId().InstanceId)
813 params.Meid = subs.GetMeid()
815 params.PayloadLen = len(trans.Payload.Buf)
816 params.Payload = trans.Payload.Buf
818 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
819 err = c.SendWithRetry(params, false, 5)
821 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the payload of trans to the xApp over RMR.
//
// The message type, Xid and Meid come from trans; the RMR subscription id comes
// from the subscription's request InstanceId. desc is only used in the debug
// log. The send goes through SendWithRetry(params, false, 5); its error is
// logged and returned through the named result.
// NOTE(review): some parameter assignments appear to be missing from this view.
826 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
828 params := &xapp.RMRParams{}
829 params.Mtype = trans.GetMtype()
830 params.SubId = int(subs.GetReqId().InstanceId)
831 params.Xid = trans.GetXid()
832 params.Meid = trans.GetMeid()
834 params.PayloadLen = len(trans.Payload.Buf)
835 params.Payload = trans.Payload.Buf
837 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
838 err = c.SendWithRetry(params, false, 5)
840 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and dispatches it by message type. Each
// known subscription message type is handled in its own goroutine; an unknown
// type is logged and discarded.
//
// When c.RMRClient is nil an error is created and logged. Otherwise the message
// buffer is freed when Consume returns, which is why the payload is copied into
// a fresh Go slice before the handlers are started.
// NOTE(review): the switch header, the assignment of the copied payload back to
// msg and the return statements are missing from this view.
845 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
846 if c.RMRClient == nil {
847 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
848 xapp.Logger.Error("%s", err.Error())
853 defer c.RMRClient.Free(msg.Mbuf)
855 // xapp-frame might use direct access to c buffer and
856 // when msg.Mbuf is freed, someone might take it into use
857 // and payload data might be invalid inside message handle function
859 // subscriptions won't load system a lot so there is no
860 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has zero capacity, so append must allocate a new backing array.
861 cPay := append(msg.Payload[:0:0], msg.Payload...)
863 msg.PayloadLen = len(cPay)
866 case xapp.RIC_SUB_REQ:
867 go c.handleXAPPSubscriptionRequest(msg)
868 case xapp.RIC_SUB_RESP:
869 go c.handleE2TSubscriptionResponse(msg)
870 case xapp.RIC_SUB_FAILURE:
871 go c.handleE2TSubscriptionFailure(msg)
872 case xapp.RIC_SUB_DEL_REQ:
873 go c.handleXAPPSubscriptionDeleteRequest(msg)
874 case xapp.RIC_SUB_DEL_RESP:
875 go c.handleE2TSubscriptionDeleteResponse(msg)
876 case xapp.RIC_SUB_DEL_FAILURE:
877 go c.handleE2TSubscriptionDeleteFailure(msg)
879 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
884 //-------------------------------------------------------------------
885 // handle from XAPP Subscription Request
886 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request that an xApp
// sent over RMR. Consume starts it in a goroutine for RIC_SUB_REQ messages.
//
// It unpacks the request, creates and tracks a transaction (released when the
// function returns), assigns the transaction to a subscription with RMR route
// creation enabled and then calls wakeSubscriptionRequest. Each failure is
// logged.
// NOTE(review): the condition lines and the returns on the error paths are
// missing from this view.
887 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
888 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
889 c.UpdateCounter(cSubReqFromXapp)
891 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
893 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
897 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
899 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
902 defer trans.Release()
904 if err = c.tracker.Track(trans); err != nil {
905 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
909 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
// The ErrorInfo result of AssignToSubscription is discarded on this path.
910 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
912 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
916 c.wakeSubscriptionRequest(subs, trans)
919 //-------------------------------------------------------------------
920 // Wake Subscription Request to E2node
921 //------------------------------------------------------------------
// wakeSubscriptionRequest starts the subscription create handling for subs and
// relays its outcome to the xApp that owns trans.
//
// handleSubscriptionCreate runs in its own goroutine while this function
// blocks in trans.WaitEvent(0) until an event is posted. For a subscription
// response or a subscription failure the RequestId.Id is overwritten with the
// xApp transaction's own id, the message is packed into trans.Mtype and
// trans.Payload, the matching counter is updated and the message is sent with
// rmrSendToXapp. The debug line at the end logs the failed case with err.
//
// NOTE(review): the error returned by GetE2SubscriptionDirectives(nil) is
// dropped - confirm that a nil argument cannot fail.
922 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
924 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
925 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
926 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
929 switch themsg := event.(type) {
930 case *e2ap.E2APSubscriptionResponse:
// Restore the requestor id the xApp used before packing the reply.
931 themsg.RequestId.Id = trans.RequestId.Id
932 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
935 c.UpdateCounter(cSubRespToXapp)
936 c.rmrSendToXapp("", subs, trans)
939 case *e2ap.E2APSubscriptionFailure:
// Same id restoration for the failure reply.
940 themsg.RequestId.Id = trans.RequestId.Id
941 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
943 c.UpdateCounter(cSubFailToXapp)
944 c.rmrSendToXapp("", subs, trans)
950 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
951 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
954 //-------------------------------------------------------------------
955 // handle from XAPP Subscription Delete Request
956 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a subscription delete request
// that an xApp sent over RMR (Consume starts it for RIC_SUB_DEL_REQ; it is
// also called directly by SendSubscriptionDeleteReq).
//
// It updates the cSubDelReqFromXapp counter, unpacks the E2AP delete request,
// creates and tracks an xApp transaction, looks up the subscription by the
// transaction's subscription id, runs handleSubscriptionDelete in a goroutine
// and blocks in trans.WaitEvent(0) until that goroutine posts its event.
// Afterwards a delete response is built from the transaction's request id and
// the subscription's instance and function ids, packed, counted and sent to
// the xApp. When subs.NoRespToXapp is set this is logged; per the inline
// comment no delete response is sent in that case.
957 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
958 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
959 c.UpdateCounter(cSubDelReqFromXapp)
961 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
963 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
967 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
969 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// Deferred so the transaction is released on every exit after this point.
972 defer trans.Release()
974 err = c.tracker.Track(trans)
// NOTE(review): this log uses the "XAPP-SubReq" tag although this is the
// delete path - looks like a copy/paste leftover; the other logs in this
// function use "XAPP-SubDelReq".
976 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
980 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
982 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// The delete itself runs in its own goroutine; the event it posts is only
// used as a completion signal, its value is ignored.
989 go c.handleSubscriptionDelete(subs, trans)
990 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
992 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
994 if subs.NoRespToXapp == true {
995 // Do not send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
996 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1000 // Whatever is received success, fail or timeout, send successful delete response
1001 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1002 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1003 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1004 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1005 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1007 c.UpdateCounter(cSubDelRespToXapp)
1008 c.rmrSendToXapp("", subs, trans)
1011 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
1012 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1015 //-------------------------------------------------------------------
1016 // SUBS CREATE Handling
1017 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2 side of a subscription create for
// parentTrans and posts the result back to it with parentTrans.SendEvent.
//
// A subscription-level transaction is created and waits for its turn on subs;
// the turn and the transaction are both released on return (deferred). When
// no response is cached yet (GetCachedResponse returns nil with valid == true)
// the request is sent to E2T and the resulting event decides what is cached:
//   - subscription response: cached with true, subs.SubRespRcvd is set;
//   - subscription failure: cached with false, the subscription is marked for
//     removal from the db and a delete request is sent to E2T;
//   - SubmgrRestartTestEvent: only logged (test hook);
//   - pack / SDL write error events: cached with false;
//   - the remaining (default) case, when subs.PolicyUpdate is false: nil is
//     cached with false, the subscription is marked for removal from the db
//     and a delete request is sent to E2T.
//
// Afterwards UpdateSubscriptionInDB is called, c.registry.RemoveFromSubscription
// is called for parentTrans, and subRfMsg is sent to parentTrans as the event.
//
// NOTE(review): the conditions guarding the db-error handling and the
// RemoveFromSubscription call are not visible here - presumably "err != nil"
// and "valid == false"; confirm.
1018 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1020 var event interface{} = nil
1021 var removeSubscriptionFromDb bool = false
1022 trans := c.tracker.NewSubsTransaction(subs)
// Transactions on one subscription take turns; the deferred calls give the
// turn back and release the transaction on return.
1023 subs.WaitTransactionTurn(trans)
1024 defer subs.ReleaseTransactionTurn(trans)
1025 defer trans.Release()
1027 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1029 subRfMsg, valid := subs.GetCachedResponse()
// Only talk to E2T when nothing is cached yet for this subscription.
1030 if subRfMsg == nil && valid == true {
1031 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1032 switch event.(type) {
1033 case *e2ap.E2APSubscriptionResponse:
1034 subRfMsg, valid = subs.SetCachedResponse(event, true)
1035 subs.SubRespRcvd = true
1036 case *e2ap.E2APSubscriptionFailure:
1037 removeSubscriptionFromDb = true
1038 subRfMsg, valid = subs.SetCachedResponse(event, false)
1039 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
// The result of the internal delete request is not examined.
1040 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1041 case *SubmgrRestartTestEvent:
1042 // This simulates that no response has been received and after restart subscriptions are restored from db
1043 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1044 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1045 subRfMsg, valid = subs.SetCachedResponse(event, false)
1047 if subs.PolicyUpdate == false {
1048 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1049 removeSubscriptionFromDb = true
1050 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1051 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1054 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1056 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1059 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// NOTE(review): presumably executed only when the db update failed - confirm.
1061 subRfMsg, valid = subs.SetCachedResponse(event, false)
1062 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1065 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1067 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
// Hand the outcome to the goroutine blocked in parentTrans.WaitEvent.
1070 parentTrans.SendEvent(subRfMsg, 0)
1073 //-------------------------------------------------------------------
1074 // SUBS DELETE Handling
1075 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the E2 side of a subscription delete for
// parentTrans and signals completion with parentTrans.SendEvent(nil, 0).
//
// A subscription-level transaction is created and waits for its turn on subs;
// the turn and the transaction are both released on return (deferred). A
// delete request is sent to E2T when the subscription is valid and the
// requesting endpoint is the only endpoint left on it. Then
// c.registry.RemoveFromSubscription and c.registry.UpdateSubscriptionToDb are
// called.
//
// NOTE(review): the results of sendE2TSubscriptionDeleteRequest and
// UpdateSubscriptionToDb are not used - confirm that ignoring them is intended.
1077 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1079 trans := c.tracker.NewSubsTransaction(subs)
1080 subs.WaitTransactionTurn(trans)
1081 defer subs.ReleaseTransactionTurn(trans)
1082 defer trans.Release()
1084 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint triggers a delete towards E2T.
1088 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1091 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1095 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1096 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1097 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1098 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1099 c.registry.UpdateSubscriptionToDb(subs, c)
// nil event: the caller only waits for completion.
1100 parentTrans.SendEvent(nil, 0)
1103 //-------------------------------------------------------------------
1104 // send to E2T Subscription Request
1105 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores
// the subscription in the db and sends the request to E2T, repeating the send
// for at most e2SubscriptionDirectives.E2MaxTryCount attempts.
//
// The request id is taken from subs.GetReqId() and its Id field is replaced by
// the fixed ricRequestorId (123). After each send the function waits up to
// e2SubscriptionDirectives.E2TimeoutTimerValue for an event on trans, unless
// subs.DoNotWaitSubResp is set, in which case a SubmgrRestartTestEvent is
// produced instead.
//
// Returns the event as interface{}: a *PackSubscriptionRequestErrortEvent when
// packing fails, a *SDLWriteErrortEvent when the db write fails.
// NOTE(review): the final return and the loop exits are not visible here -
// presumably the last received event (nil after all attempts timed out) is
// returned; confirm.
1106 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1108 var event interface{} = nil
1109 var timedOut bool = false
1110 const ricRequestorId = 123
1112 subReqMsg := subs.SubReqMsg
1113 subReqMsg.RequestId = subs.GetReqId().RequestId
// Towards E2T the requestor id is always the fixed value above.
1114 subReqMsg.RequestId.Id = ricRequestorId
1115 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1117 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1118 return &PackSubscriptionRequestErrortEvent{
1120 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1121 ErrorCause: err.Error(),
1126 // Write uncompleted subscription in db. If no response for subscription it need to be re-processed (deleted) after restart
1127 err = c.WriteSubscriptionToDb(subs)
1129 return &SDLWriteErrortEvent{
1131 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1132 ErrorCause: err.Error(),
1137 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1138 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): presumably the first attempt is counted as cSubReqToE2 and
// later attempts as cSubReReqToE2 - confirm.
1140 c.UpdateCounter(cSubReqToE2)
1142 c.UpdateCounter(cSubReReqToE2)
1144 c.rmrSendToE2T(desc, subs, trans)
1145 if subs.DoNotWaitSubResp == false {
1146 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1148 c.UpdateCounter(cSubReqTimerExpiry)
1152 // Simulating case where subscription request has been sent but response has not been received before restart
1153 event = &SubmgrRestartTestEvent{}
1154 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1158 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1162 //-------------------------------------------------------------------
1163 // send to E2T Subscription Delete Request
1164 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds and packs a subscription delete
// request for subs and sends it to E2T, repeating the send for at most
// e2tMaxSubDelReqTryCount attempts and waiting up to e2tSubDelReqTime for an
// event on trans after each send.
//
// The request id is taken from subs.GetReqId() with its Id field replaced by
// the fixed ricRequestorId (123); the function id comes from the stored
// subscription request.
//
// Returns an event as interface{}.
// NOTE(review): the return statements are not visible here - presumably the
// last received event, or nil when packing fails or every attempt times out;
// confirm.
1166 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1168 var event interface{}
1170 const ricRequestorId = 123
1172 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1173 subDelReqMsg.RequestId = subs.GetReqId().RequestId
// Towards E2T the requestor id is always the fixed value above.
1174 subDelReqMsg.RequestId.Id = ricRequestorId
1175 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1176 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1178 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1182 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1183 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): presumably the first attempt is counted as cSubDelReqToE2 and
// later attempts as cSubDelReReqToE2 - confirm.
1185 c.UpdateCounter(cSubDelReqToE2)
1187 c.UpdateCounter(cSubDelReReqToE2)
1189 c.rmrSendToE2T(desc, subs, trans)
1190 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1192 c.UpdateCounter(cSubDelReqTimerExpiry)
1197 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1201 //-------------------------------------------------------------------
1202 // handle from E2T Subscription Response
1203 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a subscription response received from
// E2T (Consume starts it for RIC_SUB_RESP).
//
// It updates the cSubRespFromE2 counter, unpacks the response, finds the
// subscription by the response's InstanceId, fetches the transaction currently
// attached to that subscription and passes the response to it as an event,
// waiting at most e2tRecvMsgTimeout. Every failing step is logged with the
// "MSG-SubResp" tag.
1204 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1205 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1206 c.UpdateCounter(cSubRespFromE2)
1208 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1210 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1213 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1215 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1218 trans := subs.GetTransaction()
1220 err = fmt.Errorf("Ongoing transaction not found")
1221 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Deliver the message to the goroutine waiting on this transaction.
1224 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1225 if sendOk == false {
1226 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1227 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1232 //-------------------------------------------------------------------
1233 // handle from E2T Subscription Failure
1234 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a subscription failure received from
// E2T (Consume starts it for RIC_SUB_FAILURE).
//
// It updates the cSubFailFromE2 counter, unpacks the failure, finds the
// subscription by the message's InstanceId, fetches the transaction currently
// attached to that subscription and passes the failure to it as an event,
// waiting at most e2tRecvMsgTimeout. Every failing step is logged with the
// "MSG-SubFail" tag.
1235 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1236 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1237 c.UpdateCounter(cSubFailFromE2)
1238 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1240 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1243 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1245 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1248 trans := subs.GetTransaction()
1250 err = fmt.Errorf("Ongoing transaction not found")
1251 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Deliver the message to the goroutine waiting on this transaction.
1254 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1255 if sendOk == false {
1256 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1257 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1262 //-------------------------------------------------------------------
1263 // handle from E2T Subscription Delete Response
1264 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse handles a subscription delete response
// received from E2T (Consume starts it for RIC_SUB_DEL_RESP).
//
// It updates the cSubDelRespFromE2 counter, unpacks the response, finds the
// subscription by the message's InstanceId, fetches the transaction currently
// attached to that subscription and passes the response to it as an event,
// waiting at most e2tRecvMsgTimeout. Every failing step is logged with the
// "MSG-SubDelResp" tag.
//
// Unlike the other E2T handlers in this file it declares a named result err.
// Consume starts it with "go", so that result is discarded there.
1265 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1266 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1267 c.UpdateCounter(cSubDelRespFromE2)
1268 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1270 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1273 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1275 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1278 trans := subs.GetTransaction()
1280 err = fmt.Errorf("Ongoing transaction not found")
1281 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Deliver the message to the goroutine waiting on this transaction.
1284 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1285 if sendOk == false {
1286 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1287 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1292 //-------------------------------------------------------------------
1293 // handle from E2T Subscription Delete Failure
1294 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure handles a subscription delete failure
// received from E2T (Consume starts it for RIC_SUB_DEL_FAILURE).
//
// It updates the cSubDelFailFromE2 counter, unpacks the failure, finds the
// subscription by the message's InstanceId, fetches the transaction currently
// attached to that subscription and passes the failure to it as an event,
// waiting at most e2tRecvMsgTimeout. Every failing step is logged with the
// "MSG-SubDelFail" tag.
1295 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1296 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1297 c.UpdateCounter(cSubDelFailFromE2)
1298 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1300 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1303 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1305 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1308 trans := subs.GetTransaction()
1310 err = fmt.Errorf("Ongoing transaction not found")
1311 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Deliver the message to the goroutine waiting on this transaction.
1314 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1315 if sendOk == false {
1316 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1317 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1322 //-------------------------------------------------------------------
1324 //-------------------------------------------------------------------
// typeofSubsMessage returns a string for the dynamic type of v; the callers in
// this file use it only inside log messages. It has cases for the E2AP
// subscription response, failure, delete response and delete failure types;
// the request and delete request cases are commented out.
//
// NOTE(review): the strings returned per case are not visible here -
// presumably short tags in the style of the commented-out "SubDelReq"; confirm.
1325 func typeofSubsMessage(v interface{}) string {
1330 //case *e2ap.E2APSubscriptionRequest:
1332 case *e2ap.E2APSubscriptionResponse:
1334 case *e2ap.E2APSubscriptionFailure:
1336 //case *e2ap.E2APSubscriptionDeleteRequest:
1337 // return "SubDelReq"
1338 case *e2ap.E2APSubscriptionDeleteResponse:
1340 case *e2ap.E2APSubscriptionDeleteFailure:
1347 //-------------------------------------------------------------------
1349 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs through WriteSubscriptionToSdl under its
// subs.ReqId.InstanceId and logs the error from that call.
//
// Returns an error.
// NOTE(review): presumably the WriteSubscriptionToSdl error is passed on
// unchanged - confirm.
1350 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1351 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1352 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1354 xapp.Logger.Error("%v", err)
1360 //-------------------------------------------------------------------
1362 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with the
// outcome of a subscription create: when removeSubscriptionFromDb is true the
// subscription is removed from the db; otherwise it is written again unless
// subs.RetryFromXapp is set. subs.RetryFromXapp is reset to false.
//
// Returns an error.
// NOTE(review): presumably the WriteSubscriptionToDb error is returned and the
// other paths return nil - confirm.
1363 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1365 if removeSubscriptionFromDb == true {
1366 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1367 c.RemoveSubscriptionFromDb(subs)
1369 // Update is needed for successful response and merge case here
1370 if subs.RetryFromXapp == false {
1371 err := c.WriteSubscriptionToDb(subs)
1375 subs.RetryFromXapp = false
1379 //-------------------------------------------------------------------
1381 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the subscription identified by
// subs.ReqId.InstanceId through RemoveSubscriptionFromSdl. The error from that
// call is only logged; the function has no result.
1382 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1383 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1384 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1386 xapp.Logger.Error("%v", err)
1390 //-------------------------------------------------------------------
1392 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb stores restSubs under restSubId through
// WriteRESTSubscriptionToSdl. The error from that call is only logged; the
// function has no result.
1393 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1394 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1395 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1397 xapp.Logger.Error("%v", err)
1401 //-------------------------------------------------------------------
1403 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB removes the REST subscription restSubId from the
// db when removeRestSubscriptionFromDb is true, and writes restSubs under
// restSubId otherwise.
// NOTE(review): the else branch itself is not visible here - confirm that the
// write does not also run after a removal.
1404 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1406 if removeRestSubscriptionFromDb == true {
1407 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1408 c.RemoveRESTSubscriptionFromDb(restSubId)
1410 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1414 //-------------------------------------------------------------------
1416 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb removes the REST subscription restSubId through
// RemoveRESTSubscriptionFromSdl. The error from that call is only logged; the
// function has no result.
1417 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1418 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1419 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1421 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes subs after a restart by feeding a locally
// built delete request into the normal xApp delete path.
//
// When subs.PolicyUpdate is false a delete request is packed once, with the
// subscription's request id (Id replaced by the fixed ricRequestorId, 123) and
// function id. For every endpoint in subs.EpList.Endpoints an RMRParams is
// filled in with that payload and the endpoint as Src, subs.DeleteFromDb is
// set, and handleXAPPSubscriptionDeleteRequest is called synchronously.
1425 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1427 const ricRequestorId = 123
1428 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1430 // Send delete for every endpoint in the subscription
1431 if subs.PolicyUpdate == false {
1432 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1433 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1434 subDelReqMsg.RequestId.Id = ricRequestorId
1435 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1436 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1438 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
// The same packed payload is reused for every endpoint; only Src differs.
1441 for _, endPoint := range subs.EpList.Endpoints {
1442 params := &xapp.RMRParams{}
1443 params.Mtype = mType
1444 params.SubId = int(subs.GetReqId().InstanceId)
1446 params.Meid = subs.Meid
1447 params.Src = endPoint.String()
1448 params.PayloadLen = len(payload.Buf)
1449 params.Payload = payload.Buf
1451 subs.DeleteFromDb = true
// Direct call, not a goroutine: endpoints are processed one after another.
1452 c.handleXAPPSubscriptionDeleteRequest(params)
1457 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1459 fmt.Println("CRESTSubscriptionRequest")
1465 if p.SubscriptionID != "" {
1466 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1468 fmt.Println(" SubscriptionID = ''")
1471 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1473 if p.ClientEndpoint.HTTPPort != nil {
1474 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1476 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1479 if p.ClientEndpoint.RMRPort != nil {
1480 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1482 fmt.Println(" ClientEndpoint.RMRPort = nil")
1486 fmt.Printf(" Meid = %s\n", *p.Meid)
1488 fmt.Println(" Meid = nil")
1491 if p.E2SubscriptionDirectives == nil {
1492 fmt.Println(" E2SubscriptionDirectives = nil")
1494 fmt.Println(" E2SubscriptionDirectives")
1495 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1496 fmt.Println(" E2RetryCount == nil")
1498 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1500 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1501 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1503 for _, subscriptionDetail := range p.SubscriptionDetails {
1504 if p.RANFunctionID != nil {
1505 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1507 fmt.Println(" RANFunctionID = nil")
1509 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1510 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1512 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1513 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1514 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1515 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1517 if actionToBeSetup.SubsequentAction != nil {
1518 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1519 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1521 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")