2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
48 rtmgrClient *RtmgrClient
51 rmrSendMutex sync.Mutex
71 xapp.Logger.Info("SUBMGR")
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
75 seedSN = uint16(viper.GetInt("seed_sn"))
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
86 func NewControl() *Control {
88 registry := new(Registry)
89 registry.Initialize(seedSN)
91 tracker := new(Tracker)
94 timerMap := new(TimerMap)
97 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
98 client := rtmgrclient.New(transport, strfmt.Default)
99 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
100 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
103 rtmgrClientPtr := &rtmgrClient
105 //TODO: to make this better. Now it is just a hack.
106 registry.rtmgrClient = rtmgrClientPtr
108 return &Control{e2ap: new(E2ap),
110 rtmgrClient: rtmgrClientPtr,
117 func (c *Control) Run() {
121 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
123 xapp.Logger.Info("%s: %s", desc, params.String())
126 for ; i <= 10 && status == false; i++ {
127 c.rmrSendMutex.Lock()
128 status = xapp.Rmr.Send(params.RMRParams, false)
129 c.rmrSendMutex.Unlock()
131 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 time.Sleep(500 * time.Millisecond)
136 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
137 xapp.Logger.Error("%s: %s", desc, err.Error())
138 xapp.Rmr.Free(params.Mbuf)
143 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
144 params := &RMRParams{&xapp.RMRParams{}}
145 params.Mtype = trans.GetMtype()
146 params.SubId = int(subs.GetSubId())
148 params.Meid = subs.GetMeid()
150 params.PayloadLen = payloadLen
151 params.Payload = payload
154 return c.rmrSendRaw(desc, params)
157 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
158 params := &RMRParams{&xapp.RMRParams{}}
160 params.SubId = int(subs.GetSubId())
161 params.Xid = trans.GetXid()
162 params.Meid = trans.GetMeid()
164 params.PayloadLen = payloadLen
165 params.Payload = payload
168 return c.rmrSendRaw(desc, params)
171 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
172 xapp.Rmr.Free(params.Mbuf)
174 msg := &RMRParams{params}
177 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
178 go c.handleSubscriptionRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
180 go c.handleSubscriptionResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
182 go c.handleSubscriptionFailure(msg)
183 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
184 go c.handleSubscriptionDeleteRequest(msg)
185 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
186 go c.handleSubscriptionDeleteResponse(msg)
187 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
188 go c.handleSubscriptionDeleteFailure(msg)
190 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
196 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
197 xapp.Logger.Info("SubReq from xapp: %s", params.String())
202 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
210 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
217 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
219 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
227 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
229 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
234 err = subs.SetTransaction(trans)
236 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
237 c.registry.DelSubscription(subs.Seq)
242 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
245 // TODO: subscription create is in fact owned by subscription and not transaction.
246 // Transaction is toward xapp while Subscription is toward ran.
247 // In merge several xapps may wake transactions, while only one subscription
248 // toward ran occurs -> subscription owns subscription creation toward ran
250 // This is intermediate solution while improving message handling
252 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
254 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
255 c.registry.DelSubscription(subs.Seq)
260 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
261 trans.Payload = packedData.Buf
262 trans.PayloadLen = len(packedData.Buf)
264 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
266 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
267 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
271 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
272 xapp.Logger.Info("SubResp from E2T: %s", params.String())
277 SubRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
279 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), params.String())
286 subs := c.registry.GetSubscription(uint16(SubRespMsg.RequestId.Seq))
287 if subs == nil && params.SubId > 0 {
288 subs = c.registry.GetSubscription(uint16(params.SubId))
292 xapp.Logger.Error("SubResp: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubRespMsg.RequestId.Seq, params.SubId, params.String())
295 xapp.Logger.Info("SubResp: subscription found payloadSeqNum: %d, SubId: %d", SubRespMsg.RequestId.Seq, subs.GetSubId())
300 trans := subs.GetTransaction()
302 xapp.Logger.Error("SubResp: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
306 trans.SubRespMsg = SubRespMsg
311 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
313 responseReceived := trans.CheckResponseReceived()
314 if responseReceived == true {
315 // Subscription timer already received
319 packedData, err := c.e2ap.PackSubscriptionResponse(trans.SubRespMsg)
321 xapp.Logger.Error("SubResp: %s for trans %s", err.Error(), trans)
326 //Optimize and store packed message to be sent.
327 trans.Payload = packedData.Buf
328 trans.PayloadLen = len(packedData.Buf)
332 c.rmrReplyToSender("SubResp to xapp", subs, trans, 12011, trans.Payload, trans.PayloadLen)
336 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
337 xapp.Logger.Info("SubFail from E2T: %s", params.String())
342 SubFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
344 xapp.Logger.Error("SubFail: %s Dropping this msg. %s", err.Error(), params.String())
351 subs := c.registry.GetSubscription(uint16(SubFailMsg.RequestId.Seq))
352 if subs == nil && params.SubId > 0 {
353 subs = c.registry.GetSubscription(uint16(params.SubId))
357 xapp.Logger.Error("SubFail: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", SubFailMsg.RequestId.Seq, params.SubId, params.String())
360 xapp.Logger.Info("SubFail: subscription found payloadSeqNum: %d, SubId: %d", SubFailMsg.RequestId.Seq, subs.GetSubId())
365 trans := subs.GetTransaction()
367 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. SubId: %d", subs.GetSubId())
370 trans.SubFailMsg = SubFailMsg
375 c.timerMap.StopTimer("RIC_SUB_REQ", int(subs.GetSubId()))
377 responseReceived := trans.CheckResponseReceived()
382 if responseReceived == true {
383 // Subscription timer already received
387 packedData, err := c.e2ap.PackSubscriptionFailure(trans.SubFailMsg)
389 //TODO error handling improvement
390 xapp.Logger.Error("SubFail: %s for trans %s (continue still)", err.Error(), trans)
392 //Optimize and store packed message to be sent.
393 trans.Payload = packedData.Buf
394 trans.PayloadLen = len(packedData.Buf)
395 c.rmrReplyToSender("SubFail to xapp", subs, trans, 12012, trans.Payload, trans.PayloadLen)
396 time.Sleep(3 * time.Second)
400 if !c.registry.DelSubscription(subs.GetSubId()) {
401 xapp.Logger.Error("SubFail: Failed to release sequency number. %s", subs)
406 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
407 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
409 subs := c.registry.GetSubscription(uint16(nbrId))
411 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
415 trans := subs.GetTransaction()
417 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
421 responseReceived := trans.CheckResponseReceived()
423 if responseReceived == true {
424 // Subscription Response or Failure already received
428 if tryCount < maxSubReqTryCount {
429 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
431 trans.RetryTransaction()
433 c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
436 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
440 // Delete CREATE transaction
443 // Create DELETE transaction (internal and no messages toward xapp)
444 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
445 12020, // RIC SUBSCRIPTION DELETE
452 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
453 //TODO improve error handling. Important at least in merge
454 c.registry.DelSubscription(subs.GetSubId())
458 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
459 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
460 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
461 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
462 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
464 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
465 //TODO improve error handling. Important at least in merge
467 c.registry.DelSubscription(subs.GetSubId())
470 deltrans.PayloadLen = len(packedData.Buf)
471 deltrans.Payload = packedData.Buf
473 err = subs.SetTransaction(deltrans)
475 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
476 //TODO improve error handling. Important at least in merge
481 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
483 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
487 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
488 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
493 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
501 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
508 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
510 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
518 subs := c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
519 if subs == nil && params.SubId > 0 {
520 subs = c.registry.GetSubscription(uint16(params.SubId))
524 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
528 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
530 err = subs.SetTransaction(trans)
532 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
538 // TODO: subscription delete is in fact owned by subscription and not transaction.
539 // Transaction is toward xapp while Subscription is toward ran.
540 // In merge several xapps may wake transactions, while only one subscription
541 // toward ran occurs -> subscription owns subscription creation toward ran
543 // This is intermediate solution while improving message handling
545 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
547 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
552 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
553 trans.Payload = packedData.Buf
554 trans.PayloadLen = len(packedData.Buf)
558 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
560 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
564 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
565 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
567 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
569 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
572 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
574 subs := c.registry.GetSubscription(payloadSeqNum)
576 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
580 trans := subs.GetTransaction()
582 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
586 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
588 responseReceived := trans.CheckResponseReceived()
589 if responseReceived == true {
590 // Subscription Delete timer already received
596 if trans.ForwardRespToXapp == true {
597 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
598 time.Sleep(3 * time.Second)
601 xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
602 if !c.registry.DelSubscription(subs.GetSubId()) {
603 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
609 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
610 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
612 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
614 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
617 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
619 subs := c.registry.GetSubscription(payloadSeqNum)
621 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
625 trans := subs.GetTransaction()
627 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
631 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
633 responseReceived := trans.CheckResponseReceived()
634 if responseReceived == true {
635 // Subscription Delete timer already received
638 if trans.ForwardRespToXapp == true {
639 var subDelRespPayload []byte
640 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
642 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
646 // RIC SUBSCRIPTION DELETE RESPONSE
647 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
648 time.Sleep(3 * time.Second)
651 xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
653 if !c.registry.DelSubscription(subs.GetSubId()) {
654 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
660 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
661 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
663 subs := c.registry.GetSubscription(uint16(nbrId))
665 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
669 trans := subs.GetTransaction()
671 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
675 responseReceived := trans.CheckResponseReceived()
676 if responseReceived == true {
677 // Subscription Delete Response or Failure already received
681 if tryCount < maxSubDelReqTryCount {
682 xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
683 // Set possible to handle new response for the subId
685 trans.RetryTransaction()
687 c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
690 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
694 if trans.ForwardRespToXapp == true {
695 var subDelRespPayload []byte
696 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
698 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
702 // RIC SUBSCRIPTION DELETE RESPONSE
703 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
705 time.Sleep(3 * time.Second)
709 xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
711 if !c.registry.DelSubscription(subs.GetSubId()) {
712 xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())