2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
// idstring renders the given entries as one space-separated string for log
// messages and, when err is non-nil, appends it as a trailing "err(...)"
// token.
//
// Nil interface entries are skipped instead of being dereferenced, so a
// caller that passes an untyped nil cannot crash the logging path. The result
// is empty when there are no entries and no error.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
58 //-----------------------------------------------------------------------------
60 //-----------------------------------------------------------------------------
// Runtime tunables. They are (re)loaded by Control.ReadConfigParameters.
var (
	// How long to wait for an E2T answer to a subscription request.
	e2tSubReqTimeout time.Duration
	// How long to wait for an E2T answer to a subscription delete request.
	e2tSubDelReqTime time.Duration
	// Timeout used when handing a received E2T message over to its transaction.
	e2tRecvMsgTimeout time.Duration
	// Initial try + retry
	e2tMaxSubReqTryCount uint64
	// Initial try + retry
	e2tMaxSubDelReqTryCount uint64
	// "false" disables restoring subscriptions from the db in NewControl.
	readSubsFromDb string
)
75 //subscriber *xapp.Subscriber
78 Counters map[string]xapp.Counter
type (
	// SubmgrRestartTestEvent is the event produced by
	// sendE2TSubscriptionRequest instead of a real E2T answer when the
	// subscription is flagged not to wait for a response (restart testing).
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a payload-free marker event.
	// NOTE(review): no use of it is visible in this part of the file.
	SubmgrRestartUpEvent struct{}
)
91 xapp.Logger.Info("SUBMGR")
93 viper.SetEnvPrefix("submgr")
94 viper.AllowEmptyEnv(true)
97 func NewControl() *Control {
99 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
100 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
102 registry := new(Registry)
103 registry.Initialize()
104 registry.rtmgrClient = &rtmgrClient
106 tracker := new(Tracker)
109 c := &Control{e2ap: new(E2ap),
113 //subscriber: subscriber,
114 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
116 c.ReadConfigParameters("")
118 // Register REST handler for testing support
119 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
120 xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
122 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
124 if readSubsFromDb == "false" {
128 // Read subscriptions from db
129 xapp.Logger.Info("Reading subscriptions from db")
130 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
132 xapp.Logger.Error("%v", err)
134 c.registry.subIds = subIds
135 c.registry.register = register
136 c.HandleUncompletedSubscriptions(register)
141 func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
142 subscriptions, _ := c.registry.QueryHandler()
143 xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
146 //-------------------------------------------------------------------
148 //-------------------------------------------------------------------
149 func (c *Control) ReadConfigParameters(f string) {
151 // viper.GetDuration returns nanoseconds
152 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
153 if e2tSubReqTimeout == 0 {
154 e2tSubReqTimeout = 2000 * 1000000
156 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
157 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
158 if e2tSubDelReqTime == 0 {
159 e2tSubDelReqTime = 2000 * 1000000
161 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
162 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
163 if e2tRecvMsgTimeout == 0 {
164 e2tRecvMsgTimeout = 2000 * 1000000
166 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
167 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
168 if e2tMaxSubReqTryCount == 0 {
169 e2tMaxSubReqTryCount = 1
171 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
172 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
173 if e2tMaxSubDelReqTryCount == 0 {
174 e2tMaxSubDelReqTryCount = 1
176 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
178 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
179 if readSubsFromDb == "" {
180 readSubsFromDb = "true"
182 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
185 //-------------------------------------------------------------------
187 //-------------------------------------------------------------------
188 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
190 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
191 for subId, subs := range register {
192 if subs.SubRespRcvd == false {
193 subs.NoRespToXapp = true
194 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
195 c.SendSubscriptionDeleteReq(subs)
200 func (c *Control) ReadyCB(data interface{}) {
201 if c.RMRClient == nil {
202 c.RMRClient = xapp.Rmr
206 func (c *Control) Run() {
207 xapp.SetReadyCB(c.ReadyCB, nil)
208 xapp.AddConfigChangeListener(c.ReadConfigParameters)
212 //-------------------------------------------------------------------
214 //-------------------------------------------------------------------
215 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
217 switch p := params.(type) {
218 case *models.ReportParams:
219 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
221 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
224 defer trans.Release()
225 case *models.ControlParams:
226 case *models.PolicyParams:
229 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
232 func (c *Control) SubscriptionDeleteHandler(s string) error {
236 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
237 xapp.Logger.Info("QueryHandler() called")
239 return c.registry.QueryHandler()
242 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
243 xapp.Logger.Info("TestRestHandler() called")
245 pathParams := mux.Vars(r)
246 s := pathParams["testId"]
248 // This can be used to delete single subscription from db
249 if contains := strings.Contains(s, "deletesubid="); contains == true {
250 var splits = strings.Split(s, "=")
251 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
252 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
253 c.RemoveSubscriptionFromSdl(uint32(subId))
258 // This can be used to remove all subscriptions db from
260 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
261 c.RemoveAllSubscriptionsFromSdl()
265 // This is meant to cause submgr's restart in testing
267 xapp.Logger.Info("os.Exit(1) called")
271 xapp.Logger.Info("Unsupported rest command received %s", s)
274 //-------------------------------------------------------------------
276 //-------------------------------------------------------------------
278 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
279 params := &xapp.RMRParams{}
280 params.Mtype = trans.GetMtype()
281 params.SubId = int(subs.GetReqId().InstanceId)
283 params.Meid = subs.GetMeid()
285 params.PayloadLen = len(trans.Payload.Buf)
286 params.Payload = trans.Payload.Buf
288 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
289 err = c.SendWithRetry(params, false, 5)
291 xapp.Logger.Error("rmrSendToE2T: Send failed: %+v", err)
296 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
298 params := &xapp.RMRParams{}
299 params.Mtype = trans.GetMtype()
300 params.SubId = int(subs.GetReqId().InstanceId)
301 params.Xid = trans.GetXid()
302 params.Meid = trans.GetMeid()
304 params.PayloadLen = len(trans.Payload.Buf)
305 params.Payload = trans.Payload.Buf
307 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
308 err = c.SendWithRetry(params, false, 5)
310 xapp.Logger.Error("rmrSendToXapp: Send failed: %+v", err)
315 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
316 if c.RMRClient == nil {
317 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
318 xapp.Logger.Error("%s", err.Error())
323 defer c.RMRClient.Free(msg.Mbuf)
325 // xapp-frame might use direct access to c buffer and
326 // when msg.Mbuf is freed, someone might take it into use
327 // and payload data might be invalid inside message handle function
329 // subscriptions won't load system a lot so there is no
330 // real performance hit by cloning buffer into new go byte slice
331 cPay := append(msg.Payload[:0:0], msg.Payload...)
333 msg.PayloadLen = len(cPay)
336 case xapp.RIC_SUB_REQ:
337 go c.handleXAPPSubscriptionRequest(msg)
338 case xapp.RIC_SUB_RESP:
339 go c.handleE2TSubscriptionResponse(msg)
340 case xapp.RIC_SUB_FAILURE:
341 go c.handleE2TSubscriptionFailure(msg)
342 case xapp.RIC_SUB_DEL_REQ:
343 go c.handleXAPPSubscriptionDeleteRequest(msg)
344 case xapp.RIC_SUB_DEL_RESP:
345 go c.handleE2TSubscriptionDeleteResponse(msg)
346 case xapp.RIC_SUB_DEL_FAILURE:
347 go c.handleE2TSubscriptionDeleteFailure(msg)
349 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
354 //-------------------------------------------------------------------
355 // handle from XAPP Subscription Request
356 //------------------------------------------------------------------
357 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
358 xapp.Logger.Info("MSG from XAPP: %s", params.String())
359 c.UpdateCounter(cSubReqFromXapp)
361 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
363 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
367 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
369 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
372 defer trans.Release()
374 if err = c.tracker.Track(trans); err != nil {
375 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
379 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
380 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
382 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
386 c.wakeSubscriptionRequest(subs, trans)
389 //-------------------------------------------------------------------
390 // Wake Subscription Request to E2node
391 //------------------------------------------------------------------
392 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
394 go c.handleSubscriptionCreate(subs, trans)
395 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
398 switch themsg := event.(type) {
399 case *e2ap.E2APSubscriptionResponse:
400 themsg.RequestId.Id = trans.RequestId.Id
401 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
404 c.UpdateCounter(cSubRespToXapp)
405 c.rmrSendToXapp("", subs, trans)
408 case *e2ap.E2APSubscriptionFailure:
409 themsg.RequestId.Id = trans.RequestId.Id
410 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
412 c.UpdateCounter(cSubFailToXapp)
413 c.rmrSendToXapp("", subs, trans)
419 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
420 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
423 //-------------------------------------------------------------------
424 // handle from XAPP Subscription Delete Request
425 //------------------------------------------------------------------
426 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
427 xapp.Logger.Info("MSG from XAPP: %s", params.String())
428 c.UpdateCounter(cSubDelReqFromXapp)
430 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
432 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
436 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
438 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
441 defer trans.Release()
443 err = c.tracker.Track(trans)
445 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
449 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
451 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
458 go c.handleSubscriptionDelete(subs, trans)
459 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
461 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
463 if subs.NoRespToXapp == true {
464 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
468 // Whatever is received success, fail or timeout, send successful delete response
469 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
470 subDelRespMsg.RequestId.Id = trans.RequestId.Id
471 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
472 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
473 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
475 c.UpdateCounter(cSubDelRespToXapp)
476 c.rmrSendToXapp("", subs, trans)
479 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
480 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
483 //-------------------------------------------------------------------
484 // SUBS CREATE Handling
485 //-------------------------------------------------------------------
486 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
488 var removeSubscriptionFromDb bool = false
489 trans := c.tracker.NewSubsTransaction(subs)
490 subs.WaitTransactionTurn(trans)
491 defer subs.ReleaseTransactionTurn(trans)
492 defer trans.Release()
494 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
496 subRfMsg, valid := subs.GetCachedResponse()
497 if subRfMsg == nil && valid == true {
498 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
499 switch event.(type) {
500 case *e2ap.E2APSubscriptionResponse:
501 subRfMsg, valid = subs.SetCachedResponse(event, true)
502 subs.SubRespRcvd = true
503 case *e2ap.E2APSubscriptionFailure:
504 removeSubscriptionFromDb = true
505 subRfMsg, valid = subs.SetCachedResponse(event, false)
506 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
507 case *SubmgrRestartTestEvent:
508 // This simulates that no response has been received and after restart subscriptions are restored from db
509 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
512 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
513 removeSubscriptionFromDb = true
514 subRfMsg, valid = subs.SetCachedResponse(nil, false)
515 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
517 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
519 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
522 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
524 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
527 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
528 parentTrans.SendEvent(subRfMsg, 0)
531 //-------------------------------------------------------------------
532 // SUBS DELETE Handling
533 //-------------------------------------------------------------------
535 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
537 trans := c.tracker.NewSubsTransaction(subs)
538 subs.WaitTransactionTurn(trans)
539 defer subs.ReleaseTransactionTurn(trans)
540 defer trans.Release()
542 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
546 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
549 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
553 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
554 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
555 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
556 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
557 c.registry.UpdateSubscriptionToDb(subs, c)
558 parentTrans.SendEvent(nil, 0)
561 //-------------------------------------------------------------------
562 // send to E2T Subscription Request
563 //-------------------------------------------------------------------
564 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
566 var event interface{} = nil
567 var timedOut bool = false
569 subReqMsg := subs.SubReqMsg
570 subReqMsg.RequestId = subs.GetReqId().RequestId
571 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
573 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
577 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
578 c.WriteSubscriptionToDb(subs)
579 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
580 desc := fmt.Sprintf("(retry %d)", retries)
582 c.UpdateCounter(cSubReqToE2)
584 c.UpdateCounter(cSubReReqToE2)
586 c.rmrSendToE2T(desc, subs, trans)
587 if subs.DoNotWaitSubResp == false {
588 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
590 c.UpdateCounter(cSubReqTimerExpiry)
594 // Simulating case where subscrition request has been sent but response has not been received before restart
595 event = &SubmgrRestartTestEvent{}
599 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
603 //-------------------------------------------------------------------
604 // send to E2T Subscription Delete Request
605 //-------------------------------------------------------------------
607 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
609 var event interface{}
612 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
613 subDelReqMsg.RequestId = subs.GetReqId().RequestId
614 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
615 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
617 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
621 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
622 desc := fmt.Sprintf("(retry %d)", retries)
624 c.UpdateCounter(cSubDelReqToE2)
626 c.UpdateCounter(cSubDelReReqToE2)
628 c.rmrSendToE2T(desc, subs, trans)
629 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
631 c.UpdateCounter(cSubDelReqTimerExpiry)
636 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
640 //-------------------------------------------------------------------
641 // handle from E2T Subscription Response
642 //-------------------------------------------------------------------
643 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
644 xapp.Logger.Info("MSG from E2T: %s", params.String())
645 c.UpdateCounter(cSubRespFromE2)
646 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
648 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
651 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
653 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
656 trans := subs.GetTransaction()
658 err = fmt.Errorf("Ongoing transaction not found")
659 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
662 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
664 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
665 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
670 //-------------------------------------------------------------------
671 // handle from E2T Subscription Failure
672 //-------------------------------------------------------------------
673 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
674 xapp.Logger.Info("MSG from E2T: %s", params.String())
675 c.UpdateCounter(cSubFailFromE2)
676 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
678 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
681 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
683 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
686 trans := subs.GetTransaction()
688 err = fmt.Errorf("Ongoing transaction not found")
689 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
692 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
694 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
695 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
700 //-------------------------------------------------------------------
701 // handle from E2T Subscription Delete Response
702 //-------------------------------------------------------------------
703 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
704 xapp.Logger.Info("MSG from E2T: %s", params.String())
705 c.UpdateCounter(cSubDelRespFromE2)
706 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
708 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
711 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
713 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
716 trans := subs.GetTransaction()
718 err = fmt.Errorf("Ongoing transaction not found")
719 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
722 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
724 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
725 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
730 //-------------------------------------------------------------------
731 // handle from E2T Subscription Delete Failure
732 //-------------------------------------------------------------------
733 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
734 xapp.Logger.Info("MSG from E2T: %s", params.String())
735 c.UpdateCounter(cSubDelFailFromE2)
736 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
738 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
741 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
743 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
746 trans := subs.GetTransaction()
748 err = fmt.Errorf("Ongoing transaction not found")
749 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
752 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
754 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
755 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
760 //-------------------------------------------------------------------
762 //-------------------------------------------------------------------
763 func typeofSubsMessage(v interface{}) string {
768 case *e2ap.E2APSubscriptionRequest:
770 case *e2ap.E2APSubscriptionResponse:
772 case *e2ap.E2APSubscriptionFailure:
774 case *e2ap.E2APSubscriptionDeleteRequest:
776 case *e2ap.E2APSubscriptionDeleteResponse:
778 case *e2ap.E2APSubscriptionDeleteFailure:
785 //-------------------------------------------------------------------
787 //-------------------------------------------------------------------
788 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
789 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
790 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
792 xapp.Logger.Error("%v", err)
796 //-------------------------------------------------------------------
798 //-------------------------------------------------------------------
799 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
801 if removeSubscriptionFromDb == true {
802 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
803 c.RemoveSubscriptionFromDb(subs)
805 // Update is needed for successful response and merge case here
806 if subs.RetryFromXapp == false {
807 c.WriteSubscriptionToDb(subs)
810 subs.RetryFromXapp = false
813 //-------------------------------------------------------------------
815 //-------------------------------------------------------------------
816 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
817 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
818 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
820 xapp.Logger.Error("%v", err)
824 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
826 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
828 // Send delete for every endpoint in the subscription
829 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
830 subDelReqMsg.RequestId = subs.GetReqId().RequestId
831 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
832 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
834 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
837 for _, endPoint := range subs.EpList.Endpoints {
838 params := &xapp.RMRParams{}
840 params.SubId = int(subs.GetReqId().InstanceId)
842 params.Meid = subs.Meid
843 params.Src = endPoint.String()
844 params.PayloadLen = len(payload.Buf)
845 params.Payload = payload.Buf
849 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
853 subs.DeleteFromDb = true
854 c.handleXAPPSubscriptionDeleteRequest(params)