2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
50 rmrSendMutex sync.Mutex
70 xapp.Logger.Info("SUBMGR")
72 viper.SetEnvPrefix("submgr")
73 viper.AllowEmptyEnv(true)
74 seedSN = uint16(viper.GetInt("seed_sn"))
76 rand.Seed(time.Now().UnixNano())
77 seedSN = uint16(rand.Intn(65535))
82 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
85 func NewControl() *Control {
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 client := rtmgrclient.New(transport, strfmt.Default)
89 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
90 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
93 registry := new(Registry)
94 registry.Initialize(seedSN)
95 registry.rtmgrClient = &rtmgrClient
97 tracker := new(Tracker)
100 timerMap := new(TimerMap)
103 return &Control{e2ap: new(E2ap),
111 func (c *Control) Run() {
115 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
117 xapp.Logger.Info("%s: %s", desc, params.String())
120 for ; i <= 10 && status == false; i++ {
121 c.rmrSendMutex.Lock()
122 status = xapp.Rmr.Send(params.RMRParams, false)
123 c.rmrSendMutex.Unlock()
125 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
126 time.Sleep(500 * time.Millisecond)
130 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
131 xapp.Logger.Error("%s: %s", desc, err.Error())
132 xapp.Rmr.Free(params.Mbuf)
137 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
138 params := &RMRParams{&xapp.RMRParams{}}
139 params.Mtype = trans.GetMtype()
140 params.SubId = int(subs.GetSubId())
142 params.Meid = subs.GetMeid()
144 params.PayloadLen = len(trans.Payload.Buf)
145 params.Payload = trans.Payload.Buf
148 return c.rmrSendRaw(desc, params)
151 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
152 params := &RMRParams{&xapp.RMRParams{}}
153 params.Mtype = trans.GetMtype()
154 params.SubId = int(subs.GetSubId())
155 params.Xid = trans.GetXid()
156 params.Meid = trans.GetMeid()
158 params.PayloadLen = len(trans.Payload.Buf)
159 params.Payload = trans.Payload.Buf
162 return c.rmrSendRaw(desc, params)
165 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
166 xapp.Rmr.Free(params.Mbuf)
168 msg := &RMRParams{params}
171 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
172 go c.handleSubscriptionRequest(msg)
173 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
174 go c.handleSubscriptionResponse(msg)
175 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
176 go c.handleSubscriptionFailure(msg)
177 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
178 go c.handleSubscriptionDeleteRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
180 go c.handleSubscriptionDeleteResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
182 go c.handleSubscriptionDeleteFailure(msg)
184 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
190 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
191 xapp.Logger.Info("SubReq from xapp: %s", params.String())
196 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
203 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
210 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
212 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
220 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
222 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
227 err = subs.SetTransaction(trans)
229 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
235 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
238 // TODO: subscription create is in fact owned by subscription and not transaction.
239 // Transaction is toward xapp while Subscription is toward ran.
240 // In merge several xapps may wake transactions, while only one subscription
241 // toward ran occurs -> subscription owns subscription creation toward ran
243 // This is intermediate solution while improving message handling
245 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
247 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
253 c.rmrSend("SubReq: SubReq to E2T", subs, trans)
255 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
256 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
260 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
261 xapp.Logger.Info("SubResp from E2T: %s", params.String())
266 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
268 xapp.Logger.Error("SubResp: %s Dropping this msg. %s", err.Error(), params.String())
275 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
276 if subs == nil && params.SubId > 0 {
277 subs = c.registry.GetSubscription(uint16(params.SubId))
281 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
284 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
289 trans := subs.GetTransaction()
291 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
295 trans.SubRespMsg = SubRespMsg
300 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
302 responseReceived := trans.CheckResponseReceived()
303 if responseReceived == true {
304 // Subscription timer already received
308 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
310 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
317 c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans)
321 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
322 xapp.Logger.Info("SubFail from E2T: %s", params.String())
327 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
329 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
336 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
337 if subs == nil && params.SubId > 0 {
338 subs = c.registry.GetSubscription(uint16(params.SubId))
342 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
345 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
350 trans := subs.GetTransaction()
352 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
355 trans.SubFailMsg = SubFailMsg
360 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
362 responseReceived := trans.CheckResponseReceived()
367 if responseReceived == true {
368 // Subscription timer already received
372 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
374 c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans)
375 time.Sleep(3 * time.Second)
377 //TODO error handling improvement
378 xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans)
386 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
387 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
389 subs := c.registry.GetSubscription(uint16(nbrId))
391 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
395 trans := subs.GetTransaction()
397 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
401 responseReceived := trans.CheckResponseReceived()
403 if responseReceived == true {
404 // Subscription Response or Failure already received
408 if tryCount < maxSubReqTryCount {
409 xapp.Logger.Info("SubReq timeout: subs: %s trans: %s", subs, trans)
411 trans.RetryTransaction()
413 c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans)
416 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
420 // Release CREATE transaction
423 // Create DELETE transaction (internal and no messages toward xapp)
424 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
431 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
432 //TODO improve error handling. Important at least in merge
437 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
438 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
439 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
440 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
441 deltrans.Mtype, deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
443 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
444 //TODO improve error handling. Important at least in merge
450 err = subs.SetTransaction(deltrans)
452 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
453 //TODO improve error handling. Important at least in merge
458 c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans)
459 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
463 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
464 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
469 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
476 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
483 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
485 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
493 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
494 if subs == nil && params.SubId > 0 {
495 subs = c.registry.GetSubscription(uint16(params.SubId))
499 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
503 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
505 err = subs.SetTransaction(trans)
507 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
513 // TODO: subscription delete is in fact owned by subscription and not transaction.
514 // Transaction is toward xapp while Subscription is toward ran.
515 // In merge several xapps may wake transactions, while only one subscription
516 // toward ran occurs -> subscription owns subscription creation toward ran
518 // This is intermediate solution while improving message handling
520 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
522 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
529 c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans)
531 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
535 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
536 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
541 SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
543 xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String())
550 subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq))
551 if subs == nil && params.SubId > 0 {
552 subs = c.registry.GetSubscription(uint16(params.SubId))
556 xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String())
559 xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId())
564 trans := subs.GetTransaction()
566 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
570 trans.SubDelRespMsg = SubDelRespMsg
575 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
577 responseReceived := trans.CheckResponseReceived()
578 if responseReceived == true {
579 // Subscription Delete timer already received
583 c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
587 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
588 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
593 SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
595 xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String())
602 subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq))
603 if subs == nil && params.SubId > 0 {
604 subs = c.registry.GetSubscription(uint16(params.SubId))
608 xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String())
611 xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId())
616 trans := subs.GetTransaction()
618 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
621 trans.SubDelFailMsg = SubDelFailMsg
626 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
628 responseReceived := trans.CheckResponseReceived()
629 if responseReceived == true {
630 // Subscription Delete timer already received
634 c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
638 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
639 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
641 subs := c.registry.GetSubscription(uint16(nbrId))
643 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
647 trans := subs.GetTransaction()
649 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
653 responseReceived := trans.CheckResponseReceived()
654 if responseReceived == true {
655 // Subscription Delete Response or Failure already received
659 if tryCount < maxSubDelReqTryCount {
660 // Set possible to handle new response for the subId
661 trans.RetryTransaction()
662 c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans)
664 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
668 c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
672 func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
674 if trans.ForwardRespToXapp == true {
675 //Always generate SubDelResp
676 trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
677 trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
678 trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
679 trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
682 trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
684 c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans)
685 time.Sleep(3 * time.Second)
687 //TODO error handling improvement
688 xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans)