2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
// idstring renders entries, followed by err when it is non-nil, as a single
// space separated string intended for log messages. A nil entry is rendered
// as "(NIL)" and a non-nil err as "err(<message>)".
//
// NOTE(review): an entry that holds a typed nil pointer compares unequal to
// nil, so String() is still invoked on it - confirm the Stringer
// implementations passed here tolerate nil receivers.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		if entry != nil {
			retval += filler + entry.String()
		} else {
			retval += filler + "(NIL)"
		}
		// Every rendered entry, including the "(NIL)" placeholder, is
		// followed by a separator before the next element.
		filler = " "
	}
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
// Package level configuration values. ReadConfigParameters assigns them from
// the "controls.*" configuration keys.
var (
	// Timeouts and wait times.
	e2tSubReqTimeout    time.Duration
	e2tSubDelReqTime    time.Duration
	e2tRecvMsgTimeout   time.Duration
	waitRouteCleanup_ms time.Duration

	// Maximum number of attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// Switches kept as strings; readSubsFromDb and dbRetryForever are
	// compared against the literal "true" where they are used.
	checkE2State   string
	readSubsFromDb string
	dbRetryForever string
)
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
type (
	// SubmgrRestartTestEvent is a payload-free transaction event. When
	// handleSubscriptionRequest receives it, request processing is aborted
	// with the error "TEST: restart event received".
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a payload-free event type.
	// NOTE(review): no use of this type is visible in this part of the file.
	SubmgrRestartUpEvent struct{}
)
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 c.ReadE2Subscriptions()
174 c.ReadRESTSubscriptions()
177 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
181 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
182 subscriptions, _ := c.registry.QueryHandler()
183 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
186 //-------------------------------------------------------------------
188 //-------------------------------------------------------------------
189 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
190 xapp.Logger.Debug("RESTQueryHandler() called")
194 return c.registry.QueryHandler()
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
200 func (c *Control) ReadE2Subscriptions() error {
203 var register map[uint32]*Subscription
204 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
205 xapp.Logger.Debug("Reading E2 subscriptions from db")
206 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
208 xapp.Logger.Error("%v", err)
209 <-time.After(1 * time.Second)
211 c.registry.subIds = subIds
212 c.registry.register = register
213 go c.HandleUncompletedSubscriptions(register)
217 xapp.Logger.Debug("Continuing without retring")
221 //-------------------------------------------------------------------
223 //-------------------------------------------------------------------
224 func (c *Control) ReadRESTSubscriptions() error {
226 xapp.Logger.Debug("ReadRESTSubscriptions()")
228 var restSubscriptions map[string]*RESTSubscription
229 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
230 xapp.Logger.Debug("Reading REST subscriptions from db")
231 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
233 xapp.Logger.Error("%v", err)
234 <-time.After(1 * time.Second)
236 // Fix REST subscriptions ongoing status after restart
237 for restSubId, restSubscription := range restSubscriptions {
238 restSubscription.SubReqOngoing = false
239 restSubscription.SubDelReqOngoing = false
240 c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
242 c.registry.restSubscriptions = restSubscriptions
246 xapp.Logger.Debug("Continuing without retring")
250 //-------------------------------------------------------------------
252 //-------------------------------------------------------------------
253 func (c *Control) ReadConfigParameters(f string) {
255 xapp.Logger.Debug("ReadConfigParameters")
257 c.LoggerLevel = int(xapp.Logger.GetLevel())
258 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
259 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
261 // viper.GetDuration returns nanoseconds
262 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
263 if e2tSubReqTimeout == 0 {
264 e2tSubReqTimeout = 2000 * 1000000
265 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
267 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
269 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
270 if e2tSubDelReqTime == 0 {
271 e2tSubDelReqTime = 2000 * 1000000
272 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
274 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
276 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
277 if e2tRecvMsgTimeout == 0 {
278 e2tRecvMsgTimeout = 2000 * 1000000
279 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
281 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
283 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
284 if e2tMaxSubReqTryCount == 0 {
285 e2tMaxSubReqTryCount = 1
286 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
288 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
290 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
291 if e2tMaxSubDelReqTryCount == 0 {
292 e2tMaxSubDelReqTryCount = 1
293 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
295 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
297 checkE2State = viper.GetString("controls.checkE2State")
298 if checkE2State == "" {
299 checkE2State = "true"
300 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
302 xapp.Logger.Debug("checkE2State= %v", checkE2State)
304 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
305 if readSubsFromDb == "" {
306 readSubsFromDb = "true"
307 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
309 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
311 dbTryCount = viper.GetInt("controls.dbTryCount")
314 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
316 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
318 dbRetryForever = viper.GetString("controls.dbRetryForever")
319 if dbRetryForever == "" {
320 dbRetryForever = "true"
321 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
323 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
325 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
326 // value 100ms used currently only in unittests.
327 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
328 if waitRouteCleanup_ms == 0 {
329 waitRouteCleanup_ms = 5000 * 1000000
330 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
332 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
335 //-------------------------------------------------------------------
337 //-------------------------------------------------------------------
338 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
340 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
341 for subId, subs := range register {
342 if subs.SubRespRcvd == false {
343 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
344 if subs.PolicyUpdate == false {
345 subs.NoRespToXapp = true
346 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
347 c.SendSubscriptionDeleteReq(subs)
353 func (c *Control) ReadyCB(data interface{}) {
354 if c.RMRClient == nil {
355 c.RMRClient = xapp.Rmr
359 func (c *Control) Run() {
360 xapp.SetReadyCB(c.ReadyCB, nil)
361 xapp.AddConfigChangeListener(c.ReadConfigParameters)
365 //-------------------------------------------------------------------
367 //-------------------------------------------------------------------
368 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
371 var restSubscription *RESTSubscription
374 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
375 if p.SubscriptionID == "" {
376 // Subscription does not contain REST subscription Id
378 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
379 if restSubscription != nil {
380 // Subscription not found
381 restSubId = prevRestSubsId
383 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
385 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
388 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
389 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if restSubscription == nil {
394 restSubId = ksuid.New().String()
395 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
398 // Subscription contains REST subscription Id
399 restSubId = p.SubscriptionID
401 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
402 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
404 // Subscription with id in REST request does not exist
405 xapp.Logger.Error("%s", err.Error())
406 c.UpdateCounter(cRestSubFailToXapp)
411 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
413 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
417 return restSubscription, restSubId, nil
420 //-------------------------------------------------------------------
422 //-------------------------------------------------------------------
423 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
426 c.UpdateCounter(cRestSubReqFromXapp)
428 subResp := models.SubscriptionResponse{}
429 p := params.(*models.SubscriptionParams)
431 if c.LoggerLevel > 2 {
432 c.PrintRESTSubscriptionRequest(p)
435 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
436 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
437 c.UpdateCounter(cRestReqRejDueE2Down)
438 return nil, common.SubscribeServiceUnavailableCode
441 if p.ClientEndpoint == nil {
442 err := fmt.Errorf("ClientEndpoint == nil")
443 xapp.Logger.Error("%v", err)
444 c.UpdateCounter(cRestSubFailToXapp)
445 return nil, common.SubscribeBadRequestCode
448 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
450 xapp.Logger.Error("%s", err.Error())
451 c.UpdateCounter(cRestSubFailToXapp)
452 return nil, common.SubscribeBadRequestCode
455 md5sum, err := CalculateRequestMd5sum(params)
457 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
460 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
462 xapp.Logger.Error("Subscription with id in REST request does not exist")
463 return nil, common.SubscribeNotFoundCode
466 subResp.SubscriptionID = &restSubId
467 subReqList := e2ap.SubscriptionRequestList{}
468 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
470 xapp.Logger.Error("%s", err.Error())
471 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
472 c.registry.DeleteRESTSubscription(&restSubId)
473 c.UpdateCounter(cRestSubFailToXapp)
474 return nil, common.SubscribeBadRequestCode
477 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
479 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
480 xapp.Logger.Debug("%s", err)
481 c.registry.DeleteRESTSubscription(&restSubId)
482 c.UpdateCounter(cRestSubRespToXapp)
483 return &subResp, common.SubscribeCreatedCode
486 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
487 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
489 xapp.Logger.Error("%s", err)
490 c.registry.DeleteRESTSubscription(&restSubId)
491 return nil, common.SubscribeBadRequestCode
493 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
495 c.UpdateCounter(cRestSubRespToXapp)
496 return &subResp, common.SubscribeCreatedCode
499 //-------------------------------------------------------------------
501 //-------------------------------------------------------------------
502 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
504 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
505 if p == nil || p.E2SubscriptionDirectives == nil {
506 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
507 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
508 e2SubscriptionDirectives.CreateRMRRoute = true
509 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
511 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
512 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
514 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
516 if p.E2SubscriptionDirectives.E2RetryCount == nil {
517 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
518 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
520 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
521 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
523 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
526 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
528 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
529 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
530 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
531 return e2SubscriptionDirectives, nil
534 //-------------------------------------------------------------------
536 //-------------------------------------------------------------------
538 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
539 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
541 c.SubscriptionProcessingStartDelay()
542 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
544 var xAppEventInstanceID int64
545 var e2EventInstanceID int64
546 errorInfo := &ErrorInfo{}
548 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
550 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
551 subReqMsg := subReqList.E2APSubscriptionRequests[index]
552 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
554 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
556 // Send notification to xApp that prosessing of a Subscription Request has failed.
557 err := fmt.Errorf("Tracking failure")
558 errorInfo.ErrorCause = err.Error()
559 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
563 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
565 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
567 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
571 if err.Error() == "TEST: restart event received" {
572 // This is just for UT cases. Stop here subscription processing
575 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
577 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
578 restSubscription.AddMd5Sum(md5sum)
579 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
580 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
581 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
586 //-------------------------------------------------------------------
588 //------------------------------------------------------------------
589 func (c *Control) SubscriptionProcessingStartDelay() {
590 if c.UTTesting == true {
591 // This is temporary fix for the UT problem that notification arrives before subscription response
592 // Correct fix would be to allow notification come before response and process it correctly
593 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
594 <-time.After(time.Millisecond * 50)
595 xapp.Logger.Debug("Continuing after delay")
599 //-------------------------------------------------------------------
601 //------------------------------------------------------------------
602 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
603 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
605 errorInfo := ErrorInfo{}
607 err := c.tracker.Track(trans)
609 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
610 errorInfo.ErrorCause = err.Error()
611 err = fmt.Errorf("Tracking failure")
612 return nil, &errorInfo, err
615 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
617 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
618 return nil, &errorInfo, err
624 subs.OngoingReqCount++
625 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
626 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
627 subs.OngoingReqCount--
631 switch themsg := event.(type) {
632 case *e2ap.E2APSubscriptionResponse:
634 if c.e2IfState.IsE2ConnectionUp(meid) == true {
635 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
636 return themsg, &errorInfo, nil
638 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
639 c.RemoveSubscriptionFromDb(subs)
640 err = fmt.Errorf("E2 interface down")
641 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
643 case *e2ap.E2APSubscriptionFailure:
644 err = fmt.Errorf("E2 RICSubscriptionFailure received")
645 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_FAILURE, themsg.ActionNotAdmittedList, c)
646 case *PackSubscriptionRequestErrortEvent:
647 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
648 errorInfo = themsg.ErrorInfo
649 case *SDLWriteErrortEvent:
650 err = fmt.Errorf("SDL write failure")
651 errorInfo = themsg.ErrorInfo
652 case *SubmgrRestartTestEvent:
653 err = fmt.Errorf("TEST: restart event received")
654 xapp.Logger.Debug("%s", err)
655 return nil, &errorInfo, err
657 err = fmt.Errorf("Unexpected E2 subscription response received")
658 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
663 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
664 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
665 if subs.PolicyUpdate == true {
666 return nil, &errorInfo, err
670 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
671 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
672 return nil, &errorInfo, err
675 //-------------------------------------------------------------------
677 //-------------------------------------------------------------------
678 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
679 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
681 // Send notification to xApp that prosessing of a Subscription Request has failed.
682 e2EventInstanceID := (int64)(0)
683 if errorInfo.ErrorSource == "" {
684 // Submgr is default source of error
685 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
687 resp := &models.SubscriptionResponse{
688 SubscriptionID: restSubId,
689 SubscriptionInstances: []*models.SubscriptionInstance{
690 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
691 ErrorCause: errorInfo.ErrorCause,
692 ErrorSource: errorInfo.ErrorSource,
693 TimeoutType: errorInfo.TimeoutType,
694 XappEventInstanceID: &xAppEventInstanceID},
697 // Mark REST subscription request processed.
698 restSubscription.SetProcessed(err)
699 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
701 xapp.Logger.Debug("Sending unsuccessful REST notification (Error cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
702 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
704 xapp.Logger.Debug("Sending unsuccessful REST notification (Error cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
705 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
708 c.UpdateCounter(cRestSubFailNotifToXapp)
709 xapp.Subscription.Notify(resp, *clientEndpoint)
711 // E2 is down. Delete completely processed request safely now
712 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
713 c.registry.DeleteRESTSubscription(restSubId)
714 c.RemoveRESTSubscriptionFromDb(*restSubId)
718 //-------------------------------------------------------------------
720 //-------------------------------------------------------------------
721 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
722 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
724 // Store successfully processed InstanceId for deletion
725 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
726 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
728 // Send notification to xApp that a Subscription Request has been processed.
729 resp := &models.SubscriptionResponse{
730 SubscriptionID: restSubId,
731 SubscriptionInstances: []*models.SubscriptionInstance{
732 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
733 ErrorCause: errorInfo.ErrorCause,
734 ErrorSource: errorInfo.ErrorSource,
735 XappEventInstanceID: &xAppEventInstanceID},
738 // Mark REST subscription request processesd.
739 restSubscription.SetProcessed(nil)
740 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
741 if errorInfo.ErrorCause != " " {
742 xapp.Logger.Debug("Sending successful REST notification (Error cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
743 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
745 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
746 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
748 c.UpdateCounter(cRestSubNotifToXapp)
749 xapp.Subscription.Notify(resp, *clientEndpoint)
751 // E2 is down. Delete completely processed request safely now
752 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
753 c.registry.DeleteRESTSubscription(restSubId)
754 c.RemoveRESTSubscriptionFromDb(*restSubId)
758 //-------------------------------------------------------------------
760 //-------------------------------------------------------------------
761 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
764 c.UpdateCounter(cRestSubDelReqFromXapp)
766 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
768 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
770 xapp.Logger.Error("%s", err.Error())
771 if restSubscription == nil {
772 // Subscription was not found
773 c.UpdateCounter(cRestSubDelRespToXapp)
774 return common.UnsubscribeNoContentCode
776 if restSubscription.SubReqOngoing == true {
777 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
778 xapp.Logger.Error("%s", err.Error())
779 c.UpdateCounter(cRestSubDelFailToXapp)
780 return common.UnsubscribeBadRequestCode
781 } else if restSubscription.SubDelReqOngoing == true {
782 // Previous request for same restSubId still ongoing
783 c.UpdateCounter(cRestSubDelRespToXapp)
784 return common.UnsubscribeNoContentCode
789 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
791 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
792 for _, instanceId := range restSubscription.InstanceIds {
793 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
796 xapp.Logger.Error("%s", err.Error())
798 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
799 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
800 restSubscription.DeleteE2InstanceId(instanceId)
802 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
803 c.registry.DeleteRESTSubscription(&restSubId)
804 c.RemoveRESTSubscriptionFromDb(restSubId)
807 c.UpdateCounter(cRestSubDelRespToXapp)
808 return common.UnsubscribeNoContentCode
811 //-------------------------------------------------------------------
813 //-------------------------------------------------------------------
814 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
816 var xAppEventInstanceID int64
817 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
819 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
820 restSubId, instanceId, idstring(err, nil))
821 return xAppEventInstanceID, nil
824 xAppEventInstanceID = int64(subs.ReqId.Id)
825 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
827 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
828 xapp.Logger.Error("%s", err.Error())
830 defer trans.Release()
832 err = c.tracker.Track(trans)
834 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
835 xapp.Logger.Error("%s", err.Error())
836 return xAppEventInstanceID, &time.ParseError{}
841 subs.OngoingDelCount++
842 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
843 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
844 subs.OngoingDelCount--
846 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
848 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
850 return xAppEventInstanceID, nil
853 //-------------------------------------------------------------------
855 //-------------------------------------------------------------------
857 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
858 params := &xapp.RMRParams{}
859 params.Mtype = trans.GetMtype()
860 params.SubId = int(subs.GetReqId().InstanceId)
862 params.Meid = subs.GetMeid()
864 params.PayloadLen = len(trans.Payload.Buf)
865 params.Payload = trans.Payload.Buf
867 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
868 err = c.SendWithRetry(params, false, 5)
870 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
875 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
877 params := &xapp.RMRParams{}
878 params.Mtype = trans.GetMtype()
879 params.SubId = int(subs.GetReqId().InstanceId)
880 params.Xid = trans.GetXid()
881 params.Meid = trans.GetMeid()
883 params.PayloadLen = len(trans.Payload.Buf)
884 params.Payload = trans.Payload.Buf
886 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
887 err = c.SendWithRetry(params, false, 5)
889 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and starts a goroutine running the handler
// that matches the message type: subscription request / delete request coming
// from an xApp, or subscription response / failure / delete response / delete
// failure coming from E2T. Any other type is logged at debug level.
// When c.RMRClient is nil an error is built and logged.
// NOTE(review): short lines (closing braces, bare returns, the switch header)
// are not visible in this listing; confirm the exact return paths against the
// complete file.
894 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
895 if c.RMRClient == nil {
896 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
897 xapp.Logger.Error("%s", err.Error())
// The RMR buffer is freed when Consume returns, while the handlers below keep
// running in their own goroutines; this is why the payload is cloned first.
902 defer c.RMRClient.Free(msg.Mbuf)
904 // xapp-frame might use direct access to c buffer and
905 // when msg.Mbuf is freed, someone might take it into use
906 // and payload data might be invalid inside message handle function
908 // subscriptions won't load system a lot so there is no
909 // real performance hit by cloning buffer into new go byte slice
// The [:0:0] slice has zero capacity, so append must allocate a new backing array.
910 cPay := append(msg.Payload[:0:0], msg.Payload...)
912 msg.PayloadLen = len(cPay)
915 case xapp.RIC_SUB_REQ:
916 go c.handleXAPPSubscriptionRequest(msg)
917 case xapp.RIC_SUB_RESP:
918 go c.handleE2TSubscriptionResponse(msg)
919 case xapp.RIC_SUB_FAILURE:
920 go c.handleE2TSubscriptionFailure(msg)
921 case xapp.RIC_SUB_DEL_REQ:
922 go c.handleXAPPSubscriptionDeleteRequest(msg)
923 case xapp.RIC_SUB_DEL_RESP:
924 go c.handleE2TSubscriptionDeleteResponse(msg)
925 case xapp.RIC_SUB_DEL_FAILURE:
926 go c.handleE2TSubscriptionDeleteFailure(msg)
928 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
933 //-------------------------------------------------------------------
934 // handle from XAPP Subscription Request
935 //------------------------------------------------------------------
936 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
937 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
938 c.UpdateCounter(cSubReqFromXapp)
940 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
941 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
945 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
947 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
951 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
953 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
956 defer trans.Release()
958 if err = c.tracker.Track(trans); err != nil {
959 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
963 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
965 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
969 c.wakeSubscriptionRequest(subs, trans)
972 //-------------------------------------------------------------------
973 // Wake Subscription Request to E2node
974 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate in its own
// goroutine and blocks in trans.WaitEvent(0) until an event is delivered.
// A subscription response or failure event gets the xApp's request id written
// back into it, is packed and sent to the xApp with rmrSendToXapp.
// subs.OngoingReqCount is incremented while the wait is in progress.
// NOTE(review): the conditions guarding the counter updates and send calls
// are not visible in this listing; confirm them against the complete file.
975 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are requested with a nil argument; the returned error is ignored.
977 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
978 subs.OngoingReqCount++
979 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
980 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
981 subs.OngoingReqCount--
984 switch themsg := event.(type) {
985 case *e2ap.E2APSubscriptionResponse:
// Replace the request id with the one stored in the xApp transaction before packing.
986 themsg.RequestId.Id = trans.RequestId.Id
987 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
990 c.UpdateCounter(cSubRespToXapp)
991 c.rmrSendToXapp("", subs, trans)
994 case *e2ap.E2APSubscriptionFailure:
995 themsg.RequestId.Id = trans.RequestId.Id
996 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
998 c.UpdateCounter(cSubFailToXapp)
999 c.rmrSendToXapp("", subs, trans)
1005 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1008 //-------------------------------------------------------------------
1009 // handle from XAPP Subscription Delete Request
1010 //------------------------------------------------------------------
1011 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1012 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1013 c.UpdateCounter(cSubDelReqFromXapp)
1015 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1016 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1020 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1022 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1026 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1028 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1031 defer trans.Release()
1033 err = c.tracker.Track(trans)
1035 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1039 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1041 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1048 subs.OngoingDelCount++
1049 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1050 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1051 subs.OngoingDelCount--
1053 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1055 if subs.NoRespToXapp == true {
1056 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1057 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1061 // Whatever is received success, fail or timeout, send successful delete response
1062 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1063 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1064 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1065 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1066 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1068 c.UpdateCounter(cSubDelRespToXapp)
1069 c.rmrSendToXapp("", subs, trans)
1073 //-------------------------------------------------------------------
1074 // SUBS CREATE Handling
1075 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription-side part of a create
// request and reports the outcome to parentTrans with SendEvent.
// It creates a subscription transaction, waits for its turn on subs, and when
// GetCachedResponse yields no message while valid is true it sends the request
// towards E2T via sendE2TSubscriptionRequest. The resulting event is stored
// with SetCachedResponse; otherwise the cached response is used as is.
// Afterwards the subscription is updated in the database, and
// RemoveFromSubscription is called from here to avoid races with delete.
// NOTE(review): several conditions (`if`/`else`/`default:` lines and closing
// braces) are not visible in this listing; confirm the exact branching
// against the complete file.
1076 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1078 var event interface{} = nil
1079 var removeSubscriptionFromDb bool = false
1080 trans := c.tracker.NewSubsTransaction(subs)
1081 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the turn is released.
1082 defer subs.ReleaseTransactionTurn(trans)
1083 defer trans.Release()
1085 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1087 subRfMsg, valid := subs.GetCachedResponse()
1088 if subRfMsg == nil && valid == true {
1089 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1090 switch event.(type) {
1091 case *e2ap.E2APSubscriptionResponse:
1092 subRfMsg, valid = subs.SetCachedResponse(event, true)
1093 subs.SubRespRcvd = true
1094 case *e2ap.E2APSubscriptionFailure:
1095 subRfMsg, valid = subs.SetCachedResponse(event, false)
1096 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1097 case *SubmgrRestartTestEvent:
1098 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1099 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1100 subRfMsg, valid = subs.SetCachedResponse(event, false)
1101 parentTrans.SendEvent(subRfMsg, 0)
// Pack and database-write errors are cached as an invalid response.
1103 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1104 subRfMsg, valid = subs.SetCachedResponse(event, false)
// For a non-policy-update subscription a delete request is sent to E2T and
// the cached response is marked invalid.
1107 if subs.PolicyUpdate == false {
1108 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1109 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1110 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1112 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1115 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1117 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1120 removeSubscriptionFromDb = true
1123 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1126 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1129 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1131 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
// Hand the (possibly nil) response message to the waiting xApp transaction.
1134 parentTrans.SendEvent(subRfMsg, 0)
1137 //-------------------------------------------------------------------
1138 // SUBS DELETE Handling
1139 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the subscription-side part of a delete
// request. It creates a subscription transaction and waits for its turn on
// subs. When the subscription is valid and the requesting endpoint is the only
// endpoint left on it, a delete request is sent to E2T. Finally
// RemoveFromSubscription is called and a nil event is sent to parentTrans to
// wake the waiting caller.
// NOTE(review): a few short lines around the endpoint check are not visible
// in this listing; confirm them against the complete file.
1141 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1143 trans := c.tracker.NewSubsTransaction(subs)
1144 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the turn is released.
1145 defer subs.ReleaseTransactionTurn(trans)
1146 defer trans.Release()
1148 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint triggers the delete towards E2T.
1152 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1155 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1160 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1161 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1162 parentTrans.SendEvent(nil, 0)
1165 //-------------------------------------------------------------------
1166 // send to E2T Subscription Request
1167 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores
// the not-yet-completed subscription in the database and sends the request to
// E2T, trying up to e2SubscriptionDirectives.E2MaxTryCount times.
//
// The returned event is one of:
//   - *PackSubscriptionRequestErrortEvent when packing fails,
//   - *SDLWriteErrortEvent when the database write fails,
//   - *SubmgrRestartTestEvent when subs.DoNotWaitSubResp is set,
//   - otherwise whatever trans.WaitEvent delivered.
//
// NOTE(review): error checks, the retry bookkeeping and the final return are
// on short lines not visible in this listing; confirm against the full file.
1168 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1170 var event interface{} = nil
1171 var timedOut bool = false
1172 const ricRequestorId = 123
1174 subReqMsg := subs.SubReqMsg
// The request id comes from the subscription, then its Id part is overwritten
// with the fixed requestor id.
1175 subReqMsg.RequestId = subs.GetReqId().RequestId
1176 subReqMsg.RequestId.Id = ricRequestorId
1177 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1179 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1180 return &PackSubscriptionRequestErrortEvent{
1182 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1183 ErrorCause: err.Error(),
1188 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1189 err = c.WriteSubscriptionToDb(subs)
1191 return &SDLWriteErrortEvent{
1193 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1194 ErrorCause: err.Error(),
1199 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1200 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters exist: one for request sends and one for re-sends.
1202 c.UpdateCounter(cSubReqToE2)
1204 c.UpdateCounter(cSubReReqToE2)
1206 c.rmrSendToE2T(desc, subs, trans)
1207 if subs.DoNotWaitSubResp == false {
1208 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1210 c.UpdateCounter(cSubReqTimerExpiry)
1214 // Simulating case where subscrition request has been sent but response has not been received before restart
1215 event = &SubmgrRestartTestEvent{}
1216 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1220 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1224 //-------------------------------------------------------------------
1225 // send to E2T Subscription Delete Request
1226 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for
// subs and sends it to E2T, trying up to e2tMaxSubDelReqTryCount times and
// waiting e2tSubDelReqTime for an event after each send.
// NOTE(review): the error check, retry bookkeeping and return statements are
// on short lines not visible in this listing; confirm against the full file.
1228 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1230 var event interface{}
1232 const ricRequestorId = 123
1234 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
// The request id comes from the subscription, then its Id part is overwritten
// with the fixed requestor id.
1235 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1236 subDelReqMsg.RequestId.Id = ricRequestorId
1237 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1238 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1240 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1244 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1245 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters exist: one for delete-request sends and one for re-sends.
1247 c.UpdateCounter(cSubDelReqToE2)
1249 c.UpdateCounter(cSubDelReReqToE2)
1251 c.rmrSendToE2T(desc, subs, trans)
1252 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1254 c.UpdateCounter(cSubDelReqTimerExpiry)
1259 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1263 //-------------------------------------------------------------------
1264 // handle from E2T Subscription Response
1265 //-------------------------------------------------------------------
1266 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1267 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1268 c.UpdateCounter(cSubRespFromE2)
1270 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1272 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1275 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1277 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1280 trans := subs.GetTransaction()
1282 err = fmt.Errorf("Ongoing transaction not found")
1283 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1286 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1287 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1288 if sendOk == false {
1289 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1290 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1295 //-------------------------------------------------------------------
1296 // handle from E2T Subscription Failure
1297 //-------------------------------------------------------------------
1298 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1299 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1300 c.UpdateCounter(cSubFailFromE2)
1301 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1303 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1306 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1308 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1311 trans := subs.GetTransaction()
1313 err = fmt.Errorf("Ongoing transaction not found")
1314 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1317 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1318 if sendOk == false {
1319 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1320 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1325 //-------------------------------------------------------------------
1326 // handle from E2T Subscription Delete Response
1327 //-------------------------------------------------------------------
1328 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1329 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1330 c.UpdateCounter(cSubDelRespFromE2)
1331 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1333 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1336 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1338 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1341 trans := subs.GetTransaction()
1343 err = fmt.Errorf("Ongoing transaction not found")
1344 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1347 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1348 if sendOk == false {
1349 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1350 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1355 //-------------------------------------------------------------------
1356 // handle from E2T Subscription Delete Failure
1357 //-------------------------------------------------------------------
1358 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1359 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1360 c.UpdateCounter(cSubDelFailFromE2)
1361 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1363 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1366 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1368 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1371 trans := subs.GetTransaction()
1373 err = fmt.Errorf("Ongoing transaction not found")
1374 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1377 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1378 if sendOk == false {
1379 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1380 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1385 //-------------------------------------------------------------------
1387 //-------------------------------------------------------------------
// typeofSubsMessage maps the dynamic type of v to a short name used in log
// messages. Cases exist for subscription response, subscription failure,
// subscription delete response and subscription delete failure; the request
// and delete-request cases are commented out.
// NOTE(review): the returned strings and the fallback branch are on short
// lines not visible in this listing; confirm them against the complete file.
1388 func typeofSubsMessage(v interface{}) string {
1393 //case *e2ap.E2APSubscriptionRequest:
1395 case *e2ap.E2APSubscriptionResponse:
1397 case *e2ap.E2APSubscriptionFailure:
1399 //case *e2ap.E2APSubscriptionDeleteRequest:
1400 // return "SubDelReq"
1401 case *e2ap.E2APSubscriptionDeleteResponse:
1403 case *e2ap.E2APSubscriptionDeleteFailure:
1410 //-------------------------------------------------------------------
1412 //-------------------------------------------------------------------
1413 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1414 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1415 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1417 xapp.Logger.Error("%v", err)
1423 //-------------------------------------------------------------------
1425 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with the
// outcome of a request: when removeSubscriptionFromDb is true the stored
// subscription is removed, otherwise it is written again unless
// subs.RetryFromXapp is set. subs.RetryFromXapp is reset to false.
// NOTE(review): the error handling after WriteSubscriptionToDb and the exact
// placement of the RetryFromXapp reset are on lines not visible in this
// listing; confirm them against the complete file.
1426 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1428 if removeSubscriptionFromDb == true {
1429 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1430 c.RemoveSubscriptionFromDb(subs)
1432 // Update is needed for successful response and merge case here
1433 if subs.RetryFromXapp == false {
1434 err := c.WriteSubscriptionToDb(subs)
1438 subs.RetryFromXapp = false
1442 //-------------------------------------------------------------------
1444 //-------------------------------------------------------------------
1445 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1446 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1447 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1449 xapp.Logger.Error("%v", err)
1453 //-------------------------------------------------------------------
1455 //-------------------------------------------------------------------
1456 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1457 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1458 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1460 xapp.Logger.Error("%v", err)
1464 //-------------------------------------------------------------------
1466 //-------------------------------------------------------------------
1467 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1469 if removeRestSubscriptionFromDb == true {
1470 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1471 c.RemoveRESTSubscriptionFromDb(restSubId)
1473 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1477 //-------------------------------------------------------------------
1479 //-------------------------------------------------------------------
1480 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1481 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1482 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1484 xapp.Logger.Error("%v", err)
1488 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1490 if c.UTTesting == true {
1491 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1492 c.registry.mutex = new(sync.Mutex)
1495 const ricRequestorId = 123
1496 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1498 // Send delete for every endpoint in the subscription
1499 if subs.PolicyUpdate == false {
1500 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1501 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1502 subDelReqMsg.RequestId.Id = ricRequestorId
1503 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1504 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1506 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1509 for _, endPoint := range subs.EpList.Endpoints {
1510 params := &xapp.RMRParams{}
1511 params.Mtype = mType
1512 params.SubId = int(subs.GetReqId().InstanceId)
1514 params.Meid = subs.Meid
1515 params.Src = endPoint.String()
1516 params.PayloadLen = len(payload.Buf)
1517 params.Payload = payload.Buf
1519 subs.DeleteFromDb = true
1520 c.handleXAPPSubscriptionDeleteRequest(params)
1525 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1527 fmt.Println("CRESTSubscriptionRequest")
1533 if p.SubscriptionID != "" {
1534 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1536 fmt.Println(" SubscriptionID = ''")
1539 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1541 if p.ClientEndpoint.HTTPPort != nil {
1542 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1544 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1547 if p.ClientEndpoint.RMRPort != nil {
1548 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1550 fmt.Println(" ClientEndpoint.RMRPort = nil")
1554 fmt.Printf(" Meid = %s\n", *p.Meid)
1556 fmt.Println(" Meid = nil")
1559 if p.E2SubscriptionDirectives == nil {
1560 fmt.Println(" E2SubscriptionDirectives = nil")
1562 fmt.Println(" E2SubscriptionDirectives")
1563 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1564 fmt.Println(" E2RetryCount == nil")
1566 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1568 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1569 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1571 for _, subscriptionDetail := range p.SubscriptionDetails {
1572 if p.RANFunctionID != nil {
1573 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1575 fmt.Println(" RANFunctionID = nil")
1577 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1578 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1580 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1581 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1582 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1583 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1585 if actionToBeSetup.SubsequentAction != nil {
1586 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1587 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1589 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")