2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
34 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
35 httptransport "github.com/go-openapi/runtime/client"
36 "github.com/go-openapi/strfmt"
37 "github.com/gorilla/mux"
38 "github.com/segmentio/ksuid"
39 "github.com/spf13/viper"
42 //-----------------------------------------------------------------------------
44 //-----------------------------------------------------------------------------
46 func idstring(err error, entries ...fmt.Stringer) string {
47 var retval string = ""
48 var filler string = ""
49 for _, entry := range entries {
51 retval += filler + entry.String()
54 retval += filler + "(NIL)"
58 retval += filler + "err(" + err.Error() + ")"
64 //-----------------------------------------------------------------------------
66 //-----------------------------------------------------------------------------
68 var e2tSubReqTimeout time.Duration
69 var e2tSubDelReqTime time.Duration
70 var e2tRecvMsgTimeout time.Duration
71 var waitRouteCleanup_ms time.Duration
72 var e2tMaxSubReqTryCount uint64 // Initial try + retry
73 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
74 var checkE2State string
75 var readSubsFromDb string
76 var dbRetryForever string
84 restDuplicateCtrl *DuplicateCtrl
86 e2IfStateDb XappRnibInterface
88 restSubsDb Sdlnterface
91 Counters map[string]xapp.Counter
102 type SubmgrRestartTestEvent struct{}
103 type SubmgrRestartUpEvent struct{}
104 type PackSubscriptionRequestErrortEvent struct {
108 func (p *PackSubscriptionRequestErrortEvent) SetEvent(errorInfo *ErrorInfo) {
109 p.ErrorInfo = *errorInfo
112 type SDLWriteErrortEvent struct {
116 func (s *SDLWriteErrortEvent) SetEvent(errorInfo *ErrorInfo) {
117 s.ErrorInfo = *errorInfo
121 xapp.Logger.Debug("SUBMGR")
123 viper.SetEnvPrefix("submgr")
124 viper.AllowEmptyEnv(true)
127 func NewControl() *Control {
129 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
130 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
132 registry := new(Registry)
133 registry.Initialize()
134 registry.rtmgrClient = &rtmgrClient
136 tracker := new(Tracker)
139 restDuplicateCtrl := new(DuplicateCtrl)
140 restDuplicateCtrl.Init()
142 e2IfState := new(E2IfState)
144 c := &Control{e2ap: new(E2ap),
147 restDuplicateCtrl: restDuplicateCtrl,
148 e2IfState: e2IfState,
149 e2IfStateDb: CreateXappRnibIfInstance(),
150 e2SubsDb: CreateSdl(),
151 restSubsDb: CreateRESTSdl(),
152 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
157 c.ReadConfigParameters("")
159 // Register REST handler for testing support
160 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
161 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
162 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
164 if readSubsFromDb == "true" {
165 // Read subscriptions from db
166 c.ReadE2Subscriptions()
167 c.ReadRESTSubscriptions()
170 go xapp.Subscription.Listen(c.RESTSubscriptionHandler, c.RESTQueryHandler, c.RESTSubscriptionDeleteHandler)
174 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
175 subscriptions, _ := c.registry.QueryHandler()
176 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
179 //-------------------------------------------------------------------
181 //-------------------------------------------------------------------
182 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
183 xapp.Logger.Debug("GetAllRestSubscriptions() called")
184 response := c.registry.GetAllRestSubscriptions()
188 //-------------------------------------------------------------------
190 //-------------------------------------------------------------------
191 func (c *Control) ReadE2Subscriptions() error {
194 var register map[uint32]*Subscription
195 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
196 xapp.Logger.Debug("Reading E2 subscriptions from db")
197 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
199 xapp.Logger.Error("%v", err)
200 <-time.After(1 * time.Second)
202 c.registry.subIds = subIds
203 c.registry.register = register
204 c.HandleUncompletedSubscriptions(register)
208 xapp.Logger.Debug("Continuing without retring")
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) ReadRESTSubscriptions() error {
217 var restSubscriptions map[string]*RESTSubscription
218 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
219 xapp.Logger.Debug("Reading REST subscriptions from db")
220 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
222 xapp.Logger.Error("%v", err)
223 <-time.After(1 * time.Second)
225 c.registry.restSubscriptions = restSubscriptions
229 xapp.Logger.Debug("Continuing without retring")
233 //-------------------------------------------------------------------
235 //-------------------------------------------------------------------
236 func (c *Control) ReadConfigParameters(f string) {
238 xapp.Logger.Debug("ReadConfigParameters")
240 c.LoggerLevel = int(xapp.Logger.GetLevel())
241 xapp.Logger.Info("LoggerLevel = %v", c.LoggerLevel)
242 c.e2ap.SetASN1DebugPrintStatus(c.LoggerLevel)
244 // viper.GetDuration returns nanoseconds
245 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
246 if e2tSubReqTimeout == 0 {
247 e2tSubReqTimeout = 2000 * 1000000
248 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubReqTimeout")
250 xapp.Logger.Debug("e2tSubReqTimeout= %v", e2tSubReqTimeout)
252 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
253 if e2tSubDelReqTime == 0 {
254 e2tSubDelReqTime = 2000 * 1000000
255 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tSubDelReqTime")
257 xapp.Logger.Debug("e2tSubDelReqTime= %v", e2tSubDelReqTime)
259 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
260 if e2tRecvMsgTimeout == 0 {
261 e2tRecvMsgTimeout = 2000 * 1000000
262 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tRecvMsgTimeout")
264 xapp.Logger.Debug("e2tRecvMsgTimeout= %v", e2tRecvMsgTimeout)
266 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
267 if e2tMaxSubReqTryCount == 0 {
268 e2tMaxSubReqTryCount = 1
269 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubReqTryCount")
271 xapp.Logger.Debug("e2tMaxSubReqTryCount= %v", e2tMaxSubReqTryCount)
273 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
274 if e2tMaxSubDelReqTryCount == 0 {
275 e2tMaxSubDelReqTryCount = 1
276 xapp.Logger.Debug("WARNING: Using hard coded default value for e2tMaxSubDelReqTryCount")
278 xapp.Logger.Debug("e2tMaxSubDelReqTryCount= %v", e2tMaxSubDelReqTryCount)
280 checkE2State = viper.GetString("controls.checkE2State")
281 if checkE2State == "" {
282 checkE2State = "true"
283 xapp.Logger.Debug("WARNING: Using hard coded default value for checkE2State")
285 xapp.Logger.Debug("checkE2State= %v", checkE2State)
287 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
288 if readSubsFromDb == "" {
289 readSubsFromDb = "true"
290 xapp.Logger.Debug("WARNING: Using hard coded default value for readSubsFromDb")
292 xapp.Logger.Debug("readSubsFromDb= %v", readSubsFromDb)
294 dbTryCount = viper.GetInt("controls.dbTryCount")
297 xapp.Logger.Debug("WARNING: Using hard coded default value for dbTryCount")
299 xapp.Logger.Debug("dbTryCount= %v", dbTryCount)
301 dbRetryForever = viper.GetString("controls.dbRetryForever")
302 if dbRetryForever == "" {
303 dbRetryForever = "true"
304 xapp.Logger.Debug("WARNING: Using hard coded default value for dbRetryForever")
306 xapp.Logger.Debug("dbRetryForever= %v", dbRetryForever)
308 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
309 // value 100ms used currently only in unittests.
310 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
311 if waitRouteCleanup_ms == 0 {
312 waitRouteCleanup_ms = 5000 * 1000000
313 xapp.Logger.Debug("WARNING: Using hard coded default value for waitRouteCleanup_ms")
315 xapp.Logger.Debug("waitRouteCleanup= %v", waitRouteCleanup_ms)
318 //-------------------------------------------------------------------
320 //-------------------------------------------------------------------
321 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
323 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
324 for subId, subs := range register {
325 if subs.SubRespRcvd == false {
326 // If policy subscription has already been made successfully unsuccessful update should not be deleted.
327 if subs.PolicyUpdate == false {
328 subs.NoRespToXapp = true
329 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
330 c.SendSubscriptionDeleteReq(subs)
336 func (c *Control) ReadyCB(data interface{}) {
337 if c.RMRClient == nil {
338 c.RMRClient = xapp.Rmr
342 func (c *Control) Run() {
343 xapp.SetReadyCB(c.ReadyCB, nil)
344 xapp.AddConfigChangeListener(c.ReadConfigParameters)
348 //-------------------------------------------------------------------
350 //-------------------------------------------------------------------
351 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
354 var restSubscription *RESTSubscription
357 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
358 if p.SubscriptionID == "" {
359 // Subscription does not contain REST subscription Id
361 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
362 if restSubscription != nil {
363 // Subscription not found
364 restSubId = prevRestSubsId
366 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
368 xapp.Logger.Debug("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
371 xapp.Logger.Debug("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
372 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
376 if restSubscription == nil {
377 restSubId = ksuid.New().String()
378 restSubscription = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
381 // Subscription contains REST subscription Id
382 restSubId = p.SubscriptionID
384 xapp.Logger.Debug("RestSubscription ID %s provided via REST request", restSubId)
385 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
387 // Subscription with id in REST request does not exist
388 xapp.Logger.Error("%s", err.Error())
389 c.UpdateCounter(cRestSubFailToXapp)
394 xapp.Logger.Debug("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
396 xapp.Logger.Debug("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
400 return restSubscription, restSubId, nil
403 //-------------------------------------------------------------------
405 //-------------------------------------------------------------------
406 func (c *Control) RESTSubscriptionHandler(params interface{}) (*models.SubscriptionResponse, int) {
409 c.UpdateCounter(cRestSubReqFromXapp)
411 subResp := models.SubscriptionResponse{}
412 p := params.(*models.SubscriptionParams)
414 if c.LoggerLevel > 2 {
415 c.PrintRESTSubscriptionRequest(p)
418 if c.e2IfState.IsE2ConnectionUp(p.Meid) == false {
419 xapp.Logger.Error("No E2 connection for ranName %v", *p.Meid)
420 c.UpdateCounter(cRestReqRejDueE2Down)
421 return nil, common.SubscribeServiceUnavailableCode
424 if p.ClientEndpoint == nil {
425 err := fmt.Errorf("ClientEndpoint == nil")
426 xapp.Logger.Error("%v", err)
427 c.UpdateCounter(cRestSubFailToXapp)
428 return nil, common.SubscribeBadRequestCode
431 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
433 xapp.Logger.Error("%s", err.Error())
434 c.UpdateCounter(cRestSubFailToXapp)
435 return nil, common.SubscribeBadRequestCode
438 md5sum, err := CalculateRequestMd5sum(params)
440 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
443 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
445 xapp.Logger.Error("Subscription with id in REST request does not exist")
446 return nil, common.SubscribeNotFoundCode
449 subResp.SubscriptionID = &restSubId
450 subReqList := e2ap.SubscriptionRequestList{}
451 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
453 xapp.Logger.Error("%s", err.Error())
454 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
455 c.registry.DeleteRESTSubscription(&restSubId)
456 c.UpdateCounter(cRestSubFailToXapp)
457 return nil, common.SubscribeBadRequestCode
460 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
462 err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
463 xapp.Logger.Debug("%s", err)
464 c.UpdateCounter(cRestSubRespToXapp)
465 return &subResp, common.SubscribeCreatedCode
468 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
469 e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
471 xapp.Logger.Error("%s", err)
472 return nil, common.SubscribeBadRequestCode
474 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
476 c.UpdateCounter(cRestSubRespToXapp)
477 return &subResp, common.SubscribeCreatedCode
480 //-------------------------------------------------------------------
482 //-------------------------------------------------------------------
483 func (c *Control) GetE2SubscriptionDirectives(p *models.SubscriptionParams) (*E2SubscriptionDirectives, error) {
485 e2SubscriptionDirectives := &E2SubscriptionDirectives{}
486 if p == nil || p.E2SubscriptionDirectives == nil {
487 e2SubscriptionDirectives.E2TimeoutTimerValue = e2tSubReqTimeout
488 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
489 e2SubscriptionDirectives.CreateRMRRoute = true
490 xapp.Logger.Debug("p == nil || p.E2SubscriptionDirectives == nil. Using default values for E2TimeoutTimerValue = %v and E2RetryCount = %v RMRRoutingNeeded = true", e2tSubReqTimeout, e2tMaxSubReqTryCount)
492 if p.E2SubscriptionDirectives.E2TimeoutTimerValue >= 1 && p.E2SubscriptionDirectives.E2TimeoutTimerValue <= 10 {
493 e2SubscriptionDirectives.E2TimeoutTimerValue = time.Duration(p.E2SubscriptionDirectives.E2TimeoutTimerValue) * 1000000000 // Duration type cast returns nano seconds
495 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2TimeoutTimerValue out of range (1-10 seconds): %v", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
497 if p.E2SubscriptionDirectives.E2RetryCount == nil {
498 xapp.Logger.Error("p.E2SubscriptionDirectives.E2RetryCount == nil. Using default value")
499 e2SubscriptionDirectives.E2MaxTryCount = int64(e2tMaxSubReqTryCount)
501 if *p.E2SubscriptionDirectives.E2RetryCount >= 0 && *p.E2SubscriptionDirectives.E2RetryCount <= 10 {
502 e2SubscriptionDirectives.E2MaxTryCount = *p.E2SubscriptionDirectives.E2RetryCount + 1 // E2MaxTryCount = First sending plus two retries
504 return nil, fmt.Errorf("p.E2SubscriptionDirectives.E2RetryCount out of range (0-10): %v", *p.E2SubscriptionDirectives.E2RetryCount)
507 e2SubscriptionDirectives.CreateRMRRoute = p.E2SubscriptionDirectives.RMRRoutingNeeded
509 xapp.Logger.Debug("e2SubscriptionDirectives.E2TimeoutTimerValue: %v", e2SubscriptionDirectives.E2TimeoutTimerValue)
510 xapp.Logger.Debug("e2SubscriptionDirectives.E2MaxTryCount: %v", e2SubscriptionDirectives.E2MaxTryCount)
511 xapp.Logger.Debug("e2SubscriptionDirectives.CreateRMRRoute: %v", e2SubscriptionDirectives.CreateRMRRoute)
512 return e2SubscriptionDirectives, nil
515 //-------------------------------------------------------------------
517 //-------------------------------------------------------------------
519 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
520 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
522 c.SubscriptionProcessingStartDelay()
523 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
525 var xAppEventInstanceID int64
526 var e2EventInstanceID int64
527 errorInfo := &ErrorInfo{}
529 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
531 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
532 subReqMsg := subReqList.E2APSubscriptionRequests[index]
533 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
535 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
537 // Send notification to xApp that prosessing of a Subscription Request has failed.
538 err := fmt.Errorf("Tracking failure")
539 errorInfo.ErrorCause = err.Error()
540 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
544 xapp.Logger.Debug("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
546 subRespMsg, errorInfo, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId, e2SubscriptionDirectives)
548 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
552 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
554 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
555 restSubscription.AddMd5Sum(md5sum)
556 xapp.Logger.Debug("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
557 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
558 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
563 //-------------------------------------------------------------------
565 //------------------------------------------------------------------
566 func (c *Control) SubscriptionProcessingStartDelay() {
567 if c.UTTesting == true {
568 // This is temporary fix for the UT problem that notification arrives before subscription response
569 // Correct fix would be to allow notification come before response and process it correctly
570 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
571 <-time.After(time.Millisecond * 50)
572 xapp.Logger.Debug("Continuing after delay")
576 //-------------------------------------------------------------------
578 //------------------------------------------------------------------
579 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
580 restSubId string, e2SubscriptionDirectives *E2SubscriptionDirectives) (*e2ap.E2APSubscriptionResponse, *ErrorInfo, error) {
582 errorInfo := ErrorInfo{}
584 err := c.tracker.Track(trans)
586 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
587 errorInfo.ErrorCause = err.Error()
588 err = fmt.Errorf("Tracking failure")
589 return nil, &errorInfo, err
592 subs, errorInfo, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, e2SubscriptionDirectives.CreateRMRRoute)
594 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
595 return nil, &errorInfo, err
601 subs.OngoingReqCount++
602 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
603 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
604 subs.OngoingReqCount--
608 switch themsg := event.(type) {
609 case *e2ap.E2APSubscriptionResponse:
611 if c.e2IfState.IsE2ConnectionUp(meid) == true {
612 return themsg, &errorInfo, nil
614 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
615 c.RemoveSubscriptionFromDb(subs)
616 err = fmt.Errorf("E2 interface down")
617 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
618 return nil, &errorInfo, err
620 case *e2ap.E2APSubscriptionFailure:
621 err = fmt.Errorf("E2 SubscriptionFailure received")
622 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
623 return nil, &errorInfo, err
624 case *PackSubscriptionRequestErrortEvent:
625 err = fmt.Errorf("E2 SubscriptionRequest pack failure")
626 return nil, &themsg.ErrorInfo, err
627 case *SDLWriteErrortEvent:
628 err = fmt.Errorf("SDL write failure")
629 return nil, &themsg.ErrorInfo, err
631 err = fmt.Errorf("Unexpected E2 subscription response received")
632 errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
637 err = fmt.Errorf("E2 subscription response timeout")
638 errorInfo.SetInfo(err.Error(), "", models.SubscriptionInstanceTimeoutTypeE2Timeout)
639 if subs.PolicyUpdate == true {
640 return nil, &errorInfo, err
644 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
646 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
647 return nil, &errorInfo, err
650 //-------------------------------------------------------------------
652 //-------------------------------------------------------------------
653 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
654 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp, errorInfo *ErrorInfo) {
656 // Send notification to xApp that prosessing of a Subscription Request has failed.
657 e2EventInstanceID := (int64)(0)
658 if errorInfo.ErrorSource == "" {
659 // Submgr is default source of error
660 errorInfo.ErrorSource = models.SubscriptionInstanceErrorSourceSUBMGR
662 resp := &models.SubscriptionResponse{
663 SubscriptionID: restSubId,
664 SubscriptionInstances: []*models.SubscriptionInstance{
665 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
666 ErrorCause: errorInfo.ErrorCause,
667 ErrorSource: errorInfo.ErrorSource,
668 TimeoutType: errorInfo.TimeoutType,
669 XappEventInstanceID: &xAppEventInstanceID},
672 // Mark REST subscription request processed.
673 restSubscription.SetProcessed(err)
674 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
676 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
677 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
679 xapp.Logger.Debug("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
680 errorInfo.ErrorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
683 c.UpdateCounter(cRestSubFailNotifToXapp)
684 xapp.Subscription.Notify(resp, *clientEndpoint)
686 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
687 c.registry.DeleteRESTSubscription(restSubId)
688 c.RemoveRESTSubscriptionFromDb(*restSubId)
692 //-------------------------------------------------------------------
694 //-------------------------------------------------------------------
695 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
696 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
698 // Store successfully processed InstanceId for deletion
699 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
700 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
702 // Send notification to xApp that a Subscription Request has been processed.
703 resp := &models.SubscriptionResponse{
704 SubscriptionID: restSubId,
705 SubscriptionInstances: []*models.SubscriptionInstance{
706 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
708 XappEventInstanceID: &xAppEventInstanceID},
711 // Mark REST subscription request processesd.
712 restSubscription.SetProcessed(nil)
713 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
714 xapp.Logger.Debug("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
715 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
717 c.UpdateCounter(cRestSubNotifToXapp)
718 xapp.Subscription.Notify(resp, *clientEndpoint)
720 if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
721 c.registry.DeleteRESTSubscription(restSubId)
722 c.RemoveRESTSubscriptionFromDb(*restSubId)
726 //-------------------------------------------------------------------
728 //-------------------------------------------------------------------
729 func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
732 c.UpdateCounter(cRestSubDelReqFromXapp)
734 xapp.Logger.Debug("SubscriptionDeleteRequest from XAPP")
736 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
738 xapp.Logger.Error("%s", err.Error())
739 if restSubscription == nil {
740 // Subscription was not found
741 c.UpdateCounter(cRestSubDelRespToXapp)
742 return common.UnsubscribeNoContentCode
744 if restSubscription.SubReqOngoing == true {
745 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
746 xapp.Logger.Error("%s", err.Error())
747 c.UpdateCounter(cRestSubDelFailToXapp)
748 return common.UnsubscribeBadRequestCode
749 } else if restSubscription.SubDelReqOngoing == true {
750 // Previous request for same restSubId still ongoing
751 c.UpdateCounter(cRestSubDelRespToXapp)
752 return common.UnsubscribeNoContentCode
757 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
759 xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
760 for _, instanceId := range restSubscription.InstanceIds {
761 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
764 xapp.Logger.Error("%s", err.Error())
766 xapp.Logger.Debug("Deleteting instanceId = %v", instanceId)
767 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
768 restSubscription.DeleteE2InstanceId(instanceId)
770 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
771 c.registry.DeleteRESTSubscription(&restSubId)
772 c.RemoveRESTSubscriptionFromDb(restSubId)
775 c.UpdateCounter(cRestSubDelRespToXapp)
776 return common.UnsubscribeNoContentCode
779 //-------------------------------------------------------------------
781 //-------------------------------------------------------------------
782 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
784 var xAppEventInstanceID int64
785 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
787 xapp.Logger.Debug("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
788 restSubId, instanceId, idstring(err, nil))
789 return xAppEventInstanceID, nil
792 xAppEventInstanceID = int64(subs.ReqId.Id)
793 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
795 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
796 xapp.Logger.Error("%s", err.Error())
798 defer trans.Release()
800 err = c.tracker.Track(trans)
802 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
803 xapp.Logger.Error("%s", err.Error())
804 return xAppEventInstanceID, &time.ParseError{}
809 subs.OngoingDelCount++
810 go c.handleSubscriptionDelete(subs, trans)
811 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
812 subs.OngoingDelCount--
814 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
816 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
818 return xAppEventInstanceID, nil
821 //-------------------------------------------------------------------
823 //-------------------------------------------------------------------
824 func (c *Control) RESTQueryHandler() (models.SubscriptionList, error) {
825 xapp.Logger.Debug("RESTQueryHandler() called")
829 return c.registry.QueryHandler()
832 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
833 xapp.Logger.Debug("RESTTestRestHandler() called")
835 pathParams := mux.Vars(r)
836 s := pathParams["testId"]
838 // This can be used to delete single subscription from db
839 if contains := strings.Contains(s, "deletesubid="); contains == true {
840 var splits = strings.Split(s, "=")
841 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
842 xapp.Logger.Debug("RemoveSubscriptionFromSdl() called. subId = %v", subId)
843 c.RemoveSubscriptionFromSdl(uint32(subId))
848 // This can be used to remove all subscriptions db from
850 xapp.Logger.Debug("RemoveAllSubscriptionsFromSdl() called")
851 c.RemoveAllSubscriptionsFromSdl()
852 c.RemoveAllRESTSubscriptionsFromSdl()
856 // This is meant to cause submgr's restart in testing
858 xapp.Logger.Debug("os.Exit(1) called")
862 xapp.Logger.Debug("Unsupported rest command received %s", s)
865 //-------------------------------------------------------------------
867 //-------------------------------------------------------------------
869 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
870 params := &xapp.RMRParams{}
871 params.Mtype = trans.GetMtype()
872 params.SubId = int(subs.GetReqId().InstanceId)
874 params.Meid = subs.GetMeid()
876 params.PayloadLen = len(trans.Payload.Buf)
877 params.Payload = trans.Payload.Buf
879 xapp.Logger.Debug("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
880 err = c.SendWithRetry(params, false, 5)
882 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
887 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
889 params := &xapp.RMRParams{}
890 params.Mtype = trans.GetMtype()
891 params.SubId = int(subs.GetReqId().InstanceId)
892 params.Xid = trans.GetXid()
893 params.Meid = trans.GetMeid()
895 params.PayloadLen = len(trans.Payload.Buf)
896 params.Payload = trans.Payload.Buf
898 xapp.Logger.Debug("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
899 err = c.SendWithRetry(params, false, 5)
901 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
906 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
907 if c.RMRClient == nil {
908 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
909 xapp.Logger.Error("%s", err.Error())
914 defer c.RMRClient.Free(msg.Mbuf)
916 // xapp-frame might use direct access to c buffer and
917 // when msg.Mbuf is freed, someone might take it into use
918 // and payload data might be invalid inside message handle function
920 // subscriptions won't load system a lot so there is no
921 // real performance hit by cloning buffer into new go byte slice
922 cPay := append(msg.Payload[:0:0], msg.Payload...)
924 msg.PayloadLen = len(cPay)
927 case xapp.RIC_SUB_REQ:
928 go c.handleXAPPSubscriptionRequest(msg)
929 case xapp.RIC_SUB_RESP:
930 go c.handleE2TSubscriptionResponse(msg)
931 case xapp.RIC_SUB_FAILURE:
932 go c.handleE2TSubscriptionFailure(msg)
933 case xapp.RIC_SUB_DEL_REQ:
934 go c.handleXAPPSubscriptionDeleteRequest(msg)
935 case xapp.RIC_SUB_DEL_RESP:
936 go c.handleE2TSubscriptionDeleteResponse(msg)
937 case xapp.RIC_SUB_DEL_FAILURE:
938 go c.handleE2TSubscriptionDeleteFailure(msg)
940 xapp.Logger.Debug("Unknown Message Type '%d', discarding", msg.Mtype)
945 //-------------------------------------------------------------------
946 // handle from XAPP Subscription Request
947 //------------------------------------------------------------------
948 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
949 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
950 c.UpdateCounter(cSubReqFromXapp)
952 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
953 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
957 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
959 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
963 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
965 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
968 defer trans.Release()
970 if err = c.tracker.Track(trans); err != nil {
971 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
975 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
976 subs, _, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c, true)
978 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
982 c.wakeSubscriptionRequest(subs, trans)
985 //-------------------------------------------------------------------
986 // Wake Subscription Request to E2node
987 //------------------------------------------------------------------
988 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
990 e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
991 subs.OngoingReqCount++
992 go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
993 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
994 subs.OngoingReqCount--
997 switch themsg := event.(type) {
998 case *e2ap.E2APSubscriptionResponse:
999 themsg.RequestId.Id = trans.RequestId.Id
1000 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
1003 c.UpdateCounter(cSubRespToXapp)
1004 c.rmrSendToXapp("", subs, trans)
1007 case *e2ap.E2APSubscriptionFailure:
1008 themsg.RequestId.Id = trans.RequestId.Id
1009 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
1011 c.UpdateCounter(cSubFailToXapp)
1012 c.rmrSendToXapp("", subs, trans)
1018 xapp.Logger.Debug("XAPP-SubReq: failed %s", idstring(err, trans, subs))
1019 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1022 //-------------------------------------------------------------------
1023 // handle from XAPP Subscription Delete Request
1024 //------------------------------------------------------------------
1025 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
1026 xapp.Logger.Debug("MSG from XAPP: %s", params.String())
1027 c.UpdateCounter(cSubDelReqFromXapp)
1029 if c.e2IfState.IsE2ConnectionUp(¶ms.Meid.RanName) == false {
1030 xapp.Logger.Error("No E2 connection for ranName %v", params.Meid.RanName)
1034 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
1036 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
1040 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
1042 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
1045 defer trans.Release()
1047 err = c.tracker.Track(trans)
1049 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
1053 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
1055 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
1062 subs.OngoingDelCount++
1063 go c.handleSubscriptionDelete(subs, trans)
1064 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
1065 subs.OngoingDelCount--
1067 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
1069 if subs.NoRespToXapp == true {
1070 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
1071 xapp.Logger.Debug("XAPP-SubDelReq: subs.NoRespToXapp == true")
1075 // Whatever is received success, fail or timeout, send successful delete response
1076 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
1077 subDelRespMsg.RequestId.Id = trans.RequestId.Id
1078 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
1079 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
1080 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
1082 c.UpdateCounter(cSubDelRespToXapp)
1083 c.rmrSendToXapp("", subs, trans)
1086 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
1087 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
1090 //-------------------------------------------------------------------
1091 // SUBS CREATE Handling
1092 //-------------------------------------------------------------------
1093 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
1095 var event interface{} = nil
1096 var removeSubscriptionFromDb bool = false
1097 trans := c.tracker.NewSubsTransaction(subs)
1098 subs.WaitTransactionTurn(trans)
1099 defer subs.ReleaseTransactionTurn(trans)
1100 defer trans.Release()
1102 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
1104 subRfMsg, valid := subs.GetCachedResponse()
1105 if subRfMsg == nil && valid == true {
1106 event = c.sendE2TSubscriptionRequest(subs, trans, parentTrans, e2SubscriptionDirectives)
1107 switch event.(type) {
1108 case *e2ap.E2APSubscriptionResponse:
1109 subRfMsg, valid = subs.SetCachedResponse(event, true)
1110 subs.SubRespRcvd = true
1111 case *e2ap.E2APSubscriptionFailure:
1112 removeSubscriptionFromDb = true
1113 subRfMsg, valid = subs.SetCachedResponse(event, false)
1114 xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1115 case *SubmgrRestartTestEvent:
1116 // This simulates that no response has been received and after restart subscriptions are restored from db
1117 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
1118 case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
1119 subRfMsg, valid = subs.SetCachedResponse(event, false)
1122 if subs.PolicyUpdate == false {
1123 xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1124 removeSubscriptionFromDb = true
1125 subRfMsg, valid = subs.SetCachedResponse(nil, false)
1126 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1129 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1131 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
1134 err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
1136 subRfMsg, valid = subs.SetCachedResponse(event, false)
1137 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1140 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1142 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1145 parentTrans.SendEvent(subRfMsg, 0)
1148 //-------------------------------------------------------------------
1149 // SUBS DELETE Handling
1150 //-------------------------------------------------------------------
1152 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
1154 trans := c.tracker.NewSubsTransaction(subs)
1155 subs.WaitTransactionTurn(trans)
1156 defer subs.ReleaseTransactionTurn(trans)
1157 defer trans.Release()
1159 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1163 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1166 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1170 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1171 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1172 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1173 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1174 c.registry.UpdateSubscriptionToDb(subs, c)
1175 parentTrans.SendEvent(nil, 0)
1178 //-------------------------------------------------------------------
1179 // send to E2T Subscription Request
1180 //-------------------------------------------------------------------
1181 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) interface{} {
1183 var event interface{} = nil
1184 var timedOut bool = false
1185 const ricRequestorId = 123
1187 subReqMsg := subs.SubReqMsg
1188 subReqMsg.RequestId = subs.GetReqId().RequestId
1189 subReqMsg.RequestId.Id = ricRequestorId
1190 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1192 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1193 return &PackSubscriptionRequestErrortEvent{
1195 ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
1196 ErrorCause: err.Error(),
1201 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1202 err = c.WriteSubscriptionToDb(subs)
1204 return &SDLWriteErrortEvent{
1206 ErrorSource: models.SubscriptionInstanceErrorSourceDBAAS,
1207 ErrorCause: err.Error(),
1212 for retries := int64(0); retries < e2SubscriptionDirectives.E2MaxTryCount; retries++ {
1213 desc := fmt.Sprintf("(retry %d)", retries)
1215 c.UpdateCounter(cSubReqToE2)
1217 c.UpdateCounter(cSubReReqToE2)
1219 c.rmrSendToE2T(desc, subs, trans)
1220 if subs.DoNotWaitSubResp == false {
1221 event, timedOut = trans.WaitEvent(e2SubscriptionDirectives.E2TimeoutTimerValue)
1223 c.UpdateCounter(cSubReqTimerExpiry)
1227 // Simulating case where subscrition request has been sent but response has not been received before restart
1228 event = &SubmgrRestartTestEvent{}
1229 xapp.Logger.Debug("Restart event, DoNotWaitSubResp == true")
1233 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1237 //-------------------------------------------------------------------
1238 // send to E2T Subscription Delete Request
1239 //-------------------------------------------------------------------
1241 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1243 var event interface{}
1245 const ricRequestorId = 123
1247 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1248 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1249 subDelReqMsg.RequestId.Id = ricRequestorId
1250 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1251 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1253 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1257 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1258 desc := fmt.Sprintf("(retry %d)", retries)
1260 c.UpdateCounter(cSubDelReqToE2)
1262 c.UpdateCounter(cSubDelReReqToE2)
1264 c.rmrSendToE2T(desc, subs, trans)
1265 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1267 c.UpdateCounter(cSubDelReqTimerExpiry)
1272 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1276 //-------------------------------------------------------------------
1277 // handle from E2T Subscription Response
1278 //-------------------------------------------------------------------
1279 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1280 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1281 c.UpdateCounter(cSubRespFromE2)
1283 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1285 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1288 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1290 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1293 trans := subs.GetTransaction()
1295 err = fmt.Errorf("Ongoing transaction not found")
1296 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1299 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1300 if sendOk == false {
1301 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1302 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1307 //-------------------------------------------------------------------
1308 // handle from E2T Subscription Failure
1309 //-------------------------------------------------------------------
1310 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1311 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1312 c.UpdateCounter(cSubFailFromE2)
1313 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1315 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1318 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1320 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1323 trans := subs.GetTransaction()
1325 err = fmt.Errorf("Ongoing transaction not found")
1326 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1329 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1330 if sendOk == false {
1331 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1332 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1337 //-------------------------------------------------------------------
1338 // handle from E2T Subscription Delete Response
1339 //-------------------------------------------------------------------
1340 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1341 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1342 c.UpdateCounter(cSubDelRespFromE2)
1343 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1345 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1348 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1350 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1353 trans := subs.GetTransaction()
1355 err = fmt.Errorf("Ongoing transaction not found")
1356 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1359 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1360 if sendOk == false {
1361 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1362 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1367 //-------------------------------------------------------------------
1368 // handle from E2T Subscription Delete Failure
1369 //-------------------------------------------------------------------
1370 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1371 xapp.Logger.Debug("MSG from E2T: %s", params.String())
1372 c.UpdateCounter(cSubDelFailFromE2)
1373 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1375 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1378 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1380 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1383 trans := subs.GetTransaction()
1385 err = fmt.Errorf("Ongoing transaction not found")
1386 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1389 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1390 if sendOk == false {
1391 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1392 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1397 //-------------------------------------------------------------------
1399 //-------------------------------------------------------------------
1400 func typeofSubsMessage(v interface{}) string {
1405 //case *e2ap.E2APSubscriptionRequest:
1407 case *e2ap.E2APSubscriptionResponse:
1409 case *e2ap.E2APSubscriptionFailure:
1411 //case *e2ap.E2APSubscriptionDeleteRequest:
1412 // return "SubDelReq"
1413 case *e2ap.E2APSubscriptionDeleteResponse:
1415 case *e2ap.E2APSubscriptionDeleteFailure:
1422 //-------------------------------------------------------------------
1424 //-------------------------------------------------------------------
1425 func (c *Control) WriteSubscriptionToDb(subs *Subscription) error {
1426 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1427 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1429 xapp.Logger.Error("%v", err)
1435 //-------------------------------------------------------------------
1437 //-------------------------------------------------------------------
1438 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) error {
1440 if removeSubscriptionFromDb == true {
1441 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1442 c.RemoveSubscriptionFromDb(subs)
1444 // Update is needed for successful response and merge case here
1445 if subs.RetryFromXapp == false {
1446 err := c.WriteSubscriptionToDb(subs)
1450 subs.RetryFromXapp = false
1454 //-------------------------------------------------------------------
1456 //-------------------------------------------------------------------
1457 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1458 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1459 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1461 xapp.Logger.Error("%v", err)
1465 //-------------------------------------------------------------------
1467 //-------------------------------------------------------------------
1468 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1469 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1470 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1472 xapp.Logger.Error("%v", err)
1476 //-------------------------------------------------------------------
1478 //-------------------------------------------------------------------
1479 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1481 if removeRestSubscriptionFromDb == true {
1482 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1483 c.RemoveRESTSubscriptionFromDb(restSubId)
1485 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1489 //-------------------------------------------------------------------
1491 //-------------------------------------------------------------------
1492 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1493 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1494 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1496 xapp.Logger.Error("%v", err)
1500 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1502 const ricRequestorId = 123
1503 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1505 // Send delete for every endpoint in the subscription
1506 if subs.PolicyUpdate == false {
1507 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1508 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1509 subDelReqMsg.RequestId.Id = ricRequestorId
1510 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1511 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1513 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1516 for _, endPoint := range subs.EpList.Endpoints {
1517 params := &xapp.RMRParams{}
1518 params.Mtype = mType
1519 params.SubId = int(subs.GetReqId().InstanceId)
1521 params.Meid = subs.Meid
1522 params.Src = endPoint.String()
1523 params.PayloadLen = len(payload.Buf)
1524 params.Payload = payload.Buf
1526 subs.DeleteFromDb = true
1527 c.handleXAPPSubscriptionDeleteRequest(params)
1532 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1534 fmt.Println("CRESTSubscriptionRequest")
1540 if p.SubscriptionID != "" {
1541 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1543 fmt.Println(" SubscriptionID = ''")
1546 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1548 if p.ClientEndpoint.HTTPPort != nil {
1549 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1551 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1554 if p.ClientEndpoint.RMRPort != nil {
1555 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1557 fmt.Println(" ClientEndpoint.RMRPort = nil")
1561 fmt.Printf(" Meid = %s\n", *p.Meid)
1563 fmt.Println(" Meid = nil")
1566 if p.E2SubscriptionDirectives == nil {
1567 fmt.Println(" E2SubscriptionDirectives = nil")
1569 fmt.Println(" E2SubscriptionDirectives")
1570 if p.E2SubscriptionDirectives.E2RetryCount == nil {
1571 fmt.Println(" E2RetryCount == nil")
1573 fmt.Printf(" E2RetryCount = %v\n", *p.E2SubscriptionDirectives.E2RetryCount)
1575 fmt.Printf(" E2TimeoutTimerValue = %v\n", p.E2SubscriptionDirectives.E2TimeoutTimerValue)
1576 fmt.Printf(" RMRRoutingNeeded = %v\n", p.E2SubscriptionDirectives.RMRRoutingNeeded)
1578 for _, subscriptionDetail := range p.SubscriptionDetails {
1579 if p.RANFunctionID != nil {
1580 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1582 fmt.Println(" RANFunctionID = nil")
1584 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1585 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1587 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1588 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1589 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1590 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1592 if actionToBeSetup.SubsequentAction != nil {
1593 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1594 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1596 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")