2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
46 func idstring(err error, entries ...fmt.Stringer) string {
47 var retval string = ""
48 var filler string = ""
49 for _, entry := range entries {
51 retval += filler + entry.String()
54 retval += filler + "(NIL)"
58 retval += filler + "err(" + err.Error() + ")"
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
74 var readSubsFromDb string
75 var restDuplicateCtrl duplicateCtrl
76 var dbRetryForever string
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
98 type SubmgrRestartTestEvent struct{}
99 type SubmgrRestartUpEvent struct{}
100 type PackSubscriptionRequestErrortEvent struct {
104 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
105 p.ErrorInfo = *errorInfo
108 type SDLWriteErrortEvent struct {
112 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
113 s.ErrorInfo = *errorInfo
117 xapp.Logger.Debug("SUBMGR")
119 viper.SetEnvPrefix("submgr")
120 viper.AllowEmptyEnv(true)
123 func NewControl() *Control {
125 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
126 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128 registry := new(Registry)
129 registry.Initialize()
130 registry.rtmgrClient = &rtmgrClient
132 tracker := new(Tracker)
135 c := &Control{e2ap: new(E2ap),
138 e2SubsDb: CreateSdl(),
139 restSubsDb: CreateRESTSdl(),
140 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
143 c.ReadConfigParameters("")
145 // Register REST handler for testing support
146 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
147 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
148 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
150 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
152 if readSubsFromDb == "false" {
156 restDuplicateCtrl.Init()
158 // Read subscriptions from db
159 c.ReadE2Subscriptions()
160 c.ReadRESTSubscriptions()
164 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
165 subscriptions, _ := c.registry.QueryHandler()
166 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
169 //-------------------------------------------------------------------
171 //-------------------------------------------------------------------
172 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
173 xapp.Logger.Debug("GetAllRestSubscriptions() called")
174 response := c.registry.GetAllRestSubscriptions()
178 //-------------------------------------------------------------------
180 //-------------------------------------------------------------------
181 func (c *Control) ReadE2Subscriptions() error {
184 var register map[uint32]*Subscription
185 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
186 xapp.Logger.Debug("Reading E2 subscriptions from db")
187 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
189 xapp.Logger.Error("%v", err)
190 <-time.After(1 * time.Second)
192 c.registry.subIds = subIds
193 c.registry.register = register
194 c.HandleUncompletedSubscriptions(register)
198 xapp.Logger.Debug("Continuing without retring")
202 //-------------------------------------------------------------------
204 //-------------------------------------------------------------------
205 func (c *Control) ReadRESTSubscriptions() error {
207 var restSubscriptions map[string]*RESTSubscription
208 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
209 xapp.Logger.Debug("Reading REST subscriptions from db")
210 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
212 xapp.Logger.Error("%v", err)
213 <-time.After(1 * time.Second)
215 c.registry.restSubscriptions = restSubscriptions
219 xapp.Logger.Debug("Continuing without retring")
223 //-------------------------------------------------------------------
225 //-------------------------------------------------------------------
226 func (c *Control) ReadConfigParameters(f string) {
228 c.LoggerLevel = int(xapp.Logger.GetLevel())
229 xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
231 // viper.GetDuration returns nanoseconds
232 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233 if e2tSubReqTimeout == 0 {
234 e2tSubReqTimeout = 2000 * 1000000
236 xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
238 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
239 if e2tSubDelReqTime == 0 {
240 e2tSubDelReqTime = 2000 * 1000000
242 xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
243 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
244 if e2tRecvMsgTimeout == 0 {
245 e2tRecvMsgTimeout = 2000 * 1000000
247 xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
249 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
250 if e2tMaxSubReqTryCount == 0 {
251 e2tMaxSubReqTryCount = 1
253 xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
255 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
256 if e2tMaxSubDelReqTryCount == 0 {
257 e2tMaxSubDelReqTryCount = 1
259 xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
261 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
262 if readSubsFromDb == "" {
263 readSubsFromDb = "true"
265 xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
267 dbTryCount = viper.GetInt("controls.dbTryCount")
271 xapp.Logger.Debug("dbTryCount %v", dbTryCount)
273 dbRetryForever = viper.GetString("controls.dbRetryForever")
274 if dbRetryForever == "" {
275 dbRetryForever = "true"
277 xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
279 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
280 // value 100ms used currently only in unittests.
281 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
282 if waitRouteCleanup_ms == 0 {
283 waitRouteCleanup_ms = 5000 * 1000000
285 xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
288 //-------------------------------------------------------------------
290 //-------------------------------------------------------------------
291 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
293 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
294 for subId, subs := range register {
295 if subs.SubRespRcvd == false {
296 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
297 if subs.PolicyUpdate == false {
298 subs.NoRespToXapp = true
299 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
300 c.SendSubscriptionDeleteReq(subs)
306 func (c *Control) ReadyCB(data interface{}) {
307 if c.RMRClient == nil {
308 c.RMRClient = xapp.Rmr
312 func (c *Control) Run() {
313 xapp.SetReadyCB(c.ReadyCB, nil)
314 xapp.AddConfigChangeListener(c.ReadConfigParameters)
318 //-------------------------------------------------------------------
320 //-------------------------------------------------------------------
321 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
324 var restSubscription *RESTSubscription
327 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
328 if p.SubscriptionID == "" {
329 // Subscription does not contain REST subscription Id
331 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
332 if restSubscription != nil {
333 // Subscription not found
334 restSubId = prevRestSubsId
336 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
338 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
341 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
346 if restSubscription == nil {
347 restSubId = ksuid.New().String()
348 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
351 // Subscription contains REST subscription Id
352 restSubId = p.SubscriptionID
354 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
355 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
357 // Subscription with id in REST request does not exist
358 xapp.Logger.Error("%s", err.Error())
359 c.UpdateCounter(cRestSubFailToXapp)
364 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
366 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
370 return restSubscription, restSubId, nil
373 //-------------------------------------------------------------------
375 //-------------------------------------------------------------------
376 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
379 c.UpdateCounter(cRestSubReqFromXapp)
381 subResp := models.SubscriptionResponse{}
382 p := params.(*models.SubscriptionParams)
384 if c.LoggerLevel > 2 {
385 c.PrintRESTSubscriptionRequest(p)
388 if p.ClientEndpoint == nil {
389 err := fmt.Errorf("ClientEndpoint == nil")
390 xapp.Logger.Error("%v", err)
391 c.UpdateCounter(cRestSubFailToXapp)
392 return nil, common.SubscribeBadRequestCode
395 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
397 xapp.Logger.Error("%s", err.Error())
398 c.UpdateCounter(cRestSubFailToXapp)
399 return nil, common.SubscribeBadRequestCode
402 md5sum, err := CalculateRequestMd5sum(params)
404 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
407 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
409 xapp.Logger.Error("Subscription with id in REST request does not exist")
410 return nil, common.SubscribeNotFoundCode
413 subResp.SubscriptionID = &restSubId
414 subReqList := e2ap.SubscriptionRequestList{}
415 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
417 xapp.Logger.Error("%s", err.Error())
418 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
419 c.registry.DeleteRESTSubscription(&restSubId)
420 c.UpdateCounter(cRestSubFailToXapp)
421 return nil, common.SubscribeBadRequestCode
424 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
426 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
427 xapp.Logger.Debug("%s", err)
428 c.UpdateCounter(cRestSubRespToXapp)
429 return &subResp, common.SubscribeCreatedCode
432 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
433 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
435 xapp.Logger.Error("%s", err)
436 return nil, common.SubscribeBadRequestCode
438 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
440 c.UpdateCounter(cRestSubRespToXapp)
441 return &subResp, common.SubscribeCreatedCode
444 //-------------------------------------------------------------------
446 //-------------------------------------------------------------------
447 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
449 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
450 if p == nil || p.E2SubscriptionDirectives == nil {
451 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
452 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
453 e2SubscriptionDirectives.CreateRMRRoute = true
454 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
456 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
457 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
459 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
461 if p.E2SubscriptionDirectives.E2RetryCount == nil {
462 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
463 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
465 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
466 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
468 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
471 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
473 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
474 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
475 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
476 return e2SubscriptionDirectives, nil
479 //-------------------------------------------------------------------
481 //-------------------------------------------------------------------
483 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
484 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
486 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
488 var xAppEventInstanceID int64
489 var e2EventInstanceID int64
490 errorInfo := &ErrorInfo{}
492 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
494 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
495 subReqMsg := subReqList.E2APSubscriptionRequests[index]
496 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
498 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
500 // Send notification to xApp that prosessing of a Subscription Request has failed.
501 err := fmt.Errorf("Tracking failure")
502 errorInfo.ErrorCause = err.Error()
503 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
507 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
509 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
511 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
514 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
516 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
517 restSubscription.AddMd5Sum(md5sum)
518 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
519 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
520 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
526 //-------------------------------------------------------------------
528 //------------------------------------------------------------------
529 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
530 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
532 errorInfo := ErrorInfo{}
534 err := c.tracker.Track(trans)
536 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
537 errorInfo.ErrorCause = err.Error()
538 err = fmt.Errorf("Tracking failure")
539 return nil, &errorInfo, err
542 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
544 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
545 return nil, &errorInfo, err
551 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
552 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
556 switch themsg := event.(type) {
557 case *e2ap.E2APSubscriptionResponse:
559 return themsg, &errorInfo, nil
560 case *e2ap.E2APSubscriptionFailure:
561 err = fmt.Errorf("E2 SubscriptionFailure received")
562 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
563 return nil, &errorInfo, err
564 case *PackSubscriptionRequestErrortEvent:
565 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
566 return nil, &themsg.ErrorInfo, err
567 case *SDLWriteErrortEvent:
568 err = fmt.Errorf("SDL write failure")
569 return nil, &themsg.ErrorInfo, err
571 err = fmt.Errorf("Unexpected E2 subscription response received")
572 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
576 err = fmt.Errorf("E2 subscription response timeout")
577 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
578 if subs.PolicyUpdate == true {
579 return nil, &errorInfo, err
583 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
584 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
585 return nil, &errorInfo, err
588 //-------------------------------------------------------------------
590 //-------------------------------------------------------------------
591 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
592 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
594 // Send notification to xApp that prosessing of a Subscription Request has failed.
595 e2EventInstanceID := (int64)(0)
596 if errorInfo.ErrorSource == "" {
597 // Submgr is default source of error
598 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
600 resp := &models.SubscriptionResponse{
601 SubscriptionID: restSubId,
602 SubscriptionInstances: []*models.SubscriptionInstance{
603 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
604 ErrorCause: errorInfo.ErrorCause,
605 ErrorSource: errorInfo.ErrorSource,
606 TimeoutType: errorInfo.TimeoutType,
607 XappEventInstanceID: &xAppEventInstanceID},
610 // Mark REST subscription request processed.
611 restSubscription.SetProcessed(err)
612 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
614 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
615 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
617 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
618 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
621 c.UpdateCounter(cRestSubFailNotifToXapp)
622 xapp.Subscription.Notify(resp, *clientEndpoint)
625 //-------------------------------------------------------------------
627 //-------------------------------------------------------------------
628 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
629 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
631 // Store successfully processed InstanceId for deletion
632 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
633 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
635 // Send notification to xApp that a Subscription Request has been processed.
636 resp := &models.SubscriptionResponse{
637 SubscriptionID: restSubId,
638 SubscriptionInstances: []*models.SubscriptionInstance{
639 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
641 XappEventInstanceID: &xAppEventInstanceID},
644 // Mark REST subscription request processesd.
645 restSubscription.SetProcessed(nil)
646 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
647 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
648 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
650 c.UpdateCounter(cRestSubNotifToXapp)
651 xapp.Subscription.Notify(resp, *clientEndpoint)
654 //-------------------------------------------------------------------
656 //-------------------------------------------------------------------
657 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
660 c.UpdateCounter(cRestSubDelReqFromXapp)
662 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
664 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
666 xapp.Logger.Error("%s", err.Error())
667 if restSubscription == nil {
668 // Subscription was not found
669 return common.UnsubscribeNoContentCode
671 if restSubscription.SubReqOngoing == true {
672 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
673 xapp.Logger.Error("%s", err.Error())
674 return common.UnsubscribeBadRequestCode
675 } else if restSubscription.SubDelReqOngoing == true {
676 // Previous request for same restSubId still ongoing
677 return common.UnsubscribeBadRequestCode
682 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
684 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
685 for _, instanceId := range restSubscription.InstanceIds {
686 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
689 xapp.Logger.Error("%s", err.Error())
691 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
692 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
693 restSubscription.DeleteE2InstanceId(instanceId)
695 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
696 c.registry.DeleteRESTSubscription(&restSubId)
697 c.RemoveRESTSubscriptionFromDb(restSubId)
700 c.UpdateCounter(cRestSubDelRespToXapp)
702 return common.UnsubscribeNoContentCode
705 //-------------------------------------------------------------------
707 //-------------------------------------------------------------------
708 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
710 var xAppEventInstanceID int64
711 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
713 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
714 restSubId, instanceId, idstring(err, nil))
715 return xAppEventInstanceID, nil
718 xAppEventInstanceID = int64(subs.ReqId.Id)
719 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
721 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
722 xapp.Logger.Error("%s", err.Error())
724 defer trans.Release()
726 err = c.tracker.Track(trans)
728 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
729 xapp.Logger.Error("%s", err.Error())
730 return xAppEventInstanceID, &time.ParseError{}
735 go c.handleSubscriptionDelete(subs, trans)
736 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
738 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
740 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
742 return xAppEventInstanceID, nil
745 //-------------------------------------------------------------------
747 //-------------------------------------------------------------------
748 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
749 xapp.Logger.Debug("RESTQueryHandler() called")
753 return c.registry.QueryHandler()
756 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
757 xapp.Logger.Debug("RESTTestRestHandler() called")
759 pathParams := mux.Vars(r)
760 s := pathParams["testId"]
762 // This can be used to delete single subscription from db
763 if contains := strings.Contains(s, "deletesubid="); contains == true {
764 var splits = strings.Split(s, "=")
765 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
766 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
767 c.RemoveSubscriptionFromSdl(uint32(subId))
772 // This can be used to remove all subscriptions db from
774 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
775 c.RemoveAllSubscriptionsFromSdl()
776 c.RemoveAllRESTSubscriptionsFromSdl()
780 // This is meant to cause submgr's restart in testing
782 xapp.Logger.Debug("os.Exit(1) called")
786 xapp.Logger.Debug("Unsupported rest command received %s", s)
789 //-------------------------------------------------------------------
791 //-------------------------------------------------------------------
793 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
794 params := &xapp.RMRParams{}
795 params.Mtype = trans.GetMtype()
796 params.SubId = int(subs.GetReqId().InstanceId)
798 params.Meid = subs.GetMeid()
800 params.PayloadLen = len(trans.Payload.Buf)
801 params.Payload = trans.Payload.Buf
803 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
804 err = c.SendWithRetry(params, false, 5)
806 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
811 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
813 params := &xapp.RMRParams{}
814 params.Mtype = trans.GetMtype()
815 params.SubId = int(subs.GetReqId().InstanceId)
816 params.Xid = trans.GetXid()
817 params.Meid = trans.GetMeid()
819 params.PayloadLen = len(trans.Payload.Buf)
820 params.Payload = trans.Payload.Buf
822 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
823 err = c.SendWithRetry(params, false, 5)
825 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
830 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
831 if c.RMRClient == nil {
832 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
833 xapp.Logger.Error("%s", err.Error())
838 defer c.RMRClient.Free(msg.Mbuf)
840 // xapp-frame might use direct access to c buffer and
841 // when msg.Mbuf is freed, someone might take it into use
842 // and payload data might be invalid inside message handle function
844 // subscriptions won't load system a lot so there is no
845 // real performance hit by cloning buffer into new go byte slice
846 cPay := append(msg.Payload[:0:0], msg.Payload...)
848 msg.PayloadLen = len(cPay)
851 case xapp.RIC_SUB_REQ:
852 go c.handleXAPPSubscriptionRequest(msg)
853 case xapp.RIC_SUB_RESP:
854 go c.handleE2TSubscriptionResponse(msg)
855 case xapp.RIC_SUB_FAILURE:
856 go c.handleE2TSubscriptionFailure(msg)
857 case xapp.RIC_SUB_DEL_REQ:
858 go c.handleXAPPSubscriptionDeleteRequest(msg)
859 case xapp.RIC_SUB_DEL_RESP:
860 go c.handleE2TSubscriptionDeleteResponse(msg)
861 case xapp.RIC_SUB_DEL_FAILURE:
862 go c.handleE2TSubscriptionDeleteFailure(msg)
864 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
869 //-------------------------------------------------------------------
870 // handle from XAPP Subscription Request
871 //------------------------------------------------------------------
872 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
873 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
874 c.UpdateCounter(cSubReqFromXapp)
876 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
878 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
882 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
884 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
887 defer trans.Release()
889 if err = c.tracker.Track(trans); err != nil {
890 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
894 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
895 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
897 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
901 c.wakeSubscriptionRequest(subs, trans)
904 //-------------------------------------------------------------------
905 // Wake Subscription Request to E2node
906 //------------------------------------------------------------------
907 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
909 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
910 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
911 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
914 switch themsg := event.(type) {
915 case *e2ap.E2APSubscriptionResponse:
916 themsg.RequestId.Id = trans.RequestId.Id
917 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
920 c.UpdateCounter(cSubRespToXapp)
921 c.rmrSendToXapp("", subs, trans)
924 case *e2ap.E2APSubscriptionFailure:
925 themsg.RequestId.Id = trans.RequestId.Id
926 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
928 c.UpdateCounter(cSubFailToXapp)
929 c.rmrSendToXapp("", subs, trans)
935 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
936 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
939 //-------------------------------------------------------------------
940 // handle from XAPP Subscription Delete Request
941 //------------------------------------------------------------------
942 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
943 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
944 c.UpdateCounter(cSubDelReqFromXapp)
946 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
948 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
952 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
954 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
957 defer trans.Release()
959 err = c.tracker.Track(trans)
961 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
965 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
967 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
974 go c.handleSubscriptionDelete(subs, trans)
975 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
977 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
979 if subs.NoRespToXapp == true {
980 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
981 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
985 // Whatever is received success, fail or timeout, send successful delete response
986 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
987 subDelRespMsg.RequestId.Id = trans.RequestId.Id
988 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
989 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
990 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
992 c.UpdateCounter(cSubDelRespToXapp)
993 c.rmrSendToXapp("", subs, trans)
996 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
997 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1000 //-------------------------------------------------------------------
1001 // SUBS CREATE Handling
1002 //-------------------------------------------------------------------
1003 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1005 var event interface{} = nil
1006 var removeSubscriptionFromDb bool = false
1007 trans := c.tracker.NewSubsTransaction(subs)
1008 subs.WaitTransactionTurn(trans)
1009 defer subs.ReleaseTransactionTurn(trans)
1010 defer trans.Release()
1012 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1014 subRfMsg, valid := subs.GetCachedResponse()
1015 if subRfMsg == nil && valid == true {
1016 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1017 switch event.(type) {
1018 case *e2ap.E2APSubscriptionResponse:
1019 subRfMsg, valid = subs.SetCachedResponse(event, true)
1020 subs.SubRespRcvd = true
1021 case *e2ap.E2APSubscriptionFailure:
1022 removeSubscriptionFromDb = true
1023 subRfMsg, valid = subs.SetCachedResponse(event, false)
1024 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1025 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1026 case *SubmgrRestartTestEvent:
1027 // This simulates that no response has been received and after restart subscriptions are restored from db
1028 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1029 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1030 subRfMsg, valid = subs.SetCachedResponse(event, false)
1032 if subs.PolicyUpdate == false {
1033 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1034 removeSubscriptionFromDb = true
1035 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1036 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1039 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1041 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1044 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1046 subRfMsg, valid = subs.SetCachedResponse(event, false)
1047 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1050 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1052 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1055 parentTrans.SendEvent(subRfMsg, 0)
1058 //-------------------------------------------------------------------
1059 // SUBS DELETE Handling
1060 //-------------------------------------------------------------------
1062 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1064 trans := c.tracker.NewSubsTransaction(subs)
1065 subs.WaitTransactionTurn(trans)
1066 defer subs.ReleaseTransactionTurn(trans)
1067 defer trans.Release()
1069 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1073 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1076 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1080 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1081 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1082 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1083 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1084 c.registry.UpdateSubscriptionToDb(subs, c)
1085 parentTrans.SendEvent(nil, 0)
1088 //-------------------------------------------------------------------
1089 // send to E2T Subscription Request
1090 //-------------------------------------------------------------------
1091 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1093 var event interface{} = nil
1094 var timedOut bool = false
1095 const ricRequestorId = 123
1097 subReqMsg := subs.SubReqMsg
1098 subReqMsg.RequestId = subs.GetReqId().RequestId
1099 subReqMsg.RequestId.Id = ricRequestorId
1100 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1102 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1103 return &PackSubscriptionRequestErrortEvent{
1105 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1106 ErrorCause: err.Error(),
1111 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1112 err = c.WriteSubscriptionToDb(subs)
1114 return &SDLWriteErrortEvent{
1116 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1117 ErrorCause: err.Error(),
1122 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1123 desc := fmt.Sprintf("(retry %d)", retries)
1125 c.UpdateCounter(cSubReqToE2)
1127 c.UpdateCounter(cSubReReqToE2)
1129 c.rmrSendToE2T(desc, subs, trans)
1130 if subs.DoNotWaitSubResp == false {
1131 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1133 c.UpdateCounter(cSubReqTimerExpiry)
1137 // Simulating case where subscrition request has been sent but response has not been received before restart
1138 event = &SubmgrRestartTestEvent{}
1139 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1143 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1147 //-------------------------------------------------------------------
1148 // send to E2T Subscription Delete Request
1149 //-------------------------------------------------------------------
1151 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1153 var event interface{}
1155 const ricRequestorId = 123
1157 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1158 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1159 subDelReqMsg.RequestId.Id = ricRequestorId
1160 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1161 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1163 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1167 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1168 desc := fmt.Sprintf("(retry %d)", retries)
1170 c.UpdateCounter(cSubDelReqToE2)
1172 c.UpdateCounter(cSubDelReReqToE2)
1174 c.rmrSendToE2T(desc, subs, trans)
1175 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1177 c.UpdateCounter(cSubDelReqTimerExpiry)
1182 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1186 //-------------------------------------------------------------------
1187 // handle from E2T Subscription Response
1188 //-------------------------------------------------------------------
1189 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1190 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1191 c.UpdateCounter(cSubRespFromE2)
1193 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1195 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1198 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1200 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1203 trans := subs.GetTransaction()
1205 err = fmt.Errorf("Ongoing transaction not found")
1206 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1209 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1210 if sendOk == false {
1211 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1212 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1217 //-------------------------------------------------------------------
1218 // handle from E2T Subscription Failure
1219 //-------------------------------------------------------------------
1220 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1221 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1222 c.UpdateCounter(cSubFailFromE2)
1223 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1225 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1228 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1230 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1233 trans := subs.GetTransaction()
1235 err = fmt.Errorf("Ongoing transaction not found")
1236 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1239 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1240 if sendOk == false {
1241 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1242 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1247 //-------------------------------------------------------------------
1248 // handle from E2T Subscription Delete Response
1249 //-------------------------------------------------------------------
1250 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1251 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1252 c.UpdateCounter(cSubDelRespFromE2)
1253 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1255 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1258 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1260 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1263 trans := subs.GetTransaction()
1265 err = fmt.Errorf("Ongoing transaction not found")
1266 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1269 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1270 if sendOk == false {
1271 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1272 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1277 //-------------------------------------------------------------------
1278 // handle from E2T Subscription Delete Failure
1279 //-------------------------------------------------------------------
1280 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1281 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1282 c.UpdateCounter(cSubDelFailFromE2)
1283 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1285 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1288 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1290 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1293 trans := subs.GetTransaction()
1295 err = fmt.Errorf("Ongoing transaction not found")
1296 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1299 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1300 if sendOk == false {
1301 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1302 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1307 //-------------------------------------------------------------------
1309 //-------------------------------------------------------------------
1310 func typeofSubsMessage(v interface{}) string {
1315 //case *e2ap.E2APSubscriptionRequest:
1317 case *e2ap.E2APSubscriptionResponse:
1319 case *e2ap.E2APSubscriptionFailure:
1321 //case *e2ap.E2APSubscriptionDeleteRequest:
1322 // return "SubDelReq"
1323 case *e2ap.E2APSubscriptionDeleteResponse:
1325 case *e2ap.E2APSubscriptionDeleteFailure:
1332 //-------------------------------------------------------------------
1334 //-------------------------------------------------------------------
1335 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1336 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1337 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1339 xapp.Logger.Error("%v", err)
1345 //-------------------------------------------------------------------
1347 //-------------------------------------------------------------------
1348 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1350 if removeSubscriptionFromDb == true {
1351 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1352 c.RemoveSubscriptionFromDb(subs)
1354 // Update is needed for successful response and merge case here
1355 if subs.RetryFromXapp == false {
1356 err := c.WriteSubscriptionToDb(subs)
1360 subs.RetryFromXapp = false
1364 //-------------------------------------------------------------------
1366 //-------------------------------------------------------------------
1367 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1368 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1369 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1371 xapp.Logger.Error("%v", err)
1375 //-------------------------------------------------------------------
1377 //-------------------------------------------------------------------
1378 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1379 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1380 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1382 xapp.Logger.Error("%v", err)
1386 //-------------------------------------------------------------------
1388 //-------------------------------------------------------------------
1389 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1391 if removeRestSubscriptionFromDb == true {
1392 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1393 c.RemoveRESTSubscriptionFromDb(restSubId)
1395 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1399 //-------------------------------------------------------------------
1401 //-------------------------------------------------------------------
1402 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1403 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1404 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1406 xapp.Logger.Error("%v", err)
1410 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1412 const ricRequestorId = 123
1413 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1415 // Send delete for every endpoint in the subscription
1416 if subs.PolicyUpdate == false {
1417 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1418 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1419 subDelReqMsg.RequestId.Id = ricRequestorId
1420 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1421 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1423 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1426 for _, endPoint := range subs.EpList.Endpoints {
1427 params := &xapp.RMRParams{}
1428 params.Mtype = mType
1429 params.SubId = int(subs.GetReqId().InstanceId)
1431 params.Meid = subs.Meid
1432 params.Src = endPoint.String()
1433 params.PayloadLen = len(payload.Buf)
1434 params.Payload = payload.Buf
1436 subs.DeleteFromDb = true
1437 c.handleXAPPSubscriptionDeleteRequest(params)
1442 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1444 fmt.Println("CRESTSubscriptionRequest")
1450 if p.SubscriptionID != "" {
1451 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1453 fmt.Println(" SubscriptionID = ''")
1456 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1458 if p.ClientEndpoint.HTTPPort != nil {
1459 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1461 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1464 if p.ClientEndpoint.RMRPort != nil {
1465 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1467 fmt.Println(" ClientEndpoint.RMRPort = nil")
1471 fmt.Printf(" Meid = %s\n", *p.Meid)
1473 fmt.Println(" Meid = nil")
1476 if p.E2SubscriptionDirectives == nil {
1477 fmt.Println(" E2SubscriptionDirectives = nil")
1479 fmt.Println(" E2SubscriptionDirectives")
1480 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1481 fmt.Println(" E2RetryCount == nil")
1483 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1485 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1486 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1488 for _, subscriptionDetail := range p.SubscriptionDetails {
1489 if p.RANFunctionID != nil {
1490 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1492 fmt.Println(" RANFunctionID = nil")
1494 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1495 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1497 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1498 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1499 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1500 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1502 if actionToBeSetup.SubsequentAction != nil {
1503 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1504 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1506 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")