2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
40 var subReqTime time.Duration = 5 * time.Second
41 var subDelReqTime time.Duration = 5 * time.Second
42 var maxSubReqTryCount uint64 = 2 // Initial try + retry
43 var maxSubDelReqTryCount uint64 = 2 // Initial try + retry
48 rtmgrClient *RtmgrClient
51 rmrSendMutex sync.Mutex
71 xapp.Logger.Info("SUBMGR")
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
75 seedSN = uint16(viper.GetInt("seed_sn"))
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
86 func NewControl() *Control {
88 registry := new(Registry)
89 registry.Initialize(seedSN)
91 tracker := new(Tracker)
94 timerMap := new(TimerMap)
97 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
98 client := rtmgrclient.New(transport, strfmt.Default)
99 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
100 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
103 rtmgrClientPtr := &rtmgrClient
105 //TODO: to make this better. Now it is just a hack.
106 registry.rtmgrClient = rtmgrClientPtr
108 return &Control{e2ap: new(E2ap),
110 rtmgrClient: rtmgrClientPtr,
117 func (c *Control) Run() {
121 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
123 xapp.Logger.Info("%s: %s", desc, params.String())
126 for ; i <= 10 && status == false; i++ {
127 c.rmrSendMutex.Lock()
128 status = xapp.Rmr.Send(params.RMRParams, false)
129 c.rmrSendMutex.Unlock()
131 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 time.Sleep(500 * time.Millisecond)
136 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
137 xapp.Logger.Error("%s: %s", desc, err.Error())
138 xapp.Rmr.Free(params.Mbuf)
143 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
144 params := &RMRParams{&xapp.RMRParams{}}
145 params.Mtype = trans.GetMtype()
146 params.SubId = int(subs.GetSubId())
148 params.Meid = subs.GetMeid()
150 params.PayloadLen = payloadLen
151 params.Payload = payload
154 return c.rmrSendRaw(desc, params)
157 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
158 params := &RMRParams{&xapp.RMRParams{}}
160 params.SubId = int(subs.GetSubId())
161 params.Xid = trans.GetXid()
162 params.Meid = trans.GetMeid()
164 params.PayloadLen = payloadLen
165 params.Payload = payload
168 return c.rmrSendRaw(desc, params)
171 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
172 xapp.Rmr.Free(params.Mbuf)
174 msg := &RMRParams{params}
177 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
178 go c.handleSubscriptionRequest(msg)
179 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
180 go c.handleSubscriptionResponse(msg)
181 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
182 go c.handleSubscriptionFailure(msg)
183 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
184 go c.handleSubscriptionDeleteRequest(msg)
185 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
186 go c.handleSubscriptionDeleteResponse(msg)
187 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
188 go c.handleSubscriptionDeleteFailure(msg)
190 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
196 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
197 xapp.Logger.Info("SubReq from xapp: %s", params.String())
202 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
210 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
217 trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
219 xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
227 subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
229 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
234 err = subs.SetTransaction(trans)
236 xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
237 c.registry.DelSubscription(subs.Seq)
242 trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
245 // TODO: subscription create is in fact owned by subscription and not transaction.
246 // Transaction is toward xapp while Subscription is toward ran.
247 // In merge several xapps may wake transactions, while only one subscription
248 // toward ran occurs -> subscription owns subscription creation toward ran
250 // This is intermediate solution while improving message handling
252 packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
254 xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
255 c.registry.DelSubscription(subs.Seq)
260 //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
261 trans.Payload = packedData.Buf
262 trans.PayloadLen = len(packedData.Buf)
264 c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
266 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
267 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
271 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
272 xapp.Logger.Info("SubResp from E2T: %s", params.String())
274 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
276 xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
279 xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum)
281 subs := c.registry.GetSubscription(payloadSeqNum)
283 xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
287 trans := subs.GetTransaction()
289 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
291 responseReceived := trans.CheckResponseReceived()
292 if responseReceived == true {
293 // Subscription timer already received
299 c.rmrReplyToSender("SubResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
300 xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting trans record", payloadSeqNum, trans.RmrEndpoint)
304 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
305 xapp.Logger.Info("SubFail from E2T: %s", params.String())
307 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
309 xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
312 xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum)
314 subs := c.registry.GetSubscription(payloadSeqNum)
316 xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
320 trans := subs.GetTransaction()
322 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
326 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
328 responseReceived := trans.CheckResponseReceived()
330 xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
334 if responseReceived == true {
335 // Subscription timer already received
338 xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint)
340 c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
342 time.Sleep(3 * time.Second)
344 xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid)
346 if !c.registry.DelSubscription(payloadSeqNum) {
347 xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
352 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
353 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
355 subs := c.registry.GetSubscription(uint16(nbrId))
357 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
361 trans := subs.GetTransaction()
363 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
367 responseReceived := trans.CheckResponseReceived()
369 if responseReceived == true {
370 // Subscription Response or Failure already received
374 if tryCount < maxSubReqTryCount {
375 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
377 trans.RetryTransaction()
379 c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
382 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
386 // Delete CREATE transaction
389 // Create DELETE transaction (internal and no messages toward xapp)
390 deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
391 12020, // RIC SUBSCRIPTION DELETE
398 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
399 //TODO improve error handling. Important at least in merge
400 c.registry.DelSubscription(subs.GetSubId())
404 deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
405 deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
406 deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
407 deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
408 packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
410 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
411 //TODO improve error handling. Important at least in merge
413 c.registry.DelSubscription(subs.GetSubId())
416 deltrans.PayloadLen = len(packedData.Buf)
417 deltrans.Payload = packedData.Buf
419 err = subs.SetTransaction(deltrans)
421 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
422 //TODO improve error handling. Important at least in merge
427 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
429 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
433 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
434 var subs *Subscription
436 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
438 trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
446 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
450 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
452 subs = c.registry.GetSubscription(payloadSeqNum)
454 if subs == nil && params.SubId > 0 {
455 subs = c.registry.GetSubscription(uint16(params.SubId))
459 xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d. Dropping this msg. %s", payloadSeqNum, trans)
463 xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d. %s", payloadSeqNum, trans)
465 trans.PayloadLen = params.PayloadLen
466 trans.Payload = params.Payload
468 err = subs.SetTransaction(trans)
470 xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
477 c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
479 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
483 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
484 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
486 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
488 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
491 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
493 subs := c.registry.GetSubscription(payloadSeqNum)
495 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
499 trans := subs.GetTransaction()
501 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
505 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
507 responseReceived := trans.CheckResponseReceived()
508 if responseReceived == true {
509 // Subscription Delete timer already received
515 if trans.ForwardRespToXapp == true {
516 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
517 time.Sleep(3 * time.Second)
520 xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
521 if !c.registry.DelSubscription(subs.GetSubId()) {
522 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
528 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
529 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
531 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
533 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
536 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
538 subs := c.registry.GetSubscription(payloadSeqNum)
540 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
544 trans := subs.GetTransaction()
546 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
550 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
552 responseReceived := trans.CheckResponseReceived()
553 if responseReceived == true {
554 // Subscription Delete timer already received
557 if trans.ForwardRespToXapp == true {
558 var subDelRespPayload []byte
559 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
561 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
565 // RIC SUBSCRIPTION DELETE RESPONSE
566 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
567 time.Sleep(3 * time.Second)
570 xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
572 if !c.registry.DelSubscription(subs.GetSubId()) {
573 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
579 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
580 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
582 subs := c.registry.GetSubscription(uint16(nbrId))
584 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
588 trans := subs.GetTransaction()
590 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
594 responseReceived := trans.CheckResponseReceived()
595 if responseReceived == true {
596 // Subscription Delete Response or Failure already received
600 if tryCount < maxSubDelReqTryCount {
601 xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
602 // Set possible to handle new response for the subId
604 trans.RetryTransaction()
606 c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
609 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
613 if trans.ForwardRespToXapp == true {
614 var subDelRespPayload []byte
615 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
617 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
621 // RIC SUBSCRIPTION DELETE RESPONSE
622 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
624 time.Sleep(3 * time.Second)
628 xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
630 if !c.registry.DelSubscription(subs.GetSubId()) {
631 xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())