2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
50 retval += filler + entry.String()
53 retval += filler + "(NIL)"
57 retval += filler + "err(" + err.Error() + ")"
63 //-----------------------------------------------------------------------------
65 //-----------------------------------------------------------------------------
67 var e2tSubReqTimeout time.Duration
68 var e2tSubDelReqTime time.Duration
69 var e2tRecvMsgTimeout time.Duration
70 var waitRouteCleanup_ms time.Duration
71 var e2tMaxSubReqTryCount uint64 // Initial try + retry
72 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
73 var readSubsFromDb string
74 var dbRetryForever string
82 restDuplicateCtrl *DuplicateCtrl
84 restSubsDb Sdlnterface
87 Counters map[string]xapp.Counter
98 type SubmgrRestartTestEvent struct{}
99 type SubmgrRestartUpEvent struct{}
102 xapp.Logger.Info("SUBMGR")
104 viper.SetEnvPrefix("submgr")
105 viper.AllowEmptyEnv(true)
108 func NewControl() *Control {
110 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
111 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
113 registry := new(Registry)
114 registry.Initialize()
115 registry.rtmgrClient = &rtmgrClient
117 tracker := new(Tracker)
120 restDuplicateCtrl := new(DuplicateCtrl)
121 restDuplicateCtrl.Init()
123 c := &Control{e2ap: new(E2ap),
126 restDuplicateCtrl: restDuplicateCtrl,
127 e2SubsDb: CreateSdl(),
128 restSubsDb: CreateRESTSdl(),
129 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
132 c.ReadConfigParameters("")
134 // Register REST handler for testing support
135 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
136 xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
137 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
139 if readSubsFromDb == "false" {
143 // Read subscriptions from db
144 c.ReadE2Subscriptions()
145 c.ReadRESTSubscriptions()
147 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
152 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
153 subscriptions, _ := c.registry.QueryHandler()
154 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
157 //-------------------------------------------------------------------
159 //-------------------------------------------------------------------
160 func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
161 xapp.Logger.Info("GetAllRestSubscriptions() called")
162 response := c.registry.GetAllRestSubscriptions()
166 //-------------------------------------------------------------------
168 //-------------------------------------------------------------------
169 func (c *Control) ReadE2Subscriptions() error {
172 var register map[uint32]*Subscription
173 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
174 xapp.Logger.Info("Reading E2 subscriptions from db")
175 subIds, register, err = c.ReadAllSubscriptionsFromSdl()
177 xapp.Logger.Error("%v", err)
178 <-time.After(1 * time.Second)
180 c.registry.subIds = subIds
181 c.registry.register = register
182 c.HandleUncompletedSubscriptions(register)
186 xapp.Logger.Info("Continuing without retring")
190 //-------------------------------------------------------------------
192 //-------------------------------------------------------------------
193 func (c *Control) ReadRESTSubscriptions() error {
195 var restSubscriptions map[string]*RESTSubscription
196 for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
197 xapp.Logger.Info("Reading REST subscriptions from db")
198 restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
200 xapp.Logger.Error("%v", err)
201 <-time.After(1 * time.Second)
203 c.registry.restSubscriptions = restSubscriptions
207 xapp.Logger.Info("Continuing without retring")
211 //-------------------------------------------------------------------
213 //-------------------------------------------------------------------
214 func (c *Control) ReadConfigParameters(f string) {
216 // viper.GetDuration returns nanoseconds
217 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
218 if e2tSubReqTimeout == 0 {
219 e2tSubReqTimeout = 2000 * 1000000
221 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
222 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
223 if e2tSubDelReqTime == 0 {
224 e2tSubDelReqTime = 2000 * 1000000
226 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
227 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
228 if e2tRecvMsgTimeout == 0 {
229 e2tRecvMsgTimeout = 2000 * 1000000
231 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
233 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
234 if e2tMaxSubReqTryCount == 0 {
235 e2tMaxSubReqTryCount = 1
237 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
239 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
240 if e2tMaxSubDelReqTryCount == 0 {
241 e2tMaxSubDelReqTryCount = 1
243 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
245 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
246 if readSubsFromDb == "" {
247 readSubsFromDb = "true"
249 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
251 dbTryCount = viper.GetInt("controls.dbTryCount")
255 xapp.Logger.Info("dbTryCount %v", dbTryCount)
257 dbRetryForever = viper.GetString("controls.dbRetryForever")
258 if dbRetryForever == "" {
259 dbRetryForever = "true"
261 xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
263 c.LoggerLevel = viper.GetUint32("logger.level")
264 if c.LoggerLevel == 0 {
267 xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
269 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
270 // value 100ms used currently only in unittests.
271 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
272 if waitRouteCleanup_ms == 0 {
273 waitRouteCleanup_ms = 5000 * 1000000
275 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
278 //-------------------------------------------------------------------
280 //-------------------------------------------------------------------
281 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
283 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
284 for subId, subs := range register {
285 if subs.SubRespRcvd == false {
286 subs.NoRespToXapp = true
287 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
288 c.SendSubscriptionDeleteReq(subs)
293 func (c *Control) ReadyCB(data interface{}) {
294 if c.RMRClient == nil {
295 c.RMRClient = xapp.Rmr
299 func (c *Control) Run() {
300 xapp.SetReadyCB(c.ReadyCB, nil)
301 xapp.AddConfigChangeListener(c.ReadConfigParameters)
305 //-------------------------------------------------------------------
307 //-------------------------------------------------------------------
308 func (c *Control) GetOrCreateRestSubscription(p *models.SubscriptionParams, md5sum string, xAppRmrEndpoint string) (*RESTSubscription, string, error) {
311 var restSubscription *RESTSubscription
314 prevRestSubsId, exists := c.restDuplicateCtrl.GetLastKnownRestSubsIdBasedOnMd5sum(md5sum)
315 if p.SubscriptionID == "" {
317 restSubscription, err = c.registry.GetRESTSubscription(prevRestSubsId, false)
318 if restSubscription != nil {
319 restSubId = prevRestSubsId
321 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - using previous subscription", prevRestSubsId, md5sum)
323 xapp.Logger.Info("Existing restSubId %s found by MD5sum %s for a request without subscription ID - Note: %s", prevRestSubsId, md5sum, err.Error())
326 xapp.Logger.Info("None existing restSubId %s referred by MD5sum %s for a request without subscription ID - deleting cached entry", prevRestSubsId, md5sum)
327 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
331 if restSubscription == nil {
332 restSubId = ksuid.New().String()
333 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
335 xapp.Logger.Error("%s", err.Error())
336 c.UpdateCounter(cRestSubFailToXapp)
341 restSubId = p.SubscriptionID
343 xapp.Logger.Info("RestSubscription ID %s provided via REST request", restSubId)
345 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
347 xapp.Logger.Error("%s", err.Error())
348 c.UpdateCounter(cRestSubFailToXapp)
353 xapp.Logger.Info("Existing restSubscription found for ID %s, new request based on md5sum", restSubId)
355 xapp.Logger.Info("Existing restSubscription found for ID %s(%s), re-transmission based on md5sum match with previous request", prevRestSubsId, restSubId)
359 return restSubscription, restSubId, nil
362 //-------------------------------------------------------------------
364 //-------------------------------------------------------------------
365 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
368 c.UpdateCounter(cRestSubReqFromXapp)
370 subResp := models.SubscriptionResponse{}
371 p := params.(*models.SubscriptionParams)
373 if c.LoggerLevel > 2 {
374 c.PrintRESTSubscriptionRequest(p)
377 if p.ClientEndpoint == nil {
378 xapp.Logger.Error("ClientEndpoint == nil")
379 c.UpdateCounter(cRestSubFailToXapp)
380 return nil, fmt.Errorf("")
383 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
385 xapp.Logger.Error("%s", err.Error())
386 c.UpdateCounter(cRestSubFailToXapp)
390 md5sum, err := CalculateRequestMd5sum(params)
392 xapp.Logger.Error("Failed to generate md5sum from incoming request - %s", err.Error())
395 restSubscription, restSubId, err := c.GetOrCreateRestSubscription(p, md5sum, xAppRmrEndpoint)
397 xapp.Logger.Error("Failed to get/allocate REST subscription")
401 subResp.SubscriptionID = &restSubId
402 subReqList := e2ap.SubscriptionRequestList{}
403 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
405 xapp.Logger.Error("%s", err.Error())
406 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(md5sum)
407 c.registry.DeleteRESTSubscription(&restSubId)
408 c.UpdateCounter(cRestSubFailToXapp)
412 duplicate := c.restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, md5sum)
414 xapp.Logger.Info("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
415 c.UpdateCounter(cRestSubRespToXapp)
419 c.WriteRESTSubscriptionToDb(restSubId, restSubscription)
421 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum)
423 c.UpdateCounter(cRestSubRespToXapp)
427 //-------------------------------------------------------------------
429 //-------------------------------------------------------------------
431 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
432 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string) {
434 c.SubscriptionProcessingStartDelay()
435 xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
437 var xAppEventInstanceID int64
438 var e2EventInstanceID int64
440 defer c.restDuplicateCtrl.SetMd5sumFromLastOkRequest(*restSubId, md5sum)
442 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
443 subReqMsg := subReqList.E2APSubscriptionRequests[index]
444 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
446 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
448 // Send notification to xApp that prosessing of a Subscription Request has failed.
449 err := fmt.Errorf("Tracking failure")
450 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
454 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
456 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, *restSubId)
458 xapp.Logger.Debug("Handled SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
462 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
464 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
465 restSubscription.AddMd5Sum(md5sum)
466 xapp.Logger.Info("SubscriptionRequest index=%v processed successfullyfor %s. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
467 index, *restSubId, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
468 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
473 //-------------------------------------------------------------------
475 //------------------------------------------------------------------
476 func (c *Control) SubscriptionProcessingStartDelay() {
477 if c.UTTesting == true {
478 // This is temporary fix for the UT problem that notification arrives before subscription response
479 // Correct fix would be to allow notification come before response and process it correctly
480 xapp.Logger.Debug("Setting 50 ms delay before starting processing Subscriptions")
481 <-time.After(time.Millisecond * 50)
482 xapp.Logger.Debug("Continuing after delay")
486 //-------------------------------------------------------------------
488 //------------------------------------------------------------------
489 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
490 restSubId string) (*e2ap.E2APSubscriptionResponse, error) {
492 err := c.tracker.Track(trans)
494 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
495 err = fmt.Errorf("Tracking failure")
499 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
501 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
508 go c.handleSubscriptionCreate(subs, trans)
509 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
513 switch themsg := event.(type) {
514 case *e2ap.E2APSubscriptionResponse:
517 case *e2ap.E2APSubscriptionFailure:
518 err = fmt.Errorf("E2 SubscriptionFailure received")
521 err = fmt.Errorf("unexpected E2 subscription response received")
525 err = fmt.Errorf("E2 subscription response timeout")
528 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
529 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
533 //-------------------------------------------------------------------
535 //-------------------------------------------------------------------
536 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
537 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
539 // Send notification to xApp that prosessing of a Subscription Request has failed.
540 e2EventInstanceID := (int64)(0)
541 errorCause := err.Error()
542 resp := &models.SubscriptionResponse{
543 SubscriptionID: restSubId,
544 SubscriptionInstances: []*models.SubscriptionInstance{
545 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
546 ErrorCause: &errorCause,
547 XappEventInstanceID: &xAppEventInstanceID},
550 // Mark REST subscription request processed.
551 restSubscription.SetProcessed(err)
552 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
554 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
555 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
557 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
558 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
561 c.UpdateCounter(cRestSubFailNotifToXapp)
562 xapp.Subscription.Notify(resp, *clientEndpoint)
565 //-------------------------------------------------------------------
567 //-------------------------------------------------------------------
568 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
569 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
571 // Store successfully processed InstanceId for deletion
572 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
573 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
575 // Send notification to xApp that a Subscription Request has been processed.
576 resp := &models.SubscriptionResponse{
577 SubscriptionID: restSubId,
578 SubscriptionInstances: []*models.SubscriptionInstance{
579 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
581 XappEventInstanceID: &xAppEventInstanceID},
584 // Mark REST subscription request processesd.
585 restSubscription.SetProcessed(nil)
586 c.UpdateRESTSubscriptionInDB(*restSubId, restSubscription, false)
587 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
588 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
590 c.UpdateCounter(cRestSubNotifToXapp)
591 xapp.Subscription.Notify(resp, *clientEndpoint)
594 //-------------------------------------------------------------------
596 //-------------------------------------------------------------------
597 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
600 c.UpdateCounter(cRestSubDelReqFromXapp)
602 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
604 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
606 xapp.Logger.Error("%s", err.Error())
607 if restSubscription == nil {
608 // Subscription was not found
611 if restSubscription.SubReqOngoing == true {
612 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
613 xapp.Logger.Error("%s", err.Error())
615 } else if restSubscription.SubDelReqOngoing == true {
616 // Previous request for same restSubId still ongoing
622 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
624 xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
625 for _, instanceId := range restSubscription.InstanceIds {
626 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
629 xapp.Logger.Error("%s", err.Error())
632 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
633 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
634 restSubscription.DeleteE2InstanceId(instanceId)
636 c.restDuplicateCtrl.DeleteLastKnownRestSubsIdBasedOnMd5sum(restSubscription.lastReqMd5sum)
637 c.registry.DeleteRESTSubscription(&restSubId)
638 c.RemoveRESTSubscriptionFromDb(restSubId)
641 c.UpdateCounter(cRestSubDelRespToXapp)
646 //-------------------------------------------------------------------
648 //-------------------------------------------------------------------
649 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
651 var xAppEventInstanceID int64
652 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
654 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
655 restSubId, instanceId, idstring(err, nil))
656 return xAppEventInstanceID, nil
659 xAppEventInstanceID = int64(subs.ReqId.Id)
660 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
662 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
663 xapp.Logger.Error("%s", err.Error())
665 defer trans.Release()
667 err = c.tracker.Track(trans)
669 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
670 xapp.Logger.Error("%s", err.Error())
671 return xAppEventInstanceID, &time.ParseError{}
676 go c.handleSubscriptionDelete(subs, trans)
677 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
679 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
681 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
683 return xAppEventInstanceID, nil
686 //-------------------------------------------------------------------
688 //-------------------------------------------------------------------
689 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
690 xapp.Logger.Info("QueryHandler() called")
694 return c.registry.QueryHandler()
697 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
698 xapp.Logger.Info("TestRestHandler() called")
700 pathParams := mux.Vars(r)
701 s := pathParams["testId"]
703 // This can be used to delete single subscription from db
704 if contains := strings.Contains(s, "deletesubid="); contains == true {
705 var splits = strings.Split(s, "=")
706 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
707 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
708 c.RemoveSubscriptionFromSdl(uint32(subId))
713 // This can be used to remove all subscriptions db from
715 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
716 c.RemoveAllSubscriptionsFromSdl()
717 c.RemoveAllRESTSubscriptionsFromSdl()
721 // This is meant to cause submgr's restart in testing
723 xapp.Logger.Info("os.Exit(1) called")
727 xapp.Logger.Info("Unsupported rest command received %s", s)
730 //-------------------------------------------------------------------
732 //-------------------------------------------------------------------
734 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
735 params := &xapp.RMRParams{}
736 params.Mtype = trans.GetMtype()
737 params.SubId = int(subs.GetReqId().InstanceId)
739 params.Meid = subs.GetMeid()
741 params.PayloadLen = len(trans.Payload.Buf)
742 params.Payload = trans.Payload.Buf
744 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
745 err = c.SendWithRetry(params, false, 5)
747 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
752 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
754 params := &xapp.RMRParams{}
755 params.Mtype = trans.GetMtype()
756 params.SubId = int(subs.GetReqId().InstanceId)
757 params.Xid = trans.GetXid()
758 params.Meid = trans.GetMeid()
760 params.PayloadLen = len(trans.Payload.Buf)
761 params.Payload = trans.Payload.Buf
763 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
764 err = c.SendWithRetry(params, false, 5)
766 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
771 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
772 if c.RMRClient == nil {
773 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
774 xapp.Logger.Error("%s", err.Error())
779 defer c.RMRClient.Free(msg.Mbuf)
781 // xapp-frame might use direct access to c buffer and
782 // when msg.Mbuf is freed, someone might take it into use
783 // and payload data might be invalid inside message handle function
785 // subscriptions won't load system a lot so there is no
786 // real performance hit by cloning buffer into new go byte slice
787 cPay := append(msg.Payload[:0:0], msg.Payload...)
789 msg.PayloadLen = len(cPay)
792 case xapp.RIC_SUB_REQ:
793 go c.handleXAPPSubscriptionRequest(msg)
794 case xapp.RIC_SUB_RESP:
795 go c.handleE2TSubscriptionResponse(msg)
796 case xapp.RIC_SUB_FAILURE:
797 go c.handleE2TSubscriptionFailure(msg)
798 case xapp.RIC_SUB_DEL_REQ:
799 go c.handleXAPPSubscriptionDeleteRequest(msg)
800 case xapp.RIC_SUB_DEL_RESP:
801 go c.handleE2TSubscriptionDeleteResponse(msg)
802 case xapp.RIC_SUB_DEL_FAILURE:
803 go c.handleE2TSubscriptionDeleteFailure(msg)
805 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
810 //-------------------------------------------------------------------
811 // handle from XAPP Subscription Request
812 //------------------------------------------------------------------
813 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
814 xapp.Logger.Info("MSG from XAPP: %s", params.String())
815 c.UpdateCounter(cSubReqFromXapp)
817 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
819 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
823 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
825 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
828 defer trans.Release()
830 if err = c.tracker.Track(trans); err != nil {
831 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
835 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
836 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
838 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
842 c.wakeSubscriptionRequest(subs, trans)
845 //-------------------------------------------------------------------
846 // Wake Subscription Request to E2node
847 //------------------------------------------------------------------
848 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
850 go c.handleSubscriptionCreate(subs, trans)
851 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
854 switch themsg := event.(type) {
855 case *e2ap.E2APSubscriptionResponse:
856 themsg.RequestId.Id = trans.RequestId.Id
857 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
860 c.UpdateCounter(cSubRespToXapp)
861 c.rmrSendToXapp("", subs, trans)
864 case *e2ap.E2APSubscriptionFailure:
865 themsg.RequestId.Id = trans.RequestId.Id
866 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
868 c.UpdateCounter(cSubFailToXapp)
869 c.rmrSendToXapp("", subs, trans)
875 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
876 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
879 //-------------------------------------------------------------------
880 // handle from XAPP Subscription Delete Request
881 //------------------------------------------------------------------
882 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
883 xapp.Logger.Info("MSG from XAPP: %s", params.String())
884 c.UpdateCounter(cSubDelReqFromXapp)
886 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
888 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
892 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
894 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
897 defer trans.Release()
899 err = c.tracker.Track(trans)
901 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
905 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
907 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
914 go c.handleSubscriptionDelete(subs, trans)
915 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
917 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
919 if subs.NoRespToXapp == true {
920 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
924 // Whatever is received success, fail or timeout, send successful delete response
925 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
926 subDelRespMsg.RequestId.Id = trans.RequestId.Id
927 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
928 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
929 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
931 c.UpdateCounter(cSubDelRespToXapp)
932 c.rmrSendToXapp("", subs, trans)
935 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
936 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
939 //-------------------------------------------------------------------
940 // SUBS CREATE Handling
941 //-------------------------------------------------------------------
942 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
944 var removeSubscriptionFromDb bool = false
945 trans := c.tracker.NewSubsTransaction(subs)
946 subs.WaitTransactionTurn(trans)
947 defer subs.ReleaseTransactionTurn(trans)
948 defer trans.Release()
950 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
952 subRfMsg, valid := subs.GetCachedResponse()
953 if subRfMsg == nil && valid == true {
954 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
955 switch event.(type) {
956 case *e2ap.E2APSubscriptionResponse:
957 subRfMsg, valid = subs.SetCachedResponse(event, true)
958 subs.SubRespRcvd = true
959 case *e2ap.E2APSubscriptionFailure:
960 removeSubscriptionFromDb = true
961 subRfMsg, valid = subs.SetCachedResponse(event, false)
962 xapp.Logger.Info("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
963 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
964 case *SubmgrRestartTestEvent:
965 // This simulates that no response has been received and after restart subscriptions are restored from db
966 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
969 xapp.Logger.Info("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
970 removeSubscriptionFromDb = true
971 subRfMsg, valid = subs.SetCachedResponse(nil, false)
972 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
974 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
976 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
979 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
981 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
984 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
985 parentTrans.SendEvent(subRfMsg, 0)
988 //-------------------------------------------------------------------
989 // SUBS DELETE Handling
990 //-------------------------------------------------------------------
992 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
994 trans := c.tracker.NewSubsTransaction(subs)
995 subs.WaitTransactionTurn(trans)
996 defer subs.ReleaseTransactionTurn(trans)
997 defer trans.Release()
999 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
1003 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
1006 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
1010 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
1011 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
1012 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
1013 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
1014 c.registry.UpdateSubscriptionToDb(subs, c)
1015 parentTrans.SendEvent(nil, 0)
1018 //-------------------------------------------------------------------
1019 // send to E2T Subscription Request
1020 //-------------------------------------------------------------------
1021 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1023 var event interface{} = nil
1024 var timedOut bool = false
1025 const ricRequestorId = 123
1027 subReqMsg := subs.SubReqMsg
1028 subReqMsg.RequestId = subs.GetReqId().RequestId
1029 subReqMsg.RequestId.Id = ricRequestorId
1030 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
1032 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
1036 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
1037 c.WriteSubscriptionToDb(subs)
1039 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
1040 desc := fmt.Sprintf("(retry %d)", retries)
1042 c.UpdateCounter(cSubReqToE2)
1044 c.UpdateCounter(cSubReReqToE2)
1046 c.rmrSendToE2T(desc, subs, trans)
1047 if subs.DoNotWaitSubResp == false {
1048 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
1050 c.UpdateCounter(cSubReqTimerExpiry)
1054 // Simulating case where subscrition request has been sent but response has not been received before restart
1055 event = &SubmgrRestartTestEvent{}
1059 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1063 //-------------------------------------------------------------------
1064 // send to E2T Subscription Delete Request
1065 //-------------------------------------------------------------------
1067 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
1069 var event interface{}
1071 const ricRequestorId = 123
1073 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1074 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1075 subDelReqMsg.RequestId.Id = ricRequestorId
1076 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1077 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1079 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
1083 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
1084 desc := fmt.Sprintf("(retry %d)", retries)
1086 c.UpdateCounter(cSubDelReqToE2)
1088 c.UpdateCounter(cSubDelReReqToE2)
1090 c.rmrSendToE2T(desc, subs, trans)
1091 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
1093 c.UpdateCounter(cSubDelReqTimerExpiry)
1098 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
1102 //-------------------------------------------------------------------
1103 // handle from E2T Subscription Response
1104 //-------------------------------------------------------------------
1105 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
1106 xapp.Logger.Info("MSG from E2T: %s", params.String())
1107 c.UpdateCounter(cSubRespFromE2)
1109 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
1111 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
1114 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
1116 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
1119 trans := subs.GetTransaction()
1121 err = fmt.Errorf("Ongoing transaction not found")
1122 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
1125 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
1126 if sendOk == false {
1127 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1128 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
1133 //-------------------------------------------------------------------
1134 // handle from E2T Subscription Failure
1135 //-------------------------------------------------------------------
1136 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
1137 xapp.Logger.Info("MSG from E2T: %s", params.String())
1138 c.UpdateCounter(cSubFailFromE2)
1139 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
1141 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
1144 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1146 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1149 trans := subs.GetTransaction()
1151 err = fmt.Errorf("Ongoing transaction not found")
1152 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1155 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1156 if sendOk == false {
1157 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1158 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1163 //-------------------------------------------------------------------
1164 // handle from E2T Subscription Delete Response
1165 //-------------------------------------------------------------------
1166 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1167 xapp.Logger.Info("MSG from E2T: %s", params.String())
1168 c.UpdateCounter(cSubDelRespFromE2)
1169 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1171 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1174 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1176 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1179 trans := subs.GetTransaction()
1181 err = fmt.Errorf("Ongoing transaction not found")
1182 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1185 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1186 if sendOk == false {
1187 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1188 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1193 //-------------------------------------------------------------------
1194 // handle from E2T Subscription Delete Failure
1195 //-------------------------------------------------------------------
1196 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1197 xapp.Logger.Info("MSG from E2T: %s", params.String())
1198 c.UpdateCounter(cSubDelFailFromE2)
1199 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1201 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1204 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1206 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1209 trans := subs.GetTransaction()
1211 err = fmt.Errorf("Ongoing transaction not found")
1212 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1215 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1216 if sendOk == false {
1217 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1218 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1223 //-------------------------------------------------------------------
1225 //-------------------------------------------------------------------
1226 func typeofSubsMessage(v interface{}) string {
1231 //case *e2ap.E2APSubscriptionRequest:
1233 case *e2ap.E2APSubscriptionResponse:
1235 case *e2ap.E2APSubscriptionFailure:
1237 //case *e2ap.E2APSubscriptionDeleteRequest:
1238 // return "SubDelReq"
1239 case *e2ap.E2APSubscriptionDeleteResponse:
1241 case *e2ap.E2APSubscriptionDeleteFailure:
1248 //-------------------------------------------------------------------
1250 //-------------------------------------------------------------------
1251 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1252 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1253 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1255 xapp.Logger.Error("%v", err)
1259 //-------------------------------------------------------------------
1261 //-------------------------------------------------------------------
1262 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1264 if removeSubscriptionFromDb == true {
1265 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1266 c.RemoveSubscriptionFromDb(subs)
1268 // Update is needed for successful response and merge case here
1269 if subs.RetryFromXapp == false {
1270 c.WriteSubscriptionToDb(subs)
1273 subs.RetryFromXapp = false
1276 //-------------------------------------------------------------------
1278 //-------------------------------------------------------------------
1279 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1280 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1281 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1283 xapp.Logger.Error("%v", err)
1287 //-------------------------------------------------------------------
1289 //-------------------------------------------------------------------
1290 func (c *Control) WriteRESTSubscriptionToDb(restSubId string, restSubs *RESTSubscription) {
1291 xapp.Logger.Debug("WriteRESTSubscriptionToDb() restSubId = %s", restSubId)
1292 err := c.WriteRESTSubscriptionToSdl(restSubId, restSubs)
1294 xapp.Logger.Error("%v", err)
1298 //-------------------------------------------------------------------
1300 //-------------------------------------------------------------------
1301 func (c *Control) UpdateRESTSubscriptionInDB(restSubId string, restSubs *RESTSubscription, removeRestSubscriptionFromDb bool) {
1303 if removeRestSubscriptionFromDb == true {
1304 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1305 c.RemoveRESTSubscriptionFromDb(restSubId)
1307 c.WriteRESTSubscriptionToDb(restSubId, restSubs)
1311 //-------------------------------------------------------------------
1313 //-------------------------------------------------------------------
1314 func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
1315 xapp.Logger.Debug("RemoveRESTSubscriptionFromDb() restSubId = %s", restSubId)
1316 err := c.RemoveRESTSubscriptionFromSdl(restSubId)
1318 xapp.Logger.Error("%v", err)
1322 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1324 const ricRequestorId = 123
1325 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1327 // Send delete for every endpoint in the subscription
1328 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1329 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1330 subDelReqMsg.RequestId.Id = ricRequestorId
1331 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1332 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1334 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1337 for _, endPoint := range subs.EpList.Endpoints {
1338 params := &xapp.RMRParams{}
1339 params.Mtype = mType
1340 params.SubId = int(subs.GetReqId().InstanceId)
1342 params.Meid = subs.Meid
1343 params.Src = endPoint.String()
1344 params.PayloadLen = len(payload.Buf)
1345 params.Payload = payload.Buf
1347 subs.DeleteFromDb = true
1348 c.handleXAPPSubscriptionDeleteRequest(params)
1352 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1354 fmt.Println("CRESTSubscriptionRequest")
1360 if p.SubscriptionID != "" {
1361 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1363 fmt.Println(" SubscriptionID = ''")
1366 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1368 if p.ClientEndpoint.HTTPPort != nil {
1369 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1371 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1374 if p.ClientEndpoint.RMRPort != nil {
1375 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1377 fmt.Println(" ClientEndpoint.RMRPort = nil")
1381 fmt.Printf(" Meid = %s\n", *p.Meid)
1383 fmt.Println(" Meid = nil")
1386 for _, subscriptionDetail := range p.SubscriptionDetails {
1387 if p.RANFunctionID != nil {
1388 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1390 fmt.Println(" RANFunctionID = nil")
1392 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1393 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1395 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1396 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1397 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1398 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1400 if actionToBeSetup.SubsequentAction != nil {
1401 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1402 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1404 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")