2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
31 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
32 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
35 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
36 httptransport "github.com/go-openapi/runtime/client"
37 "github.com/go-openapi/strfmt"
38 "github.com/gorilla/mux"
39 "github.com/segmentio/ksuid"
40 "github.com/spf13/viper"
43 //-----------------------------------------------------------------------------
45 //-----------------------------------------------------------------------------
// idstring builds a single identification string for log messages from the
// given entries, followed by the optional error.
//
// A nil entry is rendered as "(NIL)" and a non-nil err is appended as
// "err(<message>)". Every element, including a "(NIL)" placeholder, is
// separated from the next one by exactly one space, so idstring(err, nil)
// yields "(NIL) err(<message>)".
//
// NOTE(review): only an untyped nil entry is recognised here; an interface
// value wrapping a nil pointer still reaches String() - callers should guard
// against that themselves.
func idstring(err error, entries ...fmt.Stringer) string {
	// One slot per entry plus one for the optional error.
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
65 //-----------------------------------------------------------------------------
67 //-----------------------------------------------------------------------------
// Package-level runtime settings. ReadConfigParameters fills them from the
// "controls.*" configuration keys and substitutes hard-coded defaults for
// unset values.
var (
	// Timeouts and wait times, stored as time.Duration.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of send attempts.
	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// Boolean switches kept as strings; the code compares them with "true".
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
// Controller for duplicate REST requests; created and initialised in NewControl.
85 restDuplicateCtrl *DuplicateCtrl
// Set from CreateXappRnibIfInstance() in NewControl.
87 e2IfStateDb XappRnibInterface
// Set from CreateRESTSdl() in NewControl; presumably the store for REST subscriptions - TODO confirm.
89 restSubsDb Sdlnterface
// Counter group registered in NewControl under the "SUBMGR" namespace.
92 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event that makes handleSubscriptionRequest
// stop with the "TEST: restart event received" error.
103 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event.
// NOTE(review): no use of it is visible in this part of the file.
104 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent carries the ErrorInfo of a failure that
// handleSubscriptionRequest reports as "E2 SubscriptionRequest pack failure".
105 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event; errorInfo must not be nil.
109 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
110 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent carries the ErrorInfo of a failure that
// handleSubscriptionRequest reports as "SDL write failure".
113 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event; errorInfo must not be nil.
117 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
118 s.ErrorInfo = *errorInfo
// Start-up fragment (presumably the body of the package init function - its
// header is not visible here): logs a marker and prepares viper.
122 xapp.Logger.Debug("SUBMGR")
// Environment variables are looked up with the "submgr" prefix.
124 viper.SetEnvPrefix("submgr")
// Empty environment variables are treated as set values.
125 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager's Control instance. It creates
// the routing manager REST client, the registry, the tracker, the REST
// duplicate controller and the E2 interface state, loads the configuration,
// registers additional REST routes, restores subscriptions from the database
// when readSubsFromDb is "true" and starts the REST subscription listener in
// its own goroutine.
128 func NewControl() *Control {
// Routing manager client; host, port and base URL come from the rtmgr.* settings.
130 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
131 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
133 registry := new(Registry)
134 registry.Initialize()
135 registry.rtmgrClient = &rtmgrClient
137 tracker := new(Tracker)
140 restDuplicateCtrl := new(DuplicateCtrl)
141 restDuplicateCtrl.Init()
143 e2IfState := new(E2IfState)
145 c := &Control{e2ap: new(E2ap),
148 restDuplicateCtrl: restDuplicateCtrl,
149 e2IfState: e2IfState,
150 e2IfStateDb: CreateXappRnibIfInstance(),
151 e2SubsDb: CreateSdl(),
152 restSubsDb: CreateRESTSdl(),
153 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the controls.* settings before anything depends on them.
158 c.ReadConfigParameters("")
160 // Register REST handler for testing support
161 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
162 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
163 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
165 if readSubsFromDb == "true" {
166 // Read subscriptions from db
// NOTE(review): the error results of both read calls are not checked here.
167 c.ReadE2Subscriptions()
168 c.ReadRESTSubscriptions()
// Subscribe, query and delete handlers are served from a separate goroutine.
171 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
175 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
176 subscriptions, _ := c.registry.QueryHandler()
177 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
180 //-------------------------------------------------------------------
182 //-------------------------------------------------------------------
// GetAllRestSubscriptions serves the "/ric/v1/restsubscriptions" REST route
// (registered in NewControl). It fetches the REST subscriptions from the
// registry.
183 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
184 xapp.Logger.Debug("GetAllRestSubscriptions() called")
185 response := c.registry.GetAllRestSubscriptions()
189 //-------------------------------------------------------------------
191 //-------------------------------------------------------------------
// ReadE2Subscriptions restores the E2 subscriptions from the database through
// ReadAllSubscriptionsFromSdl. Attempts continue while dbRetryForever is
// "true" or the attempt counter is below dbTryCount. The restored subIds and
// register are installed into the registry and HandleUncompletedSubscriptions
// is started in a goroutine.
192 func (c *Control) ReadE2Subscriptions() error {
195 var register map[uint32]*Subscription
196 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
197 xapp.Logger.Debug("Reading E2 subscriptions from db")
198 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
// Log the read error and pause one second before the next attempt.
200 xapp.Logger.Error("%v", err)
201 <-time.After(1 * time.Second)
// Install the restored state into the registry.
203 c.registry.subIds = subIds
204 c.registry.register = register
// Subscriptions without a received response are cleaned up in the background.
205 go c.HandleUncompletedSubscriptions(register)
// Presumably reached when the attempts are exhausted - TODO confirm.
209 xapp.Logger.Debug("Continuing without retring")
213 //-------------------------------------------------------------------
215 //-------------------------------------------------------------------
// ReadRESTSubscriptions restores the REST subscriptions from the database
// through ReadAllRESTSubscriptionsFromSdl. Attempts continue while
// dbRetryForever is "true" or the attempt counter is below dbTryCount. Each
// restored subscription gets its ongoing flags cleared and is written back
// before the map is installed into the registry.
216 func (c *Control) ReadRESTSubscriptions() error {
218 xapp.Logger.Debug("ReadRESTSubscriptions()")
220 var restSubscriptions map[string]*RESTSubscription
221 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
222 xapp.Logger.Debug("Reading REST subscriptions from db")
223 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
// Log the read error and pause one second before the next attempt.
225 xapp.Logger.Error("%v", err)
226 <-time.After(1 * time.Second)
228 // Fix REST subscriptions ongoing status after restart
229 for restSubId, restSubscription := range restSubscriptions {
230 restSubscription.SubReqOngoing = false
231 restSubscription.SubDelReqOngoing = false
// Persist the corrected flags.
232 c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
234 c.registry.restSubscriptions = restSubscriptions
// Presumably reached when the attempts are exhausted - TODO confirm.
238 xapp.Logger.Debug("Continuing without retring")
242 //-------------------------------------------------------------------
244 //-------------------------------------------------------------------
// ReadConfigParameters loads the runtime settings from the "controls.*"
// configuration keys into the package-level variables and logs every value.
// A zero numeric value or an empty string is replaced by a hard-coded
// default: 2000 ms for the three E2T timeouts, 1 for the try counts, "true"
// for the string switches and 5000 ms for the route clean-up wait.
//
// It is called from NewControl and registered as configuration change
// listener in Run. The parameter f is not used in the visible code.
245 func (c *Control) ReadConfigParameters(f string) {
247 xapp.Logger.Debug("ReadConfigParameters")
// Mirror the logger level and pass it on to the ASN.1 debug print setting.
249 c.LoggerLevel = int(xapp.Logger.GetLevel())
250 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
251 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
253 // viper.GetDuration returns nanoseconds
// The *_ms keys are scaled by 1000000 (nanoseconds per millisecond).
254 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
255 if e2tSubReqTimeout == 0 {
256 e2tSubReqTimeout = 2000 * 1000000
257 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
259 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
261 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
262 if e2tSubDelReqTime == 0 {
263 e2tSubDelReqTime = 2000 * 1000000
264 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
266 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
268 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
269 if e2tRecvMsgTimeout == 0 {
270 e2tRecvMsgTimeout = 2000 * 1000000
271 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
273 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts include the initial attempt, so the default 1 means no retry.
275 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
276 if e2tMaxSubReqTryCount == 0 {
277 e2tMaxSubReqTryCount = 1
278 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
280 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
282 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
283 if e2tMaxSubDelReqTryCount == 0 {
284 e2tMaxSubDelReqTryCount = 1
285 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
287 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// String switches: the rest of the code compares them with "true".
289 checkE2State = viper.GetString("controls.checkE2State")
290 if checkE2State == "" {
291 checkE2State = "true"
292 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
294 xapp.Logger.Debug("checkE2State= %v", checkE2State)
296 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
297 if readSubsFromDb == "" {
298 readSubsFromDb = "true"
299 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
301 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// Number of database read attempts used by the ReadE2Subscriptions and ReadRESTSubscriptions loops.
303 dbTryCount = viper.GetInt("controls.dbTryCount")
306 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
308 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
310 dbRetryForever = viper.GetString("controls.dbRetryForever")
311 if dbRetryForever == "" {
312 dbRetryForever = "true"
313 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
315 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
317 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
318 // value of 100 ms is currently used only in unit tests.
319 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
320 if waitRouteCleanup_ms == 0 {
321 waitRouteCleanup_ms = 5000 * 1000000
322 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
324 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
327 //-------------------------------------------------------------------
329 //-------------------------------------------------------------------
330 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
332 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
333 for subId, subs := range register {
334 if subs.SubRespRcvd == false {
335 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
336 if subs.PolicyUpdate == false {
337 subs.NoRespToXapp = true
338 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
339 c.SendSubscriptionDeleteReq(subs)
345 func (c *Control) ReadyCB(data interface{}) {
346 if c.RMRClient == nil {
347 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp readiness callback and
// ReadConfigParameters as the configuration change listener.
351 func (c *Control) Run() {
352 xapp.SetReadyCB(c.ReadyCB, nil)
353 xapp.AddConfigChangeListener(c.ReadConfigParameters)
357 //-------------------------------------------------------------------
359 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that a request
// refers to and returns it together with its REST subscription id.
//
// Without p.SubscriptionID the subscription last known for the same md5sum is
// looked up for reuse; when none is available a new one is created under a
// freshly generated ksuid. With p.SubscriptionID the id is looked up in the
// registry.
360 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
363 var restSubscription *RESTSubscription
// Id of the REST subscription last seen with this md5sum, if any.
366 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
367 if p.SubscriptionID == "" {
368 // Subscription does not contain REST subscription Id
370 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
371 if restSubscription != nil {
372 // Subscription found - reuse the previous REST subscription id
373 restSubId = prevRestSubsId
375 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
377 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached md5sum entry refers to a subscription that no longer exists; drop it.
380 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
381 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: create a new REST subscription with a new id.
385 if restSubscription == nil {
386 restSubId = ksuid.New().String()
387 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
390 // Subscription contains REST subscription Id
391 restSubId = p.SubscriptionID
393 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
394 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
396 // Subscription with id in REST request does not exist
397 xapp.Logger.Error("%s", err.Error())
398 c.UpdateCounter(cRestSubFailToXapp)
403 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
405 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
409 return restSubscription, restSubId, nil
412 //-------------------------------------------------------------------
414 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp
// and returns the response together with the HTTP status code.
//
// It rejects the request when the E2 connection for p.Meid is down, when no
// client endpoint is given or when the endpoint addresses cannot be
// constructed. Otherwise it resolves or creates the REST subscription, fills
// the E2 subscription request messages, answers retransmissions of an ongoing
// transaction directly and starts processSubscriptionRequests in a goroutine.
415 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
418 c.UpdateCounter(cRestSubReqFromXapp)
420 subResp := models.SubscriptionResponse{}
// NOTE(review): unchecked type assertion - panics for any other params type.
421 p := params.(*models.SubscriptionParams)
423 if c.LoggerLevel > 2 {
424 c.PrintRESTSubscriptionRequest(p)
427 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
428 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
429 c.UpdateCounter(cRestReqRejDueE2Down)
430 return nil, common.SubscribeServiceUnavailableCode
433 if p.ClientEndpoint == nil {
434 err := fmt.Errorf("ClientEndpoint == nil")
435 xapp.Logger.Error("%v", err)
436 c.UpdateCounter(cRestSubFailToXapp)
437 return nil, common.SubscribeBadRequestCode
440 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
442 xapp.Logger.Error("%s", err.Error())
443 c.UpdateCounter(cRestSubFailToXapp)
444 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is the key for duplicate/retransmission detection.
447 md5sum, err := CalculateRequestMd5sum(params)
449 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
452 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
454 xapp.Logger.Error("Subscription with id in REST request does not exist")
455 return nil, common.SubscribeNotFoundCode
458 subResp.SubscriptionID = &restSubId
459 subReqList := e2ap.SubscriptionRequestList{}
460 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Roll back the duplicate-control entry and the registry entry.
462 xapp.Logger.Error("%s", err.Error())
463 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
464 c.registry.DeleteRESTSubscription(&restSubId)
465 c.UpdateCounter(cRestSubFailToXapp)
466 return nil, common.SubscribeBadRequestCode
469 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
// A retransmission of an ongoing request is acknowledged directly.
471 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
472 xapp.Logger.Debug("%s", err)
473 c.registry.DeleteRESTSubscription(&restSubId)
474 c.UpdateCounter(cRestSubRespToXapp)
475 return &subResp, common.SubscribeCreatedCode
478 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
479 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
// NOTE(review): in the visible lines only the registry entry is removed here;
// the database entry written above stays and no failure counter is updated.
481 xapp.Logger.Error("%s", err)
482 c.registry.DeleteRESTSubscription(&restSubId)
483 return nil, common.SubscribeBadRequestCode
// The E2 subscription requests are processed asynchronously.
485 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
487 c.UpdateCounter(cRestSubRespToXapp)
488 return &subResp, common.SubscribeCreatedCode
491 //-------------------------------------------------------------------
493 //-------------------------------------------------------------------
494 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
496 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
497 if p == nil || p.E2SubscriptionDirectives == nil {
498 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
499 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
500 e2SubscriptionDirectives.CreateRMRRoute = true
501 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
503 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
504 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
506 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
508 if p.E2SubscriptionDirectives.E2RetryCount == nil {
509 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
510 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
512 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
513 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
515 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
518 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
520 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
521 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
522 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
523 return e2SubscriptionDirectives, nil
526 //-------------------------------------------------------------------
528 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2 subscription request of one
// REST subscription request. It is started as a goroutine by
// RESTSubscriptionHandler. For each request it creates an xApp transaction,
// runs handleSubscriptionRequest and notifies the xApp about the result with
// a successful or unsuccessful REST notification.
530 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
531 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
533 c.SubscriptionProcessingStartDelay()
534 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
536 var xAppEventInstanceID int64
537 var e2EventInstanceID int64
538 errorInfo := &ErrorInfo{}
// The md5sum is recorded for this REST subscription id when the function returns.
540 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
542 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
543 subReqMsg := subReqList.E2APSubscriptionRequests[index]
544 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
546 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
548 // Send notification to xApp that processing of a Subscription Request has failed.
549 err := fmt.Errorf("Tracking failure")
550 errorInfo.ErrorCause = err.Error()
551 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
555 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// NOTE(review): ":=" inside the loop body declares a new errorInfo that shadows the one declared above.
557 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
559 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
563 if err.Error() == "TEST: restart event received" {
564 // This is just for UT cases. Stop here subscription processing
567 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success path: remember the md5sum and report the E2 instance id to the xApp.
569 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
570 restSubscription.AddMd5Sum(md5sum)
571 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
572 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
573 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
578 //-------------------------------------------------------------------
580 //------------------------------------------------------------------
581 func (c *Control) SubscriptionProcessingStartDelay() {
582 if c.UTTesting == true {
583 // This is temporary fix for the UT problem that notification arrives before subscription response
584 // Correct fix would be to allow notification come before response and process it correctly
585 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
586 <-time.After(time.Millisecond * 50)
587 xapp.Logger.Debug("Continuing after delay")
591 //-------------------------------------------------------------------
593 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request for an xApp
// transaction. It tracks the transaction, assigns it to a subscription,
// starts handleSubscriptionCreate in a goroutine and blocks until the
// resulting event arrives. It returns the E2 subscription response on
// success, otherwise nil with the collected ErrorInfo and an error.
594 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
595 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
597 errorInfo := ErrorInfo{}
599 err := c.tracker.Track(trans)
// The tracker's own message goes into ErrorCause; callers get the generic "Tracking failure" error.
601 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
602 errorInfo.ErrorCause = err.Error()
603 err = fmt.Errorf("Tracking failure")
604 return nil, &errorInfo, err
607 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
609 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
610 return nil, &errorInfo, err
// The request counter is raised while the create handler runs and this call waits for its event.
616 subs.OngoingReqCount++
617 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
618 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
619 subs.OngoingReqCount--
// Map the received event to the result.
623 switch themsg := event.(type) {
624 case *e2ap.E2APSubscriptionResponse:
// A response counts as success only while the E2 connection is still up.
626 if c.e2IfState.IsE2ConnectionUp(meid) == true {
627 return themsg, &errorInfo, nil
629 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
630 c.RemoveSubscriptionFromDb(subs)
631 err = fmt.Errorf("E2 interface down")
632 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
634 case *e2ap.E2APSubscriptionFailure:
635 err = fmt.Errorf("E2 SubscriptionFailure received")
636 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
637 case *PackSubscriptionRequestErrortEvent:
638 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
639 errorInfo = themsg.ErrorInfo
640 case *SDLWriteErrortEvent:
641 err = fmt.Errorf("SDL write failure")
642 errorInfo = themsg.ErrorInfo
643 case *SubmgrRestartTestEvent:
644 err = fmt.Errorf("TEST: restart event received")
645 xapp.Logger.Debug("%s", err)
646 return nil, &errorInfo, err
648 err = fmt.Errorf("Unexpected E2 subscription response received")
649 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
654 err = fmt.Errorf("E2 subscription response timeout")
655 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// For a policy update the function returns here, before the RemoveFromSubscription call below.
656 if subs.PolicyUpdate == true {
657 return nil, &errorInfo, err
// Common failure exit: log and detach the transaction from the subscription.
661 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
662 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
663 return nil, &errorInfo, err
666 //-------------------------------------------------------------------
668 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp's client endpoint
// that processing of a subscription request failed. The notification carries
// E2EventInstanceID 0 together with the error cause, source and timeout type
// from errorInfo. The REST subscription is marked processed with err and
// updated in the database before the notification is sent.
669 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
670 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
672 // Send notification to xApp that processing of a Subscription Request has failed.
673 e2EventInstanceID := (int64)(0)
674 if errorInfo.ErrorSource == "" {
675 // Submgr is default source of error
676 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
678 resp := &models.SubscriptionResponse{
679 SubscriptionID: restSubId,
680 SubscriptionInstances: []*models.SubscriptionInstance{
681 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
682 ErrorCause: errorInfo.ErrorCause,
683 ErrorSource: errorInfo.ErrorSource,
684 TimeoutType: errorInfo.TimeoutType,
685 XappEventInstanceID: &xAppEventInstanceID},
688 // Mark REST subscription request processed.
689 restSubscription.SetProcessed(err)
690 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two variants of the debug log: with and without the transaction id string.
692 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
693 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
695 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
696 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
699 c.UpdateCounter(cRestSubFailNotifToXapp)
700 xapp.Subscription.Notify(resp, *clientEndpoint)
702 // E2 is down. Delete completely processed request safely now
703 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
704 c.registry.DeleteRESTSubscription(restSubId)
705 c.RemoveRESTSubscriptionFromDb(*restSubId)
709 //-------------------------------------------------------------------
711 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp's client endpoint that
// a subscription request was processed successfully. It stores the E2
// instance id and the xApp-to-E2 id mapping in the REST subscription, marks
// the subscription processed, updates it in the database and sends the
// notification.
712 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
713 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
715 // Store successfully processed InstanceId for deletion
716 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
717 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
719 // Send notification to xApp that a Subscription Request has been processed.
720 resp := &models.SubscriptionResponse{
721 SubscriptionID: restSubId,
722 SubscriptionInstances: []*models.SubscriptionInstance{
723 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
725 XappEventInstanceID: &xAppEventInstanceID},
728 // Mark REST subscription request processed.
729 restSubscription.SetProcessed(nil)
730 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
731 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
732 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
734 c.UpdateCounter(cRestSubNotifToXapp)
735 xapp.Subscription.Notify(resp, *clientEndpoint)
737 // E2 is down. Delete completely processed request safely now
738 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
739 c.registry.DeleteRESTSubscription(restSubId)
740 c.RemoveRESTSubscriptionFromDb(*restSubId)
744 //-------------------------------------------------------------------
746 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// from an xApp and returns the HTTP status code.
//
// An unknown subscription and a delete that is already ongoing are answered
// with "no content"; a delete arriving while the subscription request is
// still being handled is rejected as a bad request. Otherwise every E2
// instance of the subscription is deleted through SubscriptionDeleteHandler
// and the REST subscription is removed from the duplicate control, the
// registry and the database.
747 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
750 c.UpdateCounter(cRestSubDelReqFromXapp)
752 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
754 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
756 xapp.Logger.Error("%s", err.Error())
757 if restSubscription == nil {
758 // Subscription was not found
759 c.UpdateCounter(cRestSubDelRespToXapp)
760 return common.UnsubscribeNoContentCode
762 if restSubscription.SubReqOngoing == true {
763 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
764 xapp.Logger.Error("%s", err.Error())
765 c.UpdateCounter(cRestSubDelFailToXapp)
766 return common.UnsubscribeBadRequestCode
767 } else if restSubscription.SubDelReqOngoing == true {
768 // Previous request for same restSubId still ongoing
769 c.UpdateCounter(cRestSubDelRespToXapp)
770 return common.UnsubscribeNoContentCode
775 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete the E2 subscription instances one by one.
777 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
778 for _, instanceId := range restSubscription.InstanceIds {
779 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
782 xapp.Logger.Error("%s", err.Error())
784 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
785 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
786 restSubscription.DeleteE2InstanceId(instanceId)
// Remove the REST subscription from duplicate control, registry and database.
788 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
789 c.registry.DeleteRESTSubscription(&restSubId)
790 c.RemoveRESTSubscriptionFromDb(restSubId)
793 c.UpdateCounter(cRestSubDelRespToXapp)
794 return common.UnsubscribeNoContentCode
797 //-------------------------------------------------------------------
799 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance on behalf of
// a REST subscription. It looks up the subscription matching instanceId,
// creates and tracks an xApp transaction, starts handleSubscriptionDelete in
// a goroutine, blocks until its event arrives and then removes the
// transaction from the subscription. It returns the xApp event instance id
// taken from the subscription's request id.
800 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
802 var xAppEventInstanceID int64
803 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// A missing subscription is not reported as an error: the zero id and a nil error are returned.
805 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
806 restSubId, instanceId, idstring(err, nil))
807 return xAppEventInstanceID, nil
810 xAppEventInstanceID = int64(subs.ReqId.Id)
811 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
813 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
814 xapp.Logger.Error("%s", err.Error())
816 defer trans.Release()
818 err = c.tracker.Track(trans)
820 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
821 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned as the error value here
// instead of the err built above - looks like a placeholder; confirm intent.
822 return xAppEventInstanceID, &time.ParseError{}
// The delete counter is raised while the delete handler runs and this call waits for its event.
827 subs.OngoingDelCount++
828 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
829 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
830 subs.OngoingDelCount--
832 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
// NOTE(review): the global waitRouteCleanup_ms is passed here, not the waitRouteCleanupTime parameter.
834 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
836 return xAppEventInstanceID, nil
839 //-------------------------------------------------------------------
841 //-------------------------------------------------------------------
// RESTQueryHandler is the query handler passed to xapp.Subscription.Listen in
// NewControl. It returns the result of the registry's QueryHandler.
842 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
843 xapp.Logger.Debug("RESTQueryHandler() called")
847 return c.registry.QueryHandler()
// TestRestHandler serves the testing support route "/ric/v1/test/{testId}"
// registered in NewControl. The {testId} path variable selects the command:
// deleting a single subscription from the database, removing all
// subscriptions, or forcing a restart. Other values are logged as unsupported.
850 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
851 xapp.Logger.Debug("RESTTestRestHandler() called")
853 pathParams := mux.Vars(r)
854 s := pathParams["testId"]
856 // This can be used to delete single subscription from db
857 if contains := strings.Contains(s, "deletesubid="); contains == true {
// The id is the text after the "=" sign.
858 var splits = strings.Split(s, "=")
859 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
860 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
// NOTE(review): the 64-bit value is converted to uint32 without a range check.
861 c.RemoveSubscriptionFromSdl(uint32(subId))
866 // This can be used to remove all subscriptions from db
868 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
869 c.RemoveAllSubscriptionsFromSdl()
870 c.RemoveAllRESTSubscriptionsFromSdl()
874 // This is meant to cause submgr's restart in testing
876 xapp.Logger.Debug("os.Exit(1) called")
880 xapp.Logger.Debug("Unsupported rest command received %s", s)
883 //-------------------------------------------------------------------
885 //-------------------------------------------------------------------
887 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
888 params := &xapp.RMRParams{}
889 params.Mtype = trans.GetMtype()
890 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Meid = subs.GetMeid()
894 params.PayloadLen = len(trans.Payload.Buf)
895 params.Payload = trans.Payload.Buf
897 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
898 err = c.SendWithRetry(params, false, 5)
900 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
905 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
907 params := &xapp.RMRParams{}
908 params.Mtype = trans.GetMtype()
909 params.SubId = int(subs.GetReqId().InstanceId)
910 params.Xid = trans.GetXid()
911 params.Meid = trans.GetMeid()
913 params.PayloadLen = len(trans.Payload.Buf)
914 params.Payload = trans.Payload.Buf
916 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
917 err = c.SendWithRetry(params, false, 5)
919 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message, copies its payload into a Go-owned slice
// and starts the handler that matches the message type in a new goroutine.
// Message types without a handler are logged and discarded.
// NOTE(review): this listing elides short lines (the switch header, returns and
// a few short statements), so some steps are not visible - verify against the
// full file.
924 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client nothing can be processed; the error is built and logged.
// NOTE(review): the message text reads "can handle" - presumably "can not
// handle" was intended; confirm before changing the string.
925 if c.RMRClient == nil {
926 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
927 xapp.Logger.Error("%s", err.Error())
// The RMR buffer is handed back when Consume returns, which can be before the
// handler goroutines started below have finished - hence the payload copy.
932 defer c.RMRClient.Free(msg.Mbuf)
934 // xapp-frame might use direct access to c buffer and
935 // when msg.Mbuf is freed, someone might take it into use
936 // and payload data might be invalid inside message handle function
938 // subscriptions won't load system a lot so there is no
939 // real performance hit by cloning buffer into new go byte slice
// The full-slice expression [:0:0] has zero capacity, so append must allocate a
// fresh backing array instead of reusing msg.Payload's storage.
940 cPay := append(msg.Payload[:0:0], msg.Payload...)
942 msg.PayloadLen = len(cPay)
// Dispatch: requests originate from xApps, responses and failures from E2T.
945 case xapp.RIC_SUB_REQ:
946 go c.handleXAPPSubscriptionRequest(msg)
947 case xapp.RIC_SUB_RESP:
948 go c.handleE2TSubscriptionResponse(msg)
949 case xapp.RIC_SUB_FAILURE:
950 go c.handleE2TSubscriptionFailure(msg)
951 case xapp.RIC_SUB_DEL_REQ:
952 go c.handleXAPPSubscriptionDeleteRequest(msg)
953 case xapp.RIC_SUB_DEL_RESP:
954 go c.handleE2TSubscriptionDeleteResponse(msg)
955 case xapp.RIC_SUB_DEL_FAILURE:
956 go c.handleE2TSubscriptionDeleteFailure(msg)
958 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
963 //-------------------------------------------------------------------
964 // handle from XAPP Subscription Request
965 //------------------------------------------------------------------
966 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
967 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
968 c.UpdateCounter(cSubReqFromXapp)
970 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
971 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
975 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
977 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
981 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
983 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
986 defer trans.Release()
988 if err = c.tracker.Track(trans); err != nil {
989 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
993 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
995 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
999 c.wakeSubscriptionRequest(subs, trans)
1002 //-------------------------------------------------------------------
1003 // Wake Subscription Request to E2node
1004 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for subs in its own
// goroutine, blocks until an event arrives on the xApp transaction, and then
// packs and relays a subscription response or failure to the xApp.
// NOTE(review): this listing elides short lines (conditions, returns, closing
// braces), so the exact branching around the sends is not visible here.
1005 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are fetched with a nil argument and the error result is discarded.
1007 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
// OngoingReqCount is raised for as long as this call waits for the create result.
1008 subs.OngoingReqCount++
1009 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
1010 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1011 subs.OngoingReqCount--
// The event's concrete type selects which message is packed for the xApp.
1014 switch themsg := event.(type) {
1015 case *e2ap.E2APSubscriptionResponse:
// The xApp must see its own requestor id, so it is restored from the transaction.
1016 themsg.RequestId.Id = trans.RequestId.Id
1017 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1020 c.UpdateCounter(cSubRespToXapp)
1021 c.rmrSendToXapp("", subs, trans)
1024 case *e2ap.E2APSubscriptionFailure:
1025 themsg.RequestId.Id = trans.RequestId.Id
1026 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1028 c.UpdateCounter(cSubFailToXapp)
1029 c.rmrSendToXapp("", subs, trans)
// Reports the failure path together with the last error value.
1035 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1038 //-------------------------------------------------------------------
1039 // handle from XAPP Subscription Delete Request
1040 //------------------------------------------------------------------
1041 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1042 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1043 c.UpdateCounter(cSubDelReqFromXapp)
1045 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1046 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1050 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1052 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1056 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1058 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1061 defer trans.Release()
1063 err = c.tracker.Track(trans)
1065 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1069 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1071 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1078 subs.OngoingDelCount++
1079 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1080 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1081 subs.OngoingDelCount--
1083 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1085 if subs.NoRespToXapp == true {
1086 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1087 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1091 // Whatever is received success, fail or timeout, send successful delete response
1092 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1093 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1094 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1095 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1096 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1098 c.UpdateCounter(cSubDelRespToXapp)
1099 c.rmrSendToXapp("", subs, trans)
1103 //-------------------------------------------------------------------
1104 // SUBS CREATE Handling
1105 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription-side part of a create request.
// It takes the subscription's transaction turn, obtains a response (either the
// cached one or a fresh one from sendE2TSubscriptionRequest), updates the db,
// and finally passes the response to parentTrans with SendEvent.
// NOTE(review): this listing elides short lines (default/else branches,
// conditions, returns), so the exact conditions for the db update, the delete
// request and RemoveFromSubscription are not visible - verify against the full
// file.
1106 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1108 var event interface{} = nil
1109 var removeSubscriptionFromDb bool = false
// Transactions on one subscription are serialised: wait for this one's turn and
// give the turn back (after releasing the transaction) on return.
1110 trans := c.tracker.NewSubsTransaction(subs)
1111 subs.WaitTransactionTurn(trans)
1112 defer subs.ReleaseTransactionTurn(trans)
1113 defer trans.Release()
1115 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
// Only go to E2T when there is no cached response yet and the cache is valid.
1117 subRfMsg, valid := subs.GetCachedResponse()
1118 if subRfMsg == nil && valid == true {
1119 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
// The outcome is cached on the subscription; the second argument differs per
// event type (true only for a subscription response here).
1120 switch event.(type) {
1121 case *e2ap.E2APSubscriptionResponse:
1122 subRfMsg, valid = subs.SetCachedResponse(event, true)
1123 subs.SubRespRcvd = true
1124 case *e2ap.E2APSubscriptionFailure:
1125 subRfMsg, valid = subs.SetCachedResponse(event, false)
1126 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1127 case *SubmgrRestartTestEvent:
1128 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1129 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1130 subRfMsg, valid = subs.SetCachedResponse(event, false)
1131 parentTrans.SendEvent(subRfMsg, 0)
1133 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1134 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Remaining event types: unless this is a policy update, the subscription is
// deleted towards E2T as well.
1137 if subs.PolicyUpdate == false {
1138 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1139 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1140 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1142 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1145 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1147 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1150 removeSubscriptionFromDb = true
// Persist the outcome: either remove the subscription from the db or update it.
1153 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1156 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1159 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1161 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// Hand the result to the waiting xApp transaction.
1164 parentTrans.SendEvent(subRfMsg, 0)
1167 //-------------------------------------------------------------------
1168 // SUBS DELETE Handling
1169 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the subscription-side part of a delete request.
// It takes the subscription's transaction turn, sends a delete request to E2T
// when the requesting endpoint is the only one on a valid subscription, removes
// the endpoint from the subscription, and signals parentTrans with a nil event.
// NOTE(review): this listing elides short lines around the condition below
// (the embedded numbering skips 1179-1181, 1183-1184 and 1186-1189) - verify
// against the full file what else happens there.
1171 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
// Transactions on one subscription are serialised: wait for this one's turn and
// give the turn back (after releasing the transaction) on return.
1173 trans := c.tracker.NewSubsTransaction(subs)
1174 subs.WaitTransactionTurn(trans)
1175 defer subs.ReleaseTransactionTurn(trans)
1176 defer trans.Release()
1178 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// E2T is only contacted when this endpoint is the last one left on the subscription.
1182 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1185 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1190 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1191 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// A nil event tells the waiting xApp-side handler that delete handling is done.
1192 parentTrans.SendEvent(nil, 0)
1195 //-------------------------------------------------------------------
1196 // send to E2T Subscription Request
1197 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request, stores the
// subscription in the db and sends the request to E2T, retrying up to
// e2SubscriptionDirectives.E2MaxTryCount times. It returns the resulting event
// as an interface{} value: a *PackSubscriptionRequestErrortEvent or
// *SDLWriteErrortEvent when packing or the db write fails, a
// *SubmgrRestartTestEvent when subs.DoNotWaitSubResp is set, and otherwise
// whatever trans.WaitEvent delivered.
// NOTE(review): this listing elides short lines (error checks, loop exits, the
// final return) - verify the exact control flow against the full file.
1198 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1200 var event interface{} = nil
1201 var timedOut bool = false
// The requestor id is overwritten with this fixed value in the message sent to E2T.
1202 const ricRequestorId = 123
1204 subReqMsg := subs.SubReqMsg
1205 subReqMsg.RequestId = subs.GetReqId().RequestId
1206 subReqMsg.RequestId.Id = ricRequestorId
1207 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
// Packing failed: report it as an ASN1-sourced error event.
1209 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1210 return &PackSubscriptionRequestErrortEvent{
1212 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1213 ErrorCause: err.Error(),
1218 // Write uncompleted subscription in db. If no response for subscription it needs to be re-processed (deleted) after restart
1219 err = c.WriteSubscriptionToDb(subs)
// The db write failed: report it as a DBAAS-sourced error event.
1221 return &SDLWriteErrortEvent{
1223 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1224 ErrorCause: err.Error(),
// Send / wait loop. Separate counters exist for a request and a re-request.
1229 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1230 desc := fmt.Sprintf("(retry %d)", retries)
1232 c.UpdateCounter(cSubReqToE2)
1234 c.UpdateCounter(cSubReReqToE2)
1236 c.rmrSendToE2T(desc, subs, trans)
1237 if subs.DoNotWaitSubResp == false {
1238 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1240 c.UpdateCounter(cSubReqTimerExpiry)
1244 // Simulating case where subscription request has been sent but response has not been received before restart
1245 event = &SubmgrRestartTestEvent{}
1246 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1250 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1254 //-------------------------------------------------------------------
1255 // send to E2T Subscription Delete Request
1256 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for subs
// and sends it to E2T, retrying up to e2tMaxSubDelReqTryCount times and waiting
// up to e2tSubDelReqTime for an event after each send. The resulting event is
// returned as an interface{} value.
// NOTE(review): this listing elides short lines (declarations, error checks,
// loop exits, the final return) - verify the exact control flow against the
// full file.
1258 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1260 var event interface{}
// The requestor id is overwritten with this fixed value in the message sent to E2T.
1262 const ricRequestorId = 123
1264 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1265 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1266 subDelReqMsg.RequestId.Id = ricRequestorId
1267 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1268 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1270 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
// Send / wait loop. Separate counters exist for a request and a re-request.
1274 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1275 desc := fmt.Sprintf("(retry %d)", retries)
1277 c.UpdateCounter(cSubDelReqToE2)
1279 c.UpdateCounter(cSubDelReReqToE2)
1281 c.rmrSendToE2T(desc, subs, trans)
1282 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1284 c.UpdateCounter(cSubDelReqTimerExpiry)
1289 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1293 //-------------------------------------------------------------------
1294 // handle from E2T Subscription Response
1295 //-------------------------------------------------------------------
1296 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1297 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1298 c.UpdateCounter(cSubRespFromE2)
1300 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1302 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1305 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1307 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1310 trans := subs.GetTransaction()
1312 err = fmt.Errorf("Ongoing transaction not found")
1313 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1316 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1317 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1318 if sendOk == false {
1319 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1320 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1325 //-------------------------------------------------------------------
1326 // handle from E2T Subscription Failure
1327 //-------------------------------------------------------------------
1328 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1329 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1330 c.UpdateCounter(cSubFailFromE2)
1331 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1333 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1336 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1338 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1341 trans := subs.GetTransaction()
1343 err = fmt.Errorf("Ongoing transaction not found")
1344 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1347 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1348 if sendOk == false {
1349 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1350 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1355 //-------------------------------------------------------------------
1356 // handle from E2T Subscription Delete Response
1357 //-------------------------------------------------------------------
1358 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1359 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1360 c.UpdateCounter(cSubDelRespFromE2)
1361 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1363 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1366 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1368 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1371 trans := subs.GetTransaction()
1373 err = fmt.Errorf("Ongoing transaction not found")
1374 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1377 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1378 if sendOk == false {
1379 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1380 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1385 //-------------------------------------------------------------------
1386 // handle from E2T Subscription Delete Failure
1387 //-------------------------------------------------------------------
1388 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1389 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1390 c.UpdateCounter(cSubDelFailFromE2)
1391 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1393 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1396 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1398 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1401 trans := subs.GetTransaction()
1403 err = fmt.Errorf("Ongoing transaction not found")
1404 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1407 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1408 if sendOk == false {
1409 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1410 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1415 //-------------------------------------------------------------------
1417 //-------------------------------------------------------------------
// typeofSubsMessage maps a subscription message value to a short string label
// chosen by the value's concrete type; callers in this file print the label in
// their log lines. The request types are deliberately commented out, leaving
// only the response and failure types as cases.
// NOTE(review): the returned label strings sit on lines that this listing
// elides - read them from the full file.
1418 func typeofSubsMessage(v interface{}) string {
1423 //case *e2ap.E2APSubscriptionRequest:
1425 case *e2ap.E2APSubscriptionResponse:
1427 case *e2ap.E2APSubscriptionFailure:
1429 //case *e2ap.E2APSubscriptionDeleteRequest:
1430 // return "SubDelReq"
1431 case *e2ap.E2APSubscriptionDeleteResponse:
1433 case *e2ap.E2APSubscriptionDeleteFailure:
1440 //-------------------------------------------------------------------
1442 //-------------------------------------------------------------------
1443 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1444 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1445 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1447 xapp.Logger.Error("%v", err)
1453 //-------------------------------------------------------------------
1455 //-------------------------------------------------------------------
1456 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1458 if removeSubscriptionFromDb == true {
1459 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1460 c.RemoveSubscriptionFromDb(subs)
1462 // Update is needed for successful response and merge case here
1463 if subs.RetryFromXapp == false {
1464 err := c.WriteSubscriptionToDb(subs)
1468 subs.RetryFromXapp = false
1472 //-------------------------------------------------------------------
1474 //-------------------------------------------------------------------
1475 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1476 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1477 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1479 xapp.Logger.Error("%v", err)
1483 //-------------------------------------------------------------------
1485 //-------------------------------------------------------------------
1486 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1487 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1488 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1490 xapp.Logger.Error("%v", err)
1494 //-------------------------------------------------------------------
1496 //-------------------------------------------------------------------
1497 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1499 if removeRestSubscriptionFromDb == true {
1500 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1501 c.RemoveRESTSubscriptionFromDb(restSubId)
1503 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1507 //-------------------------------------------------------------------
1509 //-------------------------------------------------------------------
1510 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1511 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1512 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1514 xapp.Logger.Error("%v", err)
1518 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1520 if c.UTTesting == true {
1521 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1522 c.registry.mutex = new(sync.Mutex)
1525 const ricRequestorId = 123
1526 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1528 // Send delete for every endpoint in the subscription
1529 if subs.PolicyUpdate == false {
1530 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1531 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1532 subDelReqMsg.RequestId.Id = ricRequestorId
1533 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1534 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1536 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1539 for _, endPoint := range subs.EpList.Endpoints {
1540 params := &xapp.RMRParams{}
1541 params.Mtype = mType
1542 params.SubId = int(subs.GetReqId().InstanceId)
1544 params.Meid = subs.Meid
1545 params.Src = endPoint.String()
1546 params.PayloadLen = len(payload.Buf)
1547 params.Payload = payload.Buf
1549 subs.DeleteFromDb = true
1550 c.handleXAPPSubscriptionDeleteRequest(params)
1555 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1557 fmt.Println("CRESTSubscriptionRequest")
1563 if p.SubscriptionID != "" {
1564 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1566 fmt.Println(" SubscriptionID = ''")
1569 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1571 if p.ClientEndpoint.HTTPPort != nil {
1572 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1574 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1577 if p.ClientEndpoint.RMRPort != nil {
1578 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1580 fmt.Println(" ClientEndpoint.RMRPort = nil")
1584 fmt.Printf(" Meid = %s\n", *p.Meid)
1586 fmt.Println(" Meid = nil")
1589 if p.E2SubscriptionDirectives == nil {
1590 fmt.Println(" E2SubscriptionDirectives = nil")
1592 fmt.Println(" E2SubscriptionDirectives")
1593 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1594 fmt.Println(" E2RetryCount == nil")
1596 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1598 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1599 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1601 for _, subscriptionDetail := range p.SubscriptionDetails {
1602 if p.RANFunctionID != nil {
1603 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1605 fmt.Println(" RANFunctionID = nil")
1607 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1608 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1610 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1611 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1612 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1613 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1615 if actionToBeSetup.SubsequentAction != nil {
1616 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1617 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1619 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")