2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring assembles an identification string for log and error messages.
// Each entry contributes its String() value, and an "err(<message>)" token
// built from err.Error() is appended as well. Every piece is prefixed with
// filler.
// NOTE(review): filler starts as "" and the statement that changes it is not
// visible in this view — confirm which separator ends up between pieces.
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
// NOTE(review): err.Error() is called here, so this line is presumably guarded
// by a nil check on err (callers do pass nil, e.g. idstring(nil, trans)).
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level tunables. Control.ReadConfigParameters assigns all of them
// from the "controls.*" configuration keys.

// e2tSubReqTimeout is the wait passed to trans.WaitEvent after a subscription
// request has been sent towards E2T.
63 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is the wait passed to trans.WaitEvent after a subscription
// delete request has been sent towards E2T.
64 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is the timeout passed to trans.SendEvent when a message
// received from E2T is handed to the transaction waiting for it.
65 var e2tRecvMsgTimeout time.Duration
// waitRouteCleanup_ms is handed to registry.RemoveFromSubscription. Despite
// the _ms suffix it is a time.Duration, not a millisecond count.
66 var waitRouteCleanup_ms time.Duration
// e2tMaxSubReqTryCount bounds the send loop in sendE2TSubscriptionRequest.
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
// e2tMaxSubDelReqTryCount bounds the send loop in sendE2TSubscriptionDeleteRequest.
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// readSubsFromDb is a string flag; NewControl compares it against "false".
69 var readSubsFromDb string
// NOTE(review): these are fields of the Control struct (NewControl sets
// Counters in a &Control{...} literal); the struct header and the remaining
// fields are not visible in this view.
77 //subscriber *xapp.Subscriber
// Counters holds the counter group that NewControl obtains from
// xapp.Metric.RegisterCounterGroup, keyed by string.
80 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event value sendE2TSubscriptionRequest produces
// to simulate a request that was sent but not answered before a restart;
// handleSubscriptionCreate has a dedicated case for it.
89 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type. No use of it is visible in
// this view.
90 type SubmgrRestartUpEvent struct{}
// NOTE(review): the enclosing function header is not visible in this view;
// these statements are presumably the body of the package init function.
// Log a start-up marker.
93 xapp.Logger.Info("SUBMGR")
// Environment variables consulted by viper use the "submgr" prefix.
95 viper.SetEnvPrefix("submgr")
// Environment variables that are set but empty count as valid values.
96 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance. In the visible code it:
//   - creates a routing manager REST client from the rtmgr.HostAddr, rtmgr.port
//     and rtmgr.baseUrl configuration values,
//   - initialises the Registry and gives it that client,
//   - loads the configuration parameters,
//   - registers the test and symptom-data REST routes,
//   - starts the REST subscription listener in its own goroutine,
//   - reads stored subscriptions from SDL and hands uncompleted ones to
//     HandleUncompletedSubscriptions.
// NOTE(review): what happens when readSubsFromDb == "false" is not visible in
// this view; presumably the SDL read is skipped — confirm.
99 func NewControl() *Control {
// Routing manager client; the transport is plain "http".
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 //subscriber: subscriber,
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// The argument is an empty string; ReadConfigParameters does not use it in the
// lines visible here.
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// The listener runs in a separate goroutine; nothing in this view stops or
// waits for it.
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
// presumably logged only when err is non-nil — guard not visible here
134 xapp.Logger.Error("%v", err)
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
// SymptomDataHandler serves GET /ric/v1/symptomdata (route registered in
// NewControl). It sends the registry's subscription list as JSON symptom data
// under the name "platform/subscriptions.json".
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// NOTE(review): the error returned by QueryHandler is discarded here.
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings into the package-level
// tunables and logs each resulting value. A setting that reads as zero (or as
// the empty string for readSubsFromDb) is replaced with a built-in default.
// It is called from NewControl and registered as a configuration change
// listener in Run. The parameter f is not used in the lines visible here.
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
// The *_ms settings are millisecond counts, so they are multiplied by 1000000
// to obtain a time.Duration. Default when unset: 2000 ms.
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
// Default when unset: 2000 ms.
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
// Default when unset: 2000 ms.
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up.
171 // A non-default value of 100 ms is currently used only in unit tests.
// Default when unset: 5000 ms.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
// Try counts default to 1 when unset, i.e. a single attempt.
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// readSubsFromDb is kept as a string; an unset value becomes "true".
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register (keyed by uint32
// subscription id) and, for every subscription whose SubRespRcvd flag is false,
// sets NoRespToXapp and triggers SendSubscriptionDeleteReq for it. NewControl
// calls it with the register read from SDL.
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
202 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203 for subId, subs := range register {
204 if subs.SubRespRcvd == false {
// NoRespToXapp is checked in handleXAPPSubscriptionDeleteRequest before the
// delete response is sent to the xApp.
205 subs.NoRespToXapp = true
206 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered through xapp.SetReadyCB in Run. It sets
// c.RMRClient to xapp.Rmr if no client has been set yet. The data argument is
// not used in the lines visible here.
212 func (c *Control) ReadyCB(data interface{}) {
213 if c.RMRClient == nil {
214 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the ready callback and ReadConfigParameters as a
// configuration change listener.
// NOTE(review): any further statements of Run (e.g. starting the xapp
// framework) are not visible in this view.
218 func (c *Control) Run() {
219 xapp.SetReadyCB(c.ReadyCB, nil)
220 xapp.AddConfigChangeListener(c.ReadConfigParameters)
224 //-------------------------------------------------------------------
226 //-------------------------------------------------------------------
// SubscriptionHandler is the REST subscription request callback passed to
// xapp.Subscription.Listen in NewControl. It generates a new REST subscription
// id (ksuid), registers a REST subscription for the requesting xApp endpoint,
// fills the E2AP subscription request list from params and starts
// processSubscriptionRequests in a goroutine to do the actual work.
// NOTE(review): the success return value is not visible in this view;
// presumably subResp (carrying only the new SubscriptionID) is returned.
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
229 restSubId := ksuid.New().String()
230 subResp := models.SubscriptionResponse{}
231 subResp.SubscriptionID = &restSubId
// NOTE(review): single-value type assertion — panics if params is not a
// *models.SubscriptionParams.
232 p := params.(*models.SubscriptionParams)
236 c.UpdateCounter(cRestSubReqFromXapp)
238 if p.ClientEndpoint == nil {
239 xapp.Logger.Error("ClientEndpoint == nil")
// NOTE(review): the returned error has an empty message; the reason is only in
// the log line above.
240 return nil, fmt.Errorf("")
243 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
// presumably inside an err != nil branch — guard and return not visible here
245 xapp.Logger.Error("%s", err.Error())
249 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
// presumably inside an err != nil branch — guard and return not visible here
251 xapp.Logger.Error("%s", err.Error())
255 subReqList := e2ap.SubscriptionRequestList{}
256 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
// On this (presumably error) path the REST subscription created above is
// removed again.
258 xapp.Logger.Error("%s", err.Error())
259 c.registry.DeleteRESTSubscription(&restSubId)
// The requests are processed asynchronously; results reach the xApp through
// xapp.Subscription.Notify in processSubscriptionRequests.
263 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
269 //-------------------------------------------------------------------
271 //-------------------------------------------------------------------
// processSubscriptionRequests handles every E2AP subscription request in
// subReqList for one REST subscription. For each request it creates an xApp
// transaction, runs handleSubscriptionRequest and then notifies the xApp at
// clientEndpoint through xapp.Subscription.Notify:
//   - on the failure path with RequestorID and InstanceID both set to zero,
//   - on the success path with the ids taken from the subscription response,
//     after storing the instance id in restSubscription.
// In both cases restSubscription.SetProcessed() is called and the matching
// counter is updated. It runs in its own goroutine (started by
// SubscriptionHandler).
// NOTE(review): the declaration of instanceId and the branch conditions are not
// visible in this view.
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
276 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
278 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
// presumably inside an err != nil branch — guard and return not visible here
280 xapp.Logger.Error("%s", err.Error())
284 var requestorID int64
286 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287 subReqMsg := subReqList.E2APSubscriptionRequests[index]
// NOTE(review): meid is dereferenced without a nil check in the visible lines.
289 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
// presumably the trans == nil branch: the REST subscription is dropped
291 c.registry.DeleteRESTSubscription(restSubId)
292 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer sits inside the for loop, so every transaction is
// released only when the whole function returns, not at the end of its
// iteration.
296 defer trans.Release()
297 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
300 // Send notification to xApp that processing of a Subscription Request has failed. Currently it is not possible
301 // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
302 requestorID = (int64)(0)
303 instanceId = (int64)(0)
304 resp := &models.SubscriptionResponse{
305 SubscriptionID: restSubId,
306 SubscriptionInstances: []*models.SubscriptionInstance{
307 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
310 // Mark REST subscription request processed.
311 restSubscription.SetProcessed()
312 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313 xapp.Subscription.Notify(resp, *clientEndpoint)
314 c.UpdateCounter(cRestSubFailToXapp)
316 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
318 // Store successfully processed InstanceId for deletion
319 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
321 // Send notification to xApp that a Subscription Request has been processed.
322 requestorID = (int64)(subRespMsg.RequestId.Id)
323 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
324 resp := &models.SubscriptionResponse{
325 SubscriptionID: restSubId,
326 SubscriptionInstances: []*models.SubscriptionInstance{
327 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
330 // Mark REST subscription request processed.
331 restSubscription.SetProcessed()
332 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
333 xapp.Subscription.Notify(resp, *clientEndpoint)
334 c.UpdateCounter(cRestSubRespToXapp)
340 //-------------------------------------------------------------------
342 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2AP subscription request of a REST
// subscription to completion. It tracks trans, assigns it to a subscription in
// the registry, starts handleSubscriptionCreate in a goroutine and blocks until
// that goroutine posts an event on trans. The event type decides the outcome:
// an *e2ap.E2APSubscriptionResponse is the success case, an
// *e2ap.E2APSubscriptionFailure produces the "SubscriptionFailure received"
// error. On the failure path the transaction is removed from the subscription.
// NOTE(review): the return statements and the default case are not visible in
// this view. meid and restSubId are not used in the visible lines.
343 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
344 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
346 err := c.tracker.Track(trans)
// presumably inside an err != nil branch — guard and return not visible here
348 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
349 xapp.Logger.Error("%s", err.Error())
353 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
// presumably inside an err != nil branch — guard and return not visible here
355 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
356 xapp.Logger.Error("%s", err.Error())
363 go c.handleSubscriptionCreate(subs, trans)
// WaitEvent(0) is used as a blocking wait; the second result is discarded.
364 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
368 switch themsg := event.(type) {
369 case *e2ap.E2APSubscriptionResponse:
372 case *e2ap.E2APSubscriptionFailure:
373 err = fmt.Errorf("SubscriptionFailure received")
379 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
380 xapp.Logger.Error("%s", err.Error())
381 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
385 //-------------------------------------------------------------------
387 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB is the REST subscription delete callback passed
// to xapp.Subscription.Listen in NewControl. It looks up the REST subscription
// for restSubId, rejects the request with an error while the original
// subscription request is still being handled, and otherwise calls
// SubscriptionDeleteHandler for every stored instance id before deleting the
// REST subscription from the registry.
// NOTE(review): the return values of the individual branches (not found,
// delete already ongoing, per-instance failure) are not visible in this view.
388 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
391 c.UpdateCounter(cRestSubDelReqFromXapp)
393 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
395 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
// presumably inside an err != nil branch — guard not visible here
397 xapp.Logger.Error("%s", err.Error())
398 if restSubscription == nil {
399 // Subscription was not found
402 if restSubscription.SubReqOngoing == true {
403 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
404 xapp.Logger.Error("%s", err.Error())
406 } else if restSubscription.SubDelReqOngoing == true {
407 // Previous request for same restSubId still ongoing
413 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// NOTE(review): DeleteInstanceId is called on restSubscription while ranging
// over restSubscription.InstanceIds; whether it modifies that slice in place
// cannot be seen from here — confirm the iteration is safe.
415 for _, instanceId := range restSubscription.InstanceIds {
416 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
// presumably inside an err != nil branch — guard not visible here
418 xapp.Logger.Error("%s", err.Error())
421 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
422 restSubscription.DeleteInstanceId(instanceId)
424 c.registry.DeleteRESTSubscription(&restSubId)
427 c.UpdateCounter(cRestSubDelRespToXapp)
432 //-------------------------------------------------------------------
434 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one E2 subscription instance on behalf of a
// REST subscription. It creates and tracks an xApp transaction (with a zero
// e2ap.RequestId), finds the subscription matching instanceId, starts
// handleSubscriptionDelete in a goroutine, blocks until that goroutine posts an
// event and finally removes the transaction from the subscription.
// NOTE(review): the success return and several error returns are not visible
// in this view.
435 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
437 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
// presumably the trans == nil branch — guard and return not visible here
439 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
440 xapp.Logger.Error("%s", err.Error())
442 defer trans.Release()
444 err := c.tracker.Track(trans)
// NOTE(review): "err :=" declares a new err here; this only compiles inside a
// nested scope, where it shadows the outer err.
446 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
447 xapp.Logger.Error("%s", err.Error())
// NOTE(review): the tracking failure is reported to the caller as an empty
// *time.ParseError rather than the err built above — looks unintended; confirm.
448 return &time.ParseError{}
451 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// NOTE(review): same "err :=" shadowing pattern as above.
453 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
454 xapp.Logger.Error("%s", err.Error())
460 go c.handleSubscriptionDelete(subs, trans)
// Both results of WaitEvent are ignored; the call only blocks until an event
// has been posted.
461 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
463 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
465 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
470 //-------------------------------------------------------------------
472 //-------------------------------------------------------------------
// QueryHandler is the REST query callback passed to xapp.Subscription.Listen in
// NewControl. It logs the call and returns whatever the registry's
// QueryHandler returns.
473 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
474 xapp.Logger.Info("QueryHandler() called")
478 return c.registry.QueryHandler()
// TestRestHandler serves POST /ric/v1/test/{testId} (route registered in
// NewControl) and exists for testing support. The {testId} path variable
// selects the command. Visible commands: "deletesubid=<n>" removes one
// subscription from SDL; two further commands remove all subscriptions from
// SDL and log "os.Exit(1) called". Anything else is logged as unsupported.
// NOTE(review): the conditions selecting the last two commands, and the
// os.Exit call itself, are not visible in this view.
481 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
482 xapp.Logger.Info("TestRestHandler() called")
484 pathParams := mux.Vars(r)
485 s := pathParams["testId"]
487 // This can be used to delete single subscription from db
488 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=" at this point, so Split yields at least two elements and
// splits[1] is in range.
489 var splits = strings.Split(s, "=")
490 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
491 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
// NOTE(review): subId is parsed as a 64-bit int and converted to uint32 without
// a range check; negative or oversized values wrap.
492 c.RemoveSubscriptionFromSdl(uint32(subId))
497 // This can be used to remove all subscriptions from db
499 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
500 c.RemoveAllSubscriptionsFromSdl()
504 // This is meant to cause submgr's restart in testing
506 xapp.Logger.Info("os.Exit(1) called")
510 xapp.Logger.Info("Unsupported rest command received %s", s)
513 //-------------------------------------------------------------------
515 //-------------------------------------------------------------------
// rmrSendToE2T sends the packed message held in trans towards E2T over RMR.
// Message type and payload come from trans, the subscription id and MEID from
// subs. The send goes through c.SendWithRetry(params, false, 5) and its error
// is stored in the named result err. desc is only used in the log line.
// NOTE(review): the assignment of params.Xid, if any, is not visible in this
// view (rmrSendToXapp sets it).
517 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
518 params := &xapp.RMRParams{}
519 params.Mtype = trans.GetMtype()
520 params.SubId = int(subs.GetReqId().InstanceId)
522 params.Meid = subs.GetMeid()
524 params.PayloadLen = len(trans.Payload.Buf)
525 params.Payload = trans.Payload.Buf
527 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
528 err = c.SendWithRetry(params, false, 5)
// presumably logged only when err is non-nil — guard not visible here
530 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the packed message held in trans to the xApp over RMR.
// Message type, transaction id (Xid), MEID and payload come from trans, the
// subscription id from subs. The send goes through
// c.SendWithRetry(params, false, 5) and its error is stored in the named result
// err. desc is only used in the log line.
535 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
537 params := &xapp.RMRParams{}
538 params.Mtype = trans.GetMtype()
539 params.SubId = int(subs.GetReqId().InstanceId)
540 params.Xid = trans.GetXid()
541 params.Meid = trans.GetMeid()
543 params.PayloadLen = len(trans.Payload.Buf)
544 params.Payload = trans.Payload.Buf
546 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
547 err = c.SendWithRetry(params, false, 5)
// presumably logged only when err is non-nil — guard not visible here
549 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives one RMR message and dispatches it by message type. Each of
// the six subscription message types is handled by its own handler, started in
// a new goroutine; any other type is logged and discarded. When c.RMRClient is
// nil an error is built and logged instead.
// NOTE(review): the switch header and the return statements are not visible in
// this view.
554 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
555 if c.RMRClient == nil {
556 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
557 xapp.Logger.Error("%s", err.Error())
// The message buffer is freed when Consume returns, while the handlers below
// keep running in their own goroutines.
562 defer c.RMRClient.Free(msg.Mbuf)
564 // xapp-frame might use direct access to c buffer and
565 // when msg.Mbuf is freed, someone might take it into use
566 // and payload data might be invalid inside message handle function
568 // subscriptions won't load system a lot so there is no
569 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a fresh backing array,
// so cPay is an independent copy of the payload.
570 cPay := append(msg.Payload[:0:0], msg.Payload...)
// NOTE(review): the statement that stores cPay back into msg.Payload is not
// visible in this view — confirm the handlers really get the copy.
572 msg.PayloadLen = len(cPay)
575 case xapp.RIC_SUB_REQ:
576 go c.handleXAPPSubscriptionRequest(msg)
577 case xapp.RIC_SUB_RESP:
578 go c.handleE2TSubscriptionResponse(msg)
579 case xapp.RIC_SUB_FAILURE:
580 go c.handleE2TSubscriptionFailure(msg)
581 case xapp.RIC_SUB_DEL_REQ:
582 go c.handleXAPPSubscriptionDeleteRequest(msg)
583 case xapp.RIC_SUB_DEL_RESP:
584 go c.handleE2TSubscriptionDeleteResponse(msg)
585 case xapp.RIC_SUB_DEL_FAILURE:
586 go c.handleE2TSubscriptionDeleteFailure(msg)
588 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
593 //-------------------------------------------------------------------
594 // handle from XAPP Subscription Request
595 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a subscription request received from an
// xApp over RMR: it unpacks the E2AP request, creates and tracks an xApp
// transaction for the sender (params.Src, params.Xid), assigns the transaction
// to a subscription in the registry and passes on to wakeSubscriptionRequest.
// Each failing step is logged.
// NOTE(review): the error guards and early returns are not visible in this view.
596 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
597 xapp.Logger.Info("MSG from XAPP: %s", params.String())
598 c.UpdateCounter(cSubReqFromXapp)
600 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
602 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
606 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
// presumably the trans == nil branch
608 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
611 defer trans.Release()
613 if err = c.tracker.Track(trans); err != nil {
614 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
618 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
619 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
621 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
625 c.wakeSubscriptionRequest(subs, trans)
628 //-------------------------------------------------------------------
629 // Wake Subscription Request to E2node
630 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate in a goroutine,
// blocks until it posts an event on trans and relays the outcome to the xApp.
// For a subscription response or failure the request id is replaced with the
// xApp's own (trans.RequestId.Id), the message is packed into trans and sent
// with rmrSendToXapp. Other outcomes end in the "XAPP-SubReq: failed" log line.
// NOTE(review): the declaration of err and the checks on the Pack* results are
// not visible in this view; the error returned by rmrSendToXapp is ignored in
// the visible calls.
631 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
633 go c.handleSubscriptionCreate(subs, trans)
634 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
637 switch themsg := event.(type) {
638 case *e2ap.E2APSubscriptionResponse:
639 themsg.RequestId.Id = trans.RequestId.Id
640 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
643 c.UpdateCounter(cSubRespToXapp)
644 c.rmrSendToXapp("", subs, trans)
647 case *e2ap.E2APSubscriptionFailure:
648 themsg.RequestId.Id = trans.RequestId.Id
649 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
651 c.UpdateCounter(cSubFailToXapp)
652 c.rmrSendToXapp("", subs, trans)
658 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
659 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
662 //-------------------------------------------------------------------
663 // handle from XAPP Subscription Delete Request
664 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a subscription delete request
// received from an xApp over RMR (it is also called directly by
// SendSubscriptionDeleteReq). It unpacks the request, creates and tracks an
// xApp transaction, finds the subscription by the transaction's sub id, starts
// handleSubscriptionDelete in a goroutine and blocks until it posts an event.
// A successful delete response is then packed and sent to the xApp, whatever
// the outcome towards E2T was; the subs.NoRespToXapp check before it is
// presumably what suppresses that response after a restart (branch body not
// visible).
// NOTE(review): the error guards and early returns are not visible in this view.
665 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
666 xapp.Logger.Info("MSG from XAPP: %s", params.String())
667 c.UpdateCounter(cSubDelReqFromXapp)
669 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
671 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
675 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
// presumably the trans == nil branch
677 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
680 defer trans.Release()
682 err = c.tracker.Track(trans)
// NOTE(review): this log line is tagged "XAPP-SubReq" although it belongs to
// the delete path.
684 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
688 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
690 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
697 go c.handleSubscriptionDelete(subs, trans)
698 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
700 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
702 if subs.NoRespToXapp == true {
703 // Do not send delete responses to xapps when a submgr restart is deleting uncompleted subscriptions
707 // Whatever is received success, fail or timeout, send successful delete response
708 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
709 subDelRespMsg.RequestId.Id = trans.RequestId.Id
710 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
711 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
712 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
714 c.UpdateCounter(cSubDelRespToXapp)
715 c.rmrSendToXapp("", subs, trans)
718 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
719 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
722 //-------------------------------------------------------------------
723 // SUBS CREATE Handling
724 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work of a create
// request and reports the result to parentTrans. It opens a subscription
// transaction and waits for its turn on subs. If no response is cached yet (and
// the cache is valid) it sends the request to E2T and handles the event:
//   - response: cached as valid, subs.SubRespRcvd set,
//   - failure: cached as invalid, subscription marked for removal from the db
//     and a delete request sent to E2T,
//   - SubmgrRestartTestEvent: the transaction is dropped (test support),
//   - visible remaining branch (presumably default): nil cached as invalid,
//     marked for db removal and a delete request sent to E2T.
// Finally the db is updated and the cached result is posted to parentTrans.
// NOTE(review): the condition guarding RemoveFromSubscription and the exit
// taken in the restart-test case are not visible in this view.
725 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
727 var removeSubscriptionFromDb bool = false
728 trans := c.tracker.NewSubsTransaction(subs)
729 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans.Release() first, then the
// transaction turn is released.
730 defer subs.ReleaseTransactionTurn(trans)
731 defer trans.Release()
733 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
735 subRfMsg, valid := subs.GetCachedResponse()
736 if subRfMsg == nil && valid == true {
737 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
738 switch event.(type) {
739 case *e2ap.E2APSubscriptionResponse:
740 subRfMsg, valid = subs.SetCachedResponse(event, true)
741 subs.SubRespRcvd = true
742 case *e2ap.E2APSubscriptionFailure:
743 removeSubscriptionFromDb = true
744 subRfMsg, valid = subs.SetCachedResponse(event, false)
745 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
746 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
747 case *SubmgrRestartTestEvent:
748 // This simulates that no response has been received and after restart subscriptions are restored from db
749 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
752 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
753 removeSubscriptionFromDb = true
754 subRfMsg, valid = subs.SetCachedResponse(nil, false)
755 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
757 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
759 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
762 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
764 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
767 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Posting the result releases the caller that is blocked in WaitEvent(0) on
// parentTrans.
768 parentTrans.SendEvent(subRfMsg, 0)
771 //-------------------------------------------------------------------
772 // SUBS DELETE Handling
773 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side work of a delete
// request and reports completion to parentTrans. It opens a subscription
// transaction and waits for its turn on subs. The delete request is sent to E2T
// only when subs is valid and the requesting endpoint is the only endpoint left
// on the subscription. Afterwards the transaction is removed from the
// subscription, the db is updated and a nil event is posted to parentTrans.
775 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
777 trans := c.tracker.NewSubsTransaction(subs)
778 subs.WaitTransactionTurn(trans)
779 defer subs.ReleaseTransactionTurn(trans)
780 defer trans.Release()
782 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// NOTE(review): subs.valid and subs.EpList are read here; any locking around
// this check is not visible in this view.
786 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
789 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
793 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
794 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
795 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
796 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
797 c.registry.UpdateSubscriptionToDb(subs, c)
// Releases the caller that is blocked in WaitEvent(0) on parentTrans.
798 parentTrans.SendEvent(nil, 0)
801 //-------------------------------------------------------------------
802 // send to E2T Subscription Request
803 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs (with the
// subscription's own request id), stores the not-yet-completed subscription in
// the db and sends the request to E2T, at most e2tMaxSubReqTryCount times. After
// each send it waits up to e2tSubReqTimeout for an event on trans, unless
// subs.DoNotWaitSubResp is set, in which case a SubmgrRestartTestEvent is
// produced instead. The resulting event is presumably what the function returns
// (return statement not visible in this view).
// NOTE(review): the declaration of err, the choice between the two counters,
// the timeout check and the loop exits are not visible in this view; the error
// returned by rmrSendToE2T is ignored in the visible call.
804 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
806 var event interface{} = nil
807 var timedOut bool = false
809 subReqMsg := subs.SubReqMsg
810 subReqMsg.RequestId = subs.GetReqId().RequestId
811 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
// presumably inside an err != nil branch — guard and return not visible here
813 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
817 // Write uncompleted subscription in db. If there is no response for the subscription it needs to be re-processed (deleted) after restart
818 c.WriteSubscriptionToDb(subs)
820 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
821 desc := fmt.Sprintf("(retry %d)", retries)
// presumably cSubReqToE2 counts the first attempt and cSubReReqToE2 the
// retries — the selecting condition is not visible here
823 c.UpdateCounter(cSubReqToE2)
825 c.UpdateCounter(cSubReReqToE2)
827 c.rmrSendToE2T(desc, subs, trans)
828 if subs.DoNotWaitSubResp == false {
829 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
831 c.UpdateCounter(cSubReqTimerExpiry)
835 // Simulating case where subscription request has been sent but response has not been received before restart
836 event = &SubmgrRestartTestEvent{}
840 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
844 //-------------------------------------------------------------------
845 // send to E2T Subscription Delete Request
846 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for subs
// (request id and function id taken from the subscription) and sends it to E2T,
// at most e2tMaxSubDelReqTryCount times, waiting up to e2tSubDelReqTime for an
// event on trans after each send. The resulting event is presumably what the
// function returns (return statement not visible in this view).
// NOTE(review): the declarations of err and timedOut, the choice between the
// two counters, the timeout check and the loop exits are not visible in this
// view; the error returned by rmrSendToE2T is ignored in the visible call.
848 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
850 var event interface{}
853 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
854 subDelReqMsg.RequestId = subs.GetReqId().RequestId
855 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
856 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
// presumably inside an err != nil branch — guard and return not visible here
858 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
862 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
863 desc := fmt.Sprintf("(retry %d)", retries)
865 c.UpdateCounter(cSubDelReqToE2)
867 c.UpdateCounter(cSubDelReReqToE2)
869 c.rmrSendToE2T(desc, subs, trans)
870 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
872 c.UpdateCounter(cSubDelReqTimerExpiry)
877 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
881 //-------------------------------------------------------------------
882 // handle from E2T Subscription Response
883 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a subscription response received from
// E2T: it unpacks the message, finds the subscription by the instance id in the
// response, fetches that subscription's ongoing transaction and passes the
// message to it as an event, with e2tRecvMsgTimeout as the send timeout. Every
// failing step is logged.
// NOTE(review): the guards and early returns between the steps are not visible
// in this view.
884 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
885 xapp.Logger.Info("MSG from E2T: %s", params.String())
886 c.UpdateCounter(cSubRespFromE2)
888 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
890 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
893 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
895 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
898 trans := subs.GetTransaction()
// presumably the trans == nil branch
900 err = fmt.Errorf("Ongoing transaction not found")
901 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
904 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
// presumably reached only when the event could not be delivered
906 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
907 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
912 //-------------------------------------------------------------------
913 // handle from E2T Subscription Failure
914 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a subscription failure received from
// E2T: it unpacks the message, finds the subscription by the instance id in the
// failure, fetches that subscription's ongoing transaction and passes the
// message to it as an event, with e2tRecvMsgTimeout as the send timeout. Every
// failing step is logged.
// NOTE(review): the guards and early returns between the steps are not visible
// in this view.
915 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
916 xapp.Logger.Info("MSG from E2T: %s", params.String())
917 c.UpdateCounter(cSubFailFromE2)
918 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
920 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
923 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
925 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
928 trans := subs.GetTransaction()
// presumably the trans == nil branch
930 err = fmt.Errorf("Ongoing transaction not found")
931 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
934 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
// presumably reached only when the event could not be delivered
936 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
937 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
942 //-------------------------------------------------------------------
943 // handle from E2T Subscription Delete Response
944 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse handles a subscription delete response
// received from E2T: it unpacks the message, finds the subscription by the
// instance id in the response, fetches that subscription's ongoing transaction
// and passes the message to it as an event, with e2tRecvMsgTimeout as the send
// timeout. Every failing step is logged.
// Unlike the other three E2T handlers it declares a named error result;
// Consume starts it with "go", so that result is not observed there.
// NOTE(review): the guards and return statements are not visible in this view.
945 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
946 xapp.Logger.Info("MSG from E2T: %s", params.String())
947 c.UpdateCounter(cSubDelRespFromE2)
948 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
950 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
953 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
955 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
958 trans := subs.GetTransaction()
// presumably the trans == nil branch
960 err = fmt.Errorf("Ongoing transaction not found")
961 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
964 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
// presumably reached only when the event could not be delivered
966 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
967 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
972 //-------------------------------------------------------------------
973 // handle from E2T Subscription Delete Failure
974 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure handles a subscription delete failure
// received from E2T: it unpacks the message, finds the subscription by the
// instance id in the failure, fetches that subscription's ongoing transaction
// and passes the message to it as an event, with e2tRecvMsgTimeout as the send
// timeout. Every failing step is logged.
// NOTE(review): the guards and early returns between the steps are not visible
// in this view.
975 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
976 xapp.Logger.Info("MSG from E2T: %s", params.String())
977 c.UpdateCounter(cSubDelFailFromE2)
978 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
980 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
983 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
985 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
988 trans := subs.GetTransaction()
// presumably the trans == nil branch
990 err = fmt.Errorf("Ongoing transaction not found")
991 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
994 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
// presumably reached only when the event could not be delivered
996 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
997 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1002 //-------------------------------------------------------------------
1004 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the dynamic type of v; callers
// use it in log lines. It distinguishes subscription response, failure, delete
// response and delete failure messages. The two request cases are commented out.
// NOTE(review): the switch header, the returned strings and the handling of nil
// or unknown types are not visible in this view.
1005 func typeofSubsMessage(v interface{}) string {
1010 //case *e2ap.E2APSubscriptionRequest:
1012 case *e2ap.E2APSubscriptionResponse:
1014 case *e2ap.E2APSubscriptionFailure:
1016 //case *e2ap.E2APSubscriptionDeleteRequest:
1017 // return "SubDelReq"
1018 case *e2ap.E2APSubscriptionDeleteResponse:
1020 case *e2ap.E2APSubscriptionDeleteFailure:
1027 //-------------------------------------------------------------------
1029 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs in SDL under its instance id
// (subs.ReqId.InstanceId). A write error is logged; nothing is returned to the
// caller.
1030 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1031 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1032 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
// presumably logged only when err is non-nil — guard not visible here
1034 xapp.Logger.Error("%v", err)
1038 //-------------------------------------------------------------------
1040 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with the
// outcome of a create request. When removeSubscriptionFromDb is true the
// subscription is removed from the db. The write path stores the subscription
// only when subs.RetryFromXapp is false. subs.RetryFromXapp is reset to false
// afterwards.
// NOTE(review): the else structure around the write path is not visible in
// this view.
1041 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1043 if removeSubscriptionFromDb == true {
1044 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1045 c.RemoveSubscriptionFromDb(subs)
1047 // Update is needed for successful response and merge case here
1048 if subs.RetryFromXapp == false {
1049 c.WriteSubscriptionToDb(subs)
1052 subs.RetryFromXapp = false
1055 //-------------------------------------------------------------------
1057 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the SDL entry stored under the instance id
// of subs (subs.ReqId.InstanceId). A removal error is logged; nothing is
// returned to the caller.
1058 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1059 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1060 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
// presumably logged only when err is non-nil — guard not visible here
1062 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes a subscription that was left uncompleted by
// a restart (called from HandleUncompletedSubscriptions). It packs one delete
// request for subs and, for every endpoint in subs.EpList.Endpoints, builds RMR
// parameters with that endpoint as the source and feeds them straight into
// handleXAPPSubscriptionDeleteRequest, as if the xApp had sent the request.
// NOTE(review): the assignment of params.Xid (if any) and the end of the
// function are not visible in this view.
1066 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1068 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1070 // Send delete for every endpoint in the subscription
1071 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1072 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1073 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1074 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
// presumably inside an err != nil branch — guard and return not visible here
1076 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1079 for _, endPoint := range subs.EpList.Endpoints {
1080 params := &xapp.RMRParams{}
1081 params.Mtype = mType
1082 params.SubId = int(subs.GetReqId().InstanceId)
1084 params.Meid = subs.Meid
1085 params.Src = endPoint.String()
1086 params.PayloadLen = len(payload.Buf)
1087 params.Payload = payload.Buf
1089 subs.DeleteFromDb = true
// Called synchronously, so the endpoints are processed one after another.
1090 c.handleXAPPSubscriptionDeleteRequest(params)