2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
43 func idstring(err error, entries ...fmt.Stringer) string {
44 var retval string = ""
45 var filler string = ""
46 for _, entry := range entries {
48 retval += filler + entry.String()
51 retval += filler + "(NIL)"
55 retval += filler + "err(" + err.Error() + ")"
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 c.ReadE2Subscriptions()
174 c.ReadRESTSubscriptions()
177 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
181 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
182 subscriptions, _ := c.registry.QueryHandler()
183 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
186 //-------------------------------------------------------------------
188 //-------------------------------------------------------------------
189 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
190 xapp.Logger.Debug("RESTQueryHandler() called")
194 return c.registry.QueryHandler()
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
200 func (c *Control) ReadE2Subscriptions() error {
203 var register map[uint32]*Subscription
204 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
205 xapp.Logger.Debug("Reading E2 subscriptions from db")
206 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
208 xapp.Logger.Error("%v", err)
209 <-time.After(1 * time.Second)
211 c.registry.subIds = subIds
212 c.registry.register = register
213 go c.HandleUncompletedSubscriptions(register)
217 xapp.Logger.Debug("Continuing without retring")
221 //-------------------------------------------------------------------
223 //-------------------------------------------------------------------
224 func (c *Control) ReadRESTSubscriptions() error {
226 xapp.Logger.Debug("ReadRESTSubscriptions()")
228 var restSubscriptions map[string]*RESTSubscription
229 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
230 xapp.Logger.Debug("Reading REST subscriptions from db")
231 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
233 xapp.Logger.Error("%v", err)
234 <-time.After(1 * time.Second)
236 // Fix REST subscriptions ongoing status after restart
237 for restSubId, restSubscription := range restSubscriptions {
238 restSubscription.SubReqOngoing = false
239 restSubscription.SubDelReqOngoing = false
240 c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
242 c.registry.restSubscriptions = restSubscriptions
246 xapp.Logger.Debug("Continuing without retring")
250 //-------------------------------------------------------------------
252 //-------------------------------------------------------------------
253 func (c *Control) ReadConfigParameters(f string) {
255 xapp.Logger.Debug("ReadConfigParameters")
257 c.LoggerLevel = int(xapp.Logger.GetLevel())
258 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
259 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
261 // viper.GetDuration returns nanoseconds
262 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
263 if e2tSubReqTimeout == 0 {
264 e2tSubReqTimeout = 2000 * 1000000
265 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
267 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
269 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
270 if e2tSubDelReqTime == 0 {
271 e2tSubDelReqTime = 2000 * 1000000
272 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
274 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
276 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
277 if e2tRecvMsgTimeout == 0 {
278 e2tRecvMsgTimeout = 2000 * 1000000
279 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
281 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
283 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
284 if e2tMaxSubReqTryCount == 0 {
285 e2tMaxSubReqTryCount = 1
286 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
288 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
290 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
291 if e2tMaxSubDelReqTryCount == 0 {
292 e2tMaxSubDelReqTryCount = 1
293 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
295 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
297 checkE2State = viper.GetString("controls.checkE2State")
298 if checkE2State == "" {
299 checkE2State = "true"
300 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
302 xapp.Logger.Debug("checkE2State= %v", checkE2State)
304 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
305 if readSubsFromDb == "" {
306 readSubsFromDb = "true"
307 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
309 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
311 dbTryCount = viper.GetInt("controls.dbTryCount")
314 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
316 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
318 dbRetryForever = viper.GetString("controls.dbRetryForever")
319 if dbRetryForever == "" {
320 dbRetryForever = "true"
321 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
323 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
325 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
326 // value 100ms used currently only in unittests.
327 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
328 if waitRouteCleanup_ms == 0 {
329 waitRouteCleanup_ms = 5000 * 1000000
330 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
332 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
335 //-------------------------------------------------------------------
337 //-------------------------------------------------------------------
338 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
340 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
341 for subId, subs := range register {
342 if subs.SubRespRcvd == false {
343 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
344 if subs.PolicyUpdate == false {
345 subs.NoRespToXapp = true
346 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
347 c.SendSubscriptionDeleteReq(subs)
353 func (c *Control) ReadyCB(data interface{}) {
354 if c.RMRClient == nil {
355 c.RMRClient = xapp.Rmr
359 func (c *Control) Run() {
360 xapp.SetReadyCB(c.ReadyCB, nil)
361 xapp.AddConfigChangeListener(c.ReadConfigParameters)
365 //-------------------------------------------------------------------
367 //-------------------------------------------------------------------
368 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
371 var restSubscription *RESTSubscription
374 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
375 if p.SubscriptionID == "" {
376 // Subscription does not contain REST subscription Id
378 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
379 if restSubscription != nil {
380 // Subscription not found
381 restSubId = prevRestSubsId
383 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
385 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
388 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
389 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if restSubscription == nil {
394 restSubId = ksuid.New().String()
395 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
398 // Subscription contains REST subscription Id
399 restSubId = p.SubscriptionID
401 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
402 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
404 // Subscription with id in REST request does not exist
405 xapp.Logger.Error("%s", err.Error())
406 c.UpdateCounter(cRestSubFailToXapp)
411 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
413 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
417 return restSubscription, restSubId, nil
420 //-------------------------------------------------------------------
422 //-------------------------------------------------------------------
423 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
426 c.UpdateCounter(cRestSubReqFromXapp)
428 subResp := models.SubscriptionResponse{}
429 p := params.(*models.SubscriptionParams)
431 if c.LoggerLevel > 2 {
432 c.PrintRESTSubscriptionRequest(p)
435 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
436 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
437 c.UpdateCounter(cRestReqRejDueE2Down)
438 return nil, common.SubscribeServiceUnavailableCode
441 if p.ClientEndpoint == nil {
442 err := fmt.Errorf("ClientEndpoint == nil")
443 xapp.Logger.Error("%v", err)
444 c.UpdateCounter(cRestSubFailToXapp)
445 return nil, common.SubscribeBadRequestCode
448 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
450 xapp.Logger.Error("%s", err.Error())
451 c.UpdateCounter(cRestSubFailToXapp)
452 return nil, common.SubscribeBadRequestCode
455 md5sum, err := CalculateRequestMd5sum(params)
457 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
460 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
462 xapp.Logger.Error("Subscription with id in REST request does not exist")
463 return nil, common.SubscribeNotFoundCode
466 subResp.SubscriptionID = &restSubId
467 subReqList := e2ap.SubscriptionRequestList{}
468 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
470 xapp.Logger.Error("%s", err.Error())
471 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
472 c.registry.DeleteRESTSubscription(&restSubId)
473 c.UpdateCounter(cRestSubFailToXapp)
474 return nil, common.SubscribeBadRequestCode
477 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
479 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
480 xapp.Logger.Debug("%s", err)
481 c.registry.DeleteRESTSubscription(&restSubId)
482 c.UpdateCounter(cRestSubRespToXapp)
483 return &subResp, common.SubscribeCreatedCode
486 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
487 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
489 xapp.Logger.Error("%s", err)
490 c.registry.DeleteRESTSubscription(&restSubId)
491 return nil, common.SubscribeBadRequestCode
493 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
495 c.UpdateCounter(cRestSubRespToXapp)
496 return &subResp, common.SubscribeCreatedCode
499 //-------------------------------------------------------------------
501 //-------------------------------------------------------------------
502 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
504 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
505 if p == nil || p.E2SubscriptionDirectives == nil {
506 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
507 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
508 e2SubscriptionDirectives.CreateRMRRoute = true
509 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
511 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
512 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
514 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
516 if p.E2SubscriptionDirectives.E2RetryCount == nil {
517 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
518 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
520 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
521 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
523 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
526 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
528 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
529 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
530 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
531 return e2SubscriptionDirectives, nil
534 //-------------------------------------------------------------------
536 //-------------------------------------------------------------------
538 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
539 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
541 c.SubscriptionProcessingStartDelay()
542 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
544 var xAppEventInstanceID int64
545 var e2EventInstanceID int64
546 errorInfo := &ErrorInfo{}
548 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
550 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
551 subReqMsg := subReqList.E2APSubscriptionRequests[index]
552 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
554 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
556 // Send notification to xApp that prosessing of a Subscription Request has failed.
557 err := fmt.Errorf("Tracking failure")
558 errorInfo.ErrorCause = err.Error()
559 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
563 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
565 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
567 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
571 if err.Error() == "TEST: restart event received" {
572 // This is just for UT cases. Stop here subscription processing
575 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
577 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
578 restSubscription.AddMd5Sum(md5sum)
579 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
580 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
581 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
586 //-------------------------------------------------------------------
588 //------------------------------------------------------------------
589 func (c *Control) SubscriptionProcessingStartDelay() {
590 if c.UTTesting == true {
591 // This is temporary fix for the UT problem that notification arrives before subscription response
592 // Correct fix would be to allow notification come before response and process it correctly
593 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
594 <-time.After(time.Millisecond * 50)
595 xapp.Logger.Debug("Continuing after delay")
599 //-------------------------------------------------------------------
601 //------------------------------------------------------------------
602 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
603 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
605 errorInfo := ErrorInfo{}
607 err := c.tracker.Track(trans)
609 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
610 errorInfo.ErrorCause = err.Error()
611 err = fmt.Errorf("Tracking failure")
612 return nil, &errorInfo, err
615 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
617 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
618 return nil, &errorInfo, err
624 subs.OngoingReqCount++
625 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
626 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
627 subs.OngoingReqCount--
631 switch themsg := event.(type) {
632 case *e2ap.E2APSubscriptionResponse:
634 if c.e2IfState.IsE2ConnectionUp(meid) == true {
635 return themsg, &errorInfo, nil
637 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
638 c.RemoveSubscriptionFromDb(subs)
639 err = fmt.Errorf("E2 interface down")
640 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
642 case *e2ap.E2APSubscriptionFailure:
643 err = fmt.Errorf("E2 SubscriptionFailure received")
644 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
645 case *PackSubscriptionRequestErrortEvent:
646 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
647 errorInfo = themsg.ErrorInfo
648 case *SDLWriteErrortEvent:
649 err = fmt.Errorf("SDL write failure")
650 errorInfo = themsg.ErrorInfo
651 case *SubmgrRestartTestEvent:
652 err = fmt.Errorf("TEST: restart event received")
653 xapp.Logger.Debug("%s", err)
654 return nil, &errorInfo, err
656 err = fmt.Errorf("Unexpected E2 subscription response received")
657 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
662 err = fmt.Errorf("E2 subscription response timeout")
663 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
664 if subs.PolicyUpdate == true {
665 return nil, &errorInfo, err
669 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
670 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
671 return nil, &errorInfo, err
674 //-------------------------------------------------------------------
676 //-------------------------------------------------------------------
677 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
678 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
680 // Send notification to xApp that prosessing of a Subscription Request has failed.
681 e2EventInstanceID := (int64)(0)
682 if errorInfo.ErrorSource == "" {
683 // Submgr is default source of error
684 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
686 resp := &models.SubscriptionResponse{
687 SubscriptionID: restSubId,
688 SubscriptionInstances: []*models.SubscriptionInstance{
689 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
690 ErrorCause: errorInfo.ErrorCause,
691 ErrorSource: errorInfo.ErrorSource,
692 TimeoutType: errorInfo.TimeoutType,
693 XappEventInstanceID: &xAppEventInstanceID},
696 // Mark REST subscription request processed.
697 restSubscription.SetProcessed(err)
698 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
700 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
701 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
703 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
704 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
707 c.UpdateCounter(cRestSubFailNotifToXapp)
708 xapp.Subscription.Notify(resp, *clientEndpoint)
710 // E2 is down. Delete completely processed request safely now
711 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
712 c.registry.DeleteRESTSubscription(restSubId)
713 c.RemoveRESTSubscriptionFromDb(*restSubId)
717 //-------------------------------------------------------------------
719 //-------------------------------------------------------------------
720 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
721 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
723 // Store successfully processed InstanceId for deletion
724 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
725 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
727 // Send notification to xApp that a Subscription Request has been processed.
728 resp := &models.SubscriptionResponse{
729 SubscriptionID: restSubId,
730 SubscriptionInstances: []*models.SubscriptionInstance{
731 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
733 XappEventInstanceID: &xAppEventInstanceID},
736 // Mark REST subscription request processesd.
737 restSubscription.SetProcessed(nil)
738 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
739 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
740 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
742 c.UpdateCounter(cRestSubNotifToXapp)
743 xapp.Subscription.Notify(resp, *clientEndpoint)
745 // E2 is down. Delete completely processed request safely now
746 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
747 c.registry.DeleteRESTSubscription(restSubId)
748 c.RemoveRESTSubscriptionFromDb(*restSubId)
752 //-------------------------------------------------------------------
754 //-------------------------------------------------------------------
755 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
758 c.UpdateCounter(cRestSubDelReqFromXapp)
760 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
762 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
764 xapp.Logger.Error("%s", err.Error())
765 if restSubscription == nil {
766 // Subscription was not found
767 c.UpdateCounter(cRestSubDelRespToXapp)
768 return common.UnsubscribeNoContentCode
770 if restSubscription.SubReqOngoing == true {
771 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
772 xapp.Logger.Error("%s", err.Error())
773 c.UpdateCounter(cRestSubDelFailToXapp)
774 return common.UnsubscribeBadRequestCode
775 } else if restSubscription.SubDelReqOngoing == true {
776 // Previous request for same restSubId still ongoing
777 c.UpdateCounter(cRestSubDelRespToXapp)
778 return common.UnsubscribeNoContentCode
783 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
785 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
786 for _, instanceId := range restSubscription.InstanceIds {
787 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
790 xapp.Logger.Error("%s", err.Error())
792 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
793 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
794 restSubscription.DeleteE2InstanceId(instanceId)
796 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
797 c.registry.DeleteRESTSubscription(&restSubId)
798 c.RemoveRESTSubscriptionFromDb(restSubId)
801 c.UpdateCounter(cRestSubDelRespToXapp)
802 return common.UnsubscribeNoContentCode
805 //-------------------------------------------------------------------
807 //-------------------------------------------------------------------
808 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
810 var xAppEventInstanceID int64
811 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
813 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
814 restSubId, instanceId, idstring(err, nil))
815 return xAppEventInstanceID, nil
818 xAppEventInstanceID = int64(subs.ReqId.Id)
819 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
821 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
822 xapp.Logger.Error("%s", err.Error())
824 defer trans.Release()
826 err = c.tracker.Track(trans)
828 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
829 xapp.Logger.Error("%s", err.Error())
830 return xAppEventInstanceID, &time.ParseError{}
835 subs.OngoingDelCount++
836 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
837 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
838 subs.OngoingDelCount--
840 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
842 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
844 return xAppEventInstanceID, nil
847 //-------------------------------------------------------------------
849 //-------------------------------------------------------------------
851 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
852 params := &xapp.RMRParams{}
853 params.Mtype = trans.GetMtype()
854 params.SubId = int(subs.GetReqId().InstanceId)
856 params.Meid = subs.GetMeid()
858 params.PayloadLen = len(trans.Payload.Buf)
859 params.Payload = trans.Payload.Buf
861 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
862 err = c.SendWithRetry(params, false, 5)
864 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
869 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
871 params := &xapp.RMRParams{}
872 params.Mtype = trans.GetMtype()
873 params.SubId = int(subs.GetReqId().InstanceId)
874 params.Xid = trans.GetXid()
875 params.Meid = trans.GetMeid()
877 params.PayloadLen = len(trans.Payload.Buf)
878 params.Payload = trans.Payload.Buf
880 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
881 err = c.SendWithRetry(params, false, 5)
883 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
888 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
889 if c.RMRClient == nil {
890 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
891 xapp.Logger.Error("%s", err.Error())
896 defer c.RMRClient.Free(msg.Mbuf)
898 // xapp-frame might use direct access to c buffer and
899 // when msg.Mbuf is freed, someone might take it into use
900 // and payload data might be invalid inside message handle function
902 // subscriptions won't load system a lot so there is no
903 // real performance hit by cloning buffer into new go byte slice
904 cPay := append(msg.Payload[:0:0], msg.Payload...)
906 msg.PayloadLen = len(cPay)
909 case xapp.RIC_SUB_REQ:
910 go c.handleXAPPSubscriptionRequest(msg)
911 case xapp.RIC_SUB_RESP:
912 go c.handleE2TSubscriptionResponse(msg)
913 case xapp.RIC_SUB_FAILURE:
914 go c.handleE2TSubscriptionFailure(msg)
915 case xapp.RIC_SUB_DEL_REQ:
916 go c.handleXAPPSubscriptionDeleteRequest(msg)
917 case xapp.RIC_SUB_DEL_RESP:
918 go c.handleE2TSubscriptionDeleteResponse(msg)
919 case xapp.RIC_SUB_DEL_FAILURE:
920 go c.handleE2TSubscriptionDeleteFailure(msg)
922 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
927 //-------------------------------------------------------------------
928 // handle from XAPP Subscription Request
929 //------------------------------------------------------------------
930 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
931 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
932 c.UpdateCounter(cSubReqFromXapp)
934 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
935 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
939 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
941 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
945 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
947 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
950 defer trans.Release()
952 if err = c.tracker.Track(trans); err != nil {
953 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
957 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
959 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
963 c.wakeSubscriptionRequest(subs, trans)
966 //-------------------------------------------------------------------
967 // Wake Subscription Request to E2node
968 //------------------------------------------------------------------
969 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
971 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
972 subs.OngoingReqCount++
973 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
974 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
975 subs.OngoingReqCount--
978 switch themsg := event.(type) {
979 case *e2ap.E2APSubscriptionResponse:
980 themsg.RequestId.Id = trans.RequestId.Id
981 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
984 c.UpdateCounter(cSubRespToXapp)
985 c.rmrSendToXapp("", subs, trans)
988 case *e2ap.E2APSubscriptionFailure:
989 themsg.RequestId.Id = trans.RequestId.Id
990 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
992 c.UpdateCounter(cSubFailToXapp)
993 c.rmrSendToXapp("", subs, trans)
999 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1002 //-------------------------------------------------------------------
1003 // handle from XAPP Subscription Delete Request
1004 //------------------------------------------------------------------
1005 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1006 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1007 c.UpdateCounter(cSubDelReqFromXapp)
1009 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1010 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1014 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1016 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1020 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1022 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1025 defer trans.Release()
1027 err = c.tracker.Track(trans)
1029 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1033 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1035 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1042 subs.OngoingDelCount++
1043 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1044 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1045 subs.OngoingDelCount--
1047 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1049 if subs.NoRespToXapp == true {
1050 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1051 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1055 // Whatever is received success, fail or timeout, send successful delete response
1056 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1057 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1058 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1059 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1060 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1062 c.UpdateCounter(cSubDelRespToXapp)
1063 c.rmrSendToXapp("", subs, trans)
1067 //-------------------------------------------------------------------
1068 // SUBS CREATE Handling
1069 //-------------------------------------------------------------------
1070 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1072 var event interface{} = nil
1073 var removeSubscriptionFromDb bool = false
1074 trans := c.tracker.NewSubsTransaction(subs)
1075 subs.WaitTransactionTurn(trans)
1076 defer subs.ReleaseTransactionTurn(trans)
1077 defer trans.Release()
1079 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1081 subRfMsg, valid := subs.GetCachedResponse()
1082 if subRfMsg == nil && valid == true {
1083 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1084 switch event.(type) {
1085 case *e2ap.E2APSubscriptionResponse:
1086 subRfMsg, valid = subs.SetCachedResponse(event, true)
1087 subs.SubRespRcvd = true
1088 case *e2ap.E2APSubscriptionFailure:
1089 subRfMsg, valid = subs.SetCachedResponse(event, false)
1090 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1091 case *SubmgrRestartTestEvent:
1092 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1093 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1094 subRfMsg, valid = subs.SetCachedResponse(event, false)
1095 parentTrans.SendEvent(subRfMsg, 0)
1097 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1098 subRfMsg, valid = subs.SetCachedResponse(event, false)
1101 if subs.PolicyUpdate == false {
1102 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1103 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1104 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1106 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1109 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1111 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1114 removeSubscriptionFromDb = true
1117 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1120 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1123 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1125 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1128 parentTrans.SendEvent(subRfMsg, 0)
1131 //-------------------------------------------------------------------
1132 // SUBS DELETE Handling
1133 //-------------------------------------------------------------------
1135 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1137 trans := c.tracker.NewSubsTransaction(subs)
1138 subs.WaitTransactionTurn(trans)
1139 defer subs.ReleaseTransactionTurn(trans)
1140 defer trans.Release()
1142 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1146 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1149 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1154 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1155 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1156 parentTrans.SendEvent(nil, 0)
1159 //-------------------------------------------------------------------
1160 // send to E2T Subscription Request
1161 //-------------------------------------------------------------------
1162 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1164 var event interface{} = nil
1165 var timedOut bool = false
1166 const ricRequestorId = 123
1168 subReqMsg := subs.SubReqMsg
1169 subReqMsg.RequestId = subs.GetReqId().RequestId
1170 subReqMsg.RequestId.Id = ricRequestorId
1171 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1173 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1174 return &PackSubscriptionRequestErrortEvent{
1176 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1177 ErrorCause: err.Error(),
1182 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1183 err = c.WriteSubscriptionToDb(subs)
1185 return &SDLWriteErrortEvent{
1187 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1188 ErrorCause: err.Error(),
1193 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1194 desc := fmt.Sprintf("(retry %d)", retries)
1196 c.UpdateCounter(cSubReqToE2)
1198 c.UpdateCounter(cSubReReqToE2)
1200 c.rmrSendToE2T(desc, subs, trans)
1201 if subs.DoNotWaitSubResp == false {
1202 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1204 c.UpdateCounter(cSubReqTimerExpiry)
1208 // Simulating case where subscrition request has been sent but response has not been received before restart
1209 event = &SubmgrRestartTestEvent{}
1210 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1214 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1218 //-------------------------------------------------------------------
1219 // send to E2T Subscription Delete Request
1220 //-------------------------------------------------------------------
1222 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1224 var event interface{}
1226 const ricRequestorId = 123
1228 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1229 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1230 subDelReqMsg.RequestId.Id = ricRequestorId
1231 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1232 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1234 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1238 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1239 desc := fmt.Sprintf("(retry %d)", retries)
1241 c.UpdateCounter(cSubDelReqToE2)
1243 c.UpdateCounter(cSubDelReReqToE2)
1245 c.rmrSendToE2T(desc, subs, trans)
1246 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1248 c.UpdateCounter(cSubDelReqTimerExpiry)
1253 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1257 //-------------------------------------------------------------------
1258 // handle from E2T Subscription Response
1259 //-------------------------------------------------------------------
1260 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1261 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1262 c.UpdateCounter(cSubRespFromE2)
1264 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1266 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1269 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1271 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1274 trans := subs.GetTransaction()
1276 err = fmt.Errorf("Ongoing transaction not found")
1277 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1280 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1281 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1282 if sendOk == false {
1283 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1284 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1289 //-------------------------------------------------------------------
1290 // handle from E2T Subscription Failure
1291 //-------------------------------------------------------------------
1292 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1293 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1294 c.UpdateCounter(cSubFailFromE2)
1295 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1297 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1300 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1302 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1305 trans := subs.GetTransaction()
1307 err = fmt.Errorf("Ongoing transaction not found")
1308 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1311 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1312 if sendOk == false {
1313 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1314 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1319 //-------------------------------------------------------------------
1320 // handle from E2T Subscription Delete Response
1321 //-------------------------------------------------------------------
1322 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1323 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1324 c.UpdateCounter(cSubDelRespFromE2)
1325 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1327 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1330 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1332 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1335 trans := subs.GetTransaction()
1337 err = fmt.Errorf("Ongoing transaction not found")
1338 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1341 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1342 if sendOk == false {
1343 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1344 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1349 //-------------------------------------------------------------------
1350 // handle from E2T Subscription Delete Failure
1351 //-------------------------------------------------------------------
1352 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1353 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1354 c.UpdateCounter(cSubDelFailFromE2)
1355 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1357 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1360 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1362 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1365 trans := subs.GetTransaction()
1367 err = fmt.Errorf("Ongoing transaction not found")
1368 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1371 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1372 if sendOk == false {
1373 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1374 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1379 //-------------------------------------------------------------------
1381 //-------------------------------------------------------------------
1382 func typeofSubsMessage(v interface{}) string {
1387 //case *e2ap.E2APSubscriptionRequest:
1389 case *e2ap.E2APSubscriptionResponse:
1391 case *e2ap.E2APSubscriptionFailure:
1393 //case *e2ap.E2APSubscriptionDeleteRequest:
1394 // return "SubDelReq"
1395 case *e2ap.E2APSubscriptionDeleteResponse:
1397 case *e2ap.E2APSubscriptionDeleteFailure:
1404 //-------------------------------------------------------------------
1406 //-------------------------------------------------------------------
1407 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1408 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1409 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1411 xapp.Logger.Error("%v", err)
1417 //-------------------------------------------------------------------
1419 //-------------------------------------------------------------------
1420 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1422 if removeSubscriptionFromDb == true {
1423 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1424 c.RemoveSubscriptionFromDb(subs)
1426 // Update is needed for successful response and merge case here
1427 if subs.RetryFromXapp == false {
1428 err := c.WriteSubscriptionToDb(subs)
1432 subs.RetryFromXapp = false
1436 //-------------------------------------------------------------------
1438 //-------------------------------------------------------------------
1439 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1440 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1441 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1443 xapp.Logger.Error("%v", err)
1447 //-------------------------------------------------------------------
1449 //-------------------------------------------------------------------
1450 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1451 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1452 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1454 xapp.Logger.Error("%v", err)
1458 //-------------------------------------------------------------------
1460 //-------------------------------------------------------------------
1461 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1463 if removeRestSubscriptionFromDb == true {
1464 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1465 c.RemoveRESTSubscriptionFromDb(restSubId)
1467 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1471 //-------------------------------------------------------------------
1473 //-------------------------------------------------------------------
1474 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1475 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1476 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1478 xapp.Logger.Error("%v", err)
1482 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1484 if c.UTTesting == true {
1485 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1486 c.registry.mutex = new(sync.Mutex)
1489 const ricRequestorId = 123
1490 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1492 // Send delete for every endpoint in the subscription
1493 if subs.PolicyUpdate == false {
1494 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1495 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1496 subDelReqMsg.RequestId.Id = ricRequestorId
1497 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1498 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1500 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1503 for _, endPoint := range subs.EpList.Endpoints {
1504 params := &xapp.RMRParams{}
1505 params.Mtype = mType
1506 params.SubId = int(subs.GetReqId().InstanceId)
1508 params.Meid = subs.Meid
1509 params.Src = endPoint.String()
1510 params.PayloadLen = len(payload.Buf)
1511 params.Payload = payload.Buf
1513 subs.DeleteFromDb = true
1514 c.handleXAPPSubscriptionDeleteRequest(params)
1519 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1521 fmt.Println("CRESTSubscriptionRequest")
1527 if p.SubscriptionID != "" {
1528 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1530 fmt.Println(" SubscriptionID = ''")
1533 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1535 if p.ClientEndpoint.HTTPPort != nil {
1536 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1538 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1541 if p.ClientEndpoint.RMRPort != nil {
1542 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1544 fmt.Println(" ClientEndpoint.RMRPort = nil")
1548 fmt.Printf(" Meid = %s\n", *p.Meid)
1550 fmt.Println(" Meid = nil")
1553 if p.E2SubscriptionDirectives == nil {
1554 fmt.Println(" E2SubscriptionDirectives = nil")
1556 fmt.Println(" E2SubscriptionDirectives")
1557 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1558 fmt.Println(" E2RetryCount == nil")
1560 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1562 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1563 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1565 for _, subscriptionDetail := range p.SubscriptionDetails {
1566 if p.RANFunctionID != nil {
1567 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1569 fmt.Println(" RANFunctionID = nil")
1571 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1572 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1574 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1575 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1576 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1577 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1579 if actionToBeSetup.SubsequentAction != nil {
1580 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1581 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1583 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")