2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
24 //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
25 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
26 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
27 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
28 httptransport "github.com/go-openapi/runtime/client"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 //-----------------------------------------------------------------------------
38 //-----------------------------------------------------------------------------
// Timer durations and retry budgets used when talking to E2T.
var (
	// subReqTime is the duration handed to the timer map when a
	// "RIC_SUB_REQ" timer is started.
	subReqTime = 5 * time.Second

	// subDelReqTime is the duration handed to the timer map when a
	// "RIC_SUB_DEL_REQ" timer is started.
	subDelReqTime = 5 * time.Second

	// maxSubReqTryCount is compared against the try count in the
	// subscription request timer handler; below it the request is resent.
	maxSubReqTryCount uint64 = 2 // Initial try + retry

	// maxSubDelReqTryCount is compared against the try count in the
	// subscription delete request timer handler; below it the request is
	// resent.
	maxSubDelReqTryCount uint64 = 2 // Initial try + retry
)
// rtmgrClient is the routing-manager client. NewControl stores the same
// pointer here and in the registry; the request handlers use it to split the
// RMR source string into address and port.
48 rtmgrClient *RtmgrClient
// rmrSendMutex is held around each xapp.Rmr.Send call made by rmrSendRaw.
51 rmrSendMutex sync.Mutex
// Start-up selection of the seed sequence number (seedSN) that NewControl
// later passes to Registry.Initialize.
// NOTE(review): the enclosing function header and the guards around the
// assignments below are not visible in this view.
71 xapp.Logger.Info("SUBMGR")
// Environment lookups use the "submgr" prefix; empty variables are accepted.
73 viper.SetEnvPrefix("submgr")
74 viper.AllowEmptyEnv(true)
// Configured seed, read from the "seed_sn" key and truncated to 16 bits.
75 seedSN = uint16(viper.GetInt("seed_sn"))
// Random seed in [0, 65535) drawn from a time-seeded generator.
// NOTE(review): presumably only taken when no seed was configured — verify
// against the guard that is not shown here.
77 rand.Seed(time.Now().UnixNano())
78 seedSN = uint16(rand.Intn(65535))
83 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
// NewControl creates the helpers visible below — a Registry initialised with
// seedSN, a Tracker, a TimerMap and a routing-manager client — and returns a
// Control that holds a new E2ap and the routing-manager client pointer.
// NOTE(review): several statements, including most fields of the returned
// literal, are not visible in this view.
86 func NewControl() *Control {
88 registry := new(Registry)
89 registry.Initialize(seedSN)
91 tracker := new(Tracker)
94 timerMap := new(TimerMap)
// The routing-manager endpoint is assembled from the viper keys
// rtmgr.HostAddr, rtmgr.port and rtmgr.baseUrl; the only scheme offered is http.
97 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
98 client := rtmgrclient.New(transport, strfmt.Default)
// Both parameter sets (provide and delete) are created with a 10 second timeout.
99 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
100 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
103 rtmgrClientPtr := &rtmgrClient
105 //TODO: to make this better. Now it is just a hack.
// The registry receives the same client pointer that the Control keeps.
106 registry.rtmgrClient = rtmgrClientPtr
108 return &Control{e2ap: new(E2ap),
110 rtmgrClient: rtmgrClientPtr,
// Run takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this view — document what it
// starts once confirmed.
117 func (c *Control) Run() {
// rmrSendRaw logs params under desc and passes params.RMRParams to
// xapp.Rmr.Send, holding rmrSendMutex around every attempt. The loop keeps
// going while i <= 10 and no send has succeeded; the retry log line and a
// 500 ms sleep sit inside the loop. The failure path builds the returned
// error, logs it and frees params.Mbuf.
// NOTE(review): the declarations of i and status and the guards around the
// retry and failure statements are not visible in this view.
121 func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) {
123 xapp.Logger.Info("%s: %s", desc, params.String())
126 for ; i <= 10 && status == false; i++ {
// Only one goroutine at a time calls into xapp.Rmr.Send.
127 c.rmrSendMutex.Lock()
128 status = xapp.Rmr.Send(params.RMRParams, false)
129 c.rmrSendMutex.Unlock()
131 xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String())
132 time.Sleep(500 * time.Millisecond)
// Here i holds the loop counter after the loop ended, which is what the
// message reports as the retry count.
136 err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String())
137 xapp.Logger.Error("%s: %s", desc, err.Error())
138 xapp.Rmr.Free(params.Mbuf)
// rmrSend fills a fresh RMRParams with the message type and Xid of trans, the
// SubId and Meid of subs and the given payload, then hands it to rmrSendRaw
// and returns that call's error. desc is passed through for logging.
// NOTE(review): some assignments of this function are not visible in this view.
143 func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) {
144 params := &RMRParams{&xapp.RMRParams{}}
145 params.Mtype = trans.GetMtype()
146 params.SubId = int(subs.GetSubId())
147 params.Xid = trans.GetXid()
148 params.Meid = subs.GetMeid()
150 params.PayloadLen = payloadLen
151 params.Payload = payload
154 return c.rmrSendRaw(desc, params)
// rmrReplyToSender fills a fresh RMRParams with the SubId and Meid of subs,
// the Xid of trans and the given payload, then hands it to rmrSendRaw and
// returns that call's error. Unlike rmrSend, the message type is supplied by
// the caller through mType.
// NOTE(review): the line that uses mType is not visible in this view;
// presumably it is assigned to params.Mtype — confirm.
157 func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) {
158 params := &RMRParams{&xapp.RMRParams{}}
160 params.SubId = int(subs.GetSubId())
161 params.Xid = trans.GetXid()
162 params.Meid = subs.GetMeid()
164 params.PayloadLen = payloadLen
165 params.Payload = payload
168 return c.rmrSendRaw(desc, params)
// Consume frees the Mbuf of the received message, wraps params in RMRParams
// and starts one goroutine per message, chosen by the message type looked up
// in xapp.RICMessageTypes. Types without a matching case are logged and
// discarded. Because every handler is launched with `go`, any value a handler
// returns is dropped.
// NOTE(review): the switch header, the default label and the return statement
// are not visible in this view.
171 func (c *Control) Consume(params *xapp.RMRParams) (err error) {
172 xapp.Rmr.Free(params.Mbuf)
175 msg := &RMRParams{params}
// Requests arriving from xApps.
180 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
181 go c.handleSubscriptionRequest(msg)
// Answers arriving from E2T for a subscription request.
182 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
183 go c.handleSubscriptionResponse(msg)
184 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
185 go c.handleSubscriptionFailure(msg)
186 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
187 go c.handleSubscriptionDeleteRequest(msg)
// Answers arriving from E2T for a subscription delete request.
188 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
189 go c.handleSubscriptionDeleteResponse(msg)
190 case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
191 go c.handleSubscriptionDeleteFailure(msg)
193 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// handleSubscriptionRequest processes a subscription request received from an
// xApp. It resolves the sender endpoint from params.Src, reserves a
// subscription in the registry, rewrites the request so that it carries the
// reserved subscription id, records a transaction for it, forwards the
// request to E2T and starts the "RIC_SUB_REQ" timer.
// NOTE(review): the error guards and early returns between the statements
// below are not visible in this view.
198 func (c *Control) handleSubscriptionRequest(params *RMRParams) {
199 xapp.Logger.Info("SubReq from xapp: %s", params.String())
// Split the RMR source string into address and port of the requesting xApp.
201 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
203 xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Reserve a subscription for this endpoint and Meid.
207 subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
209 xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
// Unpack the payload, overwrite RequestId.Seq with the subscription id and
// pack the request again.
217 e2SubReq := packerif.NewPackerSubscriptionRequest()
218 packedData := &packer.PackedData{}
219 packedData.Buf = params.Payload
220 err = e2SubReq.UnPack(packedData)
222 xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error())
224 getErr, subReq := e2SubReq.Get()
// NOTE(review): this message formats err, but the value returned by Get()
// above is getErr — looks like the wrong variable (err may be nil here and
// err.Error() would then panic); confirm.
226 xapp.Logger.Error("SubReq: Get() failed: %s", err.Error())
230 subReq.RequestId.Seq = uint32(subs.GetSubId())
232 err = e2SubReq.Set(subReq)
234 xapp.Logger.Error("SubReq: Set() failed: %s", err.Error())
237 err, packedData = e2SubReq.Pack(nil)
239 xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error())
// From here on params carries the re-packed payload.
243 params.PayloadLen = len(packedData.Buf)
244 params.Payload = packedData.Buf
250 params.SubId = int(subs.GetSubId())
// The sequence number is also written into the raw payload.
251 err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId())
253 xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
// NOTE(review): the cleanup calls in this function pass subs.Seq, while the
// other handlers pass subs.GetSubId() — confirm both are the same value.
254 c.registry.DelSubscription(subs.Seq)
258 // Create transaction record for every subscription request
259 var forwardRespToXapp bool = true
260 var responseReceived bool = false
261 trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
263 xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
264 c.registry.DelSubscription(subs.Seq)
268 err = subs.SetTransaction(trans)
270 xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
271 c.registry.DelSubscription(subs.Seq)
// The error returned by rmrSend is not checked here.
276 c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen)
// handleSubscriptionRequestTimer is handed to the timer map together with
// the subReqTime duration.
278 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
279 xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
// handleSubscriptionResponse processes a subscription response received from
// E2T. It reads the sequence number from the payload, looks up the matching
// subscription and its transaction, stops the "RIC_SUB_REQ" timer and replies
// to the xApp with the received message type and payload.
// NOTE(review): the guards and early returns between the statements below are
// not visible in this view.
283 func (c *Control) handleSubscriptionResponse(params *RMRParams) {
284 xapp.Logger.Info("SubResp from E2T: %s", params.String())
286 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
288 xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
291 xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum)
// The sequence number from the payload is the registry key.
293 subs := c.registry.GetSubscription(payloadSeqNum)
295 xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
299 trans := subs.GetTransaction()
301 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
// A true result means the request timer was handled first (see the comment below).
303 responseReceived := trans.CheckResponseReceived()
304 if responseReceived == true {
305 // Subscription timer already received
// The error returned by rmrReplyToSender is not checked here.
311 c.rmrReplyToSender("SubResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
312 xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting trans record", payloadSeqNum, trans.RmrEndpoint)
// handleSubscriptionFailure processes a subscription failure received from
// E2T. It reads the sequence number from the payload, looks up the matching
// subscription and transaction, stops the "RIC_SUB_REQ" timer, forwards the
// failure to the xApp, sleeps 3 seconds and then deletes the subscription
// from the registry.
// NOTE(review): the guards and early returns between the statements below are
// not visible in this view.
316 func (c *Control) handleSubscriptionFailure(params *RMRParams) {
317 xapp.Logger.Info("SubFail from E2T: %s", params.String())
319 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
321 xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
324 xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum)
326 subs := c.registry.GetSubscription(payloadSeqNum)
328 xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
332 trans := subs.GetTransaction()
334 xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
338 c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
340 responseReceived := trans.CheckResponseReceived()
// NOTE(review): the only visible assignment to err is the sequence-number
// lookup at the top, so this message appears to report a stale value —
// confirm what the hidden guard around it tests.
342 xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum)
346 if responseReceived == true {
347 // Subscription timer already received
350 xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint)
// The error returned by rmrReplyToSender is not checked here.
352 c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
// Blocks this goroutine for 3 seconds before the cleanup below.
// NOTE(review): the reason for the delay is not stated in the code.
354 time.Sleep(3 * time.Second)
356 xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid)
358 if !c.registry.DelSubscription(payloadSeqNum) {
359 xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
// handleSubscriptionRequestTimer is the function handed to the timer map for
// "RIC_SUB_REQ" timers. nbrId is used as the subscription id and tryCount is
// compared against maxSubReqTryCount. While tries remain, the original
// request is resent to E2T and the timer restarted. Otherwise a subscription
// delete request is packed from the original payload, tracked as a new
// transaction that is not forwarded to the xApp, sent to E2T and guarded by a
// "RIC_SUB_DEL_REQ" timer.
// NOTE(review): the guards, early returns and some statements between the
// lines below are not visible in this view; strId is not used in the visible
// lines.
364 func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) {
365 xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
367 subs := c.registry.GetSubscription(uint16(nbrId))
369 xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
373 trans := subs.GetTransaction()
375 xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
379 responseReceived := trans.CheckResponseReceived()
381 if responseReceived == true {
382 // Subscription Response or Failure already received
// Retry path: resend the stored original request and restart the timer.
386 if tryCount < maxSubReqTryCount {
387 xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
389 trans.RetryTransaction()
391 c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
394 c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
// Give-up path: build a delete request from the original request payload.
398 var subDelReqPayload []byte
399 subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId())
401 xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
405 // Cancel failed subscription
406 params := &RMRParams{&xapp.RMRParams{}}
// NOTE(review): literal message type; Consume looks types up through
// xapp.RICMessageTypes — consider using the same table here.
407 params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
408 params.SubId = int(subs.GetSubId())
409 params.Xid = trans.GetXid()
410 params.Meid = subs.GetMeid()
411 params.Src = trans.OrigParams.Src
412 params.PayloadLen = len(subDelReqPayload)
413 params.Payload = subDelReqPayload
416 // Delete CREATE transaction
419 // Create DELETE transaction (internal and no messages toward xapp)
420 var forwardRespToXapp bool = false
421 var respReceived bool = false
// The delete transaction reuses the endpoint of the original transaction.
422 deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp)
424 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
428 err = subs.SetTransaction(deltrans)
430 xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
// The error returned by rmrSend is not checked here.
435 c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen)
437 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteRequest processes a subscription delete request
// received from an xApp. It resolves the sender endpoint from params.Src,
// reads the sequence number from the payload, looks up the subscription,
// records a transaction whose response is forwarded to the xApp, sends the
// request to E2T and starts the "RIC_SUB_DEL_REQ" timer.
// NOTE(review): the guards and early returns between the statements below are
// not visible in this view.
441 func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
442 xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
444 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
446 xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
450 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
452 xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
455 xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
// The sequence number from the payload is the registry key.
457 subs := c.registry.GetSubscription(payloadSeqNum)
459 xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
463 var forwardRespToXapp bool = true
464 var respReceived bool = false
465 trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
467 xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
471 err = subs.SetTransaction(trans)
473 xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
// The payload sent is the one stored in the transaction; the error returned
// by rmrSend is not checked here.
480 c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
482 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
// handleSubscriptionDeleteResponse processes a subscription delete response
// received from E2T. It reads the sequence number from the payload, looks up
// the subscription and transaction, stops the "RIC_SUB_DEL_REQ" timer,
// forwards the response to the xApp when the transaction asks for it (then
// sleeps 3 seconds) and deletes the subscription from the registry.
// Consume starts this handler with `go`, so the returned error is dropped
// on that path.
// NOTE(review): the guards and return statements between the lines below are
// not visible in this view.
486 func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) {
487 xapp.Logger.Info("SubDelResp from E2T:%s", params.String())
489 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
491 xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
494 xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
496 subs := c.registry.GetSubscription(payloadSeqNum)
498 xapp.Logger.Error("SubDelResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
502 trans := subs.GetTransaction()
504 xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
508 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
510 responseReceived := trans.CheckResponseReceived()
511 if responseReceived == true {
512 // Subscription Delete timer already received
// Transactions created internally by the request timer have
// ForwardRespToXapp set to false and skip this reply.
518 if trans.ForwardRespToXapp == true {
519 c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen)
// Blocks this goroutine for 3 seconds before the cleanup below.
// NOTE(review): the reason for the delay is not stated in the code.
520 time.Sleep(3 * time.Second)
523 xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
524 if !c.registry.DelSubscription(subs.GetSubId()) {
525 xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
// handleSubscriptionDeleteFailure processes a subscription delete failure
// received from E2T. It reads the sequence number from the payload, looks up
// the subscription and transaction and stops the "RIC_SUB_DEL_REQ" timer.
// When the transaction forwards responses, the xApp is sent a packed
// subscription delete *response* (message type 12021) rather than the
// failure itself. The subscription is then deleted from the registry.
// NOTE(review): the guards and early returns between the statements below are
// not visible in this view.
531 func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) {
532 xapp.Logger.Info("SubDelFail from E2T:%s", params.String())
534 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
536 xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
539 xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
541 subs := c.registry.GetSubscription(payloadSeqNum)
543 xapp.Logger.Error("SubDelFail: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
547 trans := subs.GetTransaction()
549 xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId)
553 c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()))
555 responseReceived := trans.CheckResponseReceived()
556 if responseReceived == true {
557 // Subscription Delete timer already received
560 if trans.ForwardRespToXapp == true {
// The response is packed from the payload of the original delete request.
561 var subDelRespPayload []byte
562 subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
564 xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
568 // RIC SUBSCRIPTION DELETE RESPONSE
// NOTE(review): literal message type; Consume looks types up through
// xapp.RICMessageTypes — consider using the same table here.
569 c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
// Blocks this goroutine for 3 seconds before the cleanup below.
570 time.Sleep(3 * time.Second)
573 xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
575 if !c.registry.DelSubscription(subs.GetSubId()) {
576 xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid())
// handleSubscriptionDeleteRequestTimer is the function handed to the timer
// map for "RIC_SUB_DEL_REQ" timers. nbrId is used as the subscription id and
// tryCount is compared against maxSubDelReqTryCount. While tries remain, the
// stored delete request is resent to E2T and the timer restarted. Otherwise,
// when the transaction forwards responses, a subscription delete response
// (message type 12021) is packed and sent to the xApp. The subscription is
// then deleted from the registry.
// NOTE(review): the guards, early returns and some statements between the
// lines below are not visible in this view; strId is not used in the visible
// lines.
582 func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) {
583 xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount)
585 subs := c.registry.GetSubscription(uint16(nbrId))
587 xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId)
591 trans := subs.GetTransaction()
593 xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId())
597 responseReceived := trans.CheckResponseReceived()
598 if responseReceived == true {
599 // Subscription Delete Response or Failure already received
// Retry path: resend the stored delete request and restart the timer.
603 if tryCount < maxSubDelReqTryCount {
604 xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
605 // Set possible to handle new response for the subId
607 trans.RetryTransaction()
609 c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
// NOTE(review): this restart passes subReqTime, whereas the other
// "RIC_SUB_DEL_REQ" timers are started with subDelReqTime — both are 5 s
// today, but this looks like the wrong variable; confirm.
612 c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
// Give-up path: answer the xApp with a delete response built from the
// original delete request payload.
616 if trans.ForwardRespToXapp == true {
617 var subDelRespPayload []byte
618 subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
620 xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload)
624 // RIC SUBSCRIPTION DELETE RESPONSE
625 c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload))
// Blocks the caller for 3 seconds before the cleanup below.
627 time.Sleep(3 * time.Second)
631 xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())
633 if !c.registry.DelSubscription(subs.GetSubId()) {
634 xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid())