2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
45 func idstring(err error, entries ...fmt.Stringer) string {
46 var retval string = ""
47 var filler string = ""
48 for _, entry := range entries {
49 retval += filler + entry.String()
53 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var waitRouteCleanup_ms time.Duration
67 var e2tMaxSubReqTryCount uint64 // Initial try + retry
68 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
69 var readSubsFromDb string
70 var restDuplicateCtrl duplicateCtrl
80 Counters map[string]xapp.Counter
90 type SubmgrRestartTestEvent struct{}
91 type SubmgrRestartUpEvent struct{}
94 xapp.Logger.Info("SUBMGR")
96 viper.SetEnvPrefix("submgr")
97 viper.AllowEmptyEnv(true)
100 func NewControl() *Control {
102 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
103 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
105 registry := new(Registry)
106 registry.Initialize()
107 registry.rtmgrClient = &rtmgrClient
109 tracker := new(Tracker)
112 c := &Control{e2ap: new(E2ap),
116 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
119 c.ReadConfigParameters("")
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
123 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
125 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
127 if readSubsFromDb == "false" {
131 restDuplicateCtrl.Init()
133 // Read subscriptions from db
134 xapp.Logger.Info("Reading subscriptions from db")
135 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
137 xapp.Logger.Error("%v", err)
139 c.registry.subIds = subIds
140 c.registry.register = register
141 c.HandleUncompletedSubscriptions(register)
146 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
147 subscriptions, _ := c.registry.QueryHandler()
148 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
151 //-------------------------------------------------------------------
153 //-------------------------------------------------------------------
154 func (c *Control) ReadConfigParameters(f string) {
156 // viper.GetDuration returns nanoseconds
157 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
158 if e2tSubReqTimeout == 0 {
159 e2tSubReqTimeout = 2000 * 1000000
161 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
162 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
163 if e2tSubDelReqTime == 0 {
164 e2tSubDelReqTime = 2000 * 1000000
166 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
167 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
168 if e2tRecvMsgTimeout == 0 {
169 e2tRecvMsgTimeout = 2000 * 1000000
171 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
173 // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
174 // value 100ms used currently only in unittests.
175 waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
176 if waitRouteCleanup_ms == 0 {
177 waitRouteCleanup_ms = 5000 * 1000000
179 xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
181 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
182 if e2tMaxSubReqTryCount == 0 {
183 e2tMaxSubReqTryCount = 1
185 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
187 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
188 if e2tMaxSubDelReqTryCount == 0 {
189 e2tMaxSubDelReqTryCount = 1
191 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
193 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
194 if readSubsFromDb == "" {
195 readSubsFromDb = "true"
197 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
198 c.LoggerLevel = viper.GetUint32("logger.level")
199 if c.LoggerLevel == 0 {
204 //-------------------------------------------------------------------
206 //-------------------------------------------------------------------
207 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
209 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
210 for subId, subs := range register {
211 if subs.SubRespRcvd == false {
212 subs.NoRespToXapp = true
213 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
214 c.SendSubscriptionDeleteReq(subs)
219 func (c *Control) ReadyCB(data interface{}) {
220 if c.RMRClient == nil {
221 c.RMRClient = xapp.Rmr
225 func (c *Control) Run() {
226 xapp.SetReadyCB(c.ReadyCB, nil)
227 xapp.AddConfigChangeListener(c.ReadConfigParameters)
231 //-------------------------------------------------------------------
233 //-------------------------------------------------------------------
234 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
237 c.UpdateCounter(cRestSubReqFromXapp)
239 subResp := models.SubscriptionResponse{}
240 p := params.(*models.SubscriptionParams)
242 if c.LoggerLevel > 2 {
243 c.PrintRESTSubscriptionRequest(p)
246 if p.ClientEndpoint == nil {
247 xapp.Logger.Error("ClientEndpoint == nil")
248 c.UpdateCounter(cRestSubFailToXapp)
249 return nil, fmt.Errorf("")
252 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
254 xapp.Logger.Error("%s", err.Error())
255 c.UpdateCounter(cRestSubFailToXapp)
259 var restSubscription *RESTSubscription
260 if p.SubscriptionID == "" {
261 restSubId = ksuid.New().String()
262 restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
264 xapp.Logger.Error("%s", err.Error())
265 c.UpdateCounter(cRestSubFailToXapp)
270 restSubId = p.SubscriptionID
271 restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
273 xapp.Logger.Error("%s", err.Error())
274 c.UpdateCounter(cRestSubFailToXapp)
279 subResp.SubscriptionID = &restSubId
280 subReqList := e2ap.SubscriptionRequestList{}
281 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
283 xapp.Logger.Error("%s", err.Error())
284 c.registry.DeleteRESTSubscription(&restSubId)
285 c.UpdateCounter(cRestSubFailToXapp)
289 err, duplicate, md5sum := restDuplicateCtrl.IsDuplicateToOngoingTransaction(restSubId, params)
292 // We were unable to detect whether this request was duplicate or not, proceed
293 xapp.Logger.Info("%s - proceeding with the request", err.Error())
296 if *p.SubscriptionDetails[0].ActionToBeSetupList[0].ActionType == "report" {
297 xapp.Logger.Info("Retransmission blocker dropped for report typer of request")
298 c.UpdateCounter(cRestSubRespToXapp)
302 restSubscription.Md5sumOngoing = md5sum
305 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint)
307 c.UpdateCounter(cRestSubRespToXapp)
311 func (c *Control) sendUnsuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, err error,
312 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
314 // Send notification to xApp that prosessing of a Subscription Request has failed.
315 e2EventInstanceID := (int64)(0)
316 errorCause := err.Error()
317 resp := &models.SubscriptionResponse{
318 SubscriptionID: restSubId,
319 SubscriptionInstances: []*models.SubscriptionInstance{
320 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
321 ErrorCause: &errorCause,
322 XappEventInstanceID: &xAppEventInstanceID},
325 // Mark REST subscription request processed.
326 restSubscription.SetProcessed()
328 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
329 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
331 xapp.Logger.Info("Sending unsuccessful REST notification (cause %s) to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v",
332 errorCause, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID)
334 xapp.Subscription.Notify(resp, *clientEndpoint)
335 c.UpdateCounter(cRestSubFailNotifToXapp)
338 func (c *Control) sendSuccesfullResponseNotification(restSubId *string, restSubscription *RESTSubscription, xAppEventInstanceID int64, e2EventInstanceID int64,
339 clientEndpoint *models.SubscriptionParamsClientEndpoint, trans *TransactionXapp) {
341 // Store successfully processed InstanceId for deletion
342 restSubscription.AddE2InstanceId((uint32)(e2EventInstanceID))
343 restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
345 // Send notification to xApp that a Subscription Request has been processed.
346 resp := &models.SubscriptionResponse{
347 SubscriptionID: restSubId,
348 SubscriptionInstances: []*models.SubscriptionInstance{
349 &models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
351 XappEventInstanceID: &xAppEventInstanceID},
354 // Mark REST subscription request processesd.
355 restSubscription.SetProcessed()
356 xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
357 clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
358 xapp.Subscription.Notify(resp, *clientEndpoint)
359 c.UpdateCounter(cRestSubNotifToXapp)
362 //-------------------------------------------------------------------
364 //-------------------------------------------------------------------
366 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
367 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string) {
369 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
371 var xAppEventInstanceID int64
372 var e2EventInstanceID int64
374 defer restDuplicateCtrl.TransactionComplete(restSubscription.Md5sumOngoing)
376 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
377 subReqMsg := subReqList.E2APSubscriptionRequests[index]
378 xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
380 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
382 // Send notification to xApp that prosessing of a Subscription Request has failed.
383 err := fmt.Errorf("Tracking failure")
384 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
388 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
390 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
392 c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans)
394 e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
395 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
396 index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
397 c.sendSuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, e2EventInstanceID, clientEndpoint, trans)
403 //-------------------------------------------------------------------
405 //------------------------------------------------------------------
406 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
407 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
409 err := c.tracker.Track(trans)
411 xapp.Logger.Error("XAPP-SubReq Tracking error: %s", idstring(err, trans))
412 err = fmt.Errorf("Tracking failure")
416 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
418 xapp.Logger.Error("XAPP-SubReq Assign error: %s", idstring(err, trans))
425 go c.handleSubscriptionCreate(subs, trans)
426 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
430 switch themsg := event.(type) {
431 case *e2ap.E2APSubscriptionResponse:
434 case *e2ap.E2APSubscriptionFailure:
435 err = fmt.Errorf("E2 SubscriptionFailure received")
438 err = fmt.Errorf("unexpected E2 subscription response received")
442 err = fmt.Errorf("E2 subscription response timeout")
445 xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
446 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
450 //-------------------------------------------------------------------
452 //-------------------------------------------------------------------
453 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
456 c.UpdateCounter(cRestSubDelReqFromXapp)
458 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
460 restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
462 xapp.Logger.Error("%s", err.Error())
463 if restSubscription == nil {
464 // Subscription was not found
467 if restSubscription.SubReqOngoing == true {
468 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
469 xapp.Logger.Error("%s", err.Error())
471 } else if restSubscription.SubDelReqOngoing == true {
472 // Previous request for same restSubId still ongoing
478 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
480 for _, instanceId := range restSubscription.InstanceIds {
481 xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
484 xapp.Logger.Error("%s", err.Error())
487 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
488 restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
489 restSubscription.DeleteE2InstanceId(instanceId)
491 c.registry.DeleteRESTSubscription(&restSubId)
494 c.UpdateCounter(cRestSubDelRespToXapp)
499 //-------------------------------------------------------------------
501 //-------------------------------------------------------------------
502 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
504 var xAppEventInstanceID int64
505 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
507 xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
508 restSubId, instanceId, idstring(err, nil))
509 return xAppEventInstanceID, nil
512 xAppEventInstanceID = int64(subs.ReqId.Id)
513 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
515 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
516 xapp.Logger.Error("%s", err.Error())
518 defer trans.Release()
520 err = c.tracker.Track(trans)
522 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
523 xapp.Logger.Error("%s", err.Error())
524 return xAppEventInstanceID, &time.ParseError{}
529 go c.handleSubscriptionDelete(subs, trans)
530 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
532 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
534 c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
536 return xAppEventInstanceID, nil
539 //-------------------------------------------------------------------
541 //-------------------------------------------------------------------
542 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
543 xapp.Logger.Info("QueryHandler() called")
547 return c.registry.QueryHandler()
550 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
551 xapp.Logger.Info("TestRestHandler() called")
553 pathParams := mux.Vars(r)
554 s := pathParams["testId"]
556 // This can be used to delete single subscription from db
557 if contains := strings.Contains(s, "deletesubid="); contains == true {
558 var splits = strings.Split(s, "=")
559 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
560 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
561 c.RemoveSubscriptionFromSdl(uint32(subId))
566 // This can be used to remove all subscriptions db from
568 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
569 c.RemoveAllSubscriptionsFromSdl()
573 // This is meant to cause submgr's restart in testing
575 xapp.Logger.Info("os.Exit(1) called")
579 xapp.Logger.Info("Unsupported rest command received %s", s)
582 //-------------------------------------------------------------------
584 //-------------------------------------------------------------------
586 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
587 params := &xapp.RMRParams{}
588 params.Mtype = trans.GetMtype()
589 params.SubId = int(subs.GetReqId().InstanceId)
591 params.Meid = subs.GetMeid()
593 params.PayloadLen = len(trans.Payload.Buf)
594 params.Payload = trans.Payload.Buf
596 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
597 err = c.SendWithRetry(params, false, 5)
599 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
604 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
606 params := &xapp.RMRParams{}
607 params.Mtype = trans.GetMtype()
608 params.SubId = int(subs.GetReqId().InstanceId)
609 params.Xid = trans.GetXid()
610 params.Meid = trans.GetMeid()
612 params.PayloadLen = len(trans.Payload.Buf)
613 params.Payload = trans.Payload.Buf
615 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
616 err = c.SendWithRetry(params, false, 5)
618 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
623 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
624 if c.RMRClient == nil {
625 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
626 xapp.Logger.Error("%s", err.Error())
631 defer c.RMRClient.Free(msg.Mbuf)
633 // xapp-frame might use direct access to c buffer and
634 // when msg.Mbuf is freed, someone might take it into use
635 // and payload data might be invalid inside message handle function
637 // subscriptions won't load system a lot so there is no
638 // real performance hit by cloning buffer into new go byte slice
639 cPay := append(msg.Payload[:0:0], msg.Payload...)
641 msg.PayloadLen = len(cPay)
644 case xapp.RIC_SUB_REQ:
645 go c.handleXAPPSubscriptionRequest(msg)
646 case xapp.RIC_SUB_RESP:
647 go c.handleE2TSubscriptionResponse(msg)
648 case xapp.RIC_SUB_FAILURE:
649 go c.handleE2TSubscriptionFailure(msg)
650 case xapp.RIC_SUB_DEL_REQ:
651 go c.handleXAPPSubscriptionDeleteRequest(msg)
652 case xapp.RIC_SUB_DEL_RESP:
653 go c.handleE2TSubscriptionDeleteResponse(msg)
654 case xapp.RIC_SUB_DEL_FAILURE:
655 go c.handleE2TSubscriptionDeleteFailure(msg)
657 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
662 //-------------------------------------------------------------------
663 // handle from XAPP Subscription Request
664 //------------------------------------------------------------------
665 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
666 xapp.Logger.Info("MSG from XAPP: %s", params.String())
667 c.UpdateCounter(cSubReqFromXapp)
669 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
671 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
675 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
677 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
680 defer trans.Release()
682 if err = c.tracker.Track(trans); err != nil {
683 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
687 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
688 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
690 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
694 c.wakeSubscriptionRequest(subs, trans)
697 //-------------------------------------------------------------------
698 // Wake Subscription Request to E2node
699 //------------------------------------------------------------------
700 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
702 go c.handleSubscriptionCreate(subs, trans)
703 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
706 switch themsg := event.(type) {
707 case *e2ap.E2APSubscriptionResponse:
708 themsg.RequestId.Id = trans.RequestId.Id
709 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
712 c.UpdateCounter(cSubRespToXapp)
713 c.rmrSendToXapp("", subs, trans)
716 case *e2ap.E2APSubscriptionFailure:
717 themsg.RequestId.Id = trans.RequestId.Id
718 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
720 c.UpdateCounter(cSubFailToXapp)
721 c.rmrSendToXapp("", subs, trans)
727 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
728 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
731 //-------------------------------------------------------------------
732 // handle from XAPP Subscription Delete Request
733 //------------------------------------------------------------------
734 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
735 xapp.Logger.Info("MSG from XAPP: %s", params.String())
736 c.UpdateCounter(cSubDelReqFromXapp)
738 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
740 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
744 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
746 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
749 defer trans.Release()
751 err = c.tracker.Track(trans)
753 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
757 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
759 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
766 go c.handleSubscriptionDelete(subs, trans)
767 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
769 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
771 if subs.NoRespToXapp == true {
772 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
776 // Whatever is received success, fail or timeout, send successful delete response
777 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
778 subDelRespMsg.RequestId.Id = trans.RequestId.Id
779 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
780 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
781 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
783 c.UpdateCounter(cSubDelRespToXapp)
784 c.rmrSendToXapp("", subs, trans)
787 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
788 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
791 //-------------------------------------------------------------------
792 // SUBS CREATE Handling
793 //-------------------------------------------------------------------
794 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
796 var removeSubscriptionFromDb bool = false
797 trans := c.tracker.NewSubsTransaction(subs)
798 subs.WaitTransactionTurn(trans)
799 defer subs.ReleaseTransactionTurn(trans)
800 defer trans.Release()
802 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
804 subRfMsg, valid := subs.GetCachedResponse()
805 if subRfMsg == nil && valid == true {
806 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
807 switch event.(type) {
808 case *e2ap.E2APSubscriptionResponse:
809 subRfMsg, valid = subs.SetCachedResponse(event, true)
810 subs.SubRespRcvd = true
811 case *e2ap.E2APSubscriptionFailure:
812 removeSubscriptionFromDb = true
813 subRfMsg, valid = subs.SetCachedResponse(event, false)
814 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
815 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
816 case *SubmgrRestartTestEvent:
817 // This simulates that no response has been received and after restart subscriptions are restored from db
818 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
821 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
822 removeSubscriptionFromDb = true
823 subRfMsg, valid = subs.SetCachedResponse(nil, false)
824 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
826 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
828 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
831 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
833 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
836 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
837 parentTrans.SendEvent(subRfMsg, 0)
840 //-------------------------------------------------------------------
841 // SUBS DELETE Handling
842 //-------------------------------------------------------------------
844 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
846 trans := c.tracker.NewSubsTransaction(subs)
847 subs.WaitTransactionTurn(trans)
848 defer subs.ReleaseTransactionTurn(trans)
849 defer trans.Release()
851 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
855 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
858 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
862 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
863 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
864 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
865 c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
866 c.registry.UpdateSubscriptionToDb(subs, c)
867 parentTrans.SendEvent(nil, 0)
870 //-------------------------------------------------------------------
871 // send to E2T Subscription Request
872 //-------------------------------------------------------------------
873 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
875 var event interface{} = nil
876 var timedOut bool = false
877 const ricRequestorId = 123
879 subReqMsg := subs.SubReqMsg
880 subReqMsg.RequestId = subs.GetReqId().RequestId
881 subReqMsg.RequestId.Id = ricRequestorId
882 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
884 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
888 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
889 c.WriteSubscriptionToDb(subs)
891 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
892 desc := fmt.Sprintf("(retry %d)", retries)
894 c.UpdateCounter(cSubReqToE2)
896 c.UpdateCounter(cSubReReqToE2)
898 c.rmrSendToE2T(desc, subs, trans)
899 if subs.DoNotWaitSubResp == false {
900 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
902 c.UpdateCounter(cSubReqTimerExpiry)
906 // Simulating case where subscrition request has been sent but response has not been received before restart
907 event = &SubmgrRestartTestEvent{}
911 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
915 //-------------------------------------------------------------------
916 // send to E2T Subscription Delete Request
917 //-------------------------------------------------------------------
919 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
921 var event interface{}
923 const ricRequestorId = 123
925 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
926 subDelReqMsg.RequestId = subs.GetReqId().RequestId
927 subDelReqMsg.RequestId.Id = ricRequestorId
928 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
929 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
931 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
935 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
936 desc := fmt.Sprintf("(retry %d)", retries)
938 c.UpdateCounter(cSubDelReqToE2)
940 c.UpdateCounter(cSubDelReReqToE2)
942 c.rmrSendToE2T(desc, subs, trans)
943 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
945 c.UpdateCounter(cSubDelReqTimerExpiry)
950 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
954 //-------------------------------------------------------------------
955 // handle from E2T Subscription Response
956 //-------------------------------------------------------------------
957 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
958 xapp.Logger.Info("MSG from E2T: %s", params.String())
959 c.UpdateCounter(cSubRespFromE2)
961 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
963 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
966 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
968 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
971 trans := subs.GetTransaction()
973 err = fmt.Errorf("Ongoing transaction not found")
974 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
977 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
979 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
980 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
985 //-------------------------------------------------------------------
986 // handle from E2T Subscription Failure
987 //-------------------------------------------------------------------
988 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
989 xapp.Logger.Info("MSG from E2T: %s", params.String())
990 c.UpdateCounter(cSubFailFromE2)
991 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
993 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
996 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
998 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
1001 trans := subs.GetTransaction()
1003 err = fmt.Errorf("Ongoing transaction not found")
1004 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
1007 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
1008 if sendOk == false {
1009 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1010 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
1015 //-------------------------------------------------------------------
1016 // handle from E2T Subscription Delete Response
1017 //-------------------------------------------------------------------
1018 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
1019 xapp.Logger.Info("MSG from E2T: %s", params.String())
1020 c.UpdateCounter(cSubDelRespFromE2)
1021 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
1023 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1026 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
1028 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
1031 trans := subs.GetTransaction()
1033 err = fmt.Errorf("Ongoing transaction not found")
1034 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
1037 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
1038 if sendOk == false {
1039 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1040 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
1045 //-------------------------------------------------------------------
1046 // handle from E2T Subscription Delete Failure
1047 //-------------------------------------------------------------------
1048 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
1049 xapp.Logger.Info("MSG from E2T: %s", params.String())
1050 c.UpdateCounter(cSubDelFailFromE2)
1051 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
1053 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1056 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
1058 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
1061 trans := subs.GetTransaction()
1063 err = fmt.Errorf("Ongoing transaction not found")
1064 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
1067 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
1068 if sendOk == false {
1069 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
1070 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
1075 //-------------------------------------------------------------------
1077 //-------------------------------------------------------------------
1078 func typeofSubsMessage(v interface{}) string {
1083 //case *e2ap.E2APSubscriptionRequest:
1085 case *e2ap.E2APSubscriptionResponse:
1087 case *e2ap.E2APSubscriptionFailure:
1089 //case *e2ap.E2APSubscriptionDeleteRequest:
1090 // return "SubDelReq"
1091 case *e2ap.E2APSubscriptionDeleteResponse:
1093 case *e2ap.E2APSubscriptionDeleteFailure:
1100 //-------------------------------------------------------------------
1102 //-------------------------------------------------------------------
1103 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1104 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1105 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1107 xapp.Logger.Error("%v", err)
1111 //-------------------------------------------------------------------
1113 //-------------------------------------------------------------------
1114 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1116 if removeSubscriptionFromDb == true {
1117 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1118 c.RemoveSubscriptionFromDb(subs)
1120 // Update is needed for successful response and merge case here
1121 if subs.RetryFromXapp == false {
1122 c.WriteSubscriptionToDb(subs)
1125 subs.RetryFromXapp = false
1128 //-------------------------------------------------------------------
1130 //-------------------------------------------------------------------
1131 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1132 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1133 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1135 xapp.Logger.Error("%v", err)
1139 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1141 const ricRequestorId = 123
1142 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1144 // Send delete for every endpoint in the subscription
1145 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1146 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1147 subDelReqMsg.RequestId.Id = ricRequestorId
1148 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1149 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1151 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1154 for _, endPoint := range subs.EpList.Endpoints {
1155 params := &xapp.RMRParams{}
1156 params.Mtype = mType
1157 params.SubId = int(subs.GetReqId().InstanceId)
1159 params.Meid = subs.Meid
1160 params.Src = endPoint.String()
1161 params.PayloadLen = len(payload.Buf)
1162 params.Payload = payload.Buf
1164 subs.DeleteFromDb = true
1165 c.handleXAPPSubscriptionDeleteRequest(params)
1169 func (c *Control) PrintRESTSubscriptionRequest(p *models.SubscriptionParams) {
1171 fmt.Println("CRESTSubscriptionRequest")
1172 fmt.Printf(" ClientEndpoint.Host = %s\n", p.ClientEndpoint.Host)
1174 if p.ClientEndpoint.HTTPPort != nil {
1175 fmt.Printf(" ClientEndpoint.HTTPPort = %v\n", *p.ClientEndpoint.HTTPPort)
1177 fmt.Println(" ClientEndpoint.HTTPPort = nil")
1180 if p.ClientEndpoint.RMRPort != nil {
1181 fmt.Printf(" ClientEndpoint.RMRPort = %v\n", *p.ClientEndpoint.RMRPort)
1183 fmt.Println(" ClientEndpoint.RMRPort = nil")
1187 fmt.Printf(" Meid = %s\n", *p.Meid)
1189 fmt.Println(" Meid = nil")
1192 for _, subscriptionDetail := range p.SubscriptionDetails {
1193 if p.RANFunctionID != nil {
1194 fmt.Printf(" RANFunctionID = %v\n", *p.RANFunctionID)
1196 fmt.Println(" RANFunctionID = nil")
1198 fmt.Printf(" SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
1199 fmt.Printf(" SubscriptionDetail.EventTriggers = %v\n", subscriptionDetail.EventTriggers)
1201 for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
1202 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionID = %v\n", *actionToBeSetup.ActionID)
1203 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionType = %s\n", *actionToBeSetup.ActionType)
1204 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.ActionDefinition = %v\n", actionToBeSetup.ActionDefinition)
1206 if actionToBeSetup.SubsequentAction != nil {
1207 fmt.Printf(" SubscriptionDetail.ActionToBeSetup.SubsequentAction.SubsequentActionType = %s\n", *actionToBeSetup.SubsequentAction.SubsequentActionType)
1208 fmt.Printf(" SubscriptionDetail.ActionToBeSetup..SubsequentAction.TimeToWait = %s\n", *actionToBeSetup.SubsequentAction.TimeToWait)
1210 fmt.Println(" SubscriptionDetail.ActionToBeSetup.SubsequentAction = nil")