2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
31 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
32 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
35 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36 httptransport "github.com/go-openapi/runtime/client"
37 "github.com/go-openapi/strfmt"
38 "github.com/gorilla/mux"
39 "github.com/segmentio/ksuid"
40 "github.com/spf13/viper"
43 //-----------------------------------------------------------------------------
45 //-----------------------------------------------------------------------------
// idstring builds a single log-friendly string from an optional error and a
// list of printable entries.
//
// Each non-nil entry contributes its String() value and each nil entry
// contributes the marker "(NIL)". When err is non-nil, "err(<message>)" is
// appended last. All parts are separated by exactly one space, including the
// part that follows a "(NIL)" marker.
//
// NOTE(review): an entry holding a typed nil pointer is a non-nil interface,
// so its String method is still invoked; that method must tolerate a nil
// receiver.
func idstring(err error, entries ...fmt.Stringer) string {
	var retval string = ""
	var filler string = ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Set the separator after every entry, nil or not, so that the next
		// part is never glued to its predecessor.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
65 //-----------------------------------------------------------------------------
67 //-----------------------------------------------------------------------------
// Package-level control parameters. Every variable in this group is assigned
// in ReadConfigParameters from the "controls.*" viper keys.
//
// Timeouts and wait times; the config keys they are read from end in "_ms".
69 var e2tSubReqTimeout time.Duration
70 var e2tSubDelReqTime time.Duration
71 var e2tRecvMsgTimeout time.Duration
72 var waitRouteCleanup_ms time.Duration
// Try counts; ReadConfigParameters falls back to 1 when the configured value is 0.
73 var e2tMaxSubReqTryCount uint64 // Initial try + retry
74 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// Switches kept as strings rather than bools; readSubsFromDb and
// dbRetryForever are compared against the literal "true" where they are used.
75 var checkE2State string
76 var readSubsFromDb string
77 var dbRetryForever string
// restDuplicateCtrl is consulted by the REST handlers for md5sum-based
// detection of repeated requests.
85 restDuplicateCtrl *DuplicateCtrl
// e2IfStateDb is populated from CreateXappRnibIfInstance() in NewControl.
87 e2IfStateDb XappRnibInterface
// restSubsDb is populated from CreateRESTSdl() in NewControl.
89 restSubsDb Sdlnterface
// Counters holds the counter group registered under the name "SUBMGR".
92 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker event. handleSubscriptionRequest
// matches on it and reports the "TEST: restart event received" error.
103 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event.
// NOTE(review): no use of this type is visible in this part of the file.
104 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is the event handleSubscriptionRequest
// maps to the "E2 SubscriptionRequest pack failure" error; its ErrorInfo is
// copied into the result returned to the caller.
105 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
// errorInfo is dereferenced unconditionally, so it must not be nil.
109 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
110 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is the event handleSubscriptionRequest maps to the
// "SDL write failure" error; its ErrorInfo is copied into the result returned
// to the caller.
113 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
// errorInfo is dereferenced unconditionally, so it must not be nil.
117 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
118 s.ErrorInfo = *errorInfo
// Start-up configuration of logging and viper (presumably the body of the
// package init function — TODO confirm, the header is not visible here).
122 xapp.Logger.Debug("SUBMGR")
// Environment variables are looked up with the "submgr" prefix.
124 viper.SetEnvPrefix("submgr")
// Treat environment variables that are set but empty as valid values.
125 viper.AllowEmptyEnv(true)
// NewControl creates the subscription manager's Control object, wires its
// collaborators together, loads configuration, registers auxiliary REST
// routes and starts the REST subscription listener.
128 func NewControl() *Control {
// Routing manager REST client; host, port and base URL come from the
// "rtmgr.*" configuration keys and plain http is used.
130 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
131 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// Subscription registry, given access to the routing manager client.
133 registry := new(Registry)
134 registry.Initialize()
135 registry.rtmgrClient = &rtmgrClient
137 tracker := new(Tracker)
// State used for detecting duplicate REST requests.
140 restDuplicateCtrl := new(DuplicateCtrl)
141 restDuplicateCtrl.Init()
143 e2IfState := new(E2IfState)
145 c := &Control{e2ap: new(E2ap),
148 restDuplicateCtrl: restDuplicateCtrl,
149 e2IfState: e2IfState,
150 e2IfStateDb: CreateXappRnibIfInstance(),
151 e2SubsDb: CreateSdl(),
152 restSubsDb: CreateRESTSdl(),
153 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load configuration first: readSubsFromDb, tested below, is set by this call.
158 c.ReadConfigParameters("")
160 // Register REST handler for testing support
161 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
162 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
163 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// Restore persisted state only when enabled by configuration.
// NOTE(review): the errors returned by both read calls are not checked.
165 if readSubsFromDb == "true" {
166 // Read subscriptions from db
167 c.ReadE2Subscriptions()
168 c.ReadRESTSubscriptions()
// The REST subscription API is served from its own goroutine.
171 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
175 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
176 subscriptions, _ := c.registry.QueryHandler()
177 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
180 //-------------------------------------------------------------------
182 //-------------------------------------------------------------------
// GetAllRestSubscriptions is the HTTP handler registered for GET
// /ric/v1/restsubscriptions. It obtains the response payload from the
// registry's GetAllRestSubscriptions.
183 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
184 xapp.Logger.Debug("GetAllRestSubscriptions() called")
185 response := c.registry.GetAllRestSubscriptions()
189 //-------------------------------------------------------------------
191 //-------------------------------------------------------------------
// ReadE2Subscriptions loads the E2 subscriptions from the database into the
// registry. It retries up to dbTryCount times, or without limit while
// dbRetryForever is "true".
//
// When a read is accepted, the subscription ids and the register map are
// installed in the registry and HandleUncompletedSubscriptions is run on
// the register.
192 func (c *Control) ReadE2Subscriptions() error {
195 var register map[uint32]*Subscription
// The retry condition is re-evaluated on every pass, so a change of
// dbRetryForever takes effect while looping.
196 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
197 xapp.Logger.Debug("Reading E2 subscriptions from db")
198 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
// Presumably the failure path: log and wait one second before the next try.
200 xapp.Logger.Error("%v", err)
201 <-time.After(1 * time.Second)
// Presumably the success path: publish what was read into the registry.
203 c.registry.subIds = subIds
204 c.registry.register = register
205 c.HandleUncompletedSubscriptions(register)
// Reached once the loop ends without a successful read.
209 xapp.Logger.Debug("Continuing without retring")
213 //-------------------------------------------------------------------
215 //-------------------------------------------------------------------
// ReadRESTSubscriptions loads the REST subscriptions from the database into
// the registry. It retries up to dbTryCount times, or without limit while
// dbRetryForever is "true".
//
// Every loaded subscription has its SubReqOngoing and SubDelReqOngoing flags
// cleared and is written back before the map is installed in the registry.
216 func (c *Control) ReadRESTSubscriptions() error {
218 xapp.Logger.Debug("ReadRESTSubscriptions()")
220 var restSubscriptions map[string]*RESTSubscription
221 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
222 xapp.Logger.Debug("Reading REST subscriptions from db")
223 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
// Presumably the failure path: log and wait one second before the next try.
225 xapp.Logger.Error("%v", err)
226 <-time.After(1 * time.Second)
228 // Fix REST subscriptions ongoing status after restart
229 for restSubId, restSubscription := range restSubscriptions {
230 restSubscription.SubReqOngoing = false
231 restSubscription.SubDelReqOngoing = false
// NOTE(review): the result of the write-back is not checked.
232 c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
234 c.registry.restSubscriptions = restSubscriptions
// Reached once the loop ends without a successful read.
238 xapp.Logger.Debug("Continuing without retring")
242 //-------------------------------------------------------------------
244 //-------------------------------------------------------------------
// ReadConfigParameters reads the "controls.*" settings from viper into the
// package-level control variables and logs each resulting value. A setting
// that reads as its zero value ("" or 0) is replaced by a hard-coded default.
//
// The method is also registered as a configuration change listener in Run.
// The parameter f is not used in the visible part of the body.
245 func (c *Control) ReadConfigParameters(f string) {
247 xapp.Logger.Debug("ReadConfigParameters")
// Mirror the logger level into Control and pass it to the ASN.1 debug print setting.
249 c.LoggerLevel = int(xapp.Logger.GetLevel())
250 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
251 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
// The "_ms" keys below are multiplied by 1000000 (nanoseconds per
// millisecond), per the note on the next line.
253 // viper.GetDuration returns nanoseconds
254 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
255 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
256 e2tSubReqTimeout = 2000 * 1000000
257 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
259 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
261 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
262 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
263 e2tSubDelReqTime = 2000 * 1000000
264 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
266 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
268 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
269 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
270 e2tRecvMsgTimeout = 2000 * 1000000
271 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
273 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts default to a single attempt.
275 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
276 if e2tMaxSubReqTryCount == 0 {
277 e2tMaxSubReqTryCount = 1
278 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
280 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
282 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
283 if e2tMaxSubDelReqTryCount == 0 {
284 e2tMaxSubDelReqTryCount = 1
285 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
287 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// String switches default to "true" when unset.
289 checkE2State = viper.GetString("controls.checkE2State")
290 if checkE2State == "" {
291 checkE2State = "true"
292 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
294 xapp.Logger.Debug("checkE2State= %v", checkE2State)
296 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
297 if readSubsFromDb == "" {
298 readSubsFromDb = "true"
299 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
301 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// dbTryCount bounds the database read retries in ReadE2Subscriptions and
// ReadRESTSubscriptions.
303 dbTryCount = viper.GetInt("controls.dbTryCount")
306 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
308 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
// With the "true" default, the database read loops retry without limit.
310 dbRetryForever = viper.GetString("controls.dbRetryForever")
311 if dbRetryForever == "" {
312 dbRetryForever = "true"
313 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
315 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
317 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
318 // value of 100ms is currently used only in unit tests.
319 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
320 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
321 waitRouteCleanup_ms = 5000 * 1000000
322 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
324 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
327 //-------------------------------------------------------------------
329 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given subscription register and
// sends a subscription delete request for every entry that has received no
// subscription response (SubRespRcvd == false) and is not a policy update.
// NoRespToXapp is set on such an entry before the delete request is sent.
330 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
332 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
333 for subId, subs := range register {
334 if subs.SubRespRcvd == false {
335 // If a policy subscription has already been made successfully, it should not be deleted because of an unsuccessful update.
336 if subs.PolicyUpdate == false {
337 subs.NoRespToXapp = true
338 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
339 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the ready callback registered in Run. If Control has no RMR
// client yet, it adopts the framework's xapp.Rmr. The data argument is not
// used in the visible lines.
345 func (c *Control) ReadyCB(data interface{}) {
346 if c.RMRClient == nil {
347 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the framework ready callback and
// ReadConfigParameters as a configuration change listener.
351 func (c *Control) Run() {
352 xapp.SetReadyCB(c.ReadyCB, nil)
353 xapp.AddConfigChangeListener(c.ReadConfigParameters)
357 //-------------------------------------------------------------------
359 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that an incoming
// request refers to and returns it together with its REST subscription id.
//
// Without p.SubscriptionID, the md5sum of the request is used to look up a
// previously served identical request. If none can be reused, a new id is
// generated with ksuid and a new REST subscription is created in the registry.
// With p.SubscriptionID, the subscription is looked up in the registry by
// that id.
360 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
363 var restSubscription *RESTSubscription
// Last REST subscription id known for an identical request body, if any.
366 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
367 if p.SubscriptionID == "" {
368 // Subscription does not contain REST subscription Id
370 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
371 if restSubscription != nil {
372 // Subscription found: reuse the id remembered for this md5sum
373 restSubId = prevRestSubsId
375 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
377 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The remembered id no longer resolves to a subscription: forget it.
380 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
381 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: allocate a fresh id and register a new REST subscription.
385 if restSubscription == nil {
386 restSubId = ksuid.New().String()
387 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
390 // Subscription contains REST subscription Id
391 restSubId = p.SubscriptionID
393 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
394 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
396 // Subscription with id in REST request does not exist
397 xapp.Logger.Error("%s", err.Error())
398 c.UpdateCounter(cRestSubFailToXapp)
403 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
405 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
409 return restSubscription, restSubId, nil
412 //-------------------------------------------------------------------
414 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
// It validates the request, resolves or creates the REST subscription, builds
// the E2 subscription request messages and hands them to
// processSubscriptionRequests in a new goroutine.
//
// The second return value is one of the common.Subscribe* codes used in the
// body. On the accepted paths the response carries the REST subscription id;
// on the rejecting paths shown here the response is nil.
415 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
418 c.UpdateCounter(cRestSubReqFromXapp)
420 subResp := models.SubscriptionResponse{}
// NOTE(review): unchecked type assertion; any other dynamic type of params panics.
421 p := params.(*models.SubscriptionParams)
423 if c.LoggerLevel > 2 {
424 c.PrintRESTSubscriptionRequest(p)
// Reject while the E2 connection for the target node is down.
427 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
// NOTE(review): p.Meid is dereferenced here without a visible nil check.
428 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
429 c.UpdateCounter(cRestReqRejDueE2Down)
430 return nil, common.SubscribeServiceUnavailableCode
433 if p.ClientEndpoint == nil {
434 err := fmt.Errorf("ClientEndpoint == nil")
435 xapp.Logger.Error("%v", err)
436 c.UpdateCounter(cRestSubFailToXapp)
437 return nil, common.SubscribeBadRequestCode
440 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
442 xapp.Logger.Error("%s", err.Error())
443 c.UpdateCounter(cRestSubFailToXapp)
444 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is the key for duplicate detection.
447 md5sum, err := CalculateRequestMd5sum(params)
449 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
452 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
454 xapp.Logger.Error("Subscription with id in REST request does not exist")
455 return nil, common.SubscribeNotFoundCode
458 subResp.SubscriptionID = &restSubId
459 subReqList := e2ap.SubscriptionRequestList{}
460 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Undo the registration and the md5sum bookkeeping when the request cannot be built.
462 xapp.Logger.Error("%s", err.Error())
463 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
464 c.registry.DeleteRESTSubscription(&restSubId)
465 c.UpdateCounter(cRestSubFailToXapp)
466 return nil, common.SubscribeBadRequestCode
// A retransmission of a request that is still being processed is acknowledged
// directly and not processed a second time.
469 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
471 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
472 xapp.Logger.Debug("%s", err)
473 c.registry.DeleteRESTSubscription(&restSubId)
474 c.UpdateCounter(cRestSubRespToXapp)
475 return &subResp, common.SubscribeCreatedCode
478 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
479 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
// NOTE(review): unlike the other rejecting paths, no counter is updated here.
481 xapp.Logger.Error("%s", err)
482 c.registry.DeleteRESTSubscription(&restSubId)
483 return nil, common.SubscribeBadRequestCode
// The E2 procedure runs asynchronously; results reach the xApp through the
// notifications sent from processSubscriptionRequests.
485 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
487 c.UpdateCounter(cRestSubRespToXapp)
488 return &subResp, common.SubscribeCreatedCode
491 //-------------------------------------------------------------------
493 //-------------------------------------------------------------------
// GetE2SubscriptionDirectives derives the E2 timeout, the maximum try count
// and the RMR route creation flag for a subscription request.
//
// When p or p.E2SubscriptionDirectives is nil, the configured defaults
// (e2tSubReqTimeout, e2tMaxSubReqTryCount) are used and route creation is
// enabled. Otherwise the request's values are used; an error is returned when
// E2TimeoutTimerValue is outside 1-10 seconds or E2RetryCount is outside 0-10.
494 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
496 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
497 if p == nil || p.E2SubscriptionDirectives == nil {
498 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
499 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
500 e2SubscriptionDirectives.CreateRMRRoute = true
501 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
// The requested timeout is given in seconds and converted to a time.Duration.
503 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
504 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
506 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
// A missing retry count is not fatal: the configured default is used.
508 if p.E2SubscriptionDirectives.E2RetryCount == nil {
509 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
510 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
512 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
513 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = first sending plus E2RetryCount retries
515 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
518 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
520 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
521 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
522 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
523 return e2SubscriptionDirectives, nil
526 //-------------------------------------------------------------------
528 //-------------------------------------------------------------------
// processSubscriptionRequests processes the E2 subscription requests of one
// REST subscription request in order. Each request is run through
// handleSubscriptionRequest, and the xApp at clientEndpoint is notified of
// the outcome with a successful or unsuccessful response notification.
//
// RESTSubscriptionHandler starts this method in its own goroutine.
530 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
531 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
533 c.SubscriptionProcessingStartDelay()
534 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
536 var xAppEventInstanceID int64
537 var e2EventInstanceID int64
538 errorInfo := &ErrorInfo{}
// The md5sum is recorded for the REST subscription id whenever this method
// returns. *restSubId is evaluated now, when the defer statement executes.
540 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
542 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
543 subReqMsg := subReqList.E2APSubscriptionRequests[index]
544 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
546 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
548 // Send notification to xApp that processing of a Subscription Request has failed.
549 err := fmt.Errorf("Tracking failure")
550 errorInfo.ErrorCause = err.Error()
551 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
555 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// NOTE(review): ":=" declares a new errorInfo inside the loop body that
// shadows the one declared before the loop.
557 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
559 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// The restart test event is recognised by comparing the error text.
563 if err.Error() == "TEST: restart event received" {
564 // This is just for UT cases. Stop here subscription processing
567 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Presumably the success path: record the E2 instance id and notify the xApp.
569 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
570 restSubscription.AddMd5Sum(md5sum)
571 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
572 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
573 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
578 //-------------------------------------------------------------------
580 //------------------------------------------------------------------
// SubscriptionProcessingStartDelay blocks for 50 ms when Control runs in unit
// test mode (UTTesting == true) and is a no-op otherwise.
581 func (c *Control) SubscriptionProcessingStartDelay() {
582 if c.UTTesting == true {
583 // This is a temporary fix for the UT problem where the notification arrives before the subscription response.
584 // The correct fix would be to allow the notification to come before the response and process it correctly.
585 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
586 <-time.After(time.Millisecond * 50)
587 xapp.Logger.Debug("Continuing after delay")
591 //-------------------------------------------------------------------
593 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request for the xApp
// transaction trans. It tracks the transaction, assigns it to a subscription
// in the registry, starts handleSubscriptionCreate in a goroutine and blocks
// until that side posts an event on the transaction.
//
// It returns the E2 subscription response on success. Otherwise it returns a
// nil response, an ErrorInfo describing the cause and a non-nil error.
594 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
595 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
597 errorInfo := ErrorInfo{}
599 err := c.tracker.Track(trans)
// The detailed tracking error is kept in errorInfo; the caller receives the
// generic "Tracking failure" error.
601 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
602 errorInfo.ErrorCause = err.Error()
603 err = fmt.Errorf("Tracking failure")
604 return nil, &errorInfo, err
607 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
609 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
610 return nil, &errorInfo, err
// OngoingReqCount is raised for the time this request waits on the subscription.
616 subs.OngoingReqCount++
617 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
618 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
619 subs.OngoingReqCount--
// Map the received event to a result.
623 switch themsg := event.(type) {
624 case *e2ap.E2APSubscriptionResponse:
// A response only counts as success while the E2 connection is still up;
// otherwise the subscription is removed again and an error is reported.
626 if c.e2IfState.IsE2ConnectionUp(meid) == true {
627 return themsg, &errorInfo, nil
629 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
630 c.RemoveSubscriptionFromDb(subs)
631 err = fmt.Errorf("E2 interface down")
632 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
634 case *e2ap.E2APSubscriptionFailure:
635 err = fmt.Errorf("E2 SubscriptionFailure received")
636 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
637 case *PackSubscriptionRequestErrortEvent:
638 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
639 errorInfo = themsg.ErrorInfo
640 case *SDLWriteErrortEvent:
641 err = fmt.Errorf("SDL write failure")
642 errorInfo = themsg.ErrorInfo
643 case *SubmgrRestartTestEvent:
// The caller recognises this case by the exact error text.
644 err = fmt.Errorf("TEST: restart event received")
645 xapp.Logger.Debug("%s", err)
646 return nil, &errorInfo, err
648 err = fmt.Errorf("Unexpected E2 subscription response received")
649 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// Presumably reached when no event was received at all — TODO confirm.
654 err = fmt.Errorf("E2 subscription response timeout")
655 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// A timed-out policy update returns without removing the existing subscription.
656 if subs.PolicyUpdate == true {
657 return nil, &errorInfo, err
// Common failure exit: log, detach the transaction from the subscription and
// report the error.
661 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
662 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
663 return nil, &errorInfo, err
666 //-------------------------------------------------------------------
668 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp at clientEndpoint
// that processing of one subscription request failed. The notification
// carries the error cause, source and timeout type from errorInfo and an E2
// event instance id of 0.
//
// The REST subscription is marked processed with err and updated in the
// database before the notification is sent. errorInfo.ErrorSource is set to
// SUBMGR in place when it is empty.
669 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
670 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
672 // Send notification to xApp that processing of a Subscription Request has failed.
673 e2EventInstanceID := (int64)(0)
674 if errorInfo.ErrorSource == "" {
675 // Submgr is default source of error
676 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
678 resp := &models.SubscriptionResponse{
679 SubscriptionID: restSubId,
680 SubscriptionInstances: []*models.SubscriptionInstance{
681 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
682 ErrorCause: errorInfo.ErrorCause,
683 ErrorSource: errorInfo.ErrorSource,
684 TimeoutType: errorInfo.TimeoutType,
685 XappEventInstanceID: &xAppEventInstanceID},
688 // Mark REST subscription request processed.
689 restSubscription.SetProcessed(err)
690 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two log variants: with and without the transaction id string (presumably
// chosen by whether trans is nil — TODO confirm).
692 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
693 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
695 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
696 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
699 c.UpdateCounter(cRestSubFailNotifToXapp)
700 xapp.Subscription.Notify(resp, *clientEndpoint)
702 // E2 is down. Delete completely processed request safely now
703 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
704 c.registry.DeleteRESTSubscription(restSubId)
705 c.RemoveRESTSubscriptionFromDb(*restSubId)
709 //-------------------------------------------------------------------
711 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp at clientEndpoint that
// one subscription request was processed successfully. The notification
// carries the xApp and E2 event instance ids.
//
// Before sending, the E2 instance id and the xApp-to-E2 id mapping are stored
// in the REST subscription, which is then marked processed and updated in the
// database.
712 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
713 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
715 // Store successfully processed InstanceId for deletion
716 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
717 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
719 // Send notification to xApp that a Subscription Request has been processed.
720 resp := &models.SubscriptionResponse{
721 SubscriptionID: restSubId,
722 SubscriptionInstances: []*models.SubscriptionInstance{
723 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
725 XappEventInstanceID: &xAppEventInstanceID},
728 // Mark REST subscription request processed.
729 restSubscription.SetProcessed(nil)
730 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
731 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
732 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
734 c.UpdateCounter(cRestSubNotifToXapp)
735 xapp.Subscription.Notify(resp, *clientEndpoint)
737 // E2 is down. Delete completely processed request safely now
738 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
739 c.registry.DeleteRESTSubscription(restSubId)
740 c.RemoveRESTSubscriptionFromDb(*restSubId)
744 //-------------------------------------------------------------------
746 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// for restSubId and returns one of the common.Unsubscribe* codes.
//
// An unknown subscription and a delete that is already ongoing are both
// answered with UnsubscribeNoContentCode. A subscription whose create request
// is still being processed is rejected with UnsubscribeBadRequestCode.
// Otherwise every E2 instance of the subscription is deleted through
// SubscriptionDeleteHandler, and the REST subscription is removed from the
// duplicate control, the registry and the database.
747 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
750 c.UpdateCounter(cRestSubDelReqFromXapp)
752 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
754 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
756 xapp.Logger.Error("%s", err.Error())
757 if restSubscription == nil {
758 // Subscription was not found
759 c.UpdateCounter(cRestSubDelRespToXapp)
760 return common.UnsubscribeNoContentCode
762 if restSubscription.SubReqOngoing == true {
763 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
764 xapp.Logger.Error("%s", err.Error())
765 c.UpdateCounter(cRestSubDelFailToXapp)
766 return common.UnsubscribeBadRequestCode
767 } else if restSubscription.SubDelReqOngoing == true {
768 // Previous request for same restSubId still ongoing
769 c.UpdateCounter(cRestSubDelRespToXapp)
770 return common.UnsubscribeNoContentCode
775 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete the E2 subscription instances one by one.
777 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
778 for _, instanceId := range restSubscription.InstanceIds {
779 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
782 xapp.Logger.Error("%s", err.Error())
784 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
785 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
// NOTE(review): this call is made while ranging over
// restSubscription.InstanceIds; confirm it does not disturb the iteration.
786 restSubscription.DeleteE2InstanceId(instanceId)
// Forget the REST subscription in the duplicate control, the registry and the database.
788 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
789 c.registry.DeleteRESTSubscription(&restSubId)
790 c.RemoveRESTSubscriptionFromDb(restSubId)
793 c.UpdateCounter(cRestSubDelRespToXapp)
794 return common.UnsubscribeNoContentCode
797 //-------------------------------------------------------------------
799 //-------------------------------------------------------------------
800 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
802 var xAppEventInstanceID int64
803 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
805 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
806 restSubId, instanceId, idstring(err, nil))
807 return xAppEventInstanceID, nil
810 xAppEventInstanceID = int64(subs.ReqId.Id)
811 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
813 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
814 xapp.Logger.Error("%s", err.Error())
816 defer trans.Release()
818 err = c.tracker.Track(trans)
820 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
821 xapp.Logger.Error("%s", err.Error())
822 return xAppEventInstanceID, &time.ParseError{}
827 subs.OngoingDelCount++
828 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
829 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
830 subs.OngoingDelCount--
832 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
834 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
836 return xAppEventInstanceID, nil
839 //-------------------------------------------------------------------
841 //-------------------------------------------------------------------
// RESTQueryHandler handles the REST subscription query by returning the
// result of the registry's QueryHandler unchanged.
842 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
843 xapp.Logger.Debug("RESTQueryHandler() called")
847 return c.registry.QueryHandler()
// TestRestHandler is the HTTP handler registered for POST
// /ric/v1/test/{testId}. The {testId} path variable selects a test command.
// The commands visible here remove one subscription from the database, remove
// all subscriptions, and restart the process. Anything else is logged as
// unsupported.
850 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
851 xapp.Logger.Debug("RESTTestRestHandler() called")
853 pathParams := mux.Vars(r)
854 s := pathParams["testId"]
856 // This can be used to delete single subscription from db
857 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so splits has at least two elements and splits[1] is safe.
858 var splits = strings.Split(s, "=")
859 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
860 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
// NOTE(review): the parsed 64-bit value is converted to uint32 without a range check.
861 c.RemoveSubscriptionFromSdl(uint32(subId))
866 // This can be used to remove all subscriptions from db
868 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
869 c.RemoveAllSubscriptionsFromSdl()
870 c.RemoveAllRESTSubscriptionsFromSdl()
874 // This is meant to cause submgr's restart in testing
876 xapp.Logger.Debug("os.Exit(1) called")
880 xapp.Logger.Debug("Unsupported rest command received %s", s)
883 //-------------------------------------------------------------------
885 //-------------------------------------------------------------------
// rmrSendToE2T builds an xapp.RMRParams for the given subscription and
// subscription-side transaction and sends it with c.SendWithRetry.
//
// The message type and payload come from trans, SubId is the InstanceId of
// the subscription's request id, and Meid comes from subs. desc is only used
// in the debug log line. The value returned by SendWithRetry is stored in the
// named result err.
//
// NOTE(review): the "Send failed" log is presumably guarded by an
// err != nil check that is not visible in this listing; embedded line 891 is
// also absent - confirm against the full file.
887 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
888 params := &xapp.RMRParams{}
889 params.Mtype = trans.GetMtype()
890 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Meid = subs.GetMeid()
894 params.PayloadLen = len(trans.Payload.Buf)
895 params.Payload = trans.Payload.Buf
897 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
// Second and third arguments (false, 5) are passed through unchanged to SendWithRetry.
898 err = c.SendWithRetry(params, false, 5)
900 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp builds an xapp.RMRParams for the given subscription and
// xApp-side transaction and sends it with c.SendWithRetry.
//
// The message type, Xid, Meid and payload come from trans; SubId is the
// InstanceId of the subscription's request id. desc is only used in the debug
// log line. The value returned by SendWithRetry is stored in the named result
// err.
//
// NOTE(review): the "Send failed" log is presumably guarded by an
// err != nil check that is not visible in this listing - confirm.
905 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
907 params := &xapp.RMRParams{}
908 params.Mtype = trans.GetMtype()
909 params.SubId = int(subs.GetReqId().InstanceId)
910 params.Xid = trans.GetXid()
911 params.Meid = trans.GetMeid()
913 params.PayloadLen = len(trans.Payload.Buf)
914 params.Payload = trans.Payload.Buf
916 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
// Second and third arguments (false, 5) are passed through unchanged to SendWithRetry.
917 err = c.SendWithRetry(params, false, 5)
919 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume handles one received RMR message.
//
// When c.RMRClient is nil an error is built, stored in the named result err
// and logged. Otherwise the message buffer is freed on return (deferred
// c.RMRClient.Free), the payload is cloned into a fresh Go slice, and the
// handler matching the message type is started in its own goroutine. The
// final visible statement logs the message type as unknown and discarded.
//
// NOTE(review): the switch header, its default label, the return statements
// and embedded lines 930 and 941 are not visible in this listing - confirm
// against the full file.
924 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
925 if c.RMRClient == nil {
// NOTE(review): "nil can handle" reads like a typo for "can not handle" - confirm before changing the text.
926 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
927 xapp.Logger.Error("%s", err.Error())
932 defer c.RMRClient.Free(msg.Mbuf)
934 // xapp-frame might use direct access to c buffer and
935 // when msg.Mbuf is freed, someone might take it into use
936 // and payload data might be invalid inside message handle function
938 // subscriptions won't load system a lot so there is no
939 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has zero capacity, so append allocates a new backing array.
940 cPay := append(msg.Payload[:0:0], msg.Payload...)
942 msg.PayloadLen = len(cPay)
945 case xapp.RIC_SUB_REQ:
946 go c.handleXAPPSubscriptionRequest(msg)
947 case xapp.RIC_SUB_RESP:
948 go c.handleE2TSubscriptionResponse(msg)
949 case xapp.RIC_SUB_FAILURE:
950 go c.handleE2TSubscriptionFailure(msg)
951 case xapp.RIC_SUB_DEL_REQ:
952 go c.handleXAPPSubscriptionDeleteRequest(msg)
953 case xapp.RIC_SUB_DEL_RESP:
954 go c.handleE2TSubscriptionDeleteResponse(msg)
955 case xapp.RIC_SUB_DEL_FAILURE:
956 go c.handleE2TSubscriptionDeleteFailure(msg)
958 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
963 //-------------------------------------------------------------------
964 // handle from XAPP Subscription Request
965 //------------------------------------------------------------------
966 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
967 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
968 c.UpdateCounter(cSubReqFromXapp)
970 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
971 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
975 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
977 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
981 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
983 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
986 defer trans.Release()
988 if err = c.tracker.Track(trans); err != nil {
989 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
993 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
994 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
996 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1000 c.wakeSubscriptionRequest(subs, trans)
1003 //-------------------------------------------------------------------
1004 // Wake Subscription Request to E2node
1005 //------------------------------------------------------------------
// wakeSubscriptionRequest drives a subscription create for an xApp
// transaction and answers the xApp over RMR.
//
// It fetches E2 subscription directives via c.GetE2SubscriptionDirectives(nil)
// (the second return value is discarded), increments subs.OngoingReqCount,
// starts c.handleSubscriptionCreate in a goroutine, blocks in
// trans.WaitEvent(0) for the resulting event and decrements the counter
// again. The event is then dispatched by type:
//   - *e2ap.E2APSubscriptionResponse: the request id is overwritten with the
//     transaction's, the message is packed, cSubRespToXapp is bumped and the
//     result is sent with rmrSendToXapp;
//   - *e2ap.E2APSubscriptionFailure: same flow with PackSubscriptionFailure
//     and cSubFailToXapp.
//
// The last visible statement logs "XAPP-SubReq: failed".
//
// NOTE(review): the declaration of err, the guards after each Pack call, any
// default case and the return statements are not visible in this listing, so
// which paths reach the final log cannot be determined here - confirm.
1006 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
1008 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
1009 subs.OngoingReqCount++
1010 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1011 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1012 subs.OngoingReqCount--
1015 switch themsg := event.(type) {
1016 case *e2ap.E2APSubscriptionResponse:
// Reply with the request id taken from the xApp transaction.
1017 themsg.RequestId.Id = trans.RequestId.Id
1018 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1021 c.UpdateCounter(cSubRespToXapp)
1022 c.rmrSendToXapp("", subs, trans)
1025 case *e2ap.E2APSubscriptionFailure:
1026 themsg.RequestId.Id = trans.RequestId.Id
1027 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1029 c.UpdateCounter(cSubFailToXapp)
1030 c.rmrSendToXapp("", subs, trans)
1036 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1037 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1040 //-------------------------------------------------------------------
1041 // handle from XAPP Subscription Delete Request
1042 //------------------------------------------------------------------
1043 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1044 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1045 c.UpdateCounter(cSubDelReqFromXapp)
1047 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1048 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1052 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1054 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1058 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1060 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1063 defer trans.Release()
1065 err = c.tracker.Track(trans)
1067 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1071 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1073 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1080 subs.OngoingDelCount++
1081 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1082 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1083 subs.OngoingDelCount--
1085 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1087 if subs.NoRespToXapp == true {
1088 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1089 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1093 // Whatever is received success, fail or timeout, send successful delete response
1094 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1095 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1096 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1097 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1098 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1100 c.UpdateCounter(cSubDelRespToXapp)
1101 c.rmrSendToXapp("", subs, trans)
1104 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1105 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1108 //-------------------------------------------------------------------
1109 // SUBS CREATE Handling
1110 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription-side part of a create
// request and reports the outcome to parentTrans.
//
// It creates a subscription transaction, waits for its turn on the
// subscription (subs.WaitTransactionTurn) and releases both the turn and the
// transaction on return. If subs.GetCachedResponse() yields a nil message
// with valid == true, a request is sent to E2T via sendE2TSubscriptionRequest
// and the returned event decides what is cached with subs.SetCachedResponse:
//   - *e2ap.E2APSubscriptionResponse: cached with true, subs.SubRespRcvd set;
//   - *e2ap.E2APSubscriptionFailure: cached with false;
//   - *SubmgrRestartTestEvent: cached with false and the result is sent to
//     parentTrans straight away;
//   - *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent: cached with
//     false;
//   - a further branch checks subs.PolicyUpdate: when false, nil is cached
//     with false and a delete request is sent to E2T; the other visible call
//     caches nil with true.
//
// Afterwards the visible code sets removeSubscriptionFromDb, calls
// c.UpdateSubscriptionInDB, may send an E2T delete request, calls
// c.registry.RemoveFromSubscription and finally passes subRfMsg to
// parentTrans.SendEvent.
//
// NOTE(review): the default label, else branches, the conditions guarding the
// DB flag, the second delete request and RemoveFromSubscription, and any
// return after the restart-test branch are not visible in this listing -
// confirm against the full file before relying on the ordering above.
1111 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1113 var event interface{} = nil
1114 var removeSubscriptionFromDb bool = false
1115 trans := c.tracker.NewSubsTransaction(subs)
1116 subs.WaitTransactionTurn(trans)
1117 defer subs.ReleaseTransactionTurn(trans)
1118 defer trans.Release()
1120 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1122 subRfMsg, valid := subs.GetCachedResponse()
// No cached response yet: ask E2T.
1123 if subRfMsg == nil && valid == true {
1124 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1125 switch event.(type) {
1126 case *e2ap.E2APSubscriptionResponse:
1127 subRfMsg, valid = subs.SetCachedResponse(event, true)
1128 subs.SubRespRcvd = true
1129 case *e2ap.E2APSubscriptionFailure:
1130 subRfMsg, valid = subs.SetCachedResponse(event, false)
1131 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1132 case *SubmgrRestartTestEvent:
1133 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1134 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1135 subRfMsg, valid = subs.SetCachedResponse(event, false)
1136 parentTrans.SendEvent(subRfMsg, 0)
1138 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1139 subRfMsg, valid = subs.SetCachedResponse(event, false)
1142 if subs.PolicyUpdate == false {
1143 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1144 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1145 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1147 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1150 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1152 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1155 removeSubscriptionFromDb = true
1158 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1161 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1164 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1166 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1169 parentTrans.SendEvent(subRfMsg, 0)
1172 //-------------------------------------------------------------------
1173 // SUBS DELETE Handling
1174 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the subscription-side part of a delete
// request and signals completion to parentTrans.
//
// It creates a subscription transaction, waits for its turn on the
// subscription and releases both the turn and the transaction on return.
// A delete request is sent to E2T only when the subscription is valid and
// the requesting endpoint is the sole endpoint in subs.EpList. The endpoint
// is then removed via c.registry.RemoveFromSubscription and a nil event is
// sent to parentTrans.
//
// NOTE(review): lines between the condition and the E2T delete call are not
// visible in this listing - confirm against the full file.
1176 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1178 trans := c.tracker.NewSubsTransaction(subs)
1179 subs.WaitTransactionTurn(trans)
1180 defer subs.ReleaseTransactionTurn(trans)
1181 defer trans.Release()
1183 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint triggers a delete towards E2T.
1187 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1190 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1194 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1195 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1196 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1197 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1198 parentTrans.SendEvent(nil, 0)
1201 //-------------------------------------------------------------------
1202 // send to E2T Subscription Request
1203 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request, persists the
// subscription and sends the request to E2T, returning the resulting event.
//
// The request id is taken from the subscription and its Id field is
// overwritten with the constant ricRequestorId (123) before packing. A
// *PackSubscriptionRequestErrortEvent or *SDLWriteErrortEvent is returned for
// the pack and DB-write error paths respectively, each carrying err.Error()
// as ErrorCause. The send loop runs up to
// e2SubscriptionDirectives.E2MaxTryCount times; each pass sends with
// rmrSendToE2T and, unless subs.DoNotWaitSubResp is set, waits for an event
// for E2TimeoutTimerValue. When DoNotWaitSubResp is set the event is a
// *SubmgrRestartTestEvent.
//
// NOTE(review): the declaration of err, the err guards, the retries == 0
// test that presumably selects between the two counters, the timedOut check,
// loop break/continue and the final return are not visible in this listing -
// confirm against the full file.
1204 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1206 var event interface{} = nil
1207 var timedOut bool = false
1208 const ricRequestorId = 123
1210 subReqMsg := subs.SubReqMsg
1211 subReqMsg.RequestId = subs.GetReqId().RequestId
1212 subReqMsg.RequestId.Id = ricRequestorId
1213 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1215 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1216 return &PackSubscriptionRequestErrortEvent{
1218 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1219 ErrorCause: err.Error(),
1224 // Write the uncompleted subscription to db. If no response arrives for the subscription it needs to be re-processed (deleted) after restart
1225 err = c.WriteSubscriptionToDb(subs)
1227 return &SDLWriteErrortEvent{
1229 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1230 ErrorCause: err.Error(),
1235 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1236 desc := fmt.Sprintf("(retry %d)", retries)
1238 c.UpdateCounter(cSubReqToE2)
1240 c.UpdateCounter(cSubReReqToE2)
1242 c.rmrSendToE2T(desc, subs, trans)
1243 if subs.DoNotWaitSubResp == false {
1244 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1246 c.UpdateCounter(cSubReqTimerExpiry)
1250 // Simulating case where subscription request has been sent but response has not been received before restart
1251 event = &SubmgrRestartTestEvent{}
1252 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1256 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1260 //-------------------------------------------------------------------
1261 // send to E2T Subscription Delete Request
1262 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a delete request for the
// subscription and sends it to E2T, returning the resulting event.
//
// The request id is taken from the subscription and its Id field is
// overwritten with the constant ricRequestorId (123); FunctionId is copied
// from subs.SubReqMsg. The send loop runs up to e2tMaxSubDelReqTryCount
// times; each pass sends with rmrSendToE2T and waits for an event for
// e2tSubDelReqTime.
//
// NOTE(review): the declarations of err and timedOut, the err guard after
// packing, the retries == 0 test that presumably selects between the two
// counters, the timedOut check, loop break/continue and the final return are
// not visible in this listing - confirm against the full file.
1264 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1266 var event interface{}
1268 const ricRequestorId = 123
1270 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1271 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1272 subDelReqMsg.RequestId.Id = ricRequestorId
1273 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1274 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1276 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1280 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1281 desc := fmt.Sprintf("(retry %d)", retries)
1283 c.UpdateCounter(cSubDelReqToE2)
1285 c.UpdateCounter(cSubDelReReqToE2)
1287 c.rmrSendToE2T(desc, subs, trans)
1288 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1290 c.UpdateCounter(cSubDelReqTimerExpiry)
1295 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1299 //-------------------------------------------------------------------
1300 // handle from E2T Subscription Response
1301 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T.
//
// It logs the message, bumps cSubRespFromE2, unpacks the response from
// params.Payload, finds the subscription by the response's InstanceId,
// fetches the subscription's current transaction and forwards the response
// to it with trans.SendEvent (timeout e2tRecvMsgTimeout). A failed SendEvent
// is logged with the sendOk/timedOut values.
//
// NOTE(review): the err/nil guards and their returns are not visible in this
// listing; each error log presumably sits inside one - confirm.
1302 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1303 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1304 c.UpdateCounter(cSubRespFromE2)
1306 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1308 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1311 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1313 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1316 trans := subs.GetTransaction()
1318 err = fmt.Errorf("Ongoing transaction not found")
1319 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1322 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1323 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1324 if sendOk == false {
1325 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1326 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1331 //-------------------------------------------------------------------
1332 // handle from E2T Subscription Failure
1333 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received
// from E2T.
//
// It logs the message, bumps cSubFailFromE2, unpacks the failure from
// params.Payload, finds the subscription by the failure's InstanceId, fetches
// the subscription's current transaction and forwards the failure to it with
// trans.SendEvent (timeout e2tRecvMsgTimeout). A failed SendEvent is logged
// with the sendOk/timedOut values.
//
// NOTE(review): the err/nil guards and their returns are not visible in this
// listing; each error log presumably sits inside one - confirm.
1334 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1335 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1336 c.UpdateCounter(cSubFailFromE2)
1337 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1339 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1342 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1344 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1347 trans := subs.GetTransaction()
1349 err = fmt.Errorf("Ongoing transaction not found")
1350 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1353 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1354 if sendOk == false {
1355 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1356 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1361 //-------------------------------------------------------------------
1362 // handle from E2T Subscription Delete Response
1363 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete
// response received from E2T.
//
// It logs the message, bumps cSubDelRespFromE2, unpacks the response from
// params.Payload, finds the subscription by the response's InstanceId,
// fetches the subscription's current transaction and forwards the response
// to it with trans.SendEvent (timeout e2tRecvMsgTimeout). A failed SendEvent
// is logged with the sendOk/timedOut values. The function declares a named
// error result, err, which the visible code assigns on each failure.
//
// NOTE(review): the err/nil guards and the return statements are not visible
// in this listing - confirm against the full file.
1364 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1365 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1366 c.UpdateCounter(cSubDelRespFromE2)
1367 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1369 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1372 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1374 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1377 trans := subs.GetTransaction()
1379 err = fmt.Errorf("Ongoing transaction not found")
1380 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1383 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1384 if sendOk == false {
1385 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1386 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1391 //-------------------------------------------------------------------
1392 // handle from E2T Subscription Delete Failure
1393 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T.
//
// It logs the message, bumps cSubDelFailFromE2, unpacks the failure from
// params.Payload, finds the subscription by the failure's InstanceId, fetches
// the subscription's current transaction and forwards the failure to it with
// trans.SendEvent (timeout e2tRecvMsgTimeout). A failed SendEvent is logged
// with the sendOk/timedOut values.
//
// NOTE(review): the err/nil guards and their returns are not visible in this
// listing; each error log presumably sits inside one - confirm.
1394 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1395 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1396 c.UpdateCounter(cSubDelFailFromE2)
1397 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1399 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1402 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1404 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1407 trans := subs.GetTransaction()
1409 err = fmt.Errorf("Ongoing transaction not found")
1410 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1413 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1414 if sendOk == false {
1415 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1416 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1421 //-------------------------------------------------------------------
1423 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing v. The visible cases match
// the e2ap subscription response, failure, delete response and delete
// failure pointer types; the request and delete-request cases are commented
// out.
//
// NOTE(review): the switch header, the string returned for each case and the
// handling of nil or other types are not visible in this listing - confirm
// against the full file.
1424 func typeofSubsMessage(v interface{}) string {
1429 //case *e2ap.E2APSubscriptionRequest:
1431 case *e2ap.E2APSubscriptionResponse:
1433 case *e2ap.E2APSubscriptionFailure:
1435 //case *e2ap.E2APSubscriptionDeleteRequest:
1436 // return "SubDelReq"
1437 case *e2ap.E2APSubscriptionDeleteResponse:
1439 case *e2ap.E2APSubscriptionDeleteFailure:
1446 //-------------------------------------------------------------------
1448 //-------------------------------------------------------------------
// WriteSubscriptionToDb logs the subscription's InstanceId and writes the
// subscription with c.WriteSubscriptionToSdl, keyed by that InstanceId. It
// returns an error value.
//
// NOTE(review): the err guard around the error log and the return
// statements are not visible in this listing - presumably the SDL error is
// logged and returned; confirm.
1449 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1450 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1451 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1453 xapp.Logger.Error("%v", err)
1459 //-------------------------------------------------------------------
1461 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes the subscription from the DB or
// rewrites it.
//
// When removeSubscriptionFromDb is true the subscription is removed with
// c.RemoveSubscriptionFromDb. The other visible path writes the subscription
// with c.WriteSubscriptionToDb only when subs.RetryFromXapp is false, and
// subs.RetryFromXapp is then set to false.
//
// NOTE(review): the else branch, the handling of the write error and the
// return statements are not visible in this listing - confirm.
1462 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1464 if removeSubscriptionFromDb == true {
1465 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1466 c.RemoveSubscriptionFromDb(subs)
1468 // Update is needed for successful response and merge case here
1469 if subs.RetryFromXapp == false {
1470 err := c.WriteSubscriptionToDb(subs)
1474 subs.RetryFromXapp = false
1478 //-------------------------------------------------------------------
1480 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb logs the subscription's InstanceId and removes the
// entry with c.RemoveSubscriptionFromSdl. It returns nothing; the SDL error
// is only logged.
//
// NOTE(review): the err guard around the error log is not visible in this
// listing - confirm.
1481 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1482 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1483 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1485 xapp.Logger.Error("%v", err)
1489 //-------------------------------------------------------------------
1491 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb logs restSubId and writes restSubs with
// c.WriteRESTSubscriptionToSdl, keyed by restSubId. It returns nothing; the
// SDL error is only logged.
//
// NOTE(review): the err guard around the error log is not visible in this
// listing - confirm.
1492 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1493 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1494 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1496 xapp.Logger.Error("%v", err)
1500 //-------------------------------------------------------------------
1502 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB removes the REST subscription from the DB when
// removeRestSubscriptionFromDb is true; the other visible call writes it with
// c.WriteRESTSubscriptionToDb.
//
// NOTE(review): the else keyword separating the two calls is not visible in
// this listing - presumably the write happens only when the flag is false;
// confirm.
1503 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1505 if removeRestSubscriptionFromDb == true {
1506 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1507 c.RemoveRESTSubscriptionFromDb(restSubId)
1509 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1513 //-------------------------------------------------------------------
1515 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb logs restSubId and removes the entry with
// c.RemoveRESTSubscriptionFromSdl. It returns nothing; the SDL error is only
// logged.
//
// NOTE(review): the err guard around the error log is not visible in this
// listing - confirm.
1516 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1517 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1518 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1520 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues delete requests for a subscription on
// behalf of each of its endpoints.
//
// When c.UTTesting is true the registry mutex is replaced with a new
// sync.Mutex first. If subs.PolicyUpdate is false, a delete request is packed
// once (request id from the subscription with Id overwritten by
// ricRequestorId = 123, FunctionId from subs.SubReqMsg). Then, for every
// endpoint in subs.EpList.Endpoints, an xapp.RMRParams is filled in with the
// packed message type and payload and the endpoint as Src,
// subs.DeleteFromDb is set to true and the params are passed synchronously
// to c.handleXAPPSubscriptionDeleteRequest.
//
// NOTE(review): the err guard after packing, its return, and embedded line
// 1549 are not visible in this listing - confirm against the full file.
1524 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1526 if c.UTTesting == true {
1527 // Registry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1528 c.registry.mutex = new(sync.Mutex)
1531 const ricRequestorId = 123
1532 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1534 // Send delete for every endpoint in the subscription
1535 if subs.PolicyUpdate == false {
1536 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1537 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1538 subDelReqMsg.RequestId.Id = ricRequestorId
1539 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1540 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1542 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1545 for _, endPoint := range subs.EpList.Endpoints {
1546 params := &xapp.RMRParams{}
1547 params.Mtype = mType
1548 params.SubId = int(subs.GetReqId().InstanceId)
1550 params.Meid = subs.Meid
1551 params.Src = endPoint.String()
1552 params.PayloadLen = len(payload.Buf)
1553 params.Payload = payload.Buf
1555 subs.DeleteFromDb = true
1556 c.handleXAPPSubscriptionDeleteRequest(params)
// PrintRESTSubscriptionRequest writes the fields of a REST subscription
// request to standard output with fmt.Println / fmt.Printf.
//
// Visible output covers SubscriptionID, the client endpoint (Host, HTTPPort,
// RMRPort), Meid, the E2 subscription directives and, for each entry in
// p.SubscriptionDetails, RANFunctionID, XappEventInstanceID, EventTriggers and
// every action in ActionToBeSetupList including its SubsequentAction. Pointer
// fields HTTPPort, RMRPort, E2RetryCount, RANFunctionID and SubsequentAction
// are nil-checked before being dereferenced.
//
// NOTE(review): subscriptionDetail.XappEventInstanceID, actionToBeSetup.ActionID
// and actionToBeSetup.ActionType are dereferenced without a visible nil
// check. The else branches, the nil checks on p / p.Meid and the end of the
// function are not visible in this listing - confirm against the full file.
1561 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1563 fmt.Println("CRESTSubscriptionRequest")
1569 if p.SubscriptionID != "" {
1570 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1572 fmt.Println(" SubscriptionID = ''")
1575 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1577 if p.ClientEndpoint.HTTPPort != nil {
1578 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1580 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1583 if p.ClientEndpoint.RMRPort != nil {
1584 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1586 fmt.Println(" ClientEndpoint.RMRPort = nil")
1590 fmt.Printf(" Meid = %s\n", *p.Meid)
1592 fmt.Println(" Meid = nil")
1595 if p.E2SubscriptionDirectives == nil {
1596 fmt.Println(" E2SubscriptionDirectives = nil")
1598 fmt.Println(" E2SubscriptionDirectives")
1599 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1600 fmt.Println(" E2RetryCount == nil")
1602 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1604 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1605 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1607 for _, subscriptionDetail := range p.SubscriptionDetails {
1608 if p.RANFunctionID != nil {
1609 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1611 fmt.Println(" RANFunctionID = nil")
1613 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1614 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1616 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1617 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1618 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1619 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1621 if actionToBeSetup.SubsequentAction != nil {
1622 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
// NOTE(review): the label below contains a doubled dot ("ActionToBeSetup..SubsequentAction") - looks like a typo in the output text; confirm before changing.
1623 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1625 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")