2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
28 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
29 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
30 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
31 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
33 httptransport "github.com/go-openapi/runtime/client"
34 "github.com/go-openapi/strfmt"
35 "github.com/segmentio/ksuid"
36 "github.com/spf13/viper"
39 //-----------------------------------------------------------------------------
41 //-----------------------------------------------------------------------------
43 func idstring(err error, entries ...fmt.Stringer) string {
44 var retval string = ""
45 var filler string = ""
46 for _, entry := range entries {
48 retval += filler + entry.String()
51 retval += filler + "(NIL)"
55 retval += filler + "err(" + err.Error() + ")"
61 //-----------------------------------------------------------------------------
63 //-----------------------------------------------------------------------------
65 var e2tSubReqTimeout time.Duration
66 var e2tSubDelReqTime time.Duration
67 var e2tRecvMsgTimeout time.Duration
68 var waitRouteCleanup_ms time.Duration
69 var e2tMaxSubReqTryCount uint64 // Initial try + retry
70 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
71 var checkE2State string
72 var readSubsFromDb string
73 var dbRetryForever string
81 restDuplicateCtrl *DuplicateCtrl
83 e2IfStateDb XappRnibInterface
85 restSubsDb Sdlnterface
88 Counters map[string]xapp.Counter
99 type SubmgrRestartTestEvent struct{}
100 type SubmgrRestartUpEvent struct{}
101 type PackSubscriptionRequestErrortEvent struct {
105 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
106 p.ErrorInfo = *errorInfo
109 type SDLWriteErrortEvent struct {
113 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
114 s.ErrorInfo = *errorInfo
118 xapp.Logger.Debug("SUBMGR")
120 viper.SetEnvPrefix("submgr")
121 viper.AllowEmptyEnv(true)
124 func NewControl() *Control {
126 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
127 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
129 registry := new(Registry)
130 registry.Initialize()
131 registry.rtmgrClient = &rtmgrClient
133 tracker := new(Tracker)
136 restDuplicateCtrl := new(DuplicateCtrl)
137 restDuplicateCtrl.Init()
139 e2IfState := new(E2IfState)
141 c := &Control{e2ap: new(E2ap),
144 restDuplicateCtrl: restDuplicateCtrl,
145 e2IfState: e2IfState,
146 e2IfStateDb: CreateXappRnibIfInstance(),
147 e2SubsDb: CreateSdl(),
148 restSubsDb: CreateRESTSdl(),
149 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
154 c.ReadConfigParameters("")
156 // Register REST handler for testing support
157 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
158 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
159 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
161 xapp.Resource.InjectRoute("/ric/v1/get_all_e2nodes", c.GetAllE2Nodes, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/get_e2node_rest_subscriptions/{ranName}", c.GetAllE2NodeRestSubscriptions, "GET")
164 xapp.Resource.InjectRoute("/ric/v1/get_all_xapps", c.GetAllXapps, "GET")
165 xapp.Resource.InjectRoute("/ric/v1/get_xapp_rest_restsubscriptions/{xappServiceName}", c.GetAllXappRestSubscriptions, "GET")
166 xapp.Resource.InjectRoute("/ric/v1/get_e2subscriptions/{restId}", c.GetE2Subscriptions, "GET")
168 xapp.Resource.InjectRoute("/ric/v1/delete_all_e2node_subscriptions/{ranName}", c.DeleteAllE2nodeSubscriptions, "DELETE")
169 xapp.Resource.InjectRoute("/ric/v1/delete_all_xapp_subscriptions/{xappServiceName}", c.DeleteAllXappSubscriptions, "DELETE")
171 if readSubsFromDb == "true" {
172 // Read subscriptions from db
173 c.ReadE2Subscriptions()
174 c.ReadRESTSubscriptions()
177 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
181 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
182 subscriptions, _ := c.registry.QueryHandler()
183 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
186 //-------------------------------------------------------------------
188 //-------------------------------------------------------------------
189 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
190 xapp.Logger.Debug("RESTQueryHandler() called")
194 return c.registry.QueryHandler()
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
200 func (c *Control) ReadE2Subscriptions() error {
203 var register map[uint32]*Subscription
204 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
205 xapp.Logger.Debug("Reading E2 subscriptions from db")
206 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
208 xapp.Logger.Error("%v", err)
209 <-time.After(1 * time.Second)
211 c.registry.subIds = subIds
212 c.registry.register = register
213 go c.HandleUncompletedSubscriptions(register)
217 xapp.Logger.Debug("Continuing without retring")
221 //-------------------------------------------------------------------
223 //-------------------------------------------------------------------
224 func (c *Control) ReadRESTSubscriptions() error {
226 xapp.Logger.Debug("ReadRESTSubscriptions()")
228 var restSubscriptions map[string]*RESTSubscription
229 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
230 xapp.Logger.Debug("Reading REST subscriptions from db")
231 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
233 xapp.Logger.Error("%v", err)
234 <-time.After(1 * time.Second)
236 // Fix REST subscriptions ongoing status after restart
237 for restSubId, restSubscription := range restSubscriptions {
238 restSubscription.SubReqOngoing = false
239 restSubscription.SubDelReqOngoing = false
240 c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
242 c.registry.restSubscriptions = restSubscriptions
246 xapp.Logger.Debug("Continuing without retring")
250 //-------------------------------------------------------------------
252 //-------------------------------------------------------------------
253 func (c *Control) ReadConfigParameters(f string) {
255 xapp.Logger.Debug("ReadConfigParameters")
257 c.LoggerLevel = int(xapp.Logger.GetLevel())
258 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
259 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
261 // viper.GetDuration returns nanoseconds
262 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
263 if e2tSubReqTimeout == 0 {
264 e2tSubReqTimeout = 2000 * 1000000
265 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
267 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
269 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
270 if e2tSubDelReqTime == 0 {
271 e2tSubDelReqTime = 2000 * 1000000
272 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
274 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
276 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
277 if e2tRecvMsgTimeout == 0 {
278 e2tRecvMsgTimeout = 2000 * 1000000
279 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
281 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
283 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
284 if e2tMaxSubReqTryCount == 0 {
285 e2tMaxSubReqTryCount = 1
286 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
288 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
290 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
291 if e2tMaxSubDelReqTryCount == 0 {
292 e2tMaxSubDelReqTryCount = 1
293 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
295 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
297 checkE2State = viper.GetString("controls.checkE2State")
298 if checkE2State == "" {
299 checkE2State = "true"
300 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
302 xapp.Logger.Debug("checkE2State= %v", checkE2State)
304 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
305 if readSubsFromDb == "" {
306 readSubsFromDb = "true"
307 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
309 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
311 dbTryCount = viper.GetInt("controls.dbTryCount")
314 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
316 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
318 dbRetryForever = viper.GetString("controls.dbRetryForever")
319 if dbRetryForever == "" {
320 dbRetryForever = "true"
321 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
323 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
325 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
326 // value 100ms used currently only in unittests.
327 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
328 if waitRouteCleanup_ms == 0 {
329 waitRouteCleanup_ms = 5000 * 1000000
330 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
332 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
335 //-------------------------------------------------------------------
337 //-------------------------------------------------------------------
338 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
340 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
341 for subId, subs := range register {
342 if subs.SubRespRcvd == false {
343 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
344 if subs.PolicyUpdate == false {
345 subs.NoRespToXapp = true
346 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
347 c.SendSubscriptionDeleteReq(subs)
353 func (c *Control) ReadyCB(data interface{}) {
354 if c.RMRClient == nil {
355 c.RMRClient = xapp.Rmr
359 func (c *Control) Run() {
360 xapp.SetReadyCB(c.ReadyCB, nil)
361 xapp.AddConfigChangeListener(c.ReadConfigParameters)
365 //-------------------------------------------------------------------
367 //-------------------------------------------------------------------
368 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string, xAppServiceName string) (*RESTSubscription, string, error) {
371 var restSubscription *RESTSubscription
374 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
375 if p.SubscriptionID == "" {
376 // Subscription does not contain REST subscription Id
378 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
379 if restSubscription != nil {
380 // Subscription not found
381 restSubId = prevRestSubsId
383 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
385 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
388 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
389 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
393 if restSubscription == nil {
394 restSubId = ksuid.New().String()
395 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppServiceName, &xAppRmrEndpoint, p.Meid)
398 // Subscription contains REST subscription Id
399 restSubId = p.SubscriptionID
401 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
402 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
404 // Subscription with id in REST request does not exist
405 xapp.Logger.Error("%s", err.Error())
406 c.UpdateCounter(cRestSubFailToXapp)
411 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
413 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
417 return restSubscription, restSubId, nil
420 //-------------------------------------------------------------------
422 //-------------------------------------------------------------------
423 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
426 c.UpdateCounter(cRestSubReqFromXapp)
428 subResp := models.SubscriptionResponse{}
429 p := params.(*models.SubscriptionParams)
431 if c.LoggerLevel > 2 {
432 c.PrintRESTSubscriptionRequest(p)
435 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
436 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
437 c.UpdateCounter(cRestReqRejDueE2Down)
438 return nil, common.SubscribeServiceUnavailableCode
441 if p.ClientEndpoint == nil {
442 err := fmt.Errorf("ClientEndpoint == nil")
443 xapp.Logger.Error("%v", err)
444 c.UpdateCounter(cRestSubFailToXapp)
445 return nil, common.SubscribeBadRequestCode
448 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
450 xapp.Logger.Error("%s", err)
451 c.UpdateCounter(cRestSubFailToXapp)
452 return nil, common.SubscribeBadRequestCode
454 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
456 xapp.Logger.Error("%s", err.Error())
457 c.UpdateCounter(cRestSubFailToXapp)
458 return nil, common.SubscribeBadRequestCode
461 md5sum, err := CalculateRequestMd5sum(params)
463 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
466 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint, p.ClientEndpoint.Host)
468 xapp.Logger.Error("Subscription with id in REST request does not exist")
469 return nil, common.SubscribeNotFoundCode
472 subResp.SubscriptionID = &restSubId
473 subReqList := e2ap.SubscriptionRequestList{}
474 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
476 xapp.Logger.Error("%s", err.Error())
477 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
478 c.registry.DeleteRESTSubscription(&restSubId)
479 c.UpdateCounter(cRestSubFailToXapp)
480 return nil, common.SubscribeBadRequestCode
483 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
485 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
486 xapp.Logger.Debug("%s", err)
487 c.registry.DeleteRESTSubscription(&restSubId)
488 c.UpdateCounter(cRestSubRespToXapp)
489 return &subResp, common.SubscribeCreatedCode
492 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
493 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
495 c.UpdateCounter(cRestSubRespToXapp)
496 return &subResp, common.SubscribeCreatedCode
499 //-------------------------------------------------------------------
501 //-------------------------------------------------------------------
502 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
504 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
505 if p == nil || p.E2SubscriptionDirectives == nil {
506 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
507 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
508 e2SubscriptionDirectives.CreateRMRRoute = true
509 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
511 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
512 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
514 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
516 if p.E2SubscriptionDirectives.E2RetryCount == nil {
517 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
518 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
520 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
521 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
523 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
526 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
528 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
529 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
530 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
531 return e2SubscriptionDirectives, nil
534 //-------------------------------------------------------------------
536 //-------------------------------------------------------------------
538 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
539 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
541 c.SubscriptionProcessingStartDelay()
542 xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
544 var xAppEventInstanceID int64
545 var e2EventInstanceID int64
546 errorInfo := &ErrorInfo{}
548 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
550 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
551 subReqMsg := subReqList.E2APSubscriptionRequests[index]
552 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
554 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
556 // Send notification to xApp that prosessing of a Subscription Request has failed.
557 err := fmt.Errorf("Tracking failure")
558 errorInfo.ErrorCause = err.Error()
559 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
563 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
565 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
567 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
571 if err.Error() == "TEST: restart event received" {
572 // This is just for UT cases. Stop here subscription processing
575 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
577 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
578 restSubscription.AddMd5Sum(md5sum)
579 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
580 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
581 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans, errorInfo)
586 //-------------------------------------------------------------------
588 //------------------------------------------------------------------
589 func (c *Control) SubscriptionProcessingStartDelay() {
590 if c.UTTesting == true {
591 // This is temporary fix for the UT problem that notification arrives before subscription response
592 // Correct fix would be to allow notification come before response and process it correctly
593 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
594 <-time.After(time.Millisecond * 50)
595 xapp.Logger.Debug("Continuing after delay")
599 //-------------------------------------------------------------------
601 //------------------------------------------------------------------
602 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
603 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
605 errorInfo := ErrorInfo{}
607 err := c.tracker.Track(trans)
609 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
610 errorInfo.ErrorCause = err.Error()
611 err = fmt.Errorf("Tracking failure")
612 return nil, &errorInfo, err
615 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
617 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
618 return nil, &errorInfo, err
624 subs.OngoingReqCount++
625 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
626 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
627 subs.OngoingReqCount--
631 switch themsg := event.(type) {
632 case *e2ap.E2APSubscriptionResponse:
634 if c.e2IfState.IsE2ConnectionUp(meid) == true {
635 errorInfo = c.e2ap.CheckActionNotAdmittedList(xapp.RIC_SUB_RESP, themsg.ActionNotAdmittedList, c)
636 return themsg, &errorInfo, nil
638 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
639 c.RemoveSubscriptionFromDb(subs)
640 err = fmt.Errorf("E2 interface down")
641 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
643 case *e2ap.E2APSubscriptionFailure:
644 err = fmt.Errorf("RICSubscriptionFailure. E2NodeCause: (Cause:%v, Value %v)", themsg.Cause.Content, themsg.Cause.Value)
645 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
646 case *PackSubscriptionRequestErrortEvent:
647 err = fmt.Errorf("E2 RICSubscriptionRequest pack failure")
648 errorInfo = themsg.ErrorInfo
649 case *SDLWriteErrortEvent:
650 err = fmt.Errorf("SDL write failure")
651 errorInfo = themsg.ErrorInfo
652 case *SubmgrRestartTestEvent:
653 err = fmt.Errorf("TEST: restart event received")
654 xapp.Logger.Debug("%s", err)
655 return nil, &errorInfo, err
657 err = fmt.Errorf("Unexpected E2 subscription response received")
658 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
663 err = fmt.Errorf("E2 RICSubscriptionResponse timeout")
664 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
665 if subs.PolicyUpdate == true {
666 return nil, &errorInfo, err
670 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
671 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
672 return nil, &errorInfo, err
675 //-------------------------------------------------------------------
677 //-------------------------------------------------------------------
678 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
679 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
681 // Send notification to xApp that prosessing of a Subscription Request has failed.
682 e2EventInstanceID := (int64)(0)
683 if errorInfo.ErrorSource == "" {
684 // Submgr is default source of error
685 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
687 resp := &models.SubscriptionResponse{
688 SubscriptionID: restSubId,
689 SubscriptionInstances: []*models.SubscriptionInstance{
690 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
691 ErrorCause: errorInfo.ErrorCause,
692 ErrorSource: errorInfo.ErrorSource,
693 TimeoutType: errorInfo.TimeoutType,
694 XappEventInstanceID: &xAppEventInstanceID},
697 // Mark REST subscription request processed.
698 restSubscription.SetProcessed(err)
699 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
701 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
702 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
704 xapp.Logger.Debug("Sending unsuccessful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
705 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
708 c.UpdateCounter(cRestSubFailNotifToXapp)
709 xapp.Subscription.Notify(resp, *clientEndpoint)
711 // E2 is down. Delete completely processed request safely now
712 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
713 c.registry.DeleteRESTSubscription(restSubId)
714 c.RemoveRESTSubscriptionFromDb(*restSubId)
718 //-------------------------------------------------------------------
720 //-------------------------------------------------------------------
721 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
722 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
724 // Store successfully processed InstanceId for deletion
725 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
726 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
728 // Send notification to xApp that a Subscription Request has been processed.
729 resp := &models.SubscriptionResponse{
730 SubscriptionID: restSubId,
731 SubscriptionInstances: []*models.SubscriptionInstance{
732 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
733 ErrorCause: errorInfo.ErrorCause,
734 ErrorSource: errorInfo.ErrorSource,
735 XappEventInstanceID: &xAppEventInstanceID},
738 // Mark REST subscription request processesd.
739 restSubscription.SetProcessed(nil)
740 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
741 xapp.Logger.Debug("Sending successful REST notification: ErrorCause:%s, ErrorSource:%s, TimeoutType:%s, to Endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
742 errorInfo.ErrorCause, errorInfo.ErrorSource, errorInfo.TimeoutType, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
743 c.UpdateCounter(cRestSubNotifToXapp)
744 xapp.Subscription.Notify(resp, *clientEndpoint)
746 // E2 is down. Delete completely processed request safely now
747 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
748 c.registry.DeleteRESTSubscription(restSubId)
749 c.RemoveRESTSubscriptionFromDb(*restSubId)
753 //-------------------------------------------------------------------
755 //-------------------------------------------------------------------
756 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
759 c.UpdateCounter(cRestSubDelReqFromXapp)
761 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
763 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
765 xapp.Logger.Error("%s", err.Error())
766 if restSubscription == nil {
767 // Subscription was not found
768 c.UpdateCounter(cRestSubDelRespToXapp)
769 return common.UnsubscribeNoContentCode
771 if restSubscription.SubReqOngoing == true {
772 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
773 xapp.Logger.Error("%s", err.Error())
774 c.UpdateCounter(cRestSubDelFailToXapp)
775 return common.UnsubscribeBadRequestCode
776 } else if restSubscription.SubDelReqOngoing == true {
777 // Previous request for same restSubId still ongoing
778 c.UpdateCounter(cRestSubDelRespToXapp)
779 return common.UnsubscribeNoContentCode
784 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
786 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
787 for _, instanceId := range restSubscription.InstanceIds {
788 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
791 xapp.Logger.Error("%s", err.Error())
793 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
794 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
795 restSubscription.DeleteE2InstanceId(instanceId)
797 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
798 c.registry.DeleteRESTSubscription(&restSubId)
799 c.RemoveRESTSubscriptionFromDb(restSubId)
802 c.UpdateCounter(cRestSubDelRespToXapp)
803 return common.UnsubscribeNoContentCode
806 //-------------------------------------------------------------------
808 //-------------------------------------------------------------------
809 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
811 var xAppEventInstanceID int64
812 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
814 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
815 restSubId, instanceId, idstring(err, nil))
816 return xAppEventInstanceID, nil
819 xAppEventInstanceID = int64(subs.ReqId.Id)
820 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
822 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
823 xapp.Logger.Error("%s", err.Error())
825 defer trans.Release()
827 err = c.tracker.Track(trans)
829 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
830 xapp.Logger.Error("%s", err.Error())
831 return xAppEventInstanceID, &time.ParseError{}
836 subs.OngoingDelCount++
837 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
838 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
839 subs.OngoingDelCount--
841 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
843 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
845 return xAppEventInstanceID, nil
848 //-------------------------------------------------------------------
850 //-------------------------------------------------------------------
852 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
853 params := &xapp.RMRParams{}
854 params.Mtype = trans.GetMtype()
855 params.SubId = int(subs.GetReqId().InstanceId)
857 params.Meid = subs.GetMeid()
859 params.PayloadLen = len(trans.Payload.Buf)
860 params.Payload = trans.Payload.Buf
862 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
863 err = c.SendWithRetry(params, false, 5)
865 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
870 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
872 params := &xapp.RMRParams{}
873 params.Mtype = trans.GetMtype()
874 params.SubId = int(subs.GetReqId().InstanceId)
875 params.Xid = trans.GetXid()
876 params.Meid = trans.GetMeid()
878 params.PayloadLen = len(trans.Payload.Buf)
879 params.Payload = trans.Payload.Buf
881 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
882 err = c.SendWithRetry(params, false, 5)
884 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
889 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
890 if c.RMRClient == nil {
891 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
892 xapp.Logger.Error("%s", err.Error())
897 defer c.RMRClient.Free(msg.Mbuf)
899 // xapp-frame might use direct access to c buffer and
900 // when msg.Mbuf is freed, someone might take it into use
901 // and payload data might be invalid inside message handle function
903 // subscriptions won't load system a lot so there is no
904 // real performance hit by cloning buffer into new go byte slice
905 cPay := append(msg.Payload[:0:0], msg.Payload...)
907 msg.PayloadLen = len(cPay)
910 case xapp.RIC_SUB_REQ:
911 go c.handleXAPPSubscriptionRequest(msg)
912 case xapp.RIC_SUB_RESP:
913 go c.handleE2TSubscriptionResponse(msg)
914 case xapp.RIC_SUB_FAILURE:
915 go c.handleE2TSubscriptionFailure(msg)
916 case xapp.RIC_SUB_DEL_REQ:
917 go c.handleXAPPSubscriptionDeleteRequest(msg)
918 case xapp.RIC_SUB_DEL_RESP:
919 go c.handleE2TSubscriptionDeleteResponse(msg)
920 case xapp.RIC_SUB_DEL_FAILURE:
921 go c.handleE2TSubscriptionDeleteFailure(msg)
923 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
928 //-------------------------------------------------------------------
929 // handle from XAPP Subscription Request
930 //------------------------------------------------------------------
931 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
932 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
933 c.UpdateCounter(cSubReqFromXapp)
935 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
936 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
940 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
942 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
946 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
948 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
951 defer trans.Release()
953 if err = c.tracker.Track(trans); err != nil {
954 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
958 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
960 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
964 c.wakeSubscriptionRequest(subs, trans)
967 //-------------------------------------------------------------------
968 // Wake Subscription Request to E2node
969 //------------------------------------------------------------------
970 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
972 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
973 subs.OngoingReqCount++
974 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
975 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
976 subs.OngoingReqCount--
979 switch themsg := event.(type) {
980 case *e2ap.E2APSubscriptionResponse:
981 themsg.RequestId.Id = trans.RequestId.Id
982 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
985 c.UpdateCounter(cSubRespToXapp)
986 c.rmrSendToXapp("", subs, trans)
989 case *e2ap.E2APSubscriptionFailure:
990 themsg.RequestId.Id = trans.RequestId.Id
991 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
993 c.UpdateCounter(cSubFailToXapp)
994 c.rmrSendToXapp("", subs, trans)
1000 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1003 //-------------------------------------------------------------------
1004 // handle from XAPP Subscription Delete Request
1005 //------------------------------------------------------------------
1006 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1007 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1008 c.UpdateCounter(cSubDelReqFromXapp)
1010 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1011 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1015 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1017 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1021 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1023 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1026 defer trans.Release()
1028 err = c.tracker.Track(trans)
1030 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1034 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1036 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1043 subs.OngoingDelCount++
1044 go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
1045 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1046 subs.OngoingDelCount--
1048 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1050 if subs.NoRespToXapp == true {
1051 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1052 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1056 // Whatever is received success, fail or timeout, send successful delete response
1057 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1058 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1059 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1060 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1061 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1063 c.UpdateCounter(cSubDelRespToXapp)
1064 c.rmrSendToXapp("", subs, trans)
1068 //-------------------------------------------------------------------
1069 // SUBS CREATE Handling
1070 //-------------------------------------------------------------------
1071 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
1073 var event interface{} = nil
1074 var removeSubscriptionFromDb bool = false
1075 trans := c.tracker.NewSubsTransaction(subs)
1076 subs.WaitTransactionTurn(trans)
1077 defer subs.ReleaseTransactionTurn(trans)
1078 defer trans.Release()
1080 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1082 subRfMsg, valid := subs.GetCachedResponse()
1083 if subRfMsg == nil && valid == true {
1084 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1085 switch event.(type) {
1086 case *e2ap.E2APSubscriptionResponse:
1087 subRfMsg, valid = subs.SetCachedResponse(event, true)
1088 subs.SubRespRcvd = true
1089 case *e2ap.E2APSubscriptionFailure:
1090 subRfMsg, valid = subs.SetCachedResponse(event, false)
1091 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1092 case *SubmgrRestartTestEvent:
1093 // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
1094 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1095 subRfMsg, valid = subs.SetCachedResponse(event, false)
1096 parentTrans.SendEvent(subRfMsg, 0)
1098 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1099 subRfMsg, valid = subs.SetCachedResponse(event, false)
1102 if subs.PolicyUpdate == false {
1103 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1104 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1105 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1107 subRfMsg, valid = subs.SetCachedResponse(nil, true)
1110 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1112 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1115 removeSubscriptionFromDb = true
1118 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1121 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1124 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1126 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1129 parentTrans.SendEvent(subRfMsg, 0)
1132 //-------------------------------------------------------------------
1133 // SUBS DELETE Handling
1134 //-------------------------------------------------------------------
1136 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
1138 trans := c.tracker.NewSubsTransaction(subs)
1139 subs.WaitTransactionTurn(trans)
1140 defer subs.ReleaseTransactionTurn(trans)
1141 defer trans.Release()
1143 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1147 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1150 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1155 // Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1156 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
1157 parentTrans.SendEvent(nil, 0)
1160 //-------------------------------------------------------------------
1161 // send to E2T Subscription Request
1162 //-------------------------------------------------------------------
1163 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1165 var event interface{} = nil
1166 var timedOut bool = false
1167 const ricRequestorId = 123
1169 subReqMsg := subs.SubReqMsg
1170 subReqMsg.RequestId = subs.GetReqId().RequestId
1171 subReqMsg.RequestId.Id = ricRequestorId
1172 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1174 xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
1175 return &PackSubscriptionRequestErrortEvent{
1177 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1178 ErrorCause: err.Error(),
1183 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1184 err = c.WriteSubscriptionToDb(subs)
1186 return &SDLWriteErrortEvent{
1188 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1189 ErrorCause: err.Error(),
1194 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1195 desc := fmt.Sprintf("(retry %d)", retries)
1197 c.UpdateCounter(cSubReqToE2)
1199 c.UpdateCounter(cSubReReqToE2)
1201 c.rmrSendToE2T(desc, subs, trans)
1202 if subs.DoNotWaitSubResp == false {
1203 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1205 c.UpdateCounter(cSubReqTimerExpiry)
1209 // Simulating case where subscrition request has been sent but response has not been received before restart
1210 event = &SubmgrRestartTestEvent{}
1211 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1215 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1219 //-------------------------------------------------------------------
1220 // send to E2T Subscription Delete Request
1221 //-------------------------------------------------------------------
1223 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1225 var event interface{}
1227 const ricRequestorId = 123
1229 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1230 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1231 subDelReqMsg.RequestId.Id = ricRequestorId
1232 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1233 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1235 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1239 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1240 desc := fmt.Sprintf("(retry %d)", retries)
1242 c.UpdateCounter(cSubDelReqToE2)
1244 c.UpdateCounter(cSubDelReReqToE2)
1246 c.rmrSendToE2T(desc, subs, trans)
1247 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1249 c.UpdateCounter(cSubDelReqTimerExpiry)
1254 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1258 //-------------------------------------------------------------------
1259 // handle from E2T Subscription Response
1260 //-------------------------------------------------------------------
1261 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1262 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1263 c.UpdateCounter(cSubRespFromE2)
1265 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1267 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1270 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1272 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1275 trans := subs.GetTransaction()
1277 err = fmt.Errorf("Ongoing transaction not found")
1278 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1281 xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
1282 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1283 if sendOk == false {
1284 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1285 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1290 //-------------------------------------------------------------------
1291 // handle from E2T Subscription Failure
1292 //-------------------------------------------------------------------
1293 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1294 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1295 c.UpdateCounter(cSubFailFromE2)
1296 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1298 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1301 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1303 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1306 trans := subs.GetTransaction()
1308 err = fmt.Errorf("Ongoing transaction not found")
1309 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1312 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1313 if sendOk == false {
1314 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1315 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1320 //-------------------------------------------------------------------
1321 // handle from E2T Subscription Delete Response
1322 //-------------------------------------------------------------------
1323 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1324 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1325 c.UpdateCounter(cSubDelRespFromE2)
1326 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1328 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1331 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1333 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1336 trans := subs.GetTransaction()
1338 err = fmt.Errorf("Ongoing transaction not found")
1339 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1342 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1343 if sendOk == false {
1344 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1345 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1350 //-------------------------------------------------------------------
1351 // handle from E2T Subscription Delete Failure
1352 //-------------------------------------------------------------------
1353 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1354 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1355 c.UpdateCounter(cSubDelFailFromE2)
1356 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1358 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1361 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1363 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1366 trans := subs.GetTransaction()
1368 err = fmt.Errorf("Ongoing transaction not found")
1369 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1372 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1373 if sendOk == false {
1374 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1375 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1380 //-------------------------------------------------------------------
1382 //-------------------------------------------------------------------
1383 func typeofSubsMessage(v interface{}) string {
1388 //case *e2ap.E2APSubscriptionRequest:
1390 case *e2ap.E2APSubscriptionResponse:
1392 case *e2ap.E2APSubscriptionFailure:
1394 //case *e2ap.E2APSubscriptionDeleteRequest:
1395 // return "SubDelReq"
1396 case *e2ap.E2APSubscriptionDeleteResponse:
1398 case *e2ap.E2APSubscriptionDeleteFailure:
1405 //-------------------------------------------------------------------
1407 //-------------------------------------------------------------------
1408 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1409 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1410 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1412 xapp.Logger.Error("%v", err)
1418 //-------------------------------------------------------------------
1420 //-------------------------------------------------------------------
1421 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1423 if removeSubscriptionFromDb == true {
1424 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1425 c.RemoveSubscriptionFromDb(subs)
1427 // Update is needed for successful response and merge case here
1428 if subs.RetryFromXapp == false {
1429 err := c.WriteSubscriptionToDb(subs)
1433 subs.RetryFromXapp = false
1437 //-------------------------------------------------------------------
1439 //-------------------------------------------------------------------
1440 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1441 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1442 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1444 xapp.Logger.Error("%v", err)
1448 //-------------------------------------------------------------------
1450 //-------------------------------------------------------------------
1451 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1452 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1453 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1455 xapp.Logger.Error("%v", err)
1459 //-------------------------------------------------------------------
1461 //-------------------------------------------------------------------
1462 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1464 if removeRestSubscriptionFromDb == true {
1465 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1466 c.RemoveRESTSubscriptionFromDb(restSubId)
1468 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1472 //-------------------------------------------------------------------
1474 //-------------------------------------------------------------------
1475 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1476 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1477 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1479 xapp.Logger.Error("%v", err)
1483 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1485 if c.UTTesting == true {
1486 // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
1487 c.registry.mutex = new(sync.Mutex)
1490 const ricRequestorId = 123
1491 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1493 // Send delete for every endpoint in the subscription
1494 if subs.PolicyUpdate == false {
1495 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1496 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1497 subDelReqMsg.RequestId.Id = ricRequestorId
1498 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1499 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1501 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1504 for _, endPoint := range subs.EpList.Endpoints {
1505 params := &xapp.RMRParams{}
1506 params.Mtype = mType
1507 params.SubId = int(subs.GetReqId().InstanceId)
1509 params.Meid = subs.Meid
1510 params.Src = endPoint.String()
1511 params.PayloadLen = len(payload.Buf)
1512 params.Payload = payload.Buf
1514 subs.DeleteFromDb = true
1515 c.handleXAPPSubscriptionDeleteRequest(params)
1520 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1522 fmt.Println("CRESTSubscriptionRequest")
1528 if p.SubscriptionID != "" {
1529 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1531 fmt.Println(" SubscriptionID = ''")
1534 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1536 if p.ClientEndpoint.HTTPPort != nil {
1537 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1539 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1542 if p.ClientEndpoint.RMRPort != nil {
1543 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1545 fmt.Println(" ClientEndpoint.RMRPort = nil")
1549 fmt.Printf(" Meid = %s\n", *p.Meid)
1551 fmt.Println(" Meid = nil")
1554 if p.E2SubscriptionDirectives == nil {
1555 fmt.Println(" E2SubscriptionDirectives = nil")
1557 fmt.Println(" E2SubscriptionDirectives")
1558 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1559 fmt.Println(" E2RetryCount == nil")
1561 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1563 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1564 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1566 for _, subscriptionDetail := range p.SubscriptionDetails {
1567 if p.RANFunctionID != nil {
1568 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1570 fmt.Println(" RANFunctionID = nil")
1572 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1573 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1575 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1576 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1577 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1578 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1580 if actionToBeSetup.SubsequentAction != nil {
1581 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1582 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1584 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")