2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds a space-separated identification string for log messages.
//
// Every non-nil entry contributes its String() output, in argument order.
// When err is non-nil a final "err(<message>)" element is appended. A nil
// interface value among entries is skipped rather than dereferenced, so a
// call such as idstring(err, nil) no longer panics.
func idstring(err error, entries ...fmt.Stringer) string {
	var b strings.Builder
	sep := ""
	for _, entry := range entries {
		// Only an untyped nil interface is filtered out; typed nil pointers
		// are still handed to their own String() method as before.
		if entry == nil {
			continue
		}
		b.WriteString(sep)
		b.WriteString(entry.String())
		sep = " "
	}
	if err != nil {
		b.WriteString(sep)
		b.WriteString("err(")
		b.WriteString(err.Error())
		b.WriteString(")")
	}
	return b.String()
}
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level configuration. Every value below is (re)assigned by
// Control.ReadConfigParameters from viper keys under "controls.".

// e2tSubReqTimeout is the wait passed to WaitEvent after a subscription
// request has been sent to E2T.
63 var e2tSubReqTimeout time.Duration
// e2tSubDelReqTime is the wait passed to WaitEvent after a subscription
// delete request has been sent to E2T.
64 var e2tSubDelReqTime time.Duration
// e2tRecvMsgTimeout is the timeout passed to SendEvent when a message received
// from E2T is handed to the subscription's ongoing transaction.
65 var e2tRecvMsgTimeout time.Duration
// waitRouteCleanup_ms is handed to registry.RemoveFromSubscription. Despite
// the _ms suffix it is stored as a time.Duration.
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// readSubsFromDb is kept as a string; NewControl compares it against "false".
69 var readSubsFromDb string
79 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the marker event produced by
// sendE2TSubscriptionRequest when the subscription is flagged not to wait for
// a response; handleSubscriptionCreate matches on it to drop the transaction.
89 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type.
// NOTE(review): no use of this type is visible in this part of the file.
90 type SubmgrRestartUpEvent struct{}
// Logs the "SUBMGR" banner and configures viper to use the "submgr"
// environment-variable prefix, accepting empty environment values.
// NOTE(review): the enclosing function header is not visible in this listing;
// presumably this is the package init function — confirm.
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager's Control instance.
//
// It creates the routing manager REST client from the viper keys
// "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl", creates a Registry and a
// Tracker, loads the configuration, registers the test and symptom-data REST
// routes, and starts the xApp subscription REST listener in its own goroutine.
// Stored subscriptions are then read via ReadAllSubscriptionsFromSdl; the
// readSubsFromDb == "false" check guards a branch whose body is not visible.
99 func NewControl() *Control {
// "http" is the only scheme offered to the routing manager client.
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Load configuration before anything below consults readSubsFromDb.
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// The REST subscription listener runs concurrently with the rest of start-up.
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
// Install the restored state into the registry, then clean up subscriptions
// that never received a response.
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
// SymptomDataHandler is the HTTP handler registered for "/ric/v1/symptomdata".
// It fetches the subscription list from the registry and sends it as JSON
// symptom data under the name "platform/subscriptions.json".
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
// The error returned by the registry query is deliberately discarded here.
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
// ReadConfigParameters loads the "controls.*" settings and "logger.level" from
// viper into the package-level configuration variables and c.LoggerLevel, and
// logs each resulting value. A value read as zero is replaced by a default:
// 2000 ms for the three E2T timeouts, 5000 ms for the route clean-up wait,
// 1 for both try counts and "true" for readSubsFromDb.
//
// It is called with "" from NewControl and registered as a configuration
// change listener in Run. The parameter f is not referenced in the visible body.
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
// The "_ms" keys are scaled by 1000000 so the stored Duration is milliseconds.
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
171 // value of 100 ms is currently used only in unit tests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
// Try counts include the initial attempt, so the minimum useful value is 1.
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// readSubsFromDb is handled as a string; an unset key defaults to "true".
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 c.LoggerLevel = viper.GetUint32("logger.level")
196 if c.LoggerLevel == 0 {
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register and, for every
// subscription that has not received a subscription response, marks it so
// that no response is sent to the xApp and issues a subscription delete
// request for it. NewControl calls it with the register read from SDL.
204 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
206 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
207 for subId, subs := range register {
208 if subs.SubRespRcvd == false {
// NoRespToXapp is checked later in handleXAPPSubscriptionDeleteRequest.
209 subs.NoRespToXapp = true
210 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
211 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It installs
// xapp.Rmr as the control's RMR client unless one has already been set.
// The data argument is not used in the visible body.
216 func (c *Control) ReadyCB(data interface{}) {
217 if c.RMRClient == nil {
218 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xApp framework's ready callback and
// ReadConfigParameters as a configuration change listener.
// NOTE(review): the remainder of the body is not visible in this listing.
222 func (c *Control) Run() {
223 xapp.SetReadyCB(c.ReadyCB, nil)
224 xapp.AddConfigChangeListener(c.ReadConfigParameters)
228 //-------------------------------------------------------------------
230 //-------------------------------------------------------------------
// SubscriptionHandler handles a REST subscription request from an xApp. It is
// passed to xapp.Subscription.Listen in NewControl.
//
// params is type-asserted to *models.SubscriptionParams. The handler resolves
// the xApp RMR endpoint from the client endpoint, creates a new REST
// subscription (when no SubscriptionID is given) or looks up the existing one,
// converts the request into E2AP subscription request messages and hands them
// to processSubscriptionRequests in a separate goroutine. Failures are logged
// and counted with cRestSubFailToXapp.
231 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
234 c.UpdateCounter(cRestSubReqFromXapp)
236 subResp := models.SubscriptionResponse{}
// Unchecked assertion: a params value of any other type panics here.
237 p := params.(*models.SubscriptionParams)
239 if c.LoggerLevel > 2 {
240 c.PrintRESTSubscriptionRequest(p)
243 if p.ClientEndpoint == nil {
244 xapp.Logger.Error("ClientEndpoint == nil")
245 c.UpdateCounter(cRestSubFailToXapp)
// NOTE(review): the returned error carries an empty message.
246 return nil, fmt.Errorf("")
249 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
251 xapp.Logger.Error("%s", err.Error())
252 c.UpdateCounter(cRestSubFailToXapp)
256 var restSubscription *RESTSubscription
// An empty SubscriptionID means a new REST subscription with a fresh KSUID.
257 if p.SubscriptionID == "" {
258 restSubId = ksuid.New().String()
259 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
261 xapp.Logger.Error("%s", err.Error())
262 c.UpdateCounter(cRestSubFailToXapp)
// Otherwise the caller refers to an existing REST subscription.
267 restSubId = p.SubscriptionID
268 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
270 xapp.Logger.Error("%s", err.Error())
271 c.UpdateCounter(cRestSubFailToXapp)
276 subResp.SubscriptionID = &restSubId
277 subReqList := e2ap.SubscriptionRequestList{}
278 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
280 xapp.Logger.Error("%s", err.Error())
// The REST subscription is removed again when message construction fails.
281 c.registry.DeleteRESTSubscription(&restSubId)
282 c.UpdateCounter(cRestSubFailToXapp)
// The E2 side is processed asynchronously; outcomes reach the xApp as
// notifications sent from processSubscriptionRequests.
286 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
288 c.UpdateCounter(cRestSubRespToXapp)
292 //-------------------------------------------------------------------
294 //-------------------------------------------------------------------
// processSubscriptionRequests processes, one by one, the E2AP subscription
// requests generated for a REST subscription. It is started as a goroutine by
// SubscriptionHandler.
//
// For each request it creates an xApp transaction, runs
// handleSubscriptionRequest, and notifies the xApp at clientEndpoint with a
// SubscriptionResponse carrying the xApp event instance ID and the E2 event
// instance ID (0 in the failure notification). Successful instance IDs are
// recorded on restSubscription so that they can be deleted later.
296 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
297 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
299 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
301 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
303 xapp.Logger.Error("%s", err.Error())
// Declared outside the loop because the notifications take their addresses.
307 var xAppEventInstanceID int64
308 var e2EventInstanceID int64
309 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
310 subReqMsg := subReqList.E2APSubscriptionRequests[index]
312 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
314 c.registry.DeleteRESTSubscription(restSubId)
315 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer appears to sit inside the for loop, so the
// transactions would be released when the function returns, not per iteration.
319 defer trans.Release()
320 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
321 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
323 // Send notification to xApp that processing of a Subscription Request has failed.
324 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
325 e2EventInstanceID = (int64)(0)
326 resp := &models.SubscriptionResponse{
327 SubscriptionID: restSubId,
328 SubscriptionInstances: []*models.SubscriptionInstance{
329 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
330 ErrorCause: nil, //TODO: Suitable Error cause.
331 XappEventInstanceID: &xAppEventInstanceID},
334 // Mark REST subscription request processed.
335 restSubscription.SetProcessed()
336 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
337 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
338 xapp.Subscription.Notify(resp, *clientEndpoint)
339 c.UpdateCounter(cRestSubFailNotifToXapp)
// Success path: IDs are taken from the E2 subscription response.
341 xAppEventInstanceID = (int64)(subRespMsg.RequestId.Id)
342 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
344 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
345 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
347 // Store successfully processed InstanceId for deletion
348 restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
349 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
351 // Send notification to xApp that a Subscription Request has been processed.
352 resp := &models.SubscriptionResponse{
353 SubscriptionID: restSubId,
354 SubscriptionInstances: []*models.SubscriptionInstance{
355 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
357 XappEventInstanceID: &xAppEventInstanceID},
360 // Mark REST subscription request processed.
361 restSubscription.SetProcessed()
362 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
363 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
364 xapp.Subscription.Notify(resp, *clientEndpoint)
365 c.UpdateCounter(cRestSubNotifToXapp)
371 //-------------------------------------------------------------------
373 //------------------------------------------------------------------
// handleSubscriptionRequest drives one E2AP subscription request that
// originates from the REST interface. It tracks the transaction, assigns it to
// a subscription in the registry, starts handleSubscriptionCreate in a
// goroutine and blocks until that goroutine posts an event to trans.
//
// The event type decides the outcome: E2APSubscriptionResponse and
// E2APSubscriptionFailure each have their own case, and the failure case sets
// a "SubscriptionFailure received" error. The visible failure path logs a
// wrapped error and removes the transaction from the subscription.
// The meid and restSubId parameters are not referenced in the visible body.
374 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
375 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
377 err := c.tracker.Track(trans)
379 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
380 xapp.Logger.Error("%s", err.Error())
384 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
386 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
387 xapp.Logger.Error("%s", err.Error())
// The E2 exchange runs in its own goroutine; its result arrives as an event.
394 go c.handleSubscriptionCreate(subs, trans)
395 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
399 switch themsg := event.(type) {
400 case *e2ap.E2APSubscriptionResponse:
403 case *e2ap.E2APSubscriptionFailure:
404 err = fmt.Errorf("SubscriptionFailure received")
410 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
411 xapp.Logger.Error("%s", err.Error())
412 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
416 //-------------------------------------------------------------------
418 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB handles a REST subscription delete request for
// restSubId. It is passed to xapp.Subscription.Listen in NewControl.
//
// It looks up the REST subscription, treats a request that arrives while the
// subscription request or a previous delete is still ongoing as a special
// case, and otherwise deletes every E2 instance recorded for the subscription
// through SubscriptionDeleteHandler before removing the REST subscription
// from the registry.
419 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
422 c.UpdateCounter(cRestSubDelReqFromXapp)
424 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
426 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
428 xapp.Logger.Error("%s", err.Error())
429 if restSubscription == nil {
430 // Subscription was not found
433 if restSubscription.SubReqOngoing == true {
434 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
435 xapp.Logger.Error("%s", err.Error())
437 } else if restSubscription.SubDelReqOngoing == true {
438 // Previous request for same restSubId still ongoing
444 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// NOTE(review): the loop ranges over InstanceIds while DeleteE2InstanceId is
// called on the same subscription inside it; confirm that this is safe.
446 for _, instanceId := range restSubscription.InstanceIds {
447 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
450 xapp.Logger.Error("%s", err.Error())
453 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
454 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
455 restSubscription.DeleteE2InstanceId(instanceId)
457 c.registry.DeleteRESTSubscription(&restSubId)
460 c.UpdateCounter(cRestSubDelRespToXapp)
465 //-------------------------------------------------------------------
467 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes the E2 subscription identified by
// instanceId on behalf of the REST subscription restSubId.
//
// It returns the xApp event instance ID, taken from the subscription's
// request ID, together with an error. When no subscription matches
// instanceId, it logs that and returns a zero ID with a nil error. Otherwise
// it creates and tracks an xApp transaction, runs handleSubscriptionDelete in
// a goroutine, waits for it to post an event and then removes the transaction
// from the subscription.
468 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
470 var xAppEventInstanceID int64
471 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// NOTE(review): restSubId is a *string formatted with %v, so the pointer value
// is logged rather than the ID. A literal nil is also passed as a fmt.Stringer
// entry; idstring(err) would be the safer form.
473 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
474 restSubId, instanceId, idstring(err, nil))
// A missing subscription is not reported to the caller as an error.
475 return xAppEventInstanceID, nil
478 xAppEventInstanceID = int64(subs.ReqId.Id)
479 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
// NOTE(review): no return is visible after this error is logged; confirm that
// a nil trans cannot reach the deferred Release below.
481 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
482 xapp.Logger.Error("%s", err.Error())
484 defer trans.Release()
486 err = c.tracker.Track(trans)
488 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
489 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is used as the error value here;
// the descriptive error built above is only logged, not returned.
490 return xAppEventInstanceID, &time.ParseError{}
495 go c.handleSubscriptionDelete(subs, trans)
496 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
498 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
500 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
502 return xAppEventInstanceID, nil
505 //-------------------------------------------------------------------
507 //-------------------------------------------------------------------
// QueryHandler returns the subscription list held by the registry. It is the
// query callback passed to xapp.Subscription.Listen in NewControl and simply
// delegates to registry.QueryHandler after logging the call.
508 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
509 xapp.Logger.Info("QueryHandler() called")
513 return c.registry.QueryHandler()
// TestRestHandler is the HTTP handler registered for POST
// "/ric/v1/test/{testId}". The {testId} path variable selects a test action;
// the visible ones delete one subscription from the db ("deletesubid=<id>"),
// delete all subscriptions from the db, and trigger a restart of submgr.
// Anything else is logged as an unsupported command.
516 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
517 xapp.Logger.Info("TestRestHandler() called")
519 pathParams := mux.Vars(r)
520 s := pathParams["testId"]
522 // This can be used to delete single subscription from db
523 if contains := strings.Contains(s, "deletesubid="); contains == true {
// splits[1] always exists here because s is known to contain "=".
524 var splits = strings.Split(s, "=")
// NOTE(review): the ID is parsed as a 64-bit integer and then narrowed to
// uint32; values outside the uint32 range wrap silently.
525 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
526 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
527 c.RemoveSubscriptionFromSdl(uint32(subId))
532 // This can be used to remove all subscriptions from db
534 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
535 c.RemoveAllSubscriptionsFromSdl()
539 // This is meant to cause submgr's restart in testing
541 xapp.Logger.Info("os.Exit(1) called")
545 xapp.Logger.Info("Unsupported rest command received %s", s)
548 //-------------------------------------------------------------------
550 //-------------------------------------------------------------------
// rmrSendToE2T sends the message already packed into trans (message type and
// payload) towards E2T over RMR. The RMR sub ID is the subscription's E2
// instance ID and the MEID comes from the subscription. desc is used only in
// the log line. A send failure is logged; err is the function's named result.
552 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
553 params := &xapp.RMRParams{}
554 params.Mtype = trans.GetMtype()
555 params.SubId = int(subs.GetReqId().InstanceId)
557 params.Meid = subs.GetMeid()
559 params.PayloadLen = len(trans.Payload.Buf)
560 params.Payload = trans.Payload.Buf
562 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
// Arguments are passed through to SendWithRetry as-is (false, 5).
563 err = c.SendWithRetry(params, false, 5)
565 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the message already packed into trans (message type and
// payload) back to the xApp over RMR. The RMR sub ID is the subscription's E2
// instance ID, while the transaction ID and MEID come from the xApp
// transaction. desc is used only in the log line. A send failure is logged;
// err is the function's named result.
570 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
572 params := &xapp.RMRParams{}
573 params.Mtype = trans.GetMtype()
574 params.SubId = int(subs.GetReqId().InstanceId)
575 params.Xid = trans.GetXid()
576 params.Meid = trans.GetMeid()
578 params.PayloadLen = len(trans.Payload.Buf)
579 params.Payload = trans.Payload.Buf
581 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
// Arguments are passed through to SendWithRetry as-is (false, 5).
582 err = c.SendWithRetry(params, false, 5)
584 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume is the entry point for a received RMR message. When no RMR client
// has been set yet, it logs an error. Otherwise the payload is copied into a
// fresh Go slice and the message is dispatched, by message type, to the
// matching handler, each running in its own goroutine. Unknown message types
// are logged and discarded.
589 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
590 if c.RMRClient == nil {
591 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
592 xapp.Logger.Error("%s", err.Error())
// The message buffer is freed when Consume returns, which can be before the
// handler goroutines below have finished; hence the payload copy.
597 defer c.RMRClient.Free(msg.Mbuf)
599 // xapp-frame might use direct access to c buffer and
600 // when msg.Mbuf is freed, someone might take it into use
601 // and payload data might be invalid inside message handle function
603 // subscriptions won't load system a lot so there is no
604 // real performance hit by cloning buffer into new go byte slice
// msg.Payload[:0:0] has zero capacity, so append always allocates a new array.
605 cPay := append(msg.Payload[:0:0], msg.Payload...)
607 msg.PayloadLen = len(cPay)
610 case xapp.RIC_SUB_REQ:
611 go c.handleXAPPSubscriptionRequest(msg)
612 case xapp.RIC_SUB_RESP:
613 go c.handleE2TSubscriptionResponse(msg)
614 case xapp.RIC_SUB_FAILURE:
615 go c.handleE2TSubscriptionFailure(msg)
616 case xapp.RIC_SUB_DEL_REQ:
617 go c.handleXAPPSubscriptionDeleteRequest(msg)
618 case xapp.RIC_SUB_DEL_RESP:
// The error returned by this handler is dropped because it runs via "go".
619 go c.handleE2TSubscriptionDeleteResponse(msg)
620 case xapp.RIC_SUB_DEL_FAILURE:
621 go c.handleE2TSubscriptionDeleteFailure(msg)
623 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
628 //-------------------------------------------------------------------
629 // handle from XAPP Subscription Request
630 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a subscription request received from
// an xApp over RMR. It unpacks the E2AP message, creates and tracks an xApp
// transaction keyed by the sender endpoint and transaction ID, assigns the
// transaction to a subscription in the registry and hands over to
// wakeSubscriptionRequest. Each failing step is logged with the "XAPP-SubReq"
// prefix. The transaction is released when the function returns.
631 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
632 xapp.Logger.Info("MSG from XAPP: %s", params.String())
633 c.UpdateCounter(cSubReqFromXapp)
635 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
637 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
641 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
643 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
646 defer trans.Release()
648 if err = c.tracker.Track(trans); err != nil {
649 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
653 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
654 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
656 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
660 c.wakeSubscriptionRequest(subs, trans)
663 //-------------------------------------------------------------------
664 // Wake Subscription Request to E2node
665 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for subs in a
// goroutine, blocks until it posts an event to trans, and relays the outcome
// to the xApp over RMR. A subscription response or failure is re-packed with
// the xApp's own request ID before being sent. The final log line reports the
// failed case.
666 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
668 go c.handleSubscriptionCreate(subs, trans)
669 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
672 switch themsg := event.(type) {
673 case *e2ap.E2APSubscriptionResponse:
// Restore the xApp's request ID before packing the reply.
674 themsg.RequestId.Id = trans.RequestId.Id
675 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
678 c.UpdateCounter(cSubRespToXapp)
679 c.rmrSendToXapp("", subs, trans)
682 case *e2ap.E2APSubscriptionFailure:
683 themsg.RequestId.Id = trans.RequestId.Id
684 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
686 c.UpdateCounter(cSubFailToXapp)
687 c.rmrSendToXapp("", subs, trans)
693 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
694 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
697 //-------------------------------------------------------------------
698 // handle from XAPP Subscription Delete Request
699 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a subscription delete request
// received from an xApp over RMR. It unpacks the E2AP message, creates and
// tracks an xApp transaction, finds the subscription by the transaction's
// sub ID, runs handleSubscriptionDelete in a goroutine and waits for it to
// post an event. Unless the subscription is flagged NoRespToXapp, a delete
// response is then packed and sent to the xApp, whatever the E2 side returned.
700 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
701 xapp.Logger.Info("MSG from XAPP: %s", params.String())
702 c.UpdateCounter(cSubDelReqFromXapp)
704 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
706 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
710 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
712 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
715 defer trans.Release()
717 err = c.tracker.Track(trans)
// NOTE(review): this log line uses the "XAPP-SubReq" prefix although it is on
// the delete path.
719 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
723 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
725 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
732 go c.handleSubscriptionDelete(subs, trans)
733 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
735 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
737 if subs.NoRespToXapp == true {
738 // Do not send delete responses to xApps when a submgr restart is deleting uncompleted subscriptions
742 // Whatever is received success, fail or timeout, send successful delete response
743 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// The reply carries the xApp's request ID and the subscription's E2 instance ID.
744 subDelRespMsg.RequestId.Id = trans.RequestId.Id
745 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
746 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
747 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
749 c.UpdateCounter(cSubDelRespToXapp)
750 c.rmrSendToXapp("", subs, trans)
753 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
754 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
757 //-------------------------------------------------------------------
758 // SUBS CREATE Handling
759 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2-side work of creating subs on
// behalf of parentTrans. Callers start it as a goroutine and wait for the
// event it finally posts to parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription. When no response is cached and the cache state is valid, it
// sends the subscription request to E2T and caches the outcome. A failure or
// an unexpected event also triggers an internal subscription delete towards
// E2T and marks the subscription for removal from the db. Finally the db is
// updated and the (possibly cached) response is posted to parentTrans.
760 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
762 var removeSubscriptionFromDb bool = false
763 trans := c.tracker.NewSubsTransaction(subs)
764 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: the transaction is released first,
// then the subscription's transaction turn.
765 defer subs.ReleaseTransactionTurn(trans)
766 defer trans.Release()
768 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
770 subRfMsg, valid := subs.GetCachedResponse()
// Talk to E2T only when nothing has been cached yet for this subscription.
771 if subRfMsg == nil && valid == true {
772 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
773 switch event.(type) {
774 case *e2ap.E2APSubscriptionResponse:
775 subRfMsg, valid = subs.SetCachedResponse(event, true)
776 subs.SubRespRcvd = true
777 case *e2ap.E2APSubscriptionFailure:
778 removeSubscriptionFromDb = true
779 subRfMsg, valid = subs.SetCachedResponse(event, false)
780 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
781 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
782 case *SubmgrRestartTestEvent:
783 // This simulates that no response has been received and after restart subscriptions are restored from db
784 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
787 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
788 removeSubscriptionFromDb = true
789 subRfMsg, valid = subs.SetCachedResponse(nil, false)
790 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
792 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
794 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
797 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
799 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
802 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wakes the caller that is blocked in parentTrans.WaitEvent.
803 parentTrans.SendEvent(subRfMsg, 0)
806 //-------------------------------------------------------------------
807 // SUBS DELETE Handling
808 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the E2-side work of deleting subs on
// behalf of parentTrans. Callers start it as a goroutine and wait for the
// event it finally posts to parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription. The visible condition covers a valid subscription whose
// endpoint list holds only parentTrans's endpoint; a delete request is sent
// to E2T. The transaction is then removed from the subscription, the db is
// updated and a nil event is posted to parentTrans.
810 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
812 trans := c.tracker.NewSubsTransaction(subs)
813 subs.WaitTransactionTurn(trans)
814 defer subs.ReleaseTransactionTurn(trans)
815 defer trans.Release()
817 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// True only when the requesting endpoint is the sole remaining user.
821 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
824 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
828 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
829 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
830 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
831 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
832 c.registry.UpdateSubscriptionToDb(subs, c)
// Wakes the caller that is blocked in parentTrans.WaitEvent.
833 parentTrans.SendEvent(nil, 0)
836 //-------------------------------------------------------------------
837 // send to E2T Subscription Request
838 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request message with
// the subscription's own request ID, stores the not-yet-completed
// subscription in the db and sends the request to E2T. The retry loop makes
// up to e2tMaxSubReqTryCount attempts and waits up to e2tSubReqTimeout for an
// event after each send. When subs.DoNotWaitSubResp is set, no wait is done
// and a SubmgrRestartTestEvent is produced instead. The last statement logs
// the event being handled.
839 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
841 var event interface{} = nil
842 var timedOut bool = false
844 subReqMsg := subs.SubReqMsg
845 subReqMsg.RequestId = subs.GetReqId().RequestId
846 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
848 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
852 // Write uncompleted subscription in db. If no response for subscription it need to be re-processed (deleted) after restart
853 c.WriteSubscriptionToDb(subs)
855 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
856 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the request and for re-sent requests.
858 c.UpdateCounter(cSubReqToE2)
860 c.UpdateCounter(cSubReReqToE2)
// The send result is not checked here; rmrSendToE2T logs failures itself.
862 c.rmrSendToE2T(desc, subs, trans)
863 if subs.DoNotWaitSubResp == false {
864 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
866 c.UpdateCounter(cSubReqTimerExpiry)
870 // Simulating case where subscription request has been sent but response has not been received before restart
871 event = &SubmgrRestartTestEvent{}
875 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
879 //-------------------------------------------------------------------
880 // send to E2T Subscription Delete Request
881 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request from
// the subscription's request ID and function ID, packs it into trans and
// sends it to E2T. The retry loop makes up to e2tMaxSubDelReqTryCount
// attempts and waits up to e2tSubDelReqTime for an event after each send.
// The last statement logs the event being handled.
883 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
885 var event interface{}
888 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
889 subDelReqMsg.RequestId = subs.GetReqId().RequestId
890 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
891 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
893 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
897 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
898 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters exist for the request and for re-sent requests.
900 c.UpdateCounter(cSubDelReqToE2)
902 c.UpdateCounter(cSubDelReReqToE2)
// The send result is not checked here; rmrSendToE2T logs failures itself.
904 c.rmrSendToE2T(desc, subs, trans)
905 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
907 c.UpdateCounter(cSubDelReqTimerExpiry)
912 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
916 //-------------------------------------------------------------------
917 // handle from E2T Subscription Response
918 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a subscription response received from
// E2T. It unpacks the message, finds the subscription by the E2 instance ID in
// the response, fetches that subscription's ongoing transaction and passes
// the response to it as an event, using e2tRecvMsgTimeout as the timeout.
// Every failing step is logged with the "MSG-SubResp" prefix.
919 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
920 xapp.Logger.Info("MSG from E2T: %s", params.String())
921 c.UpdateCounter(cSubRespFromE2)
923 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
925 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
928 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
930 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
933 trans := subs.GetTransaction()
935 err = fmt.Errorf("Ongoing transaction not found")
936 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
// Hands the response to the transaction that is waiting in WaitEvent.
939 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
941 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
942 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
947 //-------------------------------------------------------------------
948 // handle from E2T Subscription Failure
949 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a subscription failure received from
// E2T. It unpacks the message, finds the subscription by the E2 instance ID in
// the failure, fetches that subscription's ongoing transaction and passes the
// failure to it as an event, using e2tRecvMsgTimeout as the timeout.
// Every failing step is logged with the "MSG-SubFail" prefix.
950 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
951 xapp.Logger.Info("MSG from E2T: %s", params.String())
952 c.UpdateCounter(cSubFailFromE2)
953 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
955 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
958 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
960 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
963 trans := subs.GetTransaction()
965 err = fmt.Errorf("Ongoing transaction not found")
966 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
// Hands the failure to the transaction that is waiting in WaitEvent.
969 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
971 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
972 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
977 //-------------------------------------------------------------------
978 // handle from E2T Subscription Delete Response
979 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a Subscription Delete
// Response message received from E2T. It logs the incoming message, updates
// the cSubDelRespFromE2 counter, unpacks params.Payload, looks up the
// subscription using the decoded RequestId.InstanceId and passes the decoded
// message to that subscription's transaction with SendEvent.
//
// params is the received RMR message. The function declares a named error
// result, err; assuming the statements below sit at the top level of the
// function body, their `:=` and `=` forms assign to that same result variable.
//
// NOTE(review): the sibling E2T handlers in this file return nothing, while
// this one returns an error - confirm whether callers use the result.
// NOTE(review): most guard and return lines are not visible in this excerpt;
// confirm the early-exit behaviour against the full file.
980 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
981 xapp.Logger.Info("MSG from E2T: %s", params.String())
982 c.UpdateCounter(cSubDelRespFromE2)
983 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
985 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// Look up the subscription using the InstanceId taken from the decoded message.
988 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
990 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
993 trans := subs.GetTransaction()
// Error reported when the subscription has no transaction to receive the
// event (presumably guarded by a nil check on trans - guard not visible here).
995 err = fmt.Errorf("Ongoing transaction not found")
996 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Hand the decoded response to the transaction; e2tRecvMsgTimeout is passed
// as the second argument of SendEvent.
999 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
// Only a false sendOk is treated as a failure; timedOut is included in the
// error text for diagnosis.
1000 if sendOk == false {
1001 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1002 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1007 //-------------------------------------------------------------------
1008 // handle from E2T Subscription Delete Failure
1009 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a Subscription Delete Failure
// message received from E2T. It logs the incoming message, updates the
// cSubDelFailFromE2 counter, unpacks params.Payload, looks up the
// subscription using the decoded RequestId.InstanceId and passes the decoded
// message to that subscription's transaction with SendEvent.
//
// params is the received RMR message. The function has no return value; the
// failures visible here are reported through xapp.Logger.Error.
//
// NOTE(review): most guard and return lines are not visible in this excerpt;
// confirm the early-exit behaviour against the full file.
1010 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1011 xapp.Logger.Info("MSG from E2T: %s", params.String())
1012 c.UpdateCounter(cSubDelFailFromE2)
1013 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1015 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// Look up the subscription using the InstanceId taken from the decoded message.
1018 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1020 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1023 trans := subs.GetTransaction()
// Error reported when the subscription has no transaction to receive the
// event (presumably guarded by a nil check on trans - guard not visible here).
1025 err = fmt.Errorf("Ongoing transaction not found")
1026 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Hand the decoded failure to the transaction; e2tRecvMsgTimeout is passed as
// the second argument of SendEvent.
1029 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
// Only a false sendOk is treated as a failure; timedOut is included in the
// error text for diagnosis.
1030 if sendOk == false {
1031 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1032 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1037 //-------------------------------------------------------------------
1039 //-------------------------------------------------------------------
// typeofSubsMessage returns a string for v selected by v's dynamic type.
// The cases visible here cover pointers to E2APSubscriptionResponse,
// E2APSubscriptionFailure, E2APSubscriptionDeleteResponse and
// E2APSubscriptionDeleteFailure; the cases for E2APSubscriptionRequest and
// E2APSubscriptionDeleteRequest are commented out.
//
// NOTE(review): the strings returned for the active cases, and any handling
// of other values, are not visible in this excerpt - check the full file.
1040 func typeofSubsMessage(v interface{}) string {
1045 //case *e2ap.E2APSubscriptionRequest:
1047 case *e2ap.E2APSubscriptionResponse:
1049 case *e2ap.E2APSubscriptionFailure:
1051 //case *e2ap.E2APSubscriptionDeleteRequest:
1052 // return "SubDelReq"
1053 case *e2ap.E2APSubscriptionDeleteResponse:
1055 case *e2ap.E2APSubscriptionDeleteFailure:
1062 //-------------------------------------------------------------------
1064 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs by calling WriteSubscriptionToSdl with
// subs.ReqId.InstanceId as the first argument. The call is announced with a
// debug log. The function returns nothing; the error from
// WriteSubscriptionToSdl is only logged (the guard around the error log is
// not visible in this excerpt).
1065 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1066 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1067 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1069 xapp.Logger.Error("%v", err)
1073 //-------------------------------------------------------------------
1075 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs up to date.
// When removeSubscriptionFromDb is true it calls RemoveSubscriptionFromDb.
// It calls WriteSubscriptionToDb when subs.RetryFromXapp is false, and it
// sets subs.RetryFromXapp to false.
//
// NOTE(review): the else/closing-brace lines are not visible in this excerpt,
// so it cannot be confirmed from here whether the write path is the
// alternative to the remove path or runs in addition to it, nor whether
// resetting RetryFromXapp is unconditional - verify against the full file.
1076 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1078 if removeSubscriptionFromDb == true {
1079 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1080 c.RemoveSubscriptionFromDb(subs)
1082 // Update is needed for successful response and merge case here
1083 if subs.RetryFromXapp == false {
1084 c.WriteSubscriptionToDb(subs)
1087 subs.RetryFromXapp = false
1090 //-------------------------------------------------------------------
1092 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the stored entry for subs by calling
// RemoveSubscriptionFromSdl with subs.ReqId.InstanceId. The call is announced
// with a debug log. The function returns nothing; the error from
// RemoveSubscriptionFromSdl is only logged (the guard around the error log is
// not visible in this excerpt).
1093 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1094 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1095 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1097 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues a subscription delete for subs; the debug
// log describes the trigger as a restart. It builds one
// E2APSubscriptionDeleteRequest from the subscription's RequestId and the
// FunctionId of subs.SubReqMsg, packs it once, and then, for every endpoint in
// subs.EpList.Endpoints, fills an xapp.RMRParams with the packed payload and
// passes it to handleXAPPSubscriptionDeleteRequest.
//
// The function returns nothing. A packing error is logged (the guard and any
// early return around that log are not visible in this excerpt).
1101 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1103 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1105 // Send delete for every endpoint in the subscription
1106 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1107 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1108 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1109 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1111 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
// The same packed message type and payload are reused for every endpoint;
// only Src differs between iterations.
1114 for _, endPoint := range subs.EpList.Endpoints {
1115 params := &xapp.RMRParams{}
1116 params.Mtype = mType
1117 params.SubId = int(subs.GetReqId().InstanceId)
1119 params.Meid = subs.Meid
// Src is set to the endpoint's string form, so the request handed to the xApp
// delete handler names this endpoint as its source.
1120 params.Src = endPoint.String()
1121 params.PayloadLen = len(payload.Buf)
1122 params.Payload = payload.Buf
// DeleteFromDb is set on the subscription before the delete handler is invoked.
1124 subs.DeleteFromDb = true
1125 c.handleXAPPSubscriptionDeleteRequest(params)
1129 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1131 fmt.Println("CRESTSubscriptionRequest")
1132 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1134 if p.ClientEndpoint.HTTPPort != nil {
1135 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1137 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1140 if p.ClientEndpoint.RMRPort != nil {
1141 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1143 fmt.Println(" ClientEndpoint.RMRPort = nil")
1147 fmt.Printf(" Meid = %s\n", *p.Meid)
1149 fmt.Println(" Meid = nil")
1152 for _, subscriptionDetail := range p.SubscriptionDetails {
1153 if p.RANFunctionID != nil {
1154 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1156 fmt.Println(" RANFunctionID = nil")
1158 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1159 fmt.Printf(" SubscriptionDetail.EventTriggers.OctetString = %X\n", subscriptionDetail.EventTriggers.OctetString)
1161 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1162 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1163 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1164 if actionToBeSetup.ActionDefinition != nil {
1165 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition.OctetString = %X\n", actionToBeSetup.ActionDefinition.OctetString)
1167 fmt.Println(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = nil")
1170 if actionToBeSetup.SubsequentAction != nil {
1171 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1172 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1174 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")