2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
44 func idstring(err error, entries ...fmt.Stringer) string {
45 var retval string = ""
46 var filler string = ""
47 for _, entry := range entries {
48 retval += filler + entry.String()
52 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64 // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 var readSubsFromDb string
76 //subscriber *xapp.Subscriber
79 Counters map[string]xapp.Counter
88 type SubmgrRestartTestEvent struct{}
89 type SubmgrRestartUpEvent struct{}
92 xapp.Logger.Info("SUBMGR")
94 viper.SetEnvPrefix("submgr")
95 viper.AllowEmptyEnv(true)
98 func NewControl() *Control {
100 ReadConfigParameters()
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
113 c := &Control{e2ap: new(E2ap),
117 //subscriber: subscriber,
118 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
127 if readSubsFromDb == "false" {
131 // Read subscriptions from db
132 xapp.Logger.Info("Reading subscriptions from db")
133 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
135 xapp.Logger.Error("%v", err)
137 c.registry.subIds = subIds
138 c.registry.register = register
139 c.HandleUncompletedSubscriptions(register)
144 //-------------------------------------------------------------------
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
149 // viper.GetDuration returns nanoseconds
150 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151 if e2tSubReqTimeout == 0 {
152 e2tSubReqTimeout = 2000 * 1000000
154 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156 if e2tSubDelReqTime == 0 {
157 e2tSubDelReqTime = 2000 * 1000000
159 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161 if e2tRecvMsgTimeout == 0 {
162 e2tRecvMsgTimeout = 2000 * 1000000
164 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166 if e2tMaxSubReqTryCount == 0 {
167 e2tMaxSubReqTryCount = 1
169 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171 if e2tMaxSubDelReqTryCount == 0 {
172 e2tMaxSubDelReqTryCount = 1
174 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
176 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177 if readSubsFromDb == "" {
178 readSubsFromDb = "true"
180 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
183 //-------------------------------------------------------------------
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
188 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189 for subId, subs := range register {
190 if subs.SubRespRcvd == false {
191 subs.NoRespToXapp = true
192 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193 c.SendSubscriptionDeleteReq(subs)
198 func (c *Control) ReadyCB(data interface{}) {
199 if c.RMRClient == nil {
200 c.RMRClient = xapp.Rmr
204 func (c *Control) Run() {
205 xapp.SetReadyCB(c.ReadyCB, nil)
209 //-------------------------------------------------------------------
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
214 switch p := params.(type) {
215 case *models.ReportParams:
216 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
218 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
221 defer trans.Release()
222 case *models.ControlParams:
223 case *models.PolicyParams:
226 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234 xapp.Logger.Info("QueryHandler() called")
236 return c.registry.QueryHandler()
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240 xapp.Logger.Info("TestRestHandler() called")
242 pathParams := mux.Vars(r)
243 s := pathParams["testId"]
245 // This can be used to delete single subscription from db
246 if contains := strings.Contains(s, "deletesubid="); contains == true {
247 var splits = strings.Split(s, "=")
248 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250 c.RemoveSubscriptionFromSdl(uint32(subId))
255 // This can be used to remove all subscriptions db from
257 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258 c.RemoveAllSubscriptionsFromSdl()
262 // This is meant to cause submgr's restart in testing
264 xapp.Logger.Info("os.Exit(1) called")
268 xapp.Logger.Info("Unsupported rest command received %s", s)
271 //-------------------------------------------------------------------
273 //-------------------------------------------------------------------
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276 params := &xapp.RMRParams{}
277 params.Mtype = trans.GetMtype()
278 params.SubId = int(subs.GetReqId().InstanceId)
280 params.Meid = subs.GetMeid()
282 params.PayloadLen = len(trans.Payload.Buf)
283 params.Payload = trans.Payload.Buf
285 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286 return c.SendWithRetry(params, false, 5)
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
291 params := &xapp.RMRParams{}
292 params.Mtype = trans.GetMtype()
293 params.SubId = int(subs.GetReqId().InstanceId)
294 params.Xid = trans.GetXid()
295 params.Meid = trans.GetMeid()
297 params.PayloadLen = len(trans.Payload.Buf)
298 params.Payload = trans.Payload.Buf
300 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301 return c.SendWithRetry(params, false, 5)
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
305 if c.RMRClient == nil {
306 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307 xapp.Logger.Error("%s", err.Error())
312 defer c.RMRClient.Free(msg.Mbuf)
314 // xapp-frame might use direct access to c buffer and
315 // when msg.Mbuf is freed, someone might take it into use
316 // and payload data might be invalid inside message handle function
318 // subscriptions won't load system a lot so there is no
319 // real performance hit by cloning buffer into new go byte slice
320 cPay := append(msg.Payload[:0:0], msg.Payload...)
322 msg.PayloadLen = len(cPay)
325 case xapp.RIC_SUB_REQ:
326 go c.handleXAPPSubscriptionRequest(msg)
327 case xapp.RIC_SUB_RESP:
328 go c.handleE2TSubscriptionResponse(msg)
329 case xapp.RIC_SUB_FAILURE:
330 go c.handleE2TSubscriptionFailure(msg)
331 case xapp.RIC_SUB_DEL_REQ:
332 go c.handleXAPPSubscriptionDeleteRequest(msg)
333 case xapp.RIC_SUB_DEL_RESP:
334 go c.handleE2TSubscriptionDeleteResponse(msg)
335 case xapp.RIC_SUB_DEL_FAILURE:
336 go c.handleE2TSubscriptionDeleteFailure(msg)
338 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347 xapp.Logger.Info("MSG from XAPP: %s", params.String())
348 c.UpdateCounter(cSubReqFromXapp)
350 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
352 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
356 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
358 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
361 defer trans.Release()
363 err = c.tracker.Track(trans)
365 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
369 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
370 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
372 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
379 go c.handleSubscriptionCreate(subs, trans)
380 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
383 switch themsg := event.(type) {
384 case *e2ap.E2APSubscriptionResponse:
385 themsg.RequestId.Id = trans.RequestId.Id
386 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
389 c.UpdateCounter(cSubRespToXapp)
390 c.rmrSendToXapp("", subs, trans)
393 case *e2ap.E2APSubscriptionFailure:
394 themsg.RequestId.Id = trans.RequestId.Id
395 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
397 c.UpdateCounter(cSubFailToXapp)
398 c.rmrSendToXapp("", subs, trans)
404 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
405 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
408 //-------------------------------------------------------------------
409 // handle from XAPP Subscription Delete Request
410 //------------------------------------------------------------------
411 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
412 xapp.Logger.Info("MSG from XAPP: %s", params.String())
413 c.UpdateCounter(cSubDelReqFromXapp)
415 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
417 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
421 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
423 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
426 defer trans.Release()
428 err = c.tracker.Track(trans)
430 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
434 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
436 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
443 go c.handleSubscriptionDelete(subs, trans)
444 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
446 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
448 if subs.NoRespToXapp == true {
449 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
453 // Whatever is received success, fail or timeout, send successful delete response
454 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
455 subDelRespMsg.RequestId.Id = trans.RequestId.Id
456 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
457 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
458 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
460 c.UpdateCounter(cSubDelRespToXapp)
461 c.rmrSendToXapp("", subs, trans)
464 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
465 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
468 //-------------------------------------------------------------------
469 // SUBS CREATE Handling
470 //-------------------------------------------------------------------
471 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
473 var removeSubscriptionFromDb bool = false
474 trans := c.tracker.NewSubsTransaction(subs)
475 subs.WaitTransactionTurn(trans)
476 defer subs.ReleaseTransactionTurn(trans)
477 defer trans.Release()
479 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
481 subRfMsg, valid := subs.GetCachedResponse()
482 if subRfMsg == nil && valid == true {
483 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
484 switch event.(type) {
485 case *e2ap.E2APSubscriptionResponse:
486 subRfMsg, valid = subs.SetCachedResponse(event, true)
487 subs.SubRespRcvd = true
488 case *e2ap.E2APSubscriptionFailure:
489 removeSubscriptionFromDb = true
490 subRfMsg, valid = subs.SetCachedResponse(event, false)
491 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
492 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
493 case *SubmgrRestartTestEvent:
494 // This simulates that no response has been received and after restart subscriptions are restored from db
495 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
498 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
499 removeSubscriptionFromDb = true
500 subRfMsg, valid = subs.SetCachedResponse(nil, false)
501 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
503 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
505 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
508 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
510 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
513 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
514 parentTrans.SendEvent(subRfMsg, 0)
517 //-------------------------------------------------------------------
518 // SUBS DELETE Handling
519 //-------------------------------------------------------------------
521 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
523 trans := c.tracker.NewSubsTransaction(subs)
524 subs.WaitTransactionTurn(trans)
525 defer subs.ReleaseTransactionTurn(trans)
526 defer trans.Release()
528 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
532 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
535 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
539 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
540 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
541 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
542 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
543 c.registry.UpdateSubscriptionToDb(subs, c)
544 parentTrans.SendEvent(nil, 0)
547 //-------------------------------------------------------------------
548 // send to E2T Subscription Request
549 //-------------------------------------------------------------------
550 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
552 var event interface{} = nil
553 var timedOut bool = false
555 subReqMsg := subs.SubReqMsg
556 subReqMsg.RequestId = subs.GetReqId().RequestId
557 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
559 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
563 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
564 c.WriteSubscriptionToDb(subs)
565 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
566 desc := fmt.Sprintf("(retry %d)", retries)
568 c.UpdateCounter(cSubReqToE2)
570 c.UpdateCounter(cSubReReqToE2)
572 c.rmrSendToE2T(desc, subs, trans)
573 if subs.DoNotWaitSubResp == false {
574 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
576 c.UpdateCounter(cSubReqTimerExpiry)
580 // Simulating case where subscrition request has been sent but response has not been received before restart
581 event = &SubmgrRestartTestEvent{}
585 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
589 //-------------------------------------------------------------------
590 // send to E2T Subscription Delete Request
591 //-------------------------------------------------------------------
593 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
595 var event interface{}
598 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
599 subDelReqMsg.RequestId = subs.GetReqId().RequestId
600 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
601 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
603 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
607 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
608 desc := fmt.Sprintf("(retry %d)", retries)
610 c.UpdateCounter(cSubDelReqToE2)
612 c.UpdateCounter(cSubDelReReqToE2)
614 c.rmrSendToE2T(desc, subs, trans)
615 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
617 c.UpdateCounter(cSubDelReqTimerExpiry)
622 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
626 //-------------------------------------------------------------------
627 // handle from E2T Subscription Response
628 //-------------------------------------------------------------------
629 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
630 xapp.Logger.Info("MSG from E2T: %s", params.String())
631 c.UpdateCounter(cSubRespFromE2)
632 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
634 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
637 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
639 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
642 trans := subs.GetTransaction()
644 err = fmt.Errorf("Ongoing transaction not found")
645 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
648 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
650 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
651 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
656 //-------------------------------------------------------------------
657 // handle from E2T Subscription Failure
658 //-------------------------------------------------------------------
659 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
660 xapp.Logger.Info("MSG from E2T: %s", params.String())
661 c.UpdateCounter(cSubFailFromE2)
662 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
664 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
667 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
669 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
672 trans := subs.GetTransaction()
674 err = fmt.Errorf("Ongoing transaction not found")
675 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
678 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
680 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
681 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
686 //-------------------------------------------------------------------
687 // handle from E2T Subscription Delete Response
688 //-------------------------------------------------------------------
689 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
690 xapp.Logger.Info("MSG from E2T: %s", params.String())
691 c.UpdateCounter(cSubDelRespFromE2)
692 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
694 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
697 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
699 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
702 trans := subs.GetTransaction()
704 err = fmt.Errorf("Ongoing transaction not found")
705 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
708 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
710 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
711 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
716 //-------------------------------------------------------------------
717 // handle from E2T Subscription Delete Failure
718 //-------------------------------------------------------------------
719 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
720 xapp.Logger.Info("MSG from E2T: %s", params.String())
721 c.UpdateCounter(cSubDelFailFromE2)
722 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
724 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
727 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
729 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
732 trans := subs.GetTransaction()
734 err = fmt.Errorf("Ongoing transaction not found")
735 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
738 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
740 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
741 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
746 //-------------------------------------------------------------------
748 //-------------------------------------------------------------------
749 func typeofSubsMessage(v interface{}) string {
754 case *e2ap.E2APSubscriptionRequest:
756 case *e2ap.E2APSubscriptionResponse:
758 case *e2ap.E2APSubscriptionFailure:
760 case *e2ap.E2APSubscriptionDeleteRequest:
762 case *e2ap.E2APSubscriptionDeleteResponse:
764 case *e2ap.E2APSubscriptionDeleteFailure:
771 //-------------------------------------------------------------------
773 //-------------------------------------------------------------------
774 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
775 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
776 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
778 xapp.Logger.Error("%v", err)
782 //-------------------------------------------------------------------
784 //-------------------------------------------------------------------
785 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
787 if removeSubscriptionFromDb == true {
788 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
789 c.RemoveSubscriptionFromDb(subs)
791 // Update is needed for successful response and merge case here
792 if subs.RetryFromXapp == false {
793 c.WriteSubscriptionToDb(subs)
796 subs.RetryFromXapp = false
799 //-------------------------------------------------------------------
801 //-------------------------------------------------------------------
802 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
803 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
804 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
806 xapp.Logger.Error("%v", err)
810 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
812 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
814 // Send delete for every endpoint in the subscription
815 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
816 subDelReqMsg.RequestId = subs.GetReqId().RequestId
817 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
818 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
820 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
823 for _, endPoint := range subs.EpList.Endpoints {
824 params := &xapp.RMRParams{}
826 params.SubId = int(subs.GetReqId().InstanceId)
828 params.Meid = subs.Meid
829 params.Src = endPoint.String()
830 params.PayloadLen = len(payload.Buf)
831 params.Payload = payload.Buf
835 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
839 subs.DeleteFromDb = true
840 c.handleXAPPSubscriptionDeleteRequest(params)