2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
48 rtmgrClient *RtmgrClient
51 rmrSendMutex sync.Mutex
71 xapp.Logger.Info("SUBMGR")
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
75 seedSN = uint16(viper.GetInt("seed_sn"))
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
86 func NewControl() *Control {
88 registry := new(Registry)
89 registry.Initialize(seedSN)
91 tracker := new(Tracker)
94 timerMap := new(TimerMap)
97 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
98 client := rtmgrclient.New(transport, strfmt.Default)
99 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
100 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
103 rtmgrClientPtr := &rtmgrClient
105 //TODO: to make this better. Now it is just a hack.
106 registry.rtmgrClient = rtmgrClientPtr
108 return &Control{e2ap: new(E2ap),
110 rtmgrClient: rtmgrClientPtr,
117 func (c *Control) Run() {
121 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
123 xapp.Logger.Info("%s: %s", desc, params.String())
126 for ; i <= 10 && status == false; i++ {
127 c.rmrSendMutex.Lock()
128 status = xapp.Rmr.Send(params.RMRParams, false)
129 c.rmrSendMutex.Unlock()
131 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 time.Sleep(500 * time.Millisecond)
136 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
137 xapp.Logger.Error("%s: %s", desc, err.Error())
138 xapp.Rmr.Free(params.Mbuf)
143 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
144 params := &RMRParams{&xapp.RMRParams{}}
145 params.Mtype = trans.GetMtype()
146 params.SubId = int(subs.GetSubId())
148 params.Meid = subs.GetMeid()
150 params.PayloadLen = payloadLen
151 params.Payload = payload
154 return c.rmrSendRaw(desc, params)
157 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
158 params := &RMRParams{&xapp.RMRParams{}}
160 params.SubId = int(subs.GetSubId())
161 params.Xid = trans.GetXid()
162 params.Meid = trans.GetMeid()
164 params.PayloadLen = payloadLen
165 params.Payload = payload
168 return c.rmrSendRaw(desc, params)
171 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
172 xapp.Rmr.Free(params.Mbuf)
174 msg := &RMRParams{params}
177 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
178 go c.handleSubscriptionRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
180 go c.handleSubscriptionResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
182 go c.handleSubscriptionFailure(msg)
183 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
184 go c.handleSubscriptionDeleteRequest(msg)
185 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
186 go c.handleSubscriptionDeleteResponse(msg)
187 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
188 go c.handleSubscriptionDeleteFailure(msg)
190 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
196 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
197 xapp.Logger.Info("SubReq from xapp: %s", params.String())
202 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
210 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
217 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
219 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
227 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
229 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
234 err = subs.SetTransaction(trans)
236 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
237 c.registry.DelSubscription(subs.Seq)
242 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
245 // TODO: subscription create is in fact owned by subscription and not transaction.
246 // Transaction is toward xapp while Subscription is toward ran.
247 // In merge several xapps may wake transactions, while only one subscription
248 // toward ran occurs -> subscription owns subscription creation toward ran
250 // This is intermediate solution while improving message handling
252 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
254 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
255 c.registry.DelSubscription(subs.Seq)
260 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
261 trans.Payload = packedData.Buf
262 trans.PayloadLen = len(packedData.Buf)
264 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
266 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
267 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
271 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
272 xapp.Logger.Info("SubResp from E2T: %s", params.String())
274 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
276 xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
279 xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum)
281 subs := c.registry.GetSubscription(payloadSeqNum)
283 xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
287 trans := subs.GetTransaction()
289 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
291 responseReceived := trans.CheckResponseReceived()
292 if responseReceived == true {
293 // Subscription timer already received
299 c.rmrReplyToSender("SubResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
300 xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting trans record", payloadSeqNum, trans.RmrEndpoint)
304 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
305 xapp.Logger.Info("SubFail from E2T: %s", params.String())
307 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
309 xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
312 xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum)
314 subs := c.registry.GetSubscription(payloadSeqNum)
316 xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
320 trans := subs.GetTransaction()
322 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
326 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
328 responseReceived := trans.CheckResponseReceived()
330 xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
334 if responseReceived == true {
335 // Subscription timer already received
338 xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint)
340 c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
342 time.Sleep(3 * time.Second)
344 xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid)
346 if !c.registry.DelSubscription(payloadSeqNum) {
347 xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
352 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
353 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
355 subs := c.registry.GetSubscription(uint16(nbrId))
357 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
361 trans := subs.GetTransaction()
363 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
367 responseReceived := trans.CheckResponseReceived()
369 if responseReceived == true {
370 // Subscription Response or Failure already received
374 if tryCount < maxSubReqTryCount {
375 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
377 trans.RetryTransaction()
379 c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
382 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
386 // Delete CREATE transaction
389 // Create DELETE transaction (internal and no messages toward xapp)
390 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
391 12020, // RIC SUBSCRIPTION DELETE
398 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
399 //TODO improve error handling. Important at least in merge
400 c.registry.DelSubscription(subs.GetSubId())
404 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
405 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
406 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
407 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
408 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
410 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
411 //TODO improve error handling. Important at least in merge
413 c.registry.DelSubscription(subs.GetSubId())
416 deltrans.PayloadLen = len(packedData.Buf)
417 deltrans.Payload = packedData.Buf
419 err = subs.SetTransaction(deltrans)
421 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
422 //TODO improve error handling. Important at least in merge
427 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
429 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
433 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
434 var subs *Subscription
436 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
438 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
446 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
453 trans.SubDelReqMsg, err = c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
455 xapp.Logger.Error("SubDelReq: %s Dropping this msg. %s", err.Error(), trans)
460 subs = c.registry.GetSubscription(uint16(trans.SubDelReqMsg.RequestId.Seq))
461 if subs == nil && params.SubId > 0 {
462 subs = c.registry.GetSubscription(uint16(params.SubId))
466 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d, SubId: %d. Dropping this msg. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
470 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d, SubId: %d. %s", trans.SubDelReqMsg.RequestId.Seq, params.SubId, trans)
472 err = subs.SetTransaction(trans)
474 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
480 // TODO: subscription delete is in fact owned by subscription and not transaction.
481 // Transaction is toward xapp while Subscription is toward ran.
482 // In merge several xapps may wake transactions, while only one subscription
483 // toward ran occurs -> subscription owns subscription creation toward ran
485 // This is intermediate solution while improving message handling
487 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(trans.SubDelReqMsg)
489 xapp.Logger.Error("SubDelReq: %s for trans %s", err.Error(), trans)
494 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
495 trans.Payload = packedData.Buf
496 trans.PayloadLen = len(packedData.Buf)
500 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
502 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
506 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
507 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
509 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
511 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
514 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
516 subs := c.registry.GetSubscription(payloadSeqNum)
518 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
522 trans := subs.GetTransaction()
524 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
528 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
530 responseReceived := trans.CheckResponseReceived()
531 if responseReceived == true {
532 // Subscription Delete timer already received
538 if trans.ForwardRespToXapp == true {
539 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
540 time.Sleep(3 * time.Second)
543 xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
544 if !c.registry.DelSubscription(subs.GetSubId()) {
545 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
551 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
552 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
554 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
556 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
559 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
561 subs := c.registry.GetSubscription(payloadSeqNum)
563 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
567 trans := subs.GetTransaction()
569 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
573 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
575 responseReceived := trans.CheckResponseReceived()
576 if responseReceived == true {
577 // Subscription Delete timer already received
580 if trans.ForwardRespToXapp == true {
581 var subDelRespPayload []byte
582 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
584 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
588 // RIC SUBSCRIPTION DELETE RESPONSE
589 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
590 time.Sleep(3 * time.Second)
593 xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
595 if !c.registry.DelSubscription(subs.GetSubId()) {
596 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
602 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
603 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
605 subs := c.registry.GetSubscription(uint16(nbrId))
607 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
611 trans := subs.GetTransaction()
613 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
617 responseReceived := trans.CheckResponseReceived()
618 if responseReceived == true {
619 // Subscription Delete Response or Failure already received
623 if tryCount < maxSubDelReqTryCount {
624 xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
625 // Set possible to handle new response for the subId
627 trans.RetryTransaction()
629 c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
632 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
636 if trans.ForwardRespToXapp == true {
637 var subDelRespPayload []byte
638 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
640 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
644 // RIC SUBSCRIPTION DELETE RESPONSE
645 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
647 time.Sleep(3 * time.Second)
651 xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
653 if !c.registry.DelSubscription(subs.GetSubId()) {
654 xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())