2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
35 //-----------------------------------------------------------------------------
37 //-----------------------------------------------------------------------------
39 var subReqTime time.Duration = 5 * time.Second
40 var subDelReqTime time.Duration = 5 * time.Second
41 var maxSubReqTryCount uint64 = 2 // Initial try + retry
42 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
49 rmrSendMutex sync.Mutex
67 xapp.Logger.Info("SUBMGR")
69 viper.SetEnvPrefix("submgr")
70 viper.AllowEmptyEnv(true)
73 func NewControl() *Control {
75 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
76 client := rtmgrclient.New(transport, strfmt.Default)
77 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
78 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
79 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
81 registry := new(Registry)
83 registry.rtmgrClient = &rtmgrClient
85 tracker := new(Tracker)
88 timerMap := new(TimerMap)
91 return &Control{e2ap: new(E2ap),
99 func (c *Control) Run() {
103 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
105 xapp.Logger.Info("%s: %s", desc, params.String())
108 for ; i <= 10 && status == false; i++ {
109 c.rmrSendMutex.Lock()
110 status = xapp.Rmr.Send(params.RMRParams, false)
111 c.rmrSendMutex.Unlock()
113 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
114 time.Sleep(500 * time.Millisecond)
118 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
119 xapp.Logger.Error("%s: %s", desc, err.Error())
120 xapp.Rmr.Free(params.Mbuf)
125 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
126 params := &RMRParams{&xapp.RMRParams{}}
127 params.Mtype = trans.GetMtype()
128 params.SubId = int(subs.GetSubId())
130 params.Meid = subs.GetMeid()
132 params.PayloadLen = len(trans.Payload.Buf)
133 params.Payload = trans.Payload.Buf
136 return c.rmrSendRaw(desc, params)
139 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
140 params := &RMRParams{&xapp.RMRParams{}}
141 params.Mtype = trans.GetMtype()
142 params.SubId = int(subs.GetSubId())
143 params.Xid = trans.GetXid()
144 params.Meid = trans.GetMeid()
146 params.PayloadLen = len(trans.Payload.Buf)
147 params.Payload = trans.Payload.Buf
150 return c.rmrSendRaw(desc, params)
153 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
154 xapp.Rmr.Free(params.Mbuf)
156 msg := &RMRParams{params}
159 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
160 go c.handleSubscriptionRequest(msg)
161 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
162 go c.handleSubscriptionResponse(msg)
163 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
164 go c.handleSubscriptionFailure(msg)
165 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
166 go c.handleSubscriptionDeleteRequest(msg)
167 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
168 go c.handleSubscriptionDeleteResponse(msg)
169 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
170 go c.handleSubscriptionDeleteFailure(msg)
172 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
178 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
179 xapp.Logger.Info("SubReq from xapp: %s", params.String())
184 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
191 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
198 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
200 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
208 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
210 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
215 err = subs.SetTransaction(trans)
217 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
223 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
226 // TODO: subscription create is in fact owned by subscription and not transaction.
227 // Transaction is toward xapp while Subscription is toward ran.
228 // In merge several xapps may wake transactions, while only one subscription
229 // toward ran occurs -> subscription owns subscription creation toward ran
231 // This is intermediate solution while improving message handling
233 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
235 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
241 c.rmrSend("SubReq: SubReq to E2T", subs, trans)
243 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
244 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
248 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
249 xapp.Logger.Info("SubResp from E2T: %s", params.String())
254 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
256 xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String())
263 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
264 if subs == nil && params.SubId > 0 {
265 subs = c.registry.GetSubscription(uint16(params.SubId))
269 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
272 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
277 trans := subs.GetTransaction()
279 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
283 trans.SubRespMsg = SubRespMsg
288 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
290 responseReceived := trans.CheckResponseReceived()
291 if responseReceived == true {
292 // Subscription timer already received
296 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
298 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
305 c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans)
309 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
310 xapp.Logger.Info("SubFail from E2T: %s", params.String())
315 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
317 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
324 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
325 if subs == nil && params.SubId > 0 {
326 subs = c.registry.GetSubscription(uint16(params.SubId))
330 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
333 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
338 trans := subs.GetTransaction()
340 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
343 trans.SubFailMsg = SubFailMsg
348 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
350 responseReceived := trans.CheckResponseReceived()
355 if responseReceived == true {
356 // Subscription timer already received
360 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
362 c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans)
363 time.Sleep(3 * time.Second)
365 //TODO error handling improvement
366 xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans)
374 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
375 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
377 subs := c.registry.GetSubscription(uint16(nbrId))
379 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
383 trans := subs.GetTransaction()
385 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
389 responseReceived := trans.CheckResponseReceived()
391 if responseReceived == true {
392 // Subscription Response or Failure already received
396 if tryCount < maxSubReqTryCount {
397 xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans)
399 trans.RetryTransaction()
401 c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans)
404 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
408 // Release CREATE transaction
411 // Create DELETE transaction (internal and no messages toward xapp)
412 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
419 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
420 //TODO improve error handling. Important at least in merge
425 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
426 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
427 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
428 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
429 deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
431 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
432 //TODO improve error handling. Important at least in merge
438 err = subs.SetTransaction(deltrans)
440 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
441 //TODO improve error handling. Important at least in merge
446 c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans)
447 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
451 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
452 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
457 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
464 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
471 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
473 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
481 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
482 if subs == nil && params.SubId > 0 {
483 subs = c.registry.GetSubscription(uint16(params.SubId))
487 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
491 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
493 err = subs.SetTransaction(trans)
495 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
501 // TODO: subscription delete is in fact owned by subscription and not transaction.
502 // Transaction is toward xapp while Subscription is toward ran.
503 // In merge several xapps may wake transactions, while only one subscription
504 // toward ran occurs -> subscription owns subscription creation toward ran
506 // This is intermediate solution while improving message handling
508 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
510 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
517 c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans)
519 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
523 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
524 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
529 SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
531 xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String())
538 subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq))
539 if subs == nil && params.SubId > 0 {
540 subs = c.registry.GetSubscription(uint16(params.SubId))
544 xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String())
547 xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId())
552 trans := subs.GetTransaction()
554 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
558 trans.SubDelRespMsg = SubDelRespMsg
563 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
565 responseReceived := trans.CheckResponseReceived()
566 if responseReceived == true {
567 // Subscription Delete timer already received
571 c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
575 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
576 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
581 SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
583 xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String())
590 subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq))
591 if subs == nil && params.SubId > 0 {
592 subs = c.registry.GetSubscription(uint16(params.SubId))
596 xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String())
599 xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId())
604 trans := subs.GetTransaction()
606 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
609 trans.SubDelFailMsg = SubDelFailMsg
614 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
616 responseReceived := trans.CheckResponseReceived()
617 if responseReceived == true {
618 // Subscription Delete timer already received
622 c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
626 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
627 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
629 subs := c.registry.GetSubscription(uint16(nbrId))
631 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
635 trans := subs.GetTransaction()
637 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
641 responseReceived := trans.CheckResponseReceived()
642 if responseReceived == true {
643 // Subscription Delete Response or Failure already received
647 if tryCount < maxSubDelReqTryCount {
648 // Set possible to handle new response for the subId
649 trans.RetryTransaction()
650 c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans)
652 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
656 c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
660 func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
662 if trans.ForwardRespToXapp == true {
663 //Always generate SubDelResp
664 trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
665 trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
666 trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
667 trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
670 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
672 c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans)
673 time.Sleep(3 * time.Second)
675 //TODO error handling improvement
676 xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans)