2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
// idstring builds one identification string for log messages. For every
// entry it appends filler followed by entry.String(), and it appends an
// "err(<message>)" segment built from err.Error().
// NOTE(review): several lines of this function are not visible in this view
// (whatever updates filler, the guard around the err access, and the return).
// Callers in this file pass a nil err, so the err segment is presumably
// nil-guarded; confirm against the full file.
44 func idstring(err error, entries ...fmt.Stringer) string {
45 var retval string = ""
46 var filler string = ""
47 for _, entry := range entries {
48 retval += filler + entry.String()
52 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level settings shared by the handlers in this file. All of them are
// assigned by ReadConfigParameters.
var (
	// Wait times used with WaitEvent/SendEvent towards E2T.
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	// Upper bounds for the send loops towards E2T.
	e2tMaxSubReqTryCount    uint64 // Initial try + retry
	e2tMaxSubDelReqTryCount uint64 // Initial try + retry

	// NewControl compares this against the literal string "false".
	readSubsFromDb string
)
// NOTE(review): the header and the other fields of the enclosing struct are
// not visible in this view.
76 //subscriber *xapp.Subscriber
// Counters is keyed by string (presumably the counter name; confirm).
// NewControl fills it from
// xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR").
79 Counters map[string]xapp.Counter
// SubmgrRestartTestEvent is the event sendE2TSubscriptionRequest produces to
// simulate a request that was sent but not answered before a restart;
// handleSubscriptionCreate matches on it.
88 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker type. It is not referenced in the
// visible part of this file.
89 type SubmgrRestartUpEvent struct{}
// NOTE(review): the enclosing function header is not visible in this view.
// Logs the component name, then configures viper so that environment
// variables are looked up with the "submgr" prefix and empty values are
// accepted.
92 xapp.Logger.Info("SUBMGR")
94 viper.SetEnvPrefix("submgr")
95 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager's Control instance.
//
// Visible steps, in order: load settings with ReadConfigParameters, create a
// routing manager client from the rtmgr.HostAddr / rtmgr.port / rtmgr.baseUrl
// settings, initialise the Registry and attach that client, create a Tracker,
// register the test REST route, and start the xapp subscription listener in
// its own goroutine. Stored subscriptions are then read with
// ReadAllSubscriptionsFromSdl, installed into the registry and passed to
// HandleUncompletedSubscriptions.
// NOTE(review): some lines are not visible in this view, in particular what
// the readSubsFromDb == "false" branch does (presumably it skips the read and
// returns early) and how err gates the registry update. Confirm against the
// full file.
98 func NewControl() *Control {
100 ReadConfigParameters()
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
113 c := &Control{e2ap: new(E2ap),
117 //subscriber: subscriber,
118 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
// The listener runs for the lifetime of the process; nothing visible here stops it.
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
127 if readSubsFromDb == "false" {
131 // Read subscriptions from db
132 xapp.Logger.Info("Reading subscriptions from db")
133 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
135 xapp.Logger.Error("%v", err)
137 c.registry.subIds = subIds
138 c.registry.register = register
139 c.HandleUncompletedSubscriptions(register)
144 //-------------------------------------------------------------------
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
149 // viper.GetDuration returns nanoseconds
150 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151 if e2tSubReqTimeout == 0 {
152 e2tSubReqTimeout = 2000 * 1000000
154 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156 if e2tSubDelReqTime == 0 {
157 e2tSubDelReqTime = 2000 * 1000000
159 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161 if e2tRecvMsgTimeout == 0 {
162 e2tRecvMsgTimeout = 2000 * 1000000
164 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166 if e2tMaxSubReqTryCount == 0 {
167 e2tMaxSubReqTryCount = 1
169 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171 if e2tMaxSubDelReqTryCount == 0 {
172 e2tMaxSubDelReqTryCount = 1
174 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
176 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177 if readSubsFromDb == "" {
178 readSubsFromDb = "true"
180 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
183 //-------------------------------------------------------------------
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
188 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189 for subId, subs := range register {
190 if subs.SubRespRcvd == false {
191 subs.NoRespToXapp = true
192 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193 c.SendSubscriptionDeleteReq(subs)
198 func (c *Control) ReadyCB(data interface{}) {
199 if c.RMRClient == nil {
200 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback (with nil callback data).
// NOTE(review): the remainder of this function is not visible in this view.
204 func (c *Control) Run() {
205 xapp.SetReadyCB(c.ReadyCB, nil)
209 //-------------------------------------------------------------------
211 //-------------------------------------------------------------------
// SubscriptionHandler is the subscription callback NewControl passes to
// xapp.Subscription.Listen.
//
// For *models.ReportParams it creates an xApp transaction from the client
// endpoint and MEID and releases it when the function returns. The
// *models.ControlParams and *models.PolicyParams cases have no visible
// handling. The visible return yields an empty response together with a
// "not implemented" error.
// NOTE(review): the nil check on trans and its return path are not visible in
// this view.
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
214 switch p := params.(type) {
215 case *models.ReportParams:
216 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
218 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
221 defer trans.Release()
222 case *models.ControlParams:
223 case *models.PolicyParams:
226 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
// SubscriptionDeleteHandler is the delete callback NewControl passes to
// xapp.Subscription.Listen.
// NOTE(review): the body is not visible in this view.
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234 return c.registry.QueryHandler()
// TestRestHandler serves the testing route "/ric/v1/test/{testId}" that
// NewControl registers. The command is taken from the "testId" path variable.
//
// Visible commands: a value containing "deletesubid=" removes one
// subscription from SDL (the number after "=" is parsed as the id), another
// removes all subscriptions from SDL, and another logs that os.Exit(1) is
// being called. Anything else is logged as unsupported.
// NOTE(review): the conditions selecting the last two commands, the exit call
// itself and any return statements are not visible in this view.
237 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
239 xapp.Logger.Info("TestRestHandler() called")
241 pathParams := mux.Vars(r)
242 s := pathParams["testId"]
244 // This can be used to delete a single subscription from db
245 if contains := strings.Contains(s, "deletesubid="); contains == true {
246 var splits = strings.Split(s, "=")
// ParseInt errors are only checked for nil here; the failure branch, if any, is not visible.
247 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
248 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
249 c.RemoveSubscriptionFromSdl(uint32(subId))
254 // This can be used to remove all subscriptions from db
256 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
257 c.RemoveAllSubscriptionsFromSdl()
261 // This is meant to cause submgr's restart in testing
263 xapp.Logger.Info("os.Exit(1) called")
267 xapp.Logger.Info("Unsupported rest command received %s", s)
270 //-------------------------------------------------------------------
272 //-------------------------------------------------------------------
// rmrSendToE2T builds an xapp.RMRParams from the subscription and its
// transaction (message type, subscription instance id, MEID and packed
// payload), logs it together with desc, and sends it through
// c.SendWithRetry(params, false, 5), whose error is returned.
// NOTE(review): some field assignments are not visible in this view.
274 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
275 params := &xapp.RMRParams{}
276 params.Mtype = trans.GetMtype()
277 params.SubId = int(subs.GetReqId().InstanceId)
279 params.Meid = subs.GetMeid()
281 params.PayloadLen = len(trans.Payload.Buf)
282 params.Payload = trans.Payload.Buf
284 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
285 return c.SendWithRetry(params, false, 5)
// rmrSendToXapp builds an xapp.RMRParams for a reply to an xApp: message
// type, Xid and MEID come from the xApp transaction, the subscription
// instance id comes from subs, and the payload is the transaction's packed
// buffer. The message is logged together with desc and sent through
// c.SendWithRetry(params, false, 5), whose error is returned.
// NOTE(review): some field assignments are not visible in this view.
288 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
290 params := &xapp.RMRParams{}
291 params.Mtype = trans.GetMtype()
292 params.SubId = int(subs.GetReqId().InstanceId)
293 params.Xid = trans.GetXid()
294 params.Meid = trans.GetMeid()
296 params.PayloadLen = len(trans.Payload.Buf)
297 params.Payload = trans.Payload.Buf
299 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
300 return c.SendWithRetry(params, false, 5)
// Consume receives one RMR message and dispatches it by message type.
//
// It reports an error when no RMR client is bound, defers freeing of the
// message buffer, copies the payload into a fresh Go slice, and then starts
// one goroutine per known RIC_SUB_* message type. Other types are logged and
// discarded.
// NOTE(review): the switch header, the assignment of cPay back to the
// message and the return statements are not visible in this view. The error
// text "Rmr object nil can handle" probably means "can not handle"; it is a
// runtime string and is left unchanged here.
303 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
304 if c.RMRClient == nil {
305 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
306 xapp.Logger.Error("%s", err.Error())
311 defer c.RMRClient.Free(msg.Mbuf)
313 // xapp-frame might access the C buffer directly, and once
314 // msg.Mbuf is freed someone else might take it into use,
315 // so payload data could be invalid inside the message handler functions.
317 // Subscriptions do not load the system much, so there is no
318 // real performance hit in cloning the buffer into a new Go byte slice.
319 cPay := append(msg.Payload[:0:0], msg.Payload...)
321 msg.PayloadLen = len(cPay)
// Each handler runs in its own goroutine; nothing visible here waits for them.
324 case xapp.RIC_SUB_REQ:
325 go c.handleXAPPSubscriptionRequest(msg)
326 case xapp.RIC_SUB_RESP:
327 go c.handleE2TSubscriptionResponse(msg)
328 case xapp.RIC_SUB_FAILURE:
329 go c.handleE2TSubscriptionFailure(msg)
330 case xapp.RIC_SUB_DEL_REQ:
331 go c.handleXAPPSubscriptionDeleteRequest(msg)
332 case xapp.RIC_SUB_DEL_RESP:
333 go c.handleE2TSubscriptionDeleteResponse(msg)
334 case xapp.RIC_SUB_DEL_FAILURE:
335 go c.handleE2TSubscriptionDeleteFailure(msg)
337 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
342 //-------------------------------------------------------------------
343 // handle from XAPP Subscription Request
344 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xApp.
//
// Visible flow: unpack the E2AP request, create and track an xApp
// transaction (released on return), assign it to a subscription through the
// registry, start handleSubscriptionCreate in a goroutine and block on
// trans.WaitEvent(0) for its result. A subscription response or failure
// event is packed and sent back to the xApp with the matching counter
// updated; the final visible log line reports a failed request.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this view.
345 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
346 xapp.Logger.Info("MSG from XAPP: %s", params.String())
347 c.UpdateCounter(cSubReqFromXapp)
349 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
351 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
355 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
357 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
360 defer trans.Release()
362 err = c.tracker.Track(trans)
364 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
368 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
371 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
378 go c.handleSubscriptionCreate(subs, trans)
379 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
382 switch themsg := event.(type) {
383 case *e2ap.E2APSubscriptionResponse:
384 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
387 c.UpdateCounter(cSubRespToXapp)
388 c.rmrSendToXapp("", subs, trans)
391 case *e2ap.E2APSubscriptionFailure:
392 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
394 c.UpdateCounter(cSubFailToXapp)
395 c.rmrSendToXapp("", subs, trans)
401 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
402 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
405 //-------------------------------------------------------------------
406 // handle from XAPP Subscription Delete Request
407 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp. SendSubscriptionDeleteReq also calls it directly.
//
// Visible flow: unpack the E2AP delete request, create and track an xApp
// transaction (released on return), look up the subscription by the
// transaction's subscription id, start handleSubscriptionDelete in a
// goroutine and block on trans.WaitEvent(0). Unless subs.NoRespToXapp is
// set, a delete response is packed and sent to the xApp regardless of the
// outcome.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this view.
408 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
409 xapp.Logger.Info("MSG from XAPP: %s", params.String())
410 c.UpdateCounter(cSubDelReqFromXapp)
412 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
414 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
418 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
420 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
423 defer trans.Release()
425 err = c.tracker.Track(trans)
// NOTE(review): this log prefix says "XAPP-SubReq" inside the delete handler; it looks like a copy-paste slip for "XAPP-SubDelReq".
427 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
431 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
433 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
440 go c.handleSubscriptionDelete(subs, trans)
441 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
443 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
445 if subs.NoRespToXapp == true {
446 // Do not send delete responses to xapps when submgr restart is deleting uncompleted subscriptions
450 // Whatever is received (success, fail or timeout), send a successful delete response
451 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
452 subDelRespMsg.RequestId = subs.GetReqId().RequestId
453 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
454 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
456 c.UpdateCounter(cSubDelRespToXapp)
457 c.rmrSendToXapp("", subs, trans)
460 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
461 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
464 //-------------------------------------------------------------------
465 // SUBS CREATE Handling
466 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side work of a create
// request on behalf of parentTrans and reports the result to it as an event.
//
// It creates a subscription transaction, waits for its turn on the
// subscription (turn and transaction are released on return) and checks the
// cached response. When nothing is cached and the subscription is still
// valid, the request is sent to E2T and the outcome is cached: a response
// sets SubRespRcvd; a failure marks the subscription for removal from the db
// and triggers an internal delete towards E2T, as does the remaining branch
// that caches a nil response. The db is updated via UpdateSubscriptionInDB
// and the response is sent to parentTrans.
// NOTE(review): the else/default branch headers and the condition guarding
// RemoveFromSubscription are not visible in this view.
467 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
469 var removeSubscriptionFromDb bool = false
470 trans := c.tracker.NewSubsTransaction(subs)
471 subs.WaitTransactionTurn(trans)
472 defer subs.ReleaseTransactionTurn(trans)
473 defer trans.Release()
475 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
477 subRfMsg, valid := subs.GetCachedResponse()
478 if subRfMsg == nil && valid == true {
479 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
480 switch event.(type) {
481 case *e2ap.E2APSubscriptionResponse:
482 subRfMsg, valid = subs.SetCachedResponse(event, true)
483 subs.SubRespRcvd = true
484 case *e2ap.E2APSubscriptionFailure:
485 removeSubscriptionFromDb = true
486 subRfMsg, valid = subs.SetCachedResponse(event, false)
487 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
488 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
489 case *SubmgrRestartTestEvent:
490 // This simulates that no response has been received and after restart subscriptions are restored from db
491 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
494 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
495 removeSubscriptionFromDb = true
496 subRfMsg, valid = subs.SetCachedResponse(nil, false)
497 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
499 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
501 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
504 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
506 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
509 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
510 parentTrans.SendEvent(subRfMsg, 0)
513 //-------------------------------------------------------------------
514 // SUBS DELETE Handling
515 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side work of a delete
// request on behalf of parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription (both released on return). The delete is forwarded to E2T only
// when the subscription is valid and the requesting endpoint is the single
// endpoint left in subs.EpList. The transaction is then removed from the
// subscription, the db is updated, and a nil event is sent to parentTrans to
// unblock the caller.
// NOTE(review): lines between the visible statements are missing from this
// view.
517 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
519 trans := c.tracker.NewSubsTransaction(subs)
520 subs.WaitTransactionTurn(trans)
521 defer subs.ReleaseTransactionTurn(trans)
522 defer trans.Release()
524 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
528 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
531 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
535 // RemoveFromSubscription is done here to avoid race conditions (mostly concerns delete).
536 // If parallel deletes are ongoing, both might pass the earlier sendE2TSubscriptionDeleteRequest(...) if
537 // RemoveFromSubscription were located on the caller side (in handleXAPPSubscriptionDeleteRequest(...)).
538 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
539 c.registry.UpdateSubscriptionToDb(subs, c)
540 parentTrans.SendEvent(nil, 0)
543 //-------------------------------------------------------------------
544 // send to E2T Subscription Request
545 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request message, stores
// the subscription in the db, and sends the request to E2T up to
// e2tMaxSubReqTryCount times. Unless subs.DoNotWaitSubResp is set, each try
// waits up to e2tSubReqTimeout for an event on trans. When no wait is done, a
// SubmgrRestartTestEvent is produced instead.
// NOTE(review): the declaration of err, the counter-selection conditions, the
// timeout/break handling and the return statement are not visible in this
// view; presumably the last event is returned. Confirm against the full file.
546 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
548 var event interface{} = nil
549 var timedOut bool = false
551 subReqMsg := subs.SubReqMsg
552 subReqMsg.RequestId = subs.GetReqId().RequestId
553 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
555 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
559 // Write the uncompleted subscription to db. If there is no response for the subscription, it needs to be re-processed (deleted) after restart
560 c.WriteSubscriptionToDb(subs)
561 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
562 desc := fmt.Sprintf("(retry %d)", retries)
564 c.UpdateCounter(cSubReqToE2)
566 c.UpdateCounter(cSubReReqToE2)
568 c.rmrSendToE2T(desc, subs, trans)
569 if subs.DoNotWaitSubResp == false {
570 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
572 c.UpdateCounter(cSubReqTimerExpiry)
576 // Simulating the case where the subscription request has been sent but the response has not been received before restart
577 event = &SubmgrRestartTestEvent{}
581 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
585 //-------------------------------------------------------------------
586 // send to E2T Subscription Delete Request
587 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds and packs a delete request from the
// subscription's request id and function id, then sends it to E2T up to
// e2tMaxSubDelReqTryCount times, waiting up to e2tSubDelReqTime for an event
// on trans after each send.
// NOTE(review): the declarations of err and timedOut, the counter-selection
// conditions, the timeout/break handling and the return statement are not
// visible in this view; presumably the last event is returned. Confirm
// against the full file.
589 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
591 var event interface{}
594 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
595 subDelReqMsg.RequestId = subs.GetReqId().RequestId
596 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
597 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
599 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
603 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
604 desc := fmt.Sprintf("(retry %d)", retries)
606 c.UpdateCounter(cSubDelReqToE2)
608 c.UpdateCounter(cSubDelReReqToE2)
610 c.rmrSendToE2T(desc, subs, trans)
611 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
613 c.UpdateCounter(cSubDelReqTimerExpiry)
618 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
622 //-------------------------------------------------------------------
623 // handle from E2T Subscription Response
624 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T: it unpacks the message, finds the subscription by the request's
// instance id, takes the subscription's ongoing transaction and passes the
// response to it as an event, waiting up to e2tRecvMsgTimeout. Each failing
// step is logged.
// NOTE(review): the conditions guarding the error logs and the early returns
// are not visible in this view.
625 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
626 xapp.Logger.Info("MSG from E2T: %s", params.String())
627 c.UpdateCounter(cSubRespFromE2)
628 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
630 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
633 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
635 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
638 trans := subs.GetTransaction()
640 err = fmt.Errorf("Ongoing transaction not found")
641 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
644 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
646 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
647 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
652 //-------------------------------------------------------------------
653 // handle from E2T Subscription Failure
654 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T: it unpacks the message, finds the subscription by the request's
// instance id, takes the subscription's ongoing transaction and passes the
// failure to it as an event, waiting up to e2tRecvMsgTimeout. Each failing
// step is logged.
// NOTE(review): the conditions guarding the error logs and the early returns
// are not visible in this view.
655 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
656 xapp.Logger.Info("MSG from E2T: %s", params.String())
657 c.UpdateCounter(cSubFailFromE2)
658 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
660 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
663 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
665 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
668 trans := subs.GetTransaction()
670 err = fmt.Errorf("Ongoing transaction not found")
671 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
674 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
676 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
677 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
682 //-------------------------------------------------------------------
683 // handle from E2T Subscription Delete Response
684 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete
// response received from E2T: it unpacks the message, finds the subscription
// by the request's instance id, takes the subscription's ongoing transaction
// and passes the response to it as an event, waiting up to
// e2tRecvMsgTimeout. Each failing step is logged.
//
// Unlike the sibling E2T handlers it declares an error result, but Consume
// starts it with "go", so that result is discarded there.
// NOTE(review): the conditions guarding the error logs and the return
// statements are not visible in this view.
685 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
686 xapp.Logger.Info("MSG from E2T: %s", params.String())
687 c.UpdateCounter(cSubDelRespFromE2)
688 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
690 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
693 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
695 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
698 trans := subs.GetTransaction()
700 err = fmt.Errorf("Ongoing transaction not found")
701 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
704 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
706 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
707 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
712 //-------------------------------------------------------------------
713 // handle from E2T Subscription Delete Failure
714 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the message, finds the subscription by the
// request's instance id, takes the subscription's ongoing transaction and
// passes the failure to it as an event, waiting up to e2tRecvMsgTimeout.
// Each failing step is logged.
// NOTE(review): the conditions guarding the error logs and the early returns
// are not visible in this view.
715 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
716 xapp.Logger.Info("MSG from E2T: %s", params.String())
717 c.UpdateCounter(cSubDelFailFromE2)
718 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
720 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
723 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
725 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
728 trans := subs.GetTransaction()
730 err = fmt.Errorf("Ongoing transaction not found")
731 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
734 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
736 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
737 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
742 //-------------------------------------------------------------------
744 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing v for log output. The visible
// cases cover the six E2AP subscription message pointer types (request,
// response, failure and their delete counterparts).
// NOTE(review): the switch header, the returned strings and any nil/default
// handling are not visible in this view.
745 func typeofSubsMessage(v interface{}) string {
750 case *e2ap.E2APSubscriptionRequest:
752 case *e2ap.E2APSubscriptionResponse:
754 case *e2ap.E2APSubscriptionFailure:
756 case *e2ap.E2APSubscriptionDeleteRequest:
758 case *e2ap.E2APSubscriptionDeleteResponse:
760 case *e2ap.E2APSubscriptionDeleteFailure:
767 //-------------------------------------------------------------------
769 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs in SDL under its request instance id via
// WriteSubscriptionToSdl. The error is logged and not returned to the caller.
// NOTE(review): the condition guarding the error log is not visible in this
// view (presumably err != nil).
770 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
771 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
772 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
774 xapp.Logger.Error("%v", err)
778 //-------------------------------------------------------------------
780 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the stored copy of subs in line with the
// outcome of a create: when removeSubscriptionFromDb is true the subscription
// is removed from the db; the write path stores it only when
// subs.RetryFromXapp is false. RetryFromXapp is reset to false afterwards.
// NOTE(review): the line separating the remove path from the write path is
// not visible in this view (presumably an else). Confirm against the full
// file.
781 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
783 if removeSubscriptionFromDb == true {
784 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
785 c.RemoveSubscriptionFromDb(subs)
787 // Update is needed for successful response and merge case here
788 if subs.RetryFromXapp == false {
789 c.WriteSubscriptionToDb(subs)
792 subs.RetryFromXapp = false
795 //-------------------------------------------------------------------
797 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the entry for subs from SDL, keyed by its
// request instance id, via RemoveSubscriptionFromSdl. The error is logged and
// not returned to the caller.
// NOTE(review): the condition guarding the error log is not visible in this
// view (presumably err != nil).
798 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
799 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
800 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
802 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes a subscription on restart by replaying a
// delete request for each of its endpoints. HandleUncompletedSubscriptions
// calls it for subscriptions that never received a response.
//
// It packs one E2AP delete request from the subscription's request id and
// function id, then for every endpoint in subs.EpList.Endpoints builds an
// RMR message with that endpoint as Src, sets subs.DeleteFromDb and feeds the
// message to handleXAPPSubscriptionDeleteRequest synchronously.
// NOTE(review): the error check after packing, the remaining params field
// assignments, the condition behind the "params == nil" log and the end of
// the function are not visible in this view.
806 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
808 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
810 // Send delete for every endpoint in the subscription
811 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
812 subDelReqMsg.RequestId = subs.GetReqId().RequestId
813 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
814 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
816 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
819 for _, endPoint := range subs.EpList.Endpoints {
820 params := &xapp.RMRParams{}
822 params.SubId = int(subs.GetReqId().InstanceId)
824 params.Meid = subs.Meid
825 params.Src = endPoint.String()
826 params.PayloadLen = len(payload.Buf)
827 params.Payload = payload.Buf
831 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
835 subs.DeleteFromDb = true
836 c.handleXAPPSubscriptionDeleteRequest(params)