2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
48 rtmgrClient *RtmgrClient
51 rmrSendMutex sync.Mutex
71 xapp.Logger.Info("SUBMGR")
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
75 seedSN = uint16(viper.GetInt("seed_sn"))
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
86 func NewControl() *Control {
88 registry := new(Registry)
89 registry.Initialize(seedSN)
91 tracker := new(Tracker)
94 timerMap := new(TimerMap)
97 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
98 client := rtmgrclient.New(transport, strfmt.Default)
99 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
100 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
103 rtmgrClientPtr := &rtmgrClient
105 //TODO: to make this better. Now it is just a hack.
106 registry.rtmgrClient = rtmgrClientPtr
108 return &Control{e2ap: new(E2ap),
110 rtmgrClient: rtmgrClientPtr,
117 func (c *Control) Run() {
121 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
123 xapp.Logger.Info("%s: %s", desc, params.String())
126 for ; i <= 10 && status == false; i++ {
127 c.rmrSendMutex.Lock()
128 status = xapp.Rmr.Send(params.RMRParams, false)
129 c.rmrSendMutex.Unlock()
131 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 time.Sleep(500 * time.Millisecond)
136 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
137 xapp.Logger.Error("%s: %s", desc, err.Error())
138 xapp.Rmr.Free(params.Mbuf)
143 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
144 params := &RMRParams{&xapp.RMRParams{}}
145 params.Mtype = trans.GetMtype()
146 params.SubId = int(subs.GetSubId())
148 params.Meid = subs.GetMeid()
150 params.PayloadLen = payloadLen
151 params.Payload = payload
154 return c.rmrSendRaw(desc, params)
157 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
158 params := &RMRParams{&xapp.RMRParams{}}
160 params.SubId = int(subs.GetSubId())
161 params.Xid = trans.GetXid()
162 params.Meid = trans.GetMeid()
164 params.PayloadLen = payloadLen
165 params.Payload = payload
168 return c.rmrSendRaw(desc, params)
171 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
172 xapp.Rmr.Free(params.Mbuf)
174 msg := &RMRParams{params}
177 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
178 go c.handleSubscriptionRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
180 go c.handleSubscriptionResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
182 go c.handleSubscriptionFailure(msg)
183 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
184 go c.handleSubscriptionDeleteRequest(msg)
185 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
186 go c.handleSubscriptionDeleteResponse(msg)
187 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
188 go c.handleSubscriptionDeleteFailure(msg)
190 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
196 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
197 xapp.Logger.Info("SubReq from xapp: %s", params.String())
202 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
210 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
217 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
219 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
227 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
229 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
234 err = subs.SetTransaction(trans)
236 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
237 c.registry.DelSubscription(subs.Seq)
242 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
245 // TODO: subscription create is in fact owned by subscription and not transaction.
246 // Transaction is toward xapp while Subscription is toward ran.
247 // In merge several xapps may wake transactions, while only one subscription
248 // toward ran occurs -> subscription owns subscription creation toward ran
250 // This is intermediate solution while improving message handling
252 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
254 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
255 c.registry.DelSubscription(subs.Seq)
260 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
261 trans.Payload = packedData.Buf
262 trans.PayloadLen = len(packedData.Buf)
264 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
266 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
267 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
271 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
272 xapp.Logger.Info("SubResp from E2T: %s", params.String())
277 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
279 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String())
286 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
287 if subs == nil && params.SubId > 0 {
288 subs = c.registry.GetSubscription(uint16(params.SubId))
292 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
295 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
300 trans := subs.GetTransaction()
302 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
306 trans.SubRespMsg = SubRespMsg
311 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
313 responseReceived := trans.CheckResponseReceived()
314 if responseReceived == true {
315 // Subscription timer already received
319 packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
321 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
326 //Optimize and store packed message to be sent.
327 trans.Payload = packedData.Buf
328 trans.PayloadLen = len(packedData.Buf)
332 c.rmrReplyToSender("SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen)
336 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
337 xapp.Logger.Info("SubFail from E2T: %s", params.String())
339 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
341 xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
344 xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum)
346 subs := c.registry.GetSubscription(payloadSeqNum)
348 xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
352 trans := subs.GetTransaction()
354 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
358 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
360 responseReceived := trans.CheckResponseReceived()
362 xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
366 if responseReceived == true {
367 // Subscription timer already received
370 xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint)
372 c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
374 time.Sleep(3 * time.Second)
376 xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid)
378 if !c.registry.DelSubscription(payloadSeqNum) {
379 xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
384 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
385 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
387 subs := c.registry.GetSubscription(uint16(nbrId))
389 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
393 trans := subs.GetTransaction()
395 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
399 responseReceived := trans.CheckResponseReceived()
401 if responseReceived == true {
402 // Subscription Response or Failure already received
406 if tryCount < maxSubReqTryCount {
407 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
409 trans.RetryTransaction()
411 c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
414 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
418 // Delete CREATE transaction
421 // Create DELETE transaction (internal and no messages toward xapp)
422 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
423 12020, // RIC SUBSCRIPTION DELETE
430 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
431 //TODO improve error handling. Important at least in merge
432 c.registry.DelSubscription(subs.GetSubId())
436 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
437 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
438 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
439 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
440 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
442 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
443 //TODO improve error handling. Important at least in merge
445 c.registry.DelSubscription(subs.GetSubId())
448 deltrans.PayloadLen = len(packedData.Buf)
449 deltrans.Payload = packedData.Buf
451 err = subs.SetTransaction(deltrans)
453 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
454 //TODO improve error handling. Important at least in merge
459 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
461 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
465 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
466 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
471 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
479 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
486 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
488 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
496 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
497 if subs == nil && params.SubId > 0 {
498 subs = c.registry.GetSubscription(uint16(params.SubId))
502 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
506 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
508 err = subs.SetTransaction(trans)
510 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
516 // TODO: subscription delete is in fact owned by subscription and not transaction.
517 // Transaction is toward xapp while Subscription is toward ran.
518 // In merge several xapps may wake transactions, while only one subscription
519 // toward ran occurs -> subscription owns subscription creation toward ran
521 // This is intermediate solution while improving message handling
523 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
525 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
530 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
531 trans.Payload = packedData.Buf
532 trans.PayloadLen = len(packedData.Buf)
536 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
538 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
542 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
543 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
545 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
547 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
550 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
552 subs := c.registry.GetSubscription(payloadSeqNum)
554 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
558 trans := subs.GetTransaction()
560 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
564 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
566 responseReceived := trans.CheckResponseReceived()
567 if responseReceived == true {
568 // Subscription Delete timer already received
574 if trans.ForwardRespToXapp == true {
575 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
576 time.Sleep(3 * time.Second)
579 xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
580 if !c.registry.DelSubscription(subs.GetSubId()) {
581 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
587 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
588 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
590 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
592 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
595 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
597 subs := c.registry.GetSubscription(payloadSeqNum)
599 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
603 trans := subs.GetTransaction()
605 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
609 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
611 responseReceived := trans.CheckResponseReceived()
612 if responseReceived == true {
613 // Subscription Delete timer already received
616 if trans.ForwardRespToXapp == true {
617 var subDelRespPayload []byte
618 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
620 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
624 // RIC SUBSCRIPTION DELETE RESPONSE
625 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
626 time.Sleep(3 * time.Second)
629 xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
631 if !c.registry.DelSubscription(subs.GetSubId()) {
632 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
638 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
639 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
641 subs := c.registry.GetSubscription(uint16(nbrId))
643 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
647 trans := subs.GetTransaction()
649 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
653 responseReceived := trans.CheckResponseReceived()
654 if responseReceived == true {
655 // Subscription Delete Response or Failure already received
659 if tryCount < maxSubDelReqTryCount {
660 xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
661 // Set possible to handle new response for the subId
663 trans.RetryTransaction()
665 c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
668 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
672 if trans.ForwardRespToXapp == true {
673 var subDelRespPayload []byte
674 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
676 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
680 // RIC SUBSCRIPTION DELETE RESPONSE
681 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
683 time.Sleep(3 * time.Second)
687 xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
689 if !c.registry.DelSubscription(subs.GetSubId()) {
690 xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())