2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
// idstring renders a compact identification string for log messages.
//
// Every non-nil entry is rendered through its String method and the pieces
// are joined with a single space. When err is non-nil it is appended last as
// "err(<message>)". With no entries and a nil error the result is "".
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// A nil interface has no String method to call; skip it rather
			// than panic inside a logging helper.
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
// Tunables for the message exchange with E2 Termination and for start-up
// behaviour. They are assigned by ReadConfigParameters from the "controls.*"
// configuration keys.
var (
	// e2tSubReqTimeout is the wait passed to WaitEvent after a subscription
	// request has been sent to E2T.
	e2tSubReqTimeout time.Duration
	// e2tSubDelReqTime is the wait passed to WaitEvent after a subscription
	// delete request has been sent to E2T.
	e2tSubDelReqTime time.Duration
	// e2tRecvMsgTimeout is the timeout passed to SendEvent when a message
	// received from E2T is handed to its ongoing transaction.
	e2tRecvMsgTimeout time.Duration
	// e2tMaxSubReqTryCount bounds the subscription request send loop
	// (initial try + retry).
	e2tMaxSubReqTryCount uint64
	// e2tMaxSubDelReqTryCount bounds the subscription delete request send
	// loop (initial try + retry).
	e2tMaxSubDelReqTryCount uint64
	// readSubsFromDb set to "false" makes NewControl skip restoring
	// subscriptions from the database.
	readSubsFromDb string
)
76 //subscriber *xapp.Subscriber
// SubmgrRestartTestEvent is the event produced by sendE2TSubscriptionRequest
// to simulate that a subscription request was sent but no response arrived
// before a restart; handleSubscriptionCreate recognises it in its type switch.
type SubmgrRestartTestEvent struct{}

// SubmgrRestartUpEvent is an empty marker event type.
// NOTE(review): no use of this type is visible in this part of the file.
type SubmgrRestartUpEvent struct{}
91 xapp.Logger.Info("SUBMGR")
93 viper.SetEnvPrefix("submgr")
94 viper.AllowEmptyEnv(true)
// NewControl builds the subscription manager's Control instance. It loads the
// configuration, creates the rtmgr REST client, the Registry and the Tracker,
// registers the test REST route, starts xapp.Subscription.Listen in a
// goroutine and, unless readSubsFromDb is "false", reads the stored
// subscriptions and starts clean-up of the uncompleted ones.
//
// NOTE(review): the listing numbers embedded in these lines skip entries
// inside this function, so the remaining Control fields, the error guard
// around the database read, the return statements and the closing braces are
// not visible here. Restore them from the canonical file before building.
97 func NewControl() *Control {
// Configuration is loaded first because readSubsFromDb is consulted below.
99 ReadConfigParameters()
// REST transport towards rtmgr; host, port and base URL come from viper.
100 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103 registry := new(Registry)
104 registry.Initialize()
105 registry.rtmgrClient = &rtmgrClient
107 tracker := new(Tracker)
110 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112 c := &Control{e2ap: new(E2ap),
116 //subscriber: subscriber,
119 // Register REST handler for testing support
120 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
// The listener runs in its own goroutine and is given the subscription,
// query and delete handlers of this Control.
122 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
// "false" disables restoring subscriptions from the database.
125 if readSubsFromDb == "false" {
129 // Read subscriptions from db
130 xapp.Logger.Info("Reading subscriptions from db")
131 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133 xapp.Logger.Error("%v", err)
// NOTE(review): presumably the registry is only populated when the read
// succeeded; the branch structure is not visible here, confirm.
135 c.registry.subIds = subIds
136 c.registry.register = register
137 c.HandleUncompletedSubscriptions(register)
142 //-------------------------------------------------------------------
144 //-------------------------------------------------------------------
145 func ReadConfigParameters() {
147 // viper.GetDuration returns nanoseconds
148 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
149 if e2tSubReqTimeout == 0 {
150 e2tSubReqTimeout = 2000 * 1000000
152 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
153 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
154 if e2tSubDelReqTime == 0 {
155 e2tSubDelReqTime = 2000 * 1000000
157 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
158 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
159 if e2tRecvMsgTimeout == 0 {
160 e2tRecvMsgTimeout = 2000 * 1000000
162 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
163 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
164 if e2tMaxSubReqTryCount == 0 {
165 e2tMaxSubReqTryCount = 1
167 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
168 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
169 if e2tMaxSubDelReqTryCount == 0 {
170 e2tMaxSubDelReqTryCount = 1
172 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
174 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
175 if readSubsFromDb == "" {
176 readSubsFromDb = "true"
178 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 //-------------------------------------------------------------------
183 //-------------------------------------------------------------------
184 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
186 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
187 for subId, subs := range register {
188 if subs.SubRespRcvd == false {
189 subs.NoRespToXapp = true
190 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
191 c.SendSubscriptionDeleteReq(subs)
196 func (c *Control) ReadyCB(data interface{}) {
197 if c.RMRClient == nil {
198 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp framework's ready callback.
//
// NOTE(review): listing line 204 and the closing brace are not visible in
// this view. Whatever follows the registration (presumably starting the xapp
// framework) must be confirmed against the canonical file.
202 func (c *Control) Run() {
203 xapp.SetReadyCB(c.ReadyCB, nil)
207 //-------------------------------------------------------------------
209 //-------------------------------------------------------------------
// SubscriptionHandler is the subscription callback handed to
// xapp.Subscription.Listen. The REST subscription interface is not
// implemented: the only visible return yields an empty response together
// with an error saying so.
//
// NOTE(review): the type-switch fragment below looks like disabled draft
// code. It calls NewRmrEndpoint unqualified, passes params (an interface{})
// to idstring, which takes fmt.Stringer, and is not gofmt-formatted. Confirm
// whether it sits inside a block comment whose delimiters were lost from
// this listing.
210 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
212 switch p := params.(type) {
213 case *models.ReportParams:
214 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
216 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219 defer trans.Release()
220 case *models.ControlParams:
221 case *models.PolicyParams:
224 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
// SubscriptionDeleteHandler is the delete callback handed to
// xapp.Subscription.Listen in NewControl; s is supplied by the framework.
//
// NOTE(review): the function body is not visible in this view.
227 func (c *Control) SubscriptionDeleteHandler(s string) error {
231 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
232 return c.registry.QueryHandler()
// TestRestHandler serves the test-support route registered in NewControl
// (POST /ric/v1/test/{testId}). The testId path variable selects the action:
// deleting one subscription from the database, deleting all of them, or
// exiting the process to provoke a restart. Anything else is logged as
// unsupported.
//
// NOTE(review): the conditions that select the "remove all" and "exit"
// branches, the exit call itself, the returns and the closing braces are not
// visible in this view.
235 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
237 xapp.Logger.Info("TestRestHandler() called")
239 pathParams := mux.Vars(r)
240 s := pathParams["testId"]
242 // This can be used to delete single subscription from db
243 if contains := strings.Contains(s, "deletesubid="); contains == true {
// The id is whatever follows the first "=" in testId.
244 var splits = strings.Split(s, "=")
// NOTE(review): the value is parsed as a signed 64-bit integer and then
// converted to uint32, so negative or oversized ids wrap silently. No
// handling of a parse error is visible in this view.
245 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247 c.RemoveSubscriptionFromSdl(uint32(subId))
252 // This can be used to remove all subscriptions from db
254 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255 c.RemoveAllSubscriptionsFromSdl()
259 // This is meant to cause submgr's restart in testing
261 xapp.Logger.Info("os.Exit(1) called")
265 xapp.Logger.Info("Unsupported rest command received %s", s)
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
// rmrSendToE2T builds an RMR message from the subscription transaction and
// sends it through c.SendWithRetry(params, false, 5), returning that call's
// error. Message type and payload come from trans; subscription id and MEID
// come from subs. desc is only used in the log line.
//
// NOTE(review): listing lines 276, 278 and 281 are not visible in this view;
// any further field assignments made there must be confirmed.
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273 params := &xapp.RMRParams{}
274 params.Mtype = trans.GetMtype()
// The RMR subscription id is the E2 request's InstanceId.
275 params.SubId = int(subs.GetReqId().InstanceId)
277 params.Meid = subs.GetMeid()
279 params.PayloadLen = len(trans.Payload.Buf)
280 params.Payload = trans.Payload.Buf
282 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283 return c.SendWithRetry(params, false, 5)
// rmrSendToXapp builds an RMR message from the xApp transaction and sends it
// through c.SendWithRetry(params, false, 5), returning that call's error.
// Message type, transaction id (Xid), MEID and payload come from trans; the
// subscription id comes from subs. desc is only used in the log line.
//
// NOTE(review): listing lines 293 and 296 are not visible in this view; any
// further field assignments made there must be confirmed.
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
288 params := &xapp.RMRParams{}
289 params.Mtype = trans.GetMtype()
// The RMR subscription id is the E2 request's InstanceId.
290 params.SubId = int(subs.GetReqId().InstanceId)
291 params.Xid = trans.GetXid()
292 params.Meid = trans.GetMeid()
294 params.PayloadLen = len(trans.Payload.Buf)
295 params.Payload = trans.Payload.Buf
297 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298 return c.SendWithRetry(params, false, 5)
// Consume receives one RMR message and starts the handler that matches its
// message type in a new goroutine; unknown types are logged and discarded.
// When no RMR client has been set an error is built and logged.
//
// NOTE(review): the embedded listing numbers skip entries here. The switch
// header, the statement that would store cPay back into msg, the default
// label, the returns and the closing braces are not visible in this view.
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302 if c.RMRClient == nil {
// NOTE(review): the message text probably means "can't handle"; it is a
// runtime string and is left untouched here.
303 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304 xapp.Logger.Error("%s", err.Error())
// The RMR buffer is released when Consume returns, while the handlers
// started below keep running, hence the payload copy that follows.
309 defer c.RMRClient.Free(msg.Mbuf)
311 // xapp-frame might use direct access to c buffer and
312 // when msg.Mbuf is freed, someone might take it into use
313 // and payload data might be invalid inside message handle function
315 // subscriptions won't load system a lot so there is no
316 // real performance hit by cloning buffer into new go byte slice
// Slicing with [:0:0] gives zero capacity, so append must allocate a fresh
// backing array instead of aliasing the original buffer.
317 cPay := append(msg.Payload[:0:0], msg.Payload...)
319 msg.PayloadLen = len(cPay)
322 case xapp.RIC_SUB_REQ:
323 go c.handleXAPPSubscriptionRequest(msg)
324 case xapp.RIC_SUB_RESP:
325 go c.handleE2TSubscriptionResponse(msg)
326 case xapp.RIC_SUB_FAILURE:
327 go c.handleE2TSubscriptionFailure(msg)
328 case xapp.RIC_SUB_DEL_REQ:
329 go c.handleXAPPSubscriptionDeleteRequest(msg)
330 case xapp.RIC_SUB_DEL_RESP:
331 go c.handleE2TSubscriptionDeleteResponse(msg)
332 case xapp.RIC_SUB_DEL_FAILURE:
333 go c.handleE2TSubscriptionDeleteFailure(msg)
335 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xApp. It unpacks the request, creates and tracks an xApp
// transaction, assigns the transaction to a subscription in the registry,
// starts handleSubscriptionCreate in a goroutine and waits for its event.
// A subscription response or failure event is packed and sent back to the
// xApp.
//
// NOTE(review): the error guards, returns, closing braces and the statements
// on listing lines 377-378 and 391-395 are not visible in this view.
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344 xapp.Logger.Info("MSG from XAPP: %s", params.String())
346 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
348 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
// The transaction is created from the sender endpoint, the xApp's
// transaction id, the request's InstanceId and the MEID.
352 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
354 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
357 defer trans.Release()
359 err = c.tracker.Track(trans)
361 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
368 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
// The E2T exchange runs in its own goroutine; its outcome arrives as an
// event on this transaction.
375 go c.handleSubscriptionCreate(subs, trans)
376 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
379 switch themsg := event.(type) {
380 case *e2ap.E2APSubscriptionResponse:
381 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
// NOTE(review): the result of rmrSendToXapp is not used on this line.
384 c.rmrSendToXapp("", subs, trans)
387 case *e2ap.E2APSubscriptionFailure:
388 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
390 c.rmrSendToXapp("", subs, trans)
396 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
397 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
400 //-------------------------------------------------------------------
401 // handle from XAPP Subscription Delete Request
402 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp, or built by SendSubscriptionDeleteReq. It unpacks
// the request, creates and tracks an xApp transaction, looks up the
// subscription by the transaction's subscription id, starts
// handleSubscriptionDelete in a goroutine and waits for it. Unless the
// subscription is flagged NoRespToXapp, a successful delete response is then
// packed and sent to the xApp.
//
// NOTE(review): the error guards, returns and closing braces are not visible
// in this view.
403 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
404 xapp.Logger.Info("MSG from XAPP: %s", params.String())
406 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
408 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
412 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
414 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
417 defer trans.Release()
419 err = c.tracker.Track(trans)
// NOTE(review): this log line uses the "XAPP-SubReq" tag inside the delete
// handler, which looks like a copy-paste slip; the runtime string is left
// untouched here.
421 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
425 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
427 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
// The E2T exchange runs in its own goroutine; the event it sends back is
// used only as a completion signal.
434 go c.handleSubscriptionDelete(subs, trans)
435 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
437 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
439 if subs.NoRespToXapp == true {
440 // Do not send delete responses to xapps when a submgr restart is deleting uncompleted subscriptions
444 // Whatever is received success, fail or timeout, send successful delete response
445 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
446 subDelRespMsg.RequestId = subs.GetReqId().RequestId
447 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
448 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
450 c.rmrSendToXapp("", subs, trans)
453 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
454 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
457 //-------------------------------------------------------------------
458 // SUBS CREATE Handling
459 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a
// subscription request. It takes the subscription's transaction turn and,
// when no response is cached yet, sends the request to E2T and caches the
// outcome. On a failure or on any other non-response event it also sends a
// subscription delete to E2T and marks the subscription for removal from the
// database. Finally it updates the database and passes the response in
// subRfMsg (cached or fresh) to parentTrans.
//
// NOTE(review): the default label, the else branch, the condition guarding
// RemoveFromSubscription, a possible return in the restart-test case and the
// closing braces are not visible in this view.
460 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
462 var removeSubscriptionFromDb bool = false
463 trans := c.tracker.NewSubsTransaction(subs)
// Transactions on one subscription take turns; the turn is released when
// this function returns.
464 subs.WaitTransactionTurn(trans)
465 defer subs.ReleaseTransactionTurn(trans)
466 defer trans.Release()
468 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
470 subRfMsg, valid := subs.GetCachedResponse()
// Only talk to E2T when nothing is cached and the subscription is valid.
471 if subRfMsg == nil && valid == true {
472 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
473 switch event.(type) {
474 case *e2ap.E2APSubscriptionResponse:
475 subRfMsg, valid = subs.SetCachedResponse(event, true)
476 subs.SubRespRcvd = true
477 case *e2ap.E2APSubscriptionFailure:
478 removeSubscriptionFromDb = true
479 subRfMsg, valid = subs.SetCachedResponse(event, false)
480 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
481 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
482 case *SubmgrRestartTestEvent:
483 // This simulates that no response has been received and after restart subscriptions are restored from db
484 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
487 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
488 removeSubscriptionFromDb = true
489 subRfMsg, valid = subs.SetCachedResponse(nil, false)
490 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
492 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
494 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
497 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
499 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
502 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wakes the xApp-side handler that is blocked in WaitEvent.
503 parentTrans.SendEvent(subRfMsg, 0)
506 //-------------------------------------------------------------------
507 // SUBS DELETE Handling
508 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a
// subscription delete. It takes the subscription's transaction turn, sends a
// delete request to E2T when the requesting endpoint is the only endpoint of
// a valid subscription, removes the endpoint from the subscription, updates
// the database and signals parentTrans with a nil event.
//
// NOTE(review): listing lines 521-522 and 524-526 and the closing braces are
// not visible in this view.
510 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
512 trans := c.tracker.NewSubsTransaction(subs)
513 subs.WaitTransactionTurn(trans)
514 defer subs.ReleaseTransactionTurn(trans)
515 defer trans.Release()
517 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// True only when the requester is the single remaining endpoint.
520 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
523 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
527 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
528 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
529 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
530 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
531 c.registry.UpdateSubscriptionToDb(subs, c)
// Wakes the xApp-side handler that is blocked in WaitEvent.
532 parentTrans.SendEvent(nil, 0)
535 //-------------------------------------------------------------------
536 // send to E2T Subscription Request
537 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription request stored in subs,
// writes the not yet completed subscription to the database and sends the
// request to E2T up to e2tMaxSubReqTryCount times, waiting e2tSubReqTimeout
// for an event after each send. The declared result is an interface{}
// carrying the event; a SubmgrRestartTestEvent is produced in the
// restart-simulation path.
//
// NOTE(review): the declaration of err, the error guard, the loop-exit
// conditions, the else branch of the DoNotWaitSubResp check, the return
// statement and the closing braces are not visible in this view.
538 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
540 var event interface{} = nil
541 var timedOut bool = false
543 subReqMsg := subs.SubReqMsg
544 subReqMsg.RequestId = subs.GetReqId().RequestId
545 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
547 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
551 // Write uncompleted subscription in db. If no response arrives for the subscription it needs to be re-processed (deleted) after restart
552 c.WriteSubscriptionToDb(subs)
553 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
554 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): the error returned by rmrSendToE2T is not used here.
555 c.rmrSendToE2T(desc, subs, trans)
556 if subs.DoNotWaitSubResp == false {
557 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
562 // Simulating case where subscription request has been sent but response has not been received before restart
563 event = &SubmgrRestartTestEvent{}
567 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
571 //-------------------------------------------------------------------
572 // send to E2T Subscription Delete Request
573 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request from
// the subscription's request id and function id, packs it and sends it to
// E2T up to e2tMaxSubDelReqTryCount times, waiting e2tSubDelReqTime for an
// event after each send. The declared result is an interface{} carrying the
// event.
//
// NOTE(review): the declarations of err and timedOut, the error guard, the
// loop-exit condition, the return statement and the closing braces are not
// visible in this view.
575 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
577 var event interface{}
580 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
581 subDelReqMsg.RequestId = subs.GetReqId().RequestId
582 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
583 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
585 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
589 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
590 desc := fmt.Sprintf("(retry %d)", retries)
// NOTE(review): the error returned by rmrSendToE2T is not used here.
591 c.rmrSendToE2T(desc, subs, trans)
592 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
598 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
602 //-------------------------------------------------------------------
603 // handle from E2T Subscription Reponse
604 //-------------------------------------------------------------------
605 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
606 xapp.Logger.Info("MSG from E2T: %s", params.String())
607 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
609 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
612 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
614 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
617 trans := subs.GetTransaction()
619 err = fmt.Errorf("Ongoing transaction not found")
620 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
623 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
625 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
626 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
631 //-------------------------------------------------------------------
632 // handle from E2T Subscription Failure
633 //-------------------------------------------------------------------
634 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
635 xapp.Logger.Info("MSG from E2T: %s", params.String())
636 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
638 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
641 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
643 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
646 trans := subs.GetTransaction()
648 err = fmt.Errorf("Ongoing transaction not found")
649 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
652 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
654 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
655 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
660 //-------------------------------------------------------------------
661 // handle from E2T Subscription Delete Response
662 //-------------------------------------------------------------------
663 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
664 xapp.Logger.Info("MSG from E2T: %s", params.String())
665 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
667 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
670 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
672 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
675 trans := subs.GetTransaction()
677 err = fmt.Errorf("Ongoing transaction not found")
678 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
681 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
683 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
684 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
689 //-------------------------------------------------------------------
690 // handle from E2T Subscription Delete Failure
691 //-------------------------------------------------------------------
692 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
693 xapp.Logger.Info("MSG from E2T: %s", params.String())
694 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
696 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
699 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
701 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
704 trans := subs.GetTransaction()
706 err = fmt.Errorf("Ongoing transaction not found")
707 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
710 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
712 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
713 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
718 //-------------------------------------------------------------------
720 //-------------------------------------------------------------------
// typeofSubsMessage maps an E2AP subscription message value to a string for
// log output; callers in this file pass the result to "%s" log arguments.
// The visible cases cover the subscription request, response and failure
// and the delete request, response and failure pointer types.
//
// NOTE(review): the switch header, every returned string, any nil or default
// handling and the closing braces are not visible in this view.
721 func typeofSubsMessage(v interface{}) string {
726 case *e2ap.E2APSubscriptionRequest:
728 case *e2ap.E2APSubscriptionResponse:
730 case *e2ap.E2APSubscriptionFailure:
732 case *e2ap.E2APSubscriptionDeleteRequest:
734 case *e2ap.E2APSubscriptionDeleteResponse:
736 case *e2ap.E2APSubscriptionDeleteFailure:
743 //-------------------------------------------------------------------
745 //-------------------------------------------------------------------
746 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
747 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
748 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
750 xapp.Logger.Error("%v", err)
754 //-------------------------------------------------------------------
756 //-------------------------------------------------------------------
757 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
759 if removeSubscriptionFromDb == true {
760 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
761 c.RemoveSubscriptionFromDb(subs)
763 // Update is needed for successful response and merge case here
764 if subs.RetryFromXapp == false {
765 c.WriteSubscriptionToDb(subs)
768 subs.RetryFromXapp = false
771 //-------------------------------------------------------------------
773 //-------------------------------------------------------------------
774 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
775 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
776 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
778 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq starts deletion of a subscription that was left
// uncompleted by a restart. It packs one subscription delete request and, for
// every endpoint of the subscription, builds an RMR message with that
// endpoint as source and feeds it to handleXAPPSubscriptionDeleteRequest, as
// if the xApp itself had asked for the delete.
//
// NOTE(review): the embedded listing numbers skip entries here. The error
// guard after packing, the assignment that uses mType, the condition in front
// of the "params == nil" log line and the closing braces are not visible in
// this view. params is created just above that log line, so a nil check on
// it looks unreachable; confirm against the canonical file.
782 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
784 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
786 // Send delete for every endpoint in the subscription
787 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
788 subDelReqMsg.RequestId = subs.GetReqId().RequestId
789 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
790 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
792 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
795 for _, endPoint := range subs.EpList.Endpoints {
796 params := &xapp.RMRParams{}
798 params.SubId = int(subs.GetReqId().InstanceId)
800 params.Meid = subs.Meid
// The endpoint plays the role of the requesting xApp.
801 params.Src = endPoint.String()
802 params.PayloadLen = len(payload.Buf)
803 params.Payload = payload.Buf
807 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
811 subs.DeleteFromDb = true
// Called synchronously, so the endpoints are processed one after another.
812 c.handleXAPPSubscriptionDeleteRequest(params)