2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
44 func idstring(err error, entries ...fmt.Stringer) string {
45 var retval string = ""
46 var filler string = ""
47 for _, entry := range entries {
48 retval += filler + entry.String()
52 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
63 var e2tSubReqTimeout time.Duration
64 var e2tSubDelReqTime time.Duration
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64 // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
68 var readSubsFromDb string
76 //subscriber *xapp.Subscriber
79 Counters map[string]xapp.Counter
88 type SubmgrRestartTestEvent struct{}
89 type SubmgrRestartUpEvent struct{}
92 xapp.Logger.Info("SUBMGR")
94 viper.SetEnvPrefix("submgr")
95 viper.AllowEmptyEnv(true)
98 func NewControl() *Control {
100 ReadConfigParameters()
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
113 c := &Control{e2ap: new(E2ap),
117 //subscriber: subscriber,
118 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
127 if readSubsFromDb == "false" {
131 // Read subscriptions from db
132 xapp.Logger.Info("Reading subscriptions from db")
133 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
135 xapp.Logger.Error("%v", err)
137 c.registry.subIds = subIds
138 c.registry.register = register
139 c.HandleUncompletedSubscriptions(register)
144 //-------------------------------------------------------------------
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
149 // viper.GetDuration returns nanoseconds
150 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151 if e2tSubReqTimeout == 0 {
152 e2tSubReqTimeout = 2000 * 1000000
154 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156 if e2tSubDelReqTime == 0 {
157 e2tSubDelReqTime = 2000 * 1000000
159 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161 if e2tRecvMsgTimeout == 0 {
162 e2tRecvMsgTimeout = 2000 * 1000000
164 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166 if e2tMaxSubReqTryCount == 0 {
167 e2tMaxSubReqTryCount = 1
169 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171 if e2tMaxSubDelReqTryCount == 0 {
172 e2tMaxSubDelReqTryCount = 1
174 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
176 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177 if readSubsFromDb == "" {
178 readSubsFromDb = "true"
180 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
183 //-------------------------------------------------------------------
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
188 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189 for subId, subs := range register {
190 if subs.SubRespRcvd == false {
191 subs.NoRespToXapp = true
192 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193 c.SendSubscriptionDeleteReq(subs)
198 func (c *Control) ReadyCB(data interface{}) {
199 if c.RMRClient == nil {
200 c.RMRClient = xapp.Rmr
204 func (c *Control) Run() {
205 xapp.SetReadyCB(c.ReadyCB, nil)
209 //-------------------------------------------------------------------
211 //-------------------------------------------------------------------
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
214 switch p := params.(type) {
215 case *models.ReportParams:
216 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
218 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
221 defer trans.Release()
222 case *models.ControlParams:
223 case *models.PolicyParams:
226 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234 xapp.Logger.Info("QueryHandler() called")
236 return c.registry.QueryHandler()
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240 xapp.Logger.Info("TestRestHandler() called")
242 pathParams := mux.Vars(r)
243 s := pathParams["testId"]
245 // This can be used to delete single subscription from db
246 if contains := strings.Contains(s, "deletesubid="); contains == true {
247 var splits = strings.Split(s, "=")
248 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250 c.RemoveSubscriptionFromSdl(uint32(subId))
255 // This can be used to remove all subscriptions db from
257 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258 c.RemoveAllSubscriptionsFromSdl()
262 // This is meant to cause submgr's restart in testing
264 xapp.Logger.Info("os.Exit(1) called")
268 xapp.Logger.Info("Unsupported rest command received %s", s)
271 //-------------------------------------------------------------------
273 //-------------------------------------------------------------------
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276 params := &xapp.RMRParams{}
277 params.Mtype = trans.GetMtype()
278 params.SubId = int(subs.GetReqId().InstanceId)
280 params.Meid = subs.GetMeid()
282 params.PayloadLen = len(trans.Payload.Buf)
283 params.Payload = trans.Payload.Buf
285 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286 return c.SendWithRetry(params, false, 5)
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
291 params := &xapp.RMRParams{}
292 params.Mtype = trans.GetMtype()
293 params.SubId = int(subs.GetReqId().InstanceId)
294 params.Xid = trans.GetXid()
295 params.Meid = trans.GetMeid()
297 params.PayloadLen = len(trans.Payload.Buf)
298 params.Payload = trans.Payload.Buf
300 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301 return c.SendWithRetry(params, false, 5)
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
305 if c.RMRClient == nil {
306 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307 xapp.Logger.Error("%s", err.Error())
312 defer c.RMRClient.Free(msg.Mbuf)
314 // xapp-frame might use direct access to c buffer and
315 // when msg.Mbuf is freed, someone might take it into use
316 // and payload data might be invalid inside message handle function
318 // subscriptions won't load system a lot so there is no
319 // real performance hit by cloning buffer into new go byte slice
320 cPay := append(msg.Payload[:0:0], msg.Payload...)
322 msg.PayloadLen = len(cPay)
325 case xapp.RIC_SUB_REQ:
326 go c.handleXAPPSubscriptionRequest(msg)
327 case xapp.RIC_SUB_RESP:
328 go c.handleE2TSubscriptionResponse(msg)
329 case xapp.RIC_SUB_FAILURE:
330 go c.handleE2TSubscriptionFailure(msg)
331 case xapp.RIC_SUB_DEL_REQ:
332 go c.handleXAPPSubscriptionDeleteRequest(msg)
333 case xapp.RIC_SUB_DEL_RESP:
334 go c.handleE2TSubscriptionDeleteResponse(msg)
335 case xapp.RIC_SUB_DEL_FAILURE:
336 go c.handleE2TSubscriptionDeleteFailure(msg)
338 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347 xapp.Logger.Info("MSG from XAPP: %s", params.String())
348 c.UpdateCounter(cSubReqFromXapp)
350 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
352 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
356 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
358 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
361 defer trans.Release()
363 if err = c.tracker.Track(trans); err != nil {
364 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
368 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
371 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
375 c.wakeSubscriptionRequest(subs, trans)
378 //-------------------------------------------------------------------
379 // Wake Subscription Request to E2node
380 //------------------------------------------------------------------
381 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
383 go c.handleSubscriptionCreate(subs, trans)
384 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
387 switch themsg := event.(type) {
388 case *e2ap.E2APSubscriptionResponse:
389 themsg.RequestId.Id = trans.RequestId.Id
390 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
393 c.UpdateCounter(cSubRespToXapp)
394 c.rmrSendToXapp("", subs, trans)
397 case *e2ap.E2APSubscriptionFailure:
398 themsg.RequestId.Id = trans.RequestId.Id
399 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
401 c.UpdateCounter(cSubFailToXapp)
402 c.rmrSendToXapp("", subs, trans)
408 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
409 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
412 //-------------------------------------------------------------------
413 // handle from XAPP Subscription Delete Request
414 //------------------------------------------------------------------
415 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
416 xapp.Logger.Info("MSG from XAPP: %s", params.String())
417 c.UpdateCounter(cSubDelReqFromXapp)
419 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
421 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
425 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
427 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
430 defer trans.Release()
432 err = c.tracker.Track(trans)
434 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
438 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
440 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
447 go c.handleSubscriptionDelete(subs, trans)
448 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
450 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
452 if subs.NoRespToXapp == true {
453 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
457 // Whatever is received success, fail or timeout, send successful delete response
458 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
459 subDelRespMsg.RequestId.Id = trans.RequestId.Id
460 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
461 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
462 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
464 c.UpdateCounter(cSubDelRespToXapp)
465 c.rmrSendToXapp("", subs, trans)
468 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
469 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
472 //-------------------------------------------------------------------
473 // SUBS CREATE Handling
474 //-------------------------------------------------------------------
475 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
477 var removeSubscriptionFromDb bool = false
478 trans := c.tracker.NewSubsTransaction(subs)
479 subs.WaitTransactionTurn(trans)
480 defer subs.ReleaseTransactionTurn(trans)
481 defer trans.Release()
483 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
485 subRfMsg, valid := subs.GetCachedResponse()
486 if subRfMsg == nil && valid == true {
487 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
488 switch event.(type) {
489 case *e2ap.E2APSubscriptionResponse:
490 subRfMsg, valid = subs.SetCachedResponse(event, true)
491 subs.SubRespRcvd = true
492 case *e2ap.E2APSubscriptionFailure:
493 removeSubscriptionFromDb = true
494 subRfMsg, valid = subs.SetCachedResponse(event, false)
495 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
496 case *SubmgrRestartTestEvent:
497 // This simulates that no response has been received and after restart subscriptions are restored from db
498 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
501 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
502 removeSubscriptionFromDb = true
503 subRfMsg, valid = subs.SetCachedResponse(nil, false)
504 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
506 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
508 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
511 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
513 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
516 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
517 parentTrans.SendEvent(subRfMsg, 0)
520 //-------------------------------------------------------------------
521 // SUBS DELETE Handling
522 //-------------------------------------------------------------------
524 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
526 trans := c.tracker.NewSubsTransaction(subs)
527 subs.WaitTransactionTurn(trans)
528 defer subs.ReleaseTransactionTurn(trans)
529 defer trans.Release()
531 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
535 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
538 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
542 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
543 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
544 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
545 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
546 c.registry.UpdateSubscriptionToDb(subs, c)
547 parentTrans.SendEvent(nil, 0)
550 //-------------------------------------------------------------------
551 // send to E2T Subscription Request
552 //-------------------------------------------------------------------
553 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
555 var event interface{} = nil
556 var timedOut bool = false
558 subReqMsg := subs.SubReqMsg
559 subReqMsg.RequestId = subs.GetReqId().RequestId
560 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
562 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
566 // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
567 c.WriteSubscriptionToDb(subs)
568 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
569 desc := fmt.Sprintf("(retry %d)", retries)
571 c.UpdateCounter(cSubReqToE2)
573 c.UpdateCounter(cSubReReqToE2)
575 c.rmrSendToE2T(desc, subs, trans)
576 if subs.DoNotWaitSubResp == false {
577 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
579 c.UpdateCounter(cSubReqTimerExpiry)
583 // Simulating case where subscrition request has been sent but response has not been received before restart
584 event = &SubmgrRestartTestEvent{}
588 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
592 //-------------------------------------------------------------------
593 // send to E2T Subscription Delete Request
594 //-------------------------------------------------------------------
596 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
598 var event interface{}
601 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
602 subDelReqMsg.RequestId = subs.GetReqId().RequestId
603 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
604 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
606 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
610 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
611 desc := fmt.Sprintf("(retry %d)", retries)
613 c.UpdateCounter(cSubDelReqToE2)
615 c.UpdateCounter(cSubDelReReqToE2)
617 c.rmrSendToE2T(desc, subs, trans)
618 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
620 c.UpdateCounter(cSubDelReqTimerExpiry)
625 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
629 //-------------------------------------------------------------------
630 // handle from E2T Subscription Response
631 //-------------------------------------------------------------------
632 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
633 xapp.Logger.Info("MSG from E2T: %s", params.String())
634 c.UpdateCounter(cSubRespFromE2)
635 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
637 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
640 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
642 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
645 trans := subs.GetTransaction()
647 err = fmt.Errorf("Ongoing transaction not found")
648 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
651 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
653 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
654 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
659 //-------------------------------------------------------------------
660 // handle from E2T Subscription Failure
661 //-------------------------------------------------------------------
662 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
663 xapp.Logger.Info("MSG from E2T: %s", params.String())
664 c.UpdateCounter(cSubFailFromE2)
665 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
667 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
670 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
672 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
675 trans := subs.GetTransaction()
677 err = fmt.Errorf("Ongoing transaction not found")
678 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
681 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
683 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
684 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
689 //-------------------------------------------------------------------
690 // handle from E2T Subscription Delete Response
691 //-------------------------------------------------------------------
692 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
693 xapp.Logger.Info("MSG from E2T: %s", params.String())
694 c.UpdateCounter(cSubDelRespFromE2)
695 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
697 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
700 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
702 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
705 trans := subs.GetTransaction()
707 err = fmt.Errorf("Ongoing transaction not found")
708 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
711 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
713 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
714 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
719 //-------------------------------------------------------------------
720 // handle from E2T Subscription Delete Failure
721 //-------------------------------------------------------------------
722 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
723 xapp.Logger.Info("MSG from E2T: %s", params.String())
724 c.UpdateCounter(cSubDelFailFromE2)
725 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
727 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
730 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
732 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
735 trans := subs.GetTransaction()
737 err = fmt.Errorf("Ongoing transaction not found")
738 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
741 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
743 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
744 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
749 //-------------------------------------------------------------------
751 //-------------------------------------------------------------------
752 func typeofSubsMessage(v interface{}) string {
757 case *e2ap.E2APSubscriptionRequest:
759 case *e2ap.E2APSubscriptionResponse:
761 case *e2ap.E2APSubscriptionFailure:
763 case *e2ap.E2APSubscriptionDeleteRequest:
765 case *e2ap.E2APSubscriptionDeleteResponse:
767 case *e2ap.E2APSubscriptionDeleteFailure:
774 //-------------------------------------------------------------------
776 //-------------------------------------------------------------------
777 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
778 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
779 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
781 xapp.Logger.Error("%v", err)
785 //-------------------------------------------------------------------
787 //-------------------------------------------------------------------
788 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
790 if removeSubscriptionFromDb == true {
791 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
792 c.RemoveSubscriptionFromDb(subs)
794 // Update is needed for successful response and merge case here
795 if subs.RetryFromXapp == false {
796 c.WriteSubscriptionToDb(subs)
799 subs.RetryFromXapp = false
802 //-------------------------------------------------------------------
804 //-------------------------------------------------------------------
805 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
806 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
807 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
809 xapp.Logger.Error("%v", err)
813 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
815 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
817 // Send delete for every endpoint in the subscription
818 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
819 subDelReqMsg.RequestId = subs.GetReqId().RequestId
820 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
821 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
823 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
826 for _, endPoint := range subs.EpList.Endpoints {
827 params := &xapp.RMRParams{}
829 params.SubId = int(subs.GetReqId().InstanceId)
831 params.Meid = subs.Meid
832 params.Src = endPoint.String()
833 params.PayloadLen = len(payload.Buf)
834 params.Payload = payload.Buf
838 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
842 subs.DeleteFromDb = true
843 c.handleXAPPSubscriptionDeleteRequest(params)