2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/segmentio/ksuid"
38 "github.com/spf13/viper"
41 //-----------------------------------------------------------------------------
43 //-----------------------------------------------------------------------------
// idstring renders the given identifiers, and the error if there is one, as a
// single space-separated string for use in log and error messages.
//
// Each entry contributes its String() value in argument order. A non-nil err
// is appended last as "err(<message>)". A nil err adds nothing, so callers may
// pass nil when they only want the identifiers rendered.
func idstring(err error, entries ...fmt.Stringer) string {
	retval := ""
	filler := ""
	for _, entry := range entries {
		retval += filler + entry.String()
		// Every entry after the first is separated by one space.
		filler = " "
	}
	// Guard the dereference: most call sites pass a nil error.
	if err != nil {
		retval += filler + "err(" + err.Error() + ")"
	}
	return retval
}
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Tunables for the E2 Termination interface. Control.ReadConfigParameters
// (re)assigns all of them from the "controls.*" configuration keys.

// Wait time handed to WaitEvent after sending a subscription request to E2T.
63 var e2tSubReqTimeout time.Duration
// Wait time handed to WaitEvent after sending a subscription delete request to E2T.
64 var e2tSubDelReqTime time.Duration
// Timeout handed to SendEvent when a message received from E2T is passed to
// the waiting transaction.
65 var e2tRecvMsgTimeout time.Duration
// Upper bounds of the send loops in sendE2TSubscriptionRequest and
// sendE2TSubscriptionDeleteRequest respectively.
66 var e2tMaxSubReqTryCount uint64 // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String flag; NewControl compares it against "false" before reading stored
// subscriptions from the database.
68 var readSubsFromDb string
// NOTE(review): disabled field; it is also commented out where NewControl
// builds the Control value — confirm whether it can be removed.
76 //subscriber *xapp.Subscriber
// Counters holds the counter group registered as "SUBMGR" in NewControl.
// NOTE(review): presumably keyed by counter name — confirm against UpdateCounter.
79 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the marker event that sendE2TSubscriptionRequest
// produces to simulate "request sent, no response before restart";
// handleSubscriptionCreate has a dedicated case for it.
88 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type.
// NOTE(review): no use of it is visible in this file — confirm where it is consumed.
89 type SubmgrRestartUpEvent struct{}
// Startup statements: announce the component in the log, then configure viper
// with the "submgr" environment prefix and allow empty environment values.
// NOTE(review): the enclosing declaration is not visible in this view — confirm
// where these statements run.
92 xapp.Logger.Info("SUBMGR")
94 viper.SetEnvPrefix("submgr")
95 viper.AllowEmptyEnv(true)
// NewControl assembles the subscription manager's Control object.
//
// Visible steps, in order: build the routing manager REST client from the
// "rtmgr.*" settings, create the registry and tracker, construct the Control
// value with its "SUBMGR" counter group, load the tunables, register the test
// and symptom-data REST routes, start the REST subscription listener on its
// own goroutine, and read stored subscriptions from the database so that
// uncompleted ones can be cleaned up.
98 func NewControl() *Control {
// Routing manager client: address is "<rtmgr.HostAddr>:<rtmgr.port>", plain http.
100 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry keeps a pointer to the routing manager client created above.
103 registry := new(Registry)
104 registry.Initialize()
105 registry.rtmgrClient = &rtmgrClient
107 tracker := new(Tracker)
110 c := &Control{e2ap: new(E2ap),
114 //subscriber: subscriber,
115 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
// Populate the package-level tunables before they are consulted below.
117 c.ReadConfigParameters("")
119 // Register REST handler for testing support
120 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
121 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
// REST subscription API: create, query and delete callbacks.
123 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
// NOTE(review): the body of this branch is not visible here; presumably the
// database restore below is skipped when the flag is "false" — confirm.
125 if readSubsFromDb == "false" {
129 // Read subscriptions from db
130 xapp.Logger.Info("Reading subscriptions from db")
131 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133 xapp.Logger.Error("%v", err)
// Install what was read into the registry, then send deletes for every
// restored subscription that never received its response.
135 c.registry.subIds = subIds
136 c.registry.register = register
137 c.HandleUncompletedSubscriptions(register)
142 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
143 subscriptions, _ := c.registry.QueryHandler()
144 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
147 //-------------------------------------------------------------------
149 //-------------------------------------------------------------------
150 func (c *Control) ReadConfigParameters(f string) {
152 // viper.GetDuration returns nanoseconds
153 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
154 if e2tSubReqTimeout == 0 {
155 e2tSubReqTimeout = 2000 * 1000000
157 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
158 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
159 if e2tSubDelReqTime == 0 {
160 e2tSubDelReqTime = 2000 * 1000000
162 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
163 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
164 if e2tRecvMsgTimeout == 0 {
165 e2tRecvMsgTimeout = 2000 * 1000000
167 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
168 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
169 if e2tMaxSubReqTryCount == 0 {
170 e2tMaxSubReqTryCount = 1
172 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
173 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
174 if e2tMaxSubDelReqTryCount == 0 {
175 e2tMaxSubDelReqTryCount = 1
177 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
179 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
180 if readSubsFromDb == "" {
181 readSubsFromDb = "true"
183 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
186 //-------------------------------------------------------------------
188 //-------------------------------------------------------------------
189 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
191 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
192 for subId, subs := range register {
193 if subs.SubRespRcvd == false {
194 subs.NoRespToXapp = true
195 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
196 c.SendSubscriptionDeleteReq(subs)
201 func (c *Control) ReadyCB(data interface{}) {
202 if c.RMRClient == nil {
203 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp framework's ready callback and subscribes
// ReadConfigParameters to configuration changes.
// NOTE(review): the remainder of the body is not visible in this view —
// confirm what follows these registrations.
207 func (c *Control) Run() {
208 xapp.SetReadyCB(c.ReadyCB, nil)
209 xapp.AddConfigChangeListener(c.ReadConfigParameters)
213 //-------------------------------------------------------------------
215 //-------------------------------------------------------------------
// SubscriptionHandler is the REST "create subscription" callback handed to
// xapp.Subscription.Listen in NewControl.
//
// It generates a new REST subscription id, validates the client endpoint,
// creates the REST subscription in the registry, converts the request into
// E2AP subscription requests and then processes those on a separate goroutine
// (processSubscriptionRequests).
216 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
// A fresh KSUID string identifies this REST subscription.
218 restSubId := ksuid.New().String()
219 subResp := models.SubscriptionResponse{}
220 subResp.SubscriptionID = &restSubId
// Unchecked type assertion: panics if params is not a *models.SubscriptionParams.
221 p := params.(*models.SubscriptionParams)
225 c.UpdateCounter(cSubReqFromXapp)
227 if p.ClientEndpoint == nil {
228 xapp.Logger.Error("ClientEndpoint == nil")
// NOTE(review): the returned error has an empty message; the caller cannot
// tell what went wrong — consider a descriptive text.
229 return nil, fmt.Errorf("")
232 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
234 xapp.Logger.Error("%s", err.Error())
238 restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
240 xapp.Logger.Error("%s", err.Error())
244 subReqList := e2ap.SubscriptionRequestList{}
245 err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
247 xapp.Logger.Error("%s", err.Error())
// Roll back the REST subscription created above.
248 c.registry.DeleteRESTSubscription(&restSubId)
// The E2 side is handled asynchronously; results reach the xApp through
// notifications sent by processSubscriptionRequests.
252 go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
258 //-------------------------------------------------------------------
260 //-------------------------------------------------------------------
// processSubscriptionRequests handles every E2AP subscription request that was
// derived from one REST subscription request, one after another.
//
// For each request it creates an xApp transaction, runs
// handleSubscriptionRequest, marks the REST subscription processed and sends a
// REST notification to the client endpoint. On success the notification
// carries the requestor and instance ids from the response and the instance id
// is stored for later deletion; on failure both ids are reported as zero.
262 func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
263 clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
265 xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
267 _, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
269 xapp.Logger.Error("%s", err.Error())
273 var requestorID int64
275 for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
276 subReqMsg := subReqList.E2APSubscriptionRequests[index]
278 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
280 c.registry.DeleteRESTSubscription(restSubId)
281 xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
// NOTE(review): this defer sits inside the loop, so each transaction is
// released only when the whole function returns, not per iteration.
285 defer trans.Release()
286 xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
287 subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
289 // Send notification to xApp that processing of a Subscription Request has failed. Currently it is not possible
290 // to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
291 requestorID = (int64)(0)
292 instanceId = (int64)(0)
293 resp := &models.SubscriptionResponse{
294 SubscriptionID: restSubId,
295 SubscriptionInstances: []*models.SubscriptionInstance{
296 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
299 // Mark REST subscription request processed.
300 restSubscription.SetProcessed()
301 xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
302 xapp.Subscription.Notify(resp, *clientEndpoint)
304 xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
306 // Store successfully processed InstanceId for deletion
307 restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
309 // Send notification to xApp that a Subscription Request has been processed.
310 requestorID = (int64)(subRespMsg.RequestId.Id)
311 instanceId = (int64)(subRespMsg.RequestId.InstanceId)
312 resp := &models.SubscriptionResponse{
313 SubscriptionID: restSubId,
314 SubscriptionInstances: []*models.SubscriptionInstance{
315 &models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
318 // Mark REST subscription request processed.
319 restSubscription.SetProcessed()
320 xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
321 xapp.Subscription.Notify(resp, *clientEndpoint)
323 c.UpdateCounter(cSubRespToXapp)
327 //-------------------------------------------------------------------
329 //------------------------------------------------------------------
// handleSubscriptionRequest drives a single E2AP subscription request that
// originated from the REST interface and returns the E2 subscription response
// or an error.
//
// It starts tracking the transaction, assigns it to a subscription in the
// registry, runs handleSubscriptionCreate on a goroutine and blocks until that
// goroutine delivers an event. A subscription failure event, or any other
// unsuccessful outcome, is logged and the transaction is removed from the
// subscription.
// NOTE(review): meid and restSubId are not referenced in the visible lines.
330 func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
331 restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
333 err := c.tracker.Track(trans)
335 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
336 xapp.Logger.Error("%s", err.Error())
340 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
342 err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
343 xapp.Logger.Error("%s", err.Error())
// The create runs concurrently and reports back through the transaction's
// event channel; the timed-out flag of the wait is ignored here.
350 go c.handleSubscriptionCreate(subs, trans)
351 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
355 switch themsg := event.(type) {
356 case *e2ap.E2APSubscriptionResponse:
359 case *e2ap.E2APSubscriptionFailure:
360 err = fmt.Errorf("SubscriptionFailure received")
// Failure path: log with transaction and subscription ids, then detach the
// transaction from the subscription.
366 err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
367 xapp.Logger.Error("%s", err.Error())
368 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
372 //-------------------------------------------------------------------
374 //-------------------------------------------------------------------
// SubscriptionDeleteHandlerCB is the REST "delete subscription" callback
// handed to xapp.Subscription.Listen in NewControl.
//
// It looks up the REST subscription, checks whether its creation or a previous
// deletion is still ongoing, then deletes every stored E2 instance through
// SubscriptionDeleteHandler and finally removes the REST subscription from the
// registry.
375 func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
378 c.UpdateCounter(cSubDelReqFromXapp)
380 xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
382 restSubscription, err := c.registry.GetRESTSubscription(restSubId)
384 xapp.Logger.Error("%s", err.Error())
385 if restSubscription == nil {
386 // Subscription was not found
// Creation still in progress: the delete is refused with an error.
389 if restSubscription.SubReqOngoing == true {
390 err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
391 xapp.Logger.Error("%s", err.Error())
393 } else if restSubscription.SubDelReqOngoing == true {
394 // Previous request for same restSubId still ongoing
400 xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
// Delete the E2 instances one by one; a successfully deleted instance id is
// dropped from the REST subscription.
402 for _, instanceId := range restSubscription.InstanceIds {
403 err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
405 xapp.Logger.Error("%s", err.Error())
408 xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
409 restSubscription.DeleteInstanceId(instanceId)
411 c.registry.DeleteRESTSubscription(&restSubId)
414 c.UpdateCounter(cSubDelRespToXapp)
419 //-------------------------------------------------------------------
421 //-------------------------------------------------------------------
422 func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
424 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
426 err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
427 xapp.Logger.Error("%s", err.Error())
429 defer trans.Release()
431 err := c.tracker.Track(trans)
433 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
434 xapp.Logger.Error("%s", err.Error())
435 return &time.ParseError{}
438 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
440 err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
441 xapp.Logger.Error("%s", err.Error())
447 go c.handleSubscriptionDelete(subs, trans)
448 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
450 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
452 c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
457 //-------------------------------------------------------------------
459 //-------------------------------------------------------------------
// QueryHandler is the REST "query subscriptions" callback handed to
// xapp.Subscription.Listen in NewControl. It logs the call and returns the
// result of the registry's QueryHandler.
460 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
461 xapp.Logger.Info("QueryHandler() called")
465 return c.registry.QueryHandler()
// TestRestHandler serves POST /ric/v1/test/{testId}, the testing-support route
// registered in NewControl. The {testId} path variable selects the action:
// deleting one subscription from the database ("deletesubid=<id>"), removing
// all subscriptions from the database, or exiting the process to force a
// restart. Anything else is logged as unsupported.
// NOTE(review): the conditions selecting the last two actions are not visible
// in this view.
468 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
469 xapp.Logger.Info("TestRestHandler() called")
471 pathParams := mux.Vars(r)
472 s := pathParams["testId"]
474 // This can be used to delete single subscription from db
475 if contains := strings.Contains(s, "deletesubid="); contains == true {
476 var splits = strings.Split(s, "=")
// NOTE(review): the id is parsed as a 64-bit signed integer and then converted
// to uint32, so negative or too-large input wraps silently.
477 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
478 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
479 c.RemoveSubscriptionFromSdl(uint32(subId))
484 // This can be used to remove all subscriptions from db
486 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
487 c.RemoveAllSubscriptionsFromSdl()
491 // This is meant to cause submgr's restart in testing
493 xapp.Logger.Info("os.Exit(1) called")
497 xapp.Logger.Info("Unsupported rest command received %s", s)
500 //-------------------------------------------------------------------
502 //-------------------------------------------------------------------
// rmrSendToE2T sends the payload packed in trans towards E2 Termination over RMR.
//
// The message type and payload come from the transaction, the RMR
// subscription id and MEID from the subscription. desc is only used in the
// log line. The send goes through SendWithRetry and a failure is logged.
504 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
505 params := &xapp.RMRParams{}
506 params.Mtype = trans.GetMtype()
// The RMR subscription id is the E2 instance id of the subscription.
507 params.SubId = int(subs.GetReqId().InstanceId)
509 params.Meid = subs.GetMeid()
511 params.PayloadLen = len(trans.Payload.Buf)
512 params.Payload = trans.Payload.Buf
514 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
515 err = c.SendWithRetry(params, false, 5)
517 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
// rmrSendToXapp sends the payload packed in trans back to the xApp over RMR.
//
// Message type, transaction id (Xid), MEID and payload come from the xApp
// transaction; the RMR subscription id is the subscription's instance id.
// desc is only used in the log line. The send goes through SendWithRetry and a
// failure is logged.
522 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
524 params := &xapp.RMRParams{}
525 params.Mtype = trans.GetMtype()
526 params.SubId = int(subs.GetReqId().InstanceId)
527 params.Xid = trans.GetXid()
528 params.Meid = trans.GetMeid()
530 params.PayloadLen = len(trans.Payload.Buf)
531 params.Payload = trans.Payload.Buf
533 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
534 err = c.SendWithRetry(params, false, 5)
536 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
// Consume is the RMR receive entry point. It copies the payload out of the RMR
// buffer, arranges for the buffer to be freed, and dispatches the message by
// type to the matching handler, each started on its own goroutine. Unknown
// message types are logged and discarded.
541 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client the message buffer cannot be freed or answered.
542 if c.RMRClient == nil {
543 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
544 xapp.Logger.Error("%s", err.Error())
549 defer c.RMRClient.Free(msg.Mbuf)
551 // xapp-frame might use direct access to c buffer and
552 // when msg.Mbuf is freed, someone might take it into use
553 // and payload data might be invalid inside message handle function
555 // subscriptions won't load system a lot so there is no
556 // real performance hit by cloning buffer into new go byte slice
// Appending to a zero-length, zero-capacity slice forces a fresh backing array.
557 cPay := append(msg.Payload[:0:0], msg.Payload...)
559 msg.PayloadLen = len(cPay)
562 case xapp.RIC_SUB_REQ:
563 go c.handleXAPPSubscriptionRequest(msg)
564 case xapp.RIC_SUB_RESP:
565 go c.handleE2TSubscriptionResponse(msg)
566 case xapp.RIC_SUB_FAILURE:
567 go c.handleE2TSubscriptionFailure(msg)
568 case xapp.RIC_SUB_DEL_REQ:
569 go c.handleXAPPSubscriptionDeleteRequest(msg)
570 case xapp.RIC_SUB_DEL_RESP:
571 go c.handleE2TSubscriptionDeleteResponse(msg)
572 case xapp.RIC_SUB_DEL_FAILURE:
573 go c.handleE2TSubscriptionDeleteFailure(msg)
575 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
580 //-------------------------------------------------------------------
581 // handle from XAPP Subscription Request
582 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received from
// an xApp over RMR: it unpacks the E2AP message, creates and tracks an xApp
// transaction, assigns the transaction to a subscription in the registry and
// hands over to wakeSubscriptionRequest. Every failing step is logged.
583 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
584 xapp.Logger.Info("MSG from XAPP: %s", params.String())
585 c.UpdateCounter(cSubReqFromXapp)
587 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
589 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is keyed by the sender's endpoint and the message's Xid.
593 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
595 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
598 defer trans.Release()
600 if err = c.tracker.Track(trans); err != nil {
601 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
605 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
606 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
608 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
612 c.wakeSubscriptionRequest(subs, trans)
615 //-------------------------------------------------------------------
616 // Wake Subscription Request to E2node
617 //------------------------------------------------------------------
// wakeSubscriptionRequest runs handleSubscriptionCreate for the subscription
// on a goroutine, waits for its result event and relays it to the xApp: a
// subscription response or failure is re-packed with the xApp's own request id
// and sent over RMR. Any other outcome is logged as failed.
618 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
620 go c.handleSubscriptionCreate(subs, trans)
621 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
624 switch themsg := event.(type) {
625 case *e2ap.E2APSubscriptionResponse:
// Restore the requestor id the xApp used before packing the reply.
626 themsg.RequestId.Id = trans.RequestId.Id
627 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
630 c.UpdateCounter(cSubRespToXapp)
631 c.rmrSendToXapp("", subs, trans)
634 case *e2ap.E2APSubscriptionFailure:
635 themsg.RequestId.Id = trans.RequestId.Id
636 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
638 c.UpdateCounter(cSubFailToXapp)
639 c.rmrSendToXapp("", subs, trans)
645 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
646 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
649 //-------------------------------------------------------------------
650 // handle from XAPP Subscription Delete Request
651 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp over RMR (also called directly by
// SendSubscriptionDeleteReq).
//
// It unpacks the message, creates and tracks an xApp transaction, finds the
// subscription by the transaction's subscription id, runs
// handleSubscriptionDelete on a goroutine and waits for it. Unless the
// subscription is flagged NoRespToXapp, a successful delete response is then
// packed and sent to the xApp regardless of the E2 outcome.
652 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
653 xapp.Logger.Info("MSG from XAPP: %s", params.String())
654 c.UpdateCounter(cSubDelReqFromXapp)
656 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
658 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
662 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
664 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
667 defer trans.Release()
669 err = c.tracker.Track(trans)
// NOTE(review): this log line is tagged "XAPP-SubReq" although it belongs to
// the delete path; the neighbouring lines use "XAPP-SubDelReq".
671 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
675 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
677 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
684 go c.handleSubscriptionDelete(subs, trans)
685 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
687 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
689 if subs.NoRespToXapp == true {
690 // Do not send delete responses to xapps when submgr restart is deleting uncompleted subscriptions
694 // Whatever is received success, fail or timeout, send successful delete response
695 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
// Requestor id from the xApp's request, instance id from the subscription.
696 subDelRespMsg.RequestId.Id = trans.RequestId.Id
697 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
698 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
699 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
701 c.UpdateCounter(cSubDelRespToXapp)
702 c.rmrSendToXapp("", subs, trans)
705 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
706 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
709 //-------------------------------------------------------------------
710 // SUBS CREATE Handling
711 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the E2-side work of a subscription create
// for one subscription and reports the outcome to parentTrans as an event.
//
// It opens a subscription transaction and waits for its turn on the
// subscription. If no response is cached yet and the cache is still valid, the
// request is sent to E2T and the resulting event decides what is cached: a
// response is stored as valid; a failure or any other outcome is stored as
// invalid, followed by an internal delete towards E2T and removal from the
// database. Otherwise the cached response is reused.
712 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
714 var removeSubscriptionFromDb bool = false
715 trans := c.tracker.NewSubsTransaction(subs)
// Transactions on the same subscription take turns; the turn and the
// transaction are both released when this function returns.
716 subs.WaitTransactionTurn(trans)
717 defer subs.ReleaseTransactionTurn(trans)
718 defer trans.Release()
720 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
722 subRfMsg, valid := subs.GetCachedResponse()
723 if subRfMsg == nil && valid == true {
724 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
725 switch event.(type) {
726 case *e2ap.E2APSubscriptionResponse:
727 subRfMsg, valid = subs.SetCachedResponse(event, true)
728 subs.SubRespRcvd = true
729 case *e2ap.E2APSubscriptionFailure:
730 removeSubscriptionFromDb = true
731 subRfMsg, valid = subs.SetCachedResponse(event, false)
732 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
733 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
734 case *SubmgrRestartTestEvent:
735 // This simulates that no response has been received and after restart subscriptions are restored from db
736 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
739 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
740 removeSubscriptionFromDb = true
741 subRfMsg, valid = subs.SetCachedResponse(nil, false)
742 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
744 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
746 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
749 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
751 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
// Persist the outcome, then wake the waiting xApp-side transaction.
754 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
755 parentTrans.SendEvent(subRfMsg, 0)
758 //-------------------------------------------------------------------
759 // SUBS DELETE Handling
760 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the E2-side work of a subscription delete
// and signals parentTrans when done.
//
// It opens a subscription transaction and waits for its turn. The delete
// request is sent to E2T only when the subscription is valid and the
// requesting endpoint is its sole remaining endpoint. The parent transaction
// is then removed from the subscription, the subscription state is updated in
// the database and a nil event is sent to the parent.
762 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
764 trans := c.tracker.NewSubsTransaction(subs)
765 subs.WaitTransactionTurn(trans)
766 defer subs.ReleaseTransactionTurn(trans)
767 defer trans.Release()
769 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// Only the last endpoint's delete actually reaches E2T.
773 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
776 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
780 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
781 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
782 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
783 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
784 c.registry.UpdateSubscriptionToDb(subs, c)
785 parentTrans.SendEvent(nil, 0)
788 //-------------------------------------------------------------------
789 // send to E2T Subscription Request
790 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request with the
// subscription's own request id, stores the still-uncompleted subscription in
// the database and sends the request to E2T, up to e2tMaxSubReqTryCount times.
// After each send it waits up to e2tSubReqTimeout for an event, unless the
// subscription is flagged DoNotWaitSubResp, in which case a
// SubmgrRestartTestEvent is produced instead. The resulting event is returned;
// it stays nil when no response arrived.
791 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
793 var event interface{} = nil
794 var timedOut bool = false
796 subReqMsg := subs.SubReqMsg
797 subReqMsg.RequestId = subs.GetReqId().RequestId
798 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
800 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
804 // Write uncompleted subscription in db. If no response for subscription it need to be re-processed (deleted) after restart
805 c.WriteSubscriptionToDb(subs)
806 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
807 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters for the first request and for re-sent requests.
809 c.UpdateCounter(cSubReqToE2)
811 c.UpdateCounter(cSubReReqToE2)
// NOTE(review): the send result is not checked; a failed send is treated
// like a sent request and simply waits for the timeout.
813 c.rmrSendToE2T(desc, subs, trans)
814 if subs.DoNotWaitSubResp == false {
815 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
817 c.UpdateCounter(cSubReqTimerExpiry)
821 // Simulating case where subscription request has been sent but response has not been received before restart
822 event = &SubmgrRestartTestEvent{}
826 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
830 //-------------------------------------------------------------------
831 // send to E2T Subscription Delete Request
832 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request from
// the subscription's request id and function id, packs it and sends it to E2T,
// up to e2tMaxSubDelReqTryCount times, waiting up to e2tSubDelReqTime for an
// event after each send. The resulting event is returned.
834 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
836 var event interface{}
839 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
840 subDelReqMsg.RequestId = subs.GetReqId().RequestId
841 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
842 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
844 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
848 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
849 desc := fmt.Sprintf("(retry %d)", retries)
// Separate counters for the first delete request and for re-sent ones.
851 c.UpdateCounter(cSubDelReqToE2)
853 c.UpdateCounter(cSubDelReReqToE2)
855 c.rmrSendToE2T(desc, subs, trans)
856 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
858 c.UpdateCounter(cSubDelReqTimerExpiry)
863 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
867 //-------------------------------------------------------------------
868 // handle from E2T Subscription Response
869 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T: it unpacks the message, finds the subscription by the instance id
// in the message, and passes the message as an event to the subscription's
// ongoing transaction, waiting at most e2tRecvMsgTimeout. Every failing step
// is logged.
870 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
871 xapp.Logger.Info("MSG from E2T: %s", params.String())
872 c.UpdateCounter(cSubRespFromE2)
873 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
875 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
878 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
880 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
// The response is only meaningful while a transaction is waiting for it.
883 trans := subs.GetTransaction()
885 err = fmt.Errorf("Ongoing transaction not found")
886 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
889 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
891 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
892 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
897 //-------------------------------------------------------------------
898 // handle from E2T Subscription Failure
899 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T: it unpacks the message, finds the subscription by the instance id in
// the message, and passes the message as an event to the subscription's
// ongoing transaction, waiting at most e2tRecvMsgTimeout. Every failing step
// is logged.
900 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
901 xapp.Logger.Info("MSG from E2T: %s", params.String())
902 c.UpdateCounter(cSubFailFromE2)
903 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
905 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
908 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
910 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
913 trans := subs.GetTransaction()
915 err = fmt.Errorf("Ongoing transaction not found")
916 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
919 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
921 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
922 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
927 //-------------------------------------------------------------------
928 // handle from E2T Subscription Delete Response
929 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// received from E2T: it unpacks the message, finds the subscription by the
// instance id in the message, and passes the message as an event to the
// subscription's ongoing transaction, waiting at most e2tRecvMsgTimeout. Every
// failing step is logged.
// NOTE(review): unlike its sibling handlers this one declares an error result,
// but Consume starts it with "go", so the result is never observed.
930 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
931 xapp.Logger.Info("MSG from E2T: %s", params.String())
932 c.UpdateCounter(cSubDelRespFromE2)
933 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
935 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
938 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
940 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
943 trans := subs.GetTransaction()
945 err = fmt.Errorf("Ongoing transaction not found")
946 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
949 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
951 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
952 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
957 //-------------------------------------------------------------------
958 // handle from E2T Subscription Delete Failure
959 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the message, finds the subscription by the
// instance id in the message, and passes the message as an event to the
// subscription's ongoing transaction, waiting at most e2tRecvMsgTimeout. Every
// failing step is logged.
960 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
961 xapp.Logger.Info("MSG from E2T: %s", params.String())
962 c.UpdateCounter(cSubDelFailFromE2)
963 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
965 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
968 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
970 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
973 trans := subs.GetTransaction()
975 err = fmt.Errorf("Ongoing transaction not found")
976 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
979 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
981 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
982 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
987 //-------------------------------------------------------------------
989 //-------------------------------------------------------------------
// typeofSubsMessage maps an event value to a short name for log messages. It
// distinguishes the six E2AP subscription message types listed in the cases
// below.
// NOTE(review): the returned strings and the handling of nil/unknown values
// are not visible in this view — confirm before relying on exact names.
990 func typeofSubsMessage(v interface{}) string {
995 case *e2ap.E2APSubscriptionRequest:
997 case *e2ap.E2APSubscriptionResponse:
999 case *e2ap.E2APSubscriptionFailure:
1001 case *e2ap.E2APSubscriptionDeleteRequest:
1003 case *e2ap.E2APSubscriptionDeleteResponse:
1005 case *e2ap.E2APSubscriptionDeleteFailure:
1012 //-------------------------------------------------------------------
1014 //-------------------------------------------------------------------
1015 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
1016 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
1017 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
1019 xapp.Logger.Error("%v", err)
1023 //-------------------------------------------------------------------
1025 //-------------------------------------------------------------------
1026 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
1028 if removeSubscriptionFromDb == true {
1029 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
1030 c.RemoveSubscriptionFromDb(subs)
1032 // Update is needed for successful response and merge case here
1033 if subs.RetryFromXapp == false {
1034 c.WriteSubscriptionToDb(subs)
1037 subs.RetryFromXapp = false
1040 //-------------------------------------------------------------------
1042 //-------------------------------------------------------------------
1043 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
1044 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
1045 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
1047 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes a subscription that was left uncompleted
// by a restart (called from HandleUncompletedSubscriptions).
//
// It packs one subscription delete request from the subscription's request id
// and function id and then, for every endpoint of the subscription, feeds an
// RMR message built from it into handleXAPPSubscriptionDeleteRequest, with the
// endpoint as the message source.
1051 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
1053 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
1055 // Send delete for every endpoint in the subscription
1056 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
1057 subDelReqMsg.RequestId = subs.GetReqId().RequestId
1058 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
1059 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
1061 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
1064 for _, endPoint := range subs.EpList.Endpoints {
1065 params := &xapp.RMRParams{}
1066 params.Mtype = mType
1067 params.SubId = int(subs.GetReqId().InstanceId)
1069 params.Meid = subs.Meid
1070 params.Src = endPoint.String()
1071 params.PayloadLen = len(payload.Buf)
1072 params.Payload = payload.Buf
// Mark the subscription for removal from the db, then run the normal
// xApp delete path synchronously for this endpoint.
1074 subs.DeleteFromDb = true
1075 c.handleXAPPSubscriptionDeleteRequest(params)