2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
30 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
31 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
32 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36 "github.com/gorilla/mux"
37 "github.com/spf13/viper"
40 //-----------------------------------------------------------------------------
42 //-----------------------------------------------------------------------------
// idstring builds a single identification string for log messages by
// concatenating the String() output of every entry, followed by
// "err(<message>)" built from err.
// NOTE(review): the statements that update filler, guard err against nil and
// return retval are not visible in this excerpt — confirm in the full file.
44 func idstring(err error, entries ...fmt.Stringer) string {
45 var retval string = ""
46 var filler string = ""
47 for _, entry := range entries {
48 retval += filler + entry.String()
52 retval += filler + "err(" + err.Error() + ")"
59 //-----------------------------------------------------------------------------
61 //-----------------------------------------------------------------------------
// Package-level tunables. They are assigned by ReadConfigParameters from the
// "controls.*" configuration keys and read by the E2T send/receive paths.
// Wait time passed to WaitEvent after sending a subscription request to E2T.
63 var e2tSubReqTimeout time.Duration
// Wait time passed to WaitEvent after sending a subscription delete request to E2T.
64 var e2tSubDelReqTime time.Duration
// Timeout passed to SendEvent when handing a received E2T message to a transaction.
65 var e2tRecvMsgTimeout time.Duration
66 var e2tMaxSubReqTryCount uint64 // Initial try + retry
67 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
// String flag; NewControl compares it against "false".
68 var readSubsFromDb string
76 //subscriber *xapp.Subscriber
// SubmgrRestartTestEvent is an empty marker event. sendE2TSubscriptionRequest
// produces it to simulate a subscription request whose response was not
// received before a restart; handleSubscriptionCreate has a dedicated case for it.
87 type SubmgrRestartTestEvent struct{}
// SubmgrRestartUpEvent is an empty marker event type. It is not referenced in
// the visible part of this file.
88 type SubmgrRestartUpEvent struct{}
91 xapp.Logger.Info("SUBMGR")
93 viper.SetEnvPrefix("submgr")
94 viper.AllowEmptyEnv(true)
// NewControl creates the subscription manager Control object.
//
// It loads the tunables via ReadConfigParameters, builds the routing manager
// HTTP client from the "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl"
// settings, allocates a Registry (with the routing manager client attached)
// and a Tracker, registers the test REST route and starts the REST
// subscription listener in its own goroutine.
//
// When subscriptions are read from the db, the stored subscription ids and
// register replace the registry contents and uncompleted subscriptions are
// handed to HandleUncompletedSubscriptions.
// NOTE(review): the body of the readSubsFromDb == "false" branch and the
// remaining Control fields are not visible in this excerpt.
97 func NewControl() *Control {
99 ReadConfigParameters()
100 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
101 rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
103 registry := new(Registry)
104 registry.Initialize()
105 registry.rtmgrClient = &rtmgrClient
107 tracker := new(Tracker)
110 //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
112 c := &Control{e2ap: new(E2ap),
116 //subscriber: subscriber,
119 // Register REST handler for testing support
120 xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
// The listener runs in its own goroutine so that construction can continue.
122 go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
123 //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
125 if readSubsFromDb == "false" {
129 // Read subscriptions from db
130 xapp.Logger.Info("Reading subscriptions from db")
131 subIds, register, err := c.ReadAllSubscriptionsFromSdl()
133 xapp.Logger.Error("%v", err)
135 c.registry.subIds = subIds
136 c.registry.register = register
137 c.HandleUncompletedSubscriptions(register)
142 //-------------------------------------------------------------------
144 //-------------------------------------------------------------------
// ReadConfigParameters loads the package-level tunables from the "controls.*"
// configuration keys and logs each resulting value.
//
// Defaults applied when a key yields the zero value:
//   - the three "*_ms" timeouts fall back to 2000 * 1000000 ns (2 seconds)
//   - the two try counts fall back to 1
//   - readSubsFromDb falls back to "true"
145 func ReadConfigParameters() {
147 // viper.GetDuration returns nanoseconds
// The configured values are in milliseconds, hence the 1000000 multiplier.
148 e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
149 if e2tSubReqTimeout == 0 {
150 e2tSubReqTimeout = 2000 * 1000000
152 xapp.Logger.Info("e2tSubReqTimeout %v", e2tSubReqTimeout)
153 e2tSubDelReqTime = viper.GetDuration("controls.e2tSubDelReqTime_ms") * 1000000
154 if e2tSubDelReqTime == 0 {
155 e2tSubDelReqTime = 2000 * 1000000
157 xapp.Logger.Info("e2tSubDelReqTime %v", e2tSubDelReqTime)
158 e2tRecvMsgTimeout = viper.GetDuration("controls.e2tRecvMsgTimeout_ms") * 1000000
159 if e2tRecvMsgTimeout == 0 {
160 e2tRecvMsgTimeout = 2000 * 1000000
162 xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
// Try counts include the initial attempt, so 1 means "no retry".
163 e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
164 if e2tMaxSubReqTryCount == 0 {
165 e2tMaxSubReqTryCount = 1
167 xapp.Logger.Info("e2tMaxSubReqTryCount %v", e2tMaxSubReqTryCount)
168 e2tMaxSubDelReqTryCount = viper.GetUint64("controls.e2tMaxSubDelReqTryCount")
169 if e2tMaxSubDelReqTryCount == 0 {
170 e2tMaxSubDelReqTryCount = 1
172 xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
// Kept as a string; an unset or empty value is treated as "true".
174 readSubsFromDb = viper.GetString("controls.readSubsFromDb")
175 if readSubsFromDb == "" {
176 readSubsFromDb = "true"
178 xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
181 //-------------------------------------------------------------------
183 //-------------------------------------------------------------------
// HandleUncompletedSubscriptions walks the given register and, for every
// subscription whose SubRespRcvd flag is false, sets NoRespToXapp and issues a
// delete through SendSubscriptionDeleteReq.
//
// register maps subscription id to subscription; NewControl passes the
// register restored from the db.
184 func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
186 xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
187 for subId, subs := range register {
188 if subs.SubRespRcvd == false {
// Set before the delete is issued; the delete request handler checks this flag.
189 subs.NoRespToXapp = true
190 xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
191 c.SendSubscriptionDeleteReq(subs)
// ReadyCB is the callback registered through xapp.SetReadyCB in Run. It
// assigns xapp.Rmr to c.RMRClient if no client has been set yet. The data
// argument is not used in the visible code.
196 func (c *Control) ReadyCB(data interface{}) {
197 if c.RMRClient == nil {
198 c.RMRClient = xapp.Rmr
// Run registers ReadyCB as the xapp ready callback.
// NOTE(review): the rest of the body is not visible in this excerpt.
202 func (c *Control) Run() {
203 xapp.SetReadyCB(c.ReadyCB, nil)
207 //-------------------------------------------------------------------
209 //-------------------------------------------------------------------
// SubscriptionHandler is the REST subscription callback passed to
// xapp.Subscription.Listen. It switches on the concrete type of params; for
// *models.ReportParams it creates an xapp transaction (released on return),
// while the control and policy cases have no visible body.
//
// The visible return statement yields an empty response together with a
// "not implemented" error.
210 func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (*models.SubscriptionResponse, error) {
212 switch p := params.(type) {
213 case *models.ReportParams:
214 trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
216 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
219 defer trans.Release()
220 case *models.ControlParams:
221 case *models.PolicyParams:
224 return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
// SubscriptionDeleteHandler is the REST subscription delete callback passed to
// xapp.Subscription.Listen in NewControl.
// NOTE(review): the body is not visible in this excerpt.
227 func (c *Control) SubscriptionDeleteHandler(s string) error {
// QueryHandler is the REST query callback passed to xapp.Subscription.Listen.
// It delegates directly to the registry's QueryHandler.
231 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
232 return c.registry.QueryHandler()
// TestRestHandler serves the test route "/ric/v1/test/{testId}" registered in
// NewControl. The {testId} path variable selects the test command:
//   - a value containing "deletesubid=<n>" removes subscription <n> from the db
//   - further commands remove all subscriptions from the db or terminate the
//     process; anything else is logged as unsupported
//
// NOTE(review): the conditions selecting the last three branches are not
// visible in this excerpt.
235 func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
237 xapp.Logger.Info("TestRestHandler() called")
239 pathParams := mux.Vars(r)
240 s := pathParams["testId"]
242 // This can be used to delete single subscription from db
243 if contains := strings.Contains(s, "deletesubid="); contains == true {
244 var splits = strings.Split(s, "=")
// Parsed as a 64-bit integer and then narrowed to uint32 for the SDL call.
245 if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
246 xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
247 c.RemoveSubscriptionFromSdl(uint32(subId))
252 // This can be used to remove all subscriptions from db
254 xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
255 c.RemoveAllSubscriptionsFromSdl()
259 // This is meant to cause submgr's restart in testing
261 xapp.Logger.Info("os.Exit(1) called")
265 xapp.Logger.Info("Unsupported rest command received %s", s)
268 //-------------------------------------------------------------------
270 //-------------------------------------------------------------------
// rmrSendToE2T builds an RMR message from the subscription-side transaction
// and sends it with SendWithRetry.
//
// The message type and payload come from trans; the RMR subscription id is the
// subscription's request InstanceId and the Meid is taken from subs. desc is
// only used in the log line. Returns the error from SendWithRetry.
272 func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
273 params := &xapp.RMRParams{}
274 params.Mtype = trans.GetMtype()
275 params.SubId = int(subs.GetReqId().InstanceId)
277 params.Meid = subs.GetMeid()
279 params.PayloadLen = len(trans.Payload.Buf)
280 params.Payload = trans.Payload.Buf
282 xapp.Logger.Info("MSG to E2T: %s %s %s", desc, trans.String(), params.String())
283 return c.SendWithRetry(params, false, 5)
// rmrSendToXapp builds an RMR message from the xapp-side transaction and sends
// it with SendWithRetry.
//
// The message type, Xid, Meid and payload come from trans; the RMR
// subscription id is the subscription's request InstanceId. desc is only used
// in the log line. Returns the error from SendWithRetry.
286 func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
288 params := &xapp.RMRParams{}
289 params.Mtype = trans.GetMtype()
290 params.SubId = int(subs.GetReqId().InstanceId)
291 params.Xid = trans.GetXid()
292 params.Meid = trans.GetMeid()
294 params.PayloadLen = len(trans.Payload.Buf)
295 params.Payload = trans.Payload.Buf
297 xapp.Logger.Info("MSG to XAPP: %s %s %s", desc, trans.String(), params.String())
298 return c.SendWithRetry(params, false, 5)
// Consume receives an RMR message and dispatches it to the handler matching
// its type; every handler is started in its own goroutine.
//
// An error is built and logged when c.RMRClient is nil. Otherwise the RMR
// buffer is freed on return (deferred) and the payload is first cloned into a
// fresh Go slice so that the handlers do not depend on the freed buffer.
// NOTE(review): the switch expression and the statement storing cPay back
// into msg are not visible in this excerpt; the log line suggests the switch
// is on msg.Mtype — confirm.
301 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
302 if c.RMRClient == nil {
303 err = fmt.Errorf("Rmr object nil can handle %s", msg.String())
304 xapp.Logger.Error("%s", err.Error())
309 defer c.RMRClient.Free(msg.Mbuf)
311 // xapp-frame might use direct access to c buffer and
312 // when msg.Mbuf is freed, someone might take it into use
313 // and payload data might be invalid inside message handle function
315 // subscriptions won't load system a lot so there is no
316 // real performance hit by cloning buffer into new go byte slice
// The zero-length, zero-capacity slice forces append to allocate a new backing array.
317 cPay := append(msg.Payload[:0:0], msg.Payload...)
319 msg.PayloadLen = len(cPay)
322 case xapp.RIC_SUB_REQ:
323 go c.handleXAPPSubscriptionRequest(msg)
324 case xapp.RIC_SUB_RESP:
325 go c.handleE2TSubscriptionResponse(msg)
326 case xapp.RIC_SUB_FAILURE:
327 go c.handleE2TSubscriptionFailure(msg)
328 case xapp.RIC_SUB_DEL_REQ:
329 go c.handleXAPPSubscriptionDeleteRequest(msg)
330 case xapp.RIC_SUB_DEL_RESP:
331 go c.handleE2TSubscriptionDeleteResponse(msg)
332 case xapp.RIC_SUB_DEL_FAILURE:
333 go c.handleE2TSubscriptionDeleteFailure(msg)
335 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
340 //-------------------------------------------------------------------
341 // handle from XAPP Subscription Request
342 //------------------------------------------------------------------
// handleXAPPSubscriptionRequest processes a subscription request received
// from an xapp.
//
// Steps visible here: unpack the request, create and track an xapp
// transaction (released on return), assign the transaction to a subscription
// in the registry, start handleSubscriptionCreate in a goroutine and block
// until it delivers an event. A subscription response or failure event is
// packed and sent back to the xapp; the final log line reports failure.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this excerpt.
343 func (c *Control) handleXAPPSubscriptionRequest(params *xapp.RMRParams) {
344 xapp.Logger.Info("MSG from XAPP: %s", params.String())
346 subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
348 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
352 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqMsg.RequestId.InstanceId, params.Meid)
354 xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
357 defer trans.Release()
359 err = c.tracker.Track(trans)
361 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
365 //TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
366 subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
368 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
375 go c.handleSubscriptionCreate(subs, trans)
376 event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
379 switch themsg := event.(type) {
380 case *e2ap.E2APSubscriptionResponse:
381 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
384 c.rmrSendToXapp("", subs, trans)
387 case *e2ap.E2APSubscriptionFailure:
388 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
390 c.rmrSendToXapp("", subs, trans)
396 xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
397 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
400 //-------------------------------------------------------------------
401 // handle from XAPP Subscription Delete Request
402 //------------------------------------------------------------------
// handleXAPPSubscriptionDeleteRequest processes a subscription delete request.
// It is started from Consume for messages received from an xapp and is also
// called directly by SendSubscriptionDeleteReq.
//
// Steps visible here: unpack the request, create and track an xapp
// transaction (released on return), look up the subscription by the
// transaction's subscription id, start handleSubscriptionDelete in a goroutine
// and block until it signals completion. A delete response is then packed and
// sent to the xapp; the NoRespToXapp check covers the submgr restart case.
// NOTE(review): the error checks and early returns between these steps are
// not visible in this excerpt.
403 func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
404 xapp.Logger.Info("MSG from XAPP: %s", params.String())
406 subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
408 xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
412 trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subDelReqMsg.RequestId.InstanceId, params.Meid)
414 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
417 defer trans.Release()
419 err = c.tracker.Track(trans)
421 xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
425 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{trans.GetSubId()})
427 xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
434 go c.handleSubscriptionDelete(subs, trans)
435 trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
437 xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
439 if subs.NoRespToXapp == true {
440 // Do not send delete responses to xapps when a submgr restart is deleting uncompleted subscriptions
444 // Whatever is received success, fail or timeout, send successful delete response
445 subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
446 subDelRespMsg.RequestId = subs.GetReqId().RequestId
447 subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
448 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
450 c.rmrSendToXapp("", subs, trans)
453 //TODO handle subscription toward e2term inside RemoveFromSubscription / hide handleSubscriptionDelete in it?
454 //c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
457 //-------------------------------------------------------------------
458 // SUBS CREATE Handling
459 //-------------------------------------------------------------------
// handleSubscriptionCreate performs the subscription-side part of a
// subscription request on behalf of parentTrans.
//
// It creates a subscription transaction, waits for its turn on the
// subscription (turn and transaction are released on return) and then either
// uses the subscription's cached response or sends the request to E2T:
//   - a response event is cached as valid and marks SubRespRcvd
//   - a failure event is cached as invalid, flags the subscription for removal
//     from the db and triggers an internal delete toward E2T
//   - a SubmgrRestartTestEvent is only logged
//   - any other event caches a nil response, flags removal from the db and
//     triggers an internal delete toward E2T
//
// Finally the db is updated and the resulting message is delivered to
// parentTrans as an event.
// NOTE(review): doRetry's declaration and updates, and the condition guarding
// RemoveFromSubscription, are not visible in this excerpt.
460 func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
462 var removeSubscriptionFromDb bool = false
463 trans := c.tracker.NewSubsTransaction(subs)
464 subs.WaitTransactionTurn(trans)
465 defer subs.ReleaseTransactionTurn(trans)
466 defer trans.Release()
468 xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
470 subRfMsg, valid := subs.GetCachedResponse()
471 if subRfMsg == nil && valid == true {
474 // In case of failure
475 // - make internal delete
476 // - in case duplicate cause, retry (currently max 1 retry)
478 maxRetries := uint64(1)
480 for retries := uint64(0); retries <= maxRetries && doRetry; retries++ {
483 event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
484 switch themsg := event.(type) {
485 case *e2ap.E2APSubscriptionResponse:
486 subRfMsg, valid = subs.SetCachedResponse(event, true)
487 subs.SubRespRcvd = true
488 case *e2ap.E2APSubscriptionFailure:
489 removeSubscriptionFromDb = true
490 subRfMsg, valid = subs.SetCachedResponse(event, false)
// Inspect each not-admitted action for a cause other than RIC duplicate action / duplicate event.
492 for _, item := range themsg.ActionNotAdmittedList.Items {
493 if item.Cause.Content != e2ap.E2AP_CauseContent_Ric || (item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_action && item.Cause.Value != e2ap.E2AP_CauseValue_Ric_duplicate_event) {
498 xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
499 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
500 case *SubmgrRestartTestEvent:
501 // This simulates that no response has been received and after restart subscriptions are restored from db
502 xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
505 xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
506 removeSubscriptionFromDb = true
507 subRfMsg, valid = subs.SetCachedResponse(nil, false)
508 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
511 xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
513 xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
516 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
518 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
521 c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
// Wakes up the waiting parent, which blocks in WaitEvent on parentTrans.
522 parentTrans.SendEvent(subRfMsg, 0)
525 //-------------------------------------------------------------------
526 // SUBS DELETE Handling
527 //-------------------------------------------------------------------
// handleSubscriptionDelete performs the subscription-side part of a delete
// request on behalf of parentTrans.
//
// It creates a subscription transaction and waits for its turn on the
// subscription (turn and transaction are released on return). The delete is
// sent to E2T only when the subscription is valid and the requesting endpoint
// is the single endpoint left on it. The endpoint is then removed from the
// subscription, the db is updated and parentTrans is signalled with a nil event.
529 func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
531 trans := c.tracker.NewSubsTransaction(subs)
532 subs.WaitTransactionTurn(trans)
533 defer subs.ReleaseTransactionTurn(trans)
534 defer trans.Release()
536 xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
539 if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
542 c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
546 //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
547 // If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
548 // RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
549 c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
550 c.registry.UpdateSubscriptionToDb(subs, c)
// Wakes up the waiting parent, which blocks in WaitEvent on parentTrans.
551 parentTrans.SendEvent(nil, 0)
554 //-------------------------------------------------------------------
555 // send to E2T Subscription Request
556 //-------------------------------------------------------------------
// sendE2TSubscriptionRequest packs the subscription's request message, stores
// the subscription in the db and sends the request to E2T, making up to
// e2tMaxSubReqTryCount attempts.
//
// After each send it waits up to e2tSubReqTimeout for an event on trans,
// unless subs.DoNotWaitSubResp is set, in which case a SubmgrRestartTestEvent
// is produced instead. parentTrans is used for logging only.
// NOTE(review): the loop exit conditions and the return statement are not
// visible in this excerpt; the result is presumably the last event — confirm.
557 func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
559 var event interface{} = nil
560 var timedOut bool = false
562 subReqMsg := subs.SubReqMsg
563 subReqMsg.RequestId = subs.GetReqId().RequestId
564 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
566 xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
570 // Write uncompleted subscription in db. If there is no response for the subscription it needs to be re-processed (deleted) after restart
571 c.WriteSubscriptionToDb(subs)
572 for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
573 desc := fmt.Sprintf("(retry %d)", retries)
574 c.rmrSendToE2T(desc, subs, trans)
575 if subs.DoNotWaitSubResp == false {
576 event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
581 // Simulating case where subscription request has been sent but response has not been received before restart
582 event = &SubmgrRestartTestEvent{}
586 xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
590 //-------------------------------------------------------------------
591 // send to E2T Subscription Delete Request
592 //-------------------------------------------------------------------
// sendE2TSubscriptionDeleteRequest builds a subscription delete request from
// the subscription's request id and function id, packs it into trans and
// sends it to E2T, making up to e2tMaxSubDelReqTryCount attempts.
//
// After each send it waits up to e2tSubDelReqTime for an event on trans.
// parentTrans is used for logging only.
// NOTE(review): the loop exit conditions and the return statement are not
// visible in this excerpt; the result is presumably the last event — confirm.
594 func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
596 var event interface{}
599 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
600 subDelReqMsg.RequestId = subs.GetReqId().RequestId
601 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
602 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
604 xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
608 for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
609 desc := fmt.Sprintf("(retry %d)", retries)
610 c.rmrSendToE2T(desc, subs, trans)
611 event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
617 xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
621 //-------------------------------------------------------------------
622 // handle from E2T Subscription Response
623 //-------------------------------------------------------------------
// handleE2TSubscriptionResponse processes a subscription response received
// from E2T: it unpacks the payload, looks up the subscription by the
// message's request InstanceId, fetches the subscription's ongoing
// transaction and passes the message to it as an event, using
// e2tRecvMsgTimeout as the send timeout. Each failing step is logged.
// NOTE(review): the conditions and early returns guarding the error logs are
// not visible in this excerpt.
624 func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
625 xapp.Logger.Info("MSG from E2T: %s", params.String())
626 subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
628 xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
631 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.InstanceId})
633 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
636 trans := subs.GetTransaction()
638 err = fmt.Errorf("Ongoing transaction not found")
639 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
642 sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
644 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
645 xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
650 //-------------------------------------------------------------------
651 // handle from E2T Subscription Failure
652 //-------------------------------------------------------------------
// handleE2TSubscriptionFailure processes a subscription failure received from
// E2T: it unpacks the payload, looks up the subscription by the message's
// request InstanceId, fetches the subscription's ongoing transaction and
// passes the message to it as an event, using e2tRecvMsgTimeout as the send
// timeout. Each failing step is logged.
// NOTE(review): the conditions and early returns guarding the error logs are
// not visible in this excerpt.
653 func (c *Control) handleE2TSubscriptionFailure(params *xapp.RMRParams) {
654 xapp.Logger.Info("MSG from E2T: %s", params.String())
655 subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
657 xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
660 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.InstanceId})
662 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
665 trans := subs.GetTransaction()
667 err = fmt.Errorf("Ongoing transaction not found")
668 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
671 sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
673 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
674 xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
679 //-------------------------------------------------------------------
680 // handle from E2T Subscription Delete Response
681 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteResponse processes a subscription delete response
// received from E2T: it unpacks the payload, looks up the subscription by the
// message's request InstanceId, fetches the subscription's ongoing
// transaction and passes the message to it as an event, using
// e2tRecvMsgTimeout as the send timeout. Each failing step is logged.
//
// Unlike the other E2T handlers in this file it declares an error result;
// Consume starts it with "go", so that result is discarded there.
// NOTE(review): the conditions and return statements are not visible in this
// excerpt.
682 func (c *Control) handleE2TSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
683 xapp.Logger.Info("MSG from E2T: %s", params.String())
684 subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
686 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
689 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.InstanceId})
691 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
694 trans := subs.GetTransaction()
696 err = fmt.Errorf("Ongoing transaction not found")
697 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
700 sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
702 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
703 xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
708 //-------------------------------------------------------------------
709 // handle from E2T Subscription Delete Failure
710 //-------------------------------------------------------------------
// handleE2TSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T: it unpacks the payload, looks up the subscription by the
// message's request InstanceId, fetches the subscription's ongoing
// transaction and passes the message to it as an event, using
// e2tRecvMsgTimeout as the send timeout. Each failing step is logged.
// NOTE(review): the conditions and early returns guarding the error logs are
// not visible in this excerpt.
711 func (c *Control) handleE2TSubscriptionDeleteFailure(params *xapp.RMRParams) {
712 xapp.Logger.Info("MSG from E2T: %s", params.String())
713 subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
715 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
718 subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.InstanceId})
720 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
723 trans := subs.GetTransaction()
725 err = fmt.Errorf("Ongoing transaction not found")
726 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
729 sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
731 err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
732 xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
737 //-------------------------------------------------------------------
739 //-------------------------------------------------------------------
// typeofSubsMessage returns a string describing v for log messages. It
// distinguishes the six e2ap subscription message pointer types listed in the
// cases below.
// NOTE(review): the switch header, the strings returned per case and the
// handling of nil or other types are not visible in this excerpt.
740 func typeofSubsMessage(v interface{}) string {
745 case *e2ap.E2APSubscriptionRequest:
747 case *e2ap.E2APSubscriptionResponse:
749 case *e2ap.E2APSubscriptionFailure:
751 case *e2ap.E2APSubscriptionDeleteRequest:
753 case *e2ap.E2APSubscriptionDeleteResponse:
755 case *e2ap.E2APSubscriptionDeleteFailure:
762 //-------------------------------------------------------------------
764 //-------------------------------------------------------------------
// WriteSubscriptionToDb stores subs through WriteSubscriptionToSdl, keyed by
// the subscription's request InstanceId. A write error is only logged; it is
// not returned to the caller.
765 func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
766 xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
767 err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
769 xapp.Logger.Error("%v", err)
773 //-------------------------------------------------------------------
775 //-------------------------------------------------------------------
// UpdateSubscriptionInDB brings the db entry of subs in line with the outcome
// of a subscription request.
//
// When removeSubscriptionFromDb is true the entry is removed. Otherwise the
// subscription is written again unless subs.RetryFromXapp is set. The
// RetryFromXapp flag is cleared afterwards.
// NOTE(review): the exact nesting of the visible statements cannot be
// confirmed from this excerpt.
776 func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
778 if removeSubscriptionFromDb == true {
779 // Subscription was written in db already when subscription request was sent to BTS, except for merged request
780 c.RemoveSubscriptionFromDb(subs)
782 // Update is needed for successful response and merge case here
783 if subs.RetryFromXapp == false {
784 c.WriteSubscriptionToDb(subs)
787 subs.RetryFromXapp = false
790 //-------------------------------------------------------------------
792 //-------------------------------------------------------------------
// RemoveSubscriptionFromDb removes the entry keyed by the subscription's
// request InstanceId through RemoveSubscriptionFromSdl. A removal error is
// only logged; it is not returned to the caller.
793 func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
794 xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
795 err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
797 xapp.Logger.Error("%v", err)
// SendSubscriptionDeleteReq issues an internal subscription delete for subs;
// HandleUncompletedSubscriptions calls it for subscriptions left uncompleted
// by a restart.
//
// A delete request is packed once from the subscription's request id and
// function id. For every endpoint of the subscription an RMR message is built
// with that endpoint as source and fed into
// handleXAPPSubscriptionDeleteRequest, as if the endpoint had requested the
// delete itself. subs.DeleteFromDb is set before the handler is called.
// NOTE(review): the use of mType and the conditions guarding the error logs
// are not visible in this excerpt.
801 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
803 xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
805 // Send delete for every endpoint in the subscription
806 subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
807 subDelReqMsg.RequestId = subs.GetReqId().RequestId
808 subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
809 mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
811 xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
814 for _, endPoint := range subs.EpList.Endpoints {
815 params := &xapp.RMRParams{}
817 params.SubId = int(subs.GetReqId().InstanceId)
819 params.Meid = subs.Meid
820 params.Src = endPoint.String()
821 params.PayloadLen = len(payload.Buf)
822 params.Payload = payload.Buf
826 xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
830 subs.DeleteFromDb = true
// Called synchronously, so endpoints are processed one after another.
831 c.handleXAPPSubscriptionDeleteRequest(params)