2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
77 //subscriber *xapp.Subscriber
80 Counters map[string]xapp.Counter
89 type SubmgrRestartTestEvent struct{}
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
99 func NewControl() *Control {
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 //subscriber: subscriber,
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171 // value 100ms used currently only in unittests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
202 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203 for subId, subs := range register {
204 if subs.SubRespRcvd == false {
205 subs.NoRespToXapp = true
206 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207 c.SendSubscriptionDeleteReq(subs)
212 func (c *Control) ReadyCB(data interface{}) {
213 if c.RMRClient == nil {
214 c.RMRClient = xapp.Rmr
218 func (c *Control) Run() {
219 xapp.SetReadyCB(c.ReadyCB, nil)
220 xapp.AddConfigChangeListener(c.ReadConfigParameters)
224 //-------------------------------------------------------------------
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
229 restSubId := ksuid.New().String()
230 subResp := models.SubscriptionResponse{}
231 subResp.SubscriptionID = &restSubId
232 p := params.(*models.SubscriptionParams)
236 c.UpdateCounter(cSubReqFromXapp)
238 if p.ClientEndpoint == nil {
239 xapp.Logger.Error("ClientEndpoint == nil")
240 return nil, fmt.Errorf("")
243 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
245 xapp.Logger.Error("%s", err.Error())
249 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
251 xapp.Logger.Error("%s", err.Error())
255 subReqList := e2ap.SubscriptionRequestList{}
256 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
258 xapp.Logger.Error("%s", err.Error())
259 c.registry.DeleteRESTSubscription(&restSubId)
263 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
269 //-------------------------------------------------------------------
271 //-------------------------------------------------------------------
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
276 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
278 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
280 xapp.Logger.Error("%s", err.Error())
284 var requestorID int64
286 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287 subReqMsg := subReqList.E2APSubscriptionRequests[index]
289 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
291 c.registry.DeleteRESTSubscription(restSubId)
292 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
296 defer trans.Release()
297 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
300 // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
301 // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
302 requestorID = (int64)(0)
303 instanceId = (int64)(0)
304 resp := &models.SubscriptionResponse{
305 SubscriptionID: restSubId,
306 SubscriptionInstances: []*models.SubscriptionInstance{
307 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
310 // Mark REST subscription request processed.
311 restSubscription.SetProcessed()
312 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313 xapp.Subscription.Notify(resp, *clientEndpoint)
315 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
317 // Store successfully processed InstanceId for deletion
318 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
320 // Send notification to xApp that a Subscription Request has been processed.
321 requestorID = (int64)(subRespMsg.RequestId.Id)
322 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
323 resp := &models.SubscriptionResponse{
324 SubscriptionID: restSubId,
325 SubscriptionInstances: []*models.SubscriptionInstance{
326 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
329 // Mark REST subscription request processesd.
330 restSubscription.SetProcessed()
331 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
332 xapp.Subscription.Notify(resp, *clientEndpoint)
334 c.UpdateCounter(cSubRespToXapp)
338 //-------------------------------------------------------------------
340 //------------------------------------------------------------------
341 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
342 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
344 err := c.tracker.Track(trans)
346 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
347 xapp.Logger.Error("%s", err.Error())
351 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
353 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
354 xapp.Logger.Error("%s", err.Error())
361 go c.handleSubscriptionCreate(subs, trans)
362 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
366 switch themsg := event.(type) {
367 case *e2ap.E2APSubscriptionResponse:
370 case *e2ap.E2APSubscriptionFailure:
371 err = fmt.Errorf("SubscriptionFailure received")
377 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
378 xapp.Logger.Error("%s", err.Error())
379 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
383 //-------------------------------------------------------------------
385 //-------------------------------------------------------------------
386 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
389 c.UpdateCounter(cSubDelReqFromXapp)
391 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
393 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
395 xapp.Logger.Error("%s", err.Error())
396 if restSubscription == nil {
397 // Subscription was not found
400 if restSubscription.SubReqOngoing == true {
401 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
402 xapp.Logger.Error("%s", err.Error())
404 } else if restSubscription.SubDelReqOngoing == true {
405 // Previous request for same restSubId still ongoing
411 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
413 for _, instanceId := range restSubscription.InstanceIds {
414 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
416 xapp.Logger.Error("%s", err.Error())
419 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
420 restSubscription.DeleteInstanceId(instanceId)
422 c.registry.DeleteRESTSubscription(&restSubId)
425 c.UpdateCounter(cSubDelRespToXapp)
430 //-------------------------------------------------------------------
432 //-------------------------------------------------------------------
433 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
435 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
437 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
438 xapp.Logger.Error("%s", err.Error())
440 defer trans.Release()
442 err := c.tracker.Track(trans)
444 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
445 xapp.Logger.Error("%s", err.Error())
446 return &time.ParseError{}
449 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
451 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
452 xapp.Logger.Error("%s", err.Error())
458 go c.handleSubscriptionDelete(subs, trans)
459 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
461 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
463 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
468 //-------------------------------------------------------------------
470 //-------------------------------------------------------------------
471 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
472 xapp.Logger.Info("QueryHandler() called")
476 return c.registry.QueryHandler()
479 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
480 xapp.Logger.Info("TestRestHandler() called")
482 pathParams := mux.Vars(r)
483 s := pathParams["testId"]
485 // This can be used to delete single subscription from db
486 if contains := strings.Contains(s, "deletesubid="); contains == true {
487 var splits = strings.Split(s, "=")
488 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
489 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
490 c.RemoveSubscriptionFromSdl(uint32(subId))
495 // This can be used to remove all subscriptions db from
497 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
498 c.RemoveAllSubscriptionsFromSdl()
502 // This is meant to cause submgr's restart in testing
504 xapp.Logger.Info("os.Exit(1) called")
508 xapp.Logger.Info("Unsupported rest command received %s", s)
511 //-------------------------------------------------------------------
513 //-------------------------------------------------------------------
515 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
516 params := &xapp.RMRParams{}
517 params.Mtype = trans.GetMtype()
518 params.SubId = int(subs.GetReqId().InstanceId)
520 params.Meid = subs.GetMeid()
522 params.PayloadLen = len(trans.Payload.Buf)
523 params.Payload = trans.Payload.Buf
525 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
526 err = c.SendWithRetry(params, false, 5)
528 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
533 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
535 params := &xapp.RMRParams{}
536 params.Mtype = trans.GetMtype()
537 params.SubId = int(subs.GetReqId().InstanceId)
538 params.Xid = trans.GetXid()
539 params.Meid = trans.GetMeid()
541 params.PayloadLen = len(trans.Payload.Buf)
542 params.Payload = trans.Payload.Buf
544 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
545 err = c.SendWithRetry(params, false, 5)
547 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
552 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
553 if c.RMRClient == nil {
554 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
555 xapp.Logger.Error("%s", err.Error())
560 defer c.RMRClient.Free(msg.Mbuf)
562 // xapp-frame might use direct access to c buffer and
563 // when msg.Mbuf is freed, someone might take it into use
564 // and payload data might be invalid inside message handle function
566 // subscriptions won't load system a lot so there is no
567 // real performance hit by cloning buffer into new go byte slice
568 cPay := append(msg.Payload[:0:0], msg.Payload...)
570 msg.PayloadLen = len(cPay)
573 case xapp.RIC_SUB_REQ:
574 go c.handleXAPPSubscriptionRequest(msg)
575 case xapp.RIC_SUB_RESP:
576 go c.handleE2TSubscriptionResponse(msg)
577 case xapp.RIC_SUB_FAILURE:
578 go c.handleE2TSubscriptionFailure(msg)
579 case xapp.RIC_SUB_DEL_REQ:
580 go c.handleXAPPSubscriptionDeleteRequest(msg)
581 case xapp.RIC_SUB_DEL_RESP:
582 go c.handleE2TSubscriptionDeleteResponse(msg)
583 case xapp.RIC_SUB_DEL_FAILURE:
584 go c.handleE2TSubscriptionDeleteFailure(msg)
586 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
591 //-------------------------------------------------------------------
592 // handle from XAPP Subscription Request
593 //------------------------------------------------------------------
594 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
595 xapp.Logger.Info("MSG from XAPP: %s", params.String())
596 c.UpdateCounter(cSubReqFromXapp)
598 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
600 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
604 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
606 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
609 defer trans.Release()
611 if err = c.tracker.Track(trans); err != nil {
612 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
616 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
617 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
619 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
623 c.wakeSubscriptionRequest(subs, trans)
626 //-------------------------------------------------------------------
627 // Wake Subscription Request to E2node
628 //------------------------------------------------------------------
629 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
631 go c.handleSubscriptionCreate(subs, trans)
632 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
635 switch themsg := event.(type) {
636 case *e2ap.E2APSubscriptionResponse:
637 themsg.RequestId.Id = trans.RequestId.Id
638 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
641 c.UpdateCounter(cSubRespToXapp)
642 c.rmrSendToXapp("", subs, trans)
645 case *e2ap.E2APSubscriptionFailure:
646 themsg.RequestId.Id = trans.RequestId.Id
647 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
649 c.UpdateCounter(cSubFailToXapp)
650 c.rmrSendToXapp("", subs, trans)
656 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
657 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
660 //-------------------------------------------------------------------
661 // handle from XAPP Subscription Delete Request
662 //------------------------------------------------------------------
663 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
664 xapp.Logger.Info("MSG from XAPP: %s", params.String())
665 c.UpdateCounter(cSubDelReqFromXapp)
667 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
669 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
673 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
675 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
678 defer trans.Release()
680 err = c.tracker.Track(trans)
682 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
686 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
688 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
695 go c.handleSubscriptionDelete(subs, trans)
696 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
698 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
700 if subs.NoRespToXapp == true {
701 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
705 // Whatever is received success, fail or timeout, send successful delete response
706 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
707 subDelRespMsg.RequestId.Id = trans.RequestId.Id
708 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
709 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
710 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
712 c.UpdateCounter(cSubDelRespToXapp)
713 c.rmrSendToXapp("", subs, trans)
716 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
717 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
720 //-------------------------------------------------------------------
721 // SUBS CREATE Handling
722 //-------------------------------------------------------------------
723 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
725 var removeSubscriptionFromDb bool = false
726 trans := c.tracker.NewSubsTransaction(subs)
727 subs.WaitTransactionTurn(trans)
728 defer subs.ReleaseTransactionTurn(trans)
729 defer trans.Release()
731 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
733 subRfMsg, valid := subs.GetCachedResponse()
734 if subRfMsg == nil && valid == true {
735 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
736 switch event.(type) {
737 case *e2ap.E2APSubscriptionResponse:
738 subRfMsg, valid = subs.SetCachedResponse(event, true)
739 subs.SubRespRcvd = true
740 case *e2ap.E2APSubscriptionFailure:
741 removeSubscriptionFromDb = true
742 subRfMsg, valid = subs.SetCachedResponse(event, false)
743 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
744 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
745 case *SubmgrRestartTestEvent:
746 // This simulates that no response has been received and after restart subscriptions are restored from db
747 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
750 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
751 removeSubscriptionFromDb = true
752 subRfMsg, valid = subs.SetCachedResponse(nil, false)
753 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
755 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
757 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
760 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
762 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
765 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
766 parentTrans.SendEvent(subRfMsg, 0)
769 //-------------------------------------------------------------------
770 // SUBS DELETE Handling
771 //-------------------------------------------------------------------
773 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
775 trans := c.tracker.NewSubsTransaction(subs)
776 subs.WaitTransactionTurn(trans)
777 defer subs.ReleaseTransactionTurn(trans)
778 defer trans.Release()
780 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
784 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
787 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
791 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
792 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
793 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
794 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
795 c.registry.UpdateSubscriptionToDb(subs, c)
796 parentTrans.SendEvent(nil, 0)
799 //-------------------------------------------------------------------
800 // send to E2T Subscription Request
801 //-------------------------------------------------------------------
802 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
804 var event interface{} = nil
805 var timedOut bool = false
807 subReqMsg := subs.SubReqMsg
808 subReqMsg.RequestId = subs.GetReqId().RequestId
809 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
811 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
815 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
816 c.WriteSubscriptionToDb(subs)
818 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
819 desc := fmt.Sprintf("(retry %d)", retries)
821 c.UpdateCounter(cSubReqToE2)
823 c.UpdateCounter(cSubReReqToE2)
825 c.rmrSendToE2T(desc, subs, trans)
826 if subs.DoNotWaitSubResp == false {
827 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
829 c.UpdateCounter(cSubReqTimerExpiry)
833 // Simulating case where subscrition request has been sent but response has not been received before restart
834 event = &SubmgrRestartTestEvent{}
838 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
842 //-------------------------------------------------------------------
843 // send to E2T Subscription Delete Request
844 //-------------------------------------------------------------------
846 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
848 var event interface{}
851 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
852 subDelReqMsg.RequestId = subs.GetReqId().RequestId
853 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
854 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
856 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
860 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
861 desc := fmt.Sprintf("(retry %d)", retries)
863 c.UpdateCounter(cSubDelReqToE2)
865 c.UpdateCounter(cSubDelReReqToE2)
867 c.rmrSendToE2T(desc, subs, trans)
868 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
870 c.UpdateCounter(cSubDelReqTimerExpiry)
875 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
879 //-------------------------------------------------------------------
880 // handle from E2T Subscription Response
881 //-------------------------------------------------------------------
882 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
883 xapp.Logger.Info("MSG from E2T: %s", params.String())
884 c.UpdateCounter(cSubRespFromE2)
885 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
887 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
890 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
892 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
895 trans := subs.GetTransaction()
897 err = fmt.Errorf("Ongoing transaction not found")
898 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
901 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
903 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
904 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
909 //-------------------------------------------------------------------
910 // handle from E2T Subscription Failure
911 //-------------------------------------------------------------------
912 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
913 xapp.Logger.Info("MSG from E2T: %s", params.String())
914 c.UpdateCounter(cSubFailFromE2)
915 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
917 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
920 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
922 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
925 trans := subs.GetTransaction()
927 err = fmt.Errorf("Ongoing transaction not found")
928 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
931 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
933 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
934 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
939 //-------------------------------------------------------------------
940 // handle from E2T Subscription Delete Response
941 //-------------------------------------------------------------------
942 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
943 xapp.Logger.Info("MSG from E2T: %s", params.String())
944 c.UpdateCounter(cSubDelRespFromE2)
945 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
947 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
950 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
952 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
955 trans := subs.GetTransaction()
957 err = fmt.Errorf("Ongoing transaction not found")
958 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
961 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
963 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
964 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
969 //-------------------------------------------------------------------
970 // handle from E2T Subscription Delete Failure
971 //-------------------------------------------------------------------
972 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
973 xapp.Logger.Info("MSG from E2T: %s", params.String())
974 c.UpdateCounter(cSubDelFailFromE2)
975 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
977 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
980 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
982 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
985 trans := subs.GetTransaction()
987 err = fmt.Errorf("Ongoing transaction not found")
988 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
991 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
993 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
994 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
999 //-------------------------------------------------------------------
1001 //-------------------------------------------------------------------
1002 func typeofSubsMessage(v interface{}) string {
1007 case *e2ap.E2APSubscriptionRequest:
1009 case *e2ap.E2APSubscriptionResponse:
1011 case *e2ap.E2APSubscriptionFailure:
1013 case *e2ap.E2APSubscriptionDeleteRequest:
1015 case *e2ap.E2APSubscriptionDeleteResponse:
1017 case *e2ap.E2APSubscriptionDeleteFailure:
1024 //-------------------------------------------------------------------
1026 //-------------------------------------------------------------------
1027 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1028 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1029 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1031 xapp.Logger.Error("%v", err)
1035 //-------------------------------------------------------------------
1037 //-------------------------------------------------------------------
1038 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1040 if removeSubscriptionFromDb == true {
1041 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1042 c.RemoveSubscriptionFromDb(subs)
1044 // Update is needed for successful response and merge case here
1045 if subs.RetryFromXapp == false {
1046 c.WriteSubscriptionToDb(subs)
1049 subs.RetryFromXapp = false
1052 //-------------------------------------------------------------------
1054 //-------------------------------------------------------------------
1055 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1056 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1057 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1059 xapp.Logger.Error("%v", err)
1063 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1065 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1067 // Send delete for every endpoint in the subscription
1068 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1069 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1070 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1071 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1073 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1076 for _, endPoint := range subs.EpList.Endpoints {
1077 params := &xapp.RMRParams{}
1078 params.Mtype = mType
1079 params.SubId = int(subs.GetReqId().InstanceId)
1081 params.Meid = subs.Meid
1082 params.Src = endPoint.String()
1083 params.PayloadLen = len(payload.Buf)
1084 params.Payload = payload.Buf
1086 subs.DeleteFromDb = true
1087 c.handleXAPPSubscriptionDeleteRequest(params)