2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring builds one space-separated identification string for log messages
// from the given entries followed by the optional error.
//
// Every non-nil entry contributes its String() value and a nil entry
// contributes the placeholder "(NIL)". A non-nil err is appended last as
// "err(<message>)". With no entries and a nil err the result is "".
//
// Only an untyped nil entry is recognised as nil; an interface value that
// wraps a typed nil pointer is passed to String() as is.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// Keep a placeholder so the position of the missing entry stays visible.
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	// Joining guarantees exactly one separator between all elements,
	// including the ones that follow a "(NIL)" placeholder.
	return strings.Join(parts, " ")
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Package-level configuration values. They are (re)assigned by
// Control.ReadConfigParameters from the "controls.*" configuration keys.
var (
	// Timers.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of send attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// Switches kept as the strings "true"/"false" exactly as configured.
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
84 restDuplicateCtrl *DuplicateCtrl
86 e2IfStateDb XappRnibInterface
88 restSubsDb Sdlnterface
91 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker type; it carries no data.
// NOTE(review): presumably used as an event signalling a submgr restart in
// tests - confirm against its users.
102 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type; it carries no data.
// NOTE(review): presumably signals that submgr is up again after a restart -
// confirm against its users.
103 type SubmgrRestartUpEvent struct{}
104 type PackSubscriptionRequestErrortEvent struct {
108 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
109 p.ErrorInfo = *errorInfo
112 type SDLWriteErrortEvent struct {
116 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
117 s.ErrorInfo = *errorInfo
121 xapp.Logger.Debug("SUBMGR")
123 viper.SetEnvPrefix("submgr")
124 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance. It creates the routing manager
// client from the "rtmgr.*" configuration values, the registry, the tracker,
// the REST duplicate controller and the E2 interface state object, reads the
// configuration parameters, registers the test, REST subscription listing and
// symptom data REST routes, restores the subscriptions from the database when
// readSubsFromDb is "true" and starts the REST subscription listener in its
// own goroutine.
127 func NewControl() *Control {
// Routing manager REST client: "<rtmgr.HostAddr>:<rtmgr.port>" with base path rtmgr.baseUrl over http.
129 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
130 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
132 registry := new(Registry)
133 registry.Initialize()
134 registry.rtmgrClient = &rtmgrClient
136 tracker := new(Tracker)
139 restDuplicateCtrl := new(DuplicateCtrl)
140 restDuplicateCtrl.Init()
142 e2IfState := new(E2IfState)
144 c := &Control{e2ap: new(E2ap),
147 restDuplicateCtrl: restDuplicateCtrl,
148 e2IfState: e2IfState,
149 e2IfStateDb: CreateXappRnibIfInstance(),
150 e2SubsDb: CreateSdl(),
151 restSubsDb: CreateRESTSdl(),
152 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the configuration before anything below depends on it (readSubsFromDb is set here).
157 c.ReadConfigParameters("")
159 // Register REST handler for testing support
160 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
161 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
164 if readSubsFromDb == "true" {
165 // Read subscriptions from db
// The error results of both calls are not checked here.
166 c.ReadE2Subscriptions()
167 c.ReadRESTSubscriptions()
// The REST subscription listener runs in its own goroutine.
170 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
174 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
175 subscriptions, _ := c.registry.QueryHandler()
176 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
179 //-------------------------------------------------------------------
181 //-------------------------------------------------------------------
// GetAllRestSubscriptions is the REST handler registered for GET
// /ric/v1/restsubscriptions. It fetches all REST subscriptions from the
// registry.
182 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
183 xapp.Logger.Debug("GetAllRestSubscriptions() called")
184 response := c.registry.GetAllRestSubscriptions()
188 //-------------------------------------------------------------------
190 //-------------------------------------------------------------------
191 func (c *Control) ReadE2Subscriptions() error {
194 var register map[uint32]*Subscription
195 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
196 xapp.Logger.Debug("Reading E2 subscriptions from db")
197 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
199 xapp.Logger.Error("%v", err)
200 <-time.After(1 * time.Second)
202 c.registry.subIds = subIds
203 c.registry.register = register
204 c.HandleUncompletedSubscriptions(register)
208 xapp.Logger.Debug("Continuing without retring")
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) ReadRESTSubscriptions() error {
217 var restSubscriptions map[string]*RESTSubscription
218 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
219 xapp.Logger.Debug("Reading REST subscriptions from db")
220 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
222 xapp.Logger.Error("%v", err)
223 <-time.After(1 * time.Second)
225 c.registry.restSubscriptions = restSubscriptions
229 xapp.Logger.Debug("Continuing without retring")
233 //-------------------------------------------------------------------
235 //-------------------------------------------------------------------
// ReadConfigParameters refreshes the logger level and the package-level
// configuration values from the "controls.*" configuration keys. A key that
// yields the zero value (0 or "") is replaced with a hard coded default and
// a debug message is logged.
//
// It is called once from NewControl and is registered as the configuration
// change listener in Run. The parameter f is not used by the visible code.
236 func (c *Control) ReadConfigParameters(f string) {
238 xapp.Logger.Debug("ReadConfigParameters")
240 c.LoggerLevel = int(xapp.Logger.GetLevel())
241 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
242 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
// The *_ms keys are configured in milliseconds; multiplying by 1000000 scales
// the value read as nanoseconds up to milliseconds.
244 // viper.GetDuration returns nanoseconds
245 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
246 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
247 e2tSubReqTimeout = 2000 * 1000000
248 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
250 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
252 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
253 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
254 e2tSubDelReqTime = 2000 * 1000000
255 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
257 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
259 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
260 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
261 e2tRecvMsgTimeout = 2000 * 1000000
262 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
264 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
266 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
267 if e2tMaxSubReqTryCount == 0 {
// Default: a single try, no retries.
268 e2tMaxSubReqTryCount = 1
269 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
271 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
273 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
274 if e2tMaxSubDelReqTryCount == 0 {
// Default: a single try, no retries.
275 e2tMaxSubDelReqTryCount = 1
276 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
278 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
280 checkE2State = viper.GetString("controls.checkE2State")
281 if checkE2State == "" {
282 checkE2State = "true"
283 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
285 xapp.Logger.Debug("checkE2State= %v", checkE2State)
287 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
288 if readSubsFromDb == "" {
289 readSubsFromDb = "true"
290 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
292 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
294 dbTryCount = viper.GetInt("controls.dbTryCount")
297 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
299 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
301 dbRetryForever = viper.GetString("controls.dbRetryForever")
302 if dbRetryForever == "" {
303 dbRetryForever = "true"
304 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
306 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
308 // Internal cfg parameter, used to define a wait time for RMR route clean-up. The non-default
309 // value 100 ms is currently used only in unit tests.
310 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
311 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
312 waitRouteCleanup_ms = 5000 * 1000000
313 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
315 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
318 //-------------------------------------------------------------------
320 //-------------------------------------------------------------------
321 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
323 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
324 for subId, subs := range register {
325 if subs.SubRespRcvd == false {
326 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
327 if subs.PolicyUpdate == false {
328 subs.NoRespToXapp = true
329 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
330 c.SendSubscriptionDeleteReq(subs)
336 func (c *Control) ReadyCB(data interface{}) {
337 if c.RMRClient == nil {
338 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as the configuration change listener.
342 func (c *Control) Run() {
343 xapp.SetReadyCB(c.ReadyCB, nil)
344 xapp.AddConfigChangeListener(c.ReadConfigParameters)
348 //-------------------------------------------------------------------
350 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription a request
// belongs to and returns it together with its REST subscription id.
//
// When the request carries no SubscriptionID, the id last seen for the same
// md5sum is looked up from the duplicate controller; a subscription still
// found in the registry under that id is reused, otherwise a new id is
// generated with ksuid and a new REST subscription is created. When the
// request carries a SubscriptionID, the subscription is fetched from the
// registry with that id.
//
// md5sum is the checksum of the request and xAppRmrEndpoint the RMR endpoint
// handed to a newly created subscription.
351 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
354 var restSubscription *RESTSubscription
// Id of the REST subscription the same request body (md5sum) was last seen for, if any.
357 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
358 if p.SubscriptionID == "" {
359 // Subscription does not contain REST subscription Id
361 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
362 if restSubscription != nil {
363 // Subscription found - reuse the previously known REST subscription id
364 restSubId = prevRestSubsId
366 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
368 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// The cached id no longer refers to an existing subscription: drop the stale cache entry.
371 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
372 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing reusable was found: create a new REST subscription with a freshly generated id.
376 if restSubscription == nil {
377 restSubId = ksuid.New().String()
378 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
381 // Subscription contains REST subscription Id
382 restSubId = p.SubscriptionID
384 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
385 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
387 // Subscription with id in REST request does not exist
388 xapp.Logger.Error("%s", err.Error())
389 c.UpdateCounter(cRestSubFailToXapp)
394 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
396 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
400 return restSubscription, restSubId, nil
403 //-------------------------------------------------------------------
405 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
// params must hold a *models.SubscriptionParams.
//
// The request is rejected with a nil response when the E2 connection of the
// target node is down (service unavailable), when ClientEndpoint is missing
// or the endpoint addresses, the E2 request messages or the E2 subscription
// directives cannot be built (bad request), or when the REST subscription
// cannot be resolved (not found). A request recognised as a duplicate of an
// ongoing transaction is acknowledged directly. Otherwise the REST
// subscription is written to the database, the E2 subscription requests are
// processed in a separate goroutine and the response carrying the REST
// subscription id is returned with the created code.
406 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
409 c.UpdateCounter(cRestSubReqFromXapp)
411 subResp := models.SubscriptionResponse{}
// Unchecked type assertion: panics if params is of another type.
412 p := params.(*models.SubscriptionParams)
414 if c.LoggerLevel > 2 {
415 c.PrintRESTSubscriptionRequest(p)
418 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
419 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
420 c.UpdateCounter(cRestReqRejDueE2Down)
421 return nil, common.SubscribeServiceUnavailableCode
424 if p.ClientEndpoint == nil {
425 err := fmt.Errorf("ClientEndpoint == nil")
426 xapp.Logger.Error("%v", err)
427 c.UpdateCounter(cRestSubFailToXapp)
428 return nil, common.SubscribeBadRequestCode
431 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
433 xapp.Logger.Error("%s", err.Error())
434 c.UpdateCounter(cRestSubFailToXapp)
435 return nil, common.SubscribeBadRequestCode
// The md5sum of the request is the key used for duplicate/retransmission detection below.
438 md5sum, err := CalculateRequestMd5sum(params)
440 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
443 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
445 xapp.Logger.Error("Subscription with id in REST request does not exist")
446 return nil, common.SubscribeNotFoundCode
449 subResp.SubscriptionID = &restSubId
450 subReqList := e2ap.SubscriptionRequestList{}
451 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// The request could not be converted: undo the duplicate-control entry and the REST subscription.
453 xapp.Logger.Error("%s", err.Error())
454 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
455 c.registry.DeleteRESTSubscription(&restSubId)
456 c.UpdateCounter(cRestSubFailToXapp)
457 return nil, common.SubscribeBadRequestCode
460 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
// Retransmission of a request that is still being processed: acknowledge it directly.
462 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
463 xapp.Logger.Debug("%s", err)
464 c.registry.DeleteRESTSubscription(&restSubId)
465 c.UpdateCounter(cRestSubRespToXapp)
466 return &subResp, common.SubscribeCreatedCode
469 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
470 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
472 xapp.Logger.Error("%s", err)
473 c.registry.DeleteRESTSubscription(&restSubId)
474 return nil, common.SubscribeBadRequestCode
// The E2 subscription requests are processed asynchronously; the call returns right after this.
476 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
478 c.UpdateCounter(cRestSubRespToXapp)
479 return &subResp, common.SubscribeCreatedCode
482 //-------------------------------------------------------------------
484 //-------------------------------------------------------------------
485 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
487 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
488 if p == nil || p.E2SubscriptionDirectives == nil {
489 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
490 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
491 e2SubscriptionDirectives.CreateRMRRoute = true
492 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
494 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
495 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
497 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
499 if p.E2SubscriptionDirectives.E2RetryCount == nil {
500 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
501 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
503 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
504 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
506 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
509 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
511 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
512 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
513 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
514 return e2SubscriptionDirectives, nil
517 //-------------------------------------------------------------------
519 //-------------------------------------------------------------------
// processSubscriptionRequests processes the E2 subscription requests of one
// REST subscription request one by one. It is started as a goroutine by
// RESTSubscriptionHandler.
//
// For every request a new xApp transaction is created and the request is
// handled with handleSubscriptionRequest; the xApp is then sent an
// unsuccessful or a successful REST notification for that request. When the
// function returns, md5sum is stored in the duplicate controller for
// restSubId (deferred call).
521 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
522 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
524 c.SubscriptionProcessingStartDelay()
525 xapp.Logger.Debug("E2 SubscriptionRequest count =%v ", len(subReqList.E2APSubscriptionRequests))
527 var xAppEventInstanceID int64
528 var e2EventInstanceID int64
529 errorInfo := &ErrorInfo{}
// Runs when the function returns.
531 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
533 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
534 subReqMsg := subReqList.E2APSubscriptionRequests[index]
535 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
537 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
539 // Send notification to xApp that processing of a Subscription Request has failed.
540 err := fmt.Errorf("Tracking failure")
541 errorInfo.ErrorCause = err.Error()
542 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
546 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// Note: ':=' declares a new errorInfo here that shadows the one declared before the loop.
548 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
550 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
554 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
556 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
557 restSubscription.AddMd5Sum(md5sum)
558 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
559 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
560 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
565 //-------------------------------------------------------------------
567 //------------------------------------------------------------------
568 func (c *Control) SubscriptionProcessingStartDelay() {
569 if c.UTTesting == true {
570 // This is temporary fix for the UT problem that notification arrives before subscription response
571 // Correct fix would be to allow notification come before response and process it correctly
572 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
573 <-time.After(time.Millisecond * 50)
574 xapp.Logger.Debug("Continuing after delay")
578 //-------------------------------------------------------------------
580 //------------------------------------------------------------------
// handleSubscriptionRequest handles one E2 subscription request of an xApp
// transaction and returns the E2 subscription response, or an ErrorInfo and
// an error describing why the subscription was not made.
//
// The transaction is tracked and assigned to a subscription in the registry.
// The subscription create handling is then started in its own goroutine and
// this function blocks in trans.WaitEvent until that handling delivers its
// outcome. A subscription response is returned as success only while the E2
// connection of the node is up; a failure, pack failure, SDL write failure,
// unexpected message or timeout is turned into the matching error and
// ErrorInfo.
581 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
582 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
584 errorInfo := ErrorInfo{}
586 err := c.tracker.Track(trans)
588 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
// The detailed cause is kept in errorInfo; the returned error is the generic "Tracking failure".
589 errorInfo.ErrorCause = err.Error()
590 err = fmt.Errorf("Tracking failure")
591 return nil, &errorInfo, err
594 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
596 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
597 return nil, &errorInfo, err
// OngoingReqCount is raised for the duration of the subscription create handling.
603 subs.OngoingReqCount++
604 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
605 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
606 subs.OngoingReqCount--
610 switch themsg := event.(type) {
611 case *e2ap.E2APSubscriptionResponse:
613 if c.e2IfState.IsE2ConnectionUp(meid) == true {
614 return themsg, &errorInfo, nil
// Response received but the E2 connection is down: remove the subscription again.
616 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
617 c.RemoveSubscriptionFromDb(subs)
618 err = fmt.Errorf("E2 interface down")
619 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
621 case *e2ap.E2APSubscriptionFailure:
622 err = fmt.Errorf("E2 SubscriptionFailure received")
623 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
624 case *PackSubscriptionRequestErrortEvent:
625 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
626 errorInfo = themsg.ErrorInfo
627 case *SDLWriteErrortEvent:
628 err = fmt.Errorf("SDL write failure")
629 errorInfo = themsg.ErrorInfo
631 err = fmt.Errorf("Unexpected E2 subscription response received")
632 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
637 err = fmt.Errorf("E2 subscription response timeout")
638 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
// For a policy update the function returns here, before the RemoveFromSubscription call below.
639 if subs.PolicyUpdate == true {
640 return nil, &errorInfo, err
644 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
645 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
646 return nil, &errorInfo, err
649 //-------------------------------------------------------------------
651 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp over REST that the
// processing of one subscription request has failed.
//
// The notification carries the REST subscription id, the xApp event instance
// id, E2 event instance id 0 and the cause, source and timeout type taken
// from errorInfo; an empty error source is set to SUBMGR. The REST
// subscription is marked processed with err and updated in the database
// before the notification is sent. Afterwards, when the E2 connection of the
// node is down and no subscription request is ongoing, the REST subscription
// is deleted from the registry and the database.
652 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
653 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
655 // Send notification to xApp that processing of a Subscription Request has failed.
656 e2EventInstanceID := (int64)(0)
657 if errorInfo.ErrorSource == "" {
658 // Submgr is default source of error
659 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
661 resp := &models.SubscriptionResponse{
662 SubscriptionID: restSubId,
663 SubscriptionInstances: []*models.SubscriptionInstance{
664 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
665 ErrorCause: errorInfo.ErrorCause,
666 ErrorSource: errorInfo.ErrorSource,
667 TimeoutType: errorInfo.TimeoutType,
668 XappEventInstanceID: &xAppEventInstanceID},
671 // Mark REST subscription request processed.
672 restSubscription.SetProcessed(err)
673 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
// Same debug message in two variants: with and without the transaction identification.
675 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
676 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
678 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
679 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
682 c.UpdateCounter(cRestSubFailNotifToXapp)
683 xapp.Subscription.Notify(resp, *clientEndpoint)
685 // E2 is down. Delete completely processed request safely now
686 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
687 c.registry.DeleteRESTSubscription(restSubId)
688 c.RemoveRESTSubscriptionFromDb(*restSubId)
692 //-------------------------------------------------------------------
694 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp over REST that one
// subscription request has been processed successfully.
//
// The E2 event instance id is recorded in the REST subscription, also mapped
// from the xApp event instance id, so that it is available for deletion. The
// REST subscription is marked processed and updated in the database before
// the notification is sent. Afterwards, when the E2 connection of the node
// is down and no subscription request is ongoing, the REST subscription is
// deleted from the registry and the database.
695 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
696 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
698 // Store successfully processed InstanceId for deletion
699 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
700 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
702 // Send notification to xApp that a Subscription Request has been processed.
703 resp := &models.SubscriptionResponse{
704 SubscriptionID: restSubId,
705 SubscriptionInstances: []*models.SubscriptionInstance{
706 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
708 XappEventInstanceID: &xAppEventInstanceID},
711 // Mark REST subscription request processed.
712 restSubscription.SetProcessed(nil)
713 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
714 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
715 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
717 c.UpdateCounter(cRestSubNotifToXapp)
718 xapp.Subscription.Notify(resp, *clientEndpoint)
720 // E2 is down. Delete completely processed request safely now
721 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
722 c.registry.DeleteRESTSubscription(restSubId)
723 c.RemoveRESTSubscriptionFromDb(*restSubId)
727 //-------------------------------------------------------------------
729 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// for restSubId and returns the REST response code.
//
// A subscription that is not found, or whose delete is already ongoing, is
// answered with the no-content code. A subscription whose subscription
// request is still being handled is answered with the bad-request code.
// Otherwise every E2 instance of the REST subscription is deleted with
// SubscriptionDeleteHandler, its id mappings are removed, and finally the
// duplicate-control entry, the registry entry and the database entry of the
// REST subscription are removed.
730 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
733 c.UpdateCounter(cRestSubDelReqFromXapp)
735 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
737 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
739 xapp.Logger.Error("%s", err.Error())
740 if restSubscription == nil {
741 // Subscription was not found
742 c.UpdateCounter(cRestSubDelRespToXapp)
743 return common.UnsubscribeNoContentCode
745 if restSubscription.SubReqOngoing == true {
746 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
747 xapp.Logger.Error("%s", err.Error())
748 c.UpdateCounter(cRestSubDelFailToXapp)
749 return common.UnsubscribeBadRequestCode
750 } else if restSubscription.SubDelReqOngoing == true {
751 // Previous request for same restSubId still ongoing
752 c.UpdateCounter(cRestSubDelRespToXapp)
753 return common.UnsubscribeNoContentCode
758 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
760 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
761 for _, instanceId := range restSubscription.InstanceIds {
762 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
// A failed instance delete is logged only; the id mappings of the instance are removed below.
765 xapp.Logger.Error("%s", err.Error())
767 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
768 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
769 restSubscription.DeleteE2InstanceId(instanceId)
771 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
772 c.registry.DeleteRESTSubscription(&restSubId)
773 c.RemoveRESTSubscriptionFromDb(restSubId)
776 c.UpdateCounter(cRestSubDelRespToXapp)
777 return common.UnsubscribeNoContentCode
780 //-------------------------------------------------------------------
782 //-------------------------------------------------------------------
783 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
785 var xAppEventInstanceID int64
786 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
788 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
789 restSubId, instanceId, idstring(err, nil))
790 return xAppEventInstanceID, nil
793 xAppEventInstanceID = int64(subs.ReqId.Id)
794 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
796 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
797 xapp.Logger.Error("%s", err.Error())
799 defer trans.Release()
801 err = c.tracker.Track(trans)
803 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
804 xapp.Logger.Error("%s", err.Error())
805 return xAppEventInstanceID, &time.ParseError{}
810 subs.OngoingDelCount++
811 go c.handleSubscriptionDelete(subs, trans)
812 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
813 subs.OngoingDelCount--
815 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
817 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
819 return xAppEventInstanceID, nil
822 //-------------------------------------------------------------------
824 //-------------------------------------------------------------------
// RESTQueryHandler is the REST query handler passed to
// xapp.Subscription.Listen. It returns the result of the registry query.
825 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
826 xapp.Logger.Debug("RESTQueryHandler() called")
830 return c.registry.QueryHandler()
// TestRestHandler is the REST handler registered for POST
// /ric/v1/test/{testId}. It is a testing aid: the {testId} path parameter
// selects the command to execute. Per the visible code the commands delete a
// single subscription from the database ("deletesubid=<id>"), remove all
// subscriptions from the database, or terminate the process so that submgr
// gets restarted. Any other value is logged as unsupported.
833 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
834 xapp.Logger.Debug("RESTTestRestHandler() called")
836 pathParams := mux.Vars(r)
837 s := pathParams["testId"]
839 // This can be used to delete single subscription from db
840 if contains := strings.Contains(s, "deletesubid="); contains == true {
841 var splits = strings.Split(s, "=")
// A value that does not parse as an integer is skipped.
842 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
843 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
844 c.RemoveSubscriptionFromSdl(uint32(subId))
849 // This can be used to remove all subscriptions from db
851 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
852 c.RemoveAllSubscriptionsFromSdl()
853 c.RemoveAllRESTSubscriptionsFromSdl()
857 // This is meant to cause submgr's restart in testing
859 xapp.Logger.Debug("os.Exit(1) called")
863 xapp.Logger.Debug("Unsupported rest command received %s", s)
866 //-------------------------------------------------------------------
868 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload of the subscription transaction trans to
// E2T over RMR. The message type and payload are taken from trans, the
// subscription id (request instance id) and the MEID from subs. desc is only
// used in the debug log line. The result of SendWithRetry is stored in err
// and a failure is logged.
870 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
871 params := &xapp.RMRParams{}
872 params.Mtype = trans.GetMtype()
873 params.SubId = int(subs.GetReqId().InstanceId)
875 params.Meid = subs.GetMeid()
877 params.PayloadLen = len(trans.Payload.Buf)
878 params.Payload = trans.Payload.Buf
880 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
// NOTE(review): the meaning of the arguments false and 5 is defined by SendWithRetry - confirm there.
881 err = c.SendWithRetry(params, false, 5)
883 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
888 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
890 params := &xapp.RMRParams{}
891 params.Mtype = trans.GetMtype()
892 params.SubId = int(subs.GetReqId().InstanceId)
893 params.Xid = trans.GetXid()
894 params.Meid = trans.GetMeid()
896 params.PayloadLen = len(trans.Payload.Buf)
897 params.Payload = trans.Payload.Buf
899 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
900 err = c.SendWithRetry(params, false, 5)
902 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
907 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
908 if c.RMRClient == nil {
909 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
910 xapp.Logger.Error("%s", err.Error())
915 defer c.RMRClient.Free(msg.Mbuf)
917 // xapp-frame might use direct access to c buffer and
918 // when msg.Mbuf is freed, someone might take it into use
919 // and payload data might be invalid inside message handle function
921 // subscriptions won't load system a lot so there is no
922 // real performance hit by cloning buffer into new go byte slice
923 cPay := append(msg.Payload[:0:0], msg.Payload...)
925 msg.PayloadLen = len(cPay)
928 case xapp.RIC_SUB_REQ:
929 go c.handleXAPPSubscriptionRequest(msg)
930 case xapp.RIC_SUB_RESP:
931 go c.handleE2TSubscriptionResponse(msg)
932 case xapp.RIC_SUB_FAILURE:
933 go c.handleE2TSubscriptionFailure(msg)
934 case xapp.RIC_SUB_DEL_REQ:
935 go c.handleXAPPSubscriptionDeleteRequest(msg)
936 case xapp.RIC_SUB_DEL_RESP:
937 go c.handleE2TSubscriptionDeleteResponse(msg)
938 case xapp.RIC_SUB_DEL_FAILURE:
939 go c.handleE2TSubscriptionDeleteFailure(msg)
941 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
946 //-------------------------------------------------------------------
947 // handle from XAPP Subscription Request
948 //------------------------------------------------------------------
949 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
950 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
951 c.UpdateCounter(cSubReqFromXapp)
953 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
954 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
958 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
960 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
964 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
966 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
969 defer trans.Release()
971 if err = c.tracker.Track(trans); err != nil {
972 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
976 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
977 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
979 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
983 c.wakeSubscriptionRequest(subs, trans)
986 //-------------------------------------------------------------------
987 // Wake Subscription Request to E2node
988 //------------------------------------------------------------------
989 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
991 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
992 subs.OngoingReqCount++
993 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
994 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
995 subs.OngoingReqCount--
998 switch themsg := event.(type) {
999 case *e2ap.E2APSubscriptionResponse:
1000 themsg.RequestId.Id = trans.RequestId.Id
1001 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1004 c.UpdateCounter(cSubRespToXapp)
1005 c.rmrSendToXapp("", subs, trans)
1008 case *e2ap.E2APSubscriptionFailure:
1009 themsg.RequestId.Id = trans.RequestId.Id
1010 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1012 c.UpdateCounter(cSubFailToXapp)
1013 c.rmrSendToXapp("", subs, trans)
1019 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1020 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1023 //-------------------------------------------------------------------
1024 // handle from XAPP Subscription Delete Request
1025 //------------------------------------------------------------------
1026 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1027 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1028 c.UpdateCounter(cSubDelReqFromXapp)
1030 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1031 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1035 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1037 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1041 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1043 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1046 defer trans.Release()
1048 err = c.tracker.Track(trans)
1050 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1054 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1056 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1063 subs.OngoingDelCount++
1064 go c.handleSubscriptionDelete(subs, trans)
1065 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1066 subs.OngoingDelCount--
1068 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1070 if subs.NoRespToXapp == true {
1071 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1072 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1076 // Whatever is received success, fail or timeout, send successful delete response
1077 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1078 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1079 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1080 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1081 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1083 c.UpdateCounter(cSubDelRespToXapp)
1084 c.rmrSendToXapp("", subs, trans)
1087 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1088 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1091 //-------------------------------------------------------------------
1092 // SUBS CREATE Handling
1093 //-------------------------------------------------------------------
1094 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1096 var event interface{} = nil
1097 var removeSubscriptionFromDb bool = false
1098 trans := c.tracker.NewSubsTransaction(subs)
1099 subs.WaitTransactionTurn(trans)
1100 defer subs.ReleaseTransactionTurn(trans)
1101 defer trans.Release()
1103 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1105 subRfMsg, valid := subs.GetCachedResponse()
1106 if subRfMsg == nil && valid == true {
1107 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1108 switch event.(type) {
1109 case *e2ap.E2APSubscriptionResponse:
1110 subRfMsg, valid = subs.SetCachedResponse(event, true)
1111 subs.SubRespRcvd = true
1112 case *e2ap.E2APSubscriptionFailure:
1113 removeSubscriptionFromDb = true
1114 subRfMsg, valid = subs.SetCachedResponse(event, false)
1115 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1116 case *SubmgrRestartTestEvent:
1117 // This simulates that no response has been received and after restart subscriptions are restored from db
1118 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1119 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1120 removeSubscriptionFromDb = true
1121 subRfMsg, valid = subs.SetCachedResponse(event, false)
1124 if subs.PolicyUpdate == false {
1125 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1126 removeSubscriptionFromDb = true
1127 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1128 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1131 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1133 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1136 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1138 subRfMsg, valid = subs.SetCachedResponse(event, false)
1139 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1142 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1144 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1147 parentTrans.SendEvent(subRfMsg, 0)
1150 //-------------------------------------------------------------------
1151 // SUBS DELETE Handling
1152 //-------------------------------------------------------------------
1154 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1156 trans := c.tracker.NewSubsTransaction(subs)
1157 subs.WaitTransactionTurn(trans)
1158 defer subs.ReleaseTransactionTurn(trans)
1159 defer trans.Release()
1161 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1165 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1168 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1172 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1173 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1174 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1175 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1176 c.registry.UpdateSubscriptionToDb(subs, c)
1177 parentTrans.SendEvent(nil, 0)
1180 //-------------------------------------------------------------------
1181 // send to E2T Subscription Request
1182 //-------------------------------------------------------------------
1183 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1185 var event interface{} = nil
1186 var timedOut bool = false
1187 const ricRequestorId = 123
1189 subReqMsg := subs.SubReqMsg
1190 subReqMsg.RequestId = subs.GetReqId().RequestId
1191 subReqMsg.RequestId.Id = ricRequestorId
1192 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1194 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1195 return &PackSubscriptionRequestErrortEvent{
1197 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1198 ErrorCause: err.Error(),
1203 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1204 err = c.WriteSubscriptionToDb(subs)
1206 return &SDLWriteErrortEvent{
1208 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1209 ErrorCause: err.Error(),
1214 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1215 desc := fmt.Sprintf("(retry %d)", retries)
1217 c.UpdateCounter(cSubReqToE2)
1219 c.UpdateCounter(cSubReReqToE2)
1221 c.rmrSendToE2T(desc, subs, trans)
1222 if subs.DoNotWaitSubResp == false {
1223 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1225 c.UpdateCounter(cSubReqTimerExpiry)
1229 // Simulating case where subscrition request has been sent but response has not been received before restart
1230 event = &SubmgrRestartTestEvent{}
1231 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1235 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1239 //-------------------------------------------------------------------
1240 // send to E2T Subscription Delete Request
1241 //-------------------------------------------------------------------
1243 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1245 var event interface{}
1247 const ricRequestorId = 123
1249 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1250 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1251 subDelReqMsg.RequestId.Id = ricRequestorId
1252 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1253 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1255 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1259 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1260 desc := fmt.Sprintf("(retry %d)", retries)
1262 c.UpdateCounter(cSubDelReqToE2)
1264 c.UpdateCounter(cSubDelReReqToE2)
1266 c.rmrSendToE2T(desc, subs, trans)
1267 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1269 c.UpdateCounter(cSubDelReqTimerExpiry)
1274 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1278 //-------------------------------------------------------------------
1279 // handle from E2T Subscription Response
1280 //-------------------------------------------------------------------
1281 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1282 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1283 c.UpdateCounter(cSubRespFromE2)
1285 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1287 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1290 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1292 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1295 trans := subs.GetTransaction()
1297 err = fmt.Errorf("Ongoing transaction not found")
1298 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1301 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1302 if sendOk == false {
1303 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1304 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1309 //-------------------------------------------------------------------
1310 // handle from E2T Subscription Failure
1311 //-------------------------------------------------------------------
1312 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1313 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1314 c.UpdateCounter(cSubFailFromE2)
1315 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1317 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1320 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1322 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1325 trans := subs.GetTransaction()
1327 err = fmt.Errorf("Ongoing transaction not found")
1328 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1331 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1332 if sendOk == false {
1333 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1334 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1339 //-------------------------------------------------------------------
1340 // handle from E2T Subscription Delete Response
1341 //-------------------------------------------------------------------
1342 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1343 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1344 c.UpdateCounter(cSubDelRespFromE2)
1345 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1347 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1350 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1352 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1355 trans := subs.GetTransaction()
1357 err = fmt.Errorf("Ongoing transaction not found")
1358 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1361 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1362 if sendOk == false {
1363 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1364 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1369 //-------------------------------------------------------------------
1370 // handle from E2T Subscription Delete Failure
1371 //-------------------------------------------------------------------
1372 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1373 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1374 c.UpdateCounter(cSubDelFailFromE2)
1375 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1377 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1380 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1382 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1385 trans := subs.GetTransaction()
1387 err = fmt.Errorf("Ongoing transaction not found")
1388 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1391 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1392 if sendOk == false {
1393 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1394 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1399 //-------------------------------------------------------------------
1401 //-------------------------------------------------------------------
1402 func typeofSubsMessage(v interface{}) string {
1407 //case *e2ap.E2APSubscriptionRequest:
1409 case *e2ap.E2APSubscriptionResponse:
1411 case *e2ap.E2APSubscriptionFailure:
1413 //case *e2ap.E2APSubscriptionDeleteRequest:
1414 // return "SubDelReq"
1415 case *e2ap.E2APSubscriptionDeleteResponse:
1417 case *e2ap.E2APSubscriptionDeleteFailure:
1424 //-------------------------------------------------------------------
1426 //-------------------------------------------------------------------
1427 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1428 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1429 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1431 xapp.Logger.Error("%v", err)
1437 //-------------------------------------------------------------------
1439 //-------------------------------------------------------------------
1440 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1442 if removeSubscriptionFromDb == true {
1443 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1444 c.RemoveSubscriptionFromDb(subs)
1446 // Update is needed for successful response and merge case here
1447 if subs.RetryFromXapp == false {
1448 err := c.WriteSubscriptionToDb(subs)
1452 subs.RetryFromXapp = false
1456 //-------------------------------------------------------------------
1458 //-------------------------------------------------------------------
1459 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1460 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1461 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1463 xapp.Logger.Error("%v", err)
1467 //-------------------------------------------------------------------
1469 //-------------------------------------------------------------------
1470 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1471 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1472 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1474 xapp.Logger.Error("%v", err)
1478 //-------------------------------------------------------------------
1480 //-------------------------------------------------------------------
1481 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1483 if removeRestSubscriptionFromDb == true {
1484 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1485 c.RemoveRESTSubscriptionFromDb(restSubId)
1487 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1491 //-------------------------------------------------------------------
1493 //-------------------------------------------------------------------
1494 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1495 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1496 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1498 xapp.Logger.Error("%v", err)
1502 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1504 const ricRequestorId = 123
1505 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1507 // Send delete for every endpoint in the subscription
1508 if subs.PolicyUpdate == false {
1509 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1510 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1511 subDelReqMsg.RequestId.Id = ricRequestorId
1512 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1513 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1515 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1518 for _, endPoint := range subs.EpList.Endpoints {
1519 params := &xapp.RMRParams{}
1520 params.Mtype = mType
1521 params.SubId = int(subs.GetReqId().InstanceId)
1523 params.Meid = subs.Meid
1524 params.Src = endPoint.String()
1525 params.PayloadLen = len(payload.Buf)
1526 params.Payload = payload.Buf
1528 subs.DeleteFromDb = true
1529 c.handleXAPPSubscriptionDeleteRequest(params)
1534 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1536 fmt.Println("CRESTSubscriptionRequest")
1542 if p.SubscriptionID != "" {
1543 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1545 fmt.Println(" SubscriptionID = ''")
1548 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1550 if p.ClientEndpoint.HTTPPort != nil {
1551 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1553 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1556 if p.ClientEndpoint.RMRPort != nil {
1557 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1559 fmt.Println(" ClientEndpoint.RMRPort = nil")
1563 fmt.Printf(" Meid = %s\n", *p.Meid)
1565 fmt.Println(" Meid = nil")
1568 if p.E2SubscriptionDirectives == nil {
1569 fmt.Println(" E2SubscriptionDirectives = nil")
1571 fmt.Println(" E2SubscriptionDirectives")
1572 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1573 fmt.Println(" E2RetryCount == nil")
1575 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1577 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1578 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1580 for _, subscriptionDetail := range p.SubscriptionDetails {
1581 if p.RANFunctionID != nil {
1582 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1584 fmt.Println(" RANFunctionID = nil")
1586 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1587 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1589 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1590 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1591 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1592 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1594 if actionToBeSetup.SubsequentAction != nil {
1595 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1596 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1598 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")