2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds a single identification string for log messages from the
// given entries and an optional error. Each entry is appended either as its
// String() value or as the literal "(NIL)", and err is appended as
// "err(<message>)"; every part is prefixed with the current value of filler.
// NOTE(review): the nil checks, the updates of filler and the return statement
// sit on lines elided from this listing — confirm the separator handling there.
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
50 retval += filler + entry.String()
53 retval += filler + "(NIL)"
57 retval += filler + "err(" + err.Error() + ")"
63 //-----------------------------------------------------------------------------
65 //-----------------------------------------------------------------------------
// Package-level configuration, (re)loaded by Control.ReadConfigParameters.

// e2tSubReqTimeout is read from "controls.e2tSubReqTimeout_ms" (fallback 2000 ms).
67 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is read from "controls.e2tSubDelReqTime_ms" (fallback 2000 ms).
68 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is read from "controls.e2tRecvMsgTimeout_ms" (fallback 2000 ms).
69 var e2tRecvMsgTimeout time.Duration
// waitRouteCleanup_ms is read from "controls.waitRouteCleanup_ms" (fallback
// 5000 ms). Despite the name it holds a time.Duration, not a millisecond count.
70 var waitRouteCleanup_ms time.Duration
// e2tMaxSubReqTryCount is read from "controls.e2tMaxSubReqTryCount" (fallback 1).
71 var e2tMaxSubReqTryCount uint64 // Initial try + retry
// e2tMaxSubDelReqTryCount is read from "controls.e2tMaxSubDelReqTryCount" (fallback 1).
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// readSubsFromDb is a string flag ("true"/"false", fallback "true") read from
// "controls.readSubsFromDb"; NewControl compares it against "false".
73 var readSubsFromDb string
// restDuplicateCtrl tracks REST request md5sums; it is initialised in NewControl
// and consulted by the REST subscription handlers.
74 var restDuplicateCtrl duplicateCtrl
// dbRetryForever is a string flag (fallback "true") read from
// "controls.dbRetryForever"; while it equals "true" the db read loops keep retrying.
75 var dbRetryForever string
// NOTE(review): the two lines below appear to be fields of the Control struct
// (NewControl sets both in its &Control{...} literal); the struct header and
// the remaining fields are elided from this listing.

// restSubsDb is the storage handle assigned from CreateRESTSdl() in NewControl.
84 restSubsDb Sdlnterface
// Counters is the "SUBMGR" counter group registered with xapp.Metric in NewControl.
87 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker type. handleSubscriptionCreate
// matches it in its event type switch to drop a transaction when the restart
// test flag is active.
97 type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is an empty marker type; it is not referenced in the
// visible part of this file.
98 type SubmgrRestartUpEvent struct{}
// NOTE(review): fragment of a function whose header is elided from this
// listing (presumably the package init — confirm). It logs a "SUBMGR" banner,
// makes viper look up environment variables with the "submgr" prefix, and lets
// viper accept environment variables that are set but empty.
101 xapp.Logger.Info("SUBMGR")
103 viper.SetEnvPrefix("submgr")
104 viper.AllowEmptyEnv(true)
// NewControl wires up and returns the subscription manager's Control object.
//
// It creates the routing manager REST client from the viper keys
// "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl", initialises the Registry
// and Tracker, builds the Control value (E2AP codec, storage handles, "SUBMGR"
// counter group), loads the configuration, registers the REST routes, starts
// the xapp subscription listener in a goroutine and finally loads stored
// subscriptions from the db.
//
// NOTE(review): several lines (tracker setup, remaining struct fields, the body
// of the readSubsFromDb check, returns) are elided from this listing.
107 func NewControl() *Control {
// Routing manager client; the transport only uses the plain "http" scheme.
109 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
112 registry := new(Registry)
113 registry.Initialize()
114 registry.rtmgrClient = &rtmgrClient
116 tracker := new(Tracker)
119 c := &Control{e2ap: new(E2ap),
122 e2SubsDb: CreateSdl(),
123 restSubsDb: CreateRESTSdl(),
124 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the package-level configuration; the argument is an empty string here.
127 c.ReadConfigParameters("")
129 // Register REST handlers: test support, REST subscription listing and symptom data
130 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// Serve REST subscribe / query / delete requests in the background.
134 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
// Body of this check is elided; presumably it returns early without touching the db.
136 if readSubsFromDb == "false" {
140 restDuplicateCtrl.Init()
142 // Read subscriptions from db
// NOTE(review): both calls return an error that is not inspected here.
143 c.ReadE2Subscriptions()
144 c.ReadRESTSubscriptions()
// NOTE(review): the statements below repeat the db read without the retry loop.
// Whether they are live, unreachable or commented out cannot be determined
// from this listing (the surrounding lines are elided) — confirm.
147 xapp.Logger.Info("Reading subscriptions from db")
148 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
150 xapp.Logger.Error("%v", err)
152 c.registry.subIds = subIds
153 c.registry.register = register
154 c.HandleUncompletedSubscriptions(register)
157 restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
159 xapp.Logger.Error("%v", err)
161 c.registry.restSubscriptions = restSubscriptions
// SymptomDataHandler serves GET /ric/v1/symptomdata. It queries the registry
// for the current subscriptions and hands them to the xapp framework to be
// sent as symptom data under the name "platform/subscriptions.json".
// The error returned by the registry query is discarded.
167 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
168 subscriptions, _ := c.registry.QueryHandler()
169 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
172 //-------------------------------------------------------------------
174 //-------------------------------------------------------------------
// GetAllRestSubscriptions serves GET /ric/v1/restsubscriptions. It logs the
// call and fetches the REST subscription listing from the registry.
// NOTE(review): the line that uses `response` (presumably writing it to w) is
// elided from this listing.
175 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
176 xapp.Logger.Info("GetAllRestSubscriptions() called")
177 response := c.registry.GetAllRestSubscriptions()
181 //-------------------------------------------------------------------
183 //-------------------------------------------------------------------
// ReadE2Subscriptions loads the E2 subscriptions from the db into the registry.
//
// The read is attempted in a loop that continues while dbRetryForever is
// "true" or fewer than dbTryCount attempts have been made. A failed read is
// logged and followed by a one second wait; a successful read stores subIds and
// register in the registry and triggers HandleUncompletedSubscriptions.
// NOTE(review): the err checks, the returns and the exact position of the
// "Continuing without retring" log are on lines elided from this listing.
184 func (c *Control) ReadE2Subscriptions() error {
187 var register map[uint32]*Subscription
188 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
189 xapp.Logger.Info("Reading E2 subscriptions from db")
190 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
192 xapp.Logger.Error("%v", err)
// Pause one second before the next attempt.
193 <-time.After(1 * time.Second)
195 c.registry.subIds = subIds
196 c.registry.register = register
197 c.HandleUncompletedSubscriptions(register)
201 xapp.Logger.Info("Continuing without retring")
205 //-------------------------------------------------------------------
207 //-------------------------------------------------------------------
// ReadRESTSubscriptions loads the REST subscriptions from the db into the
// registry, using the same retry scheme as ReadE2Subscriptions: the loop
// continues while dbRetryForever is "true" or fewer than dbTryCount attempts
// have been made, waiting one second after a logged failure.
// NOTE(review): the err checks, the returns and the exact position of the
// "Continuing without retring" log are on lines elided from this listing.
208 func (c *Control) ReadRESTSubscriptions() error {
210 var restSubscriptions map[string]*RESTSubscription
211 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
212 xapp.Logger.Info("Reading REST subscriptions from db")
213 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
215 xapp.Logger.Error("%v", err)
// Pause one second before the next attempt.
216 <-time.After(1 * time.Second)
218 c.registry.restSubscriptions = restSubscriptions
222 xapp.Logger.Info("Continuing without retring")
226 //-------------------------------------------------------------------
228 //-------------------------------------------------------------------
// ReadConfigParameters (re)loads the submgr settings from viper into the
// package-level variables and into c.LoggerLevel, logging each resulting value.
// It is called once from NewControl and is registered in Run as a
// configuration change listener.
//
// Keys ending in "_ms" are multiplied by 1000000 after viper.GetDuration; a
// value of zero is replaced by a built-in fallback. The parameter f is not used
// in the visible lines.
// NOTE(review): the fallback handling for dbTryCount and LoggerLevel is on
// lines elided from this listing.
229 func (c *Control) ReadConfigParameters(f string) {
231 // viper.GetDuration returns nanoseconds
232 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233 if e2tSubReqTimeout == 0 {
// Fallback: 2000 ms.
234 e2tSubReqTimeout = 2000 * 1000000
236 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
237 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
238 if e2tSubDelReqTime == 0 {
// Fallback: 2000 ms.
239 e2tSubDelReqTime = 2000 * 1000000
241 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
242 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
243 if e2tRecvMsgTimeout == 0 {
// Fallback: 2000 ms.
244 e2tRecvMsgTimeout = 2000 * 1000000
246 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
248 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
249 if e2tMaxSubReqTryCount == 0 {
// Fallback: a single try, i.e. no retry.
250 e2tMaxSubReqTryCount = 1
252 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
254 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
255 if e2tMaxSubDelReqTryCount == 0 {
// Fallback: a single try, i.e. no retry.
256 e2tMaxSubDelReqTryCount = 1
258 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
260 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
261 if readSubsFromDb == "" {
262 readSubsFromDb = "true"
264 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
266 dbTryCount = viper.GetInt("controls.dbTryCount")
270 xapp.Logger.Info("dbTryCount %v", dbTryCount)
272 dbRetryForever = viper.GetString("controls.dbRetryForever")
273 if dbRetryForever == "" {
274 dbRetryForever = "true"
276 xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
278 c.LoggerLevel = viper.GetUint32("logger.level")
279 if c.LoggerLevel == 0 {
282 xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
284 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
285 // value of 100ms is currently used only in unit tests.
286 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
287 if waitRouteCleanup_ms == 0 {
// Fallback: 5000 ms.
288 waitRouteCleanup_ms = 5000 * 1000000
290 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
293 //-------------------------------------------------------------------
295 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register and, for every
// subscription whose SubRespRcvd flag is false, sets NoRespToXapp and sends a
// subscription delete request for it via SendSubscriptionDeleteReq.
// It is invoked after subscriptions have been read back from the db.
296 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
298 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
299 for subId, subs := range register {
300 if subs.SubRespRcvd == false {
// Mark the subscription so that the delete handling sends no response to the xApp.
301 subs.NoRespToXapp = true
302 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
303 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is registered in Run as the xapp framework's ready callback. It
// installs xapp.Rmr as the RMR client unless one has already been set. The
// data argument is not used in the visible lines.
308 func (c *Control) ReadyCB(data interface{}) {
309 if c.RMRClient == nil {
310 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the framework's ready callback and
// ReadConfigParameters as a configuration change listener.
// NOTE(review): the remaining statement(s) of this function (presumably the
// call that starts the xapp framework) are elided from this listing.
314 func (c *Control) Run() {
315 xapp.SetReadyCB(c.ReadyCB, nil)
316 xapp.AddConfigChangeListener(c.ReadConfigParameters)
320 //-------------------------------------------------------------------
322 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that request p
// refers to and returns it together with its REST subscription ID.
//
// When p.SubscriptionID is empty, the ID last recorded for md5sum is looked up
// in the registry and reused if a subscription is found; a stale cache entry is
// deleted. If no subscription results, a new ksuid-based ID is generated and a
// subscription is created in the registry for xAppRmrEndpoint and p.Meid.
// When p.SubscriptionID is set, the subscription with that ID is fetched from
// the registry.
//
// Registry failures are logged and counted in cRestSubFailToXapp.
// NOTE(review): the err / exists checks, the else branches and the error
// returns are on lines elided from this listing.
323 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
326 var restSubscription *RESTSubscription
// Look up which REST subscription ID (if any) was last seen with this md5sum.
329 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
330 if p.SubscriptionID == "" {
332 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333 if restSubscription != nil {
334 restSubId = prevRestSubsId
336 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
338 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
341 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing reusable was found: allocate a fresh ID and create the subscription.
346 if restSubscription == nil {
347 restSubId = ksuid.New().String()
348 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
350 xapp.Logger.Error("%s", err.Error())
351 c.UpdateCounter(cRestSubFailToXapp)
// The request carries its own subscription ID.
356 restSubId = p.SubscriptionID
358 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
360 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
362 xapp.Logger.Error("%s", err.Error())
363 c.UpdateCounter(cRestSubFailToXapp)
368 xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
370 xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
374 return restSubscription, restSubId, nil
377 //-------------------------------------------------------------------
379 //-------------------------------------------------------------------
// SubscriptionHandler handles a REST subscription request from an xApp. It is
// passed to xapp.Subscription.Listen in NewControl.
//
// Steps visible here: count the request, validate ClientEndpoint, derive the
// xApp RMR endpoint, compute the request md5sum, get or create the REST
// subscription, build the E2AP subscription request list, acknowledge
// retransmissions of an ongoing transaction directly, store the REST
// subscription in the db and start processSubscriptionRequests in a goroutine.
// The returned response carries the REST subscription ID; per-instance results
// are delivered later by the notification helpers.
//
// NOTE(review): the err checks and return statements are on lines elided from
// this listing.
380 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
383 c.UpdateCounter(cRestSubReqFromXapp)
385 subResp := models.SubscriptionResponse{}
// NOTE(review): single-value type assertion — panics if params is not a
// *models.SubscriptionParams.
386 p := params.(*models.SubscriptionParams)
388 if c.LoggerLevel > 2 {
389 c.PrintRESTSubscriptionRequest(p)
392 if p.ClientEndpoint == nil {
393 xapp.Logger.Error("ClientEndpoint == nil")
394 c.UpdateCounter(cRestSubFailToXapp)
// NOTE(review): the returned error has an empty message.
395 return nil, fmt.Errorf("")
398 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
400 xapp.Logger.Error("%s", err.Error())
401 c.UpdateCounter(cRestSubFailToXapp)
405 md5sum, err := CalculateRequestMd5sum(params)
407 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
410 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
412 xapp.Logger.Error("Failed to get/allocate REST subscription")
416 subResp.SubscriptionID = &restSubId
417 subReqList := e2ap.SubscriptionRequestList{}
418 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Failure path: drop the md5sum cache entry and the REST subscription again.
420 xapp.Logger.Error("%s", err.Error())
421 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
422 c.registry.DeleteRESTSubscription(&restSubId)
423 c.UpdateCounter(cRestSubFailToXapp)
427 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
429 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
430 c.UpdateCounter(cRestSubRespToXapp)
434 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
// The E2 side is processed asynchronously; results reach the xApp as notifications.
436 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
438 c.UpdateCounter(cRestSubRespToXapp)
442 //-------------------------------------------------------------------
444 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2AP subscription request in
// subReqList for one REST subscription and notifies the xApp of each outcome.
// It is started as a goroutine by SubscriptionHandler.
//
// For each request it creates an xApp transaction, runs
// handleSubscriptionRequest and sends either a successful or an unsuccessful
// REST notification to clientEndpoint. On success md5sum is also recorded on
// the REST subscription. When the function returns, md5sum is stored via
// restDuplicateCtrl.SetMd5sumFromLastOkRequest (deferred).
//
// NOTE(review): the nil / err checks and the continue or return statements are
// on lines elided from this listing.
446 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
447 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
449 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
451 var xAppEventInstanceID int64
452 var e2EventInstanceID int64
454 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
456 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
457 subReqMsg := subReqList.E2APSubscriptionRequests[index]
458 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
460 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
462 // Send notification to xApp that processing of a Subscription Request has failed.
463 err := fmt.Errorf("Tracking failure")
464 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
468 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
470 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
472 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
475 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
477 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
478 restSubscription.AddMd5Sum(md5sum)
479 xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
480 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
481 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
487 //-------------------------------------------------------------------
489 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2AP subscription request for a REST
// subscription and returns the E2 subscription response, or an error.
//
// It tracks the transaction, assigns it to a subscription in the registry,
// starts handleSubscriptionCreate in a goroutine and blocks on the
// transaction's event. The received event is then classified: a subscription
// response, a subscription failure, an unexpected message, or no message
// (reported as a timeout). On failure the transaction is removed from the
// subscription again.
//
// The meid and restSubId parameters are not used in the visible lines.
// NOTE(review): the err checks, the nil-event check, the default case and the
// return statements are on lines elided from this listing.
490 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
491 restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
493 err := c.tracker.Track(trans)
495 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
// The detailed tracking error is only logged; callers get a generic one.
496 err = fmt.Errorf("Tracking failure")
500 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
502 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
509 go c.handleSubscriptionCreate(subs, trans)
510 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
514 switch themsg := event.(type) {
515 case *e2ap.E2APSubscriptionResponse:
518 case *e2ap.E2APSubscriptionFailure:
519 err = fmt.Errorf("E2 SubscriptionFailure received")
522 err = fmt.Errorf("unexpected E2 subscription response received")
526 err = fmt.Errorf("E2 subscription response timeout")
529 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
530 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
534 //-------------------------------------------------------------------
536 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification reports a failed subscription instance
// to the xApp. It builds a SubscriptionResponse with E2EventInstanceID 0 and
// err's message as ErrorCause, marks the REST subscription as processed with
// err, updates it in the db, counts cRestSubFailNotifToXapp and sends the
// notification to clientEndpoint.
//
// err must be non-nil; err.Error() is called unconditionally.
// NOTE(review): two variants of the log line exist, with and without the
// transaction id string; the condition choosing between them (presumably
// trans != nil) is elided from this listing.
537 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
538 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
540 // Send notification to xApp that processing of a Subscription Request has failed.
541 e2EventInstanceID := (int64)(0)
542 errorCause := err.Error()
543 resp := &models.SubscriptionResponse{
544 SubscriptionID: restSubId,
545 SubscriptionInstances: []*models.SubscriptionInstance{
546 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
547 ErrorCause: &errorCause,
548 XappEventInstanceID: &xAppEventInstanceID},
551 // Mark REST subscription request processed.
552 restSubscription.SetProcessed(err)
553 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
555 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
556 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
558 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
559 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
562 c.UpdateCounter(cRestSubFailNotifToXapp)
563 xapp.Subscription.Notify(resp, *clientEndpoint)
566 //-------------------------------------------------------------------
568 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification reports a successfully created
// subscription instance to the xApp. It records the E2 instance ID and the
// xApp-to-E2 instance ID mapping on the REST subscription, builds the
// SubscriptionResponse, marks the REST subscription as processed without
// error, updates it in the db, counts cRestSubNotifToXapp and sends the
// notification to clientEndpoint.
// NOTE(review): one field line of the SubscriptionInstance literal is elided
// from this listing.
569 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
570 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
572 // Store successfully processed InstanceId for deletion
573 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
574 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
576 // Send notification to xApp that a Subscription Request has been processed.
577 resp := &models.SubscriptionResponse{
578 SubscriptionID: restSubId,
579 SubscriptionInstances: []*models.SubscriptionInstance{
580 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
582 XappEventInstanceID: &xAppEventInstanceID},
585 // Mark REST subscription request processed.
586 restSubscription.SetProcessed(nil)
587 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
588 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
589 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
591 c.UpdateCounter(cRestSubNotifToXapp)
592 xapp.Subscription.Notify(resp, *clientEndpoint)
595 //-------------------------------------------------------------------
597 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB handles a REST subscription delete request from
// an xApp. It is passed to xapp.Subscription.Listen in NewControl.
//
// It fetches the REST subscription, checks whether a subscription request or
// an earlier delete request for it is still ongoing, then deletes each E2
// instance through SubscriptionDeleteHandler and removes the instance's ID
// mappings. Afterwards the md5sum cache entry, the registry entry and the db
// entry of the REST subscription are removed and cRestSubDelRespToXapp is
// counted.
//
// NOTE(review): the err checks, the return statements and any goroutine or
// closure boundary around the instance loop are on lines elided from this
// listing.
598 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
601 c.UpdateCounter(cRestSubDelReqFromXapp)
603 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
605 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
607 xapp.Logger.Error("%s", err.Error())
608 if restSubscription == nil {
609 // Subscription was not found
612 if restSubscription.SubReqOngoing == true {
613 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
614 xapp.Logger.Error("%s", err.Error())
616 } else if restSubscription.SubDelReqOngoing == true {
617 // Previous request for same restSubId still ongoing
623 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
625 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
626 for _, instanceId := range restSubscription.InstanceIds {
627 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
630 xapp.Logger.Error("%s", err.Error())
633 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
634 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
635 restSubscription.DeleteE2InstanceId(instanceId)
// All instances handled: forget the REST subscription everywhere.
637 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
638 c.registry.DeleteRESTSubscription(&restSubId)
639 c.RemoveRESTSubscriptionFromDb(restSubId)
642 c.UpdateCounter(cRestSubDelRespToXapp)
647 //-------------------------------------------------------------------
649 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance on behalf of
// a REST subscription and returns the xApp event instance ID of the deleted
// subscription (taken from subs.ReqId.Id).
//
// If no subscription matches instanceId the case is only logged and a nil
// error is returned with a zero ID. Otherwise an xApp transaction is created
// and tracked, handleSubscriptionDelete is started in a goroutine, the
// function blocks on the transaction's event and finally removes the
// transaction from the subscription.
//
// NOTE(review): the err / nil checks are on lines elided from this listing.
// Items to confirm against the full source:
//   - the not-found log passes restSubId (a *string) to %v, which prints the
//     pointer value rather than the ID;
//   - the embedded line numbers leave room for only one line between the
//     "transaction not created" log and `defer trans.Release()`, so that path
//     may fall through to Release on a nil trans;
//   - the tracking failure path returns &time.ParseError{} instead of the
//     tracking error that was just built and logged.
650 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
652 var xAppEventInstanceID int64
653 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
655 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
656 restSubId, instanceId, idstring(err, nil))
657 return xAppEventInstanceID, nil
660 xAppEventInstanceID = int64(subs.ReqId.Id)
661 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
663 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
664 xapp.Logger.Error("%s", err.Error())
666 defer trans.Release()
668 err = c.tracker.Track(trans)
670 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
671 xapp.Logger.Error("%s", err.Error())
672 return xAppEventInstanceID, &time.ParseError{}
677 go c.handleSubscriptionDelete(subs, trans)
678 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
680 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
682 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
684 return xAppEventInstanceID, nil
687 //-------------------------------------------------------------------
689 //-------------------------------------------------------------------
// QueryHandler handles a subscription query from the REST interface (it is
// passed to xapp.Subscription.Listen in NewControl). It logs the call and
// returns the registry's subscription list and error unchanged.
690 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
691 xapp.Logger.Info("QueryHandler() called")
695 return c.registry.QueryHandler()
// TestRestHandler serves POST /ric/v1/test/{testId}, a testing aid. The
// {testId} path variable selects the command:
//   - a value containing "deletesubid=" removes the single subscription whose
//     numeric ID follows the "=" from the db;
//   - further commands remove all subscriptions from the db or exit the
//     process to simulate a restart;
//   - anything else is logged as unsupported.
//
// NOTE(review): the conditions selecting the "remove all" and "restart"
// commands, the exit call and the returns are on lines elided from this
// listing.
698 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
699 xapp.Logger.Info("TestRestHandler() called")
701 pathParams := mux.Vars(r)
702 s := pathParams["testId"]
704 // This can be used to delete single subscription from db
705 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so Split yields at least two elements and splits[1] exists.
706 var splits = strings.Split(s, "=")
// NOTE(review): the value is parsed as int64 and then narrowed to uint32.
707 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
708 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
709 c.RemoveSubscriptionFromSdl(uint32(subId))
714 // This can be used to remove all subscriptions from db
716 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
717 c.RemoveAllSubscriptionsFromSdl()
718 c.RemoveAllRESTSubscriptionsFromSdl()
722 // This is meant to cause submgr's restart in testing
724 xapp.Logger.Info("os.Exit(1) called")
728 xapp.Logger.Info("Unsupported rest command received %s", s)
731 //-------------------------------------------------------------------
733 //-------------------------------------------------------------------
// rmrSendToE2T sends the packed payload of trans to E2T over RMR. The RMR
// parameters take the message type and payload from trans, and the
// subscription ID (request instance ID) and MEID from subs. The message is
// logged with desc and sent with SendWithRetry(params, false, 5); a send
// failure is logged and returned through err.
// NOTE(review): two parameter assignments and the err check are on lines
// elided from this listing.
735 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
736 params := &xapp.RMRParams{}
737 params.Mtype = trans.GetMtype()
738 params.SubId = int(subs.GetReqId().InstanceId)
740 params.Meid = subs.GetMeid()
742 params.PayloadLen = len(trans.Payload.Buf)
743 params.Payload = trans.Payload.Buf
745 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
746 err = c.SendWithRetry(params, false, 5)
748 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the packed payload of trans to the xApp over RMR. The
// RMR parameters take the message type, transaction ID (Xid), MEID and payload
// from trans, and the subscription ID (request instance ID) from subs. The
// message is logged with desc and sent with SendWithRetry(params, false, 5); a
// send failure is logged and returned through err.
// NOTE(review): one parameter assignment and the err check are on lines elided
// from this listing.
753 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
755 params := &xapp.RMRParams{}
756 params.Mtype = trans.GetMtype()
757 params.SubId = int(subs.GetReqId().InstanceId)
758 params.Xid = trans.GetXid()
759 params.Meid = trans.GetMeid()
761 params.PayloadLen = len(trans.Payload.Buf)
762 params.Payload = trans.Payload.Buf
764 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
765 err = c.SendWithRetry(params, false, 5)
767 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and dispatches it by message type to the
// matching handler, each started in its own goroutine: subscription request,
// response and failure, and subscription delete request, response and failure.
// Messages of any other type are logged and discarded.
//
// If no RMR client has been set yet, an error is logged. The message buffer is
// freed when Consume returns (deferred), so the payload is first copied into a
// new Go slice that the asynchronous handlers can keep using.
// NOTE(review): the switch header, the default label, the line storing cPay
// back into msg and the returns are on lines elided from this listing.
772 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
773 if c.RMRClient == nil {
774 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
775 xapp.Logger.Error("%s", err.Error())
780 defer c.RMRClient.Free(msg.Mbuf)
782 // xapp-frame might use direct access to c buffer and
783 // when msg.Mbuf is freed, someone might take it into use
784 // and payload data might be invalid inside message handle function
786 // subscriptions won't load system a lot so there is no
787 // real performance hit by cloning buffer into new go byte slice
// The zero-capacity slice forces append to allocate, producing an independent copy.
788 cPay := append(msg.Payload[:0:0], msg.Payload...)
790 msg.PayloadLen = len(cPay)
793 case xapp.RIC_SUB_REQ:
794 go c.handleXAPPSubscriptionRequest(msg)
795 case xapp.RIC_SUB_RESP:
796 go c.handleE2TSubscriptionResponse(msg)
797 case xapp.RIC_SUB_FAILURE:
798 go c.handleE2TSubscriptionFailure(msg)
799 case xapp.RIC_SUB_DEL_REQ:
800 go c.handleXAPPSubscriptionDeleteRequest(msg)
801 case xapp.RIC_SUB_DEL_RESP:
802 go c.handleE2TSubscriptionDeleteResponse(msg)
803 case xapp.RIC_SUB_DEL_FAILURE:
804 go c.handleE2TSubscriptionDeleteFailure(msg)
806 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
811 //-------------------------------------------------------------------
812 // handle from XAPP Subscription Request
813 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a subscription request received from
// an xApp over RMR (started as a goroutine by Consume). It counts the message,
// unpacks the E2AP subscription request, creates and tracks an xApp
// transaction (released when the function returns), assigns the transaction to
// a subscription in the registry and passes both to wakeSubscriptionRequest.
// Each failing step is logged.
// NOTE(review): the err / nil checks and the early returns are on lines elided
// from this listing.
814 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
815 xapp.Logger.Info("MSG from XAPP: %s", params.String())
816 c.UpdateCounter(cSubReqFromXapp)
818 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
820 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
824 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
826 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
829 defer trans.Release()
831 if err = c.tracker.Track(trans); err != nil {
832 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
836 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
837 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
839 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
843 c.wakeSubscriptionRequest(subs, trans)
846 //-------------------------------------------------------------------
847 // Wake Subscription Request to E2node
848 //------------------------------------------------------------------
// wakeSubscriptionRequest drives the subscription creation for an RMR
// originated request and relays the result to the xApp. It starts
// handleSubscriptionCreate in a goroutine and blocks on the transaction's
// event. A subscription response or failure gets the xApp's own request ID
// written into it, is packed and sent to the xApp with the matching counter
// updated; any other outcome is logged as failed.
// NOTE(review): the err declaration, the pack-error checks, the returns and
// the default case are on lines elided from this listing.
849 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
851 go c.handleSubscriptionCreate(subs, trans)
852 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
855 switch themsg := event.(type) {
856 case *e2ap.E2APSubscriptionResponse:
// Restore the requesting xApp's request ID before packing.
857 themsg.RequestId.Id = trans.RequestId.Id
858 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
861 c.UpdateCounter(cSubRespToXapp)
862 c.rmrSendToXapp("", subs, trans)
865 case *e2ap.E2APSubscriptionFailure:
// Restore the requesting xApp's request ID before packing.
866 themsg.RequestId.Id = trans.RequestId.Id
867 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
869 c.UpdateCounter(cSubFailToXapp)
870 c.rmrSendToXapp("", subs, trans)
876 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
877 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
880 //-------------------------------------------------------------------
881 // handle from XAPP Subscription Delete Request
882 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a subscription delete request
// received from an xApp over RMR (started as a goroutine by Consume). It
// counts the message, unpacks the E2AP delete request, creates and tracks an
// xApp transaction (released when the function returns), looks up the
// subscription by the transaction's subscription ID, starts
// handleSubscriptionDelete in a goroutine and blocks on the transaction's
// event. Unless the subscription is flagged NoRespToXapp, a delete response is
// then packed and sent to the xApp whatever the E2 outcome was.
// NOTE(review): the err / nil checks and the early returns are on lines elided
// from this listing. The tracking error log uses the "XAPP-SubReq" prefix
// although this is the delete path.
883 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
884 xapp.Logger.Info("MSG from XAPP: %s", params.String())
885 c.UpdateCounter(cSubDelReqFromXapp)
887 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
889 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
893 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
895 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
898 defer trans.Release()
900 err = c.tracker.Track(trans)
902 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
906 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
908 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
915 go c.handleSubscriptionDelete(subs, trans)
916 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
918 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
920 if subs.NoRespToXapp == true {
921 // Do not send delete responses to xapps when submgr restart is deleting uncompleted subscriptions
925 // Whatever is received success, fail or timeout, send successful delete response
926 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
927 subDelRespMsg.RequestId.Id = trans.RequestId.Id
928 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
929 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
930 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
932 c.UpdateCounter(cSubDelRespToXapp)
933 c.rmrSendToXapp("", subs, trans)
936 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
937 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
940 //-------------------------------------------------------------------
941 // SUBS CREATE Handling
942 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work of a create
// request and reports the outcome to parentTrans as an event. Callers start it
// in a goroutine and block on parentTrans.WaitEvent.
//
// It creates a subscription transaction and waits for its turn on subs (turn
// and transaction are released on return). If there is no cached response and
// the cache is marked valid, a subscription request is sent to E2T and the
// resulting event is handled:
//   - subscription response: cached as valid, SubRespRcvd set;
//   - subscription failure: cached as invalid, a delete request is sent to E2T
//     and the subscription is flagged for removal from the db;
//   - SubmgrRestartTestEvent: test hook, the transaction is dropped;
//   - anything else: the cache is invalidated, a delete request is sent to E2T
//     and the subscription is flagged for removal from the db.
//
// Otherwise the cached response is used. Finally the subscription is updated
// in the db and the response is sent to parentTrans.
// NOTE(review): the default label, the else branch, the condition guarding
// RemoveFromSubscription and the test-event return are on lines elided from
// this listing.
943 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
945 var removeSubscriptionFromDb bool = false
946 trans := c.tracker.NewSubsTransaction(subs)
// Serialise transactions on this subscription.
947 subs.WaitTransactionTurn(trans)
948 defer subs.ReleaseTransactionTurn(trans)
949 defer trans.Release()
951 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
953 subRfMsg, valid := subs.GetCachedResponse()
954 if subRfMsg == nil && valid == true {
955 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
956 switch event.(type) {
957 case *e2ap.E2APSubscriptionResponse:
958 subRfMsg, valid = subs.SetCachedResponse(event, true)
959 subs.SubRespRcvd = true
960 case *e2ap.E2APSubscriptionFailure:
961 removeSubscriptionFromDb = true
962 subRfMsg, valid = subs.SetCachedResponse(event, false)
963 xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
964 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
965 case *SubmgrRestartTestEvent:
966 // This simulates that no response has been received and after restart subscriptions are restored from db
967 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
970 xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
971 removeSubscriptionFromDb = true
972 subRfMsg, valid = subs.SetCachedResponse(nil, false)
973 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
975 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
977 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
980 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
982 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
985 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wake the caller blocked in WaitEvent on parentTrans.
986 parentTrans.SendEvent(subRfMsg, 0)
989 //-------------------------------------------------------------------
990 // SUBS DELETE Handling
991 //-------------------------------------------------------------------
// handleSubscriptionDelete drives one subscription delete round for subs on
// behalf of the xApp transaction parentTrans.
//
// A subscription-level transaction is taken from c.tracker and queued on subs
// with WaitTransactionTurn; the turn and the transaction are released by the
// deferred calls. A delete request is sent towards E2T when the subscription is
// valid and the requesting endpoint is the only endpoint in subs.EpList.
// Afterwards the endpoint is removed through the registry, the subscription is
// updated in the db and a nil event is sent to parentTrans.
//
// NOTE(review): statements between the condition and the E2T delete call are
// not visible here — confirm the exact branch contents.
993 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
995 trans := c.tracker.NewSubsTransaction(subs)
996 subs.WaitTransactionTurn(trans)
997 defer subs.ReleaseTransactionTurn(trans)
998 defer trans.Release()
1000 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last remaining endpoint of a valid subscription triggers a delete
// towards E2T.
1004 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1007 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1011 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1012 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1013 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1014 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1015 c.registry.UpdateSubscriptionToDb(subs, c)
1016 parentTrans.SendEvent(nil, 0)
1019 //-------------------------------------------------------------------
1020 // send to E2T Subscription Request
1021 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored in subs and
// sends it towards E2T using the subscription transaction trans.
//
// The request id is taken from subs.GetReqId() and its Id field is overwritten
// with the fixed ricRequestorId. The packed message type and payload are stored
// in trans, the subscription is written to the db, and the message is sent with
// rmrSendToE2T in a loop bounded by e2tMaxSubReqTryCount. When
// subs.DoNotWaitSubResp is false the function waits for an event on trans for
// up to e2tSubReqTimeout. parentTrans is used only for log identification.
//
// The result is an interface{} value; the visible statements assign it from
// trans.WaitEvent or set it to a *SubmgrRestartTestEvent.
// NOTE(review): the return statements and the loop's break/continue handling
// are not visible here — confirm the exact retry semantics.
1022 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1024 var event interface{} = nil
1025 var timedOut bool = false
1026 const ricRequestorId = 123
// subReqMsg is the same value as subs.SubReqMsg (no copy is made here), so the
// RequestId assignments below are applied to it directly.
1028 subReqMsg := subs.SubReqMsg
1029 subReqMsg.RequestId = subs.GetReqId().RequestId
1030 subReqMsg.RequestId.Id = ricRequestorId
1031 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1033 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1037 // Write the uncompleted subscription to the db. If no response is received for the subscription, it needs to be re-processed (deleted) after restart
1038 c.WriteSubscriptionToDb(subs)
1040 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1041 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): cSubReqToE2 presumably counts the first attempt and
// cSubReReqToE2 the re-sends; the selecting condition is not visible — confirm.
1043 c.UpdateCounter(cSubReqToE2)
1045 c.UpdateCounter(cSubReReqToE2)
1047 c.rmrSendToE2T(desc, subs, trans)
1048 if subs.DoNotWaitSubResp == false {
1049 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1051 c.UpdateCounter(cSubReqTimerExpiry)
1055 // Simulating the case where the subscription request has been sent but the response has not been received before restart
1056 event = &SubmgrRestartTestEvent{}
1060 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1064 //-------------------------------------------------------------------
1065 // send to E2T Subscription Delete Request
1066 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request for
// subs and sends it towards E2T using the subscription transaction trans.
//
// The request id is taken from subs.GetReqId() with its Id field overwritten by
// the fixed ricRequestorId, and the function id is copied from subs.SubReqMsg.
// The packed message type and payload are stored in trans and sent with
// rmrSendToE2T in a loop bounded by e2tMaxSubDelReqTryCount; after each send
// the function waits for an event on trans for up to e2tSubDelReqTime.
// parentTrans is used only for log identification.
//
// The result is an interface{} value; the visible statements assign it from
// trans.WaitEvent.
// NOTE(review): the return statements and the loop's break/continue handling
// are not visible here — confirm the exact retry semantics.
1068 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1070 var event interface{}
1072 const ricRequestorId = 123
1074 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1075 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1076 subDelReqMsg.RequestId.Id = ricRequestorId
1077 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1078 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1080 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1084 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1085 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): cSubDelReqToE2 presumably counts the first attempt and
// cSubDelReReqToE2 the re-sends; the selecting condition is not visible — confirm.
1087 c.UpdateCounter(cSubDelReqToE2)
1089 c.UpdateCounter(cSubDelReReqToE2)
1091 c.rmrSendToE2T(desc, subs, trans)
1092 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1094 c.UpdateCounter(cSubDelReqTimerExpiry)
1099 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1103 //-------------------------------------------------------------------
1104 // handle from E2T Subscription Response
1105 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response message
// received from E2T.
//
// It logs the message, updates the cSubRespFromE2 counter, unpacks
// params.Payload, looks up the subscription by the RequestId.InstanceId carried
// in the response, fetches that subscription's transaction and passes the
// unpacked response to it with SendEvent and e2tRecvMsgTimeout. Problems in any
// step are reported through xapp.Logger.Error; the function returns nothing.
1106 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1107 xapp.Logger.Info("MSG from E2T: %s", params.String())
1108 c.UpdateCounter(cSubRespFromE2)
1110 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1112 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1115 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1117 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1120 trans := subs.GetTransaction()
1122 err = fmt.Errorf("Ongoing transaction not found")
1123 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Hand the response over to the transaction.
1126 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1127 if sendOk == false {
1128 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1129 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1134 //-------------------------------------------------------------------
1135 // handle from E2T Subscription Failure
1136 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure message
// received from E2T.
//
// It logs the message, updates the cSubFailFromE2 counter, unpacks
// params.Payload, looks up the subscription by the RequestId.InstanceId carried
// in the failure, fetches that subscription's transaction and passes the
// unpacked failure to it with SendEvent and e2tRecvMsgTimeout. Problems in any
// step are reported through xapp.Logger.Error; the function returns nothing.
1137 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1138 xapp.Logger.Info("MSG from E2T: %s", params.String())
1139 c.UpdateCounter(cSubFailFromE2)
1140 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1142 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1145 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1147 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1150 trans := subs.GetTransaction()
1152 err = fmt.Errorf("Ongoing transaction not found")
1153 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Hand the failure over to the transaction.
1156 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1157 if sendOk == false {
1158 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1159 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1164 //-------------------------------------------------------------------
1165 // handle from E2T Subscription Delete Response
1166 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// message received from E2T.
//
// It logs the message, updates the cSubDelRespFromE2 counter, unpacks
// params.Payload, looks up the subscription by the RequestId.InstanceId carried
// in the response, fetches that subscription's transaction and passes the
// unpacked response to it with SendEvent and e2tRecvMsgTimeout. Problems in any
// step are reported through xapp.Logger.Error.
//
// Unlike the sibling E2T handlers in this file, this one declares a named
// error result (err), which the assignments below write to.
// NOTE(review): the return statements are not visible here — confirm which
// value callers receive on each path.
1167 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1168 xapp.Logger.Info("MSG from E2T: %s", params.String())
1169 c.UpdateCounter(cSubDelRespFromE2)
1170 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1172 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1175 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1177 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1180 trans := subs.GetTransaction()
1182 err = fmt.Errorf("Ongoing transaction not found")
1183 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Hand the delete response over to the transaction.
1186 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1187 if sendOk == false {
1188 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1189 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1194 //-------------------------------------------------------------------
1195 // handle from E2T Subscription Delete Failure
1196 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// message received from E2T.
//
// It logs the message, updates the cSubDelFailFromE2 counter, unpacks
// params.Payload, looks up the subscription by the RequestId.InstanceId carried
// in the failure, fetches that subscription's transaction and passes the
// unpacked failure to it with SendEvent and e2tRecvMsgTimeout. Problems in any
// step are reported through xapp.Logger.Error; the function returns nothing.
1197 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1198 xapp.Logger.Info("MSG from E2T: %s", params.String())
1199 c.UpdateCounter(cSubDelFailFromE2)
1200 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1202 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1205 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1207 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1210 trans := subs.GetTransaction()
1212 err = fmt.Errorf("Ongoing transaction not found")
1213 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Hand the delete failure over to the transaction.
1216 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1217 if sendOk == false {
1218 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1219 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1224 //-------------------------------------------------------------------
1226 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the dynamic type of v. In this
// file it is used to fill the event/response placeholders of log messages.
//
// Distinct cases exist for the E2AP subscription response, failure, delete
// response and delete failure pointer types; the cases for the request types
// are commented out.
// NOTE(review): the returned strings are not visible here (only the
// commented-out "SubDelReq") — confirm the exact values before depending on them.
1227 func typeofSubsMessage(v interface{}) string {
1232 //case *e2ap.E2APSubscriptionRequest:
1234 case *e2ap.E2APSubscriptionResponse:
1236 case *e2ap.E2APSubscriptionFailure:
1238 //case *e2ap.E2APSubscriptionDeleteRequest:
1239 // return "SubDelReq"
1240 case *e2ap.E2APSubscriptionDeleteResponse:
1242 case *e2ap.E2APSubscriptionDeleteFailure:
1249 //-------------------------------------------------------------------
1251 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs through WriteSubscriptionToSdl, keyed by
// subs.ReqId.InstanceId. The error result is reported through
// xapp.Logger.Error and is not returned to the caller.
1252 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1253 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1254 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1256 xapp.Logger.Error("%v", err)
1260 //-------------------------------------------------------------------
1262 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored record of subs in line with the
// outcome of a subscription request.
//
// When removeSubscriptionFromDb is true the record is removed with
// RemoveSubscriptionFromDb. The write through WriteSubscriptionToDb is made
// only when subs.RetryFromXapp is false. subs.RetryFromXapp is set back to
// false by this function.
// NOTE(review): the else/closing structure is not visible here — confirm how
// the remove and write paths are nested.
1263 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1265 if removeSubscriptionFromDb == true {
1266 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1267 c.RemoveSubscriptionFromDb(subs)
1269 // Update is needed for successful response and merge case here
1270 if subs.RetryFromXapp == false {
1271 c.WriteSubscriptionToDb(subs)
1274 subs.RetryFromXapp = false
1277 //-------------------------------------------------------------------
1279 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the record keyed by subs.ReqId.InstanceId
// through RemoveSubscriptionFromSdl. The error result is reported through
// xapp.Logger.Error and is not returned to the caller.
1280 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1281 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1282 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1284 xapp.Logger.Error("%v", err)
1288 //-------------------------------------------------------------------
1290 //-------------------------------------------------------------------
// WriteRESTSubscriptionToDb stores restSubs under the key restSubId through
// WriteRESTSubscriptionToSdl. The error result is reported through
// xapp.Logger.Error and is not returned to the caller.
1291 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1292 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1293 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1295 xapp.Logger.Error("%v", err)
1299 //-------------------------------------------------------------------
1301 //-------------------------------------------------------------------
// UpdateRESTSubscriptionInDB either removes or writes the stored record of the
// REST subscription identified by restSubId.
//
// When removeRestSubscriptionFromDb is true the record is removed with
// RemoveRESTSubscriptionFromDb; WriteRESTSubscriptionToDb stores restSubs.
// NOTE(review): the else/closing structure is not visible here — presumably
// the write is the alternative branch; confirm.
1302 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1304 if removeRestSubscriptionFromDb == true {
1305 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1306 c.RemoveRESTSubscriptionFromDb(restSubId)
1308 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1312 //-------------------------------------------------------------------
1314 //-------------------------------------------------------------------
// RemoveRESTSubscriptionFromDb removes the record keyed by restSubId through
// RemoveRESTSubscriptionFromSdl. The error result is reported through
// xapp.Logger.Error and is not returned to the caller.
1315 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1316 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1317 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1319 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq runs the xApp subscription delete handling for
// every endpoint of subs; the debug message attributes this to a restart.
//
// One delete request is packed for the subscription: the request id comes from
// subs.GetReqId() with its Id field overwritten by the fixed ricRequestorId,
// and the function id comes from subs.SubReqMsg. For each entry in
// subs.EpList.Endpoints an xapp.RMRParams is filled with that packed message,
// the subscription's instance id and Meid, and the endpoint as Src.
// subs.DeleteFromDb is set to true and the params are passed to
// handleXAPPSubscriptionDeleteRequest. A packing error is reported through
// xapp.Logger.Error; the function returns nothing.
1323 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1325 const ricRequestorId = 123
1326 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1328 // Send delete for every endpoint in the subscription
1329 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1330 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1331 subDelReqMsg.RequestId.Id = ricRequestorId
1332 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1333 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1335 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
// The same packed payload is reused for every endpoint; only Src differs.
1338 for _, endPoint := range subs.EpList.Endpoints {
1339 params := &xapp.RMRParams{}
1340 params.Mtype = mType
1341 params.SubId = int(subs.GetReqId().InstanceId)
1343 params.Meid = subs.Meid
1344 params.Src = endPoint.String()
1345 params.PayloadLen = len(payload.Buf)
1346 params.Payload = payload.Buf
1348 subs.DeleteFromDb = true
1349 c.handleXAPPSubscriptionDeleteRequest(params)
1353 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1355 fmt.Println("CRESTSubscriptionRequest")
1361 if p.SubscriptionID != "" {
1362 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1364 fmt.Println(" SubscriptionID = ''")
1367 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1369 if p.ClientEndpoint.HTTPPort != nil {
1370 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1372 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1375 if p.ClientEndpoint.RMRPort != nil {
1376 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1378 fmt.Println(" ClientEndpoint.RMRPort = nil")
1382 fmt.Printf(" Meid = %s\n", *p.Meid)
1384 fmt.Println(" Meid = nil")
1387 for _, subscriptionDetail := range p.SubscriptionDetails {
1388 if p.RANFunctionID != nil {
1389 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1391 fmt.Println(" RANFunctionID = nil")
1393 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1394 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1396 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1397 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1398 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1399 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1401 if actionToBeSetup.SubsequentAction != nil {
1402 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1403 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1405 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")