2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
50 rmrSendMutex sync.Mutex
70 xapp.Logger.Info("SUBMGR")
72 viper.SetEnvPrefix("submgr")
73 viper.AllowEmptyEnv(true)
74 seedSN = uint16(viper.GetInt("seed_sn"))
76 rand.Seed(time.Now().UnixNano())
77 seedSN = uint16(rand.Intn(65535))
82 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
85 func NewControl() *Control {
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 client := rtmgrclient.New(transport, strfmt.Default)
89 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
90 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
93 registry := new(Registry)
94 registry.Initialize(seedSN)
95 registry.rtmgrClient = &rtmgrClient
97 tracker := new(Tracker)
100 timerMap := new(TimerMap)
103 return &Control{e2ap: new(E2ap),
111 func (c *Control) Run() {
115 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
117 xapp.Logger.Info("%s: %s", desc, params.String())
120 for ; i <= 10 && status == false; i++ {
121 c.rmrSendMutex.Lock()
122 status = xapp.Rmr.Send(params.RMRParams, false)
123 c.rmrSendMutex.Unlock()
125 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
126 time.Sleep(500 * time.Millisecond)
130 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
131 xapp.Logger.Error("%s: %s", desc, err.Error())
132 xapp.Rmr.Free(params.Mbuf)
137 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
138 params := &RMRParams{&xapp.RMRParams{}}
139 params.Mtype = trans.GetMtype()
140 params.SubId = int(subs.GetSubId())
142 params.Meid = subs.GetMeid()
144 params.PayloadLen = payloadLen
145 params.Payload = payload
148 return c.rmrSendRaw(desc, params)
151 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
152 params := &RMRParams{&xapp.RMRParams{}}
154 params.SubId = int(subs.GetSubId())
155 params.Xid = trans.GetXid()
156 params.Meid = trans.GetMeid()
158 params.PayloadLen = payloadLen
159 params.Payload = payload
162 return c.rmrSendRaw(desc, params)
165 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
166 xapp.Rmr.Free(params.Mbuf)
168 msg := &RMRParams{params}
171 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
172 go c.handleSubscriptionRequest(msg)
173 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
174 go c.handleSubscriptionResponse(msg)
175 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
176 go c.handleSubscriptionFailure(msg)
177 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
178 go c.handleSubscriptionDeleteRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
180 go c.handleSubscriptionDeleteResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
182 go c.handleSubscriptionDeleteFailure(msg)
184 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
190 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
191 xapp.Logger.Info("SubReq from xapp: %s", params.String())
196 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
204 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
211 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
213 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
221 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
223 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
228 err = subs.SetTransaction(trans)
230 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
236 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
239 // TODO: subscription create is in fact owned by subscription and not transaction.
240 // Transaction is toward xapp while Subscription is toward ran.
241 // In merge several xapps may wake transactions, while only one subscription
242 // toward ran occurs -> subscription owns subscription creation toward ran
244 // This is intermediate solution while improving message handling
246 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
248 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
254 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
255 trans.Payload = packedData.Buf
256 trans.PayloadLen = len(packedData.Buf)
258 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
260 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
261 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
265 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
266 xapp.Logger.Info("SubResp from E2T: %s", params.String())
271 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
273 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String())
280 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
281 if subs == nil && params.SubId > 0 {
282 subs = c.registry.GetSubscription(uint16(params.SubId))
286 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
289 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
294 trans := subs.GetTransaction()
296 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
300 trans.SubRespMsg = SubRespMsg
305 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
307 responseReceived := trans.CheckResponseReceived()
308 if responseReceived == true {
309 // Subscription timer already received
313 packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
315 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
320 //Optimize and store packed message to be sent.
321 trans.Payload = packedData.Buf
322 trans.PayloadLen = len(packedData.Buf)
326 c.rmrReplyToSender("SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen)
330 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
331 xapp.Logger.Info("SubFail from E2T: %s", params.String())
336 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
338 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
345 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
346 if subs == nil && params.SubId > 0 {
347 subs = c.registry.GetSubscription(uint16(params.SubId))
351 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
354 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
359 trans := subs.GetTransaction()
361 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
364 trans.SubFailMsg = SubFailMsg
369 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
371 responseReceived := trans.CheckResponseReceived()
376 if responseReceived == true {
377 // Subscription timer already received
381 packedData, err := c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
383 //TODO error handling improvement
384 xapp.Logger.Error("SubFail: %s for trans %s (continuing cleaning)", err.Error(), trans)
386 //Optimize and store packed message to be sent.
387 trans.Payload = packedData.Buf
388 trans.PayloadLen = len(packedData.Buf)
389 c.rmrReplyToSender("SubFail to xapp", subs, trans, 12012, trans.Payload, trans.PayloadLen)
390 time.Sleep(3 * time.Second)
398 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
399 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
401 subs := c.registry.GetSubscription(uint16(nbrId))
403 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
407 trans := subs.GetTransaction()
409 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
413 responseReceived := trans.CheckResponseReceived()
415 if responseReceived == true {
416 // Subscription Response or Failure already received
420 if tryCount < maxSubReqTryCount {
421 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
423 trans.RetryTransaction()
425 c.rmrSend("SubReq(SubReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
428 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
432 // Delete CREATE transaction
435 // Create DELETE transaction (internal and no messages toward xapp)
436 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
437 12020, // RIC SUBSCRIPTION DELETE
444 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
445 //TODO improve error handling. Important at least in merge
450 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
451 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
452 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
453 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
454 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
456 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
457 //TODO improve error handling. Important at least in merge
462 deltrans.PayloadLen = len(packedData.Buf)
463 deltrans.Payload = packedData.Buf
465 err = subs.SetTransaction(deltrans)
467 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
468 //TODO improve error handling. Important at least in merge
473 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
475 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
479 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
480 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
485 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
493 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
500 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
502 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
510 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
511 if subs == nil && params.SubId > 0 {
512 subs = c.registry.GetSubscription(uint16(params.SubId))
516 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
520 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
522 err = subs.SetTransaction(trans)
524 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
530 // TODO: subscription delete is in fact owned by subscription and not transaction.
531 // Transaction is toward xapp while Subscription is toward ran.
532 // In merge several xapps may wake transactions, while only one subscription
533 // toward ran occurs -> subscription owns subscription creation toward ran
535 // This is intermediate solution while improving message handling
537 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
539 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
544 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
545 trans.Payload = packedData.Buf
546 trans.PayloadLen = len(packedData.Buf)
550 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
552 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
556 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
557 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
562 SubDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
564 xapp.Logger.Error("SubDelResp: %s Dropping this msg. %s", err.Error(), params.String())
571 subs := c.registry.GetSubscription(uint16(SubDelRespMsg.RequestId.Seq))
572 if subs == nil && params.SubId > 0 {
573 subs = c.registry.GetSubscription(uint16(params.SubId))
577 xapp.Logger.Error("SubDelResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubDelRespMsg.RequestId.Seq, params.SubId, params.String())
580 xapp.Logger.Info("SubDelResp: subscription found payloadSeqNum: %d, SubId: %d", SubDelRespMsg.RequestId.Seq, subs.GetSubId())
585 trans := subs.GetTransaction()
587 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
591 trans.SubDelRespMsg = SubDelRespMsg
596 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
598 responseReceived := trans.CheckResponseReceived()
599 if responseReceived == true {
600 // Subscription Delete timer already received
604 packedData, err := c.e2ap.PackSubscriptionDeleteResponse(trans.SubDelRespMsg)
606 xapp.Logger.Error("SubDelResp: %s for trans %s (continuing cleaning)", err.Error(), trans)
608 //Optimize and store packed message to be sent.
609 trans.Payload = packedData.Buf
610 trans.PayloadLen = len(packedData.Buf)
612 if trans.ForwardRespToXapp == true {
613 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, 12021, trans.Payload, trans.PayloadLen)
614 time.Sleep(3 * time.Second)
624 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
625 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
627 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
629 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
632 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
634 subs := c.registry.GetSubscription(payloadSeqNum)
636 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
640 trans := subs.GetTransaction()
642 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
646 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
648 responseReceived := trans.CheckResponseReceived()
649 if responseReceived == true {
650 // Subscription Delete timer already received
653 if trans.ForwardRespToXapp == true {
654 var subDelRespPayload []byte
655 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
657 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
661 // RIC SUBSCRIPTION DELETE RESPONSE
662 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
663 time.Sleep(3 * time.Second)
671 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
672 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
674 subs := c.registry.GetSubscription(uint16(nbrId))
676 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
680 trans := subs.GetTransaction()
682 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
686 responseReceived := trans.CheckResponseReceived()
687 if responseReceived == true {
688 // Subscription Delete Response or Failure already received
692 if tryCount < maxSubDelReqTryCount {
693 // Set possible to handle new response for the subId
694 trans.RetryTransaction()
695 c.rmrSend("SubDelReq(SubDelReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
697 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
701 if trans.ForwardRespToXapp == true {
702 var subDelRespPayload []byte
703 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
705 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
709 // RIC SUBSCRIPTION DELETE RESPONSE
710 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
712 time.Sleep(3 * time.Second)