2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
46 func idstring(err error, entries ...fmt.Stringer) string {
47 var retval string = ""
48 var filler string = ""
49 for _, entry := range entries {
51 retval += filler + entry.String()
54 retval += filler + "(NIL)"
58 retval += filler + "err(" + err.Error() + ")"
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
74 var readSubsFromDb string
75 var restDuplicateCtrl duplicateCtrl
76 var dbRetryForever string
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
98 type SubmgrRestartTestEvent struct{}
99 type SubmgrRestartUpEvent struct{}
100 type PackSubscriptionRequestErrortEvent struct {
104 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
105 p.ErrorInfo = *errorInfo
108 type SDLWriteErrortEvent struct {
112 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
113 s.ErrorInfo = *errorInfo
117 xapp.Logger.Debug("SUBMGR")
119 viper.SetEnvPrefix("submgr")
120 viper.AllowEmptyEnv(true)
123 func NewControl() *Control {
125 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
126 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
128 registry := new(Registry)
129 registry.Initialize()
130 registry.rtmgrClient = &rtmgrClient
132 tracker := new(Tracker)
135 c := &Control{e2ap: new(E2ap),
138 e2SubsDb: CreateSdl(),
139 restSubsDb: CreateRESTSdl(),
140 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
143 c.ReadConfigParameters("")
145 // Register REST handler for testing support
146 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
147 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
148 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
150 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
152 if readSubsFromDb == "false" {
156 restDuplicateCtrl.Init()
158 // Read subscriptions from db
159 c.ReadE2Subscriptions()
160 c.ReadRESTSubscriptions()
164 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
165 subscriptions, _ := c.registry.QueryHandler()
166 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
169 //-------------------------------------------------------------------
171 //-------------------------------------------------------------------
172 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
173 xapp.Logger.Debug("GetAllRestSubscriptions() called")
174 response := c.registry.GetAllRestSubscriptions()
178 //-------------------------------------------------------------------
180 //-------------------------------------------------------------------
181 func (c *Control) ReadE2Subscriptions() error {
184 var register map[uint32]*Subscription
185 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
186 xapp.Logger.Debug("Reading E2 subscriptions from db")
187 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
189 xapp.Logger.Error("%v", err)
190 <-time.After(1 * time.Second)
192 c.registry.subIds = subIds
193 c.registry.register = register
194 c.HandleUncompletedSubscriptions(register)
198 xapp.Logger.Debug("Continuing without retring")
202 //-------------------------------------------------------------------
204 //-------------------------------------------------------------------
205 func (c *Control) ReadRESTSubscriptions() error {
207 var restSubscriptions map[string]*RESTSubscription
208 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
209 xapp.Logger.Debug("Reading REST subscriptions from db")
210 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
212 xapp.Logger.Error("%v", err)
213 <-time.After(1 * time.Second)
215 c.registry.restSubscriptions = restSubscriptions
219 xapp.Logger.Debug("Continuing without retring")
223 //-------------------------------------------------------------------
225 //-------------------------------------------------------------------
226 func (c *Control) ReadConfigParameters(f string) {
228 c.LoggerLevel = int(xapp.Logger.GetLevel())
229 xapp.Logger.Debug("LoggerLevel %v", c.LoggerLevel)
231 // viper.GetDuration returns nanoseconds
232 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233 if e2tSubReqTimeout == 0 {
234 e2tSubReqTimeout = 2000 * 1000000
236 xapp.Logger.Debug("e2tSubReqTimeout %v", e2tSubReqTimeout)
238 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
239 if e2tSubDelReqTime == 0 {
240 e2tSubDelReqTime = 2000 * 1000000
242 xapp.Logger.Debug("e2tSubDelReqTime %v", e2tSubDelReqTime)
243 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
244 if e2tRecvMsgTimeout == 0 {
245 e2tRecvMsgTimeout = 2000 * 1000000
247 xapp.Logger.Debug("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
249 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
250 if e2tMaxSubReqTryCount == 0 {
251 e2tMaxSubReqTryCount = 1
253 xapp.Logger.Debug("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
255 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
256 if e2tMaxSubDelReqTryCount == 0 {
257 e2tMaxSubDelReqTryCount = 1
259 xapp.Logger.Debug("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
261 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
262 if readSubsFromDb == "" {
263 readSubsFromDb = "true"
265 xapp.Logger.Debug("readSubsFromDb %v", readSubsFromDb)
267 dbTryCount = viper.GetInt("controls.dbTryCount")
271 xapp.Logger.Debug("dbTryCount %v", dbTryCount)
273 dbRetryForever = viper.GetString("controls.dbRetryForever")
274 if dbRetryForever == "" {
275 dbRetryForever = "true"
277 xapp.Logger.Debug("dbRetryForever %v", dbRetryForever)
279 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
280 // value 100ms used currently only in unittests.
281 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
282 if waitRouteCleanup_ms == 0 {
283 waitRouteCleanup_ms = 5000 * 1000000
285 xapp.Logger.Debug("waitRouteCleanup %v", waitRouteCleanup_ms)
288 //-------------------------------------------------------------------
290 //-------------------------------------------------------------------
291 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
293 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
294 for subId, subs := range register {
295 if subs.SubRespRcvd == false {
296 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
297 if subs.PolicyUpdate == false {
298 subs.NoRespToXapp = true
299 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
300 c.SendSubscriptionDeleteReq(subs)
306 func (c *Control) ReadyCB(data interface{}) {
307 if c.RMRClient == nil {
308 c.RMRClient = xapp.Rmr
312 func (c *Control) Run() {
313 xapp.SetReadyCB(c.ReadyCB, nil)
314 xapp.AddConfigChangeListener(c.ReadConfigParameters)
318 //-------------------------------------------------------------------
320 //-------------------------------------------------------------------
321 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, string, error) {
324 var restSubscription *RESTSubscription
327 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
328 if p.SubscriptionID == "" {
329 // Subscription does not contain REST subscription Id
331 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
332 if restSubscription != nil {
333 // Subscription not found
334 restSubId = prevRestSubsId
336 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
338 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
341 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
346 if restSubscription == nil {
347 restSubId = ksuid.New().String()
348 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
351 // Subscription contains REST subscription Id
352 restSubId = p.SubscriptionID
354 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
355 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
357 // Subscription with id in REST request does not exist
358 xapp.Logger.Error("%s", err.Error())
359 c.UpdateCounter(cRestSubFailToXapp)
360 return nil, "", models.SubscriptionInstanceRejectCauseRESTSubscriptionWithGivenIDDoesNotExist, err
364 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
366 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
370 return restSubscription, restSubId, "", nil
373 //-------------------------------------------------------------------
375 //-------------------------------------------------------------------
376 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
379 c.UpdateCounter(cRestSubReqFromXapp)
381 subResp := models.SubscriptionResponse{}
382 p := params.(*models.SubscriptionParams)
384 if c.LoggerLevel > 2 {
385 c.PrintRESTSubscriptionRequest(p)
388 if p.ClientEndpoint == nil {
389 err := fmt.Errorf("ClientEndpoint == nil")
390 xapp.Logger.Error("%v", err)
391 c.UpdateCounter(cRestSubFailToXapp)
392 return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
395 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
397 xapp.Logger.Error("%s", err.Error())
398 c.UpdateCounter(cRestSubFailToXapp)
399 return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
402 md5sum, err := CalculateRequestMd5sum(params)
404 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
407 restSubscription, restSubId, rejectCause, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
409 xapp.Logger.Error("Failed to get/allocate REST subscription")
410 return c.GetSubscriptionResponse(rejectCause, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
413 subResp.SubscriptionID = &restSubId
414 subReqList := e2ap.SubscriptionRequestList{}
415 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
417 xapp.Logger.Error("%s", err.Error())
418 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
419 c.registry.DeleteRESTSubscription(&restSubId)
420 c.UpdateCounter(cRestSubFailToXapp)
421 return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
424 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
426 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
427 xapp.Logger.Debug("%s", err)
428 c.UpdateCounter(cRestSubRespToXapp)
429 return &subResp, common.SubscribeCreatedCode
432 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
433 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
435 xapp.Logger.Error("%s", err)
436 return c.GetSubscriptionResponse(models.SubscriptionInstanceRejectCauseInvalidRESTRequestMessage, err.Error(), "SUBMGR", ""), common.SubscribeBadRequestCode
438 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
440 c.UpdateCounter(cRestSubRespToXapp)
441 return &subResp, common.SubscribeCreatedCode
444 //-------------------------------------------------------------------
446 //-------------------------------------------------------------------
447 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
449 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
450 if p == nil || p.E2SubscriptionDirectives == nil {
451 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
452 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
453 e2SubscriptionDirectives.CreateRMRRoute = true
454 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
456 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
457 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
459 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
461 if p.E2SubscriptionDirectives.E2RetryCount == nil {
462 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
463 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
465 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
466 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
468 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
471 if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil {
472 xapp.Logger.Error("p.E2SubscriptionDirectives.RMRRoutingNeeded == nil")
473 e2SubscriptionDirectives.CreateRMRRoute = true
475 e2SubscriptionDirectives.CreateRMRRoute = *p.E2SubscriptionDirectives.RMRRoutingNeeded
478 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
479 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
480 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
481 return e2SubscriptionDirectives, nil
484 //-------------------------------------------------------------------
486 //-------------------------------------------------------------------
487 func (c *Control) GetSubscriptionResponse(rejectCause string, errorCause string, errorSource string, timeoutType string) *models.SubscriptionResponse {
488 subResp := models.SubscriptionResponse{}
489 subscriptionInstance := models.SubscriptionInstance{}
490 subscriptionInstance.RejectCause = &rejectCause
491 subscriptionInstance.ErrorCause = &errorCause
492 subscriptionInstance.ErrorSource = &errorSource
493 if timeoutType != "" {
494 subscriptionInstance.TimeoutType = &timeoutType
496 subResp.SubscriptionInstances = append(subResp.SubscriptionInstances, &subscriptionInstance)
497 xapp.Logger.Error("etSubscriptionResponse() %+v", subscriptionInstance)
502 //-------------------------------------------------------------------
504 //-------------------------------------------------------------------
506 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
507 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
509 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
511 var xAppEventInstanceID int64
512 var e2EventInstanceID int64
513 errorInfo := &ErrorInfo{}
515 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
517 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
518 subReqMsg := subReqList.E2APSubscriptionRequests[index]
519 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
521 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
523 // Send notification to xApp that prosessing of a Subscription Request has failed.
524 err := fmt.Errorf("Tracking failure")
525 errorInfo.ErrorCause = err.Error()
526 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
530 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
532 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
534 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
537 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
539 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
540 restSubscription.AddMd5Sum(md5sum)
541 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
542 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
543 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
549 //-------------------------------------------------------------------
551 //------------------------------------------------------------------
552 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
553 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
555 errorInfo := ErrorInfo{}
557 err := c.tracker.Track(trans)
559 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
560 errorInfo.ErrorCause = err.Error()
561 err = fmt.Errorf("Tracking failure")
562 return nil, &errorInfo, err
565 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
567 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
568 return nil, &errorInfo, err
574 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
575 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
579 switch themsg := event.(type) {
580 case *e2ap.E2APSubscriptionResponse:
582 return themsg, &errorInfo, nil
583 case *e2ap.E2APSubscriptionFailure:
584 err = fmt.Errorf("E2 SubscriptionFailure received")
585 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
586 return nil, &errorInfo, err
587 case *PackSubscriptionRequestErrortEvent:
588 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
589 return nil, &themsg.ErrorInfo, err
590 case *SDLWriteErrortEvent:
591 err = fmt.Errorf("SDL write failure")
592 return nil, &themsg.ErrorInfo, err
594 err = fmt.Errorf("Unexpected E2 subscription response received")
595 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
599 err = fmt.Errorf("E2 subscription response timeout")
600 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
601 if subs.PolicyUpdate == true {
602 return nil, &errorInfo, err
606 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
607 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
608 return nil, &errorInfo, err
611 //-------------------------------------------------------------------
613 //-------------------------------------------------------------------
614 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
615 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
617 // Send notification to xApp that prosessing of a Subscription Request has failed.
618 e2EventInstanceID := (int64)(0)
619 if errorInfo.ErrorSource == "" {
620 // Submgr is default source of error
621 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
623 resp := &models.SubscriptionResponse{
624 SubscriptionID: restSubId,
625 SubscriptionInstances: []*models.SubscriptionInstance{
626 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
627 ErrorCause: &errorInfo.ErrorCause,
628 ErrorSource: &errorInfo.ErrorSource,
629 TimeoutType: &errorInfo.TimeoutType,
630 XappEventInstanceID: &xAppEventInstanceID},
633 // Mark REST subscription request processed.
634 restSubscription.SetProcessed(err)
635 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
637 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
638 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
640 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
641 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
644 c.UpdateCounter(cRestSubFailNotifToXapp)
645 xapp.Subscription.Notify(resp, *clientEndpoint)
648 //-------------------------------------------------------------------
650 //-------------------------------------------------------------------
651 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
652 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
654 // Store successfully processed InstanceId for deletion
655 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
656 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
658 // Send notification to xApp that a Subscription Request has been processed.
659 resp := &models.SubscriptionResponse{
660 SubscriptionID: restSubId,
661 SubscriptionInstances: []*models.SubscriptionInstance{
662 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
664 XappEventInstanceID: &xAppEventInstanceID},
667 // Mark REST subscription request processesd.
668 restSubscription.SetProcessed(nil)
669 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
670 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
671 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
673 c.UpdateCounter(cRestSubNotifToXapp)
674 xapp.Subscription.Notify(resp, *clientEndpoint)
677 //-------------------------------------------------------------------
679 //-------------------------------------------------------------------
680 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
683 c.UpdateCounter(cRestSubDelReqFromXapp)
685 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
687 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
689 xapp.Logger.Error("%s", err.Error())
690 if restSubscription == nil {
691 // Subscription was not found
692 return common.UnsubscribeNoContentCode
694 if restSubscription.SubReqOngoing == true {
695 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
696 xapp.Logger.Error("%s", err.Error())
697 return common.UnsubscribeBadRequestCode
698 } else if restSubscription.SubDelReqOngoing == true {
699 // Previous request for same restSubId still ongoing
700 return common.UnsubscribeBadRequestCode
705 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
707 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
708 for _, instanceId := range restSubscription.InstanceIds {
709 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
712 xapp.Logger.Error("%s", err.Error())
714 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
715 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
716 restSubscription.DeleteE2InstanceId(instanceId)
718 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
719 c.registry.DeleteRESTSubscription(&restSubId)
720 c.RemoveRESTSubscriptionFromDb(restSubId)
723 c.UpdateCounter(cRestSubDelRespToXapp)
725 return common.UnsubscribeNoContentCode
728 //-------------------------------------------------------------------
730 //-------------------------------------------------------------------
731 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
733 var xAppEventInstanceID int64
734 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
736 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
737 restSubId, instanceId, idstring(err, nil))
738 return xAppEventInstanceID, nil
741 xAppEventInstanceID = int64(subs.ReqId.Id)
742 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
744 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
745 xapp.Logger.Error("%s", err.Error())
747 defer trans.Release()
749 err = c.tracker.Track(trans)
751 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
752 xapp.Logger.Error("%s", err.Error())
753 return xAppEventInstanceID, &time.ParseError{}
758 go c.handleSubscriptionDelete(subs, trans)
759 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
761 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
763 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
765 return xAppEventInstanceID, nil
768 //-------------------------------------------------------------------
770 //-------------------------------------------------------------------
771 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
772 xapp.Logger.Debug("RESTQueryHandler() called")
776 return c.registry.QueryHandler()
779 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
780 xapp.Logger.Debug("RESTTestRestHandler() called")
782 pathParams := mux.Vars(r)
783 s := pathParams["testId"]
785 // This can be used to delete single subscription from db
786 if contains := strings.Contains(s, "deletesubid="); contains == true {
787 var splits = strings.Split(s, "=")
788 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
789 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
790 c.RemoveSubscriptionFromSdl(uint32(subId))
795 // This can be used to remove all subscriptions db from
797 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
798 c.RemoveAllSubscriptionsFromSdl()
799 c.RemoveAllRESTSubscriptionsFromSdl()
803 // This is meant to cause submgr's restart in testing
805 xapp.Logger.Debug("os.Exit(1) called")
809 xapp.Logger.Debug("Unsupported rest command received %s", s)
812 //-------------------------------------------------------------------
814 //-------------------------------------------------------------------
816 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
817 params := &xapp.RMRParams{}
818 params.Mtype = trans.GetMtype()
819 params.SubId = int(subs.GetReqId().InstanceId)
821 params.Meid = subs.GetMeid()
823 params.PayloadLen = len(trans.Payload.Buf)
824 params.Payload = trans.Payload.Buf
826 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
827 err = c.SendWithRetry(params, false, 5)
829 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
834 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
836 params := &xapp.RMRParams{}
837 params.Mtype = trans.GetMtype()
838 params.SubId = int(subs.GetReqId().InstanceId)
839 params.Xid = trans.GetXid()
840 params.Meid = trans.GetMeid()
842 params.PayloadLen = len(trans.Payload.Buf)
843 params.Payload = trans.Payload.Buf
845 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
846 err = c.SendWithRetry(params, false, 5)
848 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
853 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
854 if c.RMRClient == nil {
855 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
856 xapp.Logger.Error("%s", err.Error())
861 defer c.RMRClient.Free(msg.Mbuf)
863 // xapp-frame might use direct access to c buffer and
864 // when msg.Mbuf is freed, someone might take it into use
865 // and payload data might be invalid inside message handle function
867 // subscriptions won't load system a lot so there is no
868 // real performance hit by cloning buffer into new go byte slice
869 cPay := append(msg.Payload[:0:0], msg.Payload...)
871 msg.PayloadLen = len(cPay)
874 case xapp.RIC_SUB_REQ:
875 go c.handleXAPPSubscriptionRequest(msg)
876 case xapp.RIC_SUB_RESP:
877 go c.handleE2TSubscriptionResponse(msg)
878 case xapp.RIC_SUB_FAILURE:
879 go c.handleE2TSubscriptionFailure(msg)
880 case xapp.RIC_SUB_DEL_REQ:
881 go c.handleXAPPSubscriptionDeleteRequest(msg)
882 case xapp.RIC_SUB_DEL_RESP:
883 go c.handleE2TSubscriptionDeleteResponse(msg)
884 case xapp.RIC_SUB_DEL_FAILURE:
885 go c.handleE2TSubscriptionDeleteFailure(msg)
887 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
892 //-------------------------------------------------------------------
893 // handle from XAPP Subscription Request
894 //------------------------------------------------------------------
895 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
896 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
897 c.UpdateCounter(cSubReqFromXapp)
899 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
901 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
905 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
907 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
910 defer trans.Release()
912 if err = c.tracker.Track(trans); err != nil {
913 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
917 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
918 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
920 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
924 c.wakeSubscriptionRequest(subs, trans)
927 //-------------------------------------------------------------------
928 // Wake Subscription Request to E2node
929 //------------------------------------------------------------------
930 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
932 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
933 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
934 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
937 switch themsg := event.(type) {
938 case *e2ap.E2APSubscriptionResponse:
939 themsg.RequestId.Id = trans.RequestId.Id
940 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
943 c.UpdateCounter(cSubRespToXapp)
944 c.rmrSendToXapp("", subs, trans)
947 case *e2ap.E2APSubscriptionFailure:
948 themsg.RequestId.Id = trans.RequestId.Id
949 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
951 c.UpdateCounter(cSubFailToXapp)
952 c.rmrSendToXapp("", subs, trans)
958 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
959 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
962 //-------------------------------------------------------------------
963 // handle from XAPP Subscription Delete Request
964 //------------------------------------------------------------------
965 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
966 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
967 c.UpdateCounter(cSubDelReqFromXapp)
969 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
971 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
975 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
977 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
980 defer trans.Release()
982 err = c.tracker.Track(trans)
984 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
988 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
990 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
997 go c.handleSubscriptionDelete(subs, trans)
998 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1000 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1002 if subs.NoRespToXapp == true {
1003 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1004 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1008 // Whatever is received success, fail or timeout, send successful delete response
1009 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1010 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1011 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1012 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1013 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1015 c.UpdateCounter(cSubDelRespToXapp)
1016 c.rmrSendToXapp("", subs, trans)
1019 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1020 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1023 //-------------------------------------------------------------------
1024 // SUBS CREATE Handling
1025 //-------------------------------------------------------------------
1026 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1028 var event interface{} = nil
1029 var removeSubscriptionFromDb bool = false
1030 trans := c.tracker.NewSubsTransaction(subs)
1031 subs.WaitTransactionTurn(trans)
1032 defer subs.ReleaseTransactionTurn(trans)
1033 defer trans.Release()
1035 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1037 subRfMsg, valid := subs.GetCachedResponse()
1038 if subRfMsg == nil && valid == true {
1039 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1040 switch event.(type) {
1041 case *e2ap.E2APSubscriptionResponse:
1042 subRfMsg, valid = subs.SetCachedResponse(event, true)
1043 subs.SubRespRcvd = true
1044 case *e2ap.E2APSubscriptionFailure:
1045 removeSubscriptionFromDb = true
1046 subRfMsg, valid = subs.SetCachedResponse(event, false)
1047 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1048 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1049 case *SubmgrRestartTestEvent:
1050 // This simulates that no response has been received and after restart subscriptions are restored from db
1051 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1052 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1053 subRfMsg, valid = subs.SetCachedResponse(event, false)
1055 if subs.PolicyUpdate == false {
1056 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1057 removeSubscriptionFromDb = true
1058 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1059 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1062 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1064 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1067 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1069 subRfMsg, valid = subs.SetCachedResponse(event, false)
1070 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1073 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1075 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1078 parentTrans.SendEvent(subRfMsg, 0)
1081 //-------------------------------------------------------------------
1082 // SUBS DELETE Handling
1083 //-------------------------------------------------------------------
1085 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1087 trans := c.tracker.NewSubsTransaction(subs)
1088 subs.WaitTransactionTurn(trans)
1089 defer subs.ReleaseTransactionTurn(trans)
1090 defer trans.Release()
1092 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1096 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1099 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1103 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1104 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1105 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1106 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1107 c.registry.UpdateSubscriptionToDb(subs, c)
1108 parentTrans.SendEvent(nil, 0)
1111 //-------------------------------------------------------------------
1112 // send to E2T Subscription Request
1113 //-------------------------------------------------------------------
1114 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1116 var event interface{} = nil
1117 var timedOut bool = false
1118 const ricRequestorId = 123
1120 subReqMsg := subs.SubReqMsg
1121 subReqMsg.RequestId = subs.GetReqId().RequestId
1122 subReqMsg.RequestId.Id = ricRequestorId
1123 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1125 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1126 return &PackSubscriptionRequestErrortEvent{
1128 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1129 ErrorCause: err.Error(),
1134 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1135 err = c.WriteSubscriptionToDb(subs)
1137 return &SDLWriteErrortEvent{
1139 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1140 ErrorCause: err.Error(),
1145 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1146 desc := fmt.Sprintf("(retry %d)", retries)
1148 c.UpdateCounter(cSubReqToE2)
1150 c.UpdateCounter(cSubReReqToE2)
1152 c.rmrSendToE2T(desc, subs, trans)
1153 if subs.DoNotWaitSubResp == false {
1154 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1156 c.UpdateCounter(cSubReqTimerExpiry)
1160 // Simulating case where subscrition request has been sent but response has not been received before restart
1161 event = &SubmgrRestartTestEvent{}
1162 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1166 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1170 //-------------------------------------------------------------------
1171 // send to E2T Subscription Delete Request
1172 //-------------------------------------------------------------------
1174 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1176 var event interface{}
1178 const ricRequestorId = 123
1180 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1181 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1182 subDelReqMsg.RequestId.Id = ricRequestorId
1183 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1184 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1186 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1190 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1191 desc := fmt.Sprintf("(retry %d)", retries)
1193 c.UpdateCounter(cSubDelReqToE2)
1195 c.UpdateCounter(cSubDelReReqToE2)
1197 c.rmrSendToE2T(desc, subs, trans)
1198 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1200 c.UpdateCounter(cSubDelReqTimerExpiry)
1205 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1209 //-------------------------------------------------------------------
1210 // handle from E2T Subscription Response
1211 //-------------------------------------------------------------------
1212 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1213 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1214 c.UpdateCounter(cSubRespFromE2)
1216 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1218 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1221 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1223 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1226 trans := subs.GetTransaction()
1228 err = fmt.Errorf("Ongoing transaction not found")
1229 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1232 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1233 if sendOk == false {
1234 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1235 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1240 //-------------------------------------------------------------------
1241 // handle from E2T Subscription Failure
1242 //-------------------------------------------------------------------
1243 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1244 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1245 c.UpdateCounter(cSubFailFromE2)
1246 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1248 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1251 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1253 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1256 trans := subs.GetTransaction()
1258 err = fmt.Errorf("Ongoing transaction not found")
1259 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1262 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1263 if sendOk == false {
1264 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1265 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1270 //-------------------------------------------------------------------
1271 // handle from E2T Subscription Delete Response
1272 //-------------------------------------------------------------------
1273 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1274 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1275 c.UpdateCounter(cSubDelRespFromE2)
1276 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1278 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1281 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1283 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1286 trans := subs.GetTransaction()
1288 err = fmt.Errorf("Ongoing transaction not found")
1289 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1292 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1293 if sendOk == false {
1294 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1295 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1300 //-------------------------------------------------------------------
1301 // handle from E2T Subscription Delete Failure
1302 //-------------------------------------------------------------------
1303 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1304 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1305 c.UpdateCounter(cSubDelFailFromE2)
1306 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1308 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1311 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1313 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1316 trans := subs.GetTransaction()
1318 err = fmt.Errorf("Ongoing transaction not found")
1319 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1322 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1323 if sendOk == false {
1324 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1325 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1330 //-------------------------------------------------------------------
1332 //-------------------------------------------------------------------
1333 func typeofSubsMessage(v interface{}) string {
1338 //case *e2ap.E2APSubscriptionRequest:
1340 case *e2ap.E2APSubscriptionResponse:
1342 case *e2ap.E2APSubscriptionFailure:
1344 //case *e2ap.E2APSubscriptionDeleteRequest:
1345 // return "SubDelReq"
1346 case *e2ap.E2APSubscriptionDeleteResponse:
1348 case *e2ap.E2APSubscriptionDeleteFailure:
1355 //-------------------------------------------------------------------
1357 //-------------------------------------------------------------------
1358 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1359 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1360 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1362 xapp.Logger.Error("%v", err)
1368 //-------------------------------------------------------------------
1370 //-------------------------------------------------------------------
1371 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1373 if removeSubscriptionFromDb == true {
1374 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1375 c.RemoveSubscriptionFromDb(subs)
1377 // Update is needed for successful response and merge case here
1378 if subs.RetryFromXapp == false {
1379 err := c.WriteSubscriptionToDb(subs)
1383 subs.RetryFromXapp = false
1387 //-------------------------------------------------------------------
1389 //-------------------------------------------------------------------
1390 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1391 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1392 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1394 xapp.Logger.Error("%v", err)
1398 //-------------------------------------------------------------------
1400 //-------------------------------------------------------------------
1401 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1402 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1403 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1405 xapp.Logger.Error("%v", err)
1409 //-------------------------------------------------------------------
1411 //-------------------------------------------------------------------
1412 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1414 if removeRestSubscriptionFromDb == true {
1415 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1416 c.RemoveRESTSubscriptionFromDb(restSubId)
1418 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1422 //-------------------------------------------------------------------
1424 //-------------------------------------------------------------------
1425 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1426 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1427 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1429 xapp.Logger.Error("%v", err)
1433 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1435 const ricRequestorId = 123
1436 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1438 // Send delete for every endpoint in the subscription
1439 if subs.PolicyUpdate == false {
1440 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1441 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1442 subDelReqMsg.RequestId.Id = ricRequestorId
1443 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1444 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1446 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1449 for _, endPoint := range subs.EpList.Endpoints {
1450 params := &xapp.RMRParams{}
1451 params.Mtype = mType
1452 params.SubId = int(subs.GetReqId().InstanceId)
1454 params.Meid = subs.Meid
1455 params.Src = endPoint.String()
1456 params.PayloadLen = len(payload.Buf)
1457 params.Payload = payload.Buf
1459 subs.DeleteFromDb = true
1460 c.handleXAPPSubscriptionDeleteRequest(params)
1465 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1467 fmt.Println("CRESTSubscriptionRequest")
1473 if p.SubscriptionID != "" {
1474 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1476 fmt.Println(" SubscriptionID = ''")
1479 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1481 if p.ClientEndpoint.HTTPPort != nil {
1482 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1484 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1487 if p.ClientEndpoint.RMRPort != nil {
1488 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1490 fmt.Println(" ClientEndpoint.RMRPort = nil")
1494 fmt.Printf(" Meid = %s\n", *p.Meid)
1496 fmt.Println(" Meid = nil")
1499 if p.E2SubscriptionDirectives == nil {
1500 fmt.Println(" E2SubscriptionDirectives = nil")
1502 fmt.Println(" E2SubscriptionDirectives")
1503 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1504 fmt.Println(" E2RetryCount == nil")
1506 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1508 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1509 if p.E2SubscriptionDirectives.RMRRoutingNeeded == nil {
1510 fmt.Println(" RMRRoutingNeeded == nil")
1512 fmt.Printf(" RMRRoutingNeeded = %v\n", *p.E2SubscriptionDirectives.RMRRoutingNeeded)
1515 for _, subscriptionDetail := range p.SubscriptionDetails {
1516 if p.RANFunctionID != nil {
1517 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1519 fmt.Println(" RANFunctionID = nil")
1521 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1522 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1524 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1525 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1526 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1527 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1529 if actionToBeSetup.SubsequentAction != nil {
1530 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1531 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1533 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")