2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds one descriptive string for log and error messages from the
// String() form of every entry, followed by the error text as "err(...)".
// NOTE(review): several lines of this function are not visible in this excerpt
// (the filler updates, the guard around err and the return) — confirm against
// the full file before relying on the exact separator behavior.
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
// filler is prepended to every part; it starts out empty.
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level configuration values. All of them are assigned in
// ReadConfigParameters from the "controls.*" configuration keys.
// The four timing values are time.Duration; note that waitRouteCleanup_ms is a
// Duration as well, despite its _ms suffix.
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// Kept as a string and compared against "false" in NewControl.
69 var readSubsFromDb string
77 //subscriber *xapp.Subscriber
80 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the marker event returned by
// sendE2TSubscriptionRequest to simulate "request sent, no response received
// before restart"; handleSubscriptionCreate has a dedicated case for it.
89 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is not referenced in the visible part of this file.
// NOTE(review): confirm whether it is still used elsewhere.
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance. It creates the routing manager REST
// client from the rtmgr.* configuration keys, initializes the registry and
// tracker, loads the configuration parameters, registers the test and symptom
// data REST routes and starts the subscription REST listener in a goroutine.
// Afterwards subscriptions are read back from the database.
// NOTE(review): the branch taken when readSubsFromDb is "false" and the final
// return are not visible in this excerpt.
99 func NewControl() *Control {
// Address is composed as <rtmgr.HostAddr>:<rtmgr.port>; only the http scheme is used.
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 //subscriber: subscriber,
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Must run before readSubsFromDb is inspected below, since it assigns that variable.
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
// Restore the registry state from what was read, then clean up subscriptions
// that never received a response.
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
// SymptomDataHandler serves the subscription list returned by the registry's
// QueryHandler as symptom data JSON named "platform/subscriptions.json".
// The error returned by QueryHandler is discarded.
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" configuration keys into the
// package-level variables, substituting a default whenever a value is zero or
// empty, and logs every resulting value. It is called from NewControl and is
// registered as the configuration change listener in Run.
// The parameter f is not referenced in the visible lines.
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
// The *_ms keys hold millisecond counts, so each value is scaled by 1000000
// to become a time.Duration. The default for the three E2T timeouts is 2000 ms.
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. The non-default
171 // value 100ms is currently used only in unit tests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
// Default route clean-up wait is 5000 ms.
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
// Try counts include the initial attempt, so the default of 1 means no retry.
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// An unset value defaults to "true"; NewControl compares against "false".
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
197 //-------------------------------------------------------------------
199 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register (keyed by
// subscription id) and, for every subscription whose SubRespRcvd flag is
// false, sets NoRespToXapp and sends a subscription delete request for it.
// NewControl calls it with the register read back from the database.
200 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
202 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
203 for subId, subs := range register {
204 if subs.SubRespRcvd == false {
// Checked in handleXAPPSubscriptionDeleteRequest before any delete response is built.
205 subs.NoRespToXapp = true
206 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
207 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the ready callback registered in Run. It sets c.RMRClient to
// xapp.Rmr unless a client has already been assigned.
// The data argument is not used in the visible lines.
212 func (c *Control) ReadyCB(data interface{}) {
213 if c.RMRClient == nil {
214 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback and ReadConfigParameters as
// the configuration change listener.
// NOTE(review): the remaining lines of this function are not visible in this
// excerpt.
218 func (c *Control) Run() {
219 xapp.SetReadyCB(c.ReadyCB, nil)
220 xapp.AddConfigChangeListener(c.ReadConfigParameters)
224 //-------------------------------------------------------------------
226 //-------------------------------------------------------------------
// SubscriptionHandler is the REST subscription request callback passed to
// xapp.Subscription.Listen in NewControl. It generates a new REST subscription
// id, creates the REST subscription in the registry, fills the list of E2AP
// subscription requests from params and then processes them asynchronously in
// processSubscriptionRequests.
//
// params is type-asserted to *models.SubscriptionParams without a check, so any
// other dynamic type panics here.
// NOTE(review): the error checks and return statements after each step are not
// visible in this excerpt.
227 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
// ksuid provides the unique REST subscription id placed in the response.
229 restSubId := ksuid.New().String()
230 subResp := models.SubscriptionResponse{}
231 subResp.SubscriptionID = &restSubId
232 p := params.(*models.SubscriptionParams)
236 c.UpdateCounter(cRestSubReqFromXapp)
238 if p.ClientEndpoint == nil {
239 xapp.Logger.Error("ClientEndpoint == nil")
// NOTE(review): the returned error carries an empty message.
240 return nil, fmt.Errorf("")
243 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
245 xapp.Logger.Error("%s", err.Error())
249 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
251 xapp.Logger.Error("%s", err.Error())
255 subReqList := e2ap.SubscriptionRequestList{}
256 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
258 xapp.Logger.Error("%s", err.Error())
// Roll back the REST subscription created above.
259 c.registry.DeleteRESTSubscription(&restSubId)
263 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
269 //-------------------------------------------------------------------
271 //-------------------------------------------------------------------
// processSubscriptionRequests handles every E2AP subscription request in
// subReqList for one REST subscription. For each request it creates an xApp
// transaction, runs handleSubscriptionRequest and sends a REST notification to
// the client endpoint. A success notification carries the requestor id and
// instance id from the subscription response; a failure notification carries
// both set to zero. It is started as a goroutine by SubscriptionHandler.
// NOTE(review): the instanceId declaration and the if/else lines that separate
// the failure and success paths are not visible in this excerpt.
273 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
274 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
276 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
278 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
280 xapp.Logger.Error("%s", err.Error())
284 var requestorID int64
286 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
287 subReqMsg := subReqList.E2APSubscriptionRequests[index]
289 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
291 c.registry.DeleteRESTSubscription(restSubId)
292 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer sits inside the for loop, so every transaction is
// released only when the whole function returns, not at the end of its iteration.
296 defer trans.Release()
297 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
298 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
300 // Send notification to xApp that processing of a Subscription Request has failed. Currently it is not possible
301 // to indicate an error. Such a possibility should be added. As a workaround requestorID and instanceId are set to zero value
302 requestorID = (int64)(0)
303 instanceId = (int64)(0)
304 resp := &models.SubscriptionResponse{
305 SubscriptionID: restSubId,
306 SubscriptionInstances: []*models.SubscriptionInstance{
307 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
310 // Mark REST subscription request processed.
311 restSubscription.SetProcessed()
312 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
313 xapp.Subscription.Notify(resp, *clientEndpoint)
315 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
317 // Store successfully processed InstanceId for deletion
318 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
320 // Send notification to xApp that a Subscription Request has been processed.
321 requestorID = (int64)(subRespMsg.RequestId.Id)
322 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
323 resp := &models.SubscriptionResponse{
324 SubscriptionID: restSubId,
325 SubscriptionInstances: []*models.SubscriptionInstance{
326 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
329 // Mark REST subscription request processed.
330 restSubscription.SetProcessed()
331 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
332 xapp.Subscription.Notify(resp, *clientEndpoint)
335 c.UpdateCounter(cRestSubRespToXapp)
339 //-------------------------------------------------------------------
341 //------------------------------------------------------------------
// handleSubscriptionRequest processes one E2AP subscription request of a REST
// subscription. It tracks the transaction, assigns it to a subscription in the
// registry, starts handleSubscriptionCreate in a goroutine and blocks until
// that goroutine posts an event to the transaction. The event type decides the
// outcome: a subscription response, a subscription failure, or anything else.
// On the failure path the transaction is removed from the subscription.
// NOTE(review): the return statements and the default case are not visible in
// this excerpt; meid and restSubId are not referenced in the visible lines.
342 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
343 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
345 err := c.tracker.Track(trans)
347 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
348 xapp.Logger.Error("%s", err.Error())
352 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
354 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
355 xapp.Logger.Error("%s", err.Error())
362 go c.handleSubscriptionCreate(subs, trans)
363 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
367 switch themsg := event.(type) {
368 case *e2ap.E2APSubscriptionResponse:
371 case *e2ap.E2APSubscriptionFailure:
372 err = fmt.Errorf("SubscriptionFailure received")
378 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
379 xapp.Logger.Error("%s", err.Error())
380 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
384 //-------------------------------------------------------------------
386 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB is the REST subscription delete callback passed
// to xapp.Subscription.Listen in NewControl. It looks up the REST subscription
// by id, refuses the delete while the original subscription request is still
// being handled, and otherwise deletes every stored instance id through
// SubscriptionDeleteHandler before removing the REST subscription itself.
// NOTE(review): the return statements, the handling of an already ongoing
// delete and whether the instance loop runs in a goroutine are not visible in
// this excerpt.
387 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
390 c.UpdateCounter(cRestSubDelReqFromXapp)
392 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
394 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
396 xapp.Logger.Error("%s", err.Error())
397 if restSubscription == nil {
398 // Subscription was not found
401 if restSubscription.SubReqOngoing == true {
402 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
403 xapp.Logger.Error("%s", err.Error())
405 } else if restSubscription.SubDelReqOngoing == true {
406 // Previous request for same restSubId still ongoing
412 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
414 for _, instanceId := range restSubscription.InstanceIds {
415 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
417 xapp.Logger.Error("%s", err.Error())
420 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
// NOTE(review): DeleteInstanceId is called while ranging over
// restSubscription.InstanceIds — confirm it does not disturb the iteration.
421 restSubscription.DeleteInstanceId(instanceId)
423 c.registry.DeleteRESTSubscription(&restSubId)
426 c.UpdateCounter(cRestSubDelRespToXapp)
431 //-------------------------------------------------------------------
433 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes one subscription instance of a REST
// subscription. It creates and tracks an xApp transaction with a zero request
// id, finds the subscription matching instanceId, starts
// handleSubscriptionDelete in a goroutine, blocks until that goroutine posts
// its event, and then removes the transaction from the subscription.
// NOTE(review): most error checks and return statements are not visible in
// this excerpt.
434 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
436 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
438 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
439 xapp.Logger.Error("%s", err.Error())
441 defer trans.Release()
443 err := c.tracker.Track(trans)
445 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
446 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned here instead of the err
// built just above — this looks unintended; confirm and return err.
447 return &time.ParseError{}
450 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
452 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
453 xapp.Logger.Error("%s", err.Error())
459 go c.handleSubscriptionDelete(subs, trans)
460 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
462 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
464 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
469 //-------------------------------------------------------------------
471 //-------------------------------------------------------------------
// QueryHandler is the REST query callback passed to xapp.Subscription.Listen
// in NewControl. It logs the call and returns the registry's QueryHandler
// result unchanged.
472 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
473 xapp.Logger.Info("QueryHandler() called")
477 return c.registry.QueryHandler()
// TestRestHandler serves the testing support route "/ric/v1/test/{testId}"
// registered in NewControl. The testId path variable selects the command:
// deleting a single subscription from the db ("deletesubid=<id>"), removing
// all subscriptions from the db, or restarting submgr. Anything else is logged
// as unsupported.
// NOTE(review): the conditions that select the last two commands are not
// visible in this excerpt.
480 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
481 xapp.Logger.Info("TestRestHandler() called")
483 pathParams := mux.Vars(r)
484 s := pathParams["testId"]
486 // This can be used to delete single subscription from db
487 if contains := strings.Contains(s, "deletesubid="); contains == true {
// s contains "=", so the split always yields at least two elements.
488 var splits = strings.Split(s, "=")
489 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
490 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
// NOTE(review): subId is parsed as a 64-bit value and converted to uint32
// without a range check, so out-of-range ids wrap around.
491 c.RemoveSubscriptionFromSdl(uint32(subId))
496 // This can be used to remove all subscriptions from db
498 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
499 c.RemoveAllSubscriptionsFromSdl()
503 // This is meant to cause submgr's restart in testing
505 xapp.Logger.Info("os.Exit(1) called")
509 xapp.Logger.Info("Unsupported rest command received %s", s)
512 //-------------------------------------------------------------------
514 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload held in trans towards E2T over RMR. The
// message type comes from the transaction, the sub id and MEID from the
// subscription. desc is used only in the log line. A send error is logged.
// NOTE(review): the error check and the return are not visible in this
// excerpt; the named result err is assigned from SendWithRetry.
516 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
517 params := &xapp.RMRParams{}
518 params.Mtype = trans.GetMtype()
519 params.SubId = int(subs.GetReqId().InstanceId)
521 params.Meid = subs.GetMeid()
523 params.PayloadLen = len(trans.Payload.Buf)
524 params.Payload = trans.Payload.Buf
526 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
527 err = c.SendWithRetry(params, false, 5)
529 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the payload held in trans towards the xApp over RMR. The
// message type, Xid and MEID come from the xApp transaction, the sub id from
// the subscription. desc is used only in the log line. A send error is logged.
// NOTE(review): the error check and the return are not visible in this
// excerpt; the named result err is assigned from SendWithRetry.
534 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
536 params := &xapp.RMRParams{}
537 params.Mtype = trans.GetMtype()
538 params.SubId = int(subs.GetReqId().InstanceId)
539 params.Xid = trans.GetXid()
540 params.Meid = trans.GetMeid()
542 params.PayloadLen = len(trans.Payload.Buf)
543 params.Payload = trans.Payload.Buf
545 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
546 err = c.SendWithRetry(params, false, 5)
548 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume dispatches a received RMR message to the handler for its message
// type; every handler runs in its own goroutine. The payload is first cloned
// into a new Go byte slice because msg.Mbuf is freed when Consume returns,
// while the handlers keep running. Unknown message types are logged and
// discarded.
// NOTE(review): the line assigning the cloned payload back to msg, the switch
// header and the return are not visible in this excerpt.
553 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
554 if c.RMRClient == nil {
555 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
556 xapp.Logger.Error("%s", err.Error())
561 defer c.RMRClient.Free(msg.Mbuf)
563 // xapp-frame might use direct access to c buffer and
564 // when msg.Mbuf is freed, someone might take it into use
565 // and payload data might be invalid inside message handle function
567 // subscriptions won't load system a lot so there is no
568 // real performance hit by cloning buffer into new go byte slice
// The zero-length, zero-capacity slice forces append to allocate a new backing array.
569 cPay := append(msg.Payload[:0:0], msg.Payload...)
571 msg.PayloadLen = len(cPay)
574 case xapp.RIC_SUB_REQ:
575 go c.handleXAPPSubscriptionRequest(msg)
576 case xapp.RIC_SUB_RESP:
577 go c.handleE2TSubscriptionResponse(msg)
578 case xapp.RIC_SUB_FAILURE:
579 go c.handleE2TSubscriptionFailure(msg)
580 case xapp.RIC_SUB_DEL_REQ:
581 go c.handleXAPPSubscriptionDeleteRequest(msg)
582 case xapp.RIC_SUB_DEL_RESP:
583 go c.handleE2TSubscriptionDeleteResponse(msg)
584 case xapp.RIC_SUB_DEL_FAILURE:
585 go c.handleE2TSubscriptionDeleteFailure(msg)
587 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
592 //-------------------------------------------------------------------
593 // handle from XAPP Subscription Request
594 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a subscription request received from
// an xApp over RMR. It unpacks the E2AP message, creates and tracks an xApp
// transaction, assigns it to a subscription in the registry and passes both to
// wakeSubscriptionRequest. Each failing step is logged.
// NOTE(review): the error checks and early returns are not visible in this
// excerpt.
595 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
596 xapp.Logger.Info("MSG from XAPP: %s", params.String())
597 c.UpdateCounter(cSubReqFromXapp)
599 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
601 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
605 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
607 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
610 defer trans.Release()
612 if err = c.tracker.Track(trans); err != nil {
613 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
617 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
618 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
620 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
624 c.wakeSubscriptionRequest(subs, trans)
627 //-------------------------------------------------------------------
628 // Wake Subscription Request to E2node
629 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate in a goroutine,
// blocks until it posts an event to trans, and forwards the result to the
// xApp. A subscription response or failure is re-packed with the request id of
// the xApp transaction and sent with rmrSendToXapp; other outcomes are logged
// as failed.
// NOTE(review): the declaration of err, the pack error checks and the returns
// are not visible in this excerpt.
630 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
632 go c.handleSubscriptionCreate(subs, trans)
633 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
636 switch themsg := event.(type) {
637 case *e2ap.E2APSubscriptionResponse:
// Replace the request id in the message with the one the xApp used in its request.
638 themsg.RequestId.Id = trans.RequestId.Id
639 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
642 c.UpdateCounter(cSubRespToXapp)
643 c.rmrSendToXapp("", subs, trans)
646 case *e2ap.E2APSubscriptionFailure:
647 themsg.RequestId.Id = trans.RequestId.Id
648 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
650 c.UpdateCounter(cSubFailToXapp)
651 c.rmrSendToXapp("", subs, trans)
657 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
658 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
661 //-------------------------------------------------------------------
662 // handle from XAPP Subscription Delete Request
663 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a subscription delete request
// received over RMR; SendSubscriptionDeleteReq also calls it directly after a
// restart. It unpacks the request, creates and tracks an xApp transaction,
// finds the subscription by the transaction's sub id, starts
// handleSubscriptionDelete in a goroutine and blocks until it posts its event.
// Unless subs.NoRespToXapp is set, a successful delete response is then sent
// to the xApp regardless of the actual outcome.
// NOTE(review): the error checks and early returns are not visible in this
// excerpt.
664 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
665 xapp.Logger.Info("MSG from XAPP: %s", params.String())
666 c.UpdateCounter(cSubDelReqFromXapp)
668 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
670 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
674 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
676 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
679 defer trans.Release()
681 err = c.tracker.Track(trans)
// NOTE(review): the log prefix below says "XAPP-SubReq" although this is the delete path.
683 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
687 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
689 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
696 go c.handleSubscriptionDelete(subs, trans)
697 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
699 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
701 if subs.NoRespToXapp == true {
702 // Do not send delete responses to xapps when a submgr restart is deleting uncompleted subscriptions
706 // Whatever is received success, fail or timeout, send successful delete response
707 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
708 subDelRespMsg.RequestId.Id = trans.RequestId.Id
709 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
710 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
711 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
713 c.UpdateCounter(cSubDelRespToXapp)
714 c.rmrSendToXapp("", subs, trans)
717 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
718 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
721 //-------------------------------------------------------------------
722 // SUBS CREATE Handling
723 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work of a create
// request. It creates a subscription transaction, waits for its turn on the
// subscription and then either reuses the subscription's cached response or
// sends the request to E2T. The outcome is stored as the cached response, the
// db entry is updated or removed, and the resulting message is posted to
// parentTrans, where the caller is blocked in WaitEvent.
//
// For a failure event, and in the branch that caches a nil response, an
// internal subscription delete request is also sent to E2T.
// NOTE(review): the default case header, the else branches and the condition
// guarding RemoveFromSubscription are not visible in this excerpt.
724 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
726 var removeSubscriptionFromDb bool = false
727 trans := c.tracker.NewSubsTransaction(subs)
728 subs.WaitTransactionTurn(trans)
729 defer subs.ReleaseTransactionTurn(trans)
730 defer trans.Release()
732 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
734 subRfMsg, valid := subs.GetCachedResponse()
// No cached response yet and the subscription is still valid: ask E2T.
735 if subRfMsg == nil && valid == true {
736 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
737 switch event.(type) {
738 case *e2ap.E2APSubscriptionResponse:
739 subRfMsg, valid = subs.SetCachedResponse(event, true)
740 subs.SubRespRcvd = true
741 case *e2ap.E2APSubscriptionFailure:
742 removeSubscriptionFromDb = true
743 subRfMsg, valid = subs.SetCachedResponse(event, false)
744 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
745 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
746 case *SubmgrRestartTestEvent:
747 // This simulates that no response has been received and after restart subscriptions are restored from db
748 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
751 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
752 removeSubscriptionFromDb = true
753 subRfMsg, valid = subs.SetCachedResponse(nil, false)
754 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
756 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
758 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
761 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
763 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
766 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wake up the caller blocked in parentTrans.WaitEvent.
767 parentTrans.SendEvent(subRfMsg, 0)
770 //-------------------------------------------------------------------
771 // SUBS DELETE Handling
772 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side work of a delete
// request. It creates a subscription transaction and waits for its turn on the
// subscription. The delete request is sent to E2T only when the subscription
// is valid and the requesting endpoint is its single remaining endpoint.
// Afterwards the transaction is removed from the subscription, the db is
// updated and a nil event is posted to parentTrans to unblock the caller.
774 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
776 trans := c.tracker.NewSubsTransaction(subs)
777 subs.WaitTransactionTurn(trans)
778 defer subs.ReleaseTransactionTurn(trans)
779 defer trans.Release()
781 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
785 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
788 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
792 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
793 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
794 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
795 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
796 c.registry.UpdateSubscriptionToDb(subs, c)
797 parentTrans.SendEvent(nil, 0)
800 //-------------------------------------------------------------------
801 // send to E2T Subscription Request
802 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request with the
// subscription's own request id, writes the still uncompleted subscription to
// the db and sends the request to E2T, trying at most e2tMaxSubReqTryCount
// times. After each send it waits up to e2tSubReqTimeout for an event on
// trans, unless subs.DoNotWaitSubResp is set. The event is returned to the
// caller as interface{}.
// NOTE(review): the declaration of err, the retry and timeout conditions, the
// loop exits and the return are not visible in this excerpt.
803 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
805 var event interface{} = nil
806 var timedOut bool = false
808 subReqMsg := subs.SubReqMsg
809 subReqMsg.RequestId = subs.GetReqId().RequestId
810 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
812 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
816 // Write uncompleted subscription in db. If there is no response for the subscription it needs to be re-processed (deleted) after restart
817 c.WriteSubscriptionToDb(subs)
819 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
820 desc := fmt.Sprintf("(retry %d)", retries)
822 c.UpdateCounter(cSubReqToE2)
824 c.UpdateCounter(cSubReReqToE2)
826 c.rmrSendToE2T(desc, subs, trans)
827 if subs.DoNotWaitSubResp == false {
828 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
830 c.UpdateCounter(cSubReqTimerExpiry)
834 // Simulating case where subscription request has been sent but response has not been received before restart
835 event = &SubmgrRestartTestEvent{}
839 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
843 //-------------------------------------------------------------------
844 // send to E2T Subscription Delete Request
845 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request from
// the subscription's request id and function id and sends it to E2T, trying at
// most e2tMaxSubDelReqTryCount times. After each send it waits up to
// e2tSubDelReqTime for an event on trans. The event is returned to the caller
// as interface{}.
// NOTE(review): the declarations of err and timedOut, the retry and timeout
// conditions, the loop exits and the return are not visible in this excerpt.
847 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
849 var event interface{}
852 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
853 subDelReqMsg.RequestId = subs.GetReqId().RequestId
854 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
855 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
857 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
861 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
862 desc := fmt.Sprintf("(retry %d)", retries)
864 c.UpdateCounter(cSubDelReqToE2)
866 c.UpdateCounter(cSubDelReReqToE2)
868 c.rmrSendToE2T(desc, subs, trans)
869 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
871 c.UpdateCounter(cSubDelReqTimerExpiry)
876 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
880 //-------------------------------------------------------------------
881 // handle from E2T Subscription Response
882 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a subscription response received from
// E2T. It unpacks the message, finds the subscription by the instance id in
// the message, takes the subscription's ongoing transaction and passes the
// response to it as an event, waiting up to e2tRecvMsgTimeout. Each failing
// step is logged.
// NOTE(review): the error checks and early returns are not visible in this
// excerpt.
883 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
884 xapp.Logger.Info("MSG from E2T: %s", params.String())
885 c.UpdateCounter(cSubRespFromE2)
887 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
889 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
892 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
894 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
897 trans := subs.GetTransaction()
899 err = fmt.Errorf("Ongoing transaction not found")
900 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
903 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
905 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
906 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
911 //-------------------------------------------------------------------
912 // handle from E2T Subscription Failure
913 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a subscription failure received from
// E2T. It unpacks the message, finds the subscription by the instance id in
// the message, takes the subscription's ongoing transaction and passes the
// failure to it as an event, waiting up to e2tRecvMsgTimeout. Each failing
// step is logged.
// NOTE(review): the error checks and early returns are not visible in this
// excerpt.
914 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
915 xapp.Logger.Info("MSG from E2T: %s", params.String())
916 c.UpdateCounter(cSubFailFromE2)
917 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
919 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
922 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
924 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
927 trans := subs.GetTransaction()
929 err = fmt.Errorf("Ongoing transaction not found")
930 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
933 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
935 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
936 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
941 //-------------------------------------------------------------------
942 // handle from E2T Subscription Delete Response
943 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse handles a subscription delete response
// received from E2T. It unpacks the message, finds the subscription by the
// instance id in the message, takes the subscription's ongoing transaction and
// passes the response to it as an event, waiting up to e2tRecvMsgTimeout. Each
// failing step is logged.
// Unlike its sibling handlers it declares a named error result; Consume starts
// it with "go", so that result is discarded there.
// NOTE(review): the error checks and returns are not visible in this excerpt.
944 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
945 xapp.Logger.Info("MSG from E2T: %s", params.String())
946 c.UpdateCounter(cSubDelRespFromE2)
947 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
949 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
952 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
954 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
957 trans := subs.GetTransaction()
959 err = fmt.Errorf("Ongoing transaction not found")
960 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
963 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
965 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
966 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
971 //-------------------------------------------------------------------
972 // handle from E2T Subscription Delete Failure
973 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure handles a subscription delete failure
// received from E2T. It unpacks the message, finds the subscription by the
// instance id in the message, takes the subscription's ongoing transaction and
// passes the failure to it as an event, waiting up to e2tRecvMsgTimeout. Each
// failing step is logged.
// NOTE(review): the error checks and early returns are not visible in this
// excerpt.
974 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
975 xapp.Logger.Info("MSG from E2T: %s", params.String())
976 c.UpdateCounter(cSubDelFailFromE2)
977 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
979 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
982 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
984 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
987 trans := subs.GetTransaction()
989 err = fmt.Errorf("Ongoing transaction not found")
990 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
993 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
995 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
996 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1001 //-------------------------------------------------------------------
1003 //-------------------------------------------------------------------
// typeofSubsMessage returns a short name for the E2AP message type held in v;
// it is used in log lines. The visible cases cover subscription response,
// subscription failure, delete response and delete failure; the request and
// delete request cases are commented out.
// NOTE(review): the switch header, the returned strings and the handling of
// nil or unknown types are not visible in this excerpt.
1004 func typeofSubsMessage(v interface{}) string {
1009 //case *e2ap.E2APSubscriptionRequest:
1011 case *e2ap.E2APSubscriptionResponse:
1013 case *e2ap.E2APSubscriptionFailure:
1015 //case *e2ap.E2APSubscriptionDeleteRequest:
1016 // return "SubDelReq"
1017 case *e2ap.E2APSubscriptionDeleteResponse:
1019 case *e2ap.E2APSubscriptionDeleteFailure:
1026 //-------------------------------------------------------------------
1028 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores the subscription through WriteSubscriptionToSdl
// under its instance id. A write error is logged; nothing is returned to the
// caller.
1029 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1030 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1031 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1033 xapp.Logger.Error("%v", err)
1037 //-------------------------------------------------------------------
1039 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes the subscription from the db, when
// removeSubscriptionFromDb is true, or writes it again. The write is skipped
// when subs.RetryFromXapp is set. RetryFromXapp is cleared afterwards.
// NOTE(review): the else line and closing braces are not visible in this
// excerpt, so the exact scope of the final reset should be confirmed.
1040 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1042 if removeSubscriptionFromDb == true {
1043 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1044 c.RemoveSubscriptionFromDb(subs)
1046 // Update is needed for successful response and merge case here
1047 if subs.RetryFromXapp == false {
1048 c.WriteSubscriptionToDb(subs)
1051 subs.RetryFromXapp = false
1054 //-------------------------------------------------------------------
1056 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the subscription through
// RemoveSubscriptionFromSdl using its instance id. A removal error is logged;
// nothing is returned to the caller.
1057 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1058 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1059 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1061 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes a subscription that was restored after a
// restart; HandleUncompletedSubscriptions calls it. It packs one delete
// request for the subscription and then, for every endpoint in subs.EpList,
// builds RMR parameters with that endpoint as the source and feeds them
// synchronously into handleXAPPSubscriptionDeleteRequest, as if the xApp had
// sent the delete itself.
// NOTE(review): the pack error check is not visible, and the function may
// continue past the end of this excerpt.
1065 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1067 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1069 // Send delete for every endpoint in the subscription
1070 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1071 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1072 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1073 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1075 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1078 for _, endPoint := range subs.EpList.Endpoints {
1079 params := &xapp.RMRParams{}
1080 params.Mtype = mType
1081 params.SubId = int(subs.GetReqId().InstanceId)
1083 params.Meid = subs.Meid
1084 params.Src = endPoint.String()
1085 params.PayloadLen = len(payload.Buf)
1086 params.Payload = payload.Buf
1088 subs.DeleteFromDb = true
1089 c.handleXAPPSubscriptionDeleteRequest(params)