2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
// Timing and retry limits used by the subscription and subscription-delete
// request handling in this file.
var (
	// subReqTime is the timer duration given to the timer map when a
	// subscription request timer is started.
	subReqTime = 5 * time.Second

	// subDelReqTime is the timer duration given to the timer map when a
	// subscription delete request timer is started.
	subDelReqTime = 5 * time.Second

	// maxSubReqTryCount bounds subscription request attempts
	// (initial try + retry).
	maxSubReqTryCount uint64 = 2

	// maxSubDelReqTryCount bounds subscription delete request attempts
	// (initial try + retry).
	maxSubDelReqTryCount uint64 = 2
)
50 rmrSendMutex sync.Mutex
70 xapp.Logger.Info("SUBMGR")
72 viper.SetEnvPrefix("submgr")
73 viper.AllowEmptyEnv(true)
74 seedSN = uint16(viper.GetInt("seed_sn"))
76 rand.Seed(time.Now().UnixNano())
77 seedSN = uint16(rand.Intn(65535))
82 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
85 func NewControl() *Control {
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 client := rtmgrclient.New(transport, strfmt.Default)
89 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
90 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
93 registry := new(Registry)
94 registry.Initialize(seedSN)
95 registry.rtmgrClient = &rtmgrClient
97 tracker := new(Tracker)
100 timerMap := new(TimerMap)
103 return &Control{e2ap: new(E2ap),
111 func (c *Control) Run() {
115 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
117 xapp.Logger.Info("%s: %s", desc, params.String())
120 for ; i <= 10 && status == false; i++ {
121 c.rmrSendMutex.Lock()
122 status = xapp.Rmr.Send(params.RMRParams, false)
123 c.rmrSendMutex.Unlock()
125 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
126 time.Sleep(500 * time.Millisecond)
130 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
131 xapp.Logger.Error("%s: %s", desc, err.Error())
132 xapp.Rmr.Free(params.Mbuf)
137 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
138 params := &RMRParams{&xapp.RMRParams{}}
139 params.Mtype = trans.GetMtype()
140 params.SubId = int(subs.GetSubId())
142 params.Meid = subs.GetMeid()
144 params.PayloadLen = payloadLen
145 params.Payload = payload
148 return c.rmrSendRaw(desc, params)
151 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
152 params := &RMRParams{&xapp.RMRParams{}}
154 params.SubId = int(subs.GetSubId())
155 params.Xid = trans.GetXid()
156 params.Meid = trans.GetMeid()
158 params.PayloadLen = payloadLen
159 params.Payload = payload
162 return c.rmrSendRaw(desc, params)
165 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
166 xapp.Rmr.Free(params.Mbuf)
168 msg := &RMRParams{params}
171 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
172 go c.handleSubscriptionRequest(msg)
173 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
174 go c.handleSubscriptionResponse(msg)
175 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
176 go c.handleSubscriptionFailure(msg)
177 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
178 go c.handleSubscriptionDeleteRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
180 go c.handleSubscriptionDeleteResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
182 go c.handleSubscriptionDeleteFailure(msg)
184 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
190 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
191 xapp.Logger.Info("SubReq from xapp: %s", params.String())
196 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
204 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
211 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
213 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
221 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
223 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
228 err = subs.SetTransaction(trans)
230 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
236 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
239 // TODO: subscription creation is in fact owned by the subscription, not the transaction.
240 // The transaction faces the xapp while the subscription faces the RAN.
241 // In the merge case several xapps may start transactions while only one subscription
242 // toward the RAN occurs -> the subscription owns subscription creation toward the RAN.
244 // This is an intermediate solution while message handling is being improved.
246 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
248 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
254 // Optimize and store the packed message to be sent (kept for retransmission). Should this, again, be owned by the subscription?
255 trans.Payload = packedData.Buf
256 trans.PayloadLen = len(packedData.Buf)
258 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
260 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
261 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
265 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
266 xapp.Logger.Info("SubResp from E2T: %s", params.String())
271 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
273 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String())
280 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
281 if subs == nil && params.SubId > 0 {
282 subs = c.registry.GetSubscription(uint16(params.SubId))
286 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
289 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
294 trans := subs.GetTransaction()
296 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
300 trans.SubRespMsg = SubRespMsg
305 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
307 responseReceived := trans.CheckResponseReceived()
308 if responseReceived == true {
309 // Subscription timer expiry has already been received
313 packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
315 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
320 //Optimize and store packed message to be sent.
321 trans.Payload = packedData.Buf
322 trans.PayloadLen = len(packedData.Buf)
326 c.rmrReplyToSender("SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen)
330 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
331 xapp.Logger.Info("SubFail from E2T: %s", params.String())
336 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
338 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
345 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
346 if subs == nil && params.SubId > 0 {
347 subs = c.registry.GetSubscription(uint16(params.SubId))
351 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
354 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
359 trans := subs.GetTransaction()
361 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
364 trans.SubFailMsg = SubFailMsg
369 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
371 responseReceived := trans.CheckResponseReceived()
376 if responseReceived == true {
377 // Subscription timer expiry has already been received
381 packedData, err := c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
383 //TODO error handling improvement
384 xapp.Logger.Error("SubFail: %s for trans %s (continue still)", err.Error(), trans)
386 //Optimize and store packed message to be sent.
387 trans.Payload = packedData.Buf
388 trans.PayloadLen = len(packedData.Buf)
389 c.rmrReplyToSender("SubFail to xapp", subs, trans, 12012, trans.Payload, trans.PayloadLen)
390 time.Sleep(3 * time.Second)
398 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
399 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
401 subs := c.registry.GetSubscription(uint16(nbrId))
403 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
407 trans := subs.GetTransaction()
409 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
413 responseReceived := trans.CheckResponseReceived()
415 if responseReceived == true {
416 // Subscription Response or Failure already received
420 if tryCount < maxSubReqTryCount {
421 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
423 trans.RetryTransaction()
425 c.rmrSend("SubReq(SubReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
428 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
432 // Delete CREATE transaction
435 // Create DELETE transaction (internal; no messages are sent toward the xapp)
436 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
437 12020, // RIC SUBSCRIPTION DELETE
444 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
445 // TODO: improve error handling. Important at least in the merge case
450 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
451 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
452 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
453 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
454 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
456 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
457 // TODO: improve error handling. Important at least in the merge case
462 deltrans.PayloadLen = len(packedData.Buf)
463 deltrans.Payload = packedData.Buf
465 err = subs.SetTransaction(deltrans)
467 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
468 // TODO: improve error handling. Important at least in the merge case
473 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
475 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
479 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
480 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
485 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
493 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
500 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
502 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
510 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
511 if subs == nil && params.SubId > 0 {
512 subs = c.registry.GetSubscription(uint16(params.SubId))
516 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
520 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
522 err = subs.SetTransaction(trans)
524 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
530 // TODO: subscription deletion is in fact owned by the subscription, not the transaction.
531 // The transaction faces the xapp while the subscription faces the RAN.
532 // In the merge case several xapps may start transactions while only one subscription
533 // toward the RAN occurs -> the subscription owns subscription deletion toward the RAN.
535 // This is an intermediate solution while message handling is being improved.
537 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
539 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
544 // Optimize and store the packed message to be sent (kept for retransmission). Should this, again, be owned by the subscription?
545 trans.Payload = packedData.Buf
546 trans.PayloadLen = len(packedData.Buf)
550 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
552 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
556 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
557 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
559 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
561 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
564 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
566 subs := c.registry.GetSubscription(payloadSeqNum)
568 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
572 trans := subs.GetTransaction()
574 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
578 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
580 responseReceived := trans.CheckResponseReceived()
581 if responseReceived == true {
582 // Subscription Delete timer expiry has already been received
586 if trans.ForwardRespToXapp == true {
587 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
588 time.Sleep(3 * time.Second)
596 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
597 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
599 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
601 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
604 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
606 subs := c.registry.GetSubscription(payloadSeqNum)
608 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
612 trans := subs.GetTransaction()
614 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
618 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
620 responseReceived := trans.CheckResponseReceived()
621 if responseReceived == true {
622 // Subscription Delete timer expiry has already been received
625 if trans.ForwardRespToXapp == true {
626 var subDelRespPayload []byte
627 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
629 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
633 // RIC SUBSCRIPTION DELETE RESPONSE
634 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
635 time.Sleep(3 * time.Second)
643 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
644 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
646 subs := c.registry.GetSubscription(uint16(nbrId))
648 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
652 trans := subs.GetTransaction()
654 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
658 responseReceived := trans.CheckResponseReceived()
659 if responseReceived == true {
660 // Subscription Delete Response or Failure already received
664 if tryCount < maxSubDelReqTryCount {
665 // Make it possible to handle a new response for this subId
666 trans.RetryTransaction()
667 c.rmrSend("SubDelReq(SubDelReq timer retransmit) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
669 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
673 if trans.ForwardRespToXapp == true {
674 var subDelRespPayload []byte
675 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
677 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
681 // RIC SUBSCRIPTION DELETE RESPONSE
682 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
684 time.Sleep(3 * time.Second)