2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring builds one identification string for log messages. It appends
// entry.String() for every entry and finally "err(" + err.Error() + ")", each
// part preceded by filler.
// NOTE(review): filler is "" in every visible line and no nil check on err is
// visible, although callers in this file pass a nil err (and one passes a nil
// entry, on which entry.String() would panic) — the listing appears to omit
// lines here; confirm against the complete file.
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level settings assigned by Control.ReadConfigParameters from viper
// keys under "controls."; each falls back to a default there when the
// configured value is zero (or empty for the string).
63 var e2tSubReqTimeout time.Duration // wait passed to trans.WaitEvent per subscription request try
64 var e2tSubDelReqTime time.Duration // wait passed to trans.WaitEvent per subscription delete request try
65 var e2tRecvMsgTimeout time.Duration // timeout passed to trans.SendEvent when an E2T message is handed to a transaction
66 var waitRouteCleanup_ms time.Duration // passed to registry.RemoveFromSubscription
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string // string flag; defaults to "true" and is compared against "false" in NewControl
79 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event value sendE2TSubscriptionRequest creates
// to simulate a request that received no response before a restart; it is
// matched in the type switch of handleSubscriptionCreate.
89 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type; it is not referenced in the
// visible part of this file.
90 type SubmgrRestartUpEvent struct{}
93 xapp.Logger.Info("SUBMGR")
95 viper.SetEnvPrefix("submgr")
96 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance. It creates the routing manager REST
// client from the rtmgr.* viper settings, an initialized Registry wired to
// that client and a Tracker, loads configuration with ReadConfigParameters,
// injects the test and symptom-data REST routes and starts
// xapp.Subscription.Listen in its own goroutine. Subscriptions are then read
// from SDL into the registry and passed to HandleUncompletedSubscriptions.
// NOTE(review): several lines (remaining Control field initializers, the body
// of the readSubsFromDb == "false" branch, the error check and the return
// statements) are not visible in this listing; presumably the "false" branch
// skips the database read — confirm against the complete file.
99 func NewControl() *Control {
// The routing manager is always addressed over plain "http".
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 c := &Control{e2ap: new(E2ap),
115 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Populate the package-level settings before any handler can run.
118 c.ReadConfigParameters("")
120 // Register REST handler for testing support
121 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
122 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
126 if readSubsFromDb == "false" {
130 // Read subscriptions from db
131 xapp.Logger.Info("Reading subscriptions from db")
132 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
134 xapp.Logger.Error("%v", err)
136 c.registry.subIds = subIds
137 c.registry.register = register
138 c.HandleUncompletedSubscriptions(register)
// SymptomDataHandler is the handler injected for GET /ric/v1/symptomdata in
// NewControl. It queries the registry for the current subscriptions and passes
// them to xapp.Resource.SendSymptomDataJson together with the name
// "platform/subscriptions.json".
// NOTE(review): the error returned by c.registry.QueryHandler is discarded.
143 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
144 subscriptions, _ := c.registry.QueryHandler()
145 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
148 //-------------------------------------------------------------------
150 //-------------------------------------------------------------------
// ReadConfigParameters loads the tunable settings from viper into the
// package-level variables and c.LoggerLevel, logging each resulting value.
// A configured value of 0 (or "" for readSubsFromDb) selects the default:
// 2000 * 1000000 ns for the three E2T timers, 5000 * 1000000 ns for
// waitRouteCleanup_ms, 1 for both try counts and "true" for readSubsFromDb.
//
// The parameter f is not used in the visible code. NewControl calls this
// method with "" and Run registers it with xapp.AddConfigChangeListener.
// NOTE(review): the body of the final c.LoggerLevel == 0 branch is not visible
// in this listing.
151 func (c *Control) ReadConfigParameters(f string) {
153 // viper.GetDuration returns nanoseconds
// The "_ms" keys are multiplied by 1000000, i.e. presumably they hold plain
// millisecond counts — confirm against the configuration file.
154 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
155 if e2tSubReqTimeout == 0 {
156 e2tSubReqTimeout = 2000 * 1000000
158 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
159 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
160 if e2tSubDelReqTime == 0 {
161 e2tSubDelReqTime = 2000 * 1000000
163 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
164 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
165 if e2tRecvMsgTimeout == 0 {
166 e2tRecvMsgTimeout = 2000 * 1000000
168 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
170 // Internal cfg parameter, used to define a wait time for RMR route clean-up. A non-default
171 // value of 100ms is currently used only in unit tests.
172 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
173 if waitRouteCleanup_ms == 0 {
174 waitRouteCleanup_ms = 5000 * 1000000
176 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
178 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
179 if e2tMaxSubReqTryCount == 0 {
180 e2tMaxSubReqTryCount = 1
182 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
184 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
185 if e2tMaxSubDelReqTryCount == 0 {
186 e2tMaxSubDelReqTryCount = 1
188 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
190 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
191 if readSubsFromDb == "" {
192 readSubsFromDb = "true"
194 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
195 c.LoggerLevel = viper.GetUint32("logger.level")
196 if c.LoggerLevel == 0 {
201 //-------------------------------------------------------------------
203 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register and, for every
// subscription whose SubRespRcvd is false, sets NoRespToXapp and calls
// c.SendSubscriptionDeleteReq for it. NewControl calls it with the
// subscriptions read back from SDL.
204 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
206 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
207 for subId, subs := range register {
208 if subs.SubRespRcvd == false {
// NoRespToXapp is checked in handleXAPPSubscriptionDeleteRequest before a
// delete response is sent to the xApp.
209 subs.NoRespToXapp = true
210 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
211 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered with xapp.SetReadyCB in Run. It sets
// c.RMRClient to xapp.Rmr when no client has been set yet. The data argument
// is not used in the visible code.
216 func (c *Control) ReadyCB(data interface{}) {
217 if c.RMRClient == nil {
218 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xApp ready callback and ReadConfigParameters as
// a configuration change listener.
// NOTE(review): the remainder of the function is not visible in this listing.
222 func (c *Control) Run() {
223 xapp.SetReadyCB(c.ReadyCB, nil)
224 xapp.AddConfigChangeListener(c.ReadConfigParameters)
228 //-------------------------------------------------------------------
230 //-------------------------------------------------------------------
// SubscriptionHandler handles a REST subscription request from an xApp; it is
// passed to xapp.Subscription.Listen in NewControl. params is type-asserted to
// *models.SubscriptionParams without the comma-ok form, so any other dynamic
// type panics.
//
// A request without ClientEndpoint is rejected. When p.SubscriptionID is empty
// a new REST subscription id is generated with ksuid and registered with
// CreateRESTSubscription; the visible code also looks up an existing REST
// subscription by p.SubscriptionID with GetRESTSubscription. The E2AP
// subscription requests are built with FillSubscriptionReqMsgs and processed
// asynchronously by processSubscriptionRequests, while subResp carries the
// REST subscription id.
// NOTE(review): the error checks, the else branch, the return statements and
// the declaration of restSubId are not visible in this listing.
231 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
234 c.UpdateCounter(cRestSubReqFromXapp)
236 subResp := models.SubscriptionResponse{}
237 p := params.(*models.SubscriptionParams)
239 if c.LoggerLevel > 2 {
240 c.PrintRESTSubscriptionRequest(p)
243 if p.ClientEndpoint == nil {
244 xapp.Logger.Error("ClientEndpoint == nil")
245 c.UpdateCounter(cRestSubFailToXapp)
// NOTE(review): the returned error has an empty message, so the caller gets
// no reason for the rejection.
246 return nil, fmt.Errorf("")
249 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
251 xapp.Logger.Error("%s", err.Error())
252 c.UpdateCounter(cRestSubFailToXapp)
256 var restSubscription *RESTSubscription
257 if p.SubscriptionID == "" {
258 restSubId = ksuid.New().String()
259 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
261 xapp.Logger.Error("%s", err.Error())
262 c.UpdateCounter(cRestSubFailToXapp)
267 restSubId = p.SubscriptionID
268 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
270 xapp.Logger.Error("%s", err.Error())
271 c.UpdateCounter(cRestSubFailToXapp)
276 subResp.SubscriptionID = &restSubId
277 subReqList := e2ap.SubscriptionRequestList{}
278 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
280 xapp.Logger.Error("%s", err.Error())
// The REST subscription is removed again when the request messages cannot be built.
281 c.registry.DeleteRESTSubscription(&restSubId)
282 c.UpdateCounter(cRestSubFailToXapp)
// The E2 requests are processed in the background; results reach the xApp
// through xapp.Subscription.Notify in processSubscriptionRequests.
286 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
288 c.UpdateCounter(cRestSubRespToXapp)
292 //-------------------------------------------------------------------
294 //-------------------------------------------------------------------
// processSubscriptionRequests processes every E2AP subscription request in
// subReqList for one REST subscription, one after another. For each request it
// creates an xApp transaction, runs handleSubscriptionRequest and notifies the
// xApp at clientEndpoint with xapp.Subscription.Notify: on error with
// E2EventInstanceID 0 and the error text as ErrorCause, on success with the E2
// instance id taken from the subscription response, which is also stored in
// restSubscription.
//
// It is started as a goroutine by SubscriptionHandler. clientEndpoint, meid
// and restSubId are dereferenced without nil checks in the visible code.
// NOTE(review): the error checks, else branches and returns are not visible in
// this listing.
296 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
297 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
299 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
301 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
303 c.registry.DeleteRESTSubscription(restSubId)
304 xapp.Logger.Error("XAPP-SubReq transaction not created, endpoint createtion failed for RESTSubId=%s, Meid=%s", *restSubId, *meid)
// These three are declared outside the loop and their addresses are stored in
// the notification structs built inside it.
308 var xAppEventInstanceID int64
309 var e2EventInstanceID int64
310 var errorCause string
311 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
312 subReqMsg := subReqList.E2APSubscriptionRequests[index]
314 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
316 c.registry.DeleteRESTSubscription(restSubId)
317 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer sits inside the for loop, so every transaction is
// released only when the whole function returns, not at the end of its
// iteration.
321 defer trans.Release()
322 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
323 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
324 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
326 // Send notification to xApp that processing of a Subscription Request has failed.
327 e2EventInstanceID = (int64)(0)
328 errorCause = err.Error()
329 resp := &models.SubscriptionResponse{
330 SubscriptionID: restSubId,
331 SubscriptionInstances: []*models.SubscriptionInstance{
332 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
333 ErrorCause: &errorCause,
334 XappEventInstanceID: &xAppEventInstanceID},
337 // Mark REST subscription request processed.
338 restSubscription.SetProcessed()
339 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
340 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
341 xapp.Subscription.Notify(resp, *clientEndpoint)
342 c.UpdateCounter(cRestSubFailNotifToXapp)
344 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
346 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
347 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
349 // Store successfully processed InstanceId for deletion
350 restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
351 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
353 // Send notification to xApp that a Subscription Request has been processed.
354 resp := &models.SubscriptionResponse{
355 SubscriptionID: restSubId,
356 SubscriptionInstances: []*models.SubscriptionInstance{
357 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
359 XappEventInstanceID: &xAppEventInstanceID},
362 // Mark REST subscription request processed.
363 restSubscription.SetProcessed()
364 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
365 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
366 xapp.Subscription.Notify(resp, *clientEndpoint)
367 c.UpdateCounter(cRestSubNotifToXapp)
373 //-------------------------------------------------------------------
375 //------------------------------------------------------------------
// handleSubscriptionRequest runs one E2 subscription request on behalf of a
// REST subscription. It tracks trans, assigns it to a subscription in the
// registry, starts handleSubscriptionCreate in a goroutine and blocks in
// trans.WaitEvent(0) for the outcome. The event is examined with a type switch
// (*e2ap.E2APSubscriptionResponse, *e2ap.E2APSubscriptionFailure and further
// branches); the failure paths set err, log it and remove trans from the
// subscription. meid and restSubId are not used in the visible code.
// NOTE(review): the error checks, the remaining case labels and every return
// statement are missing from this listing, so the exact returned values cannot
// be confirmed here.
376 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
377 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
379 err := c.tracker.Track(trans)
381 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
// The original tracking error is logged above and then replaced by a generic one.
382 err = fmt.Errorf("Tracking failure")
386 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
388 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
395 go c.handleSubscriptionCreate(subs, trans)
396 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
400 switch themsg := event.(type) {
401 case *e2ap.E2APSubscriptionResponse:
404 case *e2ap.E2APSubscriptionFailure:
405 err = fmt.Errorf("E2 SubscriptionFailure received")
408 err = fmt.Errorf("unexpected E2 subscription response received")
412 err = fmt.Errorf("E2 subscription response timeout")
415 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
416 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
420 //-------------------------------------------------------------------
422 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB handles a REST subscription delete request for
// restSubId; it is passed to xapp.Subscription.Listen in NewControl. It looks
// up the REST subscription, checks SubReqOngoing and SubDelReqOngoing, then
// calls SubscriptionDeleteHandler for every E2 instance id stored in the REST
// subscription, dropping the id mappings as it goes, and finally deletes the
// REST subscription from the registry.
// NOTE(review): the error checks, returns and any goroutine/closure wrapping
// of the delete loop are not visible in this listing; the values returned in
// each branch cannot be confirmed here.
423 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
426 c.UpdateCounter(cRestSubDelReqFromXapp)
428 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
430 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
432 xapp.Logger.Error("%s", err.Error())
433 if restSubscription == nil {
434 // Subscription was not found
437 if restSubscription.SubReqOngoing == true {
438 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
439 xapp.Logger.Error("%s", err.Error())
441 } else if restSubscription.SubDelReqOngoing == true {
442 // Previous request for same restSubId still ongoing
448 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// NOTE(review): restSubscription.InstanceIds is ranged over while
// DeleteE2InstanceId is called on the same object inside the loop; whether
// that mutates the ranged slice depends on DeleteE2InstanceId — verify.
450 for _, instanceId := range restSubscription.InstanceIds {
451 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
454 xapp.Logger.Error("%s", err.Error())
457 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
458 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
459 restSubscription.DeleteE2InstanceId(instanceId)
461 c.registry.DeleteRESTSubscription(&restSubId)
464 c.UpdateCounter(cRestSubDelRespToXapp)
469 //-------------------------------------------------------------------
471 //-------------------------------------------------------------------
// SubscriptionDeleteHandler deletes the E2 subscription identified by
// instanceId on behalf of a REST subscription. It finds the subscription in
// the registry, creates and tracks an xApp transaction for it, starts
// handleSubscriptionDelete in a goroutine, blocks in trans.WaitEvent(0) until
// that finishes and then removes trans from the subscription.
//
// It returns the xApp event instance id, taken from subs.ReqId.Id. When the
// subscription is not found the visible code returns (0, nil).
// NOTE(review): the error checks and some returns are not visible in this
// listing.
472 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
474 var xAppEventInstanceID int64
475 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// NOTE(review): restSubId is a *string here, so %v prints the pointer value
// rather than the id; and idstring(err, nil) passes a nil fmt.Stringer entry,
// on which idstring's entry.String() call would panic.
477 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
478 restSubId, instanceId, idstring(err, nil))
479 return xAppEventInstanceID, nil
482 xAppEventInstanceID = int64(subs.ReqId.Id)
483 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
485 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
486 xapp.Logger.Error("%s", err.Error())
488 defer trans.Release()
490 err = c.tracker.Track(trans)
492 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
493 xapp.Logger.Error("%s", err.Error())
// NOTE(review): an empty *time.ParseError is returned instead of the error
// built just above — looks unintended; confirm.
494 return xAppEventInstanceID, &time.ParseError{}
499 go c.handleSubscriptionDelete(subs, trans)
500 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
502 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
504 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
506 return xAppEventInstanceID, nil
509 //-------------------------------------------------------------------
511 //-------------------------------------------------------------------
// QueryHandler logs the call and returns the result of
// c.registry.QueryHandler(). It is passed to xapp.Subscription.Listen in
// NewControl.
512 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
513 xapp.Logger.Info("QueryHandler() called")
517 return c.registry.QueryHandler()
// TestRestHandler is the testing-support handler injected for
// POST /ric/v1/test/{testId} in NewControl. It reads the {testId} path
// variable and, per the visible code and comments, supports deleting a single
// subscription from the db ("deletesubid=<id>"), removing all subscriptions
// from the db and forcing a restart via os.Exit(1); anything else is logged as
// unsupported.
// NOTE(review): the conditions selecting the last two commands and the
// returns are not visible in this listing.
520 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
521 xapp.Logger.Info("TestRestHandler() called")
523 pathParams := mux.Vars(r)
524 s := pathParams["testId"]
526 // This can be used to delete a single subscription from the db
527 if contains := strings.Contains(s, "deletesubid="); contains == true {
528 var splits = strings.Split(s, "=")
// NOTE(review): the id is parsed as a signed 64-bit integer and then
// converted to uint32, so negative or too-large values are silently
// truncated rather than rejected.
529 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
530 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
531 c.RemoveSubscriptionFromSdl(uint32(subId))
536 // This can be used to remove all subscriptions from the db
538 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
539 c.RemoveAllSubscriptionsFromSdl()
543 // This is meant to cause submgr's restart in testing
545 xapp.Logger.Info("os.Exit(1) called")
549 xapp.Logger.Info("Unsupported rest command received %s", s)
552 //-------------------------------------------------------------------
554 //-------------------------------------------------------------------
// rmrSendToE2T sends the packed message held by trans towards E2T over RMR.
// The RMR parameters are filled from trans (message type, payload) and subs
// (instance id as SubId, Meid); the message is logged with desc and sent with
// c.SendWithRetry(params, false, 5). A send error is logged and returned
// through the named result err.
// NOTE(review): at least one parameter assignment and the error check are not
// visible in this listing.
556 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
557 params := &xapp.RMRParams{}
558 params.Mtype = trans.GetMtype()
559 params.SubId = int(subs.GetReqId().InstanceId)
561 params.Meid = subs.GetMeid()
563 params.PayloadLen = len(trans.Payload.Buf)
564 params.Payload = trans.Payload.Buf
566 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
567 err = c.SendWithRetry(params, false, 5)
569 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the packed message held by trans towards the xApp over
// RMR. The RMR parameters are filled from trans (message type, Xid, Meid,
// payload) and subs (instance id as SubId); the message is logged with desc
// and sent with c.SendWithRetry(params, false, 5). A send error is logged and
// returned through the named result err.
// NOTE(review): the error check and possibly further parameter assignments
// are not visible in this listing.
574 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
576 params := &xapp.RMRParams{}
577 params.Mtype = trans.GetMtype()
578 params.SubId = int(subs.GetReqId().InstanceId)
579 params.Xid = trans.GetXid()
580 params.Meid = trans.GetMeid()
582 params.PayloadLen = len(trans.Payload.Buf)
583 params.Payload = trans.Payload.Buf
585 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
586 err = c.SendWithRetry(params, false, 5)
588 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume receives an RMR message and dispatches it by message type to the
// matching handler, each started in its own goroutine; unknown types are
// logged and discarded. It fails with an error when c.RMRClient is nil.
// msg.Mbuf is freed when Consume returns, which is why the payload is first
// copied into a Go-owned slice before the handlers are started.
// NOTE(review): the switch header, the assignment of the cloned payload back
// to msg and the returns are not visible in this listing.
593 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
594 if c.RMRClient == nil {
595 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
596 xapp.Logger.Error("%s", err.Error())
601 defer c.RMRClient.Free(msg.Mbuf)
603 // xapp-frame might use direct access to c buffer and
604 // when msg.Mbuf is freed, someone might take it into use
605 // and payload data might be invalid inside message handle function
607 // subscriptions won't load system a lot so there is no
608 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a fresh backing array.
609 cPay := append(msg.Payload[:0:0], msg.Payload...)
611 msg.PayloadLen = len(cPay)
614 case xapp.RIC_SUB_REQ:
615 go c.handleXAPPSubscriptionRequest(msg)
616 case xapp.RIC_SUB_RESP:
617 go c.handleE2TSubscriptionResponse(msg)
618 case xapp.RIC_SUB_FAILURE:
619 go c.handleE2TSubscriptionFailure(msg)
620 case xapp.RIC_SUB_DEL_REQ:
621 go c.handleXAPPSubscriptionDeleteRequest(msg)
622 case xapp.RIC_SUB_DEL_RESP:
623 go c.handleE2TSubscriptionDeleteResponse(msg)
624 case xapp.RIC_SUB_DEL_FAILURE:
625 go c.handleE2TSubscriptionDeleteFailure(msg)
627 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
632 //-------------------------------------------------------------------
633 // handle from XAPP Subscription Request
634 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest handles a RIC_SUB_REQ message received from an
// xApp over RMR (dispatched from Consume). It unpacks the E2AP subscription
// request, creates and tracks an xApp transaction keyed by the sender endpoint
// and Xid, assigns the transaction to a subscription in the registry and then
// calls wakeSubscriptionRequest. Failures are logged; nothing is returned.
// NOTE(review): the error/nil checks and early returns are not visible in this
// listing.
635 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
636 xapp.Logger.Info("MSG from XAPP: %s", params.String())
637 c.UpdateCounter(cSubReqFromXapp)
639 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
641 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
645 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
647 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
650 defer trans.Release()
652 if err = c.tracker.Track(trans); err != nil {
653 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
657 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
658 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
660 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
664 c.wakeSubscriptionRequest(subs, trans)
667 //-------------------------------------------------------------------
668 // Wake Subscription Request to E2node
669 //------------------------------------------------------------------
// wakeSubscriptionRequest drives a subscription create for an RMR-originated
// request and relays the outcome to the xApp. It starts
// handleSubscriptionCreate in a goroutine, blocks in trans.WaitEvent(0) and
// then, depending on the event type, packs a subscription response or a
// subscription failure with the xApp's own RequestId.Id restored, updates the
// matching counter and sends it with rmrSendToXapp. Otherwise the failure is
// logged.
// NOTE(review): the declaration of err, the checks on the pack results and
// the returns are not visible in this listing; the results of rmrSendToXapp
// are not checked in the visible code.
670 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
672 go c.handleSubscriptionCreate(subs, trans)
673 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
676 switch themsg := event.(type) {
677 case *e2ap.E2APSubscriptionResponse:
// Put the xApp's request id back (sendE2TSubscriptionRequest replaces it on
// the E2T side with ricRequestorId).
678 themsg.RequestId.Id = trans.RequestId.Id
679 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
682 c.UpdateCounter(cSubRespToXapp)
683 c.rmrSendToXapp("", subs, trans)
686 case *e2ap.E2APSubscriptionFailure:
687 themsg.RequestId.Id = trans.RequestId.Id
688 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
690 c.UpdateCounter(cSubFailToXapp)
691 c.rmrSendToXapp("", subs, trans)
697 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
698 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
701 //-------------------------------------------------------------------
702 // handle from XAPP Subscription Delete Request
703 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest handles a RIC_SUB_DEL_REQ message
// received from an xApp over RMR (dispatched from Consume). It unpacks the
// delete request, creates and tracks an xApp transaction, finds the
// subscription by the transaction's subscription id, starts
// handleSubscriptionDelete in a goroutine and blocks in trans.WaitEvent(0).
// Unless subs.NoRespToXapp is set, it then packs a subscription delete
// response and sends it to the xApp, whatever the outcome on the E2 side was.
// NOTE(review): the error/nil checks and early returns are not visible in this
// listing.
704 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
705 xapp.Logger.Info("MSG from XAPP: %s", params.String())
706 c.UpdateCounter(cSubDelReqFromXapp)
708 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
710 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
714 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
716 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
719 defer trans.Release()
721 err = c.tracker.Track(trans)
// NOTE(review): the log prefix below says "XAPP-SubReq" although this is the
// delete-request path — looks like a copy-paste slip.
723 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
727 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
729 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
736 go c.handleSubscriptionDelete(subs, trans)
737 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
739 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
741 if subs.NoRespToXapp == true {
742 // Do not send delete responses to xapps, because submgr restart is deleting uncompleted subscriptions
746 // Whatever is received success, fail or timeout, send successful delete response
747 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
748 subDelRespMsg.RequestId.Id = trans.RequestId.Id
749 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
750 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
751 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
753 c.UpdateCounter(cSubDelRespToXapp)
754 c.rmrSendToXapp("", subs, trans)
757 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
758 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
761 //-------------------------------------------------------------------
762 // SUBS CREATE Handling
763 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2-side work of a subscription create
// for parentTrans and reports the result back to it as an event. It creates a
// subscription-level transaction and waits for its turn on subs. When
// GetCachedResponse returns a nil message with valid == true, the request is
// sent to E2T via sendE2TSubscriptionRequest and the resulting event decides:
//   - subscription response: cached as successful, SubRespRcvd is set;
//   - subscription failure: cached as unsuccessful, the subscription is
//     marked for removal from the db and a delete request is sent to E2T;
//   - *SubmgrRestartTestEvent: test-only restart simulation;
//   - remaining events: cached as nil/unsuccessful, marked for removal from
//     the db and a delete request is sent to E2T.
// Afterwards the db is updated and the cached result is sent to parentTrans.
// NOTE(review): the else/default labels, the condition guarding
// RemoveFromSubscription and a return in the test-event branch are not visible
// in this listing.
764 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
766 var removeSubscriptionFromDb bool = false
767 trans := c.tracker.NewSubsTransaction(subs)
768 subs.WaitTransactionTurn(trans)
// Deferred calls run in reverse order: trans is released first, then the turn.
769 defer subs.ReleaseTransactionTurn(trans)
770 defer trans.Release()
772 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
774 subRfMsg, valid := subs.GetCachedResponse()
775 if subRfMsg == nil && valid == true {
776 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
777 switch event.(type) {
778 case *e2ap.E2APSubscriptionResponse:
779 subRfMsg, valid = subs.SetCachedResponse(event, true)
780 subs.SubRespRcvd = true
781 case *e2ap.E2APSubscriptionFailure:
782 removeSubscriptionFromDb = true
783 subRfMsg, valid = subs.SetCachedResponse(event, false)
784 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
785 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
786 case *SubmgrRestartTestEvent:
787 // This simulates that no response has been received and after restart subscriptions are restored from db
788 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
791 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
792 removeSubscriptionFromDb = true
793 subRfMsg, valid = subs.SetCachedResponse(nil, false)
794 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
796 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
798 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
801 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
803 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
806 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
807 parentTrans.SendEvent(subRfMsg, 0)
810 //-------------------------------------------------------------------
811 // SUBS DELETE Handling
812 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the E2-side work of a subscription delete
// for parentTrans. It creates a subscription-level transaction and waits for
// its turn on subs. The visible condition checks that subs is valid and that
// parentTrans's endpoint is the only one in subs.EpList; a delete request is
// sent to E2T with sendE2TSubscriptionDeleteRequest. Finally parentTrans is
// removed from the subscription, the subscription is written to the db and a
// nil event is sent to parentTrans to wake the waiting caller.
// NOTE(review): lines around the condition (its body and any locking) are not
// visible in this listing, so whether the E2T delete is sent only inside that
// branch cannot be confirmed here.
814 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
816 trans := c.tracker.NewSubsTransaction(subs)
817 subs.WaitTransactionTurn(trans)
818 defer subs.ReleaseTransactionTurn(trans)
819 defer trans.Release()
821 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
825 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
828 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
832 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
833 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
834 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
835 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
836 c.registry.UpdateSubscriptionToDb(subs, c)
837 parentTrans.SendEvent(nil, 0)
840 //-------------------------------------------------------------------
841 // send to E2T Subscription Request
842 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request of subs, with
// RequestId.Id replaced by ricRequestorId (123), writes the still uncompleted
// subscription to the db and sends the request to E2T up to
// e2tMaxSubReqTryCount times. After each send, unless subs.DoNotWaitSubResp is
// set, it waits up to e2tSubReqTimeout for an event on trans; with
// DoNotWaitSubResp set a *SubmgrRestartTestEvent is produced instead. The
// event is returned to the caller.
// NOTE(review): the declaration of err, the pack error check, the conditions
// choosing between the cSubReqToE2/cSubReReqToE2 counters, the timeout
// handling (continue/break) and the return statement are not visible in this
// listing. The result of rmrSendToE2T is not checked in the visible code.
843 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
845 var event interface{} = nil
846 var timedOut bool = false
847 const ricRequestorId = 123
849 subReqMsg := subs.SubReqMsg
850 subReqMsg.RequestId = subs.GetReqId().RequestId
851 subReqMsg.RequestId.Id = ricRequestorId
852 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
854 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
858 // Write the uncompleted subscription to db. If there is no response for the subscription, it needs to be re-processed (deleted) after restart
859 c.WriteSubscriptionToDb(subs)
861 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
862 desc := fmt.Sprintf("(retry %d)", retries)
864 c.UpdateCounter(cSubReqToE2)
866 c.UpdateCounter(cSubReReqToE2)
868 c.rmrSendToE2T(desc, subs, trans)
869 if subs.DoNotWaitSubResp == false {
870 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
872 c.UpdateCounter(cSubReqTimerExpiry)
876 // Simulating case where subscription request has been sent but response has not been received before restart
877 event = &SubmgrRestartTestEvent{}
881 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
885 //-------------------------------------------------------------------
886 // send to E2T Subscription Delete Request
887 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request for
// subs (request id from subs with RequestId.Id replaced by ricRequestorId
// (123), function id from the stored subscription request), packs it and
// sends it to E2T up to e2tMaxSubDelReqTryCount times, waiting up to
// e2tSubDelReqTime for an event on trans after each send. The event is
// returned to the caller.
// NOTE(review): the declarations of err and timedOut, the pack error check,
// the conditions choosing between the cSubDelReqToE2/cSubDelReReqToE2
// counters, the timeout handling and the return statement are not visible in
// this listing. The result of rmrSendToE2T is not checked in the visible code.
889 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
891 var event interface{}
893 const ricRequestorId = 123
895 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
896 subDelReqMsg.RequestId = subs.GetReqId().RequestId
897 subDelReqMsg.RequestId.Id = ricRequestorId
898 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
899 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
901 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
905 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
906 desc := fmt.Sprintf("(retry %d)", retries)
908 c.UpdateCounter(cSubDelReqToE2)
910 c.UpdateCounter(cSubDelReReqToE2)
912 c.rmrSendToE2T(desc, subs, trans)
913 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
915 c.UpdateCounter(cSubDelReqTimerExpiry)
920 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
924 //-------------------------------------------------------------------
925 // handle from E2T Subscription Response
926 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse handles a RIC_SUB_RESP message received from
// E2T (dispatched from Consume). It unpacks the response, finds the
// subscription by the response's RequestId.InstanceId, takes the
// subscription's ongoing transaction and passes the response to it with
// trans.SendEvent, using e2tRecvMsgTimeout. Every failure is only logged.
// NOTE(review): the error/nil checks and early returns are not visible in this
// listing.
927 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
928 xapp.Logger.Info("MSG from E2T: %s", params.String())
929 c.UpdateCounter(cSubRespFromE2)
931 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
933 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
936 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
938 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
941 trans := subs.GetTransaction()
943 err = fmt.Errorf("Ongoing transaction not found")
944 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
947 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
949 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
950 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
955 //-------------------------------------------------------------------
956 // handle from E2T Subscription Failure
957 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure handles a RIC_SUB_FAILURE message received from
// E2T (dispatched from Consume). It unpacks the failure, finds the
// subscription by the failure's RequestId.InstanceId, takes the subscription's
// ongoing transaction and passes the failure to it with trans.SendEvent, using
// e2tRecvMsgTimeout. Every failure along the way is only logged.
// NOTE(review): the error/nil checks and early returns are not visible in this
// listing.
958 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
959 xapp.Logger.Info("MSG from E2T: %s", params.String())
960 c.UpdateCounter(cSubFailFromE2)
961 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
963 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
966 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
968 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
971 trans := subs.GetTransaction()
973 err = fmt.Errorf("Ongoing transaction not found")
974 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
977 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
979 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
980 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
985 //-------------------------------------------------------------------
986 // handle from E2T Subscription Delete Response
987 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// message received from E2T. It logs the message, updates the
// cSubDelRespFromE2 counter, unpacks the payload, looks up the subscription by
// the InstanceId of the decoded request id and passes the decoded response to
// the transaction attached to that subscription.
//
// This handler declares a named error result (err), which is the variable
// assigned by the unpack, lookup and fmt.Errorf steps below.
// NOTE(review): presumably the last error encountered is what the caller
// receives - confirm against the return statements.
988 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
989 xapp.Logger.Info("MSG from E2T: %s", params.String())
990 c.UpdateCounter(cSubDelRespFromE2)
// Decode the E2AP payload carried in the RMR message.
991 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
993 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// Find the subscription registered for the InstanceId of the response.
996 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
998 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
// The response is delivered to the transaction returned by the subscription.
1001 trans := subs.GetTransaction()
1003 err = fmt.Errorf("Ongoing transaction not found")
1004 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
// Hand the decoded response to the transaction; a false sendOk is logged as an error.
1007 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1008 if sendOk == false {
1009 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1010 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1015 //-------------------------------------------------------------------
1016 // handle from E2T Subscription Delete Failure
1017 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// message received from E2T. It logs the message, updates the
// cSubDelFailFromE2 counter, unpacks the payload, looks up the subscription by
// the InstanceId of the decoded request id and passes the decoded failure to
// the transaction attached to that subscription. Problems are reported via
// xapp.Logger.Error; the function has no return value.
1018 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1019 xapp.Logger.Info("MSG from E2T: %s", params.String())
1020 c.UpdateCounter(cSubDelFailFromE2)
// Decode the E2AP payload carried in the RMR message.
1021 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1023 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// Find the subscription registered for the InstanceId of the failure.
1026 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1028 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
// The failure is delivered to the transaction returned by the subscription.
1031 trans := subs.GetTransaction()
1033 err = fmt.Errorf("Ongoing transaction not found")
1034 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
// Hand the decoded failure to the transaction; a false sendOk is logged as an error.
1037 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1038 if sendOk == false {
1039 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1040 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1045 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing the dynamic type of v.
// It distinguishes the E2AP subscription response, failure, delete response
// and delete failure message pointers; the cases for the request and delete
// request types are commented out.
// NOTE(review): the returned strings are presumably short names in the style
// of "SubDelReq" seen in the commented-out case - confirm.
1047 //-------------------------------------------------------------------
1048 func typeofSubsMessage(v interface{}) string {
1053 //case *e2ap.E2APSubscriptionRequest:
1055 case *e2ap.E2APSubscriptionResponse:
1057 case *e2ap.E2APSubscriptionFailure:
1059 //case *e2ap.E2APSubscriptionDeleteRequest:
1060 // return "SubDelReq"
1061 case *e2ap.E2APSubscriptionDeleteResponse:
1063 case *e2ap.E2APSubscriptionDeleteFailure:
1070 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs through WriteSubscriptionToSdl, keyed by
// the subscription's ReqId.InstanceId. A returned error is only logged; the
// function itself has no return value.
1072 //-------------------------------------------------------------------
1073 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1074 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1075 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1077 xapp.Logger.Error("%v", err)
1081 //-------------------------------------------------------------------
// UpdateSubscriptionInDB either removes subs from the database or writes it,
// depending on removeSubscriptionFromDb, and then clears subs.RetryFromXapp.
//
// When removeSubscriptionFromDb is true the subscription is removed. The write
// is performed only while subs.RetryFromXapp is false.
// NOTE(review): the write is presumably the else branch of the removal check -
// confirm.
1083 //-------------------------------------------------------------------
1084 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1086 if removeSubscriptionFromDb == true {
1087 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1088 c.RemoveSubscriptionFromDb(subs)
1090 // Update is needed for successful response and merge case here
1091 if subs.RetryFromXapp == false {
1092 c.WriteSubscriptionToDb(subs)
// The retry flag is reset once the database handling above has run.
1095 subs.RetryFromXapp = false
1098 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb deletes the entry for subs through
// RemoveSubscriptionFromSdl, keyed by the subscription's ReqId.InstanceId.
// A returned error is only logged; the function itself has no return value.
1100 //-------------------------------------------------------------------
1101 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1102 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1103 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1105 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq builds an E2AP subscription delete request for
// subs and feeds it, once per endpoint in subs.EpList.Endpoints, into
// handleXAPPSubscriptionDeleteRequest. Per the debug message below, it is used
// to delete subscriptions after a restart. A packing error is logged; the
// function has no return value.
1109 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
// Requestor id written into the delete request in place of the stored one.
// NOTE(review): the value 123 looks like a placeholder - confirm its meaning.
1111 const ricRequestorId = 123
1112 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1114 // Send delete for every endpoint in the subscription
// The request id is copied from the subscription and its Id is then
// overridden; the function id comes from the stored subscription request.
1115 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1116 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1117 subDelReqMsg.RequestId.Id = ricRequestorId
1118 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
// The message is packed once; the same payload is reused for every endpoint.
1119 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1121 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1124 for _, endPoint := range subs.EpList.Endpoints {
// Build RMR parameters with the endpoint as the message source.
1125 params := &xapp.RMRParams{}
1126 params.Mtype = mType
1127 params.SubId = int(subs.GetReqId().InstanceId)
1129 params.Meid = subs.Meid
1130 params.Src = endPoint.String()
1131 params.PayloadLen = len(payload.Buf)
1132 params.Payload = payload.Buf
// DeleteFromDb is set before the delete request handler is invoked.
// NOTE(review): presumably this makes the handler also remove the
// subscription from the database - confirm.
1134 subs.DeleteFromDb = true
1135 c.handleXAPPSubscriptionDeleteRequest(params)
1139 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1141 fmt.Println("CRESTSubscriptionRequest")
1142 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1144 if p.ClientEndpoint.HTTPPort != nil {
1145 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1147 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1150 if p.ClientEndpoint.RMRPort != nil {
1151 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1153 fmt.Println(" ClientEndpoint.RMRPort = nil")
1157 fmt.Printf(" Meid = %s\n", *p.Meid)
1159 fmt.Println(" Meid = nil")
1162 for _, subscriptionDetail := range p.SubscriptionDetails {
1163 if p.RANFunctionID != nil {
1164 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1166 fmt.Println(" RANFunctionID = nil")
1168 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1169 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1171 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1172 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1173 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1174 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1176 if actionToBeSetup.SubsequentAction != nil {
1177 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1178 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1180 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")