2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
79 Counters map[string]xapp.Counter
89 type SubmgrRestartTestEvent struct{}
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
99 func NewControl() *Control {
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
171 // value 100ms used currently only in unittests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 c.LoggerLevel = viper.GetUint32("logger.level")
196 if c.LoggerLevel == 0 {
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
204 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
206 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
207 for subId, subs := range register {
208 if subs.SubRespRcvd == false {
209 subs.NoRespToXapp = true
210 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
211 c.SendSubscriptionDeleteReq(subs)
216 func (c *Control) ReadyCB(data interface{}) {
217 if c.RMRClient == nil {
218 c.RMRClient = xapp.Rmr
222 func (c *Control) Run() {
223 xapp.SetReadyCB(c.ReadyCB, nil)
224 xapp.AddConfigChangeListener(c.ReadConfigParameters)
228 //-------------------------------------------------------------------
230 //-------------------------------------------------------------------
231 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
234 c.UpdateCounter(cRestSubReqFromXapp)
236 subResp := models.SubscriptionResponse{}
237 p := params.(*models.SubscriptionParams)
239 if c.LoggerLevel > 2 {
240 c.PrintRESTSubscriptionRequest(p)
243 if p.ClientEndpoint == nil {
244 xapp.Logger.Error("ClientEndpoint == nil")
245 c.UpdateCounter(cRestSubFailToXapp)
246 return nil, fmt.Errorf("")
249 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
251 xapp.Logger.Error("%s", err.Error())
252 c.UpdateCounter(cRestSubFailToXapp)
256 var restSubscription *RESTSubscription
257 if p.SubscriptionID == "" {
258 restSubId = ksuid.New().String()
259 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
261 xapp.Logger.Error("%s", err.Error())
262 c.UpdateCounter(cRestSubFailToXapp)
267 restSubId = p.SubscriptionID
268 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
270 xapp.Logger.Error("%s", err.Error())
271 c.UpdateCounter(cRestSubFailToXapp)
276 subResp.SubscriptionID = &restSubId
277 subReqList := e2ap.SubscriptionRequestList{}
278 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
280 xapp.Logger.Error("%s", err.Error())
281 c.registry.DeleteRESTSubscription(&restSubId)
282 c.UpdateCounter(cRestSubFailToXapp)
286 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
288 c.UpdateCounter(cRestSubRespToXapp)
292 //-------------------------------------------------------------------
294 //-------------------------------------------------------------------
296 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
297 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
299 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
301 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
303 xapp.Logger.Error("%s", err.Error())
307 var xAppEventInstanceID int64
308 var e2EventInstanceID int64
309 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
310 subReqMsg := subReqList.E2APSubscriptionRequests[index]
312 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
314 c.registry.DeleteRESTSubscription(restSubId)
315 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
319 defer trans.Release()
320 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
321 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
322 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
324 // Send notification to xApp that prosessing of a Subscription Request has failed.
325 e2EventInstanceID = (int64)(0)
326 resp := &models.SubscriptionResponse{
327 SubscriptionID: restSubId,
328 SubscriptionInstances: []*models.SubscriptionInstance{
329 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
330 ErrorCause: nil, //TODO: Suitable Error cause.
331 XappEventInstanceID: &xAppEventInstanceID},
334 // Mark REST subscription request processed.
335 restSubscription.SetProcessed()
336 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
337 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
338 xapp.Subscription.Notify(resp, *clientEndpoint)
339 c.UpdateCounter(cRestSubFailNotifToXapp)
341 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
343 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
344 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
346 // Store successfully processed InstanceId for deletion
347 restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
348 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
350 // Send notification to xApp that a Subscription Request has been processed.
351 resp := &models.SubscriptionResponse{
352 SubscriptionID: restSubId,
353 SubscriptionInstances: []*models.SubscriptionInstance{
354 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
356 XappEventInstanceID: &xAppEventInstanceID},
359 // Mark REST subscription request processesd.
360 restSubscription.SetProcessed()
361 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
362 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
363 xapp.Subscription.Notify(resp, *clientEndpoint)
364 c.UpdateCounter(cRestSubNotifToXapp)
370 //-------------------------------------------------------------------
372 //------------------------------------------------------------------
373 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
374 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
376 err := c.tracker.Track(trans)
378 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
379 xapp.Logger.Error("%s", err.Error())
383 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
385 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
386 xapp.Logger.Error("%s", err.Error())
393 go c.handleSubscriptionCreate(subs, trans)
394 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
398 switch themsg := event.(type) {
399 case *e2ap.E2APSubscriptionResponse:
402 case *e2ap.E2APSubscriptionFailure:
403 err = fmt.Errorf("SubscriptionFailure received")
409 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
410 xapp.Logger.Error("%s", err.Error())
411 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
415 //-------------------------------------------------------------------
417 //-------------------------------------------------------------------
418 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
421 c.UpdateCounter(cRestSubDelReqFromXapp)
423 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
425 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
427 xapp.Logger.Error("%s", err.Error())
428 if restSubscription == nil {
429 // Subscription was not found
432 if restSubscription.SubReqOngoing == true {
433 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
434 xapp.Logger.Error("%s", err.Error())
436 } else if restSubscription.SubDelReqOngoing == true {
437 // Previous request for same restSubId still ongoing
443 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
445 for _, instanceId := range restSubscription.InstanceIds {
446 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
449 xapp.Logger.Error("%s", err.Error())
452 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
453 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
454 restSubscription.DeleteE2InstanceId(instanceId)
456 c.registry.DeleteRESTSubscription(&restSubId)
459 c.UpdateCounter(cRestSubDelRespToXapp)
464 //-------------------------------------------------------------------
466 //-------------------------------------------------------------------
467 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
469 var xAppEventInstanceID int64
470 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
472 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
473 restSubId, instanceId, idstring(err, nil))
474 return xAppEventInstanceID, nil
477 xAppEventInstanceID = int64(subs.ReqId.Id)
478 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
480 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
481 xapp.Logger.Error("%s", err.Error())
483 defer trans.Release()
485 err = c.tracker.Track(trans)
487 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
488 xapp.Logger.Error("%s", err.Error())
489 return xAppEventInstanceID, &time.ParseError{}
494 go c.handleSubscriptionDelete(subs, trans)
495 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
497 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
499 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
501 return xAppEventInstanceID, nil
504 //-------------------------------------------------------------------
506 //-------------------------------------------------------------------
507 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
508 xapp.Logger.Info("QueryHandler() called")
512 return c.registry.QueryHandler()
515 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
516 xapp.Logger.Info("TestRestHandler() called")
518 pathParams := mux.Vars(r)
519 s := pathParams["testId"]
521 // This can be used to delete single subscription from db
522 if contains := strings.Contains(s, "deletesubid="); contains == true {
523 var splits = strings.Split(s, "=")
524 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
525 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
526 c.RemoveSubscriptionFromSdl(uint32(subId))
531 // This can be used to remove all subscriptions db from
533 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
534 c.RemoveAllSubscriptionsFromSdl()
538 // This is meant to cause submgr's restart in testing
540 xapp.Logger.Info("os.Exit(1) called")
544 xapp.Logger.Info("Unsupported rest command received %s", s)
547 //-------------------------------------------------------------------
549 //-------------------------------------------------------------------
551 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
552 params := &xapp.RMRParams{}
553 params.Mtype = trans.GetMtype()
554 params.SubId = int(subs.GetReqId().InstanceId)
556 params.Meid = subs.GetMeid()
558 params.PayloadLen = len(trans.Payload.Buf)
559 params.Payload = trans.Payload.Buf
561 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
562 err = c.SendWithRetry(params, false, 5)
564 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
569 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
571 params := &xapp.RMRParams{}
572 params.Mtype = trans.GetMtype()
573 params.SubId = int(subs.GetReqId().InstanceId)
574 params.Xid = trans.GetXid()
575 params.Meid = trans.GetMeid()
577 params.PayloadLen = len(trans.Payload.Buf)
578 params.Payload = trans.Payload.Buf
580 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
581 err = c.SendWithRetry(params, false, 5)
583 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
588 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
589 if c.RMRClient == nil {
590 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
591 xapp.Logger.Error("%s", err.Error())
596 defer c.RMRClient.Free(msg.Mbuf)
598 // xapp-frame might use direct access to c buffer and
599 // when msg.Mbuf is freed, someone might take it into use
600 // and payload data might be invalid inside message handle function
602 // subscriptions won't load system a lot so there is no
603 // real performance hit by cloning buffer into new go byte slice
604 cPay := append(msg.Payload[:0:0], msg.Payload...)
606 msg.PayloadLen = len(cPay)
609 case xapp.RIC_SUB_REQ:
610 go c.handleXAPPSubscriptionRequest(msg)
611 case xapp.RIC_SUB_RESP:
612 go c.handleE2TSubscriptionResponse(msg)
613 case xapp.RIC_SUB_FAILURE:
614 go c.handleE2TSubscriptionFailure(msg)
615 case xapp.RIC_SUB_DEL_REQ:
616 go c.handleXAPPSubscriptionDeleteRequest(msg)
617 case xapp.RIC_SUB_DEL_RESP:
618 go c.handleE2TSubscriptionDeleteResponse(msg)
619 case xapp.RIC_SUB_DEL_FAILURE:
620 go c.handleE2TSubscriptionDeleteFailure(msg)
622 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
627 //-------------------------------------------------------------------
628 // handle from XAPP Subscription Request
629 //------------------------------------------------------------------
630 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
631 xapp.Logger.Info("MSG from XAPP: %s", params.String())
632 c.UpdateCounter(cSubReqFromXapp)
634 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
636 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
640 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
642 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
645 defer trans.Release()
647 if err = c.tracker.Track(trans); err != nil {
648 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
652 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
653 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
655 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
659 c.wakeSubscriptionRequest(subs, trans)
662 //-------------------------------------------------------------------
663 // Wake Subscription Request to E2node
664 //------------------------------------------------------------------
665 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
667 go c.handleSubscriptionCreate(subs, trans)
668 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
671 switch themsg := event.(type) {
672 case *e2ap.E2APSubscriptionResponse:
673 themsg.RequestId.Id = trans.RequestId.Id
674 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
677 c.UpdateCounter(cSubRespToXapp)
678 c.rmrSendToXapp("", subs, trans)
681 case *e2ap.E2APSubscriptionFailure:
682 themsg.RequestId.Id = trans.RequestId.Id
683 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
685 c.UpdateCounter(cSubFailToXapp)
686 c.rmrSendToXapp("", subs, trans)
692 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
693 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
696 //-------------------------------------------------------------------
697 // handle from XAPP Subscription Delete Request
698 //------------------------------------------------------------------
699 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
700 xapp.Logger.Info("MSG from XAPP: %s", params.String())
701 c.UpdateCounter(cSubDelReqFromXapp)
703 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
705 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
709 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
711 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
714 defer trans.Release()
716 err = c.tracker.Track(trans)
718 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
722 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
724 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
731 go c.handleSubscriptionDelete(subs, trans)
732 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
734 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
736 if subs.NoRespToXapp == true {
737 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
741 // Whatever is received success, fail or timeout, send successful delete response
742 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
743 subDelRespMsg.RequestId.Id = trans.RequestId.Id
744 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
745 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
746 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
748 c.UpdateCounter(cSubDelRespToXapp)
749 c.rmrSendToXapp("", subs, trans)
752 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
753 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
756 //-------------------------------------------------------------------
757 // SUBS CREATE Handling
758 //-------------------------------------------------------------------
759 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
761 var removeSubscriptionFromDb bool = false
762 trans := c.tracker.NewSubsTransaction(subs)
763 subs.WaitTransactionTurn(trans)
764 defer subs.ReleaseTransactionTurn(trans)
765 defer trans.Release()
767 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
769 subRfMsg, valid := subs.GetCachedResponse()
770 if subRfMsg == nil && valid == true {
771 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
772 switch event.(type) {
773 case *e2ap.E2APSubscriptionResponse:
774 subRfMsg, valid = subs.SetCachedResponse(event, true)
775 subs.SubRespRcvd = true
776 case *e2ap.E2APSubscriptionFailure:
777 removeSubscriptionFromDb = true
778 subRfMsg, valid = subs.SetCachedResponse(event, false)
779 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
780 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
781 case *SubmgrRestartTestEvent:
782 // This simulates that no response has been received and after restart subscriptions are restored from db
783 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
786 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
787 removeSubscriptionFromDb = true
788 subRfMsg, valid = subs.SetCachedResponse(nil, false)
789 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
791 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
793 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
796 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
798 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
801 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
802 parentTrans.SendEvent(subRfMsg, 0)
805 //-------------------------------------------------------------------
806 // SUBS DELETE Handling
807 //-------------------------------------------------------------------
809 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
811 trans := c.tracker.NewSubsTransaction(subs)
812 subs.WaitTransactionTurn(trans)
813 defer subs.ReleaseTransactionTurn(trans)
814 defer trans.Release()
816 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
820 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
823 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
827 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
828 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
829 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
830 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
831 c.registry.UpdateSubscriptionToDb(subs, c)
832 parentTrans.SendEvent(nil, 0)
835 //-------------------------------------------------------------------
836 // send to E2T Subscription Request
837 //-------------------------------------------------------------------
838 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
840 var event interface{} = nil
841 var timedOut bool = false
842 const ricRequestorId = 123
844 subReqMsg := subs.SubReqMsg
845 subReqMsg.RequestId = subs.GetReqId().RequestId
846 subReqMsg.RequestId.Id = ricRequestorId
847 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
849 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
853 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
854 c.WriteSubscriptionToDb(subs)
856 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
857 desc := fmt.Sprintf("(retry %d)", retries)
859 c.UpdateCounter(cSubReqToE2)
861 c.UpdateCounter(cSubReReqToE2)
863 c.rmrSendToE2T(desc, subs, trans)
864 if subs.DoNotWaitSubResp == false {
865 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
867 c.UpdateCounter(cSubReqTimerExpiry)
871 // Simulating case where subscrition request has been sent but response has not been received before restart
872 event = &SubmgrRestartTestEvent{}
876 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
880 //-------------------------------------------------------------------
881 // send to E2T Subscription Delete Request
882 //-------------------------------------------------------------------
884 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
886 var event interface{}
888 const ricRequestorId = 123
890 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
891 subDelReqMsg.RequestId = subs.GetReqId().RequestId
892 subDelReqMsg.RequestId.Id = ricRequestorId
893 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
894 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
896 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
900 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
901 desc := fmt.Sprintf("(retry %d)", retries)
903 c.UpdateCounter(cSubDelReqToE2)
905 c.UpdateCounter(cSubDelReReqToE2)
907 c.rmrSendToE2T(desc, subs, trans)
908 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
910 c.UpdateCounter(cSubDelReqTimerExpiry)
915 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
919 //-------------------------------------------------------------------
920 // handle from E2T Subscription Response
921 //-------------------------------------------------------------------
922 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
923 xapp.Logger.Info("MSG from E2T: %s", params.String())
924 c.UpdateCounter(cSubRespFromE2)
926 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
928 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
931 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
933 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
936 trans := subs.GetTransaction()
938 err = fmt.Errorf("Ongoing transaction not found")
939 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
942 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
944 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
945 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
950 //-------------------------------------------------------------------
951 // handle from E2T Subscription Failure
952 //-------------------------------------------------------------------
953 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
954 xapp.Logger.Info("MSG from E2T: %s", params.String())
955 c.UpdateCounter(cSubFailFromE2)
956 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
958 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
961 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
963 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
966 trans := subs.GetTransaction()
968 err = fmt.Errorf("Ongoing transaction not found")
969 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
972 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
974 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
975 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
980 //-------------------------------------------------------------------
981 // handle from E2T Subscription Delete Response
982 //-------------------------------------------------------------------
983 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
984 xapp.Logger.Info("MSG from E2T: %s", params.String())
985 c.UpdateCounter(cSubDelRespFromE2)
986 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
988 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
991 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
993 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
996 trans := subs.GetTransaction()
998 err = fmt.Errorf("Ongoing transaction not found")
999 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1002 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1003 if sendOk == false {
1004 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1005 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1010 //-------------------------------------------------------------------
1011 // handle from E2T Subscription Delete Failure
1012 //-------------------------------------------------------------------
1013 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1014 xapp.Logger.Info("MSG from E2T: %s", params.String())
1015 c.UpdateCounter(cSubDelFailFromE2)
1016 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1018 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1021 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1023 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1026 trans := subs.GetTransaction()
1028 err = fmt.Errorf("Ongoing transaction not found")
1029 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1032 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1033 if sendOk == false {
1034 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1035 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1040 //-------------------------------------------------------------------
1042 //-------------------------------------------------------------------
1043 func typeofSubsMessage(v interface{}) string {
1048 //case *e2ap.E2APSubscriptionRequest:
1050 case *e2ap.E2APSubscriptionResponse:
1052 case *e2ap.E2APSubscriptionFailure:
1054 //case *e2ap.E2APSubscriptionDeleteRequest:
1055 // return "SubDelReq"
1056 case *e2ap.E2APSubscriptionDeleteResponse:
1058 case *e2ap.E2APSubscriptionDeleteFailure:
1065 //-------------------------------------------------------------------
1067 //-------------------------------------------------------------------
1068 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1069 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1070 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1072 xapp.Logger.Error("%v", err)
1076 //-------------------------------------------------------------------
1078 //-------------------------------------------------------------------
1079 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1081 if removeSubscriptionFromDb == true {
1082 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1083 c.RemoveSubscriptionFromDb(subs)
1085 // Update is needed for successful response and merge case here
1086 if subs.RetryFromXapp == false {
1087 c.WriteSubscriptionToDb(subs)
1090 subs.RetryFromXapp = false
1093 //-------------------------------------------------------------------
1095 //-------------------------------------------------------------------
1096 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1097 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1098 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1100 xapp.Logger.Error("%v", err)
1104 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1106 const ricRequestorId = 123
1107 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1109 // Send delete for every endpoint in the subscription
1110 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1111 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1112 subDelReqMsg.RequestId.Id = ricRequestorId
1113 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1114 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1116 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1119 for _, endPoint := range subs.EpList.Endpoints {
1120 params := &xapp.RMRParams{}
1121 params.Mtype = mType
1122 params.SubId = int(subs.GetReqId().InstanceId)
1124 params.Meid = subs.Meid
1125 params.Src = endPoint.String()
1126 params.PayloadLen = len(payload.Buf)
1127 params.Payload = payload.Buf
1129 subs.DeleteFromDb = true
1130 c.handleXAPPSubscriptionDeleteRequest(params)
1134 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1136 fmt.Println("CRESTSubscriptionRequest")
1137 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1139 if p.ClientEndpoint.HTTPPort != nil {
1140 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1142 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1145 if p.ClientEndpoint.RMRPort != nil {
1146 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1148 fmt.Println(" ClientEndpoint.RMRPort = nil")
1152 fmt.Printf(" Meid = %s\n", *p.Meid)
1154 fmt.Println(" Meid = nil")
1157 for _, subscriptionDetail := range p.SubscriptionDetails {
1158 if p.RANFunctionID != nil {
1159 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1161 fmt.Println(" RANFunctionID = nil")
1163 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1164 fmt.Printf(" SubscriptionDetail.EventTriggers.OctetString = %X\n", subscriptionDetail.EventTriggers.OctetString)
1166 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1167 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1168 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1169 if actionToBeSetup.ActionDefinition != nil {
1170 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition.OctetString = %X\n", actionToBeSetup.ActionDefinition.OctetString)
1172 fmt.Println(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = nil")
1175 if actionToBeSetup.SubsequentAction != nil {
1176 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1177 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1179 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")