2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
44 func idstring(err error, entries ...fmt.Stringer) string {
45 var retval string = ""
46 var filler string = ""
47 for _, entry := range entries {
48 retval += filler + entry.String()
52 retval += filler + "err(" + err.Error() + ")"
58 //-----------------------------------------------------------------------------
60 //-----------------------------------------------------------------------------
62 var e2tSubReqTimeout time.Duration
63 var e2tSubDelReqTime time.Duration
64 var e2tRecvMsgTimeout time.Duration
65 var e2tMaxSubReqTryCount uint64 // Initial try + retry
66 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
67 var readSubsFromDb string
75 //subscriber *xapp.Subscriber
78 Counters map[string]xapp.Counter
87 type SubmgrRestartTestEvent struct{}
88 type SubmgrRestartUpEvent struct{}
91 xapp.Logger.Info("SUBMGR")
93 viper.SetEnvPrefix("submgr")
94 viper.AllowEmptyEnv(true)
97 func NewControl() *Control {
99 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
100 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
102 registry := new(Registry)
103 registry.Initialize()
104 registry.rtmgrClient = &rtmgrClient
106 tracker := new(Tracker)
109 c := &Control{e2ap: new(E2ap),
113 //subscriber: subscriber,
114 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
116 c.ReadConfigParameters("")
118 // Register REST handler for testing support
119 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
121 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123 if readSubsFromDb == "false" {
127 // Read subscriptions from db
128 xapp.Logger.Info("Reading subscriptions from db")
129 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
131 xapp.Logger.Error("%v", err)
133 c.registry.subIds = subIds
134 c.registry.register = register
135 c.HandleUncompletedSubscriptions(register)
140 //-------------------------------------------------------------------
142 //-------------------------------------------------------------------
143 func (c *Control) ReadConfigParameters(f string) {
145 // viper.GetDuration returns nanoseconds
146 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
147 if e2tSubReqTimeout == 0 {
148 e2tSubReqTimeout = 2000 * 1000000
150 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
151 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
152 if e2tSubDelReqTime == 0 {
153 e2tSubDelReqTime = 2000 * 1000000
155 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
156 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
157 if e2tRecvMsgTimeout == 0 {
158 e2tRecvMsgTimeout = 2000 * 1000000
160 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
161 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
162 if e2tMaxSubReqTryCount == 0 {
163 e2tMaxSubReqTryCount = 1
165 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
166 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
167 if e2tMaxSubDelReqTryCount == 0 {
168 e2tMaxSubDelReqTryCount = 1
170 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
172 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
173 if readSubsFromDb == "" {
174 readSubsFromDb = "true"
176 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
179 //-------------------------------------------------------------------
181 //-------------------------------------------------------------------
182 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
184 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
185 for subId, subs := range register {
186 if subs.SubRespRcvd == false {
187 subs.NoRespToXapp = true
188 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
189 c.SendSubscriptionDeleteReq(subs)
194 func (c *Control) ReadyCB(data interface{}) {
195 if c.RMRClient == nil {
196 c.RMRClient = xapp.Rmr
200 func (c *Control) Run() {
201 xapp.SetReadyCB(c.ReadyCB, nil)
202 xapp.AddConfigChangeListener(c.ReadConfigParameters)
206 //-------------------------------------------------------------------
208 //-------------------------------------------------------------------
209 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
211 switch p := params.(type) {
212 case *models.ReportParams:
213 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
215 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
218 defer trans.Release()
219 case *models.ControlParams:
220 case *models.PolicyParams:
223 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
226 func (c *Control) SubscriptionDeleteHandler(s string) error {
230 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
231 xapp.Logger.Info("QueryHandler() called")
233 return c.registry.QueryHandler()
236 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
237 xapp.Logger.Info("TestRestHandler() called")
239 pathParams := mux.Vars(r)
240 s := pathParams["testId"]
242 // This can be used to delete single subscription from db
243 if contains := strings.Contains(s, "deletesubid="); contains == true {
244 var splits = strings.Split(s, "=")
245 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247 c.RemoveSubscriptionFromSdl(uint32(subId))
252 // This can be used to remove all subscriptions db from
254 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255 c.RemoveAllSubscriptionsFromSdl()
259 // This is meant to cause submgr's restart in testing
261 xapp.Logger.Info("os.Exit(1) called")
265 xapp.Logger.Info("Unsupported rest command received %s", s)
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273 params := &xapp.RMRParams{}
274 params.Mtype = trans.GetMtype()
275 params.SubId = int(subs.GetReqId().InstanceId)
277 params.Meid = subs.GetMeid()
279 params.PayloadLen = len(trans.Payload.Buf)
280 params.Payload = trans.Payload.Buf
282 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283 return c.SendWithRetry(params, false, 5)
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
288 params := &xapp.RMRParams{}
289 params.Mtype = trans.GetMtype()
290 params.SubId = int(subs.GetReqId().InstanceId)
291 params.Xid = trans.GetXid()
292 params.Meid = trans.GetMeid()
294 params.PayloadLen = len(trans.Payload.Buf)
295 params.Payload = trans.Payload.Buf
297 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298 return c.SendWithRetry(params, false, 5)
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302 if c.RMRClient == nil {
303 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304 xapp.Logger.Error("%s", err.Error())
309 defer c.RMRClient.Free(msg.Mbuf)
311 // xapp-frame might use direct access to c buffer and
312 // when msg.Mbuf is freed, someone might take it into use
313 // and payload data might be invalid inside message handle function
315 // subscriptions won't load system a lot so there is no
316 // real performance hit by cloning buffer into new go byte slice
317 cPay := append(msg.Payload[:0:0], msg.Payload...)
319 msg.PayloadLen = len(cPay)
322 case xapp.RIC_SUB_REQ:
323 go c.handleXAPPSubscriptionRequest(msg)
324 case xapp.RIC_SUB_RESP:
325 go c.handleE2TSubscriptionResponse(msg)
326 case xapp.RIC_SUB_FAILURE:
327 go c.handleE2TSubscriptionFailure(msg)
328 case xapp.RIC_SUB_DEL_REQ:
329 go c.handleXAPPSubscriptionDeleteRequest(msg)
330 case xapp.RIC_SUB_DEL_RESP:
331 go c.handleE2TSubscriptionDeleteResponse(msg)
332 case xapp.RIC_SUB_DEL_FAILURE:
333 go c.handleE2TSubscriptionDeleteFailure(msg)
335 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344 xapp.Logger.Info("MSG from XAPP: %s", params.String())
345 c.UpdateCounter(cSubReqFromXapp)
347 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
349 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
353 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
355 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
358 defer trans.Release()
360 if err = c.tracker.Track(trans); err != nil {
361 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
368 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
372 c.wakeSubscriptionRequest(subs, trans)
375 //-------------------------------------------------------------------
376 // Wake Subscription Request to E2node
377 //------------------------------------------------------------------
378 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
380 go c.handleSubscriptionCreate(subs, trans)
381 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
384 switch themsg := event.(type) {
385 case *e2ap.E2APSubscriptionResponse:
386 themsg.RequestId.Id = trans.RequestId.Id
387 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
390 c.UpdateCounter(cSubRespToXapp)
391 c.rmrSendToXapp("", subs, trans)
394 case *e2ap.E2APSubscriptionFailure:
395 themsg.RequestId.Id = trans.RequestId.Id
396 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
398 c.UpdateCounter(cSubFailToXapp)
399 c.rmrSendToXapp("", subs, trans)
405 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
406 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
409 //-------------------------------------------------------------------
410 // handle from XAPP Subscription Delete Request
411 //------------------------------------------------------------------
412 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
413 xapp.Logger.Info("MSG from XAPP: %s", params.String())
414 c.UpdateCounter(cSubDelReqFromXapp)
416 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
418 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
422 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
424 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
427 defer trans.Release()
429 err = c.tracker.Track(trans)
431 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
435 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
437 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
444 go c.handleSubscriptionDelete(subs, trans)
445 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
447 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
449 if subs.NoRespToXapp == true {
450 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
454 // Whatever is received success, fail or timeout, send successful delete response
455 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
456 subDelRespMsg.RequestId.Id = trans.RequestId.Id
457 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
458 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
459 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
461 c.UpdateCounter(cSubDelRespToXapp)
462 c.rmrSendToXapp("", subs, trans)
465 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
466 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
469 //-------------------------------------------------------------------
470 // SUBS CREATE Handling
471 //-------------------------------------------------------------------
472 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
474 var removeSubscriptionFromDb bool = false
475 trans := c.tracker.NewSubsTransaction(subs)
476 subs.WaitTransactionTurn(trans)
477 defer subs.ReleaseTransactionTurn(trans)
478 defer trans.Release()
480 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
482 subRfMsg, valid := subs.GetCachedResponse()
483 if subRfMsg == nil && valid == true {
484 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
485 switch event.(type) {
486 case *e2ap.E2APSubscriptionResponse:
487 subRfMsg, valid = subs.SetCachedResponse(event, true)
488 subs.SubRespRcvd = true
489 case *e2ap.E2APSubscriptionFailure:
490 removeSubscriptionFromDb = true
491 subRfMsg, valid = subs.SetCachedResponse(event, false)
492 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
493 case *SubmgrRestartTestEvent:
494 // This simulates that no response has been received and after restart subscriptions are restored from db
495 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
498 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
499 removeSubscriptionFromDb = true
500 subRfMsg, valid = subs.SetCachedResponse(nil, false)
501 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
503 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
505 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
508 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
510 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
513 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
514 parentTrans.SendEvent(subRfMsg, 0)
517 //-------------------------------------------------------------------
518 // SUBS DELETE Handling
519 //-------------------------------------------------------------------
521 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
523 trans := c.tracker.NewSubsTransaction(subs)
524 subs.WaitTransactionTurn(trans)
525 defer subs.ReleaseTransactionTurn(trans)
526 defer trans.Release()
528 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
532 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
535 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
539 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
540 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
541 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
542 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
543 c.registry.UpdateSubscriptionToDb(subs, c)
544 parentTrans.SendEvent(nil, 0)
547 //-------------------------------------------------------------------
548 // send to E2T Subscription Request
549 //-------------------------------------------------------------------
550 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
552 var event interface{} = nil
553 var timedOut bool = false
555 subReqMsg := subs.SubReqMsg
556 subReqMsg.RequestId = subs.GetReqId().RequestId
557 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
559 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
563 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
564 c.WriteSubscriptionToDb(subs)
565 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
566 desc := fmt.Sprintf("(retry %d)", retries)
568 c.UpdateCounter(cSubReqToE2)
570 c.UpdateCounter(cSubReReqToE2)
572 c.rmrSendToE2T(desc, subs, trans)
573 if subs.DoNotWaitSubResp == false {
574 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
576 c.UpdateCounter(cSubReqTimerExpiry)
580 // Simulating case where subscrition request has been sent but response has not been received before restart
581 event = &SubmgrRestartTestEvent{}
585 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
589 //-------------------------------------------------------------------
590 // send to E2T Subscription Delete Request
591 //-------------------------------------------------------------------
593 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
595 var event interface{}
598 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
599 subDelReqMsg.RequestId = subs.GetReqId().RequestId
600 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
601 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
603 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
607 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
608 desc := fmt.Sprintf("(retry %d)", retries)
610 c.UpdateCounter(cSubDelReqToE2)
612 c.UpdateCounter(cSubDelReReqToE2)
614 c.rmrSendToE2T(desc, subs, trans)
615 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
617 c.UpdateCounter(cSubDelReqTimerExpiry)
622 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
626 //-------------------------------------------------------------------
627 // handle from E2T Subscription Response
628 //-------------------------------------------------------------------
629 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
630 xapp.Logger.Info("MSG from E2T: %s", params.String())
631 c.UpdateCounter(cSubRespFromE2)
632 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
634 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
637 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
639 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
642 trans := subs.GetTransaction()
644 err = fmt.Errorf("Ongoing transaction not found")
645 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
648 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
650 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
651 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
656 //-------------------------------------------------------------------
657 // handle from E2T Subscription Failure
658 //-------------------------------------------------------------------
659 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
660 xapp.Logger.Info("MSG from E2T: %s", params.String())
661 c.UpdateCounter(cSubFailFromE2)
662 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
664 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
667 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
669 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
672 trans := subs.GetTransaction()
674 err = fmt.Errorf("Ongoing transaction not found")
675 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
678 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
680 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
681 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
686 //-------------------------------------------------------------------
687 // handle from E2T Subscription Delete Response
688 //-------------------------------------------------------------------
689 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
690 xapp.Logger.Info("MSG from E2T: %s", params.String())
691 c.UpdateCounter(cSubDelRespFromE2)
692 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
694 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
697 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
699 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
702 trans := subs.GetTransaction()
704 err = fmt.Errorf("Ongoing transaction not found")
705 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
708 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
710 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
711 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
716 //-------------------------------------------------------------------
717 // handle from E2T Subscription Delete Failure
718 //-------------------------------------------------------------------
719 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
720 xapp.Logger.Info("MSG from E2T: %s", params.String())
721 c.UpdateCounter(cSubDelFailFromE2)
722 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
724 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
727 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
729 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
732 trans := subs.GetTransaction()
734 err = fmt.Errorf("Ongoing transaction not found")
735 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
738 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
740 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
741 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
746 //-------------------------------------------------------------------
748 //-------------------------------------------------------------------
749 func typeofSubsMessage(v interface{}) string {
754 case *e2ap.E2APSubscriptionRequest:
756 case *e2ap.E2APSubscriptionResponse:
758 case *e2ap.E2APSubscriptionFailure:
760 case *e2ap.E2APSubscriptionDeleteRequest:
762 case *e2ap.E2APSubscriptionDeleteResponse:
764 case *e2ap.E2APSubscriptionDeleteFailure:
771 //-------------------------------------------------------------------
773 //-------------------------------------------------------------------
774 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
775 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
776 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
778 xapp.Logger.Error("%v", err)
782 //-------------------------------------------------------------------
784 //-------------------------------------------------------------------
785 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
787 if removeSubscriptionFromDb == true {
788 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
789 c.RemoveSubscriptionFromDb(subs)
791 // Update is needed for successful response and merge case here
792 if subs.RetryFromXapp == false {
793 c.WriteSubscriptionToDb(subs)
796 subs.RetryFromXapp = false
799 //-------------------------------------------------------------------
801 //-------------------------------------------------------------------
802 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
803 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
804 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
806 xapp.Logger.Error("%v", err)
810 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
812 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
814 // Send delete for every endpoint in the subscription
815 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
816 subDelReqMsg.RequestId = subs.GetReqId().RequestId
817 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
818 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
820 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
823 for _, endPoint := range subs.EpList.Endpoints {
824 params := &xapp.RMRParams{}
826 params.SubId = int(subs.GetReqId().InstanceId)
828 params.Meid = subs.Meid
829 params.Src = endPoint.String()
830 params.PayloadLen = len(payload.Buf)
831 params.Payload = payload.Buf
835 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
839 subs.DeleteFromDb = true
840 c.handleXAPPSubscriptionDeleteRequest(params)