2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds the identification part of a log line.
//
// The String() form of every entry is joined with a single space, in argument
// order. When err is non-nil, "err(<message>)" is appended as the last element.
// With no entries and a nil err the result is the empty string.
//
// Entries that are a nil interface value are skipped instead of being
// dereferenced. A typed nil pointer stored in the interface is still passed to
// its String method, so such types must tolerate a nil receiver.
func idstring(err error, entries ...fmt.Stringer) string {
	var b strings.Builder
	sep := ""
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		b.WriteString(sep)
		b.WriteString(entry.String())
		sep = " "
	}
	if err != nil {
		b.WriteString(sep)
		b.WriteString("err(")
		b.WriteString(err.Error())
		b.WriteString(")")
	}
	return b.String()
}
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level settings. Control.ReadConfigParameters assigns all of them from
// the "controls.*" configuration keys.
//
// e2tSubReqTimeout is the wait given to trans.WaitEvent after a Subscription
// Request has been sent to E2T.
63 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is the wait given to trans.WaitEvent after a Subscription
// Delete Request has been sent to E2T.
64 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is the timeout given to trans.SendEvent when a message
// received from E2T is handed over to the ongoing transaction.
65 var e2tRecvMsgTimeout time.Duration
// waitRouteCleanup_ms is forwarded to registry.RemoveFromSubscription. Despite
// the suffix it holds a time.Duration (the configured value already scaled).
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// readSubsFromDb is compared against the string "false" in NewControl; an
// empty configured value is replaced by "true".
69 var readSubsFromDb string
79 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event produced by sendE2TSubscriptionRequest
// when it simulates a request whose response was not received before a
// restart; handleSubscriptionCreate has a dedicated case for it.
89 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is not referenced in the visible part of this file.
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
// NewControl creates the submgr Control instance.
//
// It builds the routing manager REST client from the rtmgr.HostAddr, rtmgr.port
// and rtmgr.baseUrl settings, creates the Registry and the Tracker, loads the
// configuration, registers the REST routes, starts the REST subscription
// listener in its own goroutine and reads stored subscriptions from the db.
//
// NOTE(review): parts of this function (remaining fields of the Control
// literal, the branch bodies and the return statements) are not visible in
// this view, so the exact conditions guarding the steps below are not
// documented here.
99 func NewControl() *Control {
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
// The registry keeps a pointer to the routing manager client created above.
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Initial configuration load; ReadConfigParameters does not use its argument
// in the visible code.
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// The REST subscription interface is served from a separate goroutine.
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
// Presumably restoring from db is skipped when readSubsFromDb is "false"
// (branch body not visible) - TODO confirm.
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
// The data read from db becomes the registry content, then subscriptions
// without a received response are handled.
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
// ReadConfigParameters loads the submgr settings from viper into the
// package-level variables and into c.LoggerLevel. A zero or empty value is
// replaced by a built-in default and every resulting value is logged.
//
// It is called from NewControl and registered in Run as the configuration
// change listener. The parameter f is not used by the visible code.
151 func (c *Control) ReadConfigParameters(f string) {
// The "*_ms" keys are expected to hold plain millisecond counts:
// viper.GetDuration yields such a number as nanoseconds, hence the
// multiplication by 1000000 below.
153 // viper.GetDuration returns nanoseconds
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
// Default 2000 ms.
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
// Default 2000 ms.
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
// Default 2000 ms.
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. Non-default
171 // value 100ms is currently used only in unit tests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
// Default 5000 ms.
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
// Default: a single try, no retry.
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
// Default: a single try, no retry.
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 c.LoggerLevel = viper.GetUint32("logger.level")
// NOTE(review): the value assigned when logger.level is 0 is not visible in
// this view.
196 if c.LoggerLevel == 0 {
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
204 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
206 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
207 for subId, subs := range register {
208 if subs.SubRespRcvd == false {
209 subs.NoRespToXapp = true
210 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
211 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It stores
// xapp.Rmr as the RMR client of the Control when none has been set yet. The
// data argument is not used.
216 func (c *Control) ReadyCB(data interface{}) {
217 if c.RMRClient == nil {
218 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters as
// the configuration change listener.
//
// NOTE(review): the rest of the function is not visible in this view.
222 func (c *Control) Run() {
223 xapp.SetReadyCB(c.ReadyCB, nil)
224 xapp.AddConfigChangeListener(c.ReadConfigParameters)
228 //-------------------------------------------------------------------
230 //-------------------------------------------------------------------
// SubscriptionHandler handles a REST subscription request from an xApp. It is
// passed to xapp.Subscription.Listen in NewControl.
//
// It generates a new REST subscription id (ksuid), checks the client endpoint,
// creates the REST subscription in the registry, converts the request into
// E2AP subscription requests and starts processSubscriptionRequests in a
// goroutine. Failures increment cRestSubFailToXapp.
//
// NOTE(review): the error checks and return statements are not visible in
// this view; the comments below describe the visible statements only.
231 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
234 c.UpdateCounter(cRestSubReqFromXapp)
236 restSubId := ksuid.New().String()
237 subResp := models.SubscriptionResponse{}
238 subResp.SubscriptionID = &restSubId
// Unchecked type assertion: panics if params is not *models.SubscriptionParams.
239 p := params.(*models.SubscriptionParams)
241 if c.LoggerLevel > 2 {
242 c.PrintRESTSubscriptionRequest(p)
245 if p.ClientEndpoint == nil {
246 xapp.Logger.Error("ClientEndpoint == nil")
247 c.UpdateCounter(cRestSubFailToXapp)
// NOTE(review): the returned error carries an empty message.
248 return nil, fmt.Errorf("")
251 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
253 xapp.Logger.Error("%s", err.Error())
254 c.UpdateCounter(cRestSubFailToXapp)
258 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
260 xapp.Logger.Error("%s", err.Error())
261 c.UpdateCounter(cRestSubFailToXapp)
265 subReqList := e2ap.SubscriptionRequestList{}
266 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
268 xapp.Logger.Error("%s", err.Error())
// The REST subscription created above is removed again on this path.
269 c.registry.DeleteRESTSubscription(&restSubId)
270 c.UpdateCounter(cRestSubFailToXapp)
// The E2AP requests are processed asynchronously; results are reported to the
// xApp by processSubscriptionRequests through REST notifications.
274 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
276 c.UpdateCounter(cRestSubRespToXapp)
280 //-------------------------------------------------------------------
282 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2AP subscription request of a
// REST subscription. SubscriptionHandler starts it in a goroutine.
//
// For each request it creates an xApp transaction, runs
// handleSubscriptionRequest and notifies the xApp client endpoint through
// xapp.Subscription.Notify. A successful request reports the received
// RequestId.Id and InstanceId; a failed one reports both as zero.
//
// NOTE(review): the conditions and closing parts of the branches are not
// visible in this view.
284 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
285 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
287 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
289 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
291 xapp.Logger.Error("%s", err.Error())
295 var requestorID int64
297 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
298 subReqMsg := subReqList.E2APSubscriptionRequests[index]
// The transaction id is "<restSubId>_<InstanceId>".
300 xid := *restSubId + "_" + strconv.FormatUint(uint64(subReqMsg.RequestId.InstanceId), 10)
301 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), xid, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
302 //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
304 c.registry.DeleteRESTSubscription(restSubId)
305 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer is inside the for loop. Deferred calls run when the
// function returns, not at the end of an iteration, so every transaction
// stays unreleased until all requests have been processed.
309 defer trans.Release()
310 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
311 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
313 // Send notification to xApp that processing of a Subscription Request has failed. Currently it is not possible
314 // to indicate an error. Such a possibility should be added. As a workaround requestorID and instanceId are set to zero value
315 requestorID = (int64)(0)
316 instanceId = (int64)(0)
317 resp := &models.SubscriptionResponse{
318 SubscriptionID: restSubId,
319 SubscriptionInstances: []*models.SubscriptionInstance{
320 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
323 // Mark REST subscription request processed.
324 restSubscription.SetProcessed()
325 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
326 xapp.Subscription.Notify(resp, *clientEndpoint)
327 c.UpdateCounter(cRestSubFailNotifToXapp)
329 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
331 // Store successfully processed InstanceId for deletion
332 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
334 // Send notification to xApp that a Subscription Request has been processed.
335 requestorID = (int64)(subRespMsg.RequestId.Id)
336 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
337 resp := &models.SubscriptionResponse{
338 SubscriptionID: restSubId,
339 SubscriptionInstances: []*models.SubscriptionInstance{
340 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
343 // Mark REST subscription request processed.
344 restSubscription.SetProcessed()
345 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
346 xapp.Subscription.Notify(resp, *clientEndpoint)
347 c.UpdateCounter(cRestSubNotifToXapp)
353 //-------------------------------------------------------------------
355 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2AP subscription request of a REST
// subscription to completion.
//
// It tracks the transaction, assigns it to a subscription in the registry,
// starts handleSubscriptionCreate in a goroutine and blocks until that
// goroutine delivers an event to the transaction. On the error path the
// transaction is removed from the subscription again.
//
// NOTE(review): the error checks and return statements are not visible in
// this view. meid and restSubId are not used by the visible statements.
356 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
357 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
359 err := c.tracker.Track(trans)
361 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
362 xapp.Logger.Error("%s", err.Error())
366 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
368 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
369 xapp.Logger.Error("%s", err.Error())
376 go c.handleSubscriptionCreate(subs, trans)
// The timed-out result of WaitEvent is ignored; the wait is unbounded.
377 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
381 switch themsg := event.(type) {
382 case *e2ap.E2APSubscriptionResponse:
385 case *e2ap.E2APSubscriptionFailure:
386 err = fmt.Errorf("SubscriptionFailure received")
392 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
393 xapp.Logger.Error("%s", err.Error())
394 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
398 //-------------------------------------------------------------------
400 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB handles a REST subscription delete request for
// restSubId. It is passed to xapp.Subscription.Listen in NewControl.
//
// It looks up the REST subscription, refuses the request while the
// subscription request is still being handled, and otherwise runs
// SubscriptionDeleteHandler for every stored instance id before deleting the
// REST subscription from the registry.
//
// NOTE(review): the error checks, return statements and the body of the
// SubDelReqOngoing branch are not visible in this view.
401 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
404 c.UpdateCounter(cRestSubDelReqFromXapp)
406 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
408 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
410 xapp.Logger.Error("%s", err.Error())
411 if restSubscription == nil {
412 // Subscription was not found
415 if restSubscription.SubReqOngoing == true {
416 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
417 xapp.Logger.Error("%s", err.Error())
419 } else if restSubscription.SubDelReqOngoing == true {
420 // Previous request for same restSubId still ongoing
426 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// NOTE(review): DeleteInstanceId is called on restSubscription while
// restSubscription.InstanceIds is being ranged over - confirm that
// DeleteInstanceId is safe to use this way.
428 for _, instanceId := range restSubscription.InstanceIds {
429 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
431 xapp.Logger.Error("%s", err.Error())
434 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
435 restSubscription.DeleteInstanceId(instanceId)
437 c.registry.DeleteRESTSubscription(&restSubId)
440 c.UpdateCounter(cRestSubDelRespToXapp)
445 //-------------------------------------------------------------------
447 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance that belongs
// to a REST subscription.
//
// It creates and tracks an xApp transaction with id "<restSubId>_<instanceId>",
// finds the subscription by instanceId, starts handleSubscriptionDelete in a
// goroutine, waits for its completion event and finally removes the
// transaction from the subscription.
//
// NOTE(review): the error checks and most return statements are not visible
// in this view.
448 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
450 xid := *restSubId + "_" + strconv.FormatUint(uint64(instanceId), 10)
451 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), xid, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
452 //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
454 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
455 xapp.Logger.Error("%s", err.Error())
457 defer trans.Release()
459 err := c.tracker.Track(trans)
461 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
462 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned here instead of the
// error built and logged just above - looks unintended, confirm.
463 return &time.ParseError{}
466 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
468 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
469 xapp.Logger.Error("%s", err.Error())
475 go c.handleSubscriptionDelete(subs, trans)
// Both results of WaitEvent are ignored; only completion matters here.
476 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
478 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
480 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
485 //-------------------------------------------------------------------
487 //-------------------------------------------------------------------
// QueryHandler handles a REST subscription query and returns the result of
// the registry's QueryHandler. It is passed to xapp.Subscription.Listen in
// NewControl.
//
// NOTE(review): a statement between the log call and the return is not
// visible in this view.
488 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
489 xapp.Logger.Info("QueryHandler() called")
493 return c.registry.QueryHandler()
// TestRestHandler serves the /ric/v1/test/{testId} REST route registered in
// NewControl for testing support. The {testId} path value selects the action:
// deleting one subscription from the db ("deletesubid=<id>"), deleting all
// subscriptions from the db, or terminating the process. Anything else is
// logged as unsupported.
//
// NOTE(review): the conditions selecting the last two actions and the exit
// call itself are not visible in this view.
496 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
497 xapp.Logger.Info("TestRestHandler() called")
499 pathParams := mux.Vars(r)
500 s := pathParams["testId"]
502 // This can be used to delete single subscription from db
503 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=" at this point, so splits has at least two elements.
504 var splits = strings.Split(s, "=")
// The id is parsed as a 64-bit integer and then converted to uint32; values
// outside the uint32 range are truncated by the conversion.
505 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
506 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
507 c.RemoveSubscriptionFromSdl(uint32(subId))
512 // This can be used to remove all subscriptions from db
514 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
515 c.RemoveAllSubscriptionsFromSdl()
519 // This is meant to cause submgr's restart in testing
521 xapp.Logger.Info("os.Exit(1) called")
525 xapp.Logger.Info("Unsupported rest command received %s", s)
528 //-------------------------------------------------------------------
530 //-------------------------------------------------------------------
// rmrSendToE2T sends the message packed into trans to E2T over RMR.
//
// The RMR parameters are built from the transaction (message type, payload)
// and the subscription (InstanceId as SubId, Meid). The message is sent with
// c.SendWithRetry(params, false, 5) and the send result is returned in err;
// a failure is also logged. desc is only used in the log line.
//
// NOTE(review): one assignment to params and the error check are not visible
// in this view.
532 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
533 params := &xapp.RMRParams{}
534 params.Mtype = trans.GetMtype()
535 params.SubId = int(subs.GetReqId().InstanceId)
537 params.Meid = subs.GetMeid()
539 params.PayloadLen = len(trans.Payload.Buf)
540 params.Payload = trans.Payload.Buf
542 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
543 err = c.SendWithRetry(params, false, 5)
545 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the message packed into trans to the xApp over RMR.
//
// The RMR parameters are built from the transaction (message type, Xid, Meid,
// payload) and the subscription (InstanceId as SubId). The message is sent
// with c.SendWithRetry(params, false, 5) and the send result is returned in
// err; a failure is also logged. desc is only used in the log line.
//
// NOTE(review): one assignment to params and the error check are not visible
// in this view.
550 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
552 params := &xapp.RMRParams{}
553 params.Mtype = trans.GetMtype()
554 params.SubId = int(subs.GetReqId().InstanceId)
555 params.Xid = trans.GetXid()
556 params.Meid = trans.GetMeid()
558 params.PayloadLen = len(trans.Payload.Buf)
559 params.Payload = trans.Payload.Buf
561 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
562 err = c.SendWithRetry(params, false, 5)
564 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives an RMR message and dispatches it by message type. Every
// known subscription message type is handled in its own goroutine; unknown
// types are logged and discarded.
//
// The RMR buffer (msg.Mbuf) is freed when Consume returns, which can happen
// before the handler goroutines run; therefore the payload is first copied
// into a new Go byte slice.
//
// NOTE(review): the return statements, the switch header and the statement
// storing the copied payload back into msg are not visible in this view.
569 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
570 if c.RMRClient == nil {
571 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
572 xapp.Logger.Error("%s", err.Error())
577 defer c.RMRClient.Free(msg.Mbuf)
579 // xapp-frame might use direct access to c buffer and
580 // when msg.Mbuf is freed, someone might take it into use
581 // and payload data might be invalid inside message handle function
583 // subscriptions won't load system a lot so there is no
584 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a fresh allocation,
// so cPay does not share memory with msg.Payload.
585 cPay := append(msg.Payload[:0:0], msg.Payload...)
587 msg.PayloadLen = len(cPay)
590 case xapp.RIC_SUB_REQ:
591 go c.handleXAPPSubscriptionRequest(msg)
592 case xapp.RIC_SUB_RESP:
593 go c.handleE2TSubscriptionResponse(msg)
594 case xapp.RIC_SUB_FAILURE:
595 go c.handleE2TSubscriptionFailure(msg)
596 case xapp.RIC_SUB_DEL_REQ:
597 go c.handleXAPPSubscriptionDeleteRequest(msg)
598 case xapp.RIC_SUB_DEL_RESP:
599 go c.handleE2TSubscriptionDeleteResponse(msg)
600 case xapp.RIC_SUB_DEL_FAILURE:
601 go c.handleE2TSubscriptionDeleteFailure(msg)
603 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
608 //-------------------------------------------------------------------
609 // handle from XAPP Subscription Request
610 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a Subscription Request received from
// an xApp over RMR (started by Consume for xapp.RIC_SUB_REQ).
//
// It unpacks the request, creates and tracks an xApp transaction, assigns the
// transaction to a subscription in the registry and hands over to
// wakeSubscriptionRequest. Each failing step is logged.
//
// NOTE(review): the error checks and return statements are not visible in
// this view.
611 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
612 xapp.Logger.Info("MSG from XAPP: %s", params.String())
613 c.UpdateCounter(cSubReqFromXapp)
615 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
617 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
621 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
623 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
626 defer trans.Release()
628 if err = c.tracker.Track(trans); err != nil {
629 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
633 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
634 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
636 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
640 c.wakeSubscriptionRequest(subs, trans)
643 //-------------------------------------------------------------------
644 // Wake Subscription Request to E2node
645 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for subs in a
// goroutine, waits for the resulting event and forwards it to the xApp.
//
// A Subscription Response or Subscription Failure event gets the RequestId.Id
// of the xApp transaction, is packed and sent with rmrSendToXapp. Other
// outcomes are logged as failed.
//
// NOTE(review): the checks on the pack errors and the return statements are
// not visible in this view.
646 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
648 go c.handleSubscriptionCreate(subs, trans)
// The timed-out result of WaitEvent is ignored; the wait is unbounded.
649 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
652 switch themsg := event.(type) {
653 case *e2ap.E2APSubscriptionResponse:
654 themsg.RequestId.Id = trans.RequestId.Id
655 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
658 c.UpdateCounter(cSubRespToXapp)
// The send result of rmrSendToXapp is not checked here.
659 c.rmrSendToXapp("", subs, trans)
662 case *e2ap.E2APSubscriptionFailure:
663 themsg.RequestId.Id = trans.RequestId.Id
664 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
666 c.UpdateCounter(cSubFailToXapp)
667 c.rmrSendToXapp("", subs, trans)
673 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
674 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
677 //-------------------------------------------------------------------
678 // handle from XAPP Subscription Delete Request
679 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a Subscription Delete Request
// received from an xApp over RMR (started by Consume for
// xapp.RIC_SUB_DEL_REQ).
//
// It unpacks the request, creates and tracks an xApp transaction, finds the
// subscription by the transaction's subscription id, runs
// handleSubscriptionDelete in a goroutine and waits for it. A Subscription
// Delete Response is then packed and sent to the xApp, whatever the outcome
// towards E2T was.
//
// NOTE(review): the error checks, return statements and the body of the
// NoRespToXapp branch are not visible in this view.
680 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
681 xapp.Logger.Info("MSG from XAPP: %s", params.String())
682 c.UpdateCounter(cSubDelReqFromXapp)
684 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
686 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
690 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
692 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
695 defer trans.Release()
697 err = c.tracker.Track(trans)
// NOTE(review): this log line uses the "XAPP-SubReq" prefix although this is
// the delete request handler.
699 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
703 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
705 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
712 go c.handleSubscriptionDelete(subs, trans)
// Both results of WaitEvent are ignored; only completion matters here.
713 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
715 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
717 if subs.NoRespToXapp == true {
718 // Do not send delete responses to xApps: submgr restart is deleting uncompleted subscriptions
722 // Whatever is received success, fail or timeout, send successful delete response
723 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
724 subDelRespMsg.RequestId.Id = trans.RequestId.Id
725 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
726 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
727 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
729 c.UpdateCounter(cSubDelRespToXapp)
730 c.rmrSendToXapp("", subs, trans)
733 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
734 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
737 //-------------------------------------------------------------------
738 // SUBS CREATE Handling
739 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2T side of a subscription create for
// subs on behalf of the xApp transaction parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription. When no response is cached yet (and the cached state is
// valid) a Subscription Request is sent to E2T and the outcome is cached:
//   - response: cached as valid and SubRespRcvd is set;
//   - failure or any other event: cached as not valid, the subscription is
//     flagged for removal from the db and a Subscription Delete Request is
//     sent to E2T;
//   - SubmgrRestartTestEvent: test-only case, logged.
// Finally the db is updated and the resulting message (possibly nil) is sent
// to parentTrans as its completion event.
//
// NOTE(review): the default/else headers, the condition guarding
// RemoveFromSubscription and any statements of the restart test case beyond
// the log line are not visible in this view.
740 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
742 var removeSubscriptionFromDb bool = false
743 trans := c.tracker.NewSubsTransaction(subs)
744 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
745 defer subs.ReleaseTransactionTurn(trans)
746 defer trans.Release()
748 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
750 subRfMsg, valid := subs.GetCachedResponse()
751 if subRfMsg == nil && valid == true {
752 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
753 switch event.(type) {
754 case *e2ap.E2APSubscriptionResponse:
755 subRfMsg, valid = subs.SetCachedResponse(event, true)
756 subs.SubRespRcvd = true
757 case *e2ap.E2APSubscriptionFailure:
758 removeSubscriptionFromDb = true
759 subRfMsg, valid = subs.SetCachedResponse(event, false)
760 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
761 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
762 case *SubmgrRestartTestEvent:
763 // This simulates that no response has been received and after restart subscriptions are restored from db
764 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
767 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
768 removeSubscriptionFromDb = true
769 subRfMsg, valid = subs.SetCachedResponse(nil, false)
770 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
772 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
774 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
777 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
779 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
782 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
783 parentTrans.SendEvent(subRfMsg, 0)
786 //-------------------------------------------------------------------
787 // SUBS DELETE Handling
788 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the E2T side of a subscription delete for
// subs on behalf of the xApp transaction parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription. A Subscription Delete Request is sent to E2T in the branch
// taken when the subscription is valid and parentTrans' endpoint is its only
// endpoint. Afterwards parentTrans is removed from the subscription, the
// subscription is updated to the db and a nil event is sent to parentTrans to
// signal completion.
//
// NOTE(review): statements inside the branch other than the delete request
// are not visible in this view.
790 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
792 trans := c.tracker.NewSubsTransaction(subs)
793 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the turn on the subscription is given up.
794 defer subs.ReleaseTransactionTurn(trans)
795 defer trans.Release()
797 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
801 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
804 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
808 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
809 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
810 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
811 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
812 c.registry.UpdateSubscriptionToDb(subs, c)
813 parentTrans.SendEvent(nil, 0)
816 //-------------------------------------------------------------------
817 // send to E2T Subscription Request
818 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, stores
// the subscription in the db and sends the request to E2T, trying at most
// e2tMaxSubReqTryCount times.
//
// After each send it waits up to e2tSubReqTimeout for an event on trans,
// unless subs.DoNotWaitSubResp is set; in that test case a
// SubmgrRestartTestEvent is produced instead. The event finally obtained is
// logged.
//
// NOTE(review): the error check after packing, the retry/break conditions,
// the conditions choosing between the two request counters and the return
// statements are not visible in this view.
819 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
821 var event interface{} = nil
822 var timedOut bool = false
824 subReqMsg := subs.SubReqMsg
825 subReqMsg.RequestId = subs.GetReqId().RequestId
826 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
828 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
832 // Write the uncompleted subscription to db. If no response is received for the subscription, it needs to be re-processed (deleted) after restart
833 c.WriteSubscriptionToDb(subs)
835 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
836 desc := fmt.Sprintf("(retry %d)", retries)
838 c.UpdateCounter(cSubReqToE2)
840 c.UpdateCounter(cSubReReqToE2)
// The send result of rmrSendToE2T is not checked here.
842 c.rmrSendToE2T(desc, subs, trans)
843 if subs.DoNotWaitSubResp == false {
844 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
846 c.UpdateCounter(cSubReqTimerExpiry)
850 // Simulating case where subscription request has been sent but response has not been received before restart
851 event = &SubmgrRestartTestEvent{}
855 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
859 //-------------------------------------------------------------------
860 // send to E2T Subscription Delete Request
861 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a Subscription Delete Request for
// subs (request id and function id taken from the subscription) and sends it
// to E2T, trying at most e2tMaxSubDelReqTryCount times. After each send it
// waits up to e2tSubDelReqTime for an event on trans. The event finally
// obtained is logged.
//
// NOTE(review): the error check after packing, the retry/break conditions,
// the conditions choosing between the two request counters and the return
// statements are not visible in this view.
863 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
865 var event interface{}
868 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
869 subDelReqMsg.RequestId = subs.GetReqId().RequestId
870 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
871 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
873 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
877 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
878 desc := fmt.Sprintf("(retry %d)", retries)
880 c.UpdateCounter(cSubDelReqToE2)
882 c.UpdateCounter(cSubDelReReqToE2)
// The send result of rmrSendToE2T is not checked here.
884 c.rmrSendToE2T(desc, subs, trans)
885 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
887 c.UpdateCounter(cSubDelReqTimerExpiry)
892 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
896 //-------------------------------------------------------------------
897 // handle from E2T Subscription Response
898 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a Subscription Response received from
// E2T (started by Consume for xapp.RIC_SUB_RESP).
//
// It unpacks the message, finds the subscription by the InstanceId of the
// response, takes the subscription's ongoing transaction and passes the
// response to it as an event, waiting at most e2tRecvMsgTimeout. Each failing
// step is logged.
//
// NOTE(review): the error checks and return statements are not visible in
// this view.
899 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
900 xapp.Logger.Info("MSG from E2T: %s", params.String())
901 c.UpdateCounter(cSubRespFromE2)
903 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
905 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
908 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
910 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
913 trans := subs.GetTransaction()
915 err = fmt.Errorf("Ongoing transaction not found")
916 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
919 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
921 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
922 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
927 //-------------------------------------------------------------------
928 // handle from E2T Subscription Failure
929 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a Subscription Failure received from
// E2T (started by Consume for xapp.RIC_SUB_FAILURE).
//
// It unpacks the message, finds the subscription by the InstanceId of the
// failure, takes the subscription's ongoing transaction and passes the
// failure to it as an event, waiting at most e2tRecvMsgTimeout. Each failing
// step is logged.
//
// NOTE(review): the error checks and return statements are not visible in
// this view.
930 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
931 xapp.Logger.Info("MSG from E2T: %s", params.String())
932 c.UpdateCounter(cSubFailFromE2)
933 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
935 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
938 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
940 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
943 trans := subs.GetTransaction()
945 err = fmt.Errorf("Ongoing transaction not found")
946 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
949 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
951 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
952 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
957 //-------------------------------------------------------------------
958 // handle from E2T Subscription Delete Response
959 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse handles a Subscription Delete Response
// received from E2T (started by Consume for xapp.RIC_SUB_DEL_RESP).
//
// It unpacks the message, finds the subscription by the InstanceId of the
// response, takes the subscription's ongoing transaction and passes the
// response to it as an event, waiting at most e2tRecvMsgTimeout. Each failing
// step is logged.
//
// Unlike its sibling handlers it declares an error result. Consume starts it
// with a go statement, so that result is discarded there.
//
// NOTE(review): the error checks and return statements are not visible in
// this view.
960 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
961 xapp.Logger.Info("MSG from E2T: %s", params.String())
962 c.UpdateCounter(cSubDelRespFromE2)
963 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
965 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
968 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
970 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
973 trans := subs.GetTransaction()
975 err = fmt.Errorf("Ongoing transaction not found")
976 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
979 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
981 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
982 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
987 //-------------------------------------------------------------------
988 // handle from E2T Subscription Delete Failure
989 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a Subscription Delete Failure
// message that arrived from E2T in params. It logs the message, updates the
// cSubDelFailFromE2 counter, unpacks params.Payload, asks the registry for the
// first subscription matching the InstanceId of the unpacked request id, and
// passes the unpacked message to that subscription's transaction with
// SendEvent. Failures are reported with xapp.Logger.Error; the function has no
// result.
// NOTE(review): apart from the sendOk check, the conditions guarding the error
// logs (and any early returns) are not visible in this view — confirm against
// the full file.
990 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
991 xapp.Logger.Info("MSG from E2T: %s", params.String())
992 c.UpdateCounter(cSubDelFailFromE2)
// Decode the payload carried by the received message.
993 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
995 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// Look the subscription up by the instance id taken from the decoded message.
998 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1000 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// The decoded message is delivered to the transaction attached to the subscription.
1003 trans := subs.GetTransaction()
1005 err = fmt.Errorf("Ongoing transaction not found")
1006 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// e2tRecvMsgTimeout is passed as the second argument — presumably the wait limit; verify in SendEvent.
1009 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
// A false sendOk is turned into an error that records both flags and is logged.
1010 if sendOk == false {
1011 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1012 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1017 //-------------------------------------------------------------------
1019 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the kind of subscription
// message held in v. The visible cases match on the pointer types
// *e2ap.E2APSubscriptionResponse, *e2ap.E2APSubscriptionFailure,
// *e2ap.E2APSubscriptionDeleteResponse and *e2ap.E2APSubscriptionDeleteFailure.
// The cases for the two request types are commented out, so those types are
// not matched by a case of their own.
// NOTE(review): the switch header and the strings returned by the active cases
// are not visible in this view — confirm the exact names (the commented-out
// delete-request case suggests short forms such as "SubDelReq").
1020 func typeofSubsMessage(v interface{}) string {
// Disabled: subscription requests have no case of their own.
1025 //case *e2ap.E2APSubscriptionRequest:
1027 case *e2ap.E2APSubscriptionResponse:
1029 case *e2ap.E2APSubscriptionFailure:
// Disabled: subscription delete requests have no case of their own.
1031 //case *e2ap.E2APSubscriptionDeleteRequest:
1032 // return "SubDelReq"
1033 case *e2ap.E2APSubscriptionDeleteResponse:
1035 case *e2ap.E2APSubscriptionDeleteFailure:
1042 //-------------------------------------------------------------------
1044 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs by calling WriteSubscriptionToSdl with
// subs.ReqId.InstanceId as the id. The id is written to the debug log first.
// An error from the write is only logged; the function has no result, so the
// caller is not told about a failed write.
// NOTE(review): the condition guarding the error log is not visible in this
// view — presumably a non-nil err check; confirm.
1045 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1046 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1047 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1049 xapp.Logger.Error("%v", err)
1053 //-------------------------------------------------------------------
1055 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with its
// current state. When removeSubscriptionFromDb is true the subscription is
// removed through RemoveSubscriptionFromDb. The write path stores it through
// WriteSubscriptionToDb, but only when subs.RetryFromXapp is false.
// subs.RetryFromXapp is reset to false afterwards.
// NOTE(review): how the remove and write branches are joined is not visible in
// this view — presumably the write is the else-branch of the remove check, and
// the final reset runs on every path; confirm against the full file.
1056 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1058 if removeSubscriptionFromDb == true {
1059 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1060 c.RemoveSubscriptionFromDb(subs)
1062 // Update is needed for successful response and merge case here
// The write is skipped while RetryFromXapp is set.
1063 if subs.RetryFromXapp == false {
1064 c.WriteSubscriptionToDb(subs)
// Clear the retry marker so it does not affect a later call.
1067 subs.RetryFromXapp = false
1070 //-------------------------------------------------------------------
1072 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the stored entry for subs by calling
// RemoveSubscriptionFromSdl with subs.ReqId.InstanceId. The id is written to
// the debug log first. An error from the removal is only logged; the function
// has no result, so the caller is not told about a failed removal.
// NOTE(review): the condition guarding the error log is not visible in this
// view — presumably a non-nil err check; confirm.
1073 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1074 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1075 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1077 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq builds a subscription delete request for subs and
// feeds it into handleXAPPSubscriptionDeleteRequest once for every endpoint in
// subs.EpList.Endpoints. The debug log describes this as a delete "due to
// restart". The request is packed a single time before the loop; each
// iteration wraps the same packed payload in a fresh xapp.RMRParams whose Src
// is the endpoint's string form. A packing error is logged.
// NOTE(review): some statements of this function are not visible in this view
// (the guard around the packing error log, and part of the params setup inside
// the loop) — confirm against the full file before relying on this summary.
1081 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1083 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1085 // Send delete for every endpoint in the subscription
// Request id and function id are copied from the stored subscription.
1086 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1087 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1088 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1089 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1091 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1094 for _, endPoint := range subs.EpList.Endpoints {
// Assemble the message parameters locally, with the endpoint as the source.
1095 params := &xapp.RMRParams{}
1096 params.Mtype = mType
1097 params.SubId = int(subs.GetReqId().InstanceId)
1099 params.Meid = subs.Meid
1100 params.Src = endPoint.String()
1101 params.PayloadLen = len(payload.Buf)
1102 params.Payload = payload.Buf
// DeleteFromDb is set before the delete handler runs — presumably so the handler also drops the stored copy; verify.
1104 subs.DeleteFromDb = true
1105 c.handleXAPPSubscriptionDeleteRequest(params)
1109 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1111 fmt.Println("CRESTSubscriptionRequest")
1112 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1114 if p.ClientEndpoint.HTTPPort != nil {
1115 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1117 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1120 if p.ClientEndpoint.RMRPort != nil {
1121 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1123 fmt.Println(" ClientEndpoint.RMRPort = nil")
1127 fmt.Printf(" Meid = %s\n", *p.Meid)
1129 fmt.Println(" Meid = nil")
1132 for _, subscriptionDetail := range p.SubscriptionDetails {
1133 if p.RANFunctionID != nil {
1134 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1136 fmt.Println(" RANFunctionID = nil")
1138 fmt.Printf(" SubscriptionDetail.RequestorID = %v\n", *subscriptionDetail.RequestorID)
1139 fmt.Printf(" SubscriptionDetail.InstanceID = %v\n", *subscriptionDetail.InstanceID)
1140 fmt.Printf(" SubscriptionDetail.EventTriggers.OctetString = %X\n", subscriptionDetail.EventTriggers.OctetString)
1142 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1143 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1144 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1145 if actionToBeSetup.ActionDefinition != nil {
1146 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition.OctetString = %X\n", actionToBeSetup.ActionDefinition.OctetString)
1148 fmt.Println(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = nil")
1151 if actionToBeSetup.SubsequentAction != nil {
1152 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1153 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1155 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")