2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
// idstring renders an optional error and any number of fmt.Stringer values as
// a single space-separated string for use in log messages.
//
// Each non-nil entry contributes entry.String(); a nil entry contributes the
// placeholder "(NIL)". When err is non-nil, "err(<message>)" is appended last.
// Callers in this file pass nil for either argument (for example
// idstring(nil, trans) and idstring(err, nil)), so both are guarded before
// being dereferenced.
func idstring(err error, entries ...fmt.Stringer) string {
	// One slot per entry plus one for the optional error.
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			parts = append(parts, "(NIL)")
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
// Package-level configuration values. In the visible code they are assigned in
// ReadConfigParameters from viper keys under "controls.".

// Timeouts and waits. ReadConfigParameters reads the "*_ms" keys and multiplies
// by 1000000 before storing them here as time.Duration.
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
// Maximum number of send attempts; ReadConfigParameters falls back to 1 when
// the configured value is 0.
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String-typed switches; the code in this file compares them against the
// literal "true".
74 var checkE2State string
75 var readSubsFromDb string
76 var dbRetryForever string
// NOTE(review): these look like fields of the Control struct; the struct header
// and several other fields are not visible in this listing.
// restDuplicateCtrl is consulted for md5sum-based duplicate/retransmission
// detection of REST subscription requests.
84 restDuplicateCtrl *DuplicateCtrl
// e2IfStateDb is set from CreateXappRnibIfInstance() in NewControl.
86 e2IfStateDb XappRnibInterface
// restSubsDb is set from CreateRESTSdl() in NewControl.
88 restSubsDb Sdlnterface
// Counters is the "SUBMGR" counter group registered via xapp.Metric in NewControl.
91 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker event type.
// NOTE(review): presumably used by test support around submgr restarts — its
// use is not visible here.
102 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event type (use not visible here).
103 type SubmgrRestartUpEvent struct{}
// PackSubscriptionRequestErrortEvent is the event handleSubscriptionRequest
// maps to an "E2 SubscriptionRequest pack failure" error; it carries an
// ErrorInfo value (field declaration not visible in this listing).
104 type PackSubscriptionRequestErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo is dereferenced
// unconditionally, so it must not be nil.
108 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
109 p.ErrorInfo = *errorInfo
// SDLWriteErrortEvent is the event handleSubscriptionRequest maps to an
// "SDL write failure" error; it carries an ErrorInfo value (field declaration
// not visible in this listing).
112 type SDLWriteErrortEvent struct {
// SetEvent stores a copy of *errorInfo in the event. errorInfo is dereferenced
// unconditionally, so it must not be nil.
116 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
117 s.ErrorInfo = *errorInfo
// NOTE(review): these statements appear to belong to a start-up function
// (presumably init) whose header is not visible in this listing.
// Emits a debug marker line.
121 xapp.Logger.Debug("SUBMGR")
// Configure viper environment lookups to use the "submgr" prefix and to treat
// empty environment variables as set.
123 viper.SetEnvPrefix("submgr")
124 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager Control instance: it creates the
// routing manager client, registry, tracker, duplicate controller and E2
// interface state, reads configuration, registers REST routes, optionally
// restores subscriptions from the database and starts the REST subscription
// listener.
// NOTE(review): some lines (remaining struct fields, the return statement,
// closing braces) are not visible in this listing.
127 func NewControl() *Control {
// Routing manager HTTP client; address, port and base URL come from the
// "rtmgr.*" viper keys and only plain "http" is used as scheme.
129 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
130 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// Registry of subscriptions; it gets a pointer to the routing manager client.
132 registry := new(Registry)
133 registry.Initialize()
134 registry.rtmgrClient = &rtmgrClient
136 tracker := new(Tracker)
// Duplicate/retransmission detection for REST requests.
139 restDuplicateCtrl := new(DuplicateCtrl)
140 restDuplicateCtrl.Init()
142 e2IfState := new(E2IfState)
144 c := &Control{e2ap: new(E2ap),
147 restDuplicateCtrl: restDuplicateCtrl,
148 e2IfState: e2IfState,
149 e2IfStateDb: CreateXappRnibIfInstance(),
150 e2SubsDb: CreateSdl(),
151 restSubsDb: CreateRESTSdl(),
152 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load configuration into the package-level variables; the argument is unused
// by the visible body of ReadConfigParameters.
157 c.ReadConfigParameters("")
159 // Register REST handler for testing support
160 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
161 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// The errors returned by the two Read* calls below are not checked here.
164 if readSubsFromDb == "true" {
165 // Read subscriptions from db
166 c.ReadE2Subscriptions()
167 c.ReadRESTSubscriptions()
// Serve REST subscribe/query/delete requests in a background goroutine.
170 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
// SymptomDataHandler is the HTTP handler registered for GET
// "/ric/v1/symptomdata". It sends the registry's subscription list as symptom
// data under the name "platform/subscriptions.json".
174 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// The error from QueryHandler is deliberately discarded here.
175 subscriptions, _ := c.registry.QueryHandler()
176 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
179 //-------------------------------------------------------------------
181 //-------------------------------------------------------------------
// GetAllRestSubscriptions is the HTTP handler registered for GET
// "/ric/v1/restsubscriptions". It fetches all REST subscriptions from the
// registry.
// NOTE(review): the statement that writes the response to w is not visible in
// this listing.
182 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
183 xapp.Logger.Debug("GetAllRestSubscriptions() called")
184 response := c.registry.GetAllRestSubscriptions()
188 //-------------------------------------------------------------------
190 //-------------------------------------------------------------------
// ReadE2Subscriptions loads the E2 subscriptions from the database into the
// registry and then cleans up subscriptions that never got a response.
//
// The read is attempted up to dbTryCount times, or without limit while
// dbRetryForever == "true". Returns an error value.
// NOTE(review): the error check separating the retry path from the success
// path, and the return statements, are not visible in this listing.
191 func (c *Control) ReadE2Subscriptions() error {
194 var register map[uint32]*Subscription
195 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
196 xapp.Logger.Debug("Reading E2 subscriptions from db")
197 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
// Presumably the failure path: log the error and wait one second before the
// next attempt.
199 xapp.Logger.Error("%v", err)
200 <-time.After(1 * time.Second)
// Presumably the success path: install what was read into the registry.
202 c.registry.subIds = subIds
203 c.registry.register = register
204 c.HandleUncompletedSubscriptions(register)
// Logged once the loop gives up.
208 xapp.Logger.Debug("Continuing without retring")
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
// ReadRESTSubscriptions loads the REST subscriptions from the database into
// the registry.
//
// The read is attempted up to dbTryCount times, or without limit while
// dbRetryForever == "true". Returns an error value.
// NOTE(review): the error check separating the retry path from the success
// path, and the return statements, are not visible in this listing.
215 func (c *Control) ReadRESTSubscriptions() error {
217 var restSubscriptions map[string]*RESTSubscription
218 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
219 xapp.Logger.Debug("Reading REST subscriptions from db")
220 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
// Presumably the failure path: log the error and wait one second before the
// next attempt.
222 xapp.Logger.Error("%v", err)
223 <-time.After(1 * time.Second)
// Presumably the success path: install what was read into the registry.
225 c.registry.restSubscriptions = restSubscriptions
// Logged once the loop gives up.
229 xapp.Logger.Debug("Continuing without retring")
233 //-------------------------------------------------------------------
235 //-------------------------------------------------------------------
// ReadConfigParameters loads runtime configuration from viper into the
// package-level variables and the Control's logger level. It is called from
// NewControl and registered in Run as a configuration change listener.
//
// f is not referenced by any visible statement of this function.
// Each setting falls back to a hard-coded default when the configured value is
// the zero value ("" or 0), logging a debug-level warning when it does so.
236 func (c *Control) ReadConfigParameters(f string) {
238 xapp.Logger.Debug("ReadConfigParameters")
// Mirror the current logger level and forward it to the ASN.1 debug printing.
240 c.LoggerLevel = int(xapp.Logger.GetLevel())
241 xapp.Logger.Debug("LoggerLevel= %v", c.LoggerLevel)
242 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
// The "*_ms" values are multiplied by 1000000 to turn milliseconds into the
// nanosecond unit of time.Duration.
244 // viper.GetDuration returns nanoseconds
245 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
246 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
247 e2tSubReqTimeout = 2000 * 1000000
248 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
250 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
252 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
253 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
254 e2tSubDelReqTime = 2000 * 1000000
255 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
257 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
259 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
260 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
261 e2tRecvMsgTimeout = 2000 * 1000000
262 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
264 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
// Try counts default to a single attempt.
266 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
267 if e2tMaxSubReqTryCount == 0 {
268 e2tMaxSubReqTryCount = 1
269 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
271 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
273 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
274 if e2tMaxSubDelReqTryCount == 0 {
275 e2tMaxSubDelReqTryCount = 1
276 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
278 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
// String switches default to "true".
280 checkE2State = viper.GetString("controls.checkE2State")
281 if checkE2State == "" {
282 checkE2State = "true"
283 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
285 xapp.Logger.Debug("checkE2State= %v", checkE2State)
287 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
288 if readSubsFromDb == "" {
289 readSubsFromDb = "true"
290 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
292 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
// NOTE(review): the zero check and the default value for dbTryCount are not
// visible in this listing.
294 dbTryCount = viper.GetInt("controls.dbTryCount")
297 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
299 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
301 dbRetryForever = viper.GetString("controls.dbRetryForever")
302 if dbRetryForever == "" {
303 dbRetryForever = "true"
304 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
306 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
308 // Internal cfg parameter, used to define a wait time for RMR route clean-up. Non-default
309 // value 100ms used currently only in unittests.
310 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
311 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
312 waitRouteCleanup_ms = 5000 * 1000000
313 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
315 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
318 //-------------------------------------------------------------------
320 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register (subscription ID ->
// subscription) and sends a subscription delete request for every entry that
// has not received a subscription response and is not a policy update.
321 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
323 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
324 for subId, subs := range register {
325 if subs.SubRespRcvd == false {
326 // If a policy subscription has already been made successfully, an unsuccessful update should not cause it to be deleted.
327 if subs.PolicyUpdate == false {
// Flag the subscription so that no response is sent to the xApp for this
// clean-up delete.
328 subs.NoRespToXapp = true
329 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
330 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It installs
// xapp.Rmr as the Control's RMR client unless one is already set. data is not
// used by the visible statements.
336 func (c *Control) ReadyCB(data interface{}) {
337 if c.RMRClient == nil {
338 c.RMRClient = xapp.Rmr
// Run registers the ready callback and the configuration change listener with
// the xapp framework.
// NOTE(review): any further statement (e.g. starting the framework) is not
// visible in this listing.
342 func (c *Control) Run() {
343 xapp.SetReadyCB(c.ReadyCB, nil)
344 xapp.AddConfigChangeListener(c.ReadConfigParameters)
348 //-------------------------------------------------------------------
350 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription an incoming
// request refers to, creating a new one when needed.
//
// Parameters:
//   p               - REST subscription parameters; p.SubscriptionID may be empty.
//   md5sum          - checksum of the request, used to recognise retransmissions.
//   xAppRmrEndpoint - RMR endpoint of the requesting xApp, stored in a newly
//                     created subscription.
//
// Returns the REST subscription, its ID and an error value.
// NOTE(review): several branch conditions, else clauses and the error return
// are not visible in this listing; branch comments below are hedged.
351 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
354 var restSubscription *RESTSubscription
// Look up the REST subscription ID last seen with the same request checksum.
357 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
358 if p.SubscriptionID == "" {
359 // Subscription does not contain REST subscription Id
361 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
362 if restSubscription != nil {
363 // Subscription found: reuse the previously known REST subscription ID
364 restSubId = prevRestSubsId
366 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
368 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
// Presumably the branch where the cached ID no longer maps to a subscription:
// the stale checksum entry is dropped.
371 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
372 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing reusable: create a fresh REST subscription with a new KSUID.
376 if restSubscription == nil {
377 restSubId = ksuid.New().String()
378 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
381 // Subscription contains REST subscription Id
382 restSubId = p.SubscriptionID
384 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
385 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
387 // Subscription with id in REST request does not exist
388 xapp.Logger.Error("%s", err.Error())
389 c.UpdateCounter(cRestSubFailToXapp)
// Only logging differs between a new request and a retransmission for an
// existing subscription.
394 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
396 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
400 return restSubscription, restSubId, nil
403 //-------------------------------------------------------------------
405 //-------------------------------------------------------------------
// RESTSubscriptionHandler handles a REST subscription request from an xApp.
// It is passed to xapp.Subscription.Listen in NewControl.
//
// params is type-asserted to *models.SubscriptionParams without a check, so
// any other dynamic type panics.
//
// Returns the response body (nil on rejection) and one of the
// common.Subscribe*Code status codes. The E2 subscription work itself runs
// asynchronously in processSubscriptionRequests.
// NOTE(review): the error-check conditions guarding the early returns are not
// visible in this listing.
406 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
409 c.UpdateCounter(cRestSubReqFromXapp)
411 subResp := models.SubscriptionResponse{}
412 p := params.(*models.SubscriptionParams)
// Dump the request only at logger levels above 2.
414 if c.LoggerLevel > 2 {
415 c.PrintRESTSubscriptionRequest(p)
// Reject when the E2 connection for the target RAN is not up.
418 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
419 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
420 c.UpdateCounter(cRestReqRejDueE2Down)
421 return nil, common.SubscribeServiceUnavailableCode
// A client endpoint is mandatory.
424 if p.ClientEndpoint == nil {
425 err := fmt.Errorf("ClientEndpoint == nil")
426 xapp.Logger.Error("%v", err)
427 c.UpdateCounter(cRestSubFailToXapp)
428 return nil, common.SubscribeBadRequestCode
431 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
433 xapp.Logger.Error("%s", err.Error())
434 c.UpdateCounter(cRestSubFailToXapp)
435 return nil, common.SubscribeBadRequestCode
// No return is visible after this log line, so a checksum failure appears to
// be logged without aborting the request.
438 md5sum, err := CalculateRequestMd5sum(params)
440 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
443 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
445 xapp.Logger.Error("Subscription with id in REST request does not exist")
446 return nil, common.SubscribeNotFoundCode
// Build the E2AP subscription request messages from the REST parameters; on
// failure the cached checksum and the REST subscription are removed.
449 subResp.SubscriptionID = &restSubId
450 subReqList := e2ap.SubscriptionRequestList{}
451 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
453 xapp.Logger.Error("%s", err.Error())
454 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
455 c.registry.DeleteRESTSubscription(&restSubId)
456 c.UpdateCounter(cRestSubFailToXapp)
457 return nil, common.SubscribeBadRequestCode
// A retransmission of a request that is still being processed is acknowledged
// directly without starting new processing.
460 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
462 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
463 xapp.Logger.Debug("%s", err)
464 c.UpdateCounter(cRestSubRespToXapp)
465 return &subResp, common.SubscribeCreatedCode
// Persist the REST subscription, resolve the E2 directives and hand the actual
// processing over to a goroutine.
468 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
469 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
471 xapp.Logger.Error("%s", err)
472 return nil, common.SubscribeBadRequestCode
474 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
476 c.UpdateCounter(cRestSubRespToXapp)
477 return &subResp, common.SubscribeCreatedCode
480 //-------------------------------------------------------------------
482 //-------------------------------------------------------------------
// GetE2SubscriptionDirectives derives the E2 timeout, maximum try count and
// RMR route creation flag for a REST subscription request.
//
// When p or p.E2SubscriptionDirectives is nil, the configured defaults
// (e2tSubReqTimeout, e2tMaxSubReqTryCount) are used and route creation is
// enabled. Otherwise the request values are validated:
//   - E2TimeoutTimerValue must be 1..10 (seconds);
//   - E2RetryCount, when present, must be 0..10.
//
// Returns the directives, or nil and an error when a value is out of range.
// NOTE(review): the else keywords separating the branches are not visible in
// this listing.
483 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
485 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
486 if p == nil || p.E2SubscriptionDirectives == nil {
487 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
488 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
489 e2SubscriptionDirectives.CreateRMRRoute = true
490 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
492 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
493 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Seconds to nanoseconds (time.Duration unit)
495 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
497 if p.E2SubscriptionDirectives.E2RetryCount == nil {
498 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
499 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
501 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
502 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = first sending plus E2RetryCount retries
504 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
507 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
509 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
510 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
511 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
512 return e2SubscriptionDirectives, nil
515 //-------------------------------------------------------------------
517 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2AP subscription request of one
// REST subscription request, one after another, and notifies the xApp of the
// outcome of each. It is started as a goroutine by RESTSubscriptionHandler.
//
// For each request a transaction is created and handleSubscriptionRequest is
// called; depending on the result a successful or unsuccessful REST
// notification is sent to clientEndpoint.
// NOTE(review): the nil/error checks and the transaction release are not
// visible in this listing.
519 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
520 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
522 c.SubscriptionProcessingStartDelay()
523 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
525 var xAppEventInstanceID int64
526 var e2EventInstanceID int64
527 errorInfo := &ErrorInfo{}
// Runs when this function returns. The arguments (*restSubId, md5sum) are
// evaluated here, at the defer statement.
529 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
531 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
532 subReqMsg := subReqList.E2APSubscriptionRequests[index]
533 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
535 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
537 // Send notification to xApp that processing of a Subscription Request has failed.
538 err := fmt.Errorf("Tracking failure")
539 errorInfo.ErrorCause = err.Error()
540 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
544 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
// The := below declares a loop-scoped errorInfo that shadows the one declared
// before the loop.
546 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
548 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
552 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
554 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
555 restSubscription.AddMd5Sum(md5sum)
556 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
557 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
558 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
563 //-------------------------------------------------------------------
565 //------------------------------------------------------------------
// SubscriptionProcessingStartDelay blocks for 50 ms when the Control is in
// unit-test mode (c.UTTesting); otherwise it returns immediately.
566 func (c *Control) SubscriptionProcessingStartDelay() {
567 if c.UTTesting == true {
568 // This is a temporary fix for the UT problem where the notification arrives before the subscription response.
569 // The correct fix would be to allow the notification to come before the response and process it correctly.
570 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
571 <-time.After(time.Millisecond * 50)
572 xapp.Logger.Debug("Continuing after delay")
576 //-------------------------------------------------------------------
578 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request to completion:
// it tracks the transaction, assigns it to a subscription, starts the
// subscription creation in a goroutine and blocks until an event arrives.
//
// Returns the E2AP subscription response on success; otherwise nil, an
// ErrorInfo describing cause/source/timeout type, and an error.
// NOTE(review): error checks, the default case and the nil-event check are not
// visible in this listing; the branch comments below are hedged accordingly.
579 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
580 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
582 errorInfo := ErrorInfo{}
584 err := c.tracker.Track(trans)
// Tracking failed: the original error text is kept in ErrorCause while a
// generic "Tracking failure" error is returned.
586 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
587 errorInfo.ErrorCause = err.Error()
588 err = fmt.Errorf("Tracking failure")
589 return nil, &errorInfo, err
592 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
594 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
595 return nil, &errorInfo, err
// OngoingReqCount brackets the blocking wait for the outcome of the
// subscription creation goroutine.
601 subs.OngoingReqCount++
602 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
603 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
604 subs.OngoingReqCount--
608 switch themsg := event.(type) {
609 case *e2ap.E2APSubscriptionResponse:
// A response only counts as success while the E2 connection is still up;
// otherwise the subscription is removed again and an error is returned.
611 if c.e2IfState.IsE2ConnectionUp(meid) == true {
612 return themsg, &errorInfo, nil
614 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
615 c.RemoveSubscriptionFromDb(subs)
616 err = fmt.Errorf("E2 interface down")
617 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
618 return nil, &errorInfo, err
620 case *e2ap.E2APSubscriptionFailure:
621 err = fmt.Errorf("E2 SubscriptionFailure received")
622 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
623 return nil, &errorInfo, err
// The two error events carry their own ErrorInfo, which is returned as is.
624 case *PackSubscriptionRequestErrortEvent:
625 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
626 return nil, &themsg.ErrorInfo, err
627 case *SDLWriteErrortEvent:
628 err = fmt.Errorf("SDL write failure")
629 return nil, &themsg.ErrorInfo, err
// Presumably the default case: any other event type.
631 err = fmt.Errorf("Unexpected E2 subscription response received")
632 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
// Presumably the no-event (timeout) path. For a policy update the function
// returns before the RemoveFromSubscription call below.
636 err = fmt.Errorf("E2 subscription response timeout")
637 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
638 if subs.PolicyUpdate == true {
639 return nil, &errorInfo, err
643 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
644 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
645 return nil, &errorInfo, err
648 //-------------------------------------------------------------------
650 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification notifies the xApp at clientEndpoint
// that processing of one subscription request failed.
//
// It fills in SUBMGR as error source when errorInfo has none, marks the REST
// subscription as processed with err, updates it in the database, sends the
// notification, and finally deletes the REST subscription when the E2
// connection is down and no subscription request is ongoing.
//
// trans may be nil: callers in this file pass a nil transaction on tracking
// failure, and the two log variants below differ only in whether trans is
// printed. errorInfo and clientEndpoint are dereferenced unconditionally.
651 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
652 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
654 // Send notification to xApp that processing of a Subscription Request has failed.
// No E2 instance exists for a failed request, so 0 is reported.
655 e2EventInstanceID := (int64)(0)
656 if errorInfo.ErrorSource == "" {
657 // Submgr is default source of error
658 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
660 resp := &models.SubscriptionResponse{
661 SubscriptionID: restSubId,
662 SubscriptionInstances: []*models.SubscriptionInstance{
663 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
664 ErrorCause: errorInfo.ErrorCause,
665 ErrorSource: errorInfo.ErrorSource,
666 TimeoutType: errorInfo.TimeoutType,
667 XappEventInstanceID: &xAppEventInstanceID},
670 // Mark REST subscription request processed.
671 restSubscription.SetProcessed(err)
672 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
674 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
675 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
677 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
678 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
681 c.UpdateCounter(cRestSubFailNotifToXapp)
682 xapp.Subscription.Notify(resp, *clientEndpoint)
// Clean up the REST subscription when the E2 connection is gone and nothing is
// in flight for it.
684 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
685 c.registry.DeleteRESTSubscription(restSubId)
686 c.RemoveRESTSubscriptionFromDb(*restSubId)
690 //-------------------------------------------------------------------
692 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification notifies the xApp at clientEndpoint that
// one subscription request was processed successfully.
//
// It records the E2 instance ID and the xApp-to-E2 ID mapping in the REST
// subscription, marks it processed, updates it in the database, sends the
// notification, and finally deletes the REST subscription when the E2
// connection is down and no subscription request is ongoing.
693 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
694 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
696 // Store successfully processed InstanceId for deletion
697 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
698 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
700 // Send notification to xApp that a Subscription Request has been processed.
701 resp := &models.SubscriptionResponse{
702 SubscriptionID: restSubId,
703 SubscriptionInstances: []*models.SubscriptionInstance{
704 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
706 XappEventInstanceID: &xAppEventInstanceID},
709 // Mark REST subscription request processed.
710 restSubscription.SetProcessed(nil)
711 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
712 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
713 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
715 c.UpdateCounter(cRestSubNotifToXapp)
716 xapp.Subscription.Notify(resp, *clientEndpoint)
// Clean up the REST subscription when the E2 connection is gone and nothing is
// in flight for it.
718 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
719 c.registry.DeleteRESTSubscription(restSubId)
720 c.RemoveRESTSubscriptionFromDb(*restSubId)
724 //-------------------------------------------------------------------
726 //-------------------------------------------------------------------
// RESTSubscriptionDeleteHandler handles a REST subscription delete request
// from an xApp. It is passed to xapp.Subscription.Listen in NewControl.
//
// Returns common.UnsubscribeNoContentCode when the subscription is unknown or
// the delete has been handled, and common.UnsubscribeBadRequestCode while a
// subscription request or an earlier delete for the same ID is still ongoing.
// NOTE(review): the error checks and any goroutine wrapper around the delete
// loop are not visible in this listing.
727 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
730 c.UpdateCounter(cRestSubDelReqFromXapp)
732 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
734 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
736 xapp.Logger.Error("%s", err.Error())
737 if restSubscription == nil {
738 // Subscription was not found
739 c.UpdateCounter(cRestSubDelRespToXapp)
740 return common.UnsubscribeNoContentCode
742 if restSubscription.SubReqOngoing == true {
743 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
744 xapp.Logger.Error("%s", err.Error())
745 c.UpdateCounter(cRestSubDelFailToXapp)
746 return common.UnsubscribeBadRequestCode
747 } else if restSubscription.SubDelReqOngoing == true {
748 // Previous request for same restSubId still ongoing
749 c.UpdateCounter(cRestSubDelFailToXapp)
750 return common.UnsubscribeBadRequestCode
755 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete every E2 subscription instance that belongs to this REST
// subscription, then drop the REST subscription itself.
757 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
758 for _, instanceId := range restSubscription.InstanceIds {
759 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
762 xapp.Logger.Error("%s", err.Error())
764 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
765 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
766 restSubscription.DeleteE2InstanceId(instanceId)
768 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
769 c.registry.DeleteRESTSubscription(&restSubId)
770 c.RemoveRESTSubscriptionFromDb(restSubId)
773 c.UpdateCounter(cRestSubDelRespToXapp)
774 return common.UnsubscribeNoContentCode
777 //-------------------------------------------------------------------
779 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance on behalf of
// a REST subscription delete.
//
// It looks up the subscription by instanceId, creates and tracks a delete
// transaction, starts handleSubscriptionDelete in a goroutine, blocks until
// an event arrives and then removes the transaction from the subscription.
//
// Returns the xApp event instance ID taken from the subscription's request ID
// (0 when the subscription is not found) and an error value.
// NOTE(review): the error/nil checks are not visible in this listing.
780 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
782 var xAppEventInstanceID int64
783 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// Not finding the subscription is reported as success (nil error).
// NOTE(review): restSubId is a *string and is passed undereferenced to %v, so
// the log shows a pointer value rather than the ID — confirm intent.
785 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
786 restSubId, instanceId, idstring(err, nil))
787 return xAppEventInstanceID, nil
790 xAppEventInstanceID = int64(subs.ReqId.Id)
791 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
793 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
794 xapp.Logger.Error("%s", err.Error())
796 defer trans.Release()
798 err = c.tracker.Track(trans)
800 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
801 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned as the error here; the
// descriptive err built above is only logged — looks unintended, confirm.
802 return xAppEventInstanceID, &time.ParseError{}
// OngoingDelCount brackets the blocking wait for the outcome of the delete
// goroutine; the event returned by WaitEvent is ignored.
807 subs.OngoingDelCount++
808 go c.handleSubscriptionDelete(subs, trans)
809 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
810 subs.OngoingDelCount--
812 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
814 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
816 return xAppEventInstanceID, nil
819 //-------------------------------------------------------------------
821 //-------------------------------------------------------------------
// RESTQueryHandler returns the current subscription list by delegating to the
// registry's QueryHandler. It is passed to xapp.Subscription.Listen in
// NewControl.
822 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
823 xapp.Logger.Debug("RESTQueryHandler() called")
827 return c.registry.QueryHandler()
// TestRestHandler is the HTTP handler registered for POST
// "/ric/v1/test/{testId}" and implements test-support commands selected by the
// {testId} path variable.
// NOTE(review): the conditions selecting the "remove all" and "restart"
// commands, and the returns between commands, are not visible in this listing.
830 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
831 xapp.Logger.Debug("RESTTestRestHandler() called")
833 pathParams := mux.Vars(r)
834 s := pathParams["testId"]
836 // This can be used to delete single subscription from db
837 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so Split yields at least two elements and splits[1] is safe.
838 var splits = strings.Split(s, "=")
// The ID is parsed as a 64-bit integer and then narrowed to uint32.
839 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
840 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
841 c.RemoveSubscriptionFromSdl(uint32(subId))
846 // This can be used to remove all subscriptions from db
848 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
849 c.RemoveAllSubscriptionsFromSdl()
850 c.RemoveAllRESTSubscriptionsFromSdl()
854 // This is meant to cause submgr's restart in testing
856 xapp.Logger.Debug("os.Exit(1) called")
860 xapp.Logger.Debug("Unsupported rest command received %s", s)
863 //-------------------------------------------------------------------
865 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload of a subscription-side transaction over RMR.
//
// The RMR parameters are built from the transaction (message type, payload)
// and the subscription (instance ID as SubId, MEID). desc is used only in the
// debug log line. Returns the error from SendWithRetry, which is also logged
// when the send fails.
// NOTE(review): the error check around the log line and the return statement
// are not visible in this listing.
867 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
868 params := &xapp.RMRParams{}
869 params.Mtype = trans.GetMtype()
870 params.SubId = int(subs.GetReqId().InstanceId)
872 params.Meid = subs.GetMeid()
874 params.PayloadLen = len(trans.Payload.Buf)
875 params.Payload = trans.Payload.Buf
877 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
878 err = c.SendWithRetry(params, false, 5)
880 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp fills an xapp.RMRParams from the subscription and the xApp
// transaction and sends it with c.SendWithRetry(params, false, 5).
//
// desc is only used in the debug log line. Mtype, Xid, Meid and the payload
// come from trans; SubId comes from the InstanceId of the subscription's
// request id. The result of SendWithRetry is stored in the named return err.
// NOTE(review): lines are missing from this listing (numbering gaps); the
// condition guarding the "Send failed" log and the return are not visible -
// confirm against the complete file.
885 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
887 params := &xapp.RMRParams{}
888 params.Mtype = trans.GetMtype()
889 params.SubId = int(subs.GetReqId().InstanceId)
890 params.Xid = trans.GetXid()
891 params.Meid = trans.GetMeid()
// PayloadLen is set to the length of the same buffer assigned to Payload.
893 params.PayloadLen = len(trans.Payload.Buf)
894 params.Payload = trans.Payload.Buf
896 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
// The literal arguments (false, 5) are passed as-is; their meaning is defined
// by SendWithRetry, which is not visible here.
897 err = c.SendWithRetry(params, false, 5)
899 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives an RMR message and hands it to the matching subscription
// handler. Every visible case starts its handler in a new goroutine; a message
// of any other type is logged as unknown and discarded.
//
// When c.RMRClient is nil an error is built and logged instead.
// NOTE(review): the switch header and several statements are not visible in
// this listing (numbering gaps). Presumably the switch is on msg.Mtype and the
// cloned payload is stored back into msg - confirm against the complete file.
904 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
905 if c.RMRClient == nil {
// NOTE(review): "nil can handle" probably means "nil, cannot handle" - confirm
// before changing the message text.
906 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
907 xapp.Logger.Error("%s", err.Error())
// c.RMRClient.Free(msg.Mbuf) runs when Consume returns, i.e. possibly before
// the handler goroutines started below have finished.
912 defer c.RMRClient.Free(msg.Mbuf)
914 // xapp-frame might use direct access to c buffer and
915 // when msg.Mbuf is freed, someone might take it into use
916 // and payload data might be invalid inside message handle function
918 // subscriptions won't load system a lot so there is no
919 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a fresh backing
// array, so cPay does not share memory with msg.Payload.
920 cPay := append(msg.Payload[:0:0], msg.Payload...)
922 msg.PayloadLen = len(cPay)
// Messages from xApps (requests) and from E2T (responses/failures) are each
// dispatched to their own handler goroutine.
925 case xapp.RIC_SUB_REQ:
926 go c.handleXAPPSubscriptionRequest(msg)
927 case xapp.RIC_SUB_RESP:
928 go c.handleE2TSubscriptionResponse(msg)
929 case xapp.RIC_SUB_FAILURE:
930 go c.handleE2TSubscriptionFailure(msg)
931 case xapp.RIC_SUB_DEL_REQ:
932 go c.handleXAPPSubscriptionDeleteRequest(msg)
933 case xapp.RIC_SUB_DEL_RESP:
934 go c.handleE2TSubscriptionDeleteResponse(msg)
935 case xapp.RIC_SUB_DEL_FAILURE:
936 go c.handleE2TSubscriptionDeleteFailure(msg)
938 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
943 //-------------------------------------------------------------------
944 // handle from XAPP Subscription Request
945 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request message
// received from an xApp.
//
// Visible steps: count the request (cSubReqFromXapp), check the E2 connection
// for the RAN named in params.Meid, unpack the payload with
// c.e2ap.UnpackSubscriptionRequest, create and track an xApp transaction,
// assign it to a subscription through c.registry.AssignToSubscription, and
// finally call c.wakeSubscriptionRequest. Each failure is logged.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this listing (numbering gaps) - confirm against the complete
// file.
946 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
947 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
948 c.UpdateCounter(cSubReqFromXapp)
// NOTE(review): "¶ms" looks like a mis-encoded "&params" (address of
// params.Meid.RanName) - confirm against the original file.
950 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
951 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
955 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
957 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is created from the sender endpoint, the message Xid, the
// unpacked request id and the message Meid.
961 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
963 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
966 defer trans.Release()
968 if err = c.tracker.Track(trans); err != nil {
969 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
973 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
// The second value returned by AssignToSubscription is ignored here.
974 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
976 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
980 c.wakeSubscriptionRequest(subs, trans)
983 //-------------------------------------------------------------------
984 // Wake Subscription Request to E2node
985 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate in a goroutine,
// waits for the event it sends back on trans, and forwards a packed
// subscription response or failure to the xApp with rmrSendToXapp.
//
// subs.OngoingReqCount is incremented before the wait and decremented after.
// NOTE(review): lines are missing from this listing (numbering gaps), e.g.
// the declaration of err, the checks after packing and the returns - confirm
// against the complete file.
986 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
// Directives are fetched with a nil argument; the second return value is
// discarded.
988 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
// NOTE(review): no locking is visible around this counter - confirm whether
// concurrent handlers can touch the same subscription.
989 subs.OngoingReqCount++
990 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
991 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
992 subs.OngoingReqCount--
// Dispatch on the concrete type of the event delivered by the create handler.
995 switch themsg := event.(type) {
996 case *e2ap.E2APSubscriptionResponse:
// The request id is overwritten with the xApp transaction's id before packing.
997 themsg.RequestId.Id = trans.RequestId.Id
998 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1001 c.UpdateCounter(cSubRespToXapp)
1002 c.rmrSendToXapp("", subs, trans)
1005 case *e2ap.E2APSubscriptionFailure:
// Same id rewrite as for the response case.
1006 themsg.RequestId.Id = trans.RequestId.Id
1007 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1009 c.UpdateCounter(cSubFailToXapp)
1010 c.rmrSendToXapp("", subs, trans)
// Logged when none of the paths above completed (exact condition not visible).
1016 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1017 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1020 //-------------------------------------------------------------------
1021 // handle from XAPP Subscription Delete Request
1022 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// message received from an xApp.
//
// Visible steps: count the request (cSubDelReqFromXapp), check the E2
// connection for the RAN named in params.Meid, unpack the payload, create and
// track an xApp transaction, look up the subscription by the transaction's
// subscription id, start handleSubscriptionDelete in a goroutine and wait for
// its event, then (unless subs.NoRespToXapp is true) pack an
// E2APSubscriptionDeleteResponse and send it to the xApp.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this listing (numbering gaps) - confirm against the complete
// file.
1023 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1024 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1025 c.UpdateCounter(cSubDelReqFromXapp)
// NOTE(review): "¶ms" looks like a mis-encoded "&params" (address of
// params.Meid.RanName) - confirm against the original file.
1027 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1028 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1032 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1034 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
// The transaction is created from the sender endpoint, the message Xid, the
// unpacked request id and the message Meid.
1038 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1040 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
// The transaction is released when this handler returns.
1043 defer trans.Release()
1045 err = c.tracker.Track(trans)
// NOTE(review): this log uses the "XAPP-SubReq" prefix inside the delete
// handler, unlike the neighbouring "XAPP-SubDelReq" logs - confirm intent.
1047 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1051 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1053 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// NOTE(review): no locking is visible around this counter - confirm whether
// concurrent handlers can touch the same subscription.
1060 subs.OngoingDelCount++
1061 go c.handleSubscriptionDelete(subs, trans)
// The delivered event itself is not used; the call only waits for completion.
1062 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1063 subs.OngoingDelCount--
1065 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1067 if subs.NoRespToXapp == true {
1068 // Do not send delete responses to xapps when submgr restart is deleting uncompleted subscriptions
1069 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1073 // Whatever is received success, fail or timeout, send successful delete response
1074 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// The response carries the xApp transaction's request id, the subscription's
// instance id and the function id of the stored subscription request.
1075 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1076 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1077 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1078 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1080 c.UpdateCounter(cSubDelRespToXapp)
1081 c.rmrSendToXapp("", subs, trans)
1084 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
1085 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1088 //-------------------------------------------------------------------
1089 // SUBS CREATE Handling
1090 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the create procedure for subs on behalf of
// parentTrans and finally sends the resulting message to parentTrans as an
// event.
//
// It creates a subscription transaction, waits for its turn on subs and, when
// subs.GetCachedResponse() returns (nil, true), calls
// sendE2TSubscriptionRequest and caches the outcome according to the type of
// the returned event. The subscription is then written to or removed from the
// database through UpdateSubscriptionInDB.
// NOTE(review): several branch headers (default case, else branches, error
// checks) are not visible in this listing (numbering gaps) - confirm the exact
// control flow against the complete file.
1091 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1093 var event interface{} = nil
1094 var removeSubscriptionFromDb bool = false
1095 trans := c.tracker.NewSubsTransaction(subs)
// Wait for this transaction's turn on the subscription; the turn and the
// transaction are both released when the function returns.
1096 subs.WaitTransactionTurn(trans)
1097 defer subs.ReleaseTransactionTurn(trans)
1098 defer trans.Release()
1100 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1102 subRfMsg, valid := subs.GetCachedResponse()
// No cached message yet and the cache is flagged valid: ask E2T.
1103 if subRfMsg == nil && valid == true {
1104 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1105 switch event.(type) {
1106 case *e2ap.E2APSubscriptionResponse:
// Success: cache the response with the flag set to true.
1107 subRfMsg, valid = subs.SetCachedResponse(event, true)
1108 subs.SubRespRcvd = true
1109 case *e2ap.E2APSubscriptionFailure:
// Failure from E2T: mark the subscription for removal from the db and cache
// the failure with the flag set to false.
1110 removeSubscriptionFromDb = true
1111 subRfMsg, valid = subs.SetCachedResponse(event, false)
1112 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1113 case *SubmgrRestartTestEvent:
1114 // This simulates that no response has been received and after restart subscriptions are restored from db
1115 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1116 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
// Local pack/db-write errors are cached with the flag set to false.
1117 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Presumably the default branch (its header is not visible): for anything
// other than a policy update, clear the cache, mark the subscription for db
// removal and send a delete request to E2T.
1119 if subs.PolicyUpdate == false {
1120 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1121 removeSubscriptionFromDb = true
1122 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1123 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1126 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1128 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1131 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Presumably executed only when the db update failed (check not visible): the
// cache flag is set to false and a delete request is sent to E2T.
1133 subRfMsg, valid = subs.SetCachedResponse(event, false)
1134 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1137 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// The condition guarding this call is not visible in this listing.
1139 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
// Hand the resulting message to the waiting xApp-side transaction.
1142 parentTrans.SendEvent(subRfMsg, 0)
1145 //-------------------------------------------------------------------
1146 // SUBS DELETE Handling
1147 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the delete procedure for subs on behalf of
// parentTrans.
//
// It creates a subscription transaction and waits for its turn on subs. A
// delete request is sent to E2T only when the subscription is valid and
// parentTrans's endpoint is the single endpoint left in subs.EpList. The
// endpoint is then removed from the subscription, the subscription is updated
// in the db, and a nil event is sent to parentTrans.
// NOTE(review): some lines are not visible in this listing (numbering gaps) -
// confirm against the complete file.
1149 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1151 trans := c.tracker.NewSubsTransaction(subs)
// Wait for this transaction's turn on the subscription; the turn and the
// transaction are both released when the function returns.
1152 subs.WaitTransactionTurn(trans)
1153 defer subs.ReleaseTransactionTurn(trans)
1154 defer trans.Release()
1156 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint triggers a delete towards E2T.
1160 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1163 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1167 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1168 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1169 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1170 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1171 c.registry.UpdateSubscriptionToDb(subs, c)
// A nil event only signals completion to the waiting xApp-side transaction.
1172 parentTrans.SendEvent(nil, 0)
1175 //-------------------------------------------------------------------
1176 // send to E2T Subscription Request
1177 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored in subs,
// writes the subscription to the db, and sends the request to E2T up to
// e2SubscriptionDirectives.E2MaxTryCount times.
//
// It returns an event as interface{}: a *PackSubscriptionRequestErrortEvent
// when packing fails, a *SDLWriteErrortEvent when the db write fails, a
// *SubmgrRestartTestEvent when subs.DoNotWaitSubResp is set, or otherwise
// whatever trans.WaitEvent delivered.
// NOTE(review): the err declaration, error checks, counter conditions, loop
// exits and the final return are not visible in this listing (numbering gaps)
// - confirm against the complete file.
1178 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1180 var event interface{} = nil
1181 var timedOut bool = false
// Fixed requestor id written into every request sent towards E2T.
1182 const ricRequestorId = 123
1184 subReqMsg := subs.SubReqMsg
// Start from the subscription's own request id, then override its Id field.
1185 subReqMsg.RequestId = subs.GetReqId().RequestId
1186 subReqMsg.RequestId.Id = ricRequestorId
1187 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1189 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
// Packing errors are reported with the ASN1 error source.
1190 return &PackSubscriptionRequestErrortEvent{
1192 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1193 ErrorCause: err.Error(),
1198 // Write uncompleted subscription in db. If no response for subscription it needs to be re-processed (deleted) after restart
1199 err = c.WriteSubscriptionToDb(subs)
// Db write errors are reported with the DBAAS error source.
1201 return &SDLWriteErrortEvent{
1203 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1204 ErrorCause: err.Error(),
1209 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1210 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the request and the re-request; the condition
// choosing between them is not visible in this listing.
1212 c.UpdateCounter(cSubReqToE2)
1214 c.UpdateCounter(cSubReReqToE2)
// The error returned by rmrSendToE2T is not used here.
1216 c.rmrSendToE2T(desc, subs, trans)
1217 if subs.DoNotWaitSubResp == false {
1218 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1220 c.UpdateCounter(cSubReqTimerExpiry)
1224 // Simulating case where subscription request has been sent but response has not been received before restart
1225 event = &SubmgrRestartTestEvent{}
1226 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1230 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1234 //-------------------------------------------------------------------
1235 // send to E2T Subscription Delete Request
1236 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs an E2AP subscription delete request
// for subs and sends it to E2T up to e2tMaxSubDelReqTryCount times, waiting
// e2tSubDelReqTime for an event after each send.
//
// The declared result is an interface{} event.
// NOTE(review): the err/timedOut declarations, error checks, counter
// conditions, loop exits and the final return are not visible in this listing
// (numbering gaps) - confirm against the complete file.
1238 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1240 var event interface{}
// Fixed requestor id written into every request sent towards E2T.
1242 const ricRequestorId = 123
1244 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
// Start from the subscription's own request id, then override its Id field;
// the function id is copied from the stored subscription request.
1245 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1246 subDelReqMsg.RequestId.Id = ricRequestorId
1247 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1248 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1250 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1254 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1255 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the request and the re-request; the condition
// choosing between them is not visible in this listing.
1257 c.UpdateCounter(cSubDelReqToE2)
1259 c.UpdateCounter(cSubDelReReqToE2)
// The error returned by rmrSendToE2T is not used here.
1261 c.rmrSendToE2T(desc, subs, trans)
1262 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1264 c.UpdateCounter(cSubDelReqTimerExpiry)
1269 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1273 //-------------------------------------------------------------------
1274 // handle from E2T Subscription Response
1275 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse delivers a subscription response received
// from E2T to the transaction currently attached to the matching subscription.
//
// Visible steps: count the message (cSubRespFromE2), unpack params.Payload,
// look up the subscription by the response's RequestId.InstanceId, fetch its
// transaction with subs.GetTransaction(), and pass the unpacked message to it
// with trans.SendEvent using e2tRecvMsgTimeout. A failed hand-over is logged.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this listing (numbering gaps) - confirm against the complete
// file.
1276 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1277 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1278 c.UpdateCounter(cSubRespFromE2)
1280 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1282 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1285 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1287 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1290 trans := subs.GetTransaction()
// Presumably reached when no transaction is attached (check not visible).
1292 err = fmt.Errorf("Ongoing transaction not found")
1293 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1296 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1297 if sendOk == false {
1298 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1299 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1304 //-------------------------------------------------------------------
1305 // handle from E2T Subscription Failure
1306 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure delivers a subscription failure received from
// E2T to the transaction currently attached to the matching subscription.
//
// Visible steps: count the message (cSubFailFromE2), unpack params.Payload,
// look up the subscription by the failure's RequestId.InstanceId, fetch its
// transaction with subs.GetTransaction(), and pass the unpacked message to it
// with trans.SendEvent using e2tRecvMsgTimeout. A failed hand-over is logged.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this listing (numbering gaps) - confirm against the complete
// file.
1307 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1308 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1309 c.UpdateCounter(cSubFailFromE2)
1310 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1312 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1315 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1317 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1320 trans := subs.GetTransaction()
// Presumably reached when no transaction is attached (check not visible).
1322 err = fmt.Errorf("Ongoing transaction not found")
1323 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1326 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1327 if sendOk == false {
1328 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1329 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1334 //-------------------------------------------------------------------
1335 // handle from E2T Subscription Delete Response
1336 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse delivers a subscription delete response
// received from E2T to the transaction currently attached to the matching
// subscription.
//
// Visible steps: count the message (cSubDelRespFromE2), unpack
// params.Payload, look up the subscription by the response's
// RequestId.InstanceId, fetch its transaction with subs.GetTransaction(), and
// pass the unpacked message to it with trans.SendEvent using
// e2tRecvMsgTimeout. A failed hand-over is logged.
//
// Unlike the other E2T handlers in this file it declares a named error result;
// Consume starts it with "go", so that result is not observed there.
// NOTE(review): the error checks and returns between these steps are not
// visible in this listing (numbering gaps) - confirm against the complete file.
1337 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1338 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1339 c.UpdateCounter(cSubDelRespFromE2)
1340 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1342 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1345 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1347 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1350 trans := subs.GetTransaction()
// Presumably reached when no transaction is attached (check not visible).
1352 err = fmt.Errorf("Ongoing transaction not found")
1353 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1356 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1357 if sendOk == false {
1358 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1359 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1364 //-------------------------------------------------------------------
1365 // handle from E2T Subscription Delete Failure
1366 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure delivers a subscription delete failure
// received from E2T to the transaction currently attached to the matching
// subscription.
//
// Visible steps: count the message (cSubDelFailFromE2), unpack
// params.Payload, look up the subscription by the failure's
// RequestId.InstanceId, fetch its transaction with subs.GetTransaction(), and
// pass the unpacked message to it with trans.SendEvent using
// e2tRecvMsgTimeout. A failed hand-over is logged.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this listing (numbering gaps) - confirm against the complete
// file.
1367 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1368 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1369 c.UpdateCounter(cSubDelFailFromE2)
1370 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1372 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1375 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1377 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1380 trans := subs.GetTransaction()
// Presumably reached when no transaction is attached (check not visible).
1382 err = fmt.Errorf("Ongoing transaction not found")
1383 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1386 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1387 if sendOk == false {
1388 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1389 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1394 //-------------------------------------------------------------------
1396 //-------------------------------------------------------------------
// typeofSubsMessage returns a string for the value v, which callers in this
// file use as a short message-type label in log lines.
//
// The visible cases cover the E2AP subscription response, failure, delete
// response and delete failure pointer types; the request and delete-request
// cases are commented out.
// NOTE(review): the switch header and every returned string are not visible
// in this listing (numbering gaps) - confirm the labels against the complete
// file.
1397 func typeofSubsMessage(v interface{}) string {
1402 //case *e2ap.E2APSubscriptionRequest:
1404 case *e2ap.E2APSubscriptionResponse:
1406 case *e2ap.E2APSubscriptionFailure:
1408 //case *e2ap.E2APSubscriptionDeleteRequest:
1409 // return "SubDelReq"
1410 case *e2ap.E2APSubscriptionDeleteResponse:
1412 case *e2ap.E2APSubscriptionDeleteFailure:
1419 //-------------------------------------------------------------------
1421 //-------------------------------------------------------------------
// WriteSubscriptionToDb logs the subscription's instance id and calls
// c.WriteSubscriptionToSdl with that id and subs. The declared result is an
// error.
// NOTE(review): the error check around the Error log and the return statement
// are not visible in this listing (numbering gaps) - confirm against the
// complete file.
1422 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1423 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1424 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1426 xapp.Logger.Error("%v", err)
1432 //-------------------------------------------------------------------
1434 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes subs from the db or writes it there.
//
// When removeSubscriptionFromDb is true it calls RemoveSubscriptionFromDb.
// The visible write path calls WriteSubscriptionToDb when subs.RetryFromXapp
// is false, and subs.RetryFromXapp is assigned false afterwards. The declared
// result is an error.
// NOTE(review): the else branches, the error check after the write and the
// return statements are not visible in this listing (numbering gaps) - confirm
// the exact control flow against the complete file.
1435 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1437 if removeSubscriptionFromDb == true {
1438 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1439 c.RemoveSubscriptionFromDb(subs)
1441 // Update is needed for successful response and merge case here
1442 if subs.RetryFromXapp == false {
1443 err := c.WriteSubscriptionToDb(subs)
1447 subs.RetryFromXapp = false
1451 //-------------------------------------------------------------------
1453 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb logs the subscription's instance id and calls
// c.RemoveSubscriptionFromSdl with it. The function has no result; the error
// from the SDL call is only logged.
// NOTE(review): the error check around the Error log is not visible in this
// listing (numbering gaps) - confirm against the complete file.
1454 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1455 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1456 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1458 xapp.Logger.Error("%v", err)
1462 //-------------------------------------------------------------------
1464 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb logs restSubId and calls
// c.WriteRESTSubscriptionToSdl with restSubId and restSubs. The function has
// no result; the error from the SDL call is only logged.
// NOTE(review): the error check around the Error log is not visible in this
// listing (numbering gaps) - confirm against the complete file.
1465 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1466 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1467 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1469 xapp.Logger.Error("%v", err)
1473 //-------------------------------------------------------------------
1475 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB removes the REST subscription from the db when
// removeRestSubscriptionFromDb is true; the other visible path writes it with
// WriteRESTSubscriptionToDb.
// NOTE(review): the else header is not visible in this listing (numbering
// gaps); presumably the write happens only when the flag is false - confirm
// against the complete file.
1476 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1478 if removeRestSubscriptionFromDb == true {
1479 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1480 c.RemoveRESTSubscriptionFromDb(restSubId)
1482 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1486 //-------------------------------------------------------------------
1488 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb logs restSubId and calls
// c.RemoveRESTSubscriptionFromSdl with it. The function has no result; the
// error from the SDL call is only logged.
// NOTE(review): the error check around the Error log is not visible in this
// listing (numbering gaps) - confirm against the complete file.
1489 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1490 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1491 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1493 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues subscription deletes for subs; its debug
// log describes this as a delete due to restart.
//
// When subs.PolicyUpdate is false it packs one E2AP subscription delete
// request and, for every endpoint in subs.EpList.Endpoints, builds RMR params
// with that endpoint as Src and passes them synchronously to
// handleXAPPSubscriptionDeleteRequest, as if the xApp had sent the delete.
// NOTE(review): the error check after packing and some other lines are not
// visible in this listing (numbering gaps) - confirm against the complete
// file.
1497 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
// Fixed requestor id written into the packed delete request.
1499 const ricRequestorId = 123
1500 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1502 // Send delete for every endpoint in the subscription
1503 if subs.PolicyUpdate == false {
1504 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
// Start from the subscription's own request id, then override its Id field;
// the function id is copied from the stored subscription request.
1505 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1506 subDelReqMsg.RequestId.Id = ricRequestorId
1507 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1508 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1510 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
// The same packed payload is reused for every endpoint; only Src differs.
1513 for _, endPoint := range subs.EpList.Endpoints {
1514 params := &xapp.RMRParams{}
1515 params.Mtype = mType
1516 params.SubId = int(subs.GetReqId().InstanceId)
1518 params.Meid = subs.Meid
1519 params.Src = endPoint.String()
1520 params.PayloadLen = len(payload.Buf)
1521 params.Payload = payload.Buf
// Flag the subscription before handing the request to the delete handler.
1523 subs.DeleteFromDb = true
1524 c.handleXAPPSubscriptionDeleteRequest(params)
1529 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1531 fmt.Println("CRESTSubscriptionRequest")
1537 if p.SubscriptionID != "" {
1538 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1540 fmt.Println(" SubscriptionID = ''")
1543 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1545 if p.ClientEndpoint.HTTPPort != nil {
1546 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1548 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1551 if p.ClientEndpoint.RMRPort != nil {
1552 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1554 fmt.Println(" ClientEndpoint.RMRPort = nil")
1558 fmt.Printf(" Meid = %s\n", *p.Meid)
1560 fmt.Println(" Meid = nil")
1563 if p.E2SubscriptionDirectives == nil {
1564 fmt.Println(" E2SubscriptionDirectives = nil")
1566 fmt.Println(" E2SubscriptionDirectives")
1567 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1568 fmt.Println(" E2RetryCount == nil")
1570 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1572 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1573 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1575 for _, subscriptionDetail := range p.SubscriptionDetails {
1576 if p.RANFunctionID != nil {
1577 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1579 fmt.Println(" RANFunctionID = nil")
1581 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1582 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1584 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1585 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1586 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1587 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1589 if actionToBeSetup.SubsequentAction != nil {
1590 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1591 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1593 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")