2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring renders a compact, space separated identification string for log
// messages.
//
// Every entry contributes its String() value; an entry that is a nil
// interface contributes the placeholder "(NIL)". When err is non-nil,
// "err(<message>)" is appended as the last element. An empty string is
// returned when there is nothing to render.
//
// Elements are always separated by exactly one space. Earlier no separator
// was emitted after a leading "(NIL)" placeholder, which produced output such
// as "(NIL)err(x)".
//
// NOTE: only nil interface values are detected. A typed nil pointer stored in
// a fmt.Stringer is handed to its own String method unchanged.
func idstring(err error, entries ...fmt.Stringer) string {
	// One slot per entry plus one for the optional error element.
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Package-level settings of the subscription manager. They are filled in by
// (*Control).ReadConfigParameters from the "controls.*" configuration keys.
//
// The first three are timeouts kept as time.Duration values.
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
// Wait time handed to registry.RemoveFromSubscription for RMR route clean-up.
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// The switches below are stored as strings and compared against "true"/"false".
74 var checkE2State string
// readSubsFromDb == "false" is checked in NewControl before subscriptions are read from the database.
75 var readSubsFromDb string
// dbRetryForever == "true" removes the attempt limit from the database read loops.
76 var dbRetryForever string
// restDuplicateCtrl is consulted by the REST handlers to recognise repeated requests by their MD5 sum.
84 restDuplicateCtrl *DuplicateCtrl
// e2IfStateDb is set from CreateXappRnibIfInstance() in NewControl.
86 e2IfStateDb XappRnibInterface
// restSubsDb is set from CreateRESTSdl() in NewControl.
88 restSubsDb Sdlnterface
// Counters holds the "SUBMGR" counter group registered with xapp.Metric in NewControl.
91 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent and SubmgrRestartUpEvent are empty marker types.
// NOTE(review): presumably event markers used around submgr restarts; their
// users are not part of this file section — confirm.
102 type SubmgrRestartTestEvent struct{}
103 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is an event carrying an ErrorInfo.
// handleSubscriptionRequest reports it as "E2 SubscriptionRequest pack failure".
104 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
108 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
109 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is an event carrying an ErrorInfo.
// handleSubscriptionRequest reports it as "SDL write failure".
112 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
116 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
117 s.ErrorInfo = *errorInfo
// Start-up logging and viper environment configuration: environment variable
// names use the "submgr" prefix, and variables that are set but empty are
// accepted as values.
121 xapp.Logger.Debug("SUBMGR")
123 viper.SetEnvPrefix("submgr")
124 viper.AllowEmptyEnv(true)
// NewControl creates and wires the Control instance.
//
// The visible steps are: build the routing manager REST client from the
// "rtmgr.*" settings, create the registry, tracker, REST duplicate controller
// and E2 interface state, read the configuration, register the REST
// test/debug routes, read stored E2 and REST subscriptions from the database
// and start the REST subscription listener in its own goroutine.
127 func NewControl() *Control {
// Routing manager endpoint is "<rtmgr.HostAddr>:<rtmgr.port>" with base path "rtmgr.baseUrl", reached over plain HTTP.
129 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
130 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry keeps a pointer to the routing manager client.
132 registry := new(Registry)
133 registry.Initialize()
134 registry.rtmgrClient = &rtmgrClient
136 tracker := new(Tracker)
139 restDuplicateCtrl := new(DuplicateCtrl)
140 restDuplicateCtrl.Init()
142 e2IfState := new(E2IfState)
144 c := &Control{e2ap: new(E2ap),
147 restDuplicateCtrl: restDuplicateCtrl,
148 e2IfState: e2IfState,
149 e2IfStateDb: CreateXappRnibIfInstance(),
150 e2SubsDb: CreateSdl(),
151 restSubsDb: CreateRESTSdl(),
152 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// The argument is an empty string; ReadConfigParameters does not reference it in the code shown.
157 c.ReadConfigParameters("")
159 // Register REST handler for testing support
160 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
161 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// NOTE(review): the body of this check is not shown here; presumably it skips the database reads below — confirm.
164 if readSubsFromDb == "false" {
168 // Read subscriptions from db
// The error results of both reads are not examined here.
169 c.ReadE2Subscriptions()
170 c.ReadRESTSubscriptions()
// The listener runs in its own goroutine with the subscribe, query and delete handlers.
172 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
177 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
178 subscriptions, _ := c.registry.QueryHandler()
179 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
182 //-------------------------------------------------------------------
184 //-------------------------------------------------------------------
// GetAllRestSubscriptions is the handler registered in NewControl for
// GET /ric/v1/restsubscriptions. It obtains the REST subscription dump from
// c.registry.GetAllRestSubscriptions().
// NOTE(review): presumably the dump is then written to w — confirm.
185 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
186 xapp.Logger.Debug("GetAllRestSubscriptions() called")
187 response := c.registry.GetAllRestSubscriptions()
191 //-------------------------------------------------------------------
193 //-------------------------------------------------------------------
194 func (c *Control) ReadE2Subscriptions() error {
197 var register map[uint32]*Subscription
198 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
199 xapp.Logger.Debug("Reading E2 subscriptions from db")
200 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
202 xapp.Logger.Error("%v", err)
203 <-time.After(1 * time.Second)
205 c.registry.subIds = subIds
206 c.registry.register = register
207 c.HandleUncompletedSubscriptions(register)
211 xapp.Logger.Debug("Continuing without retring")
215 //-------------------------------------------------------------------
217 //-------------------------------------------------------------------
218 func (c *Control) ReadRESTSubscriptions() error {
220 var restSubscriptions map[string]*RESTSubscription
221 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
222 xapp.Logger.Debug("Reading REST subscriptions from db")
223 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
225 xapp.Logger.Error("%v", err)
226 <-time.After(1 * time.Second)
228 c.registry.restSubscriptions = restSubscriptions
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings from viper into the
// package-level configuration variables and logs each resulting value. A
// setting that reads as zero (numbers, durations) or as an empty string gets
// a hard coded default instead.
//
// It is called from NewControl and registered as a configuration change
// listener in Run. The parameter f is not referenced in the code shown.
239 func (c *Control) ReadConfigParameters(f string) {
241 xapp.Logger.Debug("ReadConfigParameters")
// Cache the current logger level on the Control instance.
243 c.LoggerLevel = int(xapp.Logger.GetLevel())
244 xapp.Logger.Debug("LoggerLevel= %v", c.LoggerLevel)
// The "*_ms" keys hold millisecond counts; multiplying by 1000000 scales the
// nanosecond-based Duration to milliseconds.
246 // viper.GetDuration returns nanoseconds
247 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
248 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
249 e2tSubReqTimeout = 2000 * 1000000
250 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
252 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
254 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
255 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
256 e2tSubDelReqTime = 2000 * 1000000
257 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
259 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
261 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
262 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
263 e2tRecvMsgTimeout = 2000 * 1000000
264 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
266 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts include the initial attempt; the default of 1 means a single attempt.
268 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
269 if e2tMaxSubReqTryCount == 0 {
270 e2tMaxSubReqTryCount = 1
271 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
273 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
275 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
276 if e2tMaxSubDelReqTryCount == 0 {
277 e2tMaxSubDelReqTryCount = 1
278 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
280 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// String switches default to "true" when not configured.
282 checkE2State = viper.GetString("controls.checkE2State")
283 if checkE2State == "" {
284 checkE2State = "true"
285 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
287 xapp.Logger.Debug("checkE2State= %v", checkE2State)
289 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
290 if readSubsFromDb == "" {
291 readSubsFromDb = "true"
292 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
294 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// Attempt limit used by the database read loops when dbRetryForever is not "true".
296 dbTryCount = viper.GetInt("controls.dbTryCount")
299 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
301 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
303 dbRetryForever = viper.GetString("controls.dbRetryForever")
304 if dbRetryForever == "" {
305 dbRetryForever = "true"
306 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
308 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
310 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
311 // value of 100 ms is currently used only in unit tests.
312 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
313 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
314 waitRouteCleanup_ms = 5000 * 1000000
315 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
317 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
320 //-------------------------------------------------------------------
322 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given subscription register and
// sends a subscription delete request for every entry that has not received
// a subscription response (SubRespRcvd == false) and is not a policy update
// (PolicyUpdate == false). NoRespToXapp is set on such an entry before the
// delete request is sent.
//
// It is called by ReadE2Subscriptions with the register read from the database.
323 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
325 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
326 for subId, subs := range register {
327 if subs.SubRespRcvd == false {
328 // If a policy subscription has already been made successfully, an unsuccessful update must not cause it to be deleted.
329 if subs.PolicyUpdate == false {
330 subs.NoRespToXapp = true
331 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
332 c.SendSubscriptionDeleteReq(subs)
338 func (c *Control) ReadyCB(data interface{}) {
339 if c.RMRClient == nil {
340 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as a configuration change listener.
// NOTE(review): presumably followed by starting the xapp framework — confirm.
344 func (c *Control) Run() {
345 xapp.SetReadyCB(c.ReadyCB, nil)
346 xapp.AddConfigChangeListener(c.ReadConfigParameters)
350 //-------------------------------------------------------------------
352 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that a request
// refers to.
//
// When p.SubscriptionID is empty, the MD5 sum of the request is used to look
// up the REST subscription id of an earlier identical request; if no
// subscription results from that, a new one is created in the registry under
// a freshly generated KSUID. When p.SubscriptionID is set, the subscription
// with that id is looked up in the registry.
//
// The final return yields the REST subscription, its id and a nil error.
// NOTE(review): when the lookup of a provided id fails, the error is logged
// and cRestSubFailToXapp is updated; presumably the error is also returned
// to the caller — confirm.
353 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
356 var restSubscription *RESTSubscription
// Id of the REST subscription created by an earlier request with the same MD5 sum, if known.
359 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
360 if p.SubscriptionID == "" {
361 // Subscription does not contain REST subscription Id
363 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
364 if restSubscription != nil {
365 // Subscription found - reuse the id of the previous request
366 restSubId = prevRestSubsId
368 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
370 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached MD5 entry refers to a subscription that no longer exists; drop it.
373 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
374 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing reusable: create a new REST subscription with a generated id.
378 if restSubscription == nil {
379 restSubId = ksuid.New().String()
380 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
383 // Subscription contains REST subscription Id
384 restSubId = p.SubscriptionID
386 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
387 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
389 // Subscription with id in REST request does not exist
390 xapp.Logger.Error("%s", err.Error())
391 c.UpdateCounter(cRestSubFailToXapp)
396 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
398 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
402 return restSubscription, restSubId, nil
405 //-------------------------------------------------------------------
407 //-------------------------------------------------------------------
408 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
411 c.UpdateCounter(cRestSubReqFromXapp)
413 subResp := models.SubscriptionResponse{}
414 p := params.(*models.SubscriptionParams)
416 if c.LoggerLevel > 2 {
417 c.PrintRESTSubscriptionRequest(p)
420 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
421 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
422 c.UpdateCounter(cRestReqRejDueE2Down)
423 return nil, common.SubscribeServiceUnavailableCode
426 if p.ClientEndpoint == nil {
427 err := fmt.Errorf("ClientEndpoint == nil")
428 xapp.Logger.Error("%v", err)
429 c.UpdateCounter(cRestSubFailToXapp)
430 return nil, common.SubscribeBadRequestCode
433 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
435 xapp.Logger.Error("%s", err.Error())
436 c.UpdateCounter(cRestSubFailToXapp)
437 return nil, common.SubscribeBadRequestCode
440 md5sum, err := CalculateRequestMd5sum(params)
442 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
445 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
447 xapp.Logger.Error("Subscription with id in REST request does not exist")
448 return nil, common.SubscribeNotFoundCode
451 subResp.SubscriptionID = &restSubId
452 subReqList := e2ap.SubscriptionRequestList{}
453 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
455 xapp.Logger.Error("%s", err.Error())
456 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
457 c.registry.DeleteRESTSubscription(&restSubId)
458 c.UpdateCounter(cRestSubFailToXapp)
459 return nil, common.SubscribeBadRequestCode
462 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
464 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
465 xapp.Logger.Debug("%s", err)
466 c.UpdateCounter(cRestSubRespToXapp)
467 return &subResp, common.SubscribeCreatedCode
470 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
471 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
473 xapp.Logger.Error("%s", err)
474 return nil, common.SubscribeBadRequestCode
476 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
478 c.UpdateCounter(cRestSubRespToXapp)
479 return &subResp, common.SubscribeCreatedCode
482 //-------------------------------------------------------------------
484 //-------------------------------------------------------------------
485 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
487 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
488 if p == nil || p.E2SubscriptionDirectives == nil {
489 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
490 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
491 e2SubscriptionDirectives.CreateRMRRoute = true
492 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
494 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
495 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
497 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
499 if p.E2SubscriptionDirectives.E2RetryCount == nil {
500 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
501 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
503 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
504 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
506 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
509 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
511 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
512 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
513 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
514 return e2SubscriptionDirectives, nil
517 //-------------------------------------------------------------------
519 //-------------------------------------------------------------------
// processSubscriptionRequests handles every E2 subscription request in
// subReqList for one REST subscription and notifies the xApp of each result.
//
// It is started as a goroutine by RESTSubscriptionHandler. For each request a
// new xApp transaction is created and handed to handleSubscriptionRequest.
// Failures are reported through sendUnsuccesfullResponseNotification and
// successes through sendSuccesfullResponseNotification. When the function
// returns, md5sum is passed to restDuplicateCtrl.SetMd5sumFromLastOkRequest
// for *restSubId (deferred call).
521 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
522 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
// Delays processing only when c.UTTesting is set (see SubscriptionProcessingStartDelay).
524 c.SubscriptionProcessingStartDelay()
525 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
527 var xAppEventInstanceID int64
528 var e2EventInstanceID int64
529 errorInfo := &ErrorInfo{}
531 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
533 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
534 subReqMsg := subReqList.E2APSubscriptionRequests[index]
// The xApp side event instance id is the request id of the E2 request.
535 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
537 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
539 // Send notification to xApp that processing of a Subscription Request has failed.
540 err := fmt.Errorf("Tracking failure")
541 errorInfo.ErrorCause = err.Error()
542 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
546 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// NOTE: errorInfo declared with := here is a new variable scoped to the loop body.
548 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
550 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
554 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// The E2 side event instance id is the instance id from the subscription response.
556 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
557 restSubscription.AddMd5Sum(md5sum)
558 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
559 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
560 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
565 //-------------------------------------------------------------------
567 //------------------------------------------------------------------
568 func (c *Control) SubscriptionProcessingStartDelay() {
569 if c.UTTesting == true {
570 // This is temporary fix for the UT problem that notification arrives before subscription response
571 // Correct fix would be to allow notification come before response and process it correctly
572 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
573 <-time.After(time.Millisecond * 50)
574 xapp.Logger.Debug("Continuing after delay")
578 //-------------------------------------------------------------------
580 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request for an xApp
// transaction and waits for its outcome.
//
// The transaction is tracked and assigned to a subscription, the subscription
// creation is started in its own goroutine and the function blocks in
// trans.WaitEvent(0) until an event arrives. The event type decides the
// result: a subscription response is returned as success when the E2
// connection for meid is up; every other visible case produces an error
// together with an ErrorInfo describing cause, source or timeout type.
//
// Returns the subscription response (nil on failure), a pointer to the
// ErrorInfo and the error.
581 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
582 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
584 errorInfo := ErrorInfo{}
586 err := c.tracker.Track(trans)
// The tracker error text is kept as the error cause; callers get a generic "Tracking failure".
588 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
589 errorInfo.ErrorCause = err.Error()
590 err = fmt.Errorf("Tracking failure")
591 return nil, &errorInfo, err
594 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
596 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
597 return nil, &errorInfo, err
// OngoingReqCount is raised while the creation is in progress and lowered once the event has arrived.
603 subs.OngoingReqCount++
604 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
605 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
606 subs.OngoingReqCount--
610 switch themsg := event.(type) {
611 case *e2ap.E2APSubscriptionResponse:
// A response only counts as success while the E2 connection is still up.
613 if c.e2IfState.IsE2ConnectionUp(meid) == true {
614 return themsg, &errorInfo, nil
// Connection went down: remove the subscription from registry and database and report the E2 node as error source.
616 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
617 c.RemoveSubscriptionFromDb(subs)
618 err = fmt.Errorf("E2 interface down")
619 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
620 return nil, &errorInfo, err
622 case *e2ap.E2APSubscriptionFailure:
623 err = fmt.Errorf("E2 SubscriptionFailure received")
624 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
625 return nil, &errorInfo, err
// The two internal error events carry their own ErrorInfo, which is returned as is.
626 case *PackSubscriptionRequestErrortEvent:
627 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
628 return nil, &themsg.ErrorInfo, err
629 case *SDLWriteErrortEvent:
630 err = fmt.Errorf("SDL write failure")
631 return nil, &themsg.ErrorInfo, err
633 err = fmt.Errorf("Unexpected E2 subscription response received")
634 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// Timeout case: the timeout type is set to E2 timeout; a policy update returns here directly.
638 err = fmt.Errorf("E2 subscription response timeout")
639 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
640 if subs.PolicyUpdate == true {
641 return nil, &errorInfo, err
// Common failure exit: log, detach the transaction from the subscription and return the error.
645 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
646 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
647 return nil, &errorInfo, err
650 //-------------------------------------------------------------------
652 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp at clientEndpoint
// that processing of one subscription request instance failed.
//
// The notification carries the REST subscription id, the xApp event instance
// id, an E2 event instance id of 0 and the cause, source and timeout type
// from errorInfo. An empty error source is replaced with SUBMGR. Before
// sending, the REST subscription is marked processed with err and updated in
// the database. After sending, the REST subscription is deleted from registry
// and database when the E2 connection for its Meid is down and no
// subscription request is ongoing.
653 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
654 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
656 // Send notification to xApp that processing of a Subscription Request has failed.
657 e2EventInstanceID := (int64)(0)
658 if errorInfo.ErrorSource == "" {
659 // Submgr is default source of error
660 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
662 resp := &models.SubscriptionResponse{
663 SubscriptionID: restSubId,
664 SubscriptionInstances: []*models.SubscriptionInstance{
665 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
666 ErrorCause: errorInfo.ErrorCause,
667 ErrorSource: errorInfo.ErrorSource,
668 TimeoutType: errorInfo.TimeoutType,
669 XappEventInstanceID: &xAppEventInstanceID},
672 // Mark REST subscription request processed.
673 restSubscription.SetProcessed(err)
674 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two variants of the same log line: the first also prints the transaction via idstring.
676 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
677 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
679 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
680 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
683 c.UpdateCounter(cRestSubFailNotifToXapp)
684 xapp.Subscription.Notify(resp, *clientEndpoint)
// With the E2 connection down and nothing ongoing, the REST subscription is removed.
686 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
687 c.registry.DeleteRESTSubscription(restSubId)
688 c.RemoveRESTSubscriptionFromDb(*restSubId)
692 //-------------------------------------------------------------------
694 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp at clientEndpoint
// that one subscription request instance was processed successfully.
//
// The E2 instance id and the xApp-id-to-E2-id mapping are recorded on the
// REST subscription, which is then marked processed and updated in the
// database. The notification carries the REST subscription id and both event
// instance ids. After sending, the REST subscription is deleted from registry
// and database when the E2 connection for its Meid is down and no
// subscription request is ongoing.
695 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
696 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
698 // Store successfully processed InstanceId for deletion
699 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
700 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
702 // Send notification to xApp that a Subscription Request has been processed.
703 resp := &models.SubscriptionResponse{
704 SubscriptionID: restSubId,
705 SubscriptionInstances: []*models.SubscriptionInstance{
706 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
708 XappEventInstanceID: &xAppEventInstanceID},
711 // Mark REST subscription request processed.
712 restSubscription.SetProcessed(nil)
713 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
714 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
715 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
717 c.UpdateCounter(cRestSubNotifToXapp)
718 xapp.Subscription.Notify(resp, *clientEndpoint)
// With the E2 connection down and nothing ongoing, the REST subscription is removed.
720 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
721 c.registry.DeleteRESTSubscription(restSubId)
722 c.RemoveRESTSubscriptionFromDb(*restSubId)
726 //-------------------------------------------------------------------
728 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// from an xApp for the REST subscription restSubId.
//
// Visible outcomes:
//   - subscription not found: common.UnsubscribeNoContentCode,
//   - a subscription request or an earlier delete request for the same id is
//     still ongoing: common.UnsubscribeBadRequestCode,
//   - otherwise every E2 instance of the REST subscription is deleted through
//     SubscriptionDeleteHandler, the REST subscription is removed from the
//     duplicate control, registry and database, and
//     common.UnsubscribeNoContentCode is returned.
729 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
732 c.UpdateCounter(cRestSubDelReqFromXapp)
734 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
736 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
738 xapp.Logger.Error("%s", err.Error())
739 if restSubscription == nil {
740 // Subscription was not found
741 c.UpdateCounter(cRestSubDelRespToXapp)
742 return common.UnsubscribeNoContentCode
744 if restSubscription.SubReqOngoing == true {
745 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
746 xapp.Logger.Error("%s", err.Error())
747 c.UpdateCounter(cRestSubDelFailToXapp)
748 return common.UnsubscribeBadRequestCode
749 } else if restSubscription.SubDelReqOngoing == true {
750 // Previous request for same restSubId still ongoing
751 c.UpdateCounter(cRestSubDelFailToXapp)
752 return common.UnsubscribeBadRequestCode
757 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
759 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
760 for _, instanceId := range restSubscription.InstanceIds {
761 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
764 xapp.Logger.Error("%s", err.Error())
// The instance bookkeeping on the REST subscription is cleared for this instance id.
766 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
767 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
768 restSubscription.DeleteE2InstanceId(instanceId)
// Finally forget the MD5 sum of the last request and remove the REST subscription itself.
770 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
771 c.registry.DeleteRESTSubscription(&restSubId)
772 c.RemoveRESTSubscriptionFromDb(restSubId)
775 c.UpdateCounter(cRestSubDelRespToXapp)
776 return common.UnsubscribeNoContentCode
779 //-------------------------------------------------------------------
781 //-------------------------------------------------------------------
782 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
784 var xAppEventInstanceID int64
785 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
787 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
788 restSubId, instanceId, idstring(err, nil))
789 return xAppEventInstanceID, nil
792 xAppEventInstanceID = int64(subs.ReqId.Id)
793 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
795 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
796 xapp.Logger.Error("%s", err.Error())
798 defer trans.Release()
800 err = c.tracker.Track(trans)
802 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
803 xapp.Logger.Error("%s", err.Error())
804 return xAppEventInstanceID, &time.ParseError{}
809 subs.OngoingDelCount++
810 go c.handleSubscriptionDelete(subs, trans)
811 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
812 subs.OngoingDelCount--
814 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
816 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
818 return xAppEventInstanceID, nil
821 //-------------------------------------------------------------------
823 //-------------------------------------------------------------------
// RESTQueryHandler is the query handler passed to xapp.Subscription.Listen in
// NewControl. It returns the result of c.registry.QueryHandler() unchanged.
824 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
825 xapp.Logger.Debug("RESTQueryHandler() called")
829 return c.registry.QueryHandler()
// TestRestHandler is the testing support handler registered in NewControl for
// POST /ric/v1/test/{testId}. The {testId} path variable selects the command;
// the visible commands delete one subscription from the database, remove all
// subscriptions from the database, or make submgr exit. Any other value is
// logged as unsupported.
832 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
833 xapp.Logger.Debug("RESTTestRestHandler() called")
835 pathParams := mux.Vars(r)
836 s := pathParams["testId"]
838 // This can be used to delete single subscription from db
// Command format: "deletesubid=<decimal subscription id>".
839 if contains := strings.Contains(s, "deletesubid="); contains == true {
840 var splits = strings.Split(s, "=")
// NOTE: the id is parsed as int64 and then converted to uint32.
841 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
842 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
843 c.RemoveSubscriptionFromSdl(uint32(subId))
848 // This can be used to remove all subscriptions from db
850 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
851 c.RemoveAllSubscriptionsFromSdl()
852 c.RemoveAllRESTSubscriptionsFromSdl()
856 // This is meant to cause submgr's restart in testing
858 xapp.Logger.Debug("os.Exit(1) called")
862 xapp.Logger.Debug("Unsupported rest command received %s", s)
865 //-------------------------------------------------------------------
867 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload of a subscription-side transaction towards E2T.
//
// The RMR parameters take the message type and payload from trans and the
// subscription id (request instance id) and MEID from subs. desc is used only
// in the log line. The message is sent with c.SendWithRetry(params, false, 5);
// a send failure is logged and returned through the named result err.
869 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
870 params := &xapp.RMRParams{}
871 params.Mtype = trans.GetMtype()
872 params.SubId = int(subs.GetReqId().InstanceId)
874 params.Meid = subs.GetMeid()
876 params.PayloadLen = len(trans.Payload.Buf)
877 params.Payload = trans.Payload.Buf
879 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
880 err = c.SendWithRetry(params, false, 5)
882 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp builds an RMR message towards an xApp and sends it.
//
// The message type, transaction id (Xid), MEID and payload come from the
// xApp transaction (trans); the subscription id comes from subs. desc only
// tags the debug log line. The result of c.SendWithRetry is assigned to the
// named result err and logged when the send fails.
887 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
889 params := &xapp.RMRParams{}
890 params.Mtype = trans.GetMtype()
891 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Xid = trans.GetXid()
893 params.Meid = trans.GetMeid()
895 params.PayloadLen = len(trans.Payload.Buf)
896 params.Payload = trans.Payload.Buf
898 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
// SendWithRetry is called with the literal arguments false and 5 —
// presumably a flag and a retry limit; confirm against its definition.
899 err = c.SendWithRetry(params, false, 5)
901 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and dispatches it by message type. Each
// recognised subscription message is handed to its handler in a new
// goroutine; unknown types are logged and discarded.
906 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client the message cannot be processed; an error is built
// and logged.
907 if c.RMRClient == nil {
908 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
909 xapp.Logger.Error("%s", err.Error())
// The receive buffer is freed when Consume returns, which is why the
// payload is copied below before the handlers start.
914 defer c.RMRClient.Free(msg.Mbuf)
916 // xapp-frame might use direct access to c buffer and
917 // when msg.Mbuf is freed, someone might take it into use
918 // and payload data might be invalid inside message handle function
920 // subscriptions won't load system a lot so there is no
921 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has zero length and zero capacity, so append must
// allocate a new backing array: cPay is an independent copy.
922 cPay := append(msg.Payload[:0:0], msg.Payload...)
924 msg.PayloadLen = len(cPay)
// Requests come from xApps; responses and failures come from E2T.
927 case xapp.RIC_SUB_REQ:
928 go c.handleXAPPSubscriptionRequest(msg)
929 case xapp.RIC_SUB_RESP:
930 go c.handleE2TSubscriptionResponse(msg)
931 case xapp.RIC_SUB_FAILURE:
932 go c.handleE2TSubscriptionFailure(msg)
933 case xapp.RIC_SUB_DEL_REQ:
934 go c.handleXAPPSubscriptionDeleteRequest(msg)
935 case xapp.RIC_SUB_DEL_RESP:
936 go c.handleE2TSubscriptionDeleteResponse(msg)
937 case xapp.RIC_SUB_DEL_FAILURE:
938 go c.handleE2TSubscriptionDeleteFailure(msg)
940 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
945 //-------------------------------------------------------------------
946 // handle from XAPP Subscription Request
947 //------------------------------------------------------------------
948 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
949 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
950 c.UpdateCounter(cSubReqFromXapp)
952 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
953 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
957 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
959 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
963 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
965 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
968 defer trans.Release()
970 if err = c.tracker.Track(trans); err != nil {
971 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
975 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
976 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
978 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
982 c.wakeSubscriptionRequest(subs, trans)
985 //-------------------------------------------------------------------
986 // Wake Subscription Request to E2node
987 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for subs in a
// goroutine, blocks until that goroutine delivers an event to the xApp
// transaction, and then relays the outcome to the xApp: a received
// subscription response or failure is re-packed and sent with
// rmrSendToXapp; otherwise the request is logged as failed.
// NOTE(review): the declaration of err and some control-flow lines around
// the switch are not visible in this extract.
988 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are requested with a nil argument and the returned error is
// discarded — presumably nil yields defaults; confirm in
// GetE2SubscriptionDirectives.
990 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
// OngoingReqCount brackets the blocking wait. No lock is visible around the
// counter here — TODO confirm how concurrent access is protected.
991 subs.OngoingReqCount++
992 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
993 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
994 subs.OngoingReqCount--
997 switch themsg := event.(type) {
998 case *e2ap.E2APSubscriptionResponse:
// The requestor id in the outgoing message is replaced with the one the
// xApp used in its own transaction.
999 themsg.RequestId.Id = trans.RequestId.Id
1000 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1003 c.UpdateCounter(cSubRespToXapp)
1004 c.rmrSendToXapp("", subs, trans)
1007 case *e2ap.E2APSubscriptionFailure:
1008 themsg.RequestId.Id = trans.RequestId.Id
1009 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1011 c.UpdateCounter(cSubFailToXapp)
1012 c.rmrSendToXapp("", subs, trans)
1018 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1019 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1022 //-------------------------------------------------------------------
1023 // handle from XAPP Subscription Delete Request
1024 //------------------------------------------------------------------
1025 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1026 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1027 c.UpdateCounter(cSubDelReqFromXapp)
1029 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1030 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1034 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1036 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1040 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1042 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1045 defer trans.Release()
1047 err = c.tracker.Track(trans)
1049 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1053 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1055 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1062 subs.OngoingDelCount++
1063 go c.handleSubscriptionDelete(subs, trans)
1064 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1065 subs.OngoingDelCount--
1067 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1069 if subs.NoRespToXapp == true {
1070 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1071 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1075 // Whatever is received success, fail or timeout, send successful delete response
1076 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1077 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1078 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1079 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1080 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1082 c.UpdateCounter(cSubDelRespToXapp)
1083 c.rmrSendToXapp("", subs, trans)
1086 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1087 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1090 //-------------------------------------------------------------------
1091 // SUBS CREATE Handling
1092 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side handling of a
// create request on behalf of the xApp transaction parentTrans.
//
// It creates its own subscription transaction and waits for its turn on
// subs. If no response is cached yet, it sends the request to E2T and
// caches the outcome; otherwise the cached response is reused. It then
// updates the subscription in the database and finally delivers the
// response (subRfMsg) to parentTrans as an event.
// NOTE(review): several short lines (else/default branches, error checks,
// closing braces) are not visible in this extract.
1093 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1095 var event interface{} = nil
1096 var removeSubscriptionFromDb bool = false
1097 trans := c.tracker.NewSubsTransaction(subs)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
1098 subs.WaitTransactionTurn(trans)
1099 defer subs.ReleaseTransactionTurn(trans)
1100 defer trans.Release()
1102 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1104 subRfMsg, valid := subs.GetCachedResponse()
// No cached response on a still-valid subscription: ask E2T.
1105 if subRfMsg == nil && valid == true {
1106 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1107 switch event.(type) {
1108 case *e2ap.E2APSubscriptionResponse:
1109 subRfMsg, valid = subs.SetCachedResponse(event, true)
1110 subs.SubRespRcvd = true
1111 case *e2ap.E2APSubscriptionFailure:
// A failure is cached with the flag false, the subscription is marked for
// removal from the database and a delete request is sent to E2T.
1112 removeSubscriptionFromDb = true
1113 subRfMsg, valid = subs.SetCachedResponse(event, false)
1114 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1115 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1116 case *SubmgrRestartTestEvent:
1117 // This simulates that no response has been received and after restart subscriptions are restored from db
1118 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1119 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
// Local pack / database-write errors are cached with the flag false; no
// delete request is sent in this case.
1120 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Any other event: unless this is a policy update, clear the cached
// response, mark for database removal and send a delete request to E2T.
1122 if subs.PolicyUpdate == false {
1123 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1124 removeSubscriptionFromDb = true
1125 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1126 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1129 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1131 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1134 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// NOTE(review): the next two lines presumably run only when the database
// update failed (the guarding condition is not visible) — confirm.
1136 subRfMsg, valid = subs.SetCachedResponse(event, false)
1137 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1140 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1142 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
// Hand the final response (possibly nil) to the waiting xApp transaction.
1145 parentTrans.SendEvent(subRfMsg, 0)
1148 //-------------------------------------------------------------------
1149 // SUBS DELETE Handling
1150 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side handling of a
// delete request on behalf of the xApp transaction parentTrans.
//
// It creates its own subscription transaction and waits for its turn on
// subs. A delete request is sent to E2T only when the subscription is valid
// and the requesting endpoint is its sole endpoint. Afterwards the endpoint
// is removed from the subscription, the subscription is updated in the
// database, and a nil event is sent to parentTrans to wake the caller.
1152 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1154 trans := c.tracker.NewSubsTransaction(subs)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
1155 subs.WaitTransactionTurn(trans)
1156 defer subs.ReleaseTransactionTurn(trans)
1157 defer trans.Release()
1159 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint triggers a delete towards E2T.
1163 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1166 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1170 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1171 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1172 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1173 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1174 c.registry.UpdateSubscriptionToDb(subs, c)
1175 parentTrans.SendEvent(nil, 0)
1178 //-------------------------------------------------------------------
1179 // send to E2T Subscription Request
1180 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores
// the not-yet-completed subscription in the database and sends the request
// to E2T, retrying up to e2SubscriptionDirectives.E2MaxTryCount times.
//
// Returns the event received for the transaction, or one of the local
// event types built below (*PackSubscriptionRequestErrortEvent,
// *SDLWriteErrortEvent, *SubmgrRestartTestEvent).
// NOTE(review): error checks, loop-exit lines and the final return are not
// visible in this extract.
1181 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1183 var event interface{} = nil
1184 var timedOut bool = false
1185 const ricRequestorId = 123
// The request carries the subscription's own request id, with the
// requestor id overridden by the fixed value ricRequestorId.
1187 subReqMsg := subs.SubReqMsg
1188 subReqMsg.RequestId = subs.GetReqId().RequestId
1189 subReqMsg.RequestId.Id = ricRequestorId
1190 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1192 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
// A packing failure is reported as an ASN1-sourced error event.
1193 return &PackSubscriptionRequestErrortEvent{
1195 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1196 ErrorCause: err.Error(),
1201 // Write uncompleted subscription in db. If no response for subscription it needs to be re-processed (deleted) after restart
1202 err = c.WriteSubscriptionToDb(subs)
// A database write failure is reported as a DBAAS-sourced error event.
1204 return &SDLWriteErrortEvent{
1206 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1207 ErrorCause: err.Error(),
1212 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1213 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for a request and a re-request; the condition
// choosing between them is not visible here.
1215 c.UpdateCounter(cSubReqToE2)
1217 c.UpdateCounter(cSubReReqToE2)
1219 c.rmrSendToE2T(desc, subs, trans)
1220 if subs.DoNotWaitSubResp == false {
1221 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1223 c.UpdateCounter(cSubReqTimerExpiry)
1227 // Simulating case where subscription request has been sent but response has not been received before restart
1228 event = &SubmgrRestartTestEvent{}
1229 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1233 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1237 //-------------------------------------------------------------------
1238 // send to E2T Subscription Delete Request
1239 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for
// subs and sends it to E2T, retrying up to e2tMaxSubDelReqTryCount times and
// waiting e2tSubDelReqTime for an event after each send.
//
// Returns the event received for the transaction.
// NOTE(review): error checks, loop-exit lines and the final return are not
// visible in this extract.
1241 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1243 var event interface{}
1245 const ricRequestorId = 123
// The delete request reuses the subscription's request id and function id,
// with the requestor id overridden by the fixed value ricRequestorId.
1247 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1248 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1249 subDelReqMsg.RequestId.Id = ricRequestorId
1250 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1251 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1253 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1257 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1258 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for a request and a re-request; the condition
// choosing between them is not visible here.
1260 c.UpdateCounter(cSubDelReqToE2)
1262 c.UpdateCounter(cSubDelReReqToE2)
1264 c.rmrSendToE2T(desc, subs, trans)
1265 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1267 c.UpdateCounter(cSubDelReqTimerExpiry)
1272 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1276 //-------------------------------------------------------------------
1277 // handle from E2T Subscription Response
1278 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T: it unpacks the message, finds the subscription by the instance
// id in the message's request id, fetches that subscription's ongoing
// transaction and passes the response to it as an event. Each failing step
// is logged with the "MSG-SubResp" prefix.
1279 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1280 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1281 c.UpdateCounter(cSubRespFromE2)
1283 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1285 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1288 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1290 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1293 trans := subs.GetTransaction()
1295 err = fmt.Errorf("Ongoing transaction not found")
1296 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Delivery to the transaction is attempted with timeout e2tRecvMsgTimeout.
1299 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1300 if sendOk == false {
1301 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1302 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1307 //-------------------------------------------------------------------
1308 // handle from E2T Subscription Failure
1309 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received
// from E2T: it unpacks the message, finds the subscription by the instance
// id in the message's request id, fetches that subscription's ongoing
// transaction and passes the failure to it as an event. Each failing step
// is logged with the "MSG-SubFail" prefix.
1310 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1311 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1312 c.UpdateCounter(cSubFailFromE2)
1313 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1315 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1318 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1320 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1323 trans := subs.GetTransaction()
1325 err = fmt.Errorf("Ongoing transaction not found")
1326 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Delivery to the transaction is attempted with timeout e2tRecvMsgTimeout.
1329 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1330 if sendOk == false {
1331 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1332 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1337 //-------------------------------------------------------------------
1338 // handle from E2T Subscription Delete Response
1339 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete
// response received from E2T: it unpacks the message, finds the
// subscription by the instance id in the message's request id, fetches that
// subscription's ongoing transaction and passes the response to it as an
// event. Each failing step is logged with the "MSG-SubDelResp" prefix.
//
// Unlike its sibling handlers this one declares a named error result;
// Consume starts it with `go`, so that value is not observed there.
1340 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1341 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1342 c.UpdateCounter(cSubDelRespFromE2)
1343 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1345 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1348 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1350 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1353 trans := subs.GetTransaction()
1355 err = fmt.Errorf("Ongoing transaction not found")
1356 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Delivery to the transaction is attempted with timeout e2tRecvMsgTimeout.
1359 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1360 if sendOk == false {
1361 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1362 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1367 //-------------------------------------------------------------------
1368 // handle from E2T Subscription Delete Failure
1369 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the message, finds the subscription by the
// instance id in the message's request id, fetches that subscription's
// ongoing transaction and passes the failure to it as an event. Each
// failing step is logged with the "MSG-SubDelFail" prefix.
1370 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1371 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1372 c.UpdateCounter(cSubDelFailFromE2)
1373 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1375 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1378 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1380 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1383 trans := subs.GetTransaction()
1385 err = fmt.Errorf("Ongoing transaction not found")
1386 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Delivery to the transaction is attempted with timeout e2tRecvMsgTimeout.
1389 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1390 if sendOk == false {
1391 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1392 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1397 //-------------------------------------------------------------------
1399 //-------------------------------------------------------------------
// typeofSubsMessage maps a subscription message value to a short name by
// switching on its dynamic type. It is used by this file's log statements.
// Only response and failure types (create and delete) have active cases;
// the request cases are commented out.
// NOTE(review): the returned strings are not visible in this extract.
1400 func typeofSubsMessage(v interface{}) string {
1405 //case *e2ap.E2APSubscriptionRequest:
1407 case *e2ap.E2APSubscriptionResponse:
1409 case *e2ap.E2APSubscriptionFailure:
1411 //case *e2ap.E2APSubscriptionDeleteRequest:
1412 // return "SubDelReq"
1413 case *e2ap.E2APSubscriptionDeleteResponse:
1415 case *e2ap.E2APSubscriptionDeleteFailure:
1422 //-------------------------------------------------------------------
1424 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs via WriteSubscriptionToSdl, keyed by the
// subscription's instance id, and logs a write error.
// NOTE(review): the return statement is not visible in this extract.
1425 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1426 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1427 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1429 xapp.Logger.Error("%v", err)
1435 //-------------------------------------------------------------------
1437 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes subs from the database (when
// removeSubscriptionFromDb is true) or writes its current state.
// The write is skipped when subs.RetryFromXapp is set; that flag is reset
// to false afterwards.
// NOTE(review): the else branches and return statements are not visible in
// this extract, so the exact scope of the reset is unconfirmed.
1438 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1440 if removeSubscriptionFromDb == true {
1441 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1442 c.RemoveSubscriptionFromDb(subs)
1444 // Update is needed for successful response and merge case here
1445 if subs.RetryFromXapp == false {
1446 err := c.WriteSubscriptionToDb(subs)
1450 subs.RetryFromXapp = false
1454 //-------------------------------------------------------------------
1456 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb deletes subs from the database via
// RemoveSubscriptionFromSdl, keyed by the subscription's instance id. The
// function has no return value; a removal error is logged only.
1457 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1458 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1459 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1461 xapp.Logger.Error("%v", err)
1465 //-------------------------------------------------------------------
1467 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb stores restSubs under restSubId via
// WriteRESTSubscriptionToSdl. The function has no return value; a write
// error is logged only.
1468 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1469 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1470 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1472 xapp.Logger.Error("%v", err)
1476 //-------------------------------------------------------------------
1478 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB removes the REST subscription restSubId from
// the database when removeRestSubscriptionFromDb is true; the write call
// below presumably sits in the else branch (not visible in this extract).
1479 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1481 if removeRestSubscriptionFromDb == true {
1482 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1483 c.RemoveRESTSubscriptionFromDb(restSubId)
1485 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1489 //-------------------------------------------------------------------
1491 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb deletes the REST subscription restSubId from
// the database via RemoveRESTSubscriptionFromSdl. The function has no
// return value; a removal error is logged only.
1492 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1493 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1494 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1496 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues internally generated delete requests for
// subs (the log line says this happens due to restart). For a subscription
// that is not a policy update, one delete request is packed and then
// replayed once per endpoint through handleXAPPSubscriptionDeleteRequest,
// as if each endpoint had sent it.
1500 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1502 const ricRequestorId = 123
1503 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1505 // Send delete for every endpoint in the subscription
1506 if subs.PolicyUpdate == false {
// The request reuses the subscription's request id and function id, with
// the requestor id overridden by the fixed value ricRequestorId.
1507 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1508 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1509 subDelReqMsg.RequestId.Id = ricRequestorId
1510 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1511 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1513 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1516 for _, endPoint := range subs.EpList.Endpoints {
// A fresh RMR parameter set is built per endpoint; Src is set to the
// endpoint so the handler treats it as the sender.
1517 params := &xapp.RMRParams{}
1518 params.Mtype = mType
1519 params.SubId = int(subs.GetReqId().InstanceId)
1521 params.Meid = subs.Meid
1522 params.Src = endPoint.String()
1523 params.PayloadLen = len(payload.Buf)
1524 params.Payload = payload.Buf
1526 subs.DeleteFromDb = true
// Called synchronously (no goroutine), so endpoints are handled in turn.
1527 c.handleXAPPSubscriptionDeleteRequest(params)
// PrintRESTSubscriptionRequest dumps the fields of a REST subscription
// request to standard output with fmt, for debugging. Pointer fields that
// are checked below print "nil" when unset.
// NOTE(review): some guard lines and the closing lines of this function
// are not visible in this extract.
1532 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1534 fmt.Println("CRESTSubscriptionRequest")
1540 if p.SubscriptionID != "" {
1541 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1543 fmt.Println(" SubscriptionID = ''")
// NOTE(review): p.ClientEndpoint is dereferenced here without a visible
// nil check — confirm it can never be nil.
1546 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1548 if p.ClientEndpoint.HTTPPort != nil {
1549 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1551 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1554 if p.ClientEndpoint.RMRPort != nil {
1555 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1557 fmt.Println(" ClientEndpoint.RMRPort = nil")
1561 fmt.Printf(" Meid = %s\n", *p.Meid)
1563 fmt.Println(" Meid = nil")
1566 if p.E2SubscriptionDirectives == nil {
1567 fmt.Println(" E2SubscriptionDirectives = nil")
1569 fmt.Println(" E2SubscriptionDirectives")
1570 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1571 fmt.Println(" E2RetryCount == nil")
1573 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1575 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1576 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
// RANFunctionID belongs to the request (p), but is printed once per
// subscription detail.
1578 for _, subscriptionDetail := range p.SubscriptionDetails {
1579 if p.RANFunctionID != nil {
1580 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1582 fmt.Println(" RANFunctionID = nil")
// NOTE(review): XappEventInstanceID, ActionID and ActionType are
// dereferenced without visible nil checks.
1584 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1585 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1587 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1588 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1589 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1590 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1592 if actionToBeSetup.SubsequentAction != nil {
1593 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
// NOTE(review): the label below has a doubled dot ("ActionToBeSetup..").
1594 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1596 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")