2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds a single log-friendly identifier string. The String() form
// of each entry is appended to the result, "(NIL)" is appended on the
// fallback path, and the error is appended last as "err(<message>)".
// Every piece is prefixed with the current value of filler.
// NOTE(review): the nil checks and the statements that update filler are not
// visible in this extract - confirm the separator against the full file.
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
50 retval += filler + entry.String()
53 retval += filler + "(NIL)"
57 retval += filler + "err(" + err.Error() + ")"
63 //-----------------------------------------------------------------------------
65 //-----------------------------------------------------------------------------
// Package-level settings. Except for restDuplicateCtrl, each of them is
// assigned in Control.ReadConfigParameters from the "controls.*" keys.
// Read from controls.e2tSubReqTimeout_ms.
67 var e2tSubReqTimeout time.Duration
// Read from controls.e2tSubDelReqTime_ms.
68 var e2tSubDelReqTime time.Duration
// Read from controls.e2tRecvMsgTimeout_ms.
69 var e2tRecvMsgTimeout time.Duration
// Read from controls.waitRouteCleanup_ms; passed to
// Registry.RemoveFromSubscription as the route clean-up wait.
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64 // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String flag ("true"/"false"); NewControl compares it against "false"
// before restoring subscriptions from the database.
73 var readSubsFromDb string
// Duplicate/retransmission bookkeeping keyed by request md5sum; initialised
// in NewControl via Init().
74 var restDuplicateCtrl duplicateCtrl
// String flag; while it equals "true" the database read loops keep retrying
// regardless of dbTryCount.
75 var dbRetryForever string
// NOTE(review): these are fields of a struct whose header is not visible in
// this extract (presumably Control, since NewControl sets both of them).
// Value returned by CreateRESTSdl() in NewControl.
84 restSubsDb Sdlnterface
// Counter group registered under the name "SUBMGR" in NewControl.
87 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is an empty marker event. handleSubscriptionCreate
// matches it in its type switch to drop a transaction when testing the
// restart case.
97 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event type; its use is not visible
// in this part of the file.
98 type SubmgrRestartUpEvent struct{}
// NOTE(review): the enclosing function header is not visible in this extract;
// presumably this is the package init().
// Log the component tag.
101 xapp.Logger.Info("SUBMGR")
// Environment variables consulted by viper use the "submgr" prefix, and a
// variable that is set but empty is treated as a valid (set) value.
103 viper.SetEnvPrefix("submgr")
104 viper.AllowEmptyEnv(true)
// NewControl constructs the Control instance: it creates the routing-manager
// client, the registry and the tracker, loads configuration, registers the
// REST routes, starts the REST subscription listener and restores
// subscriptions from the database.
// NOTE(review): parts of the Control literal, the body of the
// readSubsFromDb == "false" branch and the return statement are missing from
// this extract.
107 func NewControl() *Control {
// Routing-manager REST client, addressed by rtmgr.HostAddr:rtmgr.port with
// base path rtmgr.baseUrl over plain http.
109 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
110 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry keeps a pointer to the routing-manager client.
112 registry := new(Registry)
113 registry.Initialize()
114 registry.rtmgrClient = &rtmgrClient
116 tracker := new(Tracker)
119 c := &Control{e2ap: new(E2ap),
122 e2SubsDb: CreateSdl(),
123 restSubsDb: CreateRESTSdl(),
124 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load the configuration values once at start-up.
127 c.ReadConfigParameters("")
129 // Register REST handler for testing support
130 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
131 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
132 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// Serve REST subscribe, query and delete requests in a separate goroutine.
134 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
// NOTE(review): presumably skips the database restore below when reading is
// disabled - the branch body is not visible here.
136 if readSubsFromDb == "false" {
140 restDuplicateCtrl.Init()
142 // Read subscriptions from db
143 c.ReadE2Subscriptions()
144 c.ReadRESTSubscriptions()
// SymptomDataHandler serves GET /ric/v1/symptomdata (registered in
// NewControl). It sends the registry's subscription list as symptom data under
// the name "platform/subscriptions.json".
148 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// The error returned by the registry query is deliberately discarded here.
149 subscriptions, _ := c.registry.QueryHandler()
150 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
153 //-------------------------------------------------------------------
155 //-------------------------------------------------------------------
// GetAllRestSubscriptions serves GET /ric/v1/restsubscriptions (registered in
// NewControl). It fetches all REST subscriptions from the registry.
// NOTE(review): the code that writes the response to w is not visible in this
// extract.
156 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
157 xapp.Logger.Info("GetAllRestSubscriptions() called")
158 response := c.registry.GetAllRestSubscriptions()
162 //-------------------------------------------------------------------
164 //-------------------------------------------------------------------
// ReadE2Subscriptions restores E2 subscriptions from the database into the
// registry. ReadAllSubscriptionsFromSdl is attempted up to dbTryCount times,
// or without limit while dbRetryForever == "true". A read error is logged and
// followed by a one second pause. On the success path the loaded id pool and
// subscription map replace those in c.registry, and the map is passed to
// HandleUncompletedSubscriptions.
// NOTE(review): the error checks and return statements are missing from this
// extract, so the exact return values cannot be confirmed here.
165 func (c *Control) ReadE2Subscriptions() error {
168 var register map[uint32]*Subscription
169 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
170 xapp.Logger.Info("Reading E2 subscriptions from db")
171 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
173 xapp.Logger.Error("%v", err)
// Wait before the next read attempt.
174 <-time.After(1 * time.Second)
176 c.registry.subIds = subIds
177 c.registry.register = register
178 c.HandleUncompletedSubscriptions(register)
// Logged after the loop has ended, i.e. once the tries are used up.
182 xapp.Logger.Info("Continuing without retring")
186 //-------------------------------------------------------------------
188 //-------------------------------------------------------------------
// ReadRESTSubscriptions restores REST subscriptions from the database into
// c.registry.restSubscriptions. It uses the same retry policy as
// ReadE2Subscriptions: up to dbTryCount attempts, or unlimited while
// dbRetryForever == "true", with a one second pause after a logged read error.
// NOTE(review): the error checks and return statements are missing from this
// extract, so the exact return values cannot be confirmed here.
189 func (c *Control) ReadRESTSubscriptions() error {
191 var restSubscriptions map[string]*RESTSubscription
192 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
193 xapp.Logger.Info("Reading REST subscriptions from db")
194 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
196 xapp.Logger.Error("%v", err)
// Wait before the next read attempt.
197 <-time.After(1 * time.Second)
199 c.registry.restSubscriptions = restSubscriptions
// Logged after the loop has ended, i.e. once the tries are used up.
203 xapp.Logger.Info("Continuing without retring")
207 //-------------------------------------------------------------------
209 //-------------------------------------------------------------------
// ReadConfigParameters loads the submgr settings from viper into the
// package-level variables and c.LoggerLevel, substituting a default whenever
// a value is unset (zero or empty), and logs every resulting value.
// It is called once from NewControl and is registered as a configuration
// change listener in Run. The parameter f is not used in the visible lines.
//
// The *_ms keys hold plain millisecond counts. viper.GetDuration reads a bare
// number as nanoseconds, hence the multiplication by 1000000.
// NOTE(review): the dbTryCount and LoggerLevel default assignments are not
// visible in this extract.
210 func (c *Control) ReadConfigParameters(f string) {
212 // viper.GetDuration returns nanoseconds
213 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
214 if e2tSubReqTimeout == 0 {
// Default: 2000 ms.
215 e2tSubReqTimeout = 2000 * 1000000
217 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
218 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
219 if e2tSubDelReqTime == 0 {
// Default: 2000 ms.
220 e2tSubDelReqTime = 2000 * 1000000
222 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
223 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
224 if e2tRecvMsgTimeout == 0 {
// Default: 2000 ms.
225 e2tRecvMsgTimeout = 2000 * 1000000
227 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
// Try counts include the initial attempt; the default of 1 means no retry.
229 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
230 if e2tMaxSubReqTryCount == 0 {
231 e2tMaxSubReqTryCount = 1
233 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
235 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
236 if e2tMaxSubDelReqTryCount == 0 {
237 e2tMaxSubDelReqTryCount = 1
239 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// String flag; defaults to "true".
241 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
242 if readSubsFromDb == "" {
243 readSubsFromDb = "true"
245 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
247 dbTryCount = viper.GetInt("controls.dbTryCount")
251 xapp.Logger.Info("dbTryCount %v", dbTryCount)
// String flag; defaults to "true".
253 dbRetryForever = viper.GetString("controls.dbRetryForever")
254 if dbRetryForever == "" {
255 dbRetryForever = "true"
257 xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
259 c.LoggerLevel = viper.GetUint32("logger.level")
260 if c.LoggerLevel == 0 {
263 xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
265 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
266 // value of 100 ms is currently used only in unit tests.
267 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
268 if waitRouteCleanup_ms == 0 {
// Default: 5000 ms.
269 waitRouteCleanup_ms = 5000 * 1000000
271 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
274 //-------------------------------------------------------------------
276 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given subscriptions (called from
// ReadE2Subscriptions with those restored from the database). For each one
// without a received subscription response (SubRespRcvd == false) it sets
// NoRespToXapp and sends a subscription delete request.
277 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
279 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
280 for subId, subs := range register {
281 if subs.SubRespRcvd == false {
// Flag checked in handleXAPPSubscriptionDeleteRequest before a delete
// response is sent to the xApp.
282 subs.NoRespToXapp = true
283 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
284 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the ready callback registered in Run via xapp.SetReadyCB. It
// assigns xapp.Rmr to c.RMRClient if no RMR client has been set yet.
// The data argument is not used in the visible lines.
289 func (c *Control) ReadyCB(data interface{}) {
290 if c.RMRClient == nil {
291 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters
// as a configuration change listener.
// NOTE(review): the remaining statements of this function are not visible in
// this extract.
295 func (c *Control) Run() {
296 xapp.SetReadyCB(c.ReadyCB, nil)
297 xapp.AddConfigChangeListener(c.ReadConfigParameters)
301 //-------------------------------------------------------------------
303 //-------------------------------------------------------------------
// GetOrCreateRestSubscription resolves the REST subscription that a request
// refers to and returns it together with its id.
//
// Two paths are visible:
//   - p.SubscriptionID is empty: the id last recorded for md5sum is looked up
//     in the registry and reused when the subscription is found; otherwise the
//     cached md5sum entry is deleted. If nothing can be reused, a new ksuid
//     based id is generated and a REST subscription is created for it.
//   - p.SubscriptionID is set: the subscription is fetched from the registry
//     by that id.
//
// Failures to create or fetch are logged and counted with cRestSubFailToXapp.
// NOTE(review): several branch conditions and the error returns are missing
// from this extract; confirm the exact branching against the full file.
304 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
307 var restSubscription *RESTSubscription
// Id of the previous request with the same md5sum, if one is known.
310 prevRestSubsId, exists := restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
311 if p.SubscriptionID == "" {
313 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
314 if restSubscription != nil {
// Reuse the previously created subscription.
315 restSubId = prevRestSubsId
317 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
319 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
322 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
323 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
// Nothing to reuse: allocate a new id and create the subscription.
327 if restSubscription == nil {
328 restSubId = ksuid.New().String()
329 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
331 xapp.Logger.Error("%s", err.Error())
332 c.UpdateCounter(cRestSubFailToXapp)
// The caller supplied the id explicitly.
337 restSubId = p.SubscriptionID
339 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
341 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
343 xapp.Logger.Error("%s", err.Error())
344 c.UpdateCounter(cRestSubFailToXapp)
349 xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
351 xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
355 return restSubscription, restSubId, nil
358 //-------------------------------------------------------------------
360 //-------------------------------------------------------------------
// SubscriptionHandler is the REST subscribe callback passed to
// xapp.Subscription.Listen in NewControl.
//
// Visible steps: count the request, validate ClientEndpoint, derive the xApp
// RMR endpoint, compute the request md5sum, get or create the REST
// subscription, build the E2AP subscription request list, check whether the
// request duplicates an ongoing transaction, persist the REST subscription and
// start processSubscriptionRequests in a new goroutine. Failures and direct
// responses are tracked with the cRestSub* counters.
// params is type-asserted to *models.SubscriptionParams without an ok check.
// NOTE(review): the error checks and return statements are mostly missing
// from this extract.
361 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
364 c.UpdateCounter(cRestSubReqFromXapp)
366 subResp := models.SubscriptionResponse{}
367 p := params.(*models.SubscriptionParams)
// Dump the request only at logger levels above 2.
369 if c.LoggerLevel > 2 {
370 c.PrintRESTSubscriptionRequest(p)
373 if p.ClientEndpoint == nil {
374 xapp.Logger.Error("ClientEndpoint == nil")
375 c.UpdateCounter(cRestSubFailToXapp)
// NOTE(review): the returned error carries an empty message; the cause is
// only in the log line above.
376 return nil, fmt.Errorf("")
379 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
381 xapp.Logger.Error("%s", err.Error())
382 c.UpdateCounter(cRestSubFailToXapp)
386 md5sum, err := CalculateRequestMd5sum(params)
388 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
391 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
393 xapp.Logger.Error("Failed to get/allocate REST subscription")
397 subResp.SubscriptionID = &restSubId
398 subReqList := e2ap.SubscriptionRequestList{}
399 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// Failure path: forget the md5sum entry and the REST subscription.
401 xapp.Logger.Error("%s", err.Error())
402 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
403 c.registry.DeleteRESTSubscription(&restSubId)
404 c.UpdateCounter(cRestSubFailToXapp)
408 duplicate := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
410 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
411 c.UpdateCounter(cRestSubRespToXapp)
415 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
// The E2 side is processed asynchronously.
417 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
419 c.UpdateCounter(cRestSubRespToXapp)
423 //-------------------------------------------------------------------
425 //-------------------------------------------------------------------
// processSubscriptionRequests handles each E2AP subscription request of a
// REST subscription in list order. It is started as a goroutine by
// SubscriptionHandler. For every request it creates an xApp transaction,
// runs handleSubscriptionRequest and then notifies the xApp: an unsuccessful
// notification on the failure paths, a successful one otherwise.
// restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum) is deferred
// and therefore runs when this function returns.
// NOTE(review): the nil/error checks and any early exits from the loop are
// not visible in this extract.
427 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
428 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
430 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
432 var xAppEventInstanceID int64
433 var e2EventInstanceID int64
435 defer restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
437 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
// subReqMsg is a per-iteration copy of the list element.
438 subReqMsg := subReqList.E2APSubscriptionRequests[index]
439 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
441 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
443 // Send notification to xApp that processing of a Subscription Request has failed.
444 err := fmt.Errorf("Tracking failure")
445 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
449 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
451 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
453 xapp.Logger.Info("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
456 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
// Success path: record the md5sum and report both instance ids to the xApp.
458 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
459 restSubscription.AddMd5Sum(md5sum)
460 xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
461 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
462 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
468 //-------------------------------------------------------------------
470 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request for a REST
// subscription: it tracks the transaction, assigns it to a subscription in
// the registry, starts handleSubscriptionCreate in a goroutine and blocks on
// trans.WaitEvent until that goroutine reports an event.
//
// The event type decides the outcome: a subscription response is the success
// case, while a subscription failure, an unexpected event and the remaining
// visible case each produce a descriptive error. On the failure path the
// transaction is removed from the subscription.
// The meid and restSubId parameters are not used in the visible lines.
// NOTE(review): the return statements and several conditions are missing from
// this extract.
471 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
472 restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
474 err := c.tracker.Track(trans)
476 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
// The tracker's own error is replaced with a generic one.
477 err = fmt.Errorf("Tracking failure")
481 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
483 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
490 go c.handleSubscriptionCreate(subs, trans)
491 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
495 switch themsg := event.(type) {
496 case *e2ap.E2APSubscriptionResponse:
499 case *e2ap.E2APSubscriptionFailure:
500 err = fmt.Errorf("E2 SubscriptionFailure received")
503 err = fmt.Errorf("unexpected E2 subscription response received")
507 err = fmt.Errorf("E2 subscription response timeout")
510 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
511 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
515 //-------------------------------------------------------------------
517 //-------------------------------------------------------------------
// sendUnsuccesfullResponseNotification reports a failed subscription request
// to the xApp. It builds a SubscriptionResponse with one instance whose
// E2EventInstanceID is 0 and whose ErrorCause is err.Error(), marks the REST
// subscription as processed with that error, updates it in the database,
// bumps cRestSubFailNotifToXapp and sends the notification to clientEndpoint.
// err must be non-nil because err.Error() is called unconditionally.
// NOTE(review): the condition choosing between the two log lines (with and
// without the transaction id) is not visible in this extract.
518 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
519 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
521 // Send notification to xApp that processing of a Subscription Request has failed.
522 e2EventInstanceID := (int64)(0)
523 errorCause := err.Error()
524 resp := &models.SubscriptionResponse{
525 SubscriptionID: restSubId,
526 SubscriptionInstances: []*models.SubscriptionInstance{
527 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
528 ErrorCause: &errorCause,
529 XappEventInstanceID: &xAppEventInstanceID},
532 // Mark REST subscription request processed.
533 restSubscription.SetProcessed(err)
534 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
536 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
537 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
539 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
540 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
543 c.UpdateCounter(cRestSubFailNotifToXapp)
544 xapp.Subscription.Notify(resp, *clientEndpoint)
547 //-------------------------------------------------------------------
549 //-------------------------------------------------------------------
// sendSuccesfullResponseNotification reports a successfully processed
// subscription request to the xApp. It records the E2 instance id and the
// xApp-to-E2 id mapping on the REST subscription, builds a
// SubscriptionResponse with one instance, marks the REST subscription as
// processed without error, updates it in the database, bumps
// cRestSubNotifToXapp and sends the notification to clientEndpoint.
550 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
551 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
553 // Store successfully processed InstanceId for deletion
554 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
555 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
557 // Send notification to xApp that a Subscription Request has been processed.
558 resp := &models.SubscriptionResponse{
559 SubscriptionID: restSubId,
560 SubscriptionInstances: []*models.SubscriptionInstance{
561 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
563 XappEventInstanceID: &xAppEventInstanceID},
566 // Mark REST subscription request processed.
567 restSubscription.SetProcessed(nil)
568 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
569 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
570 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
572 c.UpdateCounter(cRestSubNotifToXapp)
573 xapp.Subscription.Notify(resp, *clientEndpoint)
576 //-------------------------------------------------------------------
578 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB is the REST delete callback passed to
// xapp.Subscription.Listen in NewControl.
//
// It looks up the REST subscription by id. While a subscription request for
// it is still ongoing an error is built and logged. Otherwise every E2
// instance of the subscription is deleted through SubscriptionDeleteHandler
// and the instance bookkeeping is removed. Finally the cached md5sum entry,
// the registry entry and the database record are removed and
// cRestSubDelRespToXapp is counted.
// NOTE(review): the return statements, the body of the SubDelReqOngoing
// branch and the loop's error handling are not visible in this extract.
579 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
582 c.UpdateCounter(cRestSubDelReqFromXapp)
584 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
586 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
588 xapp.Logger.Error("%s", err.Error())
589 if restSubscription == nil {
590 // Subscription was not found
593 if restSubscription.SubReqOngoing == true {
594 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
595 xapp.Logger.Error("%s", err.Error())
597 } else if restSubscription.SubDelReqOngoing == true {
598 // Previous request for same restSubId still ongoing
604 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
606 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
607 for _, instanceId := range restSubscription.InstanceIds {
608 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
611 xapp.Logger.Error("%s", err.Error())
614 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
615 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
616 restSubscription.DeleteE2InstanceId(instanceId)
// Drop all remaining state of the REST subscription.
618 restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
619 c.registry.DeleteRESTSubscription(&restSubId)
620 c.RemoveRESTSubscriptionFromDb(restSubId)
623 c.UpdateCounter(cRestSubDelRespToXapp)
628 //-------------------------------------------------------------------
630 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance on behalf of
// a REST subscription and returns the xApp event instance id taken from the
// subscription's request id.
//
// The subscription is looked up by instanceId. When the lookup fails the
// situation is logged at info level and (0, nil) is returned, so a missing
// subscription is not an error for the caller. Otherwise an xApp transaction
// is created and tracked, handleSubscriptionDelete runs in a goroutine, the
// call blocks on trans.WaitEvent and the transaction is then removed from the
// subscription.
// NOTE(review): the error checks and some returns are not visible in this
// extract.
631 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
633 var xAppEventInstanceID int64
634 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
636 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
637 restSubId, instanceId, idstring(err, nil))
638 return xAppEventInstanceID, nil
641 xAppEventInstanceID = int64(subs.ReqId.Id)
642 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
644 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
645 xapp.Logger.Error("%s", err.Error())
647 defer trans.Release()
649 err = c.tracker.Track(trans)
651 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
652 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned as the error value
// here instead of the error built above - looks like a placeholder; confirm.
653 return xAppEventInstanceID, &time.ParseError{}
658 go c.handleSubscriptionDelete(subs, trans)
659 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
661 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
663 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
665 return xAppEventInstanceID, nil
668 //-------------------------------------------------------------------
670 //-------------------------------------------------------------------
// QueryHandler is the REST query callback passed to xapp.Subscription.Listen
// in NewControl. It logs the call and returns the result of the registry's
// QueryHandler.
671 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
672 xapp.Logger.Info("QueryHandler() called")
676 return c.registry.QueryHandler()
// TestRestHandler serves POST /ric/v1/test/{testId} (registered in NewControl
// for testing support). The testId path variable selects the command. Visible
// commands: "deletesubid=<n>" removes one subscription from the database, one
// command removes all E2 and REST subscriptions from the database, and one is
// meant to restart submgr. Anything else is logged as unsupported.
// NOTE(review): the conditions selecting the second and third commands and
// the exit call itself are not visible in this extract.
679 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
680 xapp.Logger.Info("TestRestHandler() called")
682 pathParams := mux.Vars(r)
683 s := pathParams["testId"]
685 // This can be used to delete single subscription from db
686 if contains := strings.Contains(s, "deletesubid="); contains == true {
687 var splits = strings.Split(s, "=")
// The value after the first "=" is parsed as a base-10 id.
688 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
689 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
690 c.RemoveSubscriptionFromSdl(uint32(subId))
695 // This can be used to remove all subscriptions from db
697 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
698 c.RemoveAllSubscriptionsFromSdl()
699 c.RemoveAllRESTSubscriptionsFromSdl()
703 // This is meant to cause submgr's restart in testing
705 xapp.Logger.Info("os.Exit(1) called")
709 xapp.Logger.Info("Unsupported rest command received %s", s)
712 //-------------------------------------------------------------------
714 //-------------------------------------------------------------------
// rmrSendToE2T sends the packed payload of trans towards E2T over RMR.
// The RMR parameters are filled from the transaction (message type, payload)
// and the subscription (instance id as SubId, Meid). The message is logged,
// sent with c.SendWithRetry(params, false, 5), and a send failure is logged.
// The result of SendWithRetry is assigned to the named return value err.
716 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
717 params := &xapp.RMRParams{}
718 params.Mtype = trans.GetMtype()
719 params.SubId = int(subs.GetReqId().InstanceId)
721 params.Meid = subs.GetMeid()
723 params.PayloadLen = len(trans.Payload.Buf)
724 params.Payload = trans.Payload.Buf
726 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
727 err = c.SendWithRetry(params, false, 5)
729 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the packed payload of trans to the xApp over RMR.
// The RMR parameters are filled from the transaction (message type, Xid, Meid,
// payload) and the subscription (instance id as SubId). The message is logged,
// sent with c.SendWithRetry(params, false, 5), and a send failure is logged.
// The result of SendWithRetry is assigned to the named return value err.
734 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
736 params := &xapp.RMRParams{}
737 params.Mtype = trans.GetMtype()
738 params.SubId = int(subs.GetReqId().InstanceId)
739 params.Xid = trans.GetXid()
740 params.Meid = trans.GetMeid()
742 params.PayloadLen = len(trans.Payload.Buf)
743 params.Payload = trans.Payload.Buf
745 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
746 err = c.SendWithRetry(params, false, 5)
748 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives an RMR message and dispatches it by message type. Each of
// the six subscription message types is handled in its own goroutine, and an
// unknown type is logged and discarded. If the RMR client is not set yet, an
// error is built and logged instead.
// The payload is copied into a fresh Go slice before dispatch because
// msg.Mbuf is freed (deferred) when Consume returns.
// NOTE(review): the switch header, the assignment of the copied payload back
// to msg and the return statements are not visible in this extract.
753 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
754 if c.RMRClient == nil {
755 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
756 xapp.Logger.Error("%s", err.Error())
761 defer c.RMRClient.Free(msg.Mbuf)
763 // xapp-frame might use direct access to c buffer and
764 // when msg.Mbuf is freed, someone might take it into use
765 // and payload data might be invalid inside message handle function
767 // subscriptions won't load system a lot so there is no
768 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a new backing array.
769 cPay := append(msg.Payload[:0:0], msg.Payload...)
771 msg.PayloadLen = len(cPay)
774 case xapp.RIC_SUB_REQ:
775 go c.handleXAPPSubscriptionRequest(msg)
776 case xapp.RIC_SUB_RESP:
777 go c.handleE2TSubscriptionResponse(msg)
778 case xapp.RIC_SUB_FAILURE:
779 go c.handleE2TSubscriptionFailure(msg)
780 case xapp.RIC_SUB_DEL_REQ:
781 go c.handleXAPPSubscriptionDeleteRequest(msg)
782 case xapp.RIC_SUB_DEL_RESP:
783 go c.handleE2TSubscriptionDeleteResponse(msg)
784 case xapp.RIC_SUB_DEL_FAILURE:
785 go c.handleE2TSubscriptionDeleteFailure(msg)
787 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
792 //-------------------------------------------------------------------
793 // handle from XAPP Subscription Request
794 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a RIC_SUB_REQ received from an xApp
// over RMR (dispatched from Consume). It counts the message, unpacks the
// subscription request, creates and tracks an xApp transaction (released on
// return), assigns the transaction to a subscription in the registry and
// hands over to wakeSubscriptionRequest. Each failing step is logged.
// NOTE(review): the error checks and early returns are not visible in this
// extract.
795 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
796 xapp.Logger.Info("MSG from XAPP: %s", params.String())
797 c.UpdateCounter(cSubReqFromXapp)
799 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
801 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
805 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
807 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
810 defer trans.Release()
812 if err = c.tracker.Track(trans); err != nil {
813 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
817 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
818 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
820 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
824 c.wakeSubscriptionRequest(subs, trans)
827 //-------------------------------------------------------------------
828 // Wake Subscription Request to E2node
829 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate in a goroutine and
// blocks on trans.WaitEvent until it reports an event. A subscription
// response or failure event gets the xApp's own request id written back, is
// packed, counted and sent to the xApp with rmrSendToXapp. The final log line
// covers the case where the request failed.
// NOTE(review): the err declaration, the checks of the pack results and the
// returns are not visible in this extract.
830 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
832 go c.handleSubscriptionCreate(subs, trans)
833 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
836 switch themsg := event.(type) {
837 case *e2ap.E2APSubscriptionResponse:
// Restore the request id used by the xApp's transaction.
838 themsg.RequestId.Id = trans.RequestId.Id
839 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
842 c.UpdateCounter(cSubRespToXapp)
843 c.rmrSendToXapp("", subs, trans)
846 case *e2ap.E2APSubscriptionFailure:
// Restore the request id used by the xApp's transaction.
847 themsg.RequestId.Id = trans.RequestId.Id
848 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
850 c.UpdateCounter(cSubFailToXapp)
851 c.rmrSendToXapp("", subs, trans)
857 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
858 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
861 //-------------------------------------------------------------------
862 // handle from XAPP Subscription Delete Request
863 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a RIC_SUB_DEL_REQ received
// from an xApp over RMR (dispatched from Consume). It counts the message,
// unpacks the delete request, creates and tracks an xApp transaction
// (released on return), finds the subscription by the transaction's
// subscription id, runs handleSubscriptionDelete in a goroutine and blocks on
// trans.WaitEvent. Afterwards a delete response is packed and sent to the
// xApp; the subs.NoRespToXapp check guards the case where no response should
// be sent.
// NOTE(review): the error checks and early returns are not visible in this
// extract.
864 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
865 xapp.Logger.Info("MSG from XAPP: %s", params.String())
866 c.UpdateCounter(cSubDelReqFromXapp)
868 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
870 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
874 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
876 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
879 defer trans.Release()
881 err = c.tracker.Track(trans)
883 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
887 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
889 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
896 go c.handleSubscriptionDelete(subs, trans)
897 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
899 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
901 if subs.NoRespToXapp == true {
902 // Do not send delete responses to xApps while a submgr restart is deleting uncompleted subscriptions
906 // Whatever is received success, fail or timeout, send successful delete response
907 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
908 subDelRespMsg.RequestId.Id = trans.RequestId.Id
909 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
910 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
911 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
913 c.UpdateCounter(cSubDelRespToXapp)
914 c.rmrSendToXapp("", subs, trans)
917 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
918 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
921 //-------------------------------------------------------------------
922 // SUBS CREATE Handling
923 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2 side of a subscription create for
// parentTrans. It creates a subscription transaction, waits for its turn on
// the subscription (turn and transaction are released on return) and, when no
// response is cached yet, sends the subscription request to E2T and handles
// the resulting event:
//   - subscription response: cached, and SubRespRcvd is set;
//   - subscription failure: cached with the flag false, the subscription is
//     marked for database removal and a delete request is sent to E2T;
//   - SubmgrRestartTestEvent: the transaction is dropped (restart test case);
//   - the remaining visible path: a nil response is cached with the flag
//     false, the subscription is marked for database removal and a delete
//     request is sent to E2T.
//
// Finally the subscription is updated in the database and the response (or
// nil) is delivered to parentTrans with SendEvent.
// NOTE(review): the default label, the else branch and the condition guarding
// RemoveFromSubscription are not visible in this extract.
924 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
926 var removeSubscriptionFromDb bool = false
927 trans := c.tracker.NewSubsTransaction(subs)
928 subs.WaitTransactionTurn(trans)
929 defer subs.ReleaseTransactionTurn(trans)
930 defer trans.Release()
932 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
934 subRfMsg, valid := subs.GetCachedResponse()
// Only go to E2T when nothing is cached yet and the cache is still valid.
935 if subRfMsg == nil && valid == true {
936 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
937 switch event.(type) {
938 case *e2ap.E2APSubscriptionResponse:
939 subRfMsg, valid = subs.SetCachedResponse(event, true)
940 subs.SubRespRcvd = true
941 case *e2ap.E2APSubscriptionFailure:
942 removeSubscriptionFromDb = true
943 subRfMsg, valid = subs.SetCachedResponse(event, false)
944 xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
945 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
946 case *SubmgrRestartTestEvent:
947 // This simulates that no response has been received and after restart subscriptions are restored from db
948 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
951 xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
952 removeSubscriptionFromDb = true
953 subRfMsg, valid = subs.SetCachedResponse(nil, false)
954 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
956 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
958 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
961 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
963 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
966 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wake the parent transaction that is blocked in WaitEvent.
967 parentTrans.SendEvent(subRfMsg, 0)
970 //-------------------------------------------------------------------
971 // SUBS DELETE Handling
972 //-------------------------------------------------------------------
974 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
976 trans := c.tracker.NewSubsTransaction(subs)
977 subs.WaitTransactionTurn(trans)
978 defer subs.ReleaseTransactionTurn(trans)
979 defer trans.Release()
981 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
985 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
988 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
992 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
993 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
994 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
995 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
996 c.registry.UpdateSubscriptionToDb(subs, c)
997 parentTrans.SendEvent(nil, 0)
1000 //-------------------------------------------------------------------
1001 // send to E2T Subscription Request
1002 //-------------------------------------------------------------------
1003 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1005 var event interface{} = nil
1006 var timedOut bool = false
1007 const ricRequestorId = 123
1009 subReqMsg := subs.SubReqMsg
1010 subReqMsg.RequestId = subs.GetReqId().RequestId
1011 subReqMsg.RequestId.Id = ricRequestorId
1012 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1014 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1018 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1019 c.WriteSubscriptionToDb(subs)
1021 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1022 desc := fmt.Sprintf("(retry %d)", retries)
1024 c.UpdateCounter(cSubReqToE2)
1026 c.UpdateCounter(cSubReReqToE2)
1028 c.rmrSendToE2T(desc, subs, trans)
1029 if subs.DoNotWaitSubResp == false {
1030 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1032 c.UpdateCounter(cSubReqTimerExpiry)
1036 // Simulating case where subscrition request has been sent but response has not been received before restart
1037 event = &SubmgrRestartTestEvent{}
1041 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1045 //-------------------------------------------------------------------
1046 // send to E2T Subscription Delete Request
1047 //-------------------------------------------------------------------
1049 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1051 var event interface{}
1053 const ricRequestorId = 123
1055 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1056 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1057 subDelReqMsg.RequestId.Id = ricRequestorId
1058 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1059 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1061 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1065 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1066 desc := fmt.Sprintf("(retry %d)", retries)
1068 c.UpdateCounter(cSubDelReqToE2)
1070 c.UpdateCounter(cSubDelReReqToE2)
1072 c.rmrSendToE2T(desc, subs, trans)
1073 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1075 c.UpdateCounter(cSubDelReqTimerExpiry)
1080 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1084 //-------------------------------------------------------------------
1085 // handle from E2T Subscription Response
1086 //-------------------------------------------------------------------
1087 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1088 xapp.Logger.Info("MSG from E2T: %s", params.String())
1089 c.UpdateCounter(cSubRespFromE2)
1091 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1093 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1096 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1098 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1101 trans := subs.GetTransaction()
1103 err = fmt.Errorf("Ongoing transaction not found")
1104 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1107 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1108 if sendOk == false {
1109 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1110 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1115 //-------------------------------------------------------------------
1116 // handle from E2T Subscription Failure
1117 //-------------------------------------------------------------------
1118 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1119 xapp.Logger.Info("MSG from E2T: %s", params.String())
1120 c.UpdateCounter(cSubFailFromE2)
1121 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1123 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1126 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1128 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1131 trans := subs.GetTransaction()
1133 err = fmt.Errorf("Ongoing transaction not found")
1134 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1137 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1138 if sendOk == false {
1139 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1140 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1145 //-------------------------------------------------------------------
1146 // handle from E2T Subscription Delete Response
1147 //-------------------------------------------------------------------
1148 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1149 xapp.Logger.Info("MSG from E2T: %s", params.String())
1150 c.UpdateCounter(cSubDelRespFromE2)
1151 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1153 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1156 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1158 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1161 trans := subs.GetTransaction()
1163 err = fmt.Errorf("Ongoing transaction not found")
1164 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1167 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1168 if sendOk == false {
1169 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1170 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1175 //-------------------------------------------------------------------
1176 // handle from E2T Subscription Delete Failure
1177 //-------------------------------------------------------------------
1178 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1179 xapp.Logger.Info("MSG from E2T: %s", params.String())
1180 c.UpdateCounter(cSubDelFailFromE2)
1181 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1183 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1186 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1188 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1191 trans := subs.GetTransaction()
1193 err = fmt.Errorf("Ongoing transaction not found")
1194 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1197 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1198 if sendOk == false {
1199 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1200 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1205 //-------------------------------------------------------------------
1207 //-------------------------------------------------------------------
1208 func typeofSubsMessage(v interface{}) string {
1213 //case *e2ap.E2APSubscriptionRequest:
1215 case *e2ap.E2APSubscriptionResponse:
1217 case *e2ap.E2APSubscriptionFailure:
1219 //case *e2ap.E2APSubscriptionDeleteRequest:
1220 // return "SubDelReq"
1221 case *e2ap.E2APSubscriptionDeleteResponse:
1223 case *e2ap.E2APSubscriptionDeleteFailure:
1230 //-------------------------------------------------------------------
1232 //-------------------------------------------------------------------
1233 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1234 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1235 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1237 xapp.Logger.Error("%v", err)
1241 //-------------------------------------------------------------------
1243 //-------------------------------------------------------------------
1244 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1246 if removeSubscriptionFromDb == true {
1247 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1248 c.RemoveSubscriptionFromDb(subs)
1250 // Update is needed for successful response and merge case here
1251 if subs.RetryFromXapp == false {
1252 c.WriteSubscriptionToDb(subs)
1255 subs.RetryFromXapp = false
1258 //-------------------------------------------------------------------
1260 //-------------------------------------------------------------------
1261 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1262 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1263 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1265 xapp.Logger.Error("%v", err)
1269 //-------------------------------------------------------------------
1271 //-------------------------------------------------------------------
1272 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1273 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1274 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1276 xapp.Logger.Error("%v", err)
1280 //-------------------------------------------------------------------
1282 //-------------------------------------------------------------------
1283 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1285 if removeRestSubscriptionFromDb == true {
1286 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1287 c.RemoveRESTSubscriptionFromDb(restSubId)
1289 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1293 //-------------------------------------------------------------------
1295 //-------------------------------------------------------------------
1296 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1297 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1298 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1300 xapp.Logger.Error("%v", err)
1304 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1306 const ricRequestorId = 123
1307 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1309 // Send delete for every endpoint in the subscription
1310 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1311 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1312 subDelReqMsg.RequestId.Id = ricRequestorId
1313 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1314 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1316 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1319 for _, endPoint := range subs.EpList.Endpoints {
1320 params := &xapp.RMRParams{}
1321 params.Mtype = mType
1322 params.SubId = int(subs.GetReqId().InstanceId)
1324 params.Meid = subs.Meid
1325 params.Src = endPoint.String()
1326 params.PayloadLen = len(payload.Buf)
1327 params.Payload = payload.Buf
1329 subs.DeleteFromDb = true
1330 c.handleXAPPSubscriptionDeleteRequest(params)
1334 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1336 fmt.Println("CRESTSubscriptionRequest")
1342 if p.SubscriptionID != "" {
1343 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1345 fmt.Println(" SubscriptionID = ''")
1348 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1350 if p.ClientEndpoint.HTTPPort != nil {
1351 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1353 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1356 if p.ClientEndpoint.RMRPort != nil {
1357 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1359 fmt.Println(" ClientEndpoint.RMRPort = nil")
1363 fmt.Printf(" Meid = %s\n", *p.Meid)
1365 fmt.Println(" Meid = nil")
1368 for _, subscriptionDetail := range p.SubscriptionDetails {
1369 if p.RANFunctionID != nil {
1370 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1372 fmt.Println(" RANFunctionID = nil")
1374 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1375 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1377 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1378 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1379 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1380 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1382 if actionToBeSetup.SubsequentAction != nil {
1383 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1384 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1386 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")