2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70 var restDuplicateCtrl duplicateCtrl
80 Counters map[string]xapp.Counter
90 type SubmgrRestartTestEvent struct{}
91 type SubmgrRestartUpEvent struct{}
94 xapp.Logger.Info("SUBMGR")
96 viper.SetEnvPrefix("submgr")
97 viper.AllowEmptyEnv(true)
100 func NewControl() *Control {
102 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
103 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
105 registry := new(Registry)
106 registry.Initialize()
107 registry.rtmgrClient = &rtmgrClient
109 tracker := new(Tracker)
112 c := &Control{e2ap: new(E2ap),
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119 c.ReadConfigParameters("")
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
125 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
127 if readSubsFromDb == "false" {
131 restDuplicateCtrl.Init()
133 // Read subscriptions from db
134 xapp.Logger.Info("Reading subscriptions from db")
135 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
137 xapp.Logger.Error("%v", err)
139 c.registry.subIds = subIds
140 c.registry.register = register
141 c.HandleUncompletedSubscriptions(register)
146 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
147 subscriptions, _ := c.registry.QueryHandler()
148 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
151 //-------------------------------------------------------------------
153 //-------------------------------------------------------------------
154 func (c *Control) ReadConfigParameters(f string) {
156 // viper.GetDuration returns nanoseconds
157 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
158 if e2tSubReqTimeout == 0 {
159 e2tSubReqTimeout = 2000 * 1000000
161 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
162 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
163 if e2tSubDelReqTime == 0 {
164 e2tSubDelReqTime = 2000 * 1000000
166 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
167 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
168 if e2tRecvMsgTimeout == 0 {
169 e2tRecvMsgTimeout = 2000 * 1000000
171 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
173 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
174 // value 100ms used currently only in unittests.
175 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
176 if waitRouteCleanup_ms == 0 {
177 waitRouteCleanup_ms = 5000 * 1000000
179 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
181 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
182 if e2tMaxSubReqTryCount == 0 {
183 e2tMaxSubReqTryCount = 1
185 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
187 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
188 if e2tMaxSubDelReqTryCount == 0 {
189 e2tMaxSubDelReqTryCount = 1
191 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
193 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
194 if readSubsFromDb == "" {
195 readSubsFromDb = "true"
197 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
198 c.LoggerLevel = viper.GetUint32("logger.level")
199 if c.LoggerLevel == 0 {
204 //-------------------------------------------------------------------
206 //-------------------------------------------------------------------
207 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
209 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
210 for subId, subs := range register {
211 if subs.SubRespRcvd == false {
212 subs.NoRespToXapp = true
213 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
214 c.SendSubscriptionDeleteReq(subs)
219 func (c *Control) ReadyCB(data interface{}) {
220 if c.RMRClient == nil {
221 c.RMRClient = xapp.Rmr
225 func (c *Control) Run() {
226 xapp.SetReadyCB(c.ReadyCB, nil)
227 xapp.AddConfigChangeListener(c.ReadConfigParameters)
231 //-------------------------------------------------------------------
233 //-------------------------------------------------------------------
234 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
237 c.UpdateCounter(cRestSubReqFromXapp)
239 subResp := models.SubscriptionResponse{}
240 p := params.(*models.SubscriptionParams)
242 if c.LoggerLevel > 2 {
243 c.PrintRESTSubscriptionRequest(p)
246 if p.ClientEndpoint == nil {
247 xapp.Logger.Error("ClientEndpoint == nil")
248 c.UpdateCounter(cRestSubFailToXapp)
249 return nil, fmt.Errorf("")
252 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
254 xapp.Logger.Error("%s", err.Error())
255 c.UpdateCounter(cRestSubFailToXapp)
259 var restSubscription *RESTSubscription
260 if p.SubscriptionID == "" {
261 restSubId = ksuid.New().String()
262 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
264 xapp.Logger.Error("%s", err.Error())
265 c.UpdateCounter(cRestSubFailToXapp)
270 restSubId = p.SubscriptionID
271 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
273 xapp.Logger.Error("%s", err.Error())
274 c.UpdateCounter(cRestSubFailToXapp)
279 subResp.SubscriptionID = &restSubId
280 subReqList := e2ap.SubscriptionRequestList{}
281 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
283 xapp.Logger.Error("%s", err.Error())
284 c.registry.DeleteRESTSubscription(&restSubId)
285 c.UpdateCounter(cRestSubFailToXapp)
289 err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
292 // We were unable to detect whether this request was duplicate or not, proceed
293 xapp.Logger.Info("%s - proceeding with the request", err.Error())
296 if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
297 xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
298 c.UpdateCounter(cRestSubRespToXapp)
302 restSubscription.Md5sumOngoing = md5sum
305 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
307 c.UpdateCounter(cRestSubRespToXapp)
311 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
312 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
314 // Send notification to xApp that prosessing of a Subscription Request has failed.
315 e2EventInstanceID := (int64)(0)
316 errorCause := err.Error()
317 resp := &models.SubscriptionResponse{
318 SubscriptionID: restSubId,
319 SubscriptionInstances: []*models.SubscriptionInstance{
320 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
321 ErrorCause: &errorCause,
322 XappEventInstanceID: &xAppEventInstanceID},
325 // Mark REST subscription request processed.
326 restSubscription.SetProcessed()
328 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
329 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
331 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
332 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
335 c.UpdateCounter(cRestSubFailNotifToXapp)
336 xapp.Subscription.Notify(resp, *clientEndpoint)
339 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
340 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
342 // Store successfully processed InstanceId for deletion
343 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
344 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
346 // Send notification to xApp that a Subscription Request has been processed.
347 resp := &models.SubscriptionResponse{
348 SubscriptionID: restSubId,
349 SubscriptionInstances: []*models.SubscriptionInstance{
350 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
352 XappEventInstanceID: &xAppEventInstanceID},
355 // Mark REST subscription request processesd.
356 restSubscription.SetProcessed()
357 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
358 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
360 c.UpdateCounter(cRestSubNotifToXapp)
361 xapp.Subscription.Notify(resp, *clientEndpoint)
364 //-------------------------------------------------------------------
366 //-------------------------------------------------------------------
368 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
369 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
371 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
373 var xAppEventInstanceID int64
374 var e2EventInstanceID int64
376 defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing)
378 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
379 subReqMsg := subReqList.E2APSubscriptionRequests[index]
380 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
382 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
384 // Send notification to xApp that prosessing of a Subscription Request has failed.
385 err := fmt.Errorf("Tracking failure")
386 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
390 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
392 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
394 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
396 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
397 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
398 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
399 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
405 //-------------------------------------------------------------------
407 //------------------------------------------------------------------
408 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
409 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
411 err := c.tracker.Track(trans)
413 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
414 err = fmt.Errorf("Tracking failure")
418 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
420 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
427 go c.handleSubscriptionCreate(subs, trans)
428 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
432 switch themsg := event.(type) {
433 case *e2ap.E2APSubscriptionResponse:
436 case *e2ap.E2APSubscriptionFailure:
437 err = fmt.Errorf("E2 SubscriptionFailure received")
440 err = fmt.Errorf("unexpected E2 subscription response received")
444 err = fmt.Errorf("E2 subscription response timeout")
447 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
448 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
452 //-------------------------------------------------------------------
454 //-------------------------------------------------------------------
455 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
458 c.UpdateCounter(cRestSubDelReqFromXapp)
460 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
462 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
464 xapp.Logger.Error("%s", err.Error())
465 if restSubscription == nil {
466 // Subscription was not found
469 if restSubscription.SubReqOngoing == true {
470 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
471 xapp.Logger.Error("%s", err.Error())
473 } else if restSubscription.SubDelReqOngoing == true {
474 // Previous request for same restSubId still ongoing
480 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
482 for _, instanceId := range restSubscription.InstanceIds {
483 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
486 xapp.Logger.Error("%s", err.Error())
489 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
490 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
491 restSubscription.DeleteE2InstanceId(instanceId)
493 c.registry.DeleteRESTSubscription(&restSubId)
496 c.UpdateCounter(cRestSubDelRespToXapp)
501 //-------------------------------------------------------------------
503 //-------------------------------------------------------------------
504 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
506 var xAppEventInstanceID int64
507 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
509 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
510 restSubId, instanceId, idstring(err, nil))
511 return xAppEventInstanceID, nil
514 xAppEventInstanceID = int64(subs.ReqId.Id)
515 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
517 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
518 xapp.Logger.Error("%s", err.Error())
520 defer trans.Release()
522 err = c.tracker.Track(trans)
524 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
525 xapp.Logger.Error("%s", err.Error())
526 return xAppEventInstanceID, &time.ParseError{}
531 go c.handleSubscriptionDelete(subs, trans)
532 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
534 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
536 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
538 return xAppEventInstanceID, nil
541 //-------------------------------------------------------------------
543 //-------------------------------------------------------------------
544 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
545 xapp.Logger.Info("QueryHandler() called")
549 return c.registry.QueryHandler()
552 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
553 xapp.Logger.Info("TestRestHandler() called")
555 pathParams := mux.Vars(r)
556 s := pathParams["testId"]
558 // This can be used to delete single subscription from db
559 if contains := strings.Contains(s, "deletesubid="); contains == true {
560 var splits = strings.Split(s, "=")
561 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
562 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
563 c.RemoveSubscriptionFromSdl(uint32(subId))
568 // This can be used to remove all subscriptions db from
570 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
571 c.RemoveAllSubscriptionsFromSdl()
575 // This is meant to cause submgr's restart in testing
577 xapp.Logger.Info("os.Exit(1) called")
581 xapp.Logger.Info("Unsupported rest command received %s", s)
584 //-------------------------------------------------------------------
586 //-------------------------------------------------------------------
588 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
589 params := &xapp.RMRParams{}
590 params.Mtype = trans.GetMtype()
591 params.SubId = int(subs.GetReqId().InstanceId)
593 params.Meid = subs.GetMeid()
595 params.PayloadLen = len(trans.Payload.Buf)
596 params.Payload = trans.Payload.Buf
598 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
599 err = c.SendWithRetry(params, false, 5)
601 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
606 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
608 params := &xapp.RMRParams{}
609 params.Mtype = trans.GetMtype()
610 params.SubId = int(subs.GetReqId().InstanceId)
611 params.Xid = trans.GetXid()
612 params.Meid = trans.GetMeid()
614 params.PayloadLen = len(trans.Payload.Buf)
615 params.Payload = trans.Payload.Buf
617 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
618 err = c.SendWithRetry(params, false, 5)
620 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
625 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
626 if c.RMRClient == nil {
627 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
628 xapp.Logger.Error("%s", err.Error())
633 defer c.RMRClient.Free(msg.Mbuf)
635 // xapp-frame might use direct access to c buffer and
636 // when msg.Mbuf is freed, someone might take it into use
637 // and payload data might be invalid inside message handle function
639 // subscriptions won't load system a lot so there is no
640 // real performance hit by cloning buffer into new go byte slice
641 cPay := append(msg.Payload[:0:0], msg.Payload...)
643 msg.PayloadLen = len(cPay)
646 case xapp.RIC_SUB_REQ:
647 go c.handleXAPPSubscriptionRequest(msg)
648 case xapp.RIC_SUB_RESP:
649 go c.handleE2TSubscriptionResponse(msg)
650 case xapp.RIC_SUB_FAILURE:
651 go c.handleE2TSubscriptionFailure(msg)
652 case xapp.RIC_SUB_DEL_REQ:
653 go c.handleXAPPSubscriptionDeleteRequest(msg)
654 case xapp.RIC_SUB_DEL_RESP:
655 go c.handleE2TSubscriptionDeleteResponse(msg)
656 case xapp.RIC_SUB_DEL_FAILURE:
657 go c.handleE2TSubscriptionDeleteFailure(msg)
659 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
664 //-------------------------------------------------------------------
665 // handle from XAPP Subscription Request
666 //------------------------------------------------------------------
667 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
668 xapp.Logger.Info("MSG from XAPP: %s", params.String())
669 c.UpdateCounter(cSubReqFromXapp)
671 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
673 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
677 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
679 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
682 defer trans.Release()
684 if err = c.tracker.Track(trans); err != nil {
685 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
689 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
690 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
692 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
696 c.wakeSubscriptionRequest(subs, trans)
699 //-------------------------------------------------------------------
700 // Wake Subscription Request to E2node
701 //------------------------------------------------------------------
702 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
704 go c.handleSubscriptionCreate(subs, trans)
705 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
708 switch themsg := event.(type) {
709 case *e2ap.E2APSubscriptionResponse:
710 themsg.RequestId.Id = trans.RequestId.Id
711 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
714 c.UpdateCounter(cSubRespToXapp)
715 c.rmrSendToXapp("", subs, trans)
718 case *e2ap.E2APSubscriptionFailure:
719 themsg.RequestId.Id = trans.RequestId.Id
720 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
722 c.UpdateCounter(cSubFailToXapp)
723 c.rmrSendToXapp("", subs, trans)
729 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
730 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
733 //-------------------------------------------------------------------
734 // handle from XAPP Subscription Delete Request
735 //------------------------------------------------------------------
736 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
737 xapp.Logger.Info("MSG from XAPP: %s", params.String())
738 c.UpdateCounter(cSubDelReqFromXapp)
740 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
742 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
746 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
748 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
751 defer trans.Release()
753 err = c.tracker.Track(trans)
755 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
759 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
761 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
768 go c.handleSubscriptionDelete(subs, trans)
769 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
771 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
773 if subs.NoRespToXapp == true {
774 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
778 // Whatever is received success, fail or timeout, send successful delete response
779 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
780 subDelRespMsg.RequestId.Id = trans.RequestId.Id
781 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
782 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
783 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
785 c.UpdateCounter(cSubDelRespToXapp)
786 c.rmrSendToXapp("", subs, trans)
789 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
790 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
793 //-------------------------------------------------------------------
794 // SUBS CREATE Handling
795 //-------------------------------------------------------------------
796 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
798 var removeSubscriptionFromDb bool = false
799 trans := c.tracker.NewSubsTransaction(subs)
800 subs.WaitTransactionTurn(trans)
801 defer subs.ReleaseTransactionTurn(trans)
802 defer trans.Release()
804 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
806 subRfMsg, valid := subs.GetCachedResponse()
807 if subRfMsg == nil && valid == true {
808 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
809 switch event.(type) {
810 case *e2ap.E2APSubscriptionResponse:
811 subRfMsg, valid = subs.SetCachedResponse(event, true)
812 subs.SubRespRcvd = true
813 case *e2ap.E2APSubscriptionFailure:
814 removeSubscriptionFromDb = true
815 subRfMsg, valid = subs.SetCachedResponse(event, false)
816 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
817 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
818 case *SubmgrRestartTestEvent:
819 // This simulates that no response has been received and after restart subscriptions are restored from db
820 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
823 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
824 removeSubscriptionFromDb = true
825 subRfMsg, valid = subs.SetCachedResponse(nil, false)
826 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
828 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
830 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
833 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
835 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
838 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
839 parentTrans.SendEvent(subRfMsg, 0)
842 //-------------------------------------------------------------------
843 // SUBS DELETE Handling
844 //-------------------------------------------------------------------
846 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
848 trans := c.tracker.NewSubsTransaction(subs)
849 subs.WaitTransactionTurn(trans)
850 defer subs.ReleaseTransactionTurn(trans)
851 defer trans.Release()
853 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
857 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
860 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
864 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
865 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
866 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
867 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
868 c.registry.UpdateSubscriptionToDb(subs, c)
869 parentTrans.SendEvent(nil, 0)
872 //-------------------------------------------------------------------
873 // send to E2T Subscription Request
874 //-------------------------------------------------------------------
875 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
877 var event interface{} = nil
878 var timedOut bool = false
879 const ricRequestorId = 123
881 subReqMsg := subs.SubReqMsg
882 subReqMsg.RequestId = subs.GetReqId().RequestId
883 subReqMsg.RequestId.Id = ricRequestorId
884 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
886 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
890 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
891 c.WriteSubscriptionToDb(subs)
893 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
894 desc := fmt.Sprintf("(retry %d)", retries)
896 c.UpdateCounter(cSubReqToE2)
898 c.UpdateCounter(cSubReReqToE2)
900 c.rmrSendToE2T(desc, subs, trans)
901 if subs.DoNotWaitSubResp == false {
902 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
904 c.UpdateCounter(cSubReqTimerExpiry)
908 // Simulating case where subscrition request has been sent but response has not been received before restart
909 event = &SubmgrRestartTestEvent{}
913 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
917 //-------------------------------------------------------------------
918 // send to E2T Subscription Delete Request
919 //-------------------------------------------------------------------
921 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
923 var event interface{}
925 const ricRequestorId = 123
927 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
928 subDelReqMsg.RequestId = subs.GetReqId().RequestId
929 subDelReqMsg.RequestId.Id = ricRequestorId
930 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
931 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
933 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
937 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
938 desc := fmt.Sprintf("(retry %d)", retries)
940 c.UpdateCounter(cSubDelReqToE2)
942 c.UpdateCounter(cSubDelReReqToE2)
944 c.rmrSendToE2T(desc, subs, trans)
945 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
947 c.UpdateCounter(cSubDelReqTimerExpiry)
952 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
956 //-------------------------------------------------------------------
957 // handle from E2T Subscription Response
958 //-------------------------------------------------------------------
959 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
960 xapp.Logger.Info("MSG from E2T: %s", params.String())
961 c.UpdateCounter(cSubRespFromE2)
963 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
965 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
968 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
970 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
973 trans := subs.GetTransaction()
975 err = fmt.Errorf("Ongoing transaction not found")
976 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
979 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
981 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
982 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
987 //-------------------------------------------------------------------
988 // handle from E2T Subscription Failure
989 //-------------------------------------------------------------------
990 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
991 xapp.Logger.Info("MSG from E2T: %s", params.String())
992 c.UpdateCounter(cSubFailFromE2)
993 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
995 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
998 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
1000 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1003 trans := subs.GetTransaction()
1005 err = fmt.Errorf("Ongoing transaction not found")
1006 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1009 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1010 if sendOk == false {
1011 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1012 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1017 //-------------------------------------------------------------------
1018 // handle from E2T Subscription Delete Response
1019 //-------------------------------------------------------------------
1020 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1021 xapp.Logger.Info("MSG from E2T: %s", params.String())
1022 c.UpdateCounter(cSubDelRespFromE2)
1023 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1025 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1028 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1030 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1033 trans := subs.GetTransaction()
1035 err = fmt.Errorf("Ongoing transaction not found")
1036 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1039 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1040 if sendOk == false {
1041 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1042 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1047 //-------------------------------------------------------------------
1048 // handle from E2T Subscription Delete Failure
1049 //-------------------------------------------------------------------
1050 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1051 xapp.Logger.Info("MSG from E2T: %s", params.String())
1052 c.UpdateCounter(cSubDelFailFromE2)
1053 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1055 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1058 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1060 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1063 trans := subs.GetTransaction()
1065 err = fmt.Errorf("Ongoing transaction not found")
1066 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1069 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1070 if sendOk == false {
1071 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1072 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1077 //-------------------------------------------------------------------
1079 //-------------------------------------------------------------------
1080 func typeofSubsMessage(v interface{}) string {
1085 //case *e2ap.E2APSubscriptionRequest:
1087 case *e2ap.E2APSubscriptionResponse:
1089 case *e2ap.E2APSubscriptionFailure:
1091 //case *e2ap.E2APSubscriptionDeleteRequest:
1092 // return "SubDelReq"
1093 case *e2ap.E2APSubscriptionDeleteResponse:
1095 case *e2ap.E2APSubscriptionDeleteFailure:
1102 //-------------------------------------------------------------------
1104 //-------------------------------------------------------------------
1105 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1106 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1107 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1109 xapp.Logger.Error("%v", err)
1113 //-------------------------------------------------------------------
1115 //-------------------------------------------------------------------
1116 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1118 if removeSubscriptionFromDb == true {
1119 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1120 c.RemoveSubscriptionFromDb(subs)
1122 // Update is needed for successful response and merge case here
1123 if subs.RetryFromXapp == false {
1124 c.WriteSubscriptionToDb(subs)
1127 subs.RetryFromXapp = false
1130 //-------------------------------------------------------------------
1132 //-------------------------------------------------------------------
1133 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1134 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1135 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1137 xapp.Logger.Error("%v", err)
1141 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1143 const ricRequestorId = 123
1144 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1146 // Send delete for every endpoint in the subscription
1147 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1148 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1149 subDelReqMsg.RequestId.Id = ricRequestorId
1150 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1151 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1153 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1156 for _, endPoint := range subs.EpList.Endpoints {
1157 params := &xapp.RMRParams{}
1158 params.Mtype = mType
1159 params.SubId = int(subs.GetReqId().InstanceId)
1161 params.Meid = subs.Meid
1162 params.Src = endPoint.String()
1163 params.PayloadLen = len(payload.Buf)
1164 params.Payload = payload.Buf
1166 subs.DeleteFromDb = true
1167 c.handleXAPPSubscriptionDeleteRequest(params)
1171 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1173 fmt.Println("CRESTSubscriptionRequest")
1175 if p.SubscriptionID != "" {
1176 fmt.Println(" SubscriptionID = ", p.SubscriptionID)
1178 fmt.Println(" SubscriptionID = ''")
1181 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1183 if p.ClientEndpoint.HTTPPort != nil {
1184 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1186 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1189 if p.ClientEndpoint.RMRPort != nil {
1190 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1192 fmt.Println(" ClientEndpoint.RMRPort = nil")
1196 fmt.Printf(" Meid = %s\n", *p.Meid)
1198 fmt.Println(" Meid = nil")
1201 for _, subscriptionDetail := range p.SubscriptionDetails {
1202 if p.RANFunctionID != nil {
1203 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1205 fmt.Println(" RANFunctionID = nil")
1207 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1208 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1210 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1211 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1212 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1213 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1215 if actionToBeSetup.SubsequentAction != nil {
1216 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1217 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1219 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")