2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring builds a single identification string for log messages.
//
// Each entry is rendered with its String method, a nil entry is rendered as
// "(NIL)", and a non-nil err is appended last as "err(<message>)". Parts are
// separated by a single space. With no entries and a nil err the result is "".
//
// Note that only an untyped nil entry is detected; an interface value holding
// a nil pointer is passed to its own String method.
func idstring(err error, entries ...fmt.Stringer) string {
	var b strings.Builder
	sep := ""
	for _, entry := range entries {
		b.WriteString(sep)
		if entry != nil {
			b.WriteString(entry.String())
		} else {
			b.WriteString("(NIL)")
		}
		sep = " "
	}
	if err != nil {
		b.WriteString(sep)
		b.WriteString("err(")
		b.WriteString(err.Error())
		b.WriteString(")")
	}
	return b.String()
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Runtime configuration values. These package-level variables are assigned by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
// Despite the _ms suffix this holds a time.Duration; it is passed to
// registry.RemoveFromSubscription as the route clean-up wait time.
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String flags: they are compared against the literals "true" / "false".
74 var readSubsFromDb string
75 var dbRetryForever string
// Bookkeeping used to recognise repeated REST requests by their md5sum.
83 restDuplicateCtrl *DuplicateCtrl
// Storage handle for REST subscriptions; NewControl sets it from CreateRESTSdl().
85 restSubsDb Sdlnterface
// Counter group returned by xapp.Metric.RegisterCounterGroup in NewControl.
88 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty event type.
// NOTE(review): presumably posted to a transaction in restart tests — confirm against its users.
99 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty event type.
// NOTE(review): presumably signals that submgr is up again after a restart — confirm against its users.
100 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is the event delivered to a waiting xApp
// transaction when packing of an E2 SubscriptionRequest fails; it carries the
// ErrorInfo describing the failure (see handleSubscriptionRequest).
101 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must be non-nil.
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is the event delivered to a waiting xApp transaction when
// an SDL write fails; it carries the ErrorInfo describing the failure
// (see handleSubscriptionRequest).
109 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo must be non-nil.
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
// Package initialisation statements: log a start-up marker and configure viper
// so that environment variables use the "submgr" prefix and empty environment
// values are treated as set.
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager's Control object. It creates the
// routing manager client, the registry, the tracker and the REST duplicate
// controller, loads the configuration, registers the additional REST routes,
// optionally restores subscriptions from the database and starts the REST
// subscription listener in a goroutine.
124 func NewControl() *Control {
// Routing manager REST client; host, port and base URL come from the rtmgr.* config keys.
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 c := &Control{e2ap: new(E2ap),
142 restDuplicateCtrl: restDuplicateCtrl,
143 e2SubsDb: CreateSdl(),
144 restSubsDb: CreateRESTSdl(),
145 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the initial configuration; Run registers the same method as the config change listener.
148 c.ReadConfigParameters("")
150 // Register REST handlers for testing support, REST subscription listing and symptom data
151 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
152 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
153 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// NOTE(review): the body of this branch is not visible here; presumably it skips the db reads below — confirm.
155 if readSubsFromDb == "false" {
159 // Read subscriptions from db
160 c.ReadE2Subscriptions()
161 c.ReadRESTSubscriptions()
// Serve the REST subscribe / query / unsubscribe API in the background.
163 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
// SymptomDataHandler is the HTTP handler for GET /ric/v1/symptomdata. It sends
// the result of the registry query as "platform/subscriptions.json".
168 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// The query error is deliberately discarded; whatever was returned is sent.
169 subscriptions, _ := c.registry.QueryHandler()
170 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
173 //-------------------------------------------------------------------
175 //-------------------------------------------------------------------
// GetAllRestSubscriptions is the HTTP handler for GET /ric/v1/restsubscriptions.
// It fetches the REST subscriptions from the registry.
// NOTE(review): the statement that writes the response to w is not visible here — confirm.
176 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
177 xapp.Logger.Debug("GetAllRestSubscriptions() called")
178 response := c.registry.GetAllRestSubscriptions()
182 //-------------------------------------------------------------------
184 //-------------------------------------------------------------------
// ReadE2Subscriptions restores the E2 subscriptions from the database into the
// registry. The read is attempted up to dbTryCount times, or without limit
// while dbRetryForever is "true", pausing one second after a failed attempt.
// After a successful read the registry's subIds and register are replaced and
// subscriptions that never completed are handled.
185 func (c *Control) ReadE2Subscriptions() error {
188 var register map[uint32]*Subscription
189 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
190 xapp.Logger.Debug("Reading E2 subscriptions from db")
191 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
// Failed attempt: log the error and wait before the next try.
193 xapp.Logger.Error("%v", err)
194 <-time.After(1 * time.Second)
// Successful attempt: install what was read into the registry.
196 c.registry.subIds = subIds
197 c.registry.register = register
198 c.HandleUncompletedSubscriptions(register)
// Logged after the retry loop has ended.
202 xapp.Logger.Debug("Continuing without retring")
206 //-------------------------------------------------------------------
208 //-------------------------------------------------------------------
// ReadRESTSubscriptions restores the REST subscriptions from the database into
// the registry. The read is attempted up to dbTryCount times, or without limit
// while dbRetryForever is "true", pausing one second after a failed attempt.
209 func (c *Control) ReadRESTSubscriptions() error {
211 var restSubscriptions map[string]*RESTSubscription
212 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
213 xapp.Logger.Debug("Reading REST subscriptions from db")
214 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
// Failed attempt: log the error and wait before the next try.
216 xapp.Logger.Error("%v", err)
217 <-time.After(1 * time.Second)
// Successful attempt: install what was read into the registry.
219 c.registry.restSubscriptions = restSubscriptions
// Logged after the retry loop has ended.
223 xapp.Logger.Debug("Continuing without retring")
227 //-------------------------------------------------------------------
229 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings from viper into the
// package-level configuration variables and records the current logger level
// in c.LoggerLevel. A value that reads as zero or empty is replaced by the
// default shown next to it. The method is called once from NewControl and is
// registered as the configuration change listener in Run; the parameter f is
// not referenced in the lines visible here.
230 func (c *Control) ReadConfigParameters(f string) {
232 c.LoggerLevel = int(xapp.Logger.GetLevel())
233 xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
235 // viper.GetDuration returns nanoseconds; the *_ms keys are scaled by 1000000 to milliseconds
236 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
237 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
238 e2tSubReqTimeout = 2000 * 1000000
240 xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
242 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
243 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
244 e2tSubDelReqTime = 2000 * 1000000
246 xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
247 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
248 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
249 e2tRecvMsgTimeout = 2000 * 1000000
251 xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
253 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
254 if e2tMaxSubReqTryCount == 0 {
// Default: a single try, no retries.
255 e2tMaxSubReqTryCount = 1
257 xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
259 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
260 if e2tMaxSubDelReqTryCount == 0 {
// Default: a single try, no retries.
261 e2tMaxSubDelReqTryCount = 1
263 xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
265 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
266 if readSubsFromDb == "" {
267 readSubsFromDb = "true"
269 xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
// NOTE(review): the default applied to dbTryCount is not visible here — confirm.
271 dbTryCount = viper.GetInt("controls.dbTryCount")
275 xapp.Logger.Debug("dbTryCount %v", dbTryCount)
277 dbRetryForever = viper.GetString("controls.dbRetryForever")
278 if dbRetryForever == "" {
279 dbRetryForever = "true"
281 xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
283 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
284 // value of 100ms is currently used only in unit tests.
285 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
286 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
287 waitRouteCleanup_ms = 5000 * 1000000
289 xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
292 //-------------------------------------------------------------------
294 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register and, for every
// subscription that has not received a subscription response and is not a
// policy update, marks it so that no response goes to the xApp and sends a
// subscription delete request for it.
295 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
297 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
298 for subId, subs := range register {
299 if subs.SubRespRcvd == false {
300 // If a policy subscription has already been created successfully, an unsuccessful update must not cause it to be deleted.
301 if subs.PolicyUpdate == false {
302 subs.NoRespToXapp = true
303 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
304 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the xapp readiness callback registered in Run. It sets
// c.RMRClient to xapp.Rmr unless a client has already been assigned.
// The data argument is not used in the lines visible here.
310 func (c *Control) ReadyCB(data interface{}) {
311 if c.RMRClient == nil {
312 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp readiness callback and
// ReadConfigParameters as the configuration change listener.
// NOTE(review): the statement that starts the xapp framework is not visible here — confirm.
316 func (c *Control) Run() {
317 xapp.SetReadyCB(c.ReadyCB, nil)
318 xapp.AddConfigChangeListener(c.ReadConfigParameters)
322 //-------------------------------------------------------------------
324 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription a request refers to.
//
// When the request carries no SubscriptionID, the md5sum of the request is used
// to look for a previously created REST subscription; if none can be reused a
// new one is created under a freshly generated ksuid. When the request carries
// a SubscriptionID, that subscription must already exist in the registry.
//
// On the success path it returns the subscription, its REST subscription id
// and a nil error.
325 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
328 var restSubscription *RESTSubscription
// Last REST subscription id recorded for an identical request body, if any.
331 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
332 if p.SubscriptionID == "" {
333 // Subscription does not contain REST subscription Id
335 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
336 if restSubscription != nil {
337 // Previous subscription found - reuse its id
338 restSubId = prevRestSubsId
340 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
342 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached id no longer refers to a subscription: drop the stale md5sum entry.
345 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
346 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: create a new REST subscription with a generated id.
350 if restSubscription == nil {
351 restSubId = ksuid.New().String()
352 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
355 // Subscription contains REST subscription Id
356 restSubId = p.SubscriptionID
358 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
359 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
361 // Subscription with id in REST request does not exist
362 xapp.Logger.Error("%s", err.Error())
363 c.UpdateCounter(cRestSubFailToXapp)
// Informational only: tell a new request apart from a re-transmission of the previous one.
368 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
370 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
374 return restSubscription, restSubId, nil
377 //-------------------------------------------------------------------
379 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
//
// It validates the client endpoint, resolves or creates the REST subscription,
// converts the request into E2 subscription requests and starts their
// processing in a goroutine. The REST call is answered immediately with the
// REST subscription id; per-instance results are delivered later through
// notifications.
//
// Returns (response, SubscribeCreatedCode) on acceptance, or nil with
// SubscribeBadRequestCode / SubscribeNotFoundCode on failure.
380 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
383 c.UpdateCounter(cRestSubReqFromXapp)
385 subResp := models.SubscriptionResponse{}
// NOTE(review): unchecked type assertion - panics if params is not a *models.SubscriptionParams.
386 p := params.(*models.SubscriptionParams)
// Dump the request only at the more verbose logger levels.
388 if c.LoggerLevel > 2 {
389 c.PrintRESTSubscriptionRequest(p)
392 if p.ClientEndpoint == nil {
393 err := fmt.Errorf("ClientEndpoint == nil")
394 xapp.Logger.Error("%v", err)
395 c.UpdateCounter(cRestSubFailToXapp)
396 return nil, common.SubscribeBadRequestCode
399 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
401 xapp.Logger.Error("%s", err.Error())
402 c.UpdateCounter(cRestSubFailToXapp)
403 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is used for duplicate / re-transmission detection.
406 md5sum, err := CalculateRequestMd5sum(params)
408 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
411 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
413 xapp.Logger.Error("Subscription with id in REST request does not exist")
414 return nil, common.SubscribeNotFoundCode
417 subResp.SubscriptionID = &restSubId
418 subReqList := e2ap.SubscriptionRequestList{}
419 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Conversion failed: undo the duplicate-control entry and the REST subscription.
421 xapp.Logger.Error("%s", err.Error())
422 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
423 c.registry.DeleteRESTSubscription(&restSubId)
424 c.UpdateCounter(cRestSubFailToXapp)
425 return nil, common.SubscribeBadRequestCode
// A re-transmission of a request that is still being processed is acknowledged
// directly without starting new processing.
428 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
430 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
431 xapp.Logger.Debug("%s", err)
432 c.UpdateCounter(cRestSubRespToXapp)
433 return &subResp, common.SubscribeCreatedCode
436 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
437 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
// NOTE(review): no clean-up of the REST subscription written above and no
// cRestSubFailToXapp update is visible on this error path — confirm.
439 xapp.Logger.Error("%s", err)
440 return nil, common.SubscribeBadRequestCode
// Process the E2 subscription requests asynchronously; results go out as notifications.
442 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
444 c.UpdateCounter(cRestSubRespToXapp)
445 return &subResp, common.SubscribeCreatedCode
448 //-------------------------------------------------------------------
450 //-------------------------------------------------------------------
// GetE2SubscriptionDirectives derives the E2 timeout, maximum try count and RMR
// route creation flag for a subscription request.
//
// When p or p.E2SubscriptionDirectives is nil the configured defaults are used
// and route creation is enabled. Otherwise the values come from the request:
// E2TimeoutTimerValue must be 1-10 (seconds) and E2RetryCount, when given,
// must be 0-10; a value outside those ranges yields a nil result and an error.
451 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
453 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
454 if p == nil || p.E2SubscriptionDirectives == nil {
455 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
456 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
457 e2SubscriptionDirectives.CreateRMRRoute = true
458 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
460 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
461 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // The request value is in seconds; time.Duration counts nanoseconds
463 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
465 if p.E2SubscriptionDirectives.E2RetryCount == nil {
466 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
467 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
469 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
470 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = first sending plus E2RetryCount retries
472 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
475 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
477 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
478 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
479 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
480 return e2SubscriptionDirectives, nil
483 //-------------------------------------------------------------------
485 //-------------------------------------------------------------------
// processSubscriptionRequests runs in its own goroutine (started by
// RESTSubscriptionHandler) and processes every E2 subscription request of one
// REST subscription request in order. For each request it creates an xApp
// transaction, performs the subscription towards E2 and sends either a
// successful or an unsuccessful REST notification to the client endpoint.
// When the function returns, md5sum is recorded as the last processed request
// for the REST subscription.
487 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
488 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
490 c.SubscriptionProcessingStartDelay()
491 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
493 var xAppEventInstanceID int64
494 var e2EventInstanceID int64
495 errorInfo := &ErrorInfo{}
// Deferred so that it runs however the loop below ends.
497 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
499 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
500 subReqMsg := subReqList.E2APSubscriptionRequests[index]
501 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
503 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
505 // Send notification to xApp that processing of a Subscription Request has failed.
506 err := fmt.Errorf("Tracking failure")
507 errorInfo.ErrorCause = err.Error()
508 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
512 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// errorInfo and err declared here are scoped to this loop iteration and shadow the outer errorInfo.
514 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
516 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// Failure: report it to the xApp with the error details from the handler.
520 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success: remember the md5sum and report the assigned E2 instance id to the xApp.
522 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
523 restSubscription.AddMd5Sum(md5sum)
524 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
525 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
526 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
531 //-------------------------------------------------------------------
533 //------------------------------------------------------------------
// SubscriptionProcessingStartDelay blocks for 50 ms when c.UTTesting is set and
// returns immediately otherwise. It is called at the start of
// processSubscriptionRequests.
534 func (c *Control) SubscriptionProcessingStartDelay() {
535 if c.UTTesting == true {
536 // This is a temporary fix for the UT problem where the notification arrives before the subscription response.
537 // The correct fix would be to allow the notification to come before the response and process it correctly.
538 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
539 <-time.After(time.Millisecond * 50)
540 xapp.Logger.Debug("Continuing after delay")
544 //-------------------------------------------------------------------
546 //------------------------------------------------------------------
// handleSubscriptionRequest performs one E2 subscription for a REST request.
//
// It tracks the transaction, assigns it to a subscription, starts
// handleSubscriptionCreate in a goroutine and blocks until the transaction
// receives an event. The event type determines the result: a subscription
// response is returned as success, every other outcome is returned as an
// error together with an ErrorInfo describing its cause.
547 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
548 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
550 errorInfo := ErrorInfo{}
552 err := c.tracker.Track(trans)
// The detailed cause is kept in errorInfo; the returned error is a generic one.
554 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
555 errorInfo.ErrorCause = err.Error()
556 err = fmt.Errorf("Tracking failure")
557 return nil, &errorInfo, err
560 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
562 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
563 return nil, &errorInfo, err
// Run the E2 side in the background and wait for its outcome on the transaction.
569 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
570 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
574 switch themsg := event.(type) {
575 case *e2ap.E2APSubscriptionResponse:
577 return themsg, &errorInfo, nil
578 case *e2ap.E2APSubscriptionFailure:
579 err = fmt.Errorf("E2 SubscriptionFailure received")
580 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
581 return nil, &errorInfo, err
582 case *PackSubscriptionRequestErrortEvent:
583 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
584 return nil, &themsg.ErrorInfo, err
585 case *SDLWriteErrortEvent:
586 err = fmt.Errorf("SDL write failure")
587 return nil, &themsg.ErrorInfo, err
// Any other event type is reported as an unexpected response from the E2 node.
589 err = fmt.Errorf("Unexpected E2 subscription response received")
590 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// NOTE(review): the condition selecting this timeout path is not visible here;
// presumably it is taken when no event was received — confirm.
594 err = fmt.Errorf("E2 subscription response timeout")
595 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// A timed-out policy update returns without removing the transaction from the subscription.
596 if subs.PolicyUpdate == true {
597 return nil, &errorInfo, err
// Failure path: detach the transaction from the subscription before returning.
601 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
602 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
603 return nil, &errorInfo, err
606 //-------------------------------------------------------------------
608 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification tells the xApp that processing of one
// subscription request failed. It builds a notification with E2EventInstanceID
// 0 and the cause, source and timeout type taken from errorInfo, marks the REST
// subscription as processed with err, updates it in the database and sends the
// notification to clientEndpoint. An empty errorInfo.ErrorSource is replaced
// by SUBMGR (this modifies the caller's errorInfo).
609 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
610 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
612 // Send notification to xApp that processing of a Subscription Request has failed.
613 e2EventInstanceID := (int64)(0)
614 if errorInfo.ErrorSource == "" {
615 // Submgr is default source of error
616 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
618 resp := &models.SubscriptionResponse{
619 SubscriptionID: restSubId,
620 SubscriptionInstances: []*models.SubscriptionInstance{
621 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
622 ErrorCause: errorInfo.ErrorCause,
623 ErrorSource: errorInfo.ErrorSource,
624 TimeoutType: errorInfo.TimeoutType,
625 XappEventInstanceID: &xAppEventInstanceID},
628 // Mark REST subscription request processed.
629 restSubscription.SetProcessed(err)
630 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two variants of the same log line, with and without the transaction id string.
// NOTE(review): the condition choosing between them is not visible here; presumably trans != nil — confirm.
632 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
633 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
635 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
636 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
639 c.UpdateCounter(cRestSubFailNotifToXapp)
640 xapp.Subscription.Notify(resp, *clientEndpoint)
643 //-------------------------------------------------------------------
645 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification tells the xApp that one subscription
// request was processed successfully. It records the E2 instance id and the
// xApp-id-to-E2-id mapping on the REST subscription, marks the subscription as
// processed, updates it in the database and sends the notification to
// clientEndpoint.
646 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
647 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
649 // Store successfully processed InstanceId for deletion
650 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
651 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
653 // Send notification to xApp that a Subscription Request has been processed.
654 resp := &models.SubscriptionResponse{
655 SubscriptionID: restSubId,
656 SubscriptionInstances: []*models.SubscriptionInstance{
657 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
659 XappEventInstanceID: &xAppEventInstanceID},
662 // Mark REST subscription request processed.
663 restSubscription.SetProcessed(nil)
664 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
665 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
666 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
668 c.UpdateCounter(cRestSubNotifToXapp)
669 xapp.Subscription.Notify(resp, *clientEndpoint)
672 //-------------------------------------------------------------------
674 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request.
//
// An unknown restSubId is answered with UnsubscribeNoContentCode. A delete
// that arrives while the subscription request or an earlier delete for the
// same id is still ongoing is rejected with UnsubscribeBadRequestCode.
// Otherwise every E2 instance of the REST subscription is deleted and the REST
// subscription is removed from the duplicate control, the registry and the
// database.
675 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
678 c.UpdateCounter(cRestSubDelReqFromXapp)
680 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
682 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
684 xapp.Logger.Error("%s", err.Error())
685 if restSubscription == nil {
686 // Subscription was not found
687 return common.UnsubscribeNoContentCode
689 if restSubscription.SubReqOngoing == true {
690 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
691 xapp.Logger.Error("%s", err.Error())
692 return common.UnsubscribeBadRequestCode
693 } else if restSubscription.SubDelReqOngoing == true {
694 // Previous request for same restSubId still ongoing
695 return common.UnsubscribeBadRequestCode
700 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
702 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
// NOTE(review): the loop removes entries via DeleteE2InstanceId while ranging over
// restSubscription.InstanceIds — confirm the slice is not modified in place.
703 for _, instanceId := range restSubscription.InstanceIds {
704 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
707 xapp.Logger.Error("%s", err.Error())
709 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
710 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
711 restSubscription.DeleteE2InstanceId(instanceId)
// All instances handled: forget the REST subscription everywhere.
713 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
714 c.registry.DeleteRESTSubscription(&restSubId)
715 c.RemoveRESTSubscriptionFromDb(restSubId)
718 c.UpdateCounter(cRestSubDelRespToXapp)
720 return common.UnsubscribeNoContentCode
723 //-------------------------------------------------------------------
725 //-------------------------------------------------------------------
726 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
728 var xAppEventInstanceID int64
729 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
731 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
732 restSubId, instanceId, idstring(err, nil))
733 return xAppEventInstanceID, nil
736 xAppEventInstanceID = int64(subs.ReqId.Id)
737 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
739 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
740 xapp.Logger.Error("%s", err.Error())
742 defer trans.Release()
744 err = c.tracker.Track(trans)
746 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
747 xapp.Logger.Error("%s", err.Error())
748 return xAppEventInstanceID, &time.ParseError{}
753 go c.handleSubscriptionDelete(subs, trans)
754 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
756 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
758 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
760 return xAppEventInstanceID, nil
763 //-------------------------------------------------------------------
765 //-------------------------------------------------------------------
// RESTQueryHandler serves the REST subscription query by returning the result
// of the registry's QueryHandler unchanged.
766 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
767 xapp.Logger.Debug("RESTQueryHandler() called")
771 return c.registry.QueryHandler()
// TestRestHandler is the HTTP handler for POST /ric/v1/test/{testId}, a
// testing aid. The {testId} path element selects the command: deleting a
// single subscription from the db ("deletesubid=<id>"), removing all
// subscriptions from the db, or exiting the process. Anything else is logged
// as unsupported.
774 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
775 xapp.Logger.Debug("RESTTestRestHandler() called")
777 pathParams := mux.Vars(r)
778 s := pathParams["testId"]
780 // This can be used to delete single subscription from db
781 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so Split yields at least two elements and splits[1] is safe.
782 var splits = strings.Split(s, "=")
// NOTE(review): the id is parsed as a 64-bit signed value and then truncated to uint32.
783 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
784 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
785 c.RemoveSubscriptionFromSdl(uint32(subId))
790 // This can be used to remove all subscriptions from the db
792 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
793 c.RemoveAllSubscriptionsFromSdl()
794 c.RemoveAllRESTSubscriptionsFromSdl()
798 // This is meant to cause submgr's restart in testing
800 xapp.Logger.Debug("os.Exit(1) called")
804 xapp.Logger.Debug("Unsupported rest command received %s", s)
807 //-------------------------------------------------------------------
809 //-------------------------------------------------------------------
// rmrSendToE2T sends the packed payload of trans towards E2T over RMR. The
// message type comes from the transaction; the RMR subscription id and the
// MEID come from subs. desc is used only in the debug log line. The result of
// SendWithRetry is assigned to the named return value err and logged on failure.
811 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
812 params := &xapp.RMRParams{}
813 params.Mtype = trans.GetMtype()
814 params.SubId = int(subs.GetReqId().InstanceId)
816 params.Meid = subs.GetMeid()
818 params.PayloadLen = len(trans.Payload.Buf)
819 params.Payload = trans.Payload.Buf
821 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
822 err = c.SendWithRetry(params, false, 5)
824 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the packed payload of trans back to the xApp over RMR.
// The message type, transaction id (Xid) and MEID come from the transaction;
// the RMR subscription id comes from subs. desc is used only in the debug log
// line. The result of SendWithRetry is assigned to the named return value err
// and logged on failure.
829 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
831 params := &xapp.RMRParams{}
832 params.Mtype = trans.GetMtype()
833 params.SubId = int(subs.GetReqId().InstanceId)
834 params.Xid = trans.GetXid()
835 params.Meid = trans.GetMeid()
837 params.PayloadLen = len(trans.Payload.Buf)
838 params.Payload = trans.Payload.Buf
840 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
841 err = c.SendWithRetry(params, false, 5)
843 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume is the entry point for received RMR messages. It copies the payload
// into a Go-owned byte slice, frees the RMR message buffer when it returns and
// dispatches the message by type to the matching handler, each of which runs
// in its own goroutine. Unknown message types are logged and discarded.
848 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client the message cannot be handled or freed.
849 if c.RMRClient == nil {
850 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
851 xapp.Logger.Error("%s", err.Error())
856 defer c.RMRClient.Free(msg.Mbuf)
858 // xapp-frame might use direct access to the C buffer, and
859 // once msg.Mbuf is freed someone else might take it into use,
860 // so the payload data might be invalid inside the message handler functions.
862 // Subscriptions do not load the system much, so there is no
863 // real performance hit from cloning the buffer into a new Go byte slice.
// The zero-length, zero-capacity slice forces append to allocate a fresh backing array.
864 cPay := append(msg.Payload[:0:0], msg.Payload...)
866 msg.PayloadLen = len(cPay)
869 case xapp.RIC_SUB_REQ:
870 go c.handleXAPPSubscriptionRequest(msg)
871 case xapp.RIC_SUB_RESP:
872 go c.handleE2TSubscriptionResponse(msg)
873 case xapp.RIC_SUB_FAILURE:
874 go c.handleE2TSubscriptionFailure(msg)
875 case xapp.RIC_SUB_DEL_REQ:
876 go c.handleXAPPSubscriptionDeleteRequest(msg)
877 case xapp.RIC_SUB_DEL_RESP:
878 go c.handleE2TSubscriptionDeleteResponse(msg)
879 case xapp.RIC_SUB_DEL_FAILURE:
880 go c.handleE2TSubscriptionDeleteFailure(msg)
882 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
887 //-------------------------------------------------------------------
888 // handle from XAPP Subscription Request
889 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a subscription request received from
// an xApp over RMR. It unpacks the request, creates and tracks an xApp
// transaction, assigns it to a subscription (with RMR route creation enabled)
// and then hands over to wakeSubscriptionRequest. Each failing step is logged.
890 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
891 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
892 c.UpdateCounter(cSubReqFromXapp)
894 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
896 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
900 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
902 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
905 defer trans.Release()
907 if err = c.tracker.Track(trans); err != nil {
908 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
912 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
913 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
915 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
919 c.wakeSubscriptionRequest(subs, trans)
922 //-------------------------------------------------------------------
923 // Wake Subscription Request to E2node
924 //------------------------------------------------------------------
// wakeSubscriptionRequest drives one xApp subscription request to completion.
// It starts handleSubscriptionCreate in its own goroutine, blocks on
// trans.WaitEvent(0) for the outcome and then, depending on the concrete type
// of the received event, packs either a subscription response or a
// subscription failure and sends it to the xApp with rmrSendToXapp. Before
// packing, the request id in the message is overwritten with
// trans.RequestId.Id. The final debug log reports the request as failed
// together with the last err value.
925 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are fetched with a nil argument; the second return value is
// discarded here.
927 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
928 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
929 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
932 switch themsg := event.(type) {
933 case *e2ap.E2APSubscriptionResponse:
934 themsg.RequestId.Id = trans.RequestId.Id
935 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
938 c.UpdateCounter(cSubRespToXapp)
939 c.rmrSendToXapp("", subs, trans)
942 case *e2ap.E2APSubscriptionFailure:
943 themsg.RequestId.Id = trans.RequestId.Id
944 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
946 c.UpdateCounter(cSubFailToXapp)
947 c.rmrSendToXapp("", subs, trans)
953 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
954 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
957 //-------------------------------------------------------------------
958 // handle from XAPP Subscription Delete Request
959 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp. It increments the cSubDelReqFromXapp counter, unpacks
// the E2AP delete request from params.Payload, creates and tracks an xApp
// transaction, looks up the subscription by the transaction's subscription id,
// starts handleSubscriptionDelete in a goroutine and blocks on
// trans.WaitEvent(0) until that goroutine signals completion. Unless
// subs.NoRespToXapp is set, it then packs a subscription delete response and
// sends it to the xApp with rmrSendToXapp. Failures are logged with the
// "XAPP-SubDelReq" prefix. The transaction is released when the function
// returns.
960 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
961 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
962 c.UpdateCounter(cSubDelReqFromXapp)
964 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
966 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
970 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
972 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
975 defer trans.Release()
977 err = c.tracker.Track(trans)
// Use the delete-request prefix here so tracking failures are not
// mistaken for subscription (create) request failures in the logs.
979 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
983 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
985 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
992 go c.handleSubscriptionDelete(subs, trans)
993 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
995 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
997 if subs.NoRespToXapp == true {
998 // Do not send delete responses to xapps because submgr restart is deleting uncompleted subscriptions
999 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1003 // Whatever is received success, fail or timeout, send successful delete response
1004 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// The response carries the xApp's own request id but the subscription's
// instance id and RAN function id.
1005 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1006 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1007 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1008 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1010 c.UpdateCounter(cSubDelRespToXapp)
1011 c.rmrSendToXapp("", subs, trans)
1014 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
1015 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1018 //-------------------------------------------------------------------
1019 // SUBS CREATE Handling
1020 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work for a create
// request on behalf of parentTrans. It creates a subscription transaction,
// waits for its turn on subs (released again on return) and then either reuses
// the response cached on subs or sends a subscription request to E2T and
// caches the outcome. Depending on the event type it may also send an E2T
// subscription delete request and mark the subscription for removal from the
// database. Afterwards it updates the database via UpdateSubscriptionInDB and
// delivers the resulting message to parentTrans with SendEvent.
1021 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1023 var event interface{} = nil
1024 var removeSubscriptionFromDb bool = false
1025 trans := c.tracker.NewSubsTransaction(subs)
1026 subs.WaitTransactionTurn(trans)
1027 defer subs.ReleaseTransactionTurn(trans)
1028 defer trans.Release()
1030 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1032 subRfMsg, valid := subs.GetCachedResponse()
// No cached message and the subscription still flagged valid: the request
// has to be sent to E2T.
1033 if subRfMsg == nil && valid == true {
1034 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1035 switch event.(type) {
1036 case *e2ap.E2APSubscriptionResponse:
1037 subRfMsg, valid = subs.SetCachedResponse(event, true)
1038 subs.SubRespRcvd = true
1039 case *e2ap.E2APSubscriptionFailure:
1040 removeSubscriptionFromDb = true
1041 subRfMsg, valid = subs.SetCachedResponse(event, false)
1042 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1043 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1044 case *SubmgrRestartTestEvent:
1045 // This simulates that no response has been received and after restart subscriptions are restored from db
1046 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1047 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
// Error events produced by sendE2TSubscriptionRequest are cached as the
// (invalid) response; no E2T delete request is sent in this case.
1048 subRfMsg, valid = subs.SetCachedResponse(event, false)
1050 if subs.PolicyUpdate == false {
1051 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1052 removeSubscriptionFromDb = true
1053 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1054 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1057 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1059 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1062 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// NOTE(review): the two statements below look like the handling for a
// failed database update -- confirm against the surrounding condition.
1064 subRfMsg, valid = subs.SetCachedResponse(event, false)
1065 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1068 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1070 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
// Hand the outcome back to the waiting xApp transaction.
1073 parentTrans.SendEvent(subRfMsg, 0)
1076 //-------------------------------------------------------------------
1077 // SUBS DELETE Handling
1078 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side work for a delete
// request on behalf of parentTrans. It creates a subscription transaction and
// waits for its turn on subs (released again on return). An E2T subscription
// delete request is sent in the branch guarded by: subs is valid, the
// requesting endpoint is in subs.EpList, and that list has exactly one entry.
// It then removes parentTrans from the subscription in the registry, updates
// the subscription in the database and signals parentTrans with a nil event.
1080 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1082 trans := c.tracker.NewSubsTransaction(subs)
1083 subs.WaitTransactionTurn(trans)
1084 defer subs.ReleaseTransactionTurn(trans)
1085 defer trans.Release()
1087 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1091 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1094 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1098 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1099 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1100 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1101 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1102 c.registry.UpdateSubscriptionToDb(subs, c)
// A nil event only wakes the caller blocked in WaitEvent; it carries no message.
1103 parentTrans.SendEvent(nil, 0)
1106 //-------------------------------------------------------------------
1107 // send to E2T Subscription Request
1108 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored on subs and
// sends it to E2T, trying up to e2SubscriptionDirectives.E2MaxTryCount times.
// The request id is taken from subs.GetReqId() with its Id field replaced by
// the fixed ricRequestorId. After each send it waits on trans for
// e2SubscriptionDirectives.E2TimeoutTimerValue, unless subs.DoNotWaitSubResp
// is set, in which case a *SubmgrRestartTestEvent is used as the event.
//
// The result is returned as interface{}. The visible error paths return a
// *PackSubscriptionRequestErrortEvent (source ASN1) when packing fails and a
// *SDLWriteErrortEvent (source DBAAS) when writing the subscription to the
// database fails; both carry err.Error() as the cause.
1109 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1111 var event interface{} = nil
1112 var timedOut bool = false
1113 const ricRequestorId = 123
1115 subReqMsg := subs.SubReqMsg
1116 subReqMsg.RequestId = subs.GetReqId().RequestId
1117 subReqMsg.RequestId.Id = ricRequestorId
1118 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1120 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1121 return &PackSubscriptionRequestErrortEvent{
1123 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1124 ErrorCause: err.Error(),
1129 // Write uncompleted subscription in db. If no response for subscription it need to be re-processed (deleted) after restart
1130 err = c.WriteSubscriptionToDb(subs)
1132 return &SDLWriteErrortEvent{
1134 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1135 ErrorCause: err.Error(),
1140 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1141 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the request (cSubReqToE2) and for a
// re-sent request (cSubReReqToE2).
1143 c.UpdateCounter(cSubReqToE2)
1145 c.UpdateCounter(cSubReReqToE2)
1147 c.rmrSendToE2T(desc, subs, trans)
1148 if subs.DoNotWaitSubResp == false {
1149 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1151 c.UpdateCounter(cSubReqTimerExpiry)
1155 // Simulating case where subscription request has been sent but response has not been received before restart
1156 event = &SubmgrRestartTestEvent{}
1157 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1161 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1165 //-------------------------------------------------------------------
1166 // send to E2T Subscription Delete Request
1167 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request for
// subs and sends it to E2T, trying up to e2tMaxSubDelReqTryCount times. The
// request id is taken from subs.GetReqId() with its Id field replaced by the
// fixed ricRequestorId, and the function id comes from subs.SubReqMsg. After
// each send it waits on trans for e2tSubDelReqTime. A packing failure is
// logged with the "SUBS-SubDelReq" prefix. The result type is interface{};
// the event received from trans.WaitEvent is what gets logged at the end.
1169 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1171 var event interface{}
1173 const ricRequestorId = 123
1175 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1176 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1177 subDelReqMsg.RequestId.Id = ricRequestorId
1178 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1179 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1181 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1185 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1186 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the delete request (cSubDelReqToE2) and for
// a re-sent delete request (cSubDelReReqToE2).
1188 c.UpdateCounter(cSubDelReqToE2)
1190 c.UpdateCounter(cSubDelReReqToE2)
1192 c.rmrSendToE2T(desc, subs, trans)
1193 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1195 c.UpdateCounter(cSubDelReqTimerExpiry)
1200 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1204 //-------------------------------------------------------------------
1205 // handle from E2T Subscription Response
1206 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T. It increments the cSubRespFromE2 counter, unpacks the response
// from params.Payload, looks up the subscription by the response's
// RequestId.InstanceId, fetches the subscription's current transaction and
// passes the unpacked message to it with SendEvent, using e2tRecvMsgTimeout.
// Failures are logged with the "MSG-SubResp" prefix.
1207 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1208 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1209 c.UpdateCounter(cSubRespFromE2)
1211 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1213 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1216 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1218 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1221 trans := subs.GetTransaction()
1223 err = fmt.Errorf("Ongoing transaction not found")
1224 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Deliver the response to the transaction; sendOk == false is reported
// together with the timedOut flag.
1227 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1228 if sendOk == false {
1229 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1230 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1235 //-------------------------------------------------------------------
1236 // handle from E2T Subscription Failure
1237 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T. It increments the cSubFailFromE2 counter, unpacks the failure from
// params.Payload, looks up the subscription by the failure's
// RequestId.InstanceId, fetches the subscription's current transaction and
// passes the unpacked message to it with SendEvent, using e2tRecvMsgTimeout.
// Failures are logged with the "MSG-SubFail" prefix.
1238 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1239 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1240 c.UpdateCounter(cSubFailFromE2)
1241 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1243 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1246 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1248 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1251 trans := subs.GetTransaction()
1253 err = fmt.Errorf("Ongoing transaction not found")
1254 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Deliver the failure to the transaction; sendOk == false is reported
// together with the timedOut flag.
1257 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1258 if sendOk == false {
1259 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1260 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1265 //-------------------------------------------------------------------
1266 // handle from E2T Subscription Delete Response
1267 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// received from E2T. It increments the cSubDelRespFromE2 counter, unpacks the
// response from params.Payload, looks up the subscription by the response's
// RequestId.InstanceId, fetches the subscription's current transaction and
// passes the unpacked message to it with SendEvent, using e2tRecvMsgTimeout.
// Failures are logged with the "MSG-SubDelResp" prefix.
//
// Unlike the sibling E2T handlers this one declares a named error result.
// Consume starts it with a go statement, so that result is not observed there.
1268 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1269 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1270 c.UpdateCounter(cSubDelRespFromE2)
1271 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1273 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1276 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1278 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1281 trans := subs.GetTransaction()
1283 err = fmt.Errorf("Ongoing transaction not found")
1284 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Deliver the delete response to the transaction; sendOk == false is
// reported together with the timedOut flag.
1287 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1288 if sendOk == false {
1289 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1290 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1295 //-------------------------------------------------------------------
1296 // handle from E2T Subscription Delete Failure
1297 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T. It increments the cSubDelFailFromE2 counter, unpacks the
// failure from params.Payload, looks up the subscription by the failure's
// RequestId.InstanceId, fetches the subscription's current transaction and
// passes the unpacked message to it with SendEvent, using e2tRecvMsgTimeout.
// Failures are logged with the "MSG-SubDelFail" prefix.
1298 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1299 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1300 c.UpdateCounter(cSubDelFailFromE2)
1301 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1303 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1306 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1308 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1311 trans := subs.GetTransaction()
1313 err = fmt.Errorf("Ongoing transaction not found")
1314 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Deliver the delete failure to the transaction; sendOk == false is
// reported together with the timedOut flag.
1317 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1318 if sendOk == false {
1319 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1320 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1325 //-------------------------------------------------------------------
1327 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the dynamic type of v. Callers
// in this file use the result in log messages. Distinct cases exist for the
// E2AP subscription response, subscription failure, subscription delete
// response and subscription delete failure pointer types; the cases for the
// request and delete-request types are commented out.
1328 func typeofSubsMessage(v interface{}) string {
1333 //case *e2ap.E2APSubscriptionRequest:
1335 case *e2ap.E2APSubscriptionResponse:
1337 case *e2ap.E2APSubscriptionFailure:
1339 //case *e2ap.E2APSubscriptionDeleteRequest:
1340 // return "SubDelReq"
1341 case *e2ap.E2APSubscriptionDeleteResponse:
1343 case *e2ap.E2APSubscriptionDeleteFailure:
1350 //-------------------------------------------------------------------
1352 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs through WriteSubscriptionToSdl, keyed by
// subs.ReqId.InstanceId. The instance id is logged at debug level before the
// write, and the error from the write is logged at error level. The function
// has an error result.
1353 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1354 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1355 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1357 xapp.Logger.Error("%v", err)
1363 //-------------------------------------------------------------------
1365 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with the
// outcome of a subscription request. With removeSubscriptionFromDb set, the
// subscription is removed via RemoveSubscriptionFromDb. The write path calls
// WriteSubscriptionToDb only when subs.RetryFromXapp is false.
// subs.RetryFromXapp is reset to false afterwards. The function has an error
// result.
1366 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1368 if removeSubscriptionFromDb == true {
1369 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1370 c.RemoveSubscriptionFromDb(subs)
1372 // Update is needed for successful response and merge case here
1373 if subs.RetryFromXapp == false {
1374 err := c.WriteSubscriptionToDb(subs)
1378 subs.RetryFromXapp = false
1382 //-------------------------------------------------------------------
1384 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the stored subscription identified by
// subs.ReqId.InstanceId through RemoveSubscriptionFromSdl. The instance id is
// logged at debug level before the removal and the error from the removal is
// logged at error level. Nothing is returned to the caller.
1385 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1386 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1387 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1389 xapp.Logger.Error("%v", err)
1393 //-------------------------------------------------------------------
1395 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb stores restSubs under restSubId through
// WriteRESTSubscriptionToSdl. The id is logged at debug level before the write
// and the error from the write is logged at error level. Nothing is returned
// to the caller.
1396 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1397 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1398 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1400 xapp.Logger.Error("%v", err)
1404 //-------------------------------------------------------------------
1406 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB either removes or writes the stored REST
// subscription for restSubId: with removeRestSubscriptionFromDb set it calls
// RemoveRESTSubscriptionFromDb, and the write path calls
// WriteRESTSubscriptionToDb with restSubs.
1407 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1409 if removeRestSubscriptionFromDb == true {
1410 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1411 c.RemoveRESTSubscriptionFromDb(restSubId)
1413 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1417 //-------------------------------------------------------------------
1419 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb removes the stored REST subscription for
// restSubId through RemoveRESTSubscriptionFromSdl. The id is logged at debug
// level before the removal and the error from the removal is logged at error
// level. Nothing is returned to the caller.
1420 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1421 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1422 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1424 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues internally generated delete requests for
// subs; the log message states this happens due to a restart. When
// subs.PolicyUpdate is false it packs one E2AP subscription delete request
// (request id from subs.GetReqId() with Id replaced by the fixed
// ricRequestorId) and, for every endpoint in subs.EpList.Endpoints, builds an
// RMR message with that endpoint as the source, sets subs.DeleteFromDb and
// passes the message to handleXAPPSubscriptionDeleteRequest. That call is made
// directly, not in a goroutine, so endpoints are processed one after another.
1428 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1430 const ricRequestorId = 123
1431 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1433 // Send delete for every endpoint in the subscription
1434 if subs.PolicyUpdate == false {
1435 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1436 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1437 subDelReqMsg.RequestId.Id = ricRequestorId
1438 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1439 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1441 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1444 for _, endPoint := range subs.EpList.Endpoints {
// The same packed payload is reused for every endpoint; only Src differs.
1445 params := &xapp.RMRParams{}
1446 params.Mtype = mType
1447 params.SubId = int(subs.GetReqId().InstanceId)
1449 params.Meid = subs.Meid
1450 params.Src = endPoint.String()
1451 params.PayloadLen = len(payload.Buf)
1452 params.Payload = payload.Buf
1454 subs.DeleteFromDb = true
1455 c.handleXAPPSubscriptionDeleteRequest(params)
1460 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1462 fmt.Println("CRESTSubscriptionRequest")
1468 if p.SubscriptionID != "" {
1469 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1471 fmt.Println(" SubscriptionID = ''")
1474 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1476 if p.ClientEndpoint.HTTPPort != nil {
1477 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1479 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1482 if p.ClientEndpoint.RMRPort != nil {
1483 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1485 fmt.Println(" ClientEndpoint.RMRPort = nil")
1489 fmt.Printf(" Meid = %s\n", *p.Meid)
1491 fmt.Println(" Meid = nil")
1494 if p.E2SubscriptionDirectives == nil {
1495 fmt.Println(" E2SubscriptionDirectives = nil")
1497 fmt.Println(" E2SubscriptionDirectives")
1498 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1499 fmt.Println(" E2RetryCount == nil")
1501 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1503 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1504 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1506 for _, subscriptionDetail := range p.SubscriptionDetails {
1507 if p.RANFunctionID != nil {
1508 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1510 fmt.Println(" RANFunctionID = nil")
1512 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1513 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1515 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1516 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1517 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1518 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1520 if actionToBeSetup.SubsequentAction != nil {
1521 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1522 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1524 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")