2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
77 //subscriber *xapp.Subscriber
80 Counters map[string]xapp.Counter
89 type SubmgrRestartTestEvent struct{}
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
99 func NewControl() *Control {
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 //subscriber: subscriber,
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171 // value 100ms used currently only in unittests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
202 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203 for subId, subs := range register {
204 if subs.SubRespRcvd == false {
205 subs.NoRespToXapp = true
206 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207 c.SendSubscriptionDeleteReq(subs)
212 func (c *Control) ReadyCB(data interface{}) {
213 if c.RMRClient == nil {
214 c.RMRClient = xapp.Rmr
218 func (c *Control) Run() {
219 xapp.SetReadyCB(c.ReadyCB, nil)
220 xapp.AddConfigChangeListener(c.ReadConfigParameters)
224 //-------------------------------------------------------------------
226 //-------------------------------------------------------------------
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
230 c.UpdateCounter(cRestSubReqFromXapp)
232 restSubId := ksuid.New().String()
233 subResp := models.SubscriptionResponse{}
234 subResp.SubscriptionID = &restSubId
235 p := params.(*models.SubscriptionParams)
237 if p.ClientEndpoint == nil {
238 xapp.Logger.Error("ClientEndpoint == nil")
239 c.UpdateCounter(cRestSubFailToXapp)
240 return nil, fmt.Errorf("")
243 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
245 xapp.Logger.Error("%s", err.Error())
246 c.UpdateCounter(cRestSubFailToXapp)
250 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
252 xapp.Logger.Error("%s", err.Error())
253 c.UpdateCounter(cRestSubFailToXapp)
257 subReqList := e2ap.SubscriptionRequestList{}
258 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
260 xapp.Logger.Error("%s", err.Error())
261 c.registry.DeleteRESTSubscription(&restSubId)
262 c.UpdateCounter(cRestSubFailToXapp)
266 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
268 c.UpdateCounter(cRestSubRespToXapp)
272 //-------------------------------------------------------------------
274 //-------------------------------------------------------------------
276 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
277 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
279 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
281 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
283 xapp.Logger.Error("%s", err.Error())
287 var requestorID int64
289 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
290 subReqMsg := subReqList.E2APSubscriptionRequests[index]
292 xid := *restSubId + "_" + strconv.FormatUint(uint64(subReqMsg.RequestId.InstanceId), 10)
293 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), xid, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
294 //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
296 c.registry.DeleteRESTSubscription(restSubId)
297 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
301 defer trans.Release()
302 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
303 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
305 // Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
306 // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
307 requestorID = (int64)(0)
308 instanceId = (int64)(0)
309 resp := &models.SubscriptionResponse{
310 SubscriptionID: restSubId,
311 SubscriptionInstances: []*models.SubscriptionInstance{
312 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
315 // Mark REST subscription request processed.
316 restSubscription.SetProcessed()
317 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
318 xapp.Subscription.Notify(resp, *clientEndpoint)
319 c.UpdateCounter(cRestSubFailNotifToXapp)
321 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
323 // Store successfully processed InstanceId for deletion
324 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
326 // Send notification to xApp that a Subscription Request has been processed.
327 requestorID = (int64)(subRespMsg.RequestId.Id)
328 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
329 resp := &models.SubscriptionResponse{
330 SubscriptionID: restSubId,
331 SubscriptionInstances: []*models.SubscriptionInstance{
332 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
335 // Mark REST subscription request processesd.
336 restSubscription.SetProcessed()
337 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
338 xapp.Subscription.Notify(resp, *clientEndpoint)
339 c.UpdateCounter(cRestSubNotifToXapp)
345 //-------------------------------------------------------------------
347 //------------------------------------------------------------------
348 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
349 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
351 err := c.tracker.Track(trans)
353 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
354 xapp.Logger.Error("%s", err.Error())
358 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
360 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
361 xapp.Logger.Error("%s", err.Error())
368 go c.handleSubscriptionCreate(subs, trans)
369 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
373 switch themsg := event.(type) {
374 case *e2ap.E2APSubscriptionResponse:
377 case *e2ap.E2APSubscriptionFailure:
378 err = fmt.Errorf("SubscriptionFailure received")
384 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
385 xapp.Logger.Error("%s", err.Error())
386 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
390 //-------------------------------------------------------------------
392 //-------------------------------------------------------------------
393 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
396 c.UpdateCounter(cRestSubDelReqFromXapp)
398 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
400 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
402 xapp.Logger.Error("%s", err.Error())
403 if restSubscription == nil {
404 // Subscription was not found
407 if restSubscription.SubReqOngoing == true {
408 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
409 xapp.Logger.Error("%s", err.Error())
411 } else if restSubscription.SubDelReqOngoing == true {
412 // Previous request for same restSubId still ongoing
418 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
420 for _, instanceId := range restSubscription.InstanceIds {
421 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
423 xapp.Logger.Error("%s", err.Error())
426 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
427 restSubscription.DeleteInstanceId(instanceId)
429 c.registry.DeleteRESTSubscription(&restSubId)
432 c.UpdateCounter(cRestSubDelRespToXapp)
437 //-------------------------------------------------------------------
439 //-------------------------------------------------------------------
440 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
442 xid := *restSubId + "_" + strconv.FormatUint(uint64(instanceId), 10)
443 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), xid, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
444 //trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
446 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
447 xapp.Logger.Error("%s", err.Error())
449 defer trans.Release()
451 err := c.tracker.Track(trans)
453 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
454 xapp.Logger.Error("%s", err.Error())
455 return &time.ParseError{}
458 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
460 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
461 xapp.Logger.Error("%s", err.Error())
467 go c.handleSubscriptionDelete(subs, trans)
468 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
470 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
472 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
477 //-------------------------------------------------------------------
479 //-------------------------------------------------------------------
480 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
481 xapp.Logger.Info("QueryHandler() called")
485 return c.registry.QueryHandler()
488 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
489 xapp.Logger.Info("TestRestHandler() called")
491 pathParams := mux.Vars(r)
492 s := pathParams["testId"]
494 // This can be used to delete single subscription from db
495 if contains := strings.Contains(s, "deletesubid="); contains == true {
496 var splits = strings.Split(s, "=")
497 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
498 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
499 c.RemoveSubscriptionFromSdl(uint32(subId))
504 // This can be used to remove all subscriptions db from
506 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
507 c.RemoveAllSubscriptionsFromSdl()
511 // This is meant to cause submgr's restart in testing
513 xapp.Logger.Info("os.Exit(1) called")
517 xapp.Logger.Info("Unsupported rest command received %s", s)
520 //-------------------------------------------------------------------
522 //-------------------------------------------------------------------
524 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
525 params := &xapp.RMRParams{}
526 params.Mtype = trans.GetMtype()
527 params.SubId = int(subs.GetReqId().InstanceId)
529 params.Meid = subs.GetMeid()
531 params.PayloadLen = len(trans.Payload.Buf)
532 params.Payload = trans.Payload.Buf
534 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
535 err = c.SendWithRetry(params, false, 5)
537 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
542 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
544 params := &xapp.RMRParams{}
545 params.Mtype = trans.GetMtype()
546 params.SubId = int(subs.GetReqId().InstanceId)
547 params.Xid = trans.GetXid()
548 params.Meid = trans.GetMeid()
550 params.PayloadLen = len(trans.Payload.Buf)
551 params.Payload = trans.Payload.Buf
553 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
554 err = c.SendWithRetry(params, false, 5)
556 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
561 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
562 if c.RMRClient == nil {
563 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
564 xapp.Logger.Error("%s", err.Error())
569 defer c.RMRClient.Free(msg.Mbuf)
571 // xapp-frame might use direct access to c buffer and
572 // when msg.Mbuf is freed, someone might take it into use
573 // and payload data might be invalid inside message handle function
575 // subscriptions won't load system a lot so there is no
576 // real performance hit by cloning buffer into new go byte slice
577 cPay := append(msg.Payload[:0:0], msg.Payload...)
579 msg.PayloadLen = len(cPay)
582 case xapp.RIC_SUB_REQ:
583 go c.handleXAPPSubscriptionRequest(msg)
584 case xapp.RIC_SUB_RESP:
585 go c.handleE2TSubscriptionResponse(msg)
586 case xapp.RIC_SUB_FAILURE:
587 go c.handleE2TSubscriptionFailure(msg)
588 case xapp.RIC_SUB_DEL_REQ:
589 go c.handleXAPPSubscriptionDeleteRequest(msg)
590 case xapp.RIC_SUB_DEL_RESP:
591 go c.handleE2TSubscriptionDeleteResponse(msg)
592 case xapp.RIC_SUB_DEL_FAILURE:
593 go c.handleE2TSubscriptionDeleteFailure(msg)
595 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
600 //-------------------------------------------------------------------
601 // handle from XAPP Subscription Request
602 //------------------------------------------------------------------
603 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
604 xapp.Logger.Info("MSG from XAPP: %s", params.String())
605 c.UpdateCounter(cSubReqFromXapp)
607 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
609 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
613 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
615 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
618 defer trans.Release()
620 if err = c.tracker.Track(trans); err != nil {
621 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
625 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
626 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
628 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
632 c.wakeSubscriptionRequest(subs, trans)
635 //-------------------------------------------------------------------
636 // Wake Subscription Request to E2node
637 //------------------------------------------------------------------
638 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
640 go c.handleSubscriptionCreate(subs, trans)
641 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
644 switch themsg := event.(type) {
645 case *e2ap.E2APSubscriptionResponse:
646 themsg.RequestId.Id = trans.RequestId.Id
647 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
650 c.UpdateCounter(cSubRespToXapp)
651 c.rmrSendToXapp("", subs, trans)
654 case *e2ap.E2APSubscriptionFailure:
655 themsg.RequestId.Id = trans.RequestId.Id
656 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
658 c.UpdateCounter(cSubFailToXapp)
659 c.rmrSendToXapp("", subs, trans)
665 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
666 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
669 //-------------------------------------------------------------------
670 // handle from XAPP Subscription Delete Request
671 //------------------------------------------------------------------
672 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
673 xapp.Logger.Info("MSG from XAPP: %s", params.String())
674 c.UpdateCounter(cSubDelReqFromXapp)
676 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
678 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
682 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
684 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
687 defer trans.Release()
689 err = c.tracker.Track(trans)
691 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
695 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
697 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
704 go c.handleSubscriptionDelete(subs, trans)
705 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
707 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
709 if subs.NoRespToXapp == true {
710 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
714 // Whatever is received success, fail or timeout, send successful delete response
715 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
716 subDelRespMsg.RequestId.Id = trans.RequestId.Id
717 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
718 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
719 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
721 c.UpdateCounter(cSubDelRespToXapp)
722 c.rmrSendToXapp("", subs, trans)
725 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
726 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
729 //-------------------------------------------------------------------
730 // SUBS CREATE Handling
731 //-------------------------------------------------------------------
732 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
734 var removeSubscriptionFromDb bool = false
735 trans := c.tracker.NewSubsTransaction(subs)
736 subs.WaitTransactionTurn(trans)
737 defer subs.ReleaseTransactionTurn(trans)
738 defer trans.Release()
740 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
742 subRfMsg, valid := subs.GetCachedResponse()
743 if subRfMsg == nil && valid == true {
744 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
745 switch event.(type) {
746 case *e2ap.E2APSubscriptionResponse:
747 subRfMsg, valid = subs.SetCachedResponse(event, true)
748 subs.SubRespRcvd = true
749 case *e2ap.E2APSubscriptionFailure:
750 removeSubscriptionFromDb = true
751 subRfMsg, valid = subs.SetCachedResponse(event, false)
752 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
753 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
754 case *SubmgrRestartTestEvent:
755 // This simulates that no response has been received and after restart subscriptions are restored from db
756 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
759 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
760 removeSubscriptionFromDb = true
761 subRfMsg, valid = subs.SetCachedResponse(nil, false)
762 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
764 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
766 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
769 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
771 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
774 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
775 parentTrans.SendEvent(subRfMsg, 0)
778 //-------------------------------------------------------------------
779 // SUBS DELETE Handling
780 //-------------------------------------------------------------------
782 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
784 trans := c.tracker.NewSubsTransaction(subs)
785 subs.WaitTransactionTurn(trans)
786 defer subs.ReleaseTransactionTurn(trans)
787 defer trans.Release()
789 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
793 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
796 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
800 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
801 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
802 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
803 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
804 c.registry.UpdateSubscriptionToDb(subs, c)
805 parentTrans.SendEvent(nil, 0)
808 //-------------------------------------------------------------------
809 // send to E2T Subscription Request
810 //-------------------------------------------------------------------
811 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
813 var event interface{} = nil
814 var timedOut bool = false
816 subReqMsg := subs.SubReqMsg
817 subReqMsg.RequestId = subs.GetReqId().RequestId
818 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
820 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
824 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
825 c.WriteSubscriptionToDb(subs)
827 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
828 desc := fmt.Sprintf("(retry %d)", retries)
830 c.UpdateCounter(cSubReqToE2)
832 c.UpdateCounter(cSubReReqToE2)
834 c.rmrSendToE2T(desc, subs, trans)
835 if subs.DoNotWaitSubResp == false {
836 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
838 c.UpdateCounter(cSubReqTimerExpiry)
842 // Simulating case where subscrition request has been sent but response has not been received before restart
843 event = &SubmgrRestartTestEvent{}
847 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
851 //-------------------------------------------------------------------
852 // send to E2T Subscription Delete Request
853 //-------------------------------------------------------------------
855 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
857 var event interface{}
860 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
861 subDelReqMsg.RequestId = subs.GetReqId().RequestId
862 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
863 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
865 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
869 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
870 desc := fmt.Sprintf("(retry %d)", retries)
872 c.UpdateCounter(cSubDelReqToE2)
874 c.UpdateCounter(cSubDelReReqToE2)
876 c.rmrSendToE2T(desc, subs, trans)
877 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
879 c.UpdateCounter(cSubDelReqTimerExpiry)
884 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
888 //-------------------------------------------------------------------
889 // handle from E2T Subscription Response
890 //-------------------------------------------------------------------
891 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
892 xapp.Logger.Info("MSG from E2T: %s", params.String())
893 c.UpdateCounter(cSubRespFromE2)
895 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
897 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
900 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
902 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
905 trans := subs.GetTransaction()
907 err = fmt.Errorf("Ongoing transaction not found")
908 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
911 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
913 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
914 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
919 //-------------------------------------------------------------------
920 // handle from E2T Subscription Failure
921 //-------------------------------------------------------------------
922 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
923 xapp.Logger.Info("MSG from E2T: %s", params.String())
924 c.UpdateCounter(cSubFailFromE2)
925 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
927 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
930 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
932 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
935 trans := subs.GetTransaction()
937 err = fmt.Errorf("Ongoing transaction not found")
938 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
941 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
943 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
944 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
949 //-------------------------------------------------------------------
950 // handle from E2T Subscription Delete Response
951 //-------------------------------------------------------------------
952 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
953 xapp.Logger.Info("MSG from E2T: %s", params.String())
954 c.UpdateCounter(cSubDelRespFromE2)
955 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
957 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
960 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
962 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
965 trans := subs.GetTransaction()
967 err = fmt.Errorf("Ongoing transaction not found")
968 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
971 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
973 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
974 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
979 //-------------------------------------------------------------------
980 // handle from E2T Subscription Delete Failure
981 //-------------------------------------------------------------------
982 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
983 xapp.Logger.Info("MSG from E2T: %s", params.String())
984 c.UpdateCounter(cSubDelFailFromE2)
985 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
987 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
990 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
992 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
995 trans := subs.GetTransaction()
997 err = fmt.Errorf("Ongoing transaction not found")
998 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1001 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1002 if sendOk == false {
1003 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1004 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1009 //-------------------------------------------------------------------
1011 //-------------------------------------------------------------------
1012 func typeofSubsMessage(v interface{}) string {
1017 //case *e2ap.E2APSubscriptionRequest:
1019 case *e2ap.E2APSubscriptionResponse:
1021 case *e2ap.E2APSubscriptionFailure:
1023 //case *e2ap.E2APSubscriptionDeleteRequest:
1024 // return "SubDelReq"
1025 case *e2ap.E2APSubscriptionDeleteResponse:
1027 case *e2ap.E2APSubscriptionDeleteFailure:
1034 //-------------------------------------------------------------------
1036 //-------------------------------------------------------------------
1037 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1038 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1039 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1041 xapp.Logger.Error("%v", err)
1045 //-------------------------------------------------------------------
1047 //-------------------------------------------------------------------
1048 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1050 if removeSubscriptionFromDb == true {
1051 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1052 c.RemoveSubscriptionFromDb(subs)
1054 // Update is needed for successful response and merge case here
1055 if subs.RetryFromXapp == false {
1056 c.WriteSubscriptionToDb(subs)
1059 subs.RetryFromXapp = false
1062 //-------------------------------------------------------------------
1064 //-------------------------------------------------------------------
1065 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1066 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1067 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1069 xapp.Logger.Error("%v", err)
1073 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1075 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1077 // Send delete for every endpoint in the subscription
1078 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1079 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1080 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1081 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1083 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1086 for _, endPoint := range subs.EpList.Endpoints {
1087 params := &xapp.RMRParams{}
1088 params.Mtype = mType
1089 params.SubId = int(subs.GetReqId().InstanceId)
1091 params.Meid = subs.Meid
1092 params.Src = endPoint.String()
1093 params.PayloadLen = len(payload.Buf)
1094 params.Payload = payload.Buf
1096 subs.DeleteFromDb = true
1097 c.handleXAPPSubscriptionDeleteRequest(params)