2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
50 retval += filler + entry.String()
53 retval += filler + "(NIL)"
57 retval += filler + "err(" + err.Error() + ")"
63 //-----------------------------------------------------------------------------
65 //-----------------------------------------------------------------------------
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64 // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
84 restSubsDb Sdlnterface
87 Counters map[string]xapp.Counter
98 type SubmgrRestartTestEvent struct{}
99 type SubmgrRestartUpEvent struct{}
102 xapp.Logger.Info("SUBMGR")
104 viper.SetEnvPrefix("submgr")
105 viper.AllowEmptyEnv(true)
108 func NewControl() *Control {
110 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
111 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
113 registry := new(Registry)
114 registry.Initialize()
115 registry.rtmgrClient = &rtmgrClient
117 tracker := new(Tracker)
120 c := &Control{e2ap: new(E2ap),
123 e2SubsDb: CreateSdl(),
124 restSubsDb: CreateRESTSdl(),
125 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
128 c.ReadConfigParameters("")
130 // Register REST handler for testing support
131 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
132 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
133 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
135 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
137 if readSubsFromDb == "false" {
141 restDuplicateCtrl.Init()
143 // Read subscriptions from db
144 c.ReadE2Subscriptions()
145 c.ReadRESTSubscriptions()
149 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
150 subscriptions, _ := c.registry.QueryHandler()
151 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
154 //-------------------------------------------------------------------
156 //-------------------------------------------------------------------
157 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
158 xapp.Logger.Info("GetAllRestSubscriptions() called")
159 response := c.registry.GetAllRestSubscriptions()
163 //-------------------------------------------------------------------
165 //-------------------------------------------------------------------
166 func (c *Control) ReadE2Subscriptions() error {
169 var register map[uint32]*Subscription
170 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
171 xapp.Logger.Info("Reading E2 subscriptions from db")
172 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
174 xapp.Logger.Error("%v", err)
175 <-time.After(1 * time.Second)
177 c.registry.subIds = subIds
178 c.registry.register = register
179 c.HandleUncompletedSubscriptions(register)
183 xapp.Logger.Info("Continuing without retring")
187 //-------------------------------------------------------------------
189 //-------------------------------------------------------------------
190 func (c *Control) ReadRESTSubscriptions() error {
192 var restSubscriptions map[string]*RESTSubscription
193 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
194 xapp.Logger.Info("Reading REST subscriptions from db")
195 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
197 xapp.Logger.Error("%v", err)
198 <-time.After(1 * time.Second)
200 c.registry.restSubscriptions = restSubscriptions
204 xapp.Logger.Info("Continuing without retring")
208 //-------------------------------------------------------------------
210 //-------------------------------------------------------------------
211 func (c *Control) ReadConfigParameters(f string) {
213 // viper.GetDuration returns nanoseconds
214 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
215 if e2tSubReqTimeout == 0 {
216 e2tSubReqTimeout = 2000 * 1000000
218 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
219 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
220 if e2tSubDelReqTime == 0 {
221 e2tSubDelReqTime = 2000 * 1000000
223 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
224 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
225 if e2tRecvMsgTimeout == 0 {
226 e2tRecvMsgTimeout = 2000 * 1000000
228 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
230 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
231 if e2tMaxSubReqTryCount == 0 {
232 e2tMaxSubReqTryCount = 1
234 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
236 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
237 if e2tMaxSubDelReqTryCount == 0 {
238 e2tMaxSubDelReqTryCount = 1
240 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
242 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
243 if readSubsFromDb == "" {
244 readSubsFromDb = "true"
246 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
248 dbTryCount = viper.GetInt("controls.dbTryCount")
252 xapp.Logger.Info("dbTryCount %v", dbTryCount)
254 dbRetryForever = viper.GetString("controls.dbRetryForever")
255 if dbRetryForever == "" {
256 dbRetryForever = "true"
258 xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
260 c.LoggerLevel = viper.GetUint32("logger.level")
261 if c.LoggerLevel == 0 {
264 xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
266 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
267 // value 100ms used currently only in unittests.
268 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
269 if waitRouteCleanup_ms == 0 {
270 waitRouteCleanup_ms = 5000 * 1000000
272 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
275 //-------------------------------------------------------------------
277 //-------------------------------------------------------------------
278 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
280 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
281 for subId, subs := range register {
282 if subs.SubRespRcvd == false {
283 subs.NoRespToXapp = true
284 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
285 c.SendSubscriptionDeleteReq(subs)
290 func (c *Control) ReadyCB(data interface{}) {
291 if c.RMRClient == nil {
292 c.RMRClient = xapp.Rmr
296 func (c *Control) Run() {
297 xapp.SetReadyCB(c.ReadyCB, nil)
298 xapp.AddConfigChangeListener(c.ReadConfigParameters)
302 //-------------------------------------------------------------------
304 //-------------------------------------------------------------------
305 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
308 var restSubscription *RESTSubscription
311 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
312 if p.SubscriptionID == "" {
314 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
315 if restSubscription != nil {
316 restSubId = prevRestSubsId
318 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
320 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
323 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
324 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
328 if restSubscription == nil {
329 restSubId = ksuid.New().String()
330 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
332 xapp.Logger.Error("%s", err.Error())
333 c.UpdateCounter(cRestSubFailToXapp)
338 restSubId = p.SubscriptionID
340 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
342 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
344 xapp.Logger.Error("%s", err.Error())
345 c.UpdateCounter(cRestSubFailToXapp)
350 xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
352 xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
356 return restSubscription, restSubId, nil
359 //-------------------------------------------------------------------
361 //-------------------------------------------------------------------
362 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
365 c.UpdateCounter(cRestSubReqFromXapp)
367 subResp := models.SubscriptionResponse{}
368 p := params.(*models.SubscriptionParams)
370 if c.LoggerLevel > 2 {
371 c.PrintRESTSubscriptionRequest(p)
374 if p.ClientEndpoint == nil {
375 xapp.Logger.Error("ClientEndpoint == nil")
376 c.UpdateCounter(cRestSubFailToXapp)
377 return nil, fmt.Errorf("")
380 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
382 xapp.Logger.Error("%s", err.Error())
383 c.UpdateCounter(cRestSubFailToXapp)
387 md5sum, err := CalculateRequestMd5sum(params)
389 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
392 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
394 xapp.Logger.Error("Failed to get/allocate REST subscription")
398 subResp.SubscriptionID = &restSubId
399 subReqList := e2ap.SubscriptionRequestList{}
400 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
402 xapp.Logger.Error("%s", err.Error())
403 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
404 c.registry.DeleteRESTSubscription(&restSubId)
405 c.UpdateCounter(cRestSubFailToXapp)
409 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
411 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
412 c.UpdateCounter(cRestSubRespToXapp)
416 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
418 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
420 c.UpdateCounter(cRestSubRespToXapp)
424 //-------------------------------------------------------------------
426 //-------------------------------------------------------------------
428 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
429 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
431 c.SubscriptionProcessingStartDelay()
432 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
434 var xAppEventInstanceID int64
435 var e2EventInstanceID int64
437 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
439 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
440 subReqMsg := subReqList.E2APSubscriptionRequests[index]
441 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
443 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
445 // Send notification to xApp that prosessing of a Subscription Request has failed.
446 err := fmt.Errorf("Tracking failure")
447 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
451 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
453 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
455 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
459 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
461 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
462 restSubscription.AddMd5Sum(md5sum)
463 xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
464 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
465 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
470 //-------------------------------------------------------------------
472 //------------------------------------------------------------------
473 func (c *Control) SubscriptionProcessingStartDelay() {
474 if c.UTTesting == true {
475 // This is temporary fix for the UT problem that notification arrives before subscription response
476 // Correct fix would be to allow notification come before response and process it correctly
477 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
478 <-time.After(time.Millisecond * 50)
479 xapp.Logger.Debug("Continuing after delay")
483 //-------------------------------------------------------------------
485 //------------------------------------------------------------------
486 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
487 restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
489 err := c.tracker.Track(trans)
491 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
492 err = fmt.Errorf("Tracking failure")
496 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
498 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
505 go c.handleSubscriptionCreate(subs, trans)
506 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
510 switch themsg := event.(type) {
511 case *e2ap.E2APSubscriptionResponse:
514 case *e2ap.E2APSubscriptionFailure:
515 err = fmt.Errorf("E2 SubscriptionFailure received")
518 err = fmt.Errorf("unexpected E2 subscription response received")
522 err = fmt.Errorf("E2 subscription response timeout")
525 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
526 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
530 //-------------------------------------------------------------------
532 //-------------------------------------------------------------------
533 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
534 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
536 // Send notification to xApp that prosessing of a Subscription Request has failed.
537 e2EventInstanceID := (int64)(0)
538 errorCause := err.Error()
539 resp := &models.SubscriptionResponse{
540 SubscriptionID: restSubId,
541 SubscriptionInstances: []*models.SubscriptionInstance{
542 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
543 ErrorCause: &errorCause,
544 XappEventInstanceID: &xAppEventInstanceID},
547 // Mark REST subscription request processed.
548 restSubscription.SetProcessed(err)
549 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
551 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
552 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
554 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
555 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
558 c.UpdateCounter(cRestSubFailNotifToXapp)
559 xapp.Subscription.Notify(resp, *clientEndpoint)
562 //-------------------------------------------------------------------
564 //-------------------------------------------------------------------
565 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
566 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
568 // Store successfully processed InstanceId for deletion
569 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
570 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
572 // Send notification to xApp that a Subscription Request has been processed.
573 resp := &models.SubscriptionResponse{
574 SubscriptionID: restSubId,
575 SubscriptionInstances: []*models.SubscriptionInstance{
576 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
578 XappEventInstanceID: &xAppEventInstanceID},
581 // Mark REST subscription request processesd.
582 restSubscription.SetProcessed(nil)
583 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
584 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
585 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
587 c.UpdateCounter(cRestSubNotifToXapp)
588 xapp.Subscription.Notify(resp, *clientEndpoint)
591 //-------------------------------------------------------------------
593 //-------------------------------------------------------------------
594 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
597 c.UpdateCounter(cRestSubDelReqFromXapp)
599 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
601 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
603 xapp.Logger.Error("%s", err.Error())
604 if restSubscription == nil {
605 // Subscription was not found
608 if restSubscription.SubReqOngoing == true {
609 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
610 xapp.Logger.Error("%s", err.Error())
612 } else if restSubscription.SubDelReqOngoing == true {
613 // Previous request for same restSubId still ongoing
619 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
621 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
622 for _, instanceId := range restSubscription.InstanceIds {
623 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
626 xapp.Logger.Error("%s", err.Error())
629 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
630 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
631 restSubscription.DeleteE2InstanceId(instanceId)
633 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
634 c.registry.DeleteRESTSubscription(&restSubId)
635 c.RemoveRESTSubscriptionFromDb(restSubId)
638 c.UpdateCounter(cRestSubDelRespToXapp)
643 //-------------------------------------------------------------------
645 //-------------------------------------------------------------------
646 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
648 var xAppEventInstanceID int64
649 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
651 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
652 restSubId, instanceId, idstring(err, nil))
653 return xAppEventInstanceID, nil
656 xAppEventInstanceID = int64(subs.ReqId.Id)
657 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
659 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
660 xapp.Logger.Error("%s", err.Error())
662 defer trans.Release()
664 err = c.tracker.Track(trans)
666 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
667 xapp.Logger.Error("%s", err.Error())
668 return xAppEventInstanceID, &time.ParseError{}
673 go c.handleSubscriptionDelete(subs, trans)
674 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
676 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
678 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
680 return xAppEventInstanceID, nil
683 //-------------------------------------------------------------------
685 //-------------------------------------------------------------------
686 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
687 xapp.Logger.Info("QueryHandler() called")
691 return c.registry.QueryHandler()
694 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
695 xapp.Logger.Info("TestRestHandler() called")
697 pathParams := mux.Vars(r)
698 s := pathParams["testId"]
700 // This can be used to delete single subscription from db
701 if contains := strings.Contains(s, "deletesubid="); contains == true {
702 var splits = strings.Split(s, "=")
703 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
704 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
705 c.RemoveSubscriptionFromSdl(uint32(subId))
710 // This can be used to remove all subscriptions db from
712 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
713 c.RemoveAllSubscriptionsFromSdl()
714 c.RemoveAllRESTSubscriptionsFromSdl()
718 // This is meant to cause submgr's restart in testing
720 xapp.Logger.Info("os.Exit(1) called")
724 xapp.Logger.Info("Unsupported rest command received %s", s)
727 //-------------------------------------------------------------------
729 //-------------------------------------------------------------------
731 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
732 params := &xapp.RMRParams{}
733 params.Mtype = trans.GetMtype()
734 params.SubId = int(subs.GetReqId().InstanceId)
736 params.Meid = subs.GetMeid()
738 params.PayloadLen = len(trans.Payload.Buf)
739 params.Payload = trans.Payload.Buf
741 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
742 err = c.SendWithRetry(params, false, 5)
744 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
749 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
751 params := &xapp.RMRParams{}
752 params.Mtype = trans.GetMtype()
753 params.SubId = int(subs.GetReqId().InstanceId)
754 params.Xid = trans.GetXid()
755 params.Meid = trans.GetMeid()
757 params.PayloadLen = len(trans.Payload.Buf)
758 params.Payload = trans.Payload.Buf
760 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
761 err = c.SendWithRetry(params, false, 5)
763 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
768 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
769 if c.RMRClient == nil {
770 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
771 xapp.Logger.Error("%s", err.Error())
776 defer c.RMRClient.Free(msg.Mbuf)
778 // xapp-frame might use direct access to c buffer and
779 // when msg.Mbuf is freed, someone might take it into use
780 // and payload data might be invalid inside message handle function
782 // subscriptions won't load system a lot so there is no
783 // real performance hit by cloning buffer into new go byte slice
784 cPay := append(msg.Payload[:0:0], msg.Payload...)
786 msg.PayloadLen = len(cPay)
789 case xapp.RIC_SUB_REQ:
790 go c.handleXAPPSubscriptionRequest(msg)
791 case xapp.RIC_SUB_RESP:
792 go c.handleE2TSubscriptionResponse(msg)
793 case xapp.RIC_SUB_FAILURE:
794 go c.handleE2TSubscriptionFailure(msg)
795 case xapp.RIC_SUB_DEL_REQ:
796 go c.handleXAPPSubscriptionDeleteRequest(msg)
797 case xapp.RIC_SUB_DEL_RESP:
798 go c.handleE2TSubscriptionDeleteResponse(msg)
799 case xapp.RIC_SUB_DEL_FAILURE:
800 go c.handleE2TSubscriptionDeleteFailure(msg)
802 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
807 //-------------------------------------------------------------------
808 // handle from XAPP Subscription Request
809 //------------------------------------------------------------------
810 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
811 xapp.Logger.Info("MSG from XAPP: %s", params.String())
812 c.UpdateCounter(cSubReqFromXapp)
814 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
816 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
820 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
822 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
825 defer trans.Release()
827 if err = c.tracker.Track(trans); err != nil {
828 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
832 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
833 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
835 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
839 c.wakeSubscriptionRequest(subs, trans)
842 //-------------------------------------------------------------------
843 // Wake Subscription Request to E2node
844 //------------------------------------------------------------------
845 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
847 go c.handleSubscriptionCreate(subs, trans)
848 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
851 switch themsg := event.(type) {
852 case *e2ap.E2APSubscriptionResponse:
853 themsg.RequestId.Id = trans.RequestId.Id
854 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
857 c.UpdateCounter(cSubRespToXapp)
858 c.rmrSendToXapp("", subs, trans)
861 case *e2ap.E2APSubscriptionFailure:
862 themsg.RequestId.Id = trans.RequestId.Id
863 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
865 c.UpdateCounter(cSubFailToXapp)
866 c.rmrSendToXapp("", subs, trans)
872 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
873 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
876 //-------------------------------------------------------------------
877 // handle from XAPP Subscription Delete Request
878 //------------------------------------------------------------------
879 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
880 xapp.Logger.Info("MSG from XAPP: %s", params.String())
881 c.UpdateCounter(cSubDelReqFromXapp)
883 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
885 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
889 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
891 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
894 defer trans.Release()
896 err = c.tracker.Track(trans)
898 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
902 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
904 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
911 go c.handleSubscriptionDelete(subs, trans)
912 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
914 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
916 if subs.NoRespToXapp == true {
917 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
921 // Whatever is received success, fail or timeout, send successful delete response
922 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
923 subDelRespMsg.RequestId.Id = trans.RequestId.Id
924 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
925 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
926 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
928 c.UpdateCounter(cSubDelRespToXapp)
929 c.rmrSendToXapp("", subs, trans)
932 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
933 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
936 //-------------------------------------------------------------------
937 // SUBS CREATE Handling
938 //-------------------------------------------------------------------
939 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
941 var removeSubscriptionFromDb bool = false
942 trans := c.tracker.NewSubsTransaction(subs)
943 subs.WaitTransactionTurn(trans)
944 defer subs.ReleaseTransactionTurn(trans)
945 defer trans.Release()
947 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
949 subRfMsg, valid := subs.GetCachedResponse()
950 if subRfMsg == nil && valid == true {
951 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
952 switch event.(type) {
953 case *e2ap.E2APSubscriptionResponse:
954 subRfMsg, valid = subs.SetCachedResponse(event, true)
955 subs.SubRespRcvd = true
956 case *e2ap.E2APSubscriptionFailure:
957 removeSubscriptionFromDb = true
958 subRfMsg, valid = subs.SetCachedResponse(event, false)
959 xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
960 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
961 case *SubmgrRestartTestEvent:
962 // This simulates that no response has been received and after restart subscriptions are restored from db
963 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
966 xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
967 removeSubscriptionFromDb = true
968 subRfMsg, valid = subs.SetCachedResponse(nil, false)
969 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
971 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
973 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
976 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
978 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
981 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
982 parentTrans.SendEvent(subRfMsg, 0)
985 //-------------------------------------------------------------------
986 // SUBS DELETE Handling
987 //-------------------------------------------------------------------
989 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
991 trans := c.tracker.NewSubsTransaction(subs)
992 subs.WaitTransactionTurn(trans)
993 defer subs.ReleaseTransactionTurn(trans)
994 defer trans.Release()
996 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1000 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1003 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1007 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1008 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1009 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1010 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1011 c.registry.UpdateSubscriptionToDb(subs, c)
1012 parentTrans.SendEvent(nil, 0)
1015 //-------------------------------------------------------------------
1016 // send to E2T Subscription Request
1017 //-------------------------------------------------------------------
1018 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1020 var event interface{} = nil
1021 var timedOut bool = false
1022 const ricRequestorId = 123
1024 subReqMsg := subs.SubReqMsg
1025 subReqMsg.RequestId = subs.GetReqId().RequestId
1026 subReqMsg.RequestId.Id = ricRequestorId
1027 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1029 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1033 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1034 c.WriteSubscriptionToDb(subs)
1036 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1037 desc := fmt.Sprintf("(retry %d)", retries)
1039 c.UpdateCounter(cSubReqToE2)
1041 c.UpdateCounter(cSubReReqToE2)
1043 c.rmrSendToE2T(desc, subs, trans)
1044 if subs.DoNotWaitSubResp == false {
1045 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1047 c.UpdateCounter(cSubReqTimerExpiry)
1051 // Simulating case where subscrition request has been sent but response has not been received before restart
1052 event = &SubmgrRestartTestEvent{}
1056 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1060 //-------------------------------------------------------------------
1061 // send to E2T Subscription Delete Request
1062 //-------------------------------------------------------------------
1064 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1066 var event interface{}
1068 const ricRequestorId = 123
1070 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1071 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1072 subDelReqMsg.RequestId.Id = ricRequestorId
1073 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1074 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1076 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1080 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1081 desc := fmt.Sprintf("(retry %d)", retries)
1083 c.UpdateCounter(cSubDelReqToE2)
1085 c.UpdateCounter(cSubDelReReqToE2)
1087 c.rmrSendToE2T(desc, subs, trans)
1088 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1090 c.UpdateCounter(cSubDelReqTimerExpiry)
1095 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1099 //-------------------------------------------------------------------
1100 // handle from E2T Subscription Response
1101 //-------------------------------------------------------------------
1102 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1103 xapp.Logger.Info("MSG from E2T: %s", params.String())
1104 c.UpdateCounter(cSubRespFromE2)
1106 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1108 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1111 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1113 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1116 trans := subs.GetTransaction()
1118 err = fmt.Errorf("Ongoing transaction not found")
1119 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1122 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1123 if sendOk == false {
1124 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1125 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1130 //-------------------------------------------------------------------
1131 // handle from E2T Subscription Failure
1132 //-------------------------------------------------------------------
1133 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1134 xapp.Logger.Info("MSG from E2T: %s", params.String())
1135 c.UpdateCounter(cSubFailFromE2)
1136 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1138 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1141 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1143 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1146 trans := subs.GetTransaction()
1148 err = fmt.Errorf("Ongoing transaction not found")
1149 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1152 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1153 if sendOk == false {
1154 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1155 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1160 //-------------------------------------------------------------------
1161 // handle from E2T Subscription Delete Response
1162 //-------------------------------------------------------------------
1163 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1164 xapp.Logger.Info("MSG from E2T: %s", params.String())
1165 c.UpdateCounter(cSubDelRespFromE2)
1166 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1168 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1171 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1173 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1176 trans := subs.GetTransaction()
1178 err = fmt.Errorf("Ongoing transaction not found")
1179 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1182 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1183 if sendOk == false {
1184 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1185 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1190 //-------------------------------------------------------------------
1191 // handle from E2T Subscription Delete Failure
1192 //-------------------------------------------------------------------
1193 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1194 xapp.Logger.Info("MSG from E2T: %s", params.String())
1195 c.UpdateCounter(cSubDelFailFromE2)
1196 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1198 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1201 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1203 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1206 trans := subs.GetTransaction()
1208 err = fmt.Errorf("Ongoing transaction not found")
1209 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1212 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1213 if sendOk == false {
1214 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1215 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1220 //-------------------------------------------------------------------
1222 //-------------------------------------------------------------------
1223 func typeofSubsMessage(v interface{}) string {
1228 //case *e2ap.E2APSubscriptionRequest:
1230 case *e2ap.E2APSubscriptionResponse:
1232 case *e2ap.E2APSubscriptionFailure:
1234 //case *e2ap.E2APSubscriptionDeleteRequest:
1235 // return "SubDelReq"
1236 case *e2ap.E2APSubscriptionDeleteResponse:
1238 case *e2ap.E2APSubscriptionDeleteFailure:
1245 //-------------------------------------------------------------------
1247 //-------------------------------------------------------------------
1248 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1249 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1250 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1252 xapp.Logger.Error("%v", err)
1256 //-------------------------------------------------------------------
1258 //-------------------------------------------------------------------
1259 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1261 if removeSubscriptionFromDb == true {
1262 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1263 c.RemoveSubscriptionFromDb(subs)
1265 // Update is needed for successful response and merge case here
1266 if subs.RetryFromXapp == false {
1267 c.WriteSubscriptionToDb(subs)
1270 subs.RetryFromXapp = false
1273 //-------------------------------------------------------------------
1275 //-------------------------------------------------------------------
1276 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1277 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1278 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1280 xapp.Logger.Error("%v", err)
1284 //-------------------------------------------------------------------
1286 //-------------------------------------------------------------------
1287 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1288 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1289 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1291 xapp.Logger.Error("%v", err)
1295 //-------------------------------------------------------------------
1297 //-------------------------------------------------------------------
1298 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1300 if removeRestSubscriptionFromDb == true {
1301 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1302 c.RemoveRESTSubscriptionFromDb(restSubId)
1304 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1308 //-------------------------------------------------------------------
1310 //-------------------------------------------------------------------
1311 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1312 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1313 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1315 xapp.Logger.Error("%v", err)
1319 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1321 const ricRequestorId = 123
1322 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1324 // Send delete for every endpoint in the subscription
1325 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1326 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1327 subDelReqMsg.RequestId.Id = ricRequestorId
1328 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1329 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1331 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1334 for _, endPoint := range subs.EpList.Endpoints {
1335 params := &xapp.RMRParams{}
1336 params.Mtype = mType
1337 params.SubId = int(subs.GetReqId().InstanceId)
1339 params.Meid = subs.Meid
1340 params.Src = endPoint.String()
1341 params.PayloadLen = len(payload.Buf)
1342 params.Payload = payload.Buf
1344 subs.DeleteFromDb = true
1345 c.handleXAPPSubscriptionDeleteRequest(params)
1349 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1351 fmt.Println("CRESTSubscriptionRequest")
1357 if p.SubscriptionID != "" {
1358 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1360 fmt.Println(" SubscriptionID = ''")
1363 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1365 if p.ClientEndpoint.HTTPPort != nil {
1366 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1368 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1371 if p.ClientEndpoint.RMRPort != nil {
1372 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1374 fmt.Println(" ClientEndpoint.RMRPort = nil")
1378 fmt.Printf(" Meid = %s\n", *p.Meid)
1380 fmt.Println(" Meid = nil")
1383 for _, subscriptionDetail := range p.SubscriptionDetails {
1384 if p.RANFunctionID != nil {
1385 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1387 fmt.Println(" RANFunctionID = nil")
1389 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1390 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1392 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1393 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1394 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1395 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1397 if actionToBeSetup.SubsequentAction != nil {
1398 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1399 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1401 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")