2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
50 retval += filler + entry.String()
53 retval += filler + "(NIL)"
57 retval += filler + "err(" + err.Error() + ")"
63 //-----------------------------------------------------------------------------
65 //-----------------------------------------------------------------------------
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64 // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var restDuplicateCtrl duplicateCtrl
75 var dbRetryForever string
84 restSubsDb Sdlnterface
87 Counters map[string]xapp.Counter
97 type SubmgrRestartTestEvent struct{}
98 type SubmgrRestartUpEvent struct{}
101 xapp.Logger.Info("SUBMGR")
103 viper.SetEnvPrefix("submgr")
104 viper.AllowEmptyEnv(true)
107 func NewControl() *Control {
109 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
112 registry := new(Registry)
113 registry.Initialize()
114 registry.rtmgrClient = &rtmgrClient
116 tracker := new(Tracker)
119 c := &Control{e2ap: new(E2ap),
122 e2SubsDb: CreateSdl(),
123 restSubsDb: CreateRESTSdl(),
124 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
127 c.ReadConfigParameters("")
129 // Register REST handler for testing support
130 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
134 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
136 if readSubsFromDb == "false" {
140 restDuplicateCtrl.Init()
142 // Read subscriptions from db
143 c.ReadE2Subscriptions()
144 c.ReadRESTSubscriptions()
147 xapp.Logger.Info("Reading subscriptions from db")
148 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
150 xapp.Logger.Error("%v", err)
152 c.registry.subIds = subIds
153 c.registry.register = register
154 c.HandleUncompletedSubscriptions(register)
157 restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
159 xapp.Logger.Error("%v", err)
161 c.registry.restSubscriptions = restSubscriptions
167 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
168 subscriptions, _ := c.registry.QueryHandler()
169 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
172 //-------------------------------------------------------------------
174 //-------------------------------------------------------------------
175 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
176 xapp.Logger.Info("GetAllRestSubscriptions() called")
177 response := c.registry.GetAllRestSubscriptions()
181 //-------------------------------------------------------------------
183 //-------------------------------------------------------------------
184 func (c *Control) ReadE2Subscriptions() error {
187 var register map[uint32]*Subscription
188 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
189 xapp.Logger.Info("Reading E2 subscriptions from db")
190 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
192 xapp.Logger.Error("%v", err)
193 <-time.After(1 * time.Second)
195 c.registry.subIds = subIds
196 c.registry.register = register
197 c.HandleUncompletedSubscriptions(register)
201 xapp.Logger.Info("Continuing without retring")
205 //-------------------------------------------------------------------
207 //-------------------------------------------------------------------
208 func (c *Control) ReadRESTSubscriptions() error {
210 var restSubscriptions map[string]*RESTSubscription
211 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
212 xapp.Logger.Info("Reading REST subscriptions from db")
213 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
215 xapp.Logger.Error("%v", err)
216 <-time.After(1 * time.Second)
218 c.registry.restSubscriptions = restSubscriptions
222 xapp.Logger.Info("Continuing without retring")
226 //-------------------------------------------------------------------
228 //-------------------------------------------------------------------
229 func (c *Control) ReadConfigParameters(f string) {
231 // viper.GetDuration returns nanoseconds
232 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
233 if e2tSubReqTimeout == 0 {
234 e2tSubReqTimeout = 2000 * 1000000
236 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
237 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
238 if e2tSubDelReqTime == 0 {
239 e2tSubDelReqTime = 2000 * 1000000
241 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
242 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
243 if e2tRecvMsgTimeout == 0 {
244 e2tRecvMsgTimeout = 2000 * 1000000
246 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
248 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
249 if e2tMaxSubReqTryCount == 0 {
250 e2tMaxSubReqTryCount = 1
252 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
254 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
255 if e2tMaxSubDelReqTryCount == 0 {
256 e2tMaxSubDelReqTryCount = 1
258 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
260 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
261 if readSubsFromDb == "" {
262 readSubsFromDb = "true"
264 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
266 dbTryCount = viper.GetInt("controls.dbTryCount")
270 xapp.Logger.Info("dbTryCount %v", dbTryCount)
272 dbRetryForever = viper.GetString("controls.dbRetryForever")
273 if dbRetryForever == "" {
274 dbRetryForever = "true"
276 xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
278 c.LoggerLevel = viper.GetUint32("logger.level")
279 if c.LoggerLevel == 0 {
282 xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
284 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
285 // value 100ms used currently only in unittests.
286 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
287 if waitRouteCleanup_ms == 0 {
288 waitRouteCleanup_ms = 5000 * 1000000
290 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
293 //-------------------------------------------------------------------
295 //-------------------------------------------------------------------
296 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
298 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
299 for subId, subs := range register {
300 if subs.SubRespRcvd == false {
301 subs.NoRespToXapp = true
302 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
303 c.SendSubscriptionDeleteReq(subs)
308 func (c *Control) ReadyCB(data interface{}) {
309 if c.RMRClient == nil {
310 c.RMRClient = xapp.Rmr
314 func (c *Control) Run() {
315 xapp.SetReadyCB(c.ReadyCB, nil)
316 xapp.AddConfigChangeListener(c.ReadConfigParameters)
320 //-------------------------------------------------------------------
322 //-------------------------------------------------------------------
323 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
326 var restSubscription *RESTSubscription
329 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
330 if p.SubscriptionID == "" {
332 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
333 if restSubscription != nil {
334 restSubId = prevRestSubsId
336 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
338 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
341 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
342 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
346 if restSubscription == nil {
347 restSubId = ksuid.New().String()
348 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
350 xapp.Logger.Error("%s", err.Error())
351 c.UpdateCounter(cRestSubFailToXapp)
356 restSubId = p.SubscriptionID
358 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
360 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
362 xapp.Logger.Error("%s", err.Error())
363 c.UpdateCounter(cRestSubFailToXapp)
368 xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
370 xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
374 return restSubscription, restSubId, nil
377 //-------------------------------------------------------------------
379 //-------------------------------------------------------------------
380 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
383 c.UpdateCounter(cRestSubReqFromXapp)
385 subResp := models.SubscriptionResponse{}
386 p := params.(*models.SubscriptionParams)
388 if c.LoggerLevel > 2 {
389 c.PrintRESTSubscriptionRequest(p)
392 if p.ClientEndpoint == nil {
393 xapp.Logger.Error("ClientEndpoint == nil")
394 c.UpdateCounter(cRestSubFailToXapp)
395 return nil, fmt.Errorf("")
398 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
400 xapp.Logger.Error("%s", err.Error())
401 c.UpdateCounter(cRestSubFailToXapp)
405 md5sum, err := CalculateRequestMd5sum(params)
407 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
410 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
412 xapp.Logger.Error("Failed to get/allocate REST subscription")
416 subResp.SubscriptionID = &restSubId
417 subReqList := e2ap.SubscriptionRequestList{}
418 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
420 xapp.Logger.Error("%s", err.Error())
421 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
422 c.registry.DeleteRESTSubscription(&restSubId)
423 c.UpdateCounter(cRestSubFailToXapp)
427 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
429 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
430 c.UpdateCounter(cRestSubRespToXapp)
434 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
436 c.UpdateCounter(cRestSubRespToXapp)
440 //-------------------------------------------------------------------
442 //-------------------------------------------------------------------
444 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
445 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
447 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
449 var xAppEventInstanceID int64
450 var e2EventInstanceID int64
452 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
454 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
455 subReqMsg := subReqList.E2APSubscriptionRequests[index]
456 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
458 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
460 // Send notification to xApp that prosessing of a Subscription Request has failed.
461 err := fmt.Errorf("Tracking failure")
462 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
466 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
468 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
470 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
473 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
475 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
476 restSubscription.AddMd5Sum(md5sum)
477 xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
478 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
479 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
485 //-------------------------------------------------------------------
487 //------------------------------------------------------------------
488 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
489 restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
491 err := c.tracker.Track(trans)
493 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
494 err = fmt.Errorf("Tracking failure")
498 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
500 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
507 go c.handleSubscriptionCreate(subs, trans)
508 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
512 switch themsg := event.(type) {
513 case *e2ap.E2APSubscriptionResponse:
516 case *e2ap.E2APSubscriptionFailure:
517 err = fmt.Errorf("E2 SubscriptionFailure received")
520 err = fmt.Errorf("unexpected E2 subscription response received")
524 err = fmt.Errorf("E2 subscription response timeout")
527 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
528 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
532 //-------------------------------------------------------------------
534 //-------------------------------------------------------------------
535 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
536 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
538 // Send notification to xApp that prosessing of a Subscription Request has failed.
539 e2EventInstanceID := (int64)(0)
540 errorCause := err.Error()
541 resp := &models.SubscriptionResponse{
542 SubscriptionID: restSubId,
543 SubscriptionInstances: []*models.SubscriptionInstance{
544 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
545 ErrorCause: &errorCause,
546 XappEventInstanceID: &xAppEventInstanceID},
549 // Mark REST subscription request processed.
550 restSubscription.SetProcessed(err)
551 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
553 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
554 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
556 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
557 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
560 c.UpdateCounter(cRestSubFailNotifToXapp)
561 xapp.Subscription.Notify(resp, *clientEndpoint)
564 //-------------------------------------------------------------------
566 //-------------------------------------------------------------------
567 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
568 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
570 // Store successfully processed InstanceId for deletion
571 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
572 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
574 // Send notification to xApp that a Subscription Request has been processed.
575 resp := &models.SubscriptionResponse{
576 SubscriptionID: restSubId,
577 SubscriptionInstances: []*models.SubscriptionInstance{
578 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
580 XappEventInstanceID: &xAppEventInstanceID},
583 // Mark REST subscription request processesd.
584 restSubscription.SetProcessed(nil)
585 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
586 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
587 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
589 c.UpdateCounter(cRestSubNotifToXapp)
590 xapp.Subscription.Notify(resp, *clientEndpoint)
593 //-------------------------------------------------------------------
595 //-------------------------------------------------------------------
596 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
599 c.UpdateCounter(cRestSubDelReqFromXapp)
601 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
603 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
605 xapp.Logger.Error("%s", err.Error())
606 if restSubscription == nil {
607 // Subscription was not found
610 if restSubscription.SubReqOngoing == true {
611 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
612 xapp.Logger.Error("%s", err.Error())
614 } else if restSubscription.SubDelReqOngoing == true {
615 // Previous request for same restSubId still ongoing
621 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
623 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
624 for _, instanceId := range restSubscription.InstanceIds {
625 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
628 xapp.Logger.Error("%s", err.Error())
631 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
632 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
633 restSubscription.DeleteE2InstanceId(instanceId)
635 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
636 c.registry.DeleteRESTSubscription(&restSubId)
637 c.RemoveRESTSubscriptionFromDb(restSubId)
640 c.UpdateCounter(cRestSubDelRespToXapp)
645 //-------------------------------------------------------------------
647 //-------------------------------------------------------------------
648 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
650 var xAppEventInstanceID int64
651 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
653 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
654 restSubId, instanceId, idstring(err, nil))
655 return xAppEventInstanceID, nil
658 xAppEventInstanceID = int64(subs.ReqId.Id)
659 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
661 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
662 xapp.Logger.Error("%s", err.Error())
664 defer trans.Release()
666 err = c.tracker.Track(trans)
668 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
669 xapp.Logger.Error("%s", err.Error())
670 return xAppEventInstanceID, &time.ParseError{}
675 go c.handleSubscriptionDelete(subs, trans)
676 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
678 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
680 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
682 return xAppEventInstanceID, nil
685 //-------------------------------------------------------------------
687 //-------------------------------------------------------------------
688 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
689 xapp.Logger.Info("QueryHandler() called")
693 return c.registry.QueryHandler()
696 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
697 xapp.Logger.Info("TestRestHandler() called")
699 pathParams := mux.Vars(r)
700 s := pathParams["testId"]
702 // This can be used to delete single subscription from db
703 if contains := strings.Contains(s, "deletesubid="); contains == true {
704 var splits = strings.Split(s, "=")
705 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
706 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
707 c.RemoveSubscriptionFromSdl(uint32(subId))
712 // This can be used to remove all subscriptions db from
714 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
715 c.RemoveAllSubscriptionsFromSdl()
716 c.RemoveAllRESTSubscriptionsFromSdl()
720 // This is meant to cause submgr's restart in testing
722 xapp.Logger.Info("os.Exit(1) called")
726 xapp.Logger.Info("Unsupported rest command received %s", s)
729 //-------------------------------------------------------------------
731 //-------------------------------------------------------------------
733 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
734 params := &xapp.RMRParams{}
735 params.Mtype = trans.GetMtype()
736 params.SubId = int(subs.GetReqId().InstanceId)
738 params.Meid = subs.GetMeid()
740 params.PayloadLen = len(trans.Payload.Buf)
741 params.Payload = trans.Payload.Buf
743 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
744 err = c.SendWithRetry(params, false, 5)
746 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
751 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
753 params := &xapp.RMRParams{}
754 params.Mtype = trans.GetMtype()
755 params.SubId = int(subs.GetReqId().InstanceId)
756 params.Xid = trans.GetXid()
757 params.Meid = trans.GetMeid()
759 params.PayloadLen = len(trans.Payload.Buf)
760 params.Payload = trans.Payload.Buf
762 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
763 err = c.SendWithRetry(params, false, 5)
765 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
770 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
771 if c.RMRClient == nil {
772 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
773 xapp.Logger.Error("%s", err.Error())
778 defer c.RMRClient.Free(msg.Mbuf)
780 // xapp-frame might use direct access to c buffer and
781 // when msg.Mbuf is freed, someone might take it into use
782 // and payload data might be invalid inside message handle function
784 // subscriptions won't load system a lot so there is no
785 // real performance hit by cloning buffer into new go byte slice
786 cPay := append(msg.Payload[:0:0], msg.Payload...)
788 msg.PayloadLen = len(cPay)
791 case xapp.RIC_SUB_REQ:
792 go c.handleXAPPSubscriptionRequest(msg)
793 case xapp.RIC_SUB_RESP:
794 go c.handleE2TSubscriptionResponse(msg)
795 case xapp.RIC_SUB_FAILURE:
796 go c.handleE2TSubscriptionFailure(msg)
797 case xapp.RIC_SUB_DEL_REQ:
798 go c.handleXAPPSubscriptionDeleteRequest(msg)
799 case xapp.RIC_SUB_DEL_RESP:
800 go c.handleE2TSubscriptionDeleteResponse(msg)
801 case xapp.RIC_SUB_DEL_FAILURE:
802 go c.handleE2TSubscriptionDeleteFailure(msg)
804 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
809 //-------------------------------------------------------------------
810 // handle from XAPP Subscription Request
811 //------------------------------------------------------------------
812 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
813 xapp.Logger.Info("MSG from XAPP: %s", params.String())
814 c.UpdateCounter(cSubReqFromXapp)
816 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
818 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
822 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
824 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
827 defer trans.Release()
829 if err = c.tracker.Track(trans); err != nil {
830 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
834 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
835 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
837 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
841 c.wakeSubscriptionRequest(subs, trans)
844 //-------------------------------------------------------------------
845 // Wake Subscription Request to E2node
846 //------------------------------------------------------------------
847 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
849 go c.handleSubscriptionCreate(subs, trans)
850 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
853 switch themsg := event.(type) {
854 case *e2ap.E2APSubscriptionResponse:
855 themsg.RequestId.Id = trans.RequestId.Id
856 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
859 c.UpdateCounter(cSubRespToXapp)
860 c.rmrSendToXapp("", subs, trans)
863 case *e2ap.E2APSubscriptionFailure:
864 themsg.RequestId.Id = trans.RequestId.Id
865 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
867 c.UpdateCounter(cSubFailToXapp)
868 c.rmrSendToXapp("", subs, trans)
874 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
875 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
878 //-------------------------------------------------------------------
879 // handle from XAPP Subscription Delete Request
880 //------------------------------------------------------------------
881 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
882 xapp.Logger.Info("MSG from XAPP: %s", params.String())
883 c.UpdateCounter(cSubDelReqFromXapp)
885 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
887 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
891 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
893 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
896 defer trans.Release()
898 err = c.tracker.Track(trans)
900 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
904 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
906 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
913 go c.handleSubscriptionDelete(subs, trans)
914 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
916 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
918 if subs.NoRespToXapp == true {
919 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
923 // Whatever is received success, fail or timeout, send successful delete response
924 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
925 subDelRespMsg.RequestId.Id = trans.RequestId.Id
926 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
927 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
928 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
930 c.UpdateCounter(cSubDelRespToXapp)
931 c.rmrSendToXapp("", subs, trans)
934 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
935 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
938 //-------------------------------------------------------------------
939 // SUBS CREATE Handling
940 //-------------------------------------------------------------------
941 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
943 var removeSubscriptionFromDb bool = false
944 trans := c.tracker.NewSubsTransaction(subs)
945 subs.WaitTransactionTurn(trans)
946 defer subs.ReleaseTransactionTurn(trans)
947 defer trans.Release()
949 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
951 subRfMsg, valid := subs.GetCachedResponse()
952 if subRfMsg == nil && valid == true {
953 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
954 switch event.(type) {
955 case *e2ap.E2APSubscriptionResponse:
956 subRfMsg, valid = subs.SetCachedResponse(event, true)
957 subs.SubRespRcvd = true
958 case *e2ap.E2APSubscriptionFailure:
959 removeSubscriptionFromDb = true
960 subRfMsg, valid = subs.SetCachedResponse(event, false)
961 xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
962 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
963 case *SubmgrRestartTestEvent:
964 // This simulates that no response has been received and after restart subscriptions are restored from db
965 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
968 xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
969 removeSubscriptionFromDb = true
970 subRfMsg, valid = subs.SetCachedResponse(nil, false)
971 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
973 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
975 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
978 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
980 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
983 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
984 parentTrans.SendEvent(subRfMsg, 0)
987 //-------------------------------------------------------------------
988 // SUBS DELETE Handling
989 //-------------------------------------------------------------------
991 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
993 trans := c.tracker.NewSubsTransaction(subs)
994 subs.WaitTransactionTurn(trans)
995 defer subs.ReleaseTransactionTurn(trans)
996 defer trans.Release()
998 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1002 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1005 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1009 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1010 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1011 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1012 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1013 c.registry.UpdateSubscriptionToDb(subs, c)
1014 parentTrans.SendEvent(nil, 0)
1017 //-------------------------------------------------------------------
1018 // send to E2T Subscription Request
1019 //-------------------------------------------------------------------
1020 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1022 var event interface{} = nil
1023 var timedOut bool = false
1024 const ricRequestorId = 123
1026 subReqMsg := subs.SubReqMsg
1027 subReqMsg.RequestId = subs.GetReqId().RequestId
1028 subReqMsg.RequestId.Id = ricRequestorId
1029 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1031 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1035 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1036 c.WriteSubscriptionToDb(subs)
1038 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1039 desc := fmt.Sprintf("(retry %d)", retries)
1041 c.UpdateCounter(cSubReqToE2)
1043 c.UpdateCounter(cSubReReqToE2)
1045 c.rmrSendToE2T(desc, subs, trans)
1046 if subs.DoNotWaitSubResp == false {
1047 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1049 c.UpdateCounter(cSubReqTimerExpiry)
1053 // Simulating case where subscrition request has been sent but response has not been received before restart
1054 event = &SubmgrRestartTestEvent{}
1058 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1062 //-------------------------------------------------------------------
1063 // send to E2T Subscription Delete Request
1064 //-------------------------------------------------------------------
1066 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1068 var event interface{}
1070 const ricRequestorId = 123
1072 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1073 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1074 subDelReqMsg.RequestId.Id = ricRequestorId
1075 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1076 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1078 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1082 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1083 desc := fmt.Sprintf("(retry %d)", retries)
1085 c.UpdateCounter(cSubDelReqToE2)
1087 c.UpdateCounter(cSubDelReReqToE2)
1089 c.rmrSendToE2T(desc, subs, trans)
1090 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1092 c.UpdateCounter(cSubDelReqTimerExpiry)
1097 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1101 //-------------------------------------------------------------------
1102 // handle from E2T Subscription Response
1103 //-------------------------------------------------------------------
1104 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1105 xapp.Logger.Info("MSG from E2T: %s", params.String())
1106 c.UpdateCounter(cSubRespFromE2)
1108 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1110 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1113 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1115 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1118 trans := subs.GetTransaction()
1120 err = fmt.Errorf("Ongoing transaction not found")
1121 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1124 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1125 if sendOk == false {
1126 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1127 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1132 //-------------------------------------------------------------------
1133 // handle from E2T Subscription Failure
1134 //-------------------------------------------------------------------
1135 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1136 xapp.Logger.Info("MSG from E2T: %s", params.String())
1137 c.UpdateCounter(cSubFailFromE2)
1138 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1140 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1143 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1145 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1148 trans := subs.GetTransaction()
1150 err = fmt.Errorf("Ongoing transaction not found")
1151 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1154 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1155 if sendOk == false {
1156 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1157 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1162 //-------------------------------------------------------------------
1163 // handle from E2T Subscription Delete Response
1164 //-------------------------------------------------------------------
1165 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1166 xapp.Logger.Info("MSG from E2T: %s", params.String())
1167 c.UpdateCounter(cSubDelRespFromE2)
1168 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1170 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1173 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1175 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1178 trans := subs.GetTransaction()
1180 err = fmt.Errorf("Ongoing transaction not found")
1181 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1184 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1185 if sendOk == false {
1186 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1187 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1192 //-------------------------------------------------------------------
1193 // handle from E2T Subscription Delete Failure
1194 //-------------------------------------------------------------------
1195 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1196 xapp.Logger.Info("MSG from E2T: %s", params.String())
1197 c.UpdateCounter(cSubDelFailFromE2)
1198 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1200 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1203 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1205 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1208 trans := subs.GetTransaction()
1210 err = fmt.Errorf("Ongoing transaction not found")
1211 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1214 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1215 if sendOk == false {
1216 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1217 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1222 //-------------------------------------------------------------------
1224 //-------------------------------------------------------------------
1225 func typeofSubsMessage(v interface{}) string {
1230 //case *e2ap.E2APSubscriptionRequest:
1232 case *e2ap.E2APSubscriptionResponse:
1234 case *e2ap.E2APSubscriptionFailure:
1236 //case *e2ap.E2APSubscriptionDeleteRequest:
1237 // return "SubDelReq"
1238 case *e2ap.E2APSubscriptionDeleteResponse:
1240 case *e2ap.E2APSubscriptionDeleteFailure:
1247 //-------------------------------------------------------------------
1249 //-------------------------------------------------------------------
1250 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1251 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1252 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1254 xapp.Logger.Error("%v", err)
1258 //-------------------------------------------------------------------
1260 //-------------------------------------------------------------------
1261 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1263 if removeSubscriptionFromDb == true {
1264 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1265 c.RemoveSubscriptionFromDb(subs)
1267 // Update is needed for successful response and merge case here
1268 if subs.RetryFromXapp == false {
1269 c.WriteSubscriptionToDb(subs)
1272 subs.RetryFromXapp = false
1275 //-------------------------------------------------------------------
1277 //-------------------------------------------------------------------
1278 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1279 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1280 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1282 xapp.Logger.Error("%v", err)
1286 //-------------------------------------------------------------------
1288 //-------------------------------------------------------------------
1289 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1290 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1291 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1293 xapp.Logger.Error("%v", err)
1297 //-------------------------------------------------------------------
1299 //-------------------------------------------------------------------
1300 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1302 if removeRestSubscriptionFromDb == true {
1303 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1304 c.RemoveRESTSubscriptionFromDb(restSubId)
1306 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1310 //-------------------------------------------------------------------
1312 //-------------------------------------------------------------------
1313 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1314 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1315 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1317 xapp.Logger.Error("%v", err)
1321 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1323 const ricRequestorId = 123
1324 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1326 // Send delete for every endpoint in the subscription
1327 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1328 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1329 subDelReqMsg.RequestId.Id = ricRequestorId
1330 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1331 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1333 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1336 for _, endPoint := range subs.EpList.Endpoints {
1337 params := &xapp.RMRParams{}
1338 params.Mtype = mType
1339 params.SubId = int(subs.GetReqId().InstanceId)
1341 params.Meid = subs.Meid
1342 params.Src = endPoint.String()
1343 params.PayloadLen = len(payload.Buf)
1344 params.Payload = payload.Buf
1346 subs.DeleteFromDb = true
1347 c.handleXAPPSubscriptionDeleteRequest(params)
1351 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1353 fmt.Println("CRESTSubscriptionRequest")
1359 if p.SubscriptionID != "" {
1360 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1362 fmt.Println(" SubscriptionID = ''")
1365 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1367 if p.ClientEndpoint.HTTPPort != nil {
1368 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1370 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1373 if p.ClientEndpoint.RMRPort != nil {
1374 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1376 fmt.Println(" ClientEndpoint.RMRPort = nil")
1380 fmt.Printf(" Meid = %s\n", *p.Meid)
1382 fmt.Println(" Meid = nil")
1385 for _, subscriptionDetail := range p.SubscriptionDetails {
1386 if p.RANFunctionID != nil {
1387 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1389 fmt.Println(" RANFunctionID = nil")
1391 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1392 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1394 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1395 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1396 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1397 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1399 if actionToBeSetup.SubsequentAction != nil {
1400 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1401 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1403 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")