2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
26 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
27 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
37 //-----------------------------------------------------------------------------
39 //-----------------------------------------------------------------------------
// Timing and retry settings for requests sent toward E2T.
//
// subReqTime and subDelReqTime are the durations handed to the timer map when
// a subscription request / subscription delete request is sent; when the timer
// fires the corresponding *RequestTimer handler runs.
41 var subReqTime time.Duration = 5 * time.Second
42 var subDelReqTime time.Duration = 5 * time.Second
// The timer handlers compare their tryCount against these limits and only
// resend while tryCount is below the limit.
43 var maxSubReqTryCount uint64 = 2 // Initial try + retry
44 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
// rmrSendMutex is locked around each xapp.Rmr.Send call made by rmrSendRaw.
51 rmrSendMutex sync.Mutex
// Start-up configuration: log a banner, configure viper's environment lookup
// and choose the initial subscription sequence number (seedSN).
71 xapp.Logger.Info("SUBMGR")
// Environment variables are looked up with the "submgr" prefix, and empty
// values are treated as set.
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
// Candidate 1: the configured "seed_sn" value, truncated to uint16.
75 seedSN = uint16(viper.GetInt("seed_sn"))
// Candidate 2: a pseudo-random value in [0, 65535), seeded from the clock.
// NOTE(review): the condition selecting between the configured and the random
// seed is not visible here — presumably the random value is the fallback when
// no seed is configured; confirm.
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
// NewControl builds and returns a Control. It creates the routing manager
// (rtmgr) REST client from configuration, a Registry initialised with seedSN
// and pointing at that client, and allocates a Tracker and a TimerMap.
86 func NewControl() *Control {
// rtmgr is reached over plain HTTP at "<rtmgr.HostAddr>:<rtmgr.port>" with
// base path rtmgr.baseUrl, all read from viper.
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 client := rtmgrclient.New(transport, strfmt.Default)
// Parameter sets for the provide/delete xapp-subscription-handle calls, each
// created with a 10 second timeout.
90 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
92 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
// The registry hands out subscription ids starting from seedSN and keeps a
// pointer to the rtmgr client.
94 registry := new(Registry)
95 registry.Initialize(seedSN)
96 registry.rtmgrClient = &rtmgrClient
98 tracker := new(Tracker)
101 timerMap := new(TimerMap)
104 return &Control{e2ap: new(E2ap),
// Run is the exported entry point of Control.
// NOTE(review): presumably hands c to the xapp framework so that Consume is
// invoked for incoming RMR messages — confirm against the function body.
112 func (c *Control) Run() {
// rmrSendRaw logs params and sends them with xapp.Rmr.Send, repeating the send
// while it reports failure and the attempt counter i is at most 10.
//
// desc is a log prefix identifying the caller. The named result err is set
// (via fmt.Errorf) on the failure path, which also logs the error and frees
// params.Mbuf.
116 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
118 xapp.Logger.Info("%s: %s", desc, params.String())
// Loop ends on the first successful send or once i exceeds 10.
121 for ; i <= 10 && status == false; i++ {
// Each send is made while holding rmrSendMutex.
122 c.rmrSendMutex.Lock()
123 status = xapp.Rmr.Send(params.RMRParams, false)
124 c.rmrSendMutex.Unlock()
// Log the failed attempt and pause 500 ms before the next one.
// NOTE(review): the guard around these two lines is not visible here —
// presumably they run only when the send failed; confirm.
126 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
127 time.Sleep(500 * time.Millisecond)
// Failure path: build the error, log it under desc and free the message buffer.
131 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 xapp.Logger.Error("%s: %s", desc, err.Error())
133 xapp.Rmr.Free(params.Mbuf)
// rmrSend builds RMR parameters for an outgoing message and sends them through
// rmrSendRaw, returning its error.
//
// The message type comes from trans, the subscription id and Meid from subs,
// and the payload bytes and length from payload.Buf. desc is passed on as the
// log prefix. The handlers in this file use it for messages toward E2T.
138 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload *packer.PackedData) (err error) {
139 params := &RMRParams{&xapp.RMRParams{}}
140 params.Mtype = trans.GetMtype()
141 params.SubId = int(subs.GetSubId())
143 params.Meid = subs.GetMeid()
145 params.PayloadLen = len(payload.Buf)
146 params.Payload = payload.Buf
149 return c.rmrSendRaw(desc, params)
// rmrReplyToSender builds RMR parameters for a reply and sends them through
// rmrSendRaw, returning its error.
//
// The subscription id comes from subs, while the Xid and Meid are taken from
// trans (unlike rmrSend, which takes the Meid from subs). The payload bytes
// and length come from payload.Buf. The handlers in this file use it for
// replies toward the xapp.
// NOTE(review): the line applying mType to params is not visible here —
// presumably params.Mtype is set from it; confirm.
152 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload *packer.PackedData) (err error) {
153 params := &RMRParams{&xapp.RMRParams{}}
155 params.SubId = int(subs.GetSubId())
156 params.Xid = trans.GetXid()
157 params.Meid = trans.GetMeid()
159 params.PayloadLen = len(payload.Buf)
160 params.Payload = payload.Buf
163 return c.rmrSendRaw(desc, params)
// Consume receives one RMR message and dispatches it by message type to the
// matching subscription handler. Every handler is started in its own
// goroutine, so any value a handler returns is discarded. Messages of an
// unrecognised type are logged and discarded.
166 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
// NOTE(review): the Mbuf is freed before the handlers run; they only read
// fields such as Payload, Src and SubId — presumably those no longer depend on
// Mbuf at this point; confirm.
167 xapp.Rmr.Free(params.Mbuf)
169 msg := &RMRParams{params}
172 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
173 go c.handleSubscriptionRequest(msg)
174 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
175 go c.handleSubscriptionResponse(msg)
176 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
177 go c.handleSubscriptionFailure(msg)
178 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
179 go c.handleSubscriptionDeleteRequest(msg)
180 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
181 go c.handleSubscriptionDeleteResponse(msg)
182 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
183 go c.handleSubscriptionDeleteFailure(msg)
185 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// handleSubscriptionRequest processes a subscription request received from an
// xapp. It tracks a transaction for the sending endpoint, decodes the E2AP
// request, reserves a subscription and binds the transaction to it, re-encodes
// the request with the subscription id as the request sequence number, sends
// it toward E2T and starts the "RIC_SUB_REQ" timer. Failures in the individual
// steps are logged.
191 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
192 xapp.Logger.Info("SubReq from xapp: %s", params.String())
// Track a transaction keyed on the sender (params.Src).
197 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
205 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
// Decode the payload and keep the decoded request on the transaction.
212 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
214 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
// Reserve a subscription for this endpoint/Meid and attach the transaction.
222 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
224 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
229 err = subs.SetTransaction(trans)
231 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
// The sequence number sent onward is the subscription id, which is how the
// response handlers look the subscription up again.
237 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
240 // TODO: subscription create is in fact owned by subscription and not transaction.
241 // Transaction is toward xapp while Subscription is toward ran.
242 // In merge several xapps may wake transactions, while only one subscription
243 // toward ran occurs -> subscription owns subscription creation toward ran
245 // This is intermediate solution while improving message handling
247 trans.Payload, err = c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
249 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
// NOTE(review): the error returned by rmrSend is not checked on this line.
255 c.rmrSend("SubReq: SubReq to E2T", subs, trans, trans.Payload)
// Start the response timer; handleSubscriptionRequestTimer runs on expiry.
257 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
258 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
// handleSubscriptionResponse processes a subscription response received from
// E2T. It decodes the response, finds the subscription and its current
// transaction, stops the "RIC_SUB_REQ" timer, re-encodes the response and
// forwards it to the xapp.
262 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
263 xapp.Logger.Info("SubResp from E2T: %s", params.String())
268 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
// NOTE(review): this log prefix says "SubDelReq" although this is the SubResp
// handler — looks like a copy/paste slip; confirm and correct the message.
270 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String())
// Look the subscription up by the sequence number in the payload first, then
// fall back to the RMR-level SubId when that finds nothing.
277 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
278 if subs == nil && params.SubId > 0 {
279 subs = c.registry.GetSubscription(uint16(params.SubId))
283 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
286 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
291 trans := subs.GetTransaction()
293 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
297 trans.SubRespMsg = SubRespMsg
// A response arrived, so the pending request timer is no longer needed.
302 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
// CheckResponseReceived reports whether this transaction was already
// completed, here by the request timer firing first.
304 responseReceived := trans.CheckResponseReceived()
305 if responseReceived == true {
306 // Subscription timer already received
310 trans.Payload, err = c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
312 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
// NOTE(review): 12011 is presumably the RIC_SUB_RESP message type — confirm,
// and consider xapp.RICMessageTypes["RIC_SUB_RESP"] as used in Consume.
319 c.rmrReplyToSender("SubResp: SubResp to xapp", subs, trans, 12011, trans.Payload)
// handleSubscriptionFailure processes a subscription failure received from
// E2T. It decodes the failure, finds the subscription and its current
// transaction, stops the "RIC_SUB_REQ" timer, re-encodes the failure and
// forwards it to the xapp.
323 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
324 xapp.Logger.Info("SubFail from E2T: %s", params.String())
329 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
331 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
// Look the subscription up by the sequence number in the payload first, then
// fall back to the RMR-level SubId when that finds nothing.
338 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
339 if subs == nil && params.SubId > 0 {
340 subs = c.registry.GetSubscription(uint16(params.SubId))
344 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
347 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
352 trans := subs.GetTransaction()
354 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
357 trans.SubFailMsg = SubFailMsg
// A reply arrived, so the pending request timer is no longer needed.
362 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
// CheckResponseReceived reports whether this transaction was already
// completed, here by the request timer firing first.
364 responseReceived := trans.CheckResponseReceived()
369 if responseReceived == true {
370 // Subscription timer already received
374 trans.Payload, err = c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
// Forward the failure to the xapp, then wait 3 seconds.
// NOTE(review): 12012 is presumably the RIC_SUB_FAILURE message type, and the
// purpose of the sleep is not evident from here — confirm both.
376 c.rmrReplyToSender("SubFail: SubFail to xapp", subs, trans, 12012, trans.Payload)
377 time.Sleep(3 * time.Second)
// A packing error is only logged; per the message, cleanup continues anyway.
379 //TODO error handling improvement
380 xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans)
// handleSubscriptionRequestTimer is the timer callback registered for
// "RIC_SUB_REQ". nbrId is the subscription id the timer was started with and
// tryCount the attempt counter passed to StartTimer.
//
// While tryCount is below maxSubReqTryCount the stored request is resent
// toward E2T and the timer restarted. Otherwise an internal delete transaction
// is created for the subscription and a subscription delete request is sent
// toward E2T under the "RIC_SUB_DEL_REQ" timer.
388 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
389 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
391 subs := c.registry.GetSubscription(uint16(nbrId))
393 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
397 trans := subs.GetTransaction()
399 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
// CheckResponseReceived reports whether a response or failure already
// completed this transaction before the timer fired.
403 responseReceived := trans.CheckResponseReceived()
405 if responseReceived == true {
406 // Subscription Response or Failure already received
// Retry path: resend the payload already packed on the transaction.
410 if tryCount < maxSubReqTryCount {
411 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
413 trans.RetryTransaction()
415 c.rmrSend("SubReq timeout: SubReq to E2T", subs, trans, trans.Payload)
418 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
// Retries exhausted: replace the create transaction with an internal delete.
422 // Release CREATE transaction
425 // Create DELETE transaction (internal and no messages toward xapp)
426 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
427 12020, // RIC SUBSCRIPTION DELETE
434 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
435 //TODO improve error handling. Important at least in merge
// Build the delete request from the original request's id and function id,
// with the subscription id as sequence number.
440 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
441 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
442 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
443 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
444 deltrans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
446 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
447 //TODO improve error handling. Important at least in merge
453 err = subs.SetTransaction(deltrans)
455 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
456 //TODO improve error handling. Important at least in merge
// Send the delete toward E2T and start its own response timer.
461 c.rmrSend("SubReq timer: SubDelReq to E2T", subs, deltrans, deltrans.Payload)
462 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteRequest processes a subscription delete request
// received from an xapp. It tracks a transaction for the sending endpoint,
// decodes the E2AP request, finds the subscription and binds the transaction
// to it, re-encodes the request, sends it toward E2T and starts the
// "RIC_SUB_DEL_REQ" timer. Failures in the individual steps are logged.
466 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
467 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
// Track a transaction keyed on the sender (params.Src).
472 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
480 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
487 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
489 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
// Look the subscription up by the sequence number in the payload first, then
// fall back to the RMR-level SubId when that finds nothing.
497 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
498 if subs == nil && params.SubId > 0 {
499 subs = c.registry.GetSubscription(uint16(params.SubId))
503 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
507 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
509 err = subs.SetTransaction(trans)
511 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
517 // TODO: subscription delete is in fact owned by subscription and not transaction.
518 // Transaction is toward xapp while Subscription is toward ran.
519 // In merge several xapps may wake transactions, while only one subscription
520 // toward ran occurs -> subscription owns subscription creation toward ran
522 // This is intermediate solution while improving message handling
524 trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
526 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
// NOTE(review): the error returned by rmrSend is not checked on this line.
533 c.rmrSend("SubDelReq: SubDelReq to E2T", subs, trans, trans.Payload)
// Start the response timer; handleSubscriptionDeleteRequestTimer runs on expiry.
535 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteResponse processes a subscription delete response
// received from E2T. It decodes the response, finds the subscription and its
// current transaction, stops the "RIC_SUB_DEL_REQ" timer and finishes through
// sendSubscriptionDeleteResponse.
//
// Unlike the sibling handlers it declares a named err result; Consume starts
// it with `go`, so that value is discarded there.
539 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
540 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
545 SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
547 xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String())
// Look the subscription up by the sequence number in the payload first, then
// fall back to the RMR-level SubId when that finds nothing.
554 subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq))
555 if subs == nil && params.SubId > 0 {
556 subs = c.registry.GetSubscription(uint16(params.SubId))
560 xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String())
563 xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId())
568 trans := subs.GetTransaction()
570 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
574 trans.SubDelRespMsg = SubDelRespMsg
// A response arrived, so the pending delete timer is no longer needed.
579 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
// CheckResponseReceived reports whether this transaction was already
// completed, here by the delete timer firing first.
581 responseReceived := trans.CheckResponseReceived()
582 if responseReceived == true {
583 // Subscription Delete timer already received
587 c.sendSubscriptionDeleteResponse("SubDelResp", trans, subs)
// handleSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T. It decodes the failure, finds the subscription and its
// current transaction, stops the "RIC_SUB_DEL_REQ" timer and finishes through
// sendSubscriptionDeleteResponse — the same completion path as a successful
// delete response.
591 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
592 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
597 SubDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
599 xapp.Logger.Error("SubDelFail: %s Dropping this msg. %s", err.Error(), params.String())
// Look the subscription up by the sequence number in the payload first, then
// fall back to the RMR-level SubId when that finds nothing.
606 subs := c.registry.GetSubscription(uint16(SubDelFailMsg.RequestId.Seq))
607 if subs == nil && params.SubId > 0 {
608 subs = c.registry.GetSubscription(uint16(params.SubId))
612 xapp.Logger.Error("SubDelFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelFailMsg.RequestId.Seq, params.SubId, params.String())
615 xapp.Logger.Info("SubDelFail: subscription found payloadSeqNum: %d, SubId: %d", SubDelFailMsg.RequestId.Seq, subs.GetSubId())
620 trans := subs.GetTransaction()
622 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
625 trans.SubDelFailMsg = SubDelFailMsg
// A reply arrived, so the pending delete timer is no longer needed.
630 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
// CheckResponseReceived reports whether this transaction was already
// completed, here by the delete timer firing first.
632 responseReceived := trans.CheckResponseReceived()
633 if responseReceived == true {
634 // Subscription Delete timer already received
638 c.sendSubscriptionDeleteResponse("SubDelFail", trans, subs)
// handleSubscriptionDeleteRequestTimer is the timer callback registered for
// "RIC_SUB_DEL_REQ". nbrId is the subscription id the timer was started with
// and tryCount the attempt counter passed to StartTimer.
//
// While tryCount is below maxSubDelReqTryCount the stored delete request is
// resent toward E2T and the timer restarted. Otherwise the delete is finished
// through sendSubscriptionDeleteResponse without any reply from E2T.
642 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
643 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
645 subs := c.registry.GetSubscription(uint16(nbrId))
647 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
651 trans := subs.GetTransaction()
653 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
// CheckResponseReceived reports whether a delete response or failure already
// completed this transaction before the timer fired.
657 responseReceived := trans.CheckResponseReceived()
658 if responseReceived == true {
659 // Subscription Delete Response or Failure already received
663 if tryCount < maxSubDelReqTryCount {
664 // Set possible to handle new response for the subId
665 trans.RetryTransaction()
666 c.rmrSend("SubDelReq timeout: SubDelReq to E2T", subs, trans, trans.Payload)
// NOTE(review): this restarts the delete timer with subReqTime rather than
// subDelReqTime. Both are 5s in this file so behaviour is the same today, but
// subDelReqTime looks like the intended value — confirm.
668 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
672 c.sendSubscriptionDeleteResponse("SubDelReq(timer)", trans, subs)
// sendSubscriptionDeleteResponse completes a delete transaction. It is called
// from the delete-response handler, the delete-failure handler and the delete
// timer handler; desc names the caller in log output.
//
// When trans.ForwardRespToXapp is set, a fresh subscription delete response is
// built from the stored delete request (request id and function id) with the
// subscription id as sequence number, packed and sent to the xapp — regardless
// of whether E2T answered with a response, a failure or not at all.
676 func (c *Control) sendSubscriptionDeleteResponse(desc string, trans *Transaction, subs *Subscription) {
678 if trans.ForwardRespToXapp == true {
679 //Always generate SubDelResp
680 trans.SubDelRespMsg = &e2ap.E2APSubscriptionDeleteResponse{}
681 trans.SubDelRespMsg.RequestId.Id = trans.SubDelReqMsg.RequestId.Id
682 trans.SubDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
683 trans.SubDelRespMsg.FunctionId = trans.SubDelReqMsg.FunctionId
686 trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
// Reply to the xapp, then wait 3 seconds.
// NOTE(review): 12021 is presumably the RIC_SUB_DEL_RESP message type, and the
// purpose of the sleep is not evident from here — confirm both.
688 c.rmrReplyToSender(desc+": SubDelResp to xapp", subs, trans, 12021, trans.Payload)
689 time.Sleep(3 * time.Second)
// A packing error is only logged; per the message, cleanup continues anyway.
691 //TODO error handling improvement
692 xapp.Logger.Error("%s: %s for trans %s (continuing cleaning)", desc, err.Error(), trans)