2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring builds a single-line identification string for log messages.
//
// Every entry is rendered with its String() method; an entry that is a nil
// interface value is rendered as "(NIL)". When err is non-nil it is appended
// last as "err(<message>)". All elements are separated by exactly one space.
//
// Note: a typed nil pointer stored in a fmt.Stringer is not a nil interface
// value, so it is passed to String() as-is; the concrete type decides whether
// that is safe.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// Keep a placeholder so positional information is not lost.
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	// Joining guarantees a separator after every element, including "(NIL)".
	return strings.Join(parts, " ")
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Package-level configuration values. They are (re)loaded by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
var (
	// Timers (time.Duration, i.e. nanoseconds).
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of send attempts.
	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// Boolean switches kept as the strings "true"/"false".
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
// Fields of the Control struct. The struct header and the remaining fields are
// not part of the visible lines.
// restDuplicateCtrl is created in NewControl and is queried for md5sum-based
// duplicate detection of REST subscription requests.
84 restDuplicateCtrl *DuplicateCtrl
// e2IfStateDb is set to CreateXappRnibIfInstance() in NewControl.
86 e2IfStateDb XappRnibInterface
// restSubsDb is set to CreateRESTSdl() in NewControl.
88 restSubsDb Sdlnterface
// Counters is the counter group registered with the name "SUBMGR" in NewControl.
91 Counters map[string]xapp.Counter
// Payload-free marker event types.
// NOTE(review): their producers/consumers are not visible in this excerpt;
// presumably they are used by restart-related test flows — confirm.
type (
	// SubmgrRestartTestEvent is an empty marker event.
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is an empty marker event.
	SubmgrRestartUpEvent struct{}
)
// PackSubscriptionRequestErrortEvent is an event carrying an ErrorInfo.
// handleSubscriptionRequest treats it as "E2 SubscriptionRequest pack failure"
// and returns the carried ErrorInfo to its caller.
104 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
// NOTE(review): errorInfo is dereferenced unconditionally; a nil argument panics.
108 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
109 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is an event carrying an ErrorInfo.
// handleSubscriptionRequest treats it as "SDL write failure" and returns the
// carried ErrorInfo to its caller.
112 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event.
// NOTE(review): errorInfo is dereferenced unconditionally; a nil argument panics.
116 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
117 s.ErrorInfo = *errorInfo
// Start-up statements; the enclosing function header is not part of the
// visible lines.
121 xapp.Logger.Debug("SUBMGR")
// Environment variables consulted by viper use the "submgr" prefix.
123 viper.SetEnvPrefix("submgr")
// Environment variables that are set but empty count as set.
124 viper.AllowEmptyEnv(true)
// NewControl creates the Control instance of the subscription manager.
//
// The visible steps are: build the routing manager REST client from the
// rtmgr.* configuration keys, create the registry and transaction tracker,
// create the REST duplicate controller and the E2 interface state holder,
// assemble the Control value, load the configuration, register the REST
// helper routes, optionally restore subscriptions from the database and start
// the REST subscription listener in its own goroutine.
// Several statements of this function are not part of the visible lines.
127 func NewControl() *Control {
// HTTP transport towards the routing manager: "<rtmgr.HostAddr>:<rtmgr.port>" with base path rtmgr.baseUrl.
129 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
130 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry gets a reference to the routing manager client.
132 registry := new(Registry)
133 registry.Initialize()
134 registry.rtmgrClient = &rtmgrClient
136 tracker := new(Tracker)
139 restDuplicateCtrl := new(DuplicateCtrl)
140 restDuplicateCtrl.Init()
142 e2IfState := new(E2IfState)
144 c := &Control{e2ap: new(E2ap),
147 restDuplicateCtrl: restDuplicateCtrl,
148 e2IfState: e2IfState,
149 e2IfStateDb: CreateXappRnibIfInstance(),
150 e2SubsDb: CreateSdl(),
151 restSubsDb: CreateRESTSdl(),
152 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the "controls.*" settings into the package-level variables.
157 c.ReadConfigParameters("")
159 // Register REST handler for testing support
160 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
161 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// readSubsFromDb is a string switch; the branch body for "false" is not visible here.
164 if readSubsFromDb == "false" {
168 // Read subscriptions from db
// NOTE(review): the errors returned by both Read* calls are ignored.
169 c.ReadE2Subscriptions()
170 c.ReadRESTSubscriptions()
// The REST listener runs in its own goroutine with the subscribe, query and delete handlers.
172 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
177 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
178 subscriptions, _ := c.registry.QueryHandler()
179 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
182 //-------------------------------------------------------------------
184 //-------------------------------------------------------------------
// GetAllRestSubscriptions serves GET /ric/v1/restsubscriptions (route
// registered in NewControl). It fetches the REST subscriptions from the
// registry; the statement that writes the response is not part of the visible
// lines.
185 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
186 xapp.Logger.Debug("GetAllRestSubscriptions() called")
187 response := c.registry.GetAllRestSubscriptions()
191 //-------------------------------------------------------------------
193 //-------------------------------------------------------------------
194 func (c *Control) ReadE2Subscriptions() error {
197 var register map[uint32]*Subscription
198 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
199 xapp.Logger.Debug("Reading E2 subscriptions from db")
200 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
202 xapp.Logger.Error("%v", err)
203 <-time.After(1 * time.Second)
205 c.registry.subIds = subIds
206 c.registry.register = register
207 c.HandleUncompletedSubscriptions(register)
211 xapp.Logger.Debug("Continuing without retring")
215 //-------------------------------------------------------------------
217 //-------------------------------------------------------------------
218 func (c *Control) ReadRESTSubscriptions() error {
220 var restSubscriptions map[string]*RESTSubscription
221 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
222 xapp.Logger.Debug("Reading REST subscriptions from db")
223 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
225 xapp.Logger.Error("%v", err)
226 <-time.After(1 * time.Second)
228 c.registry.restSubscriptions = restSubscriptions
232 xapp.Logger.Debug("Continuing without retring")
236 //-------------------------------------------------------------------
238 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings from viper into the
// package-level configuration variables and logs every resulting value.
// A setting that reads as its zero value (0 or "") is replaced by a hard-coded
// default. The function is called from NewControl and is registered as a
// configuration change listener in Run.
//
// The parameter f is not referenced in the visible lines.
239 func (c *Control) ReadConfigParameters(f string) {
241 xapp.Logger.Debug("ReadConfigParameters")
// Cache the current logger level in the Control instance.
243 c.LoggerLevel = int(xapp.Logger.GetLevel())
244 xapp.Logger.Debug("LoggerLevel= %v", c.LoggerLevel)
// The *_ms settings are read as durations and multiplied by 1,000,000
// (nanoseconds per millisecond).
// NOTE(review): this assumes the configured values are bare integers (read as
// nanoseconds); a value with a unit suffix would be scaled incorrectly — confirm.
246 // viper.GetDuration returns nanoseconds
247 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
248 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
249 e2tSubReqTimeout = 2000 * 1000000
250 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
252 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
254 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
255 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
256 e2tSubDelReqTime = 2000 * 1000000
257 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
259 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
261 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
262 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
263 e2tRecvMsgTimeout = 2000 * 1000000
264 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
266 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts: 0 (or unset) falls back to a single attempt.
268 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
269 if e2tMaxSubReqTryCount == 0 {
270 e2tMaxSubReqTryCount = 1
271 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
273 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
275 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
276 if e2tMaxSubDelReqTryCount == 0 {
277 e2tMaxSubDelReqTryCount = 1
278 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
280 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// String switches: an empty value falls back to "true".
282 checkE2State = viper.GetString("controls.checkE2State")
283 if checkE2State == "" {
284 checkE2State = "true"
285 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
287 xapp.Logger.Debug("checkE2State= %v", checkE2State)
289 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
290 if readSubsFromDb == "" {
291 readSubsFromDb = "true"
292 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
294 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// dbTryCount bounds the database read retries in ReadE2Subscriptions and
// ReadRESTSubscriptions; its default assignment is not part of the visible lines.
296 dbTryCount = viper.GetInt("controls.dbTryCount")
299 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
301 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
303 dbRetryForever = viper.GetString("controls.dbRetryForever")
304 if dbRetryForever == "" {
305 dbRetryForever = "true"
306 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
308 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
310 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
311 // value 100ms used currently only in unittests.
312 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
313 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
314 waitRouteCleanup_ms = 5000 * 1000000
315 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
317 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
320 //-------------------------------------------------------------------
322 //-------------------------------------------------------------------
323 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
325 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
326 for subId, subs := range register {
327 if subs.SubRespRcvd == false {
328 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
329 if subs.PolicyUpdate == false {
330 subs.NoRespToXapp = true
331 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
332 c.SendSubscriptionDeleteReq(subs)
338 func (c *Control) ReadyCB(data interface{}) {
339 if c.RMRClient == nil {
340 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the ready callback and ReadConfigParameters as a
// configuration change listener. The remaining statement(s) of this function
// are not part of the visible lines.
344 func (c *Control) Run() {
345 xapp.SetReadyCB(c.ReadyCB, nil)
346 xapp.AddConfigChangeListener(c.ReadConfigParameters)
350 //-------------------------------------------------------------------
352 //-------------------------------------------------------------------
353 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
356 var restSubscription *RESTSubscription
359 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
360 if p.SubscriptionID == "" {
361 // Subscription does not contain REST subscription Id
363 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
364 if restSubscription != nil {
365 // Subscription not found
366 restSubId = prevRestSubsId
368 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
370 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
373 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
374 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
378 if restSubscription == nil {
379 restSubId = ksuid.New().String()
380 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
383 // Subscription contains REST subscription Id
384 restSubId = p.SubscriptionID
386 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
387 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
389 // Subscription with id in REST request does not exist
390 xapp.Logger.Error("%s", err.Error())
391 c.UpdateCounter(cRestSubFailToXapp)
396 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
398 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
402 return restSubscription, restSubId, nil
405 //-------------------------------------------------------------------
407 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
// It is registered as the subscribe handler of the REST listener in NewControl.
//
// params is type-asserted to *models.SubscriptionParams. The request is
// validated, a REST subscription is looked up or created, the E2 subscription
// request messages are built, and the actual processing is started in a
// goroutine. The handler returns the response carrying the REST subscription
// id with common.SubscribeCreatedCode, or nil with an error status code.
// Some statements (mainly the error checks) are not part of the visible lines.
408 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
411 c.UpdateCounter(cRestSubReqFromXapp)
413 subResp := models.SubscriptionResponse{}
// NOTE(review): unchecked type assertion; a different params type panics.
414 p := params.(*models.SubscriptionParams)
// Dump the request only when the cached logger level is above 2.
416 if c.LoggerLevel > 2 {
417 c.PrintRESTSubscriptionRequest(p)
// Reject the request when the E2 connection of the target node is not up.
// NOTE(review): *p.Meid is dereferenced for the log message; presumably Meid
// is mandatory in the REST model — confirm it cannot be nil here.
420 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
421 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
422 c.UpdateCounter(cRestReqRejDueE2Down)
423 return nil, common.SubscribeServiceUnavailableCode
426 if p.ClientEndpoint == nil {
427 err := fmt.Errorf("ClientEndpoint == nil")
428 xapp.Logger.Error("%v", err)
429 c.UpdateCounter(cRestSubFailToXapp)
430 return nil, common.SubscribeBadRequestCode
// Only the xApp RMR endpoint address is used from the constructed addresses.
433 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
435 xapp.Logger.Error("%s", err.Error())
436 c.UpdateCounter(cRestSubFailToXapp)
437 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is used for duplicate / retransmission detection.
440 md5sum, err := CalculateRequestMd5sum(params)
442 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
445 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
447 xapp.Logger.Error("Subscription with id in REST request does not exist")
448 return nil, common.SubscribeNotFoundCode
451 subResp.SubscriptionID = &restSubId
452 subReqList := e2ap.SubscriptionRequestList{}
453 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// On failure the cached md5sum entry and the REST subscription are removed again.
455 xapp.Logger.Error("%s", err.Error())
456 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
457 c.registry.DeleteRESTSubscription(&restSubId)
458 c.UpdateCounter(cRestSubFailToXapp)
459 return nil, common.SubscribeBadRequestCode
// A retransmission of a request that is still being processed is acknowledged
// directly without starting new processing.
462 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
464 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
465 xapp.Logger.Debug("%s", err)
466 c.UpdateCounter(cRestSubRespToXapp)
467 return &subResp, common.SubscribeCreatedCode
470 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
// NOTE(review): the directives are validated only after the REST subscription
// was created and written to the db; the visible failure path below returns
// without removing it and without updating a failure counter — confirm whether
// this is intended.
471 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
473 xapp.Logger.Error("%s", err)
474 return nil, common.SubscribeBadRequestCode
// The E2 subscription procedures run asynchronously; their results are
// delivered to the xApp as notifications.
476 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
478 c.UpdateCounter(cRestSubRespToXapp)
479 return &subResp, common.SubscribeCreatedCode
482 //-------------------------------------------------------------------
484 //-------------------------------------------------------------------
485 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
487 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
488 if p == nil || p.E2SubscriptionDirectives == nil {
489 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
490 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
491 e2SubscriptionDirectives.CreateRMRRoute = true
492 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
494 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
495 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
497 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
499 if p.E2SubscriptionDirectives.E2RetryCount == nil {
500 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
501 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
503 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
504 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
506 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
509 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
511 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
512 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
513 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
514 return e2SubscriptionDirectives, nil
517 //-------------------------------------------------------------------
519 //-------------------------------------------------------------------
// processSubscriptionRequests runs, in its own goroutine (started by
// RESTSubscriptionHandler), the E2 subscription procedure for every request in
// subReqList and notifies the xApp about the result of each one.
//
// For every request an xApp transaction is created, the request is handled by
// handleSubscriptionRequest, and a successful or unsuccessful REST
// notification is sent. When the function returns, the request md5sum is
// recorded as the last successfully handled request of the REST subscription.
// Some statements (nil/err checks, closers) are not part of the visible lines.
521 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
522 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
// Delay applied only in unit-test mode; see SubscriptionProcessingStartDelay.
524 c.SubscriptionProcessingStartDelay()
525 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
527 var xAppEventInstanceID int64
528 var e2EventInstanceID int64
529 errorInfo := &ErrorInfo{}
// Deferred: runs when all requests have been processed.
531 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
533 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
534 subReqMsg := subReqList.E2APSubscriptionRequests[index]
535 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
// NOTE(review): *meid and *restSubId are dereferenced without a nil check.
537 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
539 // Send notification to xApp that prosessing of a Subscription Request has failed.
540 err := fmt.Errorf("Tracking failure")
541 errorInfo.ErrorCause = err.Error()
542 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
546 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// NOTE(review): ":=" declares a new errorInfo for the rest of this iteration,
// shadowing the function-level errorInfo declared above.
548 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
550 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
554 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
// Success path: remember the E2 instance id and the request md5sum, then notify the xApp.
556 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
557 restSubscription.AddMd5Sum(md5sum)
558 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
559 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
560 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
565 //-------------------------------------------------------------------
567 //------------------------------------------------------------------
568 func (c *Control) SubscriptionProcessingStartDelay() {
569 if c.UTTesting == true {
570 // This is temporary fix for the UT problem that notification arrives before subscription response
571 // Correct fix would be to allow notification come before response and process it correctly
572 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
573 <-time.After(time.Millisecond * 50)
574 xapp.Logger.Debug("Continuing after delay")
578 //-------------------------------------------------------------------
580 //------------------------------------------------------------------
// handleSubscriptionRequest executes one E2 subscription procedure for an xApp
// transaction and blocks until the subscription side reports a result.
//
// It returns the E2 subscription response on success. On failure it returns a
// nil response together with an ErrorInfo describing cause, source and timeout
// type, and an error.
// Visible flow: track the transaction, assign it to a subscription, start
// handleSubscriptionCreate in a goroutine, wait for the resulting event and
// map the event type to a result. Several statements (err/nil checks, default
// branch, closers) are not part of the visible lines.
581 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
582 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
584 errorInfo := ErrorInfo{}
586 err := c.tracker.Track(trans)
// Tracking failed: the original error text is kept as ErrorCause, while a
// generic "Tracking failure" error is returned.
588 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
589 errorInfo.ErrorCause = err.Error()
590 err = fmt.Errorf("Tracking failure")
591 return nil, &errorInfo, err
// errorInfo and err are reused here; only subs is newly declared by ":=".
594 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
596 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
597 return nil, &errorInfo, err
// NOTE(review): OngoingReqCount is modified without any visible locking —
// confirm whether synchronisation happens elsewhere.
603 subs.OngoingReqCount++
604 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
605 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
606 subs.OngoingReqCount--
610 switch themsg := event.(type) {
611 case *e2ap.E2APSubscriptionResponse:
// A response only counts as success while the E2 connection is still up;
// otherwise the subscription is removed again and "E2 interface down" is reported.
613 if c.e2IfState.IsE2ConnectionUp(meid) == true {
614 return themsg, &errorInfo, nil
616 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
617 c.RemoveSubscriptionFromDb(subs)
618 err = fmt.Errorf("E2 interface down")
619 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
620 return nil, &errorInfo, err
622 case *e2ap.E2APSubscriptionFailure:
623 err = fmt.Errorf("E2 SubscriptionFailure received")
624 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
625 return nil, &errorInfo, err
// The two internal error events carry their own ErrorInfo.
626 case *PackSubscriptionRequestErrortEvent:
627 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
628 return nil, &themsg.ErrorInfo, err
629 case *SDLWriteErrortEvent:
630 err = fmt.Errorf("SDL write failure")
631 return nil, &themsg.ErrorInfo, err
633 err = fmt.Errorf("Unexpected E2 subscription response received")
634 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// Timeout result.
// NOTE(review): presumably reached when no event was received — the guarding
// condition is not visible; confirm.
638 err = fmt.Errorf("E2 subscription response timeout")
639 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// For a policy update the error is returned without the log + removal below.
640 if subs.PolicyUpdate == true {
641 return nil, &errorInfo, err
645 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
646 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
647 return nil, &errorInfo, err
650 //-------------------------------------------------------------------
652 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp over REST that the
// processing of one subscription request failed.
//
// The notification carries E2EventInstanceID 0 and the cause, source and
// timeout type from errorInfo. The REST subscription is marked processed with
// err and updated in the db before the notification is sent. Afterwards the
// REST subscription is deleted when the E2 connection is down and no
// subscription request is ongoing.
// Some lines (closers, a branch condition) are not part of the visible lines.
653 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
654 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
656 // Send notification to xApp that prosessing of a Subscription Request has failed.
657 e2EventInstanceID := (int64)(0)
// NOTE(review): this writes the default source into the caller's ErrorInfo.
658 if errorInfo.ErrorSource == "" {
659 // Submgr is default source of error
660 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
662 resp := &models.SubscriptionResponse{
663 SubscriptionID: restSubId,
664 SubscriptionInstances: []*models.SubscriptionInstance{
665 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
666 ErrorCause: errorInfo.ErrorCause,
667 ErrorSource: errorInfo.ErrorSource,
668 TimeoutType: errorInfo.TimeoutType,
669 XappEventInstanceID: &xAppEventInstanceID},
672 // Mark REST subscription request processed.
673 restSubscription.SetProcessed(err)
674 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Two variants of the same debug line: with and without the transaction id string.
// NOTE(review): presumably selected by whether trans is nil — the condition is
// not visible; confirm.
676 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
677 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
679 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
680 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
683 c.UpdateCounter(cRestSubFailNotifToXapp)
684 xapp.Subscription.Notify(resp, *clientEndpoint)
// With the E2 connection down and nothing ongoing, the REST subscription is
// removed from the registry and the db.
686 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
687 c.registry.DeleteRESTSubscription(restSubId)
688 c.RemoveRESTSubscriptionFromDb(*restSubId)
692 //-------------------------------------------------------------------
694 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp over REST that one
// subscription request was processed successfully.
//
// The E2 instance id is recorded in the REST subscription (for later deletion)
// together with the xApp-id to E2-id mapping. The REST subscription is marked
// processed and updated in the db before the notification is sent. Afterwards
// the REST subscription is deleted when the E2 connection is down and no
// subscription request is ongoing.
// Some lines of the response literal and closers are not part of the visible lines.
695 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
696 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
698 // Store successfully processed InstanceId for deletion
// The int64 instance id is narrowed to uint32 here.
699 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
700 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
702 // Send notification to xApp that a Subscription Request has been processed.
703 resp := &models.SubscriptionResponse{
704 SubscriptionID: restSubId,
705 SubscriptionInstances: []*models.SubscriptionInstance{
706 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
708 XappEventInstanceID: &xAppEventInstanceID},
711 // Mark REST subscription request processesd.
712 restSubscription.SetProcessed(nil)
713 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
714 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
715 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
717 c.UpdateCounter(cRestSubNotifToXapp)
718 xapp.Subscription.Notify(resp, *clientEndpoint)
// With the E2 connection down and nothing ongoing, the REST subscription is
// removed from the registry and the db.
720 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
721 c.registry.DeleteRESTSubscription(restSubId)
722 c.RemoveRESTSubscriptionFromDb(*restSubId)
726 //-------------------------------------------------------------------
728 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// from an xApp and returns the HTTP status code for the response. It is
// registered as the delete handler of the REST listener in NewControl.
//
// Visible results:
//   - unknown REST subscription: common.UnsubscribeNoContentCode;
//   - a subscription or delete request is still ongoing for the id:
//     common.UnsubscribeBadRequestCode;
//   - otherwise every E2 instance of the REST subscription is deleted, the
//     REST subscription is removed from the registry and the db, and
//     common.UnsubscribeNoContentCode is returned.
//
// Some statements (err checks, closers and possibly wrappers around the
// deletion loop) are not part of the visible lines.
729 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
732 c.UpdateCounter(cRestSubDelReqFromXapp)
734 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
736 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
738 xapp.Logger.Error("%s", err.Error())
739 if restSubscription == nil {
740 // Subscription was not found
741 c.UpdateCounter(cRestSubDelRespToXapp)
742 return common.UnsubscribeNoContentCode
744 if restSubscription.SubReqOngoing == true {
745 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
746 xapp.Logger.Error("%s", err.Error())
747 c.UpdateCounter(cRestSubDelFailToXapp)
748 return common.UnsubscribeBadRequestCode
749 } else if restSubscription.SubDelReqOngoing == true {
750 // Previous request for same restSubId still ongoing
751 c.UpdateCounter(cRestSubDelFailToXapp)
752 return common.UnsubscribeBadRequestCode
757 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
759 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
// NOTE(review): DeleteE2InstanceId is called while ranging over InstanceIds;
// if it mutates that slice in place, elements may be skipped — confirm.
760 for _, instanceId := range restSubscription.InstanceIds {
761 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
// A failed instance deletion is only logged; the clean-up below still runs.
764 xapp.Logger.Error("%s", err.Error())
766 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
767 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
768 restSubscription.DeleteE2InstanceId(instanceId)
// Forget the cached md5sum of the last request and remove the REST subscription.
770 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
771 c.registry.DeleteRESTSubscription(&restSubId)
772 c.RemoveRESTSubscriptionFromDb(restSubId)
775 c.UpdateCounter(cRestSubDelRespToXapp)
776 return common.UnsubscribeNoContentCode
779 //-------------------------------------------------------------------
781 //-------------------------------------------------------------------
782 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
784 var xAppEventInstanceID int64
785 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
787 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
788 restSubId, instanceId, idstring(err, nil))
789 return xAppEventInstanceID, nil
792 xAppEventInstanceID = int64(subs.ReqId.Id)
793 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
795 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
796 xapp.Logger.Error("%s", err.Error())
798 defer trans.Release()
800 err = c.tracker.Track(trans)
802 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
803 xapp.Logger.Error("%s", err.Error())
804 return xAppEventInstanceID, &time.ParseError{}
809 subs.OngoingDelCount++
810 go c.handleSubscriptionDelete(subs, trans)
811 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
812 subs.OngoingDelCount--
814 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
816 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
818 return xAppEventInstanceID, nil
821 //-------------------------------------------------------------------
823 //-------------------------------------------------------------------
// RESTQueryHandler is the query handler of the REST listener (registered in
// NewControl). It returns the result of the registry's QueryHandler.
// One statement between the debug log and the return is not part of the
// visible lines.
824 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
825 xapp.Logger.Debug("RESTQueryHandler() called")
829 return c.registry.QueryHandler()
// TestRestHandler serves POST /ric/v1/test/{testId} (route registered in
// NewControl as testing support). The {testId} path variable selects the
// action. Visible actions: deleting one subscription from the db
// ("deletesubid=<id>"), removing all subscriptions from the db, and
// terminating the process to force a restart. Anything else is logged as
// unsupported. The conditions selecting the last two actions and several
// closers/returns are not part of the visible lines.
832 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
833 xapp.Logger.Debug("RESTTestRestHandler() called")
835 pathParams := mux.Vars(r)
836 s := pathParams["testId"]
838 // This can be used to delete single subscription from db
839 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so splits has at least two elements.
840 var splits = strings.Split(s, "=")
// NOTE(review): the id is parsed as a 64-bit integer and then narrowed to
// uint32; negative or too large values wrap silently. A parse error is not
// handled in the visible lines.
841 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
842 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
843 c.RemoveSubscriptionFromSdl(uint32(subId))
848 // This can be used to remove all subscriptions db from
850 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
851 c.RemoveAllSubscriptionsFromSdl()
852 c.RemoveAllRESTSubscriptionsFromSdl()
856 // This is meant to cause submgr's restart in testing
858 xapp.Logger.Debug("os.Exit(1) called")
862 xapp.Logger.Debug("Unsupported rest command received %s", s)
865 //-------------------------------------------------------------------
867 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload of a subscription-side transaction over RMR
// towards E2T. The message type comes from the transaction, the subscription
// id is the InstanceId of the subscription's request id and the MEID comes
// from the subscription. desc is used only in the debug log.
// The send is done with SendWithRetry(params, false, 5); a failure is logged.
// Some assignments and the closing lines are not part of the visible lines.
869 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
870 params := &xapp.RMRParams{}
871 params.Mtype = trans.GetMtype()
872 params.SubId = int(subs.GetReqId().InstanceId)
874 params.Meid = subs.GetMeid()
876 params.PayloadLen = len(trans.Payload.Buf)
877 params.Payload = trans.Payload.Buf
879 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
880 err = c.SendWithRetry(params, false, 5)
882 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp builds an RMR message for an xApp-side transaction and sends it.
//
// desc is only used in the debug log line. The message type, transaction id (Xid), MEID
// and payload are taken from trans; the subscription id is taken from subs. The result of
// c.SendWithRetry is assigned to the named result err, and a failure is logged here.
// NOTE(review): single lines are missing from this listing (numbering jumps at 894, 897
// and after 899) — confirm the remaining field assignments and the error check.
887 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
889 params := &xapp.RMRParams{}
890 params.Mtype = trans.GetMtype()
// The RMR subscription id is the InstanceId part of the subscription's request id.
891 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Xid = trans.GetXid()
893 params.Meid = trans.GetMeid()
// Payload and PayloadLen are kept consistent with the packed buffer held by the transaction.
895 params.PayloadLen = len(trans.Payload.Buf)
896 params.Payload = trans.Payload.Buf
898 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
899 err = c.SendWithRetry(params, false, 5)
901 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume handles one received RMR message.
//
// When c.RMRClient is nil an error is created and logged. Otherwise freeing of msg.Mbuf
// is deferred, the payload is cloned into a new Go slice, and the handler matching the
// message type is started in its own goroutine. Unknown message types are only logged
// at debug level.
// NOTE(review): short lines are missing from this listing (e.g. the switch header, the
// lines numbered 910-913 and 923-926, returns and closing braces) — verify the full
// control flow, including what is assigned back to msg.Payload, against the original file.
906 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
907 if c.RMRClient == nil {
908 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
909 xapp.Logger.Error("%s", err.Error())
// The deferred Free runs when Consume returns, i.e. possibly before the goroutines
// started below have finished — hence the payload clone.
914 defer c.RMRClient.Free(msg.Mbuf)
916 // xapp-frame might use direct access to c buffer and
917 // when msg.Mbuf is freed, someone might take it into use
918 // and payload data might be invalid inside message handle function
920 // subscriptions won't load system a lot so there is no
921 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has zero length and zero capacity, so append must allocate a fresh
// backing array instead of writing into the original buffer.
922 cPay := append(msg.Payload[:0:0], msg.Payload...)
924 msg.PayloadLen = len(cPay)
// Each supported message type is dispatched to its handler in a new goroutine.
927 case xapp.RIC_SUB_REQ:
928 go c.handleXAPPSubscriptionRequest(msg)
929 case xapp.RIC_SUB_RESP:
930 go c.handleE2TSubscriptionResponse(msg)
931 case xapp.RIC_SUB_FAILURE:
932 go c.handleE2TSubscriptionFailure(msg)
933 case xapp.RIC_SUB_DEL_REQ:
934 go c.handleXAPPSubscriptionDeleteRequest(msg)
935 case xapp.RIC_SUB_DEL_RESP:
936 go c.handleE2TSubscriptionDeleteResponse(msg)
937 case xapp.RIC_SUB_DEL_FAILURE:
938 go c.handleE2TSubscriptionDeleteFailure(msg)
940 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
945 //-------------------------------------------------------------------
946 // handle from XAPP Subscription Request
947 //------------------------------------------------------------------
948 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
949 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
950 c.UpdateCounter(cSubReqFromXapp)
952 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
953 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
957 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
959 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
963 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
965 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
968 defer trans.Release()
970 if err = c.tracker.Track(trans); err != nil {
971 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
975 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
976 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
978 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
982 c.wakeSubscriptionRequest(subs, trans)
985 //-------------------------------------------------------------------
986 // Wake Subscription Request to E2node
987 //------------------------------------------------------------------
// wakeSubscriptionRequest starts subscription creation for subs in a separate goroutine
// and blocks until an event is delivered to trans.
//
// For a subscription response or a subscription failure event, the message's
// RequestId.Id is overwritten with the xApp transaction's request id, the message is
// packed into trans, the matching counter is updated and the message is sent to the xApp.
// The trailing debug line logs "failed" together with err.
// NOTE(review): lines are missing from this listing (numbering jumps 994 -> 997,
// 1000 -> 1003, 1004 -> 1007, 1009 -> 1011 and 1012 -> 1018), including the declaration of
// err and whatever guards the send calls and returns — verify the exact branch structure.
988 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are requested with a nil argument and the second return value is discarded.
990 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
// OngoingReqCount is raised only for the duration of the blocking wait below.
991 subs.OngoingReqCount++
992 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
993 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
994 subs.OngoingReqCount--
// Dispatch on the concrete type of the event handed over by handleSubscriptionCreate.
997 switch themsg := event.(type) {
998 case *e2ap.E2APSubscriptionResponse:
// The xApp must see its own request id, not the one used towards E2T.
999 themsg.RequestId.Id = trans.RequestId.Id
1000 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1003 c.UpdateCounter(cSubRespToXapp)
1004 c.rmrSendToXapp("", subs, trans)
1007 case *e2ap.E2APSubscriptionFailure:
1008 themsg.RequestId.Id = trans.RequestId.Id
1009 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1011 c.UpdateCounter(cSubFailToXapp)
1012 c.rmrSendToXapp("", subs, trans)
1018 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1019 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1022 //-------------------------------------------------------------------
1023 // handle from XAPP Subscription Delete Request
1024 //------------------------------------------------------------------
1025 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1026 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1027 c.UpdateCounter(cSubDelReqFromXapp)
1029 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1030 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1034 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1036 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1040 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1042 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1045 defer trans.Release()
1047 err = c.tracker.Track(trans)
1049 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1053 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1055 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1062 subs.OngoingDelCount++
1063 go c.handleSubscriptionDelete(subs, trans)
1064 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1065 subs.OngoingDelCount--
1067 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1069 if subs.NoRespToXapp == true {
1070 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1071 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1075 // Whatever is received success, fail or timeout, send successful delete response
1076 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1077 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1078 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1079 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1080 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1082 c.UpdateCounter(cSubDelRespToXapp)
1083 c.rmrSendToXapp("", subs, trans)
1086 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1087 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1090 //-------------------------------------------------------------------
1091 // SUBS CREATE Handling
1092 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a subscription request
// and reports the outcome to parentTrans.
//
// It creates a subscription transaction, waits for its turn on subs, and — when there is
// no cached response and the cache is still valid — sends the request to E2T and stores
// the resulting event as the cached response. The subscription record in the db is then
// updated or removed, and the cached response (or failure) is sent as an event to
// parentTrans. The transaction turn and the transaction itself are released on return.
// NOTE(review): many short lines are missing from this listing (default case header, else
// branches, the conditions around lines 1133-1144, closing braces) — treat the per-line
// notes below as descriptions of the visible statements only.
1093 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1095 var event interface{} = nil
1096 var removeSubscriptionFromDb bool = false
1097 trans := c.tracker.NewSubsTransaction(subs)
// Transactions on one subscription take turns; defers run in reverse order, so the
// transaction is released before the turn is handed over.
1098 subs.WaitTransactionTurn(trans)
1099 defer subs.ReleaseTransactionTurn(trans)
1100 defer trans.Release()
1102 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
// Only contact E2T when nothing is cached yet and the cache state is valid.
1104 subRfMsg, valid := subs.GetCachedResponse()
1105 if subRfMsg == nil && valid == true {
1106 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1107 switch event.(type) {
1108 case *e2ap.E2APSubscriptionResponse:
1109 subRfMsg, valid = subs.SetCachedResponse(event, true)
1110 subs.SubRespRcvd = true
1111 case *e2ap.E2APSubscriptionFailure:
// A failure from E2T means the record written before sending must be removed again.
1112 removeSubscriptionFromDb = true
1113 subRfMsg, valid = subs.SetCachedResponse(event, false)
1114 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1115 case *SubmgrRestartTestEvent:
1116 // This simulates that no response has been received and after restart subscriptions are restored from db
1117 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1118 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1119 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Remaining events: unless this is a policy update, drop the subscription locally and
// ask E2T to delete it.
1121 if subs.PolicyUpdate == false {
1122 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1123 removeSubscriptionFromDb = true
1124 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1125 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1128 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1130 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
// Persist the outcome: remove the record or write the updated subscription.
1133 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1135 subRfMsg, valid = subs.SetCachedResponse(event, false)
1136 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1139 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1141 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
// Hand the result to the xApp-side transaction waiting in wakeSubscriptionRequest.
1144 parentTrans.SendEvent(subRfMsg, 0)
1147 //-------------------------------------------------------------------
1148 // SUBS DELETE Handling
1149 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a subscription delete
// and signals parentTrans when done.
//
// It creates a subscription transaction and waits for its turn on subs. A delete request
// is sent to E2T when the subscription is valid and the requesting endpoint is the only
// endpoint left on it. Afterwards the endpoint is removed from the subscription in the
// registry, the subscription is updated in the db and a nil event is sent to parentTrans.
// NOTE(review): lines are missing from this listing (numbering jumps 1158 -> 1162,
// 1162 -> 1165 and 1165 -> 1169) — statements around the condition are not visible here.
1151 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1153 trans := c.tracker.NewSubsTransaction(subs)
// Defers run in reverse order: the transaction is released before the turn is handed over.
1154 subs.WaitTransactionTurn(trans)
1155 defer subs.ReleaseTransactionTurn(trans)
1156 defer trans.Release()
1158 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint of a valid subscription triggers a delete towards E2T.
1162 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1165 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1169 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1170 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1171 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1172 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1173 c.registry.UpdateSubscriptionToDb(subs, c)
// The nil event only wakes the waiting xApp-side transaction; it carries no payload.
1174 parentTrans.SendEvent(nil, 0)
1177 //-------------------------------------------------------------------
1178 // send to E2T Subscription Request
1179 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores the
// uncompleted subscription in the db and sends the request to E2T, retrying up to
// e2SubscriptionDirectives.E2MaxTryCount times.
//
// A packing failure yields a *PackSubscriptionRequestErrortEvent and a db write failure a
// *SDLWriteErrortEvent. When subs.DoNotWaitSubResp is set, a *SubmgrRestartTestEvent is
// produced instead of waiting for a response.
// NOTE(review): lines are missing from this listing (declaration of err, error checks,
// some struct fields of the returned events, the retry/timeout conditions, loop exits and
// the final return) — verify them against the original file.
1180 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1182 var event interface{} = nil
1183 var timedOut bool = false
1184 const ricRequestorId = 123
// The request id of the subscription is used, with its Id part replaced by the fixed
// requestor id 123.
1186 subReqMsg := subs.SubReqMsg
1187 subReqMsg.RequestId = subs.GetReqId().RequestId
1188 subReqMsg.RequestId.Id = ricRequestorId
1189 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1191 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1192 return &PackSubscriptionRequestErrortEvent{
1194 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1195 ErrorCause: err.Error(),
1200 // Write uncompleted subscription in db. If there is no response for the subscription it needs to be re-processed (deleted) after restart
1201 err = c.WriteSubscriptionToDb(subs)
1203 return &SDLWriteErrortEvent{
1205 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1206 ErrorCause: err.Error(),
// One iteration per send attempt; desc tags the log line with the attempt number.
1211 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1212 desc := fmt.Sprintf("(retry %d)", retries)
1214 c.UpdateCounter(cSubReqToE2)
1216 c.UpdateCounter(cSubReReqToE2)
1218 c.rmrSendToE2T(desc, subs, trans)
1219 if subs.DoNotWaitSubResp == false {
// Wait for the E2T response for at most the configured timeout.
1220 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1222 c.UpdateCounter(cSubReqTimerExpiry)
1226 // Simulating case where subscription request has been sent but response has not been received before restart
1227 event = &SubmgrRestartTestEvent{}
1228 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1232 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1236 //-------------------------------------------------------------------
1237 // send to E2T Subscription Delete Request
1238 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for subs and sends
// it to E2T, retrying up to e2tMaxSubDelReqTryCount times and waiting up to
// e2tSubDelReqTime for a response after each send.
// NOTE(review): lines are missing from this listing (declarations of err and timedOut,
// the error check after packing, the retry/timeout conditions, loop exits and the final
// return) — verify them against the original file.
1240 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1242 var event interface{}
1244 const ricRequestorId = 123
// The request id of the subscription is used, with its Id part replaced by the fixed
// requestor id 123; the function id is copied from the original subscription request.
1246 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1247 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1248 subDelReqMsg.RequestId.Id = ricRequestorId
1249 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1250 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1252 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
// One iteration per send attempt; desc tags the log line with the attempt number.
1256 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1257 desc := fmt.Sprintf("(retry %d)", retries)
1259 c.UpdateCounter(cSubDelReqToE2)
1261 c.UpdateCounter(cSubDelReReqToE2)
1263 c.rmrSendToE2T(desc, subs, trans)
1264 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1266 c.UpdateCounter(cSubDelReqTimerExpiry)
1271 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1275 //-------------------------------------------------------------------
1276 // handle from E2T Subscription Response
1277 //-------------------------------------------------------------------
1278 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1279 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1280 c.UpdateCounter(cSubRespFromE2)
1282 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1284 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1287 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1289 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1292 trans := subs.GetTransaction()
1294 err = fmt.Errorf("Ongoing transaction not found")
1295 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1298 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1299 if sendOk == false {
1300 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1301 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1306 //-------------------------------------------------------------------
1307 // handle from E2T Subscription Failure
1308 //-------------------------------------------------------------------
1309 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1310 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1311 c.UpdateCounter(cSubFailFromE2)
1312 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1314 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1317 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1319 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1322 trans := subs.GetTransaction()
1324 err = fmt.Errorf("Ongoing transaction not found")
1325 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1328 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1329 if sendOk == false {
1330 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1331 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1336 //-------------------------------------------------------------------
1337 // handle from E2T Subscription Delete Response
1338 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response received
// from E2T.
//
// The response is counted and unpacked, the subscription is looked up by the instance id
// carried in the response, and the response is handed as an event to the transaction
// currently ongoing on that subscription. Failures are logged. Unlike its sibling
// handlers this one declares a named error result; Consume starts it with "go", which
// discards that result.
// NOTE(review): the error checks, returns and closing braces are missing from this
// listing — in particular confirm what is returned on each path.
1339 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1340 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1341 c.UpdateCounter(cSubDelRespFromE2)
// err is the named result declared in the signature; ":=" here only introduces subDelRespMsg.
1342 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1344 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1347 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1349 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1352 trans := subs.GetTransaction()
1354 err = fmt.Errorf("Ongoing transaction not found")
1355 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Hand the response to the waiting transaction, giving up after e2tRecvMsgTimeout.
1358 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1359 if sendOk == false {
1360 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1361 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1366 //-------------------------------------------------------------------
1367 // handle from E2T Subscription Delete Failure
1368 //-------------------------------------------------------------------
1369 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1370 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1371 c.UpdateCounter(cSubDelFailFromE2)
1372 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1374 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1377 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1379 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1382 trans := subs.GetTransaction()
1384 err = fmt.Errorf("Ongoing transaction not found")
1385 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1388 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1389 if sendOk == false {
1390 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1391 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1396 //-------------------------------------------------------------------
1398 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the kind of subscription message held
// in v, distinguishing subscription response, subscription failure, subscription delete
// response and subscription delete failure. It is used to label events in log lines.
// NOTE(review): the switch header and every return statement are missing from this
// listing, so the exact labels (and the result for nil or unknown values) cannot be
// confirmed from here.
1399 func typeofSubsMessage(v interface{}) string {
// Request types are not expected here; their cases are kept commented out.
1404 //case *e2ap.E2APSubscriptionRequest:
1406 case *e2ap.E2APSubscriptionResponse:
1408 case *e2ap.E2APSubscriptionFailure:
1410 //case *e2ap.E2APSubscriptionDeleteRequest:
1411 // return "SubDelReq"
1412 case *e2ap.E2APSubscriptionDeleteResponse:
1414 case *e2ap.E2APSubscriptionDeleteFailure:
1421 //-------------------------------------------------------------------
1423 //-------------------------------------------------------------------
1424 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1425 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1426 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1428 xapp.Logger.Error("%v", err)
1434 //-------------------------------------------------------------------
1436 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the db record of subs in line with the outcome of a
// subscription request.
//
// With removeSubscriptionFromDb set, the record is removed. Otherwise the subscription is
// written to the db when subs.RetryFromXapp is false. subs.RetryFromXapp is reset to false.
// NOTE(review): the else branch, return statements and closing braces are missing from
// this listing — confirm on which paths the RetryFromXapp reset runs and what is returned.
1437 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1439 if removeSubscriptionFromDb == true {
1440 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1441 c.RemoveSubscriptionFromDb(subs)
1443 // Update is needed for successful response and merge case here
1444 if subs.RetryFromXapp == false {
1445 err := c.WriteSubscriptionToDb(subs)
1449 subs.RetryFromXapp = false
1453 //-------------------------------------------------------------------
1455 //-------------------------------------------------------------------
1456 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1457 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1458 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1460 xapp.Logger.Error("%v", err)
1464 //-------------------------------------------------------------------
1466 //-------------------------------------------------------------------
1467 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1468 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1469 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1471 xapp.Logger.Error("%v", err)
1475 //-------------------------------------------------------------------
1477 //-------------------------------------------------------------------
1478 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1480 if removeRestSubscriptionFromDb == true {
1481 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1482 c.RemoveRESTSubscriptionFromDb(restSubId)
1484 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1488 //-------------------------------------------------------------------
1490 //-------------------------------------------------------------------
1491 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1492 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1493 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1495 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues internal subscription delete requests for subs, as the
// log line puts it "due to restart".
//
// Unless subs.PolicyUpdate is set, a delete request is packed once and, for every endpoint
// of the subscription, RMR parameters are built with that endpoint as the source and passed
// directly (synchronously) to handleXAPPSubscriptionDeleteRequest, as if the endpoint had
// sent the request. subs.DeleteFromDb is set to true before each call.
// NOTE(review): lines are missing from this listing (the error check after packing, single
// lines at 1519 and 1524, closing braces) — verify them against the original file.
1499 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1501 const ricRequestorId = 123
1502 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1504 // Send delete for every endpoint in the subscription
1505 if subs.PolicyUpdate == false {
// The request id of the subscription is used, with its Id part replaced by the fixed
// requestor id 123.
1506 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1507 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1508 subDelReqMsg.RequestId.Id = ricRequestorId
1509 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1510 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1512 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
// The same packed payload is reused for every endpoint; only Src differs.
1515 for _, endPoint := range subs.EpList.Endpoints {
1516 params := &xapp.RMRParams{}
1517 params.Mtype = mType
1518 params.SubId = int(subs.GetReqId().InstanceId)
1520 params.Meid = subs.Meid
1521 params.Src = endPoint.String()
1522 params.PayloadLen = len(payload.Buf)
1523 params.Payload = payload.Buf
1525 subs.DeleteFromDb = true
1526 c.handleXAPPSubscriptionDeleteRequest(params)
1531 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1533 fmt.Println("CRESTSubscriptionRequest")
1539 if p.SubscriptionID != "" {
1540 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1542 fmt.Println(" SubscriptionID = ''")
1545 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1547 if p.ClientEndpoint.HTTPPort != nil {
1548 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1550 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1553 if p.ClientEndpoint.RMRPort != nil {
1554 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1556 fmt.Println(" ClientEndpoint.RMRPort = nil")
1560 fmt.Printf(" Meid = %s\n", *p.Meid)
1562 fmt.Println(" Meid = nil")
1565 if p.E2SubscriptionDirectives == nil {
1566 fmt.Println(" E2SubscriptionDirectives = nil")
1568 fmt.Println(" E2SubscriptionDirectives")
1569 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1570 fmt.Println(" E2RetryCount == nil")
1572 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1574 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1575 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1577 for _, subscriptionDetail := range p.SubscriptionDetails {
1578 if p.RANFunctionID != nil {
1579 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1581 fmt.Println(" RANFunctionID = nil")
1583 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1584 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1586 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1587 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1588 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1589 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1591 if actionToBeSetup.SubsequentAction != nil {
1592 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1593 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1595 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")