2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
// idstring builds a single-line identification string for log messages.
//
// The String() form of every entry is joined with single spaces, in argument
// order. When err is non-nil, "err(<message>)" is appended as the last
// element. Nil entries are skipped so that a missing object cannot panic the
// logging path. With no entries and a nil err the result is the empty string.
func idstring(err error, entries ...fmt.Stringer) string {
	parts := make([]string, 0, len(entries)+1)
	for _, entry := range entries {
		if entry == nil {
			// Only a nil interface value is caught here; a typed nil pointer
			// is still passed to its own String() method.
			continue
		}
		parts = append(parts, entry.String())
	}
	if err != nil {
		parts = append(parts, "err("+err.Error()+")")
	}
	return strings.Join(parts, " ")
}
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Runtime configuration, filled in by ReadConfigParameters.
var (
	// How long to wait for an E2T answer to a subscription request and to a
	// subscription delete request, and the timeout used when handing a
	// received E2T message over to its transaction.
	e2tSubReqTimeout  time.Duration
	e2tSubDelReqTime  time.Duration
	e2tRecvMsgTimeout time.Duration

	// Maximum number of send attempts: initial try + retries.
	e2tMaxSubReqTryCount    uint64
	e2tMaxSubDelReqTryCount uint64

	// The literal "false" makes NewControl skip reading subscriptions from db.
	readSubsFromDb string
)
// NOTE(review): the enclosing struct header and its other fields are not
// visible in this view.
// Disabled field: no dedicated subscriber instance is kept.
76 //subscriber *xapp.Subscriber
// Counters holds the counter group that NewControl registers under "SUBMGR".
79 Counters map[string]xapp.Counter
// Payload-free marker events.
type (
	// SubmgrRestartTestEvent is returned by sendE2TSubscriptionRequest in
	// place of a real E2T answer when the subscription is configured not to
	// wait for the subscription response (restart test support).
	SubmgrRestartTestEvent struct{}

	// SubmgrRestartUpEvent is a second marker type; it is not referenced in
	// the visible part of this file.
	SubmgrRestartUpEvent struct{}
)
// NOTE(review): the header of the enclosing function is not visible here;
// presumably package initialisation — confirm.
// Announce the component in the log.
92 xapp.Logger.Info("SUBMGR")
// Environment variables consulted by viper use the "submgr" prefix.
94 viper.SetEnvPrefix("submgr")
// A set-but-empty environment variable counts as a valid value.
95 viper.AllowEmptyEnv(true)
// NewControl builds the Control instance: it reads the configuration, creates
// the routing manager client, registry and tracker, registers the REST test
// route, starts the subscription REST listener in a goroutine and, unless
// readSubsFromDb is "false", reads the stored subscriptions from the db.
98 func NewControl() *Control {
100 ReadConfigParameters()
// HTTP transport towards the routing manager, addressed by the rtmgr.* settings.
101 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
102 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
// The registry gets a reference to the routing manager client.
104 registry := new(Registry)
105 registry.Initialize()
106 registry.rtmgrClient = &rtmgrClient
108 tracker := new(Tracker)
111 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
// NOTE(review): some fields of this literal are not visible in this view.
113 c := &Control{e2ap: new(E2ap),
117 //subscriber: subscriber,
118 Counters: xapp.Metric.RegisterCounterGroup(GetMetricsOpts(), "SUBMGR"),
121 // Register REST handler for testing support
122 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
// The REST subscription interface is served in the background.
124 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
125 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
// Only the exact string "false" disables restoring subscriptions from db.
127 if readSubsFromDb == "false" {
131 // Read subscriptions from db
132 xapp.Logger.Info("Reading subscriptions from db")
133 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
// A read failure is logged.
135 xapp.Logger.Error("%v", err)
// The stored ids and subscriptions are installed into the registry, and
// subscriptions without a recorded response are then deleted.
137 c.registry.subIds = subIds
138 c.registry.register = register
139 c.HandleUncompletedSubscriptions(register)
144 //-------------------------------------------------------------------
146 //-------------------------------------------------------------------
147 func ReadConfigParameters() {
149 // viper.GetDuration returns nanoseconds
150 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
151 if e2tSubReqTimeout == 0 {
152 e2tSubReqTimeout = 2000 * 1000000
154 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
155 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
156 if e2tSubDelReqTime == 0 {
157 e2tSubDelReqTime = 2000 * 1000000
159 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
160 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
161 if e2tRecvMsgTimeout == 0 {
162 e2tRecvMsgTimeout = 2000 * 1000000
164 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
165 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
166 if e2tMaxSubReqTryCount == 0 {
167 e2tMaxSubReqTryCount = 1
169 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
170 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
171 if e2tMaxSubDelReqTryCount == 0 {
172 e2tMaxSubDelReqTryCount = 1
174 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
176 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
177 if readSubsFromDb == "" {
178 readSubsFromDb = "true"
180 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
183 //-------------------------------------------------------------------
185 //-------------------------------------------------------------------
186 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
188 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
189 for subId, subs := range register {
190 if subs.SubRespRcvd == false {
191 subs.NoRespToXapp = true
192 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
193 c.SendSubscriptionDeleteReq(subs)
198 func (c *Control) ReadyCB(data interface{}) {
199 if c.RMRClient == nil {
200 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp framework's ready callback.
// NOTE(review): the remainder of the body is not visible in this view.
204 func (c *Control) Run() {
205 xapp.SetReadyCB(c.ReadyCB, nil)
209 //-------------------------------------------------------------------
211 //-------------------------------------------------------------------
// SubscriptionHandler is the subscription callback handed to
// xapp.Subscription.Listen in NewControl. It switches on the concrete type of
// params; only the report case has visible handling. The final statement
// returns an empty response together with a "not implemented" error.
212 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
214 switch p := params.(type) {
215 case *models.ReportParams:
// A transaction is created for the client endpoint and RAN name of the request.
216 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
// NOTE(review): presumably the nil-transaction path; its guard and return are not visible here.
218 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
221 defer trans.Release()
222 case *models.ControlParams:
223 case *models.PolicyParams:
226 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
// SubscriptionDeleteHandler is the delete callback handed to
// xapp.Subscription.Listen in NewControl.
// NOTE(review): the body is not visible in this view.
229 func (c *Control) SubscriptionDeleteHandler(s string) error {
233 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
234 xapp.Logger.Info("QueryHandler() called")
236 return c.registry.QueryHandler()
// TestRestHandler serves the REST test route registered in NewControl
// ("/ric/v1/test/{testId}", POST). The {testId} path element selects the test
// command; nothing is written to the response writer in the visible lines.
239 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
240 xapp.Logger.Info("TestRestHandler() called")
242 pathParams := mux.Vars(r)
243 s := pathParams["testId"]
245 // This can be used to delete single subscription from db
246 if contains := strings.Contains(s, "deletesubid="); contains == true {
// splits[1] is safe: s contains "=", so Split yields at least two parts.
247 var splits = strings.Split(s, "=")
// NOTE(review): parsed as a signed 64-bit value and then converted to uint32;
// negative or too large ids are not rejected here.
248 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
249 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
250 c.RemoveSubscriptionFromSdl(uint32(subId))
255 // This can be used to remove all subscriptions from db
257 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
258 c.RemoveAllSubscriptionsFromSdl()
262 // This is meant to cause submgr's restart in testing
264 xapp.Logger.Info("os.Exit(1) called")
// Any other command is only logged.
268 xapp.Logger.Info("Unsupported rest command received %s", s)
271 //-------------------------------------------------------------------
273 //-------------------------------------------------------------------
275 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
276 params := &xapp.RMRParams{}
277 params.Mtype = trans.GetMtype()
278 params.SubId = int(subs.GetReqId().InstanceId)
280 params.Meid = subs.GetMeid()
282 params.PayloadLen = len(trans.Payload.Buf)
283 params.Payload = trans.Payload.Buf
285 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
286 return c.SendWithRetry(params, false, 5)
289 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
291 params := &xapp.RMRParams{}
292 params.Mtype = trans.GetMtype()
293 params.SubId = int(subs.GetReqId().InstanceId)
294 params.Xid = trans.GetXid()
295 params.Meid = trans.GetMeid()
297 params.PayloadLen = len(trans.Payload.Buf)
298 params.Payload = trans.Payload.Buf
300 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
301 return c.SendWithRetry(params, false, 5)
// Consume receives one RMR message, copies its payload into a Go-owned byte
// slice and dispatches on the message type. Every known message type is
// handled in its own goroutine; unknown types are logged and discarded.
// NOTE(review): several statements of the body are not visible in this view.
304 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
// Without an RMR client the message cannot be handled; this is logged as an error.
305 if c.RMRClient == nil {
306 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
307 xapp.Logger.Error("%s", err.Error())
// The RMR buffer is released when Consume returns, i.e. possibly before the
// handler goroutines below run - hence the payload copy.
312 defer c.RMRClient.Free(msg.Mbuf)
314 // xapp-frame might use direct access to c buffer and
315 // when msg.Mbuf is freed, someone might take it into use
316 // and payload data might be invalid inside message handle function
318 // subscriptions won't load system a lot so there is no
319 // real performance hit by cloning buffer into new go byte slice
// The zero-capacity source slice makes append allocate a fresh backing array.
320 cPay := append(msg.Payload[:0:0], msg.Payload...)
322 msg.PayloadLen = len(cPay)
325 case xapp.RIC_SUB_REQ:
326 go c.handleXAPPSubscriptionRequest(msg)
327 case xapp.RIC_SUB_RESP:
328 go c.handleE2TSubscriptionResponse(msg)
329 case xapp.RIC_SUB_FAILURE:
330 go c.handleE2TSubscriptionFailure(msg)
331 case xapp.RIC_SUB_DEL_REQ:
332 go c.handleXAPPSubscriptionDeleteRequest(msg)
333 case xapp.RIC_SUB_DEL_RESP:
334 go c.handleE2TSubscriptionDeleteResponse(msg)
335 case xapp.RIC_SUB_DEL_FAILURE:
336 go c.handleE2TSubscriptionDeleteFailure(msg)
338 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
343 //-------------------------------------------------------------------
344 // handle from XAPP Subscription Request
345 //------------------------------------------------------------------
346 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
347 xapp.Logger.Info("MSG from XAPP: %s", params.String())
348 c.UpdateCounter(cSubReqFromXapp)
350 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
352 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
356 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId, params.Meid)
358 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
361 defer trans.Release()
363 if err = c.tracker.Track(trans); err != nil {
364 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
368 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
369 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
371 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
375 c.wakeSubscriptionRequest(subs, trans)
378 //-------------------------------------------------------------------
379 // Wake Subscription Request to E2node
380 //------------------------------------------------------------------
// wakeSubscriptionRequest starts handleSubscriptionCreate for the subscription
// in its own goroutine, waits for the event that handler sends back on trans
// and forwards the outcome to the xApp. A subscription response or failure is
// re-packed with the xApp's own request id and sent; the last log line covers
// the remaining (failed) case.
// NOTE(review): some statements inside the switch are not visible in this view.
381 func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *TransactionXapp) {
383 go c.handleSubscriptionCreate(subs, trans)
384 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
387 switch themsg := event.(type) {
388 case *e2ap.E2APSubscriptionResponse:
// The xApp gets back the request id it used in its own request.
389 themsg.RequestId.Id = trans.RequestId.Id
390 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
393 c.UpdateCounter(cSubRespToXapp)
394 c.rmrSendToXapp("", subs, trans)
397 case *e2ap.E2APSubscriptionFailure:
398 themsg.RequestId.Id = trans.RequestId.Id
399 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
401 c.UpdateCounter(cSubFailToXapp)
402 c.rmrSendToXapp("", subs, trans)
408 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
409 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
412 //-------------------------------------------------------------------
413 // handle from XAPP Subscription Delete Request
414 //------------------------------------------------------------------
415 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
416 xapp.Logger.Info("MSG from XAPP: %s", params.String())
417 c.UpdateCounter(cSubDelReqFromXapp)
419 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
421 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
425 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId, params.Meid)
427 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
430 defer trans.Release()
432 err = c.tracker.Track(trans)
434 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
438 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
440 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
447 go c.handleSubscriptionDelete(subs, trans)
448 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
450 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
452 if subs.NoRespToXapp == true {
453 // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
457 // Whatever is received success, fail or timeout, send successful delete response
458 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
459 subDelRespMsg.RequestId.Id = trans.RequestId.Id
460 subDelRespMsg.RequestId.InstanceId = subs.GetReqId().RequestId.InstanceId
461 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
462 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
464 c.UpdateCounter(cSubDelRespToXapp)
465 c.rmrSendToXapp("", subs, trans)
468 //TODO handle subscription toward e2term insiged RemoveFromSubscription / hide handleSubscriptionDelete in it?
469 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
472 //-------------------------------------------------------------------
473 // SUBS CREATE Handling
474 //-------------------------------------------------------------------
// handleSubscriptionCreate runs the subscription creation towards E2T for one
// xApp transaction (parentTrans) and reports the outcome back to it.
//
// It waits for its turn on the subscription, then either uses the response
// cached on the subscription or sends a subscription request to E2T and
// caches the result. The db is updated and the resulting message is sent as
// an event to parentTrans.
// NOTE(review): some branch and closing lines are not visible in this view.
475 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
477 var removeSubscriptionFromDb bool = false
// Transactions on the same subscription are processed one at a time.
478 trans := c.tracker.NewSubsTransaction(subs)
479 subs.WaitTransactionTurn(trans)
480 defer subs.ReleaseTransactionTurn(trans)
481 defer trans.Release()
483 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
// No cached response on a still valid subscription: ask E2T.
485 subRfMsg, valid := subs.GetCachedResponse()
486 if subRfMsg == nil && valid == true {
487 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
488 switch event.(type) {
489 case *e2ap.E2APSubscriptionResponse:
// Success: cache the response and record that it was received.
490 subRfMsg, valid = subs.SetCachedResponse(event, true)
491 subs.SubRespRcvd = true
492 case *e2ap.E2APSubscriptionFailure:
// Failure from E2T: cache it, schedule db removal and send a delete to E2T.
493 removeSubscriptionFromDb = true
494 subRfMsg, valid = subs.SetCachedResponse(event, false)
495 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
496 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
497 case *SubmgrRestartTestEvent:
498 // This simulates that no response has been received and after restart subscriptions are restored from db
499 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
// Remaining case: nothing is cached (nil), db removal is scheduled and a
// delete is sent to E2T.
502 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
503 removeSubscriptionFromDb = true
504 subRfMsg, valid = subs.SetCachedResponse(nil, false)
505 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
507 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
509 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
512 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// NOTE(review): the condition guarding this removal is not visible here.
514 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
// Persist the outcome, then hand the result to the waiting xApp transaction.
517 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
518 parentTrans.SendEvent(subRfMsg, 0)
521 //-------------------------------------------------------------------
522 // SUBS DELETE Handling
523 //-------------------------------------------------------------------
// handleSubscriptionDelete runs the subscription delete for one xApp
// transaction (parentTrans): it waits for its turn on the subscription, sends
// a delete request to E2T when the visible condition holds, removes the
// transaction's endpoint from the subscription, updates the db and signals
// completion to parentTrans with a nil event.
// NOTE(review): statements around the condition are not visible in this view.
525 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
// Transactions on the same subscription are processed one at a time.
527 trans := c.tracker.NewSubsTransaction(subs)
528 subs.WaitTransactionTurn(trans)
529 defer subs.ReleaseTransactionTurn(trans)
530 defer trans.Release()
532 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
// The subscription is valid and the requesting endpoint is its only endpoint.
536 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
539 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
543 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
544 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
545 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
546 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
547 c.registry.UpdateSubscriptionToDb(subs, c)
// A nil event tells the waiting xApp-side handler that the delete is done.
548 parentTrans.SendEvent(nil, 0)
551 //-------------------------------------------------------------------
552 // send to E2T Subscription Request
553 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request message, stores
// the still uncompleted subscription in the db and sends the request to E2T,
// up to e2tMaxSubReqTryCount times. After each send it waits up to
// e2tSubReqTimeout for an event on trans, unless the subscription is
// configured not to wait for the response.
// NOTE(review): branch, return and closing lines are not visible in this
// view; the returned value is presumably the received event - confirm.
554 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
556 var event interface{} = nil
557 var timedOut bool = false
// The request is sent with the subscription's own request id.
559 subReqMsg := subs.SubReqMsg
560 subReqMsg.RequestId = subs.GetReqId().RequestId
561 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
563 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
567 // Write uncompleted subscription in db. If no response for subscription it need to be re-processed (deleted) after restart
568 c.WriteSubscriptionToDb(subs)
569 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
570 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters are used: one for requests and one for re-requests.
572 c.UpdateCounter(cSubReqToE2)
574 c.UpdateCounter(cSubReReqToE2)
576 c.rmrSendToE2T(desc, subs, trans)
577 if subs.DoNotWaitSubResp == false {
578 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
580 c.UpdateCounter(cSubReqTimerExpiry)
584 // Simulating case where subscription request has been sent but response has not been received before restart
585 event = &SubmgrRestartTestEvent{}
589 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
593 //-------------------------------------------------------------------
594 // send to E2T Subscription Delete Request
595 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest packs a subscription delete request for
// the subscription and sends it to E2T, up to e2tMaxSubDelReqTryCount times,
// waiting up to e2tSubDelReqTime for an event on trans after each send.
// NOTE(review): branch, return and closing lines are not visible in this
// view; the returned value is presumably the received event - confirm.
597 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
599 var event interface{}
// Request id and function id are taken from the stored subscription.
602 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
603 subDelReqMsg.RequestId = subs.GetReqId().RequestId
604 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
605 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
607 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
611 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
612 desc := fmt.Sprintf("(retry %d)", retries)
// Two counters are used: one for delete requests and one for re-requests.
614 c.UpdateCounter(cSubDelReqToE2)
616 c.UpdateCounter(cSubDelReReqToE2)
618 c.rmrSendToE2T(desc, subs, trans)
619 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
621 c.UpdateCounter(cSubDelReqTimerExpiry)
626 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
630 //-------------------------------------------------------------------
631 // handle from E2T Subscription Response
632 //-------------------------------------------------------------------
633 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
634 xapp.Logger.Info("MSG from E2T: %s", params.String())
635 c.UpdateCounter(cSubRespFromE2)
636 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
638 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
641 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
643 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
646 trans := subs.GetTransaction()
648 err = fmt.Errorf("Ongoing transaction not found")
649 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
652 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
654 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
655 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
660 //-------------------------------------------------------------------
661 // handle from E2T Subscription Failure
662 //-------------------------------------------------------------------
663 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
664 xapp.Logger.Info("MSG from E2T: %s", params.String())
665 c.UpdateCounter(cSubFailFromE2)
666 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
668 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
671 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
673 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
676 trans := subs.GetTransaction()
678 err = fmt.Errorf("Ongoing transaction not found")
679 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
682 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
684 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
685 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
690 //-------------------------------------------------------------------
691 // handle from E2T Subscription Delete Response
692 //-------------------------------------------------------------------
693 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
694 xapp.Logger.Info("MSG from E2T: %s", params.String())
695 c.UpdateCounter(cSubDelRespFromE2)
696 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
698 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
701 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
703 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
706 trans := subs.GetTransaction()
708 err = fmt.Errorf("Ongoing transaction not found")
709 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
712 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
714 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
715 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
720 //-------------------------------------------------------------------
721 // handle from E2T Subscription Delete Failure
722 //-------------------------------------------------------------------
723 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
724 xapp.Logger.Info("MSG from E2T: %s", params.String())
725 c.UpdateCounter(cSubDelFailFromE2)
726 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
728 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
731 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
733 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
736 trans := subs.GetTransaction()
738 err = fmt.Errorf("Ongoing transaction not found")
739 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
742 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
744 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
745 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
750 //-------------------------------------------------------------------
752 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing v by its dynamic type; the
// callers in this file use it as a label in log lines. The visible cases
// cover the six E2AP subscription message types.
// NOTE(review): the returned strings and any nil/default handling are not
// visible in this view.
753 func typeofSubsMessage(v interface{}) string {
758 case *e2ap.E2APSubscriptionRequest:
760 case *e2ap.E2APSubscriptionResponse:
762 case *e2ap.E2APSubscriptionFailure:
764 case *e2ap.E2APSubscriptionDeleteRequest:
766 case *e2ap.E2APSubscriptionDeleteResponse:
768 case *e2ap.E2APSubscriptionDeleteFailure:
775 //-------------------------------------------------------------------
777 //-------------------------------------------------------------------
778 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
779 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
780 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
782 xapp.Logger.Error("%v", err)
786 //-------------------------------------------------------------------
788 //-------------------------------------------------------------------
789 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
791 if removeSubscriptionFromDb == true {
792 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
793 c.RemoveSubscriptionFromDb(subs)
795 // Update is needed for successful response and merge case here
796 if subs.RetryFromXapp == false {
797 c.WriteSubscriptionToDb(subs)
800 subs.RetryFromXapp = false
803 //-------------------------------------------------------------------
805 //-------------------------------------------------------------------
806 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
807 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
808 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
810 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq deletes a subscription on behalf of its xApps
// after a restart: it packs one subscription delete request and, for every
// endpoint of the subscription, builds RMR parameters with that endpoint as
// source and feeds them into handleXAPPSubscriptionDeleteRequest, i.e. the
// same path an xApp-originated delete takes.
// NOTE(review): several assignments and closing lines are not visible in this
// view.
814 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
816 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
818 // Send delete for every endpoint in the subscription
819 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
820 subDelReqMsg.RequestId = subs.GetReqId().RequestId
821 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
822 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
824 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
827 for _, endPoint := range subs.EpList.Endpoints {
828 params := &xapp.RMRParams{}
830 params.SubId = int(subs.GetReqId().InstanceId)
832 params.Meid = subs.Meid
// The endpoint is given as message source, as if the xApp had sent the delete.
833 params.Src = endPoint.String()
834 params.PayloadLen = len(payload.Buf)
835 params.Payload = payload.Buf
839 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
843 subs.DeleteFromDb = true
// Called synchronously, so endpoints are processed one after another.
844 c.handleXAPPSubscriptionDeleteRequest(params)