2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
27 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
// rmrSendMutex is the package-level mutex that rmrSend unlocks after its
// send/retry loop.
38 var rmrSendMutex = &sync.Mutex{}
// subReqTime is the duration handed to timerMap.StartTimer when the
// subscription request timer is started.
40 var subReqTime time.Duration = 2 * time.Second
// SubDelReqTime is the duration handed to timerMap.StartTimer when the
// subscription delete request timer is started.
41 var SubDelReqTime time.Duration = 2 * time.Second
// NOTE(review): these two lines look like fields of the Control struct, whose
// header and remaining fields are not visible in this excerpt — confirm.
// rtmgrClient is the client the handlers use for SplitSource and
// SubscriptionRequestUpdate calls.
46 rtmgrClient *RtmgrClient
// rcChan is a channel of *xapp.RMRParams; presumably it feeds controlLoop —
// confirm against the elided code.
48 rcChan chan *xapp.RMRParams
// NOTE(review): the enclosing function header and the branch conditions of
// this fragment are not visible in this excerpt; the comments below describe
// only the statements that are shown.
// Log a startup banner.
67 xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v2")
// Configure viper: environment prefix "submgr", empty environment values allowed.
69 viper.SetEnvPrefix("submgr")
70 viper.AllowEmptyEnv(true)
// seedSN is assigned from the "seed_sn" setting here ...
71 seedSN = uint16(viper.GetInt("seed_sn"))
// ... and from a time-seeded pseudo-random value here. The condition that
// selects between the two assignments is on lines not shown.
73 rand.Seed(time.Now().UnixNano())
74 seedSN = uint16(rand.Intn(65535))
// Log the sequence number the registry will be initialized with.
79 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
// NewControl builds and returns a Control value.
//
// It creates a Registry initialized with seedSN, a Tracker, a TimerMap, and a
// routing-manager client, then assembles them together with a new E2ap and a
// message channel into the returned Control.
// NOTE(review): some lines between the visible statements are elided in this
// excerpt (e.g. after the Tracker and TimerMap allocations) — confirm any
// additional initialization there.
82 func NewControl() Control {
83 registry := new(Registry)
84 registry.Initialize(seedSN)
86 tracker := new(Tracker)
89 timerMap := new(TimerMap)
// The routing-manager endpoint is read from the viper keys rtmgr.HostAddr,
// rtmgr.port and rtmgr.baseUrl; only the "http" scheme is configured.
92 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
93 client := rtmgrclient.New(transport, strfmt.Default)
// Both the provide and the delete subscription-handle parameters are created
// with a 10 second timeout.
94 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
95 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
96 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
// The message channel is created without a buffer size (unbuffered).
98 return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams),timerMap}
// Run is a method on *Control taking no arguments and returning nothing.
// NOTE(review): the body is not visible in this excerpt; presumably it starts
// the control loop and the xApp framework — confirm.
101 func (c *Control) Run() {
// Consume accepts a received RMR message (rp) and returns an error value.
// NOTE(review): the body is not visible in this excerpt; presumably it hands
// rp to controlLoop through rcChan — confirm.
106 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
// rmrSend sends params via xapp.Rmr.Send, retrying while the send reports
// failure.
//
// The visible loop allows iterations while i <= 10 and status is false, and a
// failed attempt is logged and followed by a 500 ms sleep. When err is set to
// "rmr.Send() failed" the message buffer is freed with xapp.Rmr.Free.
// NOTE(review): the initialization of i and status, the matching
// rmrSendMutex.Lock(), and the conditions guarding the log/sleep and the error
// assignment are on lines not shown in this excerpt — confirm.
111 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
115 for ; i <= 10 && status == false; i++ {
116 status = xapp.Rmr.Send(params, false)
118 xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid)
119 time.Sleep(500 * time.Millisecond)
123 err = errors.New("rmr.Send() failed")
124 xapp.Rmr.Free(params.Mbuf)
126 rmrSendMutex.Unlock()
// NOTE(review): a second, single-attempt Send follows the unlock. Whether this
// is live code or sits inside a commented-out region (delimiters elided)
// cannot be determined from the visible lines — confirm.
129 if !xapp.Rmr.Send(params, false) {
130 err = errors.New("rmr.Send() failed")
131 xapp.Rmr.Free(params.Mbuf)
// rmrReplyToSender takes an RMR message and returns an error value. The
// response/failure handlers and the timer handlers in this file call it to
// forward a message back to the requestor.
// NOTE(review): the body is not visible in this excerpt.
137 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
// controlLoop dispatches messages by RIC message type, starting a new
// goroutine for each handled message.
// NOTE(review): the loop header, the receive of msg and the switch statement
// itself are on lines not shown in this excerpt — confirm how msg is obtained.
// NOTE(review): no visible case dispatches RIC_SUB_DEL_FAILURE to
// handleSubscriptionDeleteFailure, although that handler exists in this file —
// confirm whether that is intentional.
142 func (c *Control) controlLoop() {
146 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
147 go c.handleSubscriptionRequest(msg)
148 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
149 go c.handleSubscriptionResponse(msg)
150 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
151 go c.handleSubscriptionFailure(msg)
152 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
153 go c.handleSubscriptionDeleteRequest(msg)
154 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
155 go c.handleSubscriptionDeleteResponse(msg)
// Any other message type is reported through the error log with its numeric
// type (presumably the default branch; the branch label is elided).
157 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
158 xapp.Logger.Error("Unknown message type: %v", err)
// handleSubscriptionRequest processes a subscription request message.
//
// Visible steps, in order: log the request and free its message buffer,
// reserve a sequence number from the registry, write it into the payload,
// split the source into address and port, record a CREATE transaction, notify
// the routing manager, put the new id into the RMR header, forward the message
// with rmrSend, and start the subscription request timer.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
163 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
164 xapp.Logger.Info("Subscription Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
165 xapp.Rmr.Free(params.Mbuf)
168 /* Reserve a sequence number and set it in the payload */
169 newSubId, isIdValid := c.registry.ReserveSequenceNumber()
170 if isIdValid != true {
171 xapp.Logger.Info("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
175 err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
177 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
// The requestor's address and port are derived from the message source.
181 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
183 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
187 /* Create transaction records for every subscription request */
188 xactKey := TransactionKey{newSubId, CREATE}
189 xactValue := Transaction{*srcAddr, *srcPort, params}
190 err = c.tracker.TrackTransaction(xactKey, xactValue)
192 xapp.Logger.Error("Failed to create a Subscription Request transaction record due to %v", err)
196 /* Update routing manager about the new subscription*/
197 subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
198 xapp.Logger.Info("Starting routing manager update")
// The result of SubscriptionRequestUpdate, if it returns one, is not used here.
199 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
201 //time.Sleep(3 * time.Second)
203 // Setting new subscription ID in the RMR header
204 params.SubId = int(newSubId)
205 xapp.Logger.Info("Forwarding Subscription Request to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
206 err = c.rmrSend(params)
208 xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Start the request timer; handleSubscriptionRequestTimer is the expiry callback.
210 c.timerMap.StartTimer(newSubId, subReqTime, c.handleSubscriptionRequestTimer)
212 xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
// handleSubscriptionResponse processes a subscription response message.
//
// Visible steps, in order: log the response and free its message buffer, read
// the sequence number from the payload, validate it against the registry, mark
// the subscription confirmed, look up the CREATE transaction, restore SubId
// and the original Xid on the message, forward it with rmrReplyToSender, and
// complete the CREATE transaction.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
216 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
217 xapp.Logger.Info("Subscription Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
218 xapp.Rmr.Free(params.Mbuf)
221 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
223 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
227 xapp.Logger.Info("Received payloadSeqNum: %v",payloadSeqNum)
228 if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
229 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.")
// Stopping the request timer is commented out in this version.
233 // c.timerMap.StopTimer(payloadSeqNum)
235 c.registry.setSubscriptionToConfirmed(payloadSeqNum)
236 var transaction Transaction
237 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
239 xapp.Logger.Error("Failed to retrive transaction record. Err: %v", err)
240 xapp.Logger.Info("Further processing of this Subscription Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
243 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Retrieved old subId...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// The reply carries the sequence number as SubId and the Xid of the original request.
245 params.SubId = int(payloadSeqNum)
246 params.Xid = transaction.OrigParams.Xid
248 xapp.Logger.Info("Forwarding Subscription Response to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
249 err = c.rmrReplyToSender(params)
251 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
254 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
255 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
257 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
// handleSubscriptionFailure processes a subscription failure message.
//
// Visible steps, in order: log the failure and free its message buffer, read
// the sequence number from the payload, look up the CREATE transaction,
// restore SubId and the original Xid on the message, forward it with
// rmrReplyToSender, sleep 3 seconds, notify the routing manager, release the
// sequence number and complete the CREATE transaction.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
263 func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) {
264 xapp.Logger.Info("Subscription Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
265 xapp.Rmr.Free(params.Mbuf)
268 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
270 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
273 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
275 // should here be IsValidSequenceNumber check?
277 // c.timerMap.StopTimer(payloadSeqNum)
279 var transaction Transaction
280 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
282 xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
283 xapp.Logger.Info("Further processing of this Subscription Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
286 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
288 params.SubId = int(payloadSeqNum)
289 params.Xid = transaction.OrigParams.Xid
291 xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
292 err = c.rmrReplyToSender(params)
294 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// This goroutine pauses for 3 seconds before the routing manager is updated.
297 time.Sleep(3 * time.Second)
299 xapp.Logger.Info("Starting routing manager update")
// NOTE(review): the route update is issued with action CREATE here, whereas
// the delete-side handlers in this file use DELETE — confirm CREATE is
// intended on the failure path.
300 subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
301 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
303 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the registry reports the sequence
// number as released.
304 if c.registry.releaseSequenceNumber(payloadSeqNum) {
305 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
307 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
// handleSubscriptionRequestTimer is the callback registered with
// timerMap.StartTimer for subscription requests; subId is the sequence number
// whose timer expired.
//
// Visible steps, in order: log the expiry, complete the CREATE transaction for
// subId, build a message with a hard-coded type and payload that carries the
// original request's Meid and Xid, and forward it with rmrReplyToSender.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
314 func (c *Control) handleSubscriptionRequestTimer(subId uint16) {
315 xapp.Logger.Info("Subscription Request timer expired. SubId: %v",subId)
317 transaction, err := c.tracker.completeTransaction(subId, CREATE)
319 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
322 xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
324 var params xapp.RMRParams
// The message type is a numeric literal; the trailing comment names the
// RICMessageTypes key it stands for.
325 params.Mtype = 12012 //xapp.RICMessageTypes["RIC_SUB_FAILURE"]
326 params.SubId = int(subId)
327 params.Meid = transaction.OrigParams.Meid
328 params.Xid = transaction.OrigParams.Xid
// NOTE(review): the payload is the ASCII bytes of this hex string, not the
// hex-decoded octets — confirm that is what the receiver expects.
329 payload := []byte("40C9408098000003EA7E00050000010016EA6300020021EA6E00808180EA6F000400000000EA6F000400010040EA6F000400020080EA6F0004000300C0EA6F000400040100EA6F000400050140EA6F000400060180EA6F0004000701C0EA6F000400080200EA6F000400090240EA6F0004000A0280EA6F0004000B02C0EA6F0004000C0300EA6F0004000D0340EA6F0004000E0380EA6F0004000F03C0")
330 params.PayloadLen = len(payload)
331 params.Payload = payload
333 xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
// rmrReplyToSender takes *xapp.RMRParams, so the local value is passed by address.
334 err = c.rmrReplyToSender(&params)
336 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
340 time.Sleep(3 * time.Second)
342 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
// NOTE(review): payloadSeqNum is not declared anywhere in the visible lines of
// this function (the parameter is subId), and the CREATE transaction was
// already completed above. Confirm whether the statements below are live code
// or sit inside a commented-out region whose delimiters are elided here.
344 xapp.Logger.Info("Starting routing manager update")
345 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
346 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
348 xapp.Logger.Info("Deleting trancaction record")
349 if c.registry.releaseSequenceNumber(payloadSeqNum) {
350 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
352 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
// String returns a textual form of act, looked up in a fixed array of names.
// NOTE(review): the array entries and both return statements are on lines not
// shown in this excerpt; only the range guard below is visible.
360 func (act Action) String() string {
361 actions := [...]string{
// Values outside the CREATE..DELETE range take the guarded branch.
367 if act < CREATE || act > DELETE {
// valid reports a boolean for act; the visible case lists CREATE, MERGE and
// DELETE together.
// NOTE(review): the switch header and the return statements are on lines not
// shown in this excerpt — presumably these three actions yield true; confirm.
373 func (act Action) valid() bool {
375 case CREATE, MERGE, DELETE:
// handleSubscriptionDeleteRequest processes a subscription delete request
// message.
//
// Visible steps, in order: log the request and free its message buffer, read
// the sequence number from the payload, and — when the registry considers it
// valid — delete the subscription from the registry, record a DELETE
// transaction via trackDeleteTransaction, log the forwarding, and start the
// subscription delete request timer.
// NOTE(review): the send call itself, the error checks and the handling of an
// invalid sequence number are on lines not shown in this excerpt.
382 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
383 xapp.Logger.Info("Subscription Delete Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
384 xapp.Rmr.Free(params.Mbuf)
387 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
389 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
392 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
393 if c.registry.IsValidSequenceNumber(payloadSeqNum) {
394 c.registry.deleteSubscription(payloadSeqNum)
395 trackErr := c.trackDeleteTransaction(params, payloadSeqNum)
397 xapp.Logger.Error("Failed to create a Subscription Delete Request transaction record due to %v", trackErr)
402 xapp.Logger.Info("Forwarding Delete Subscription Request to E2T: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
405 xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Start the delete timer; handleSubscriptionDeleteRequestTimer is the expiry callback.
407 c.timerMap.StartTimer(payloadSeqNum, SubDelReqTime, c.handleSubscriptionDeleteRequestTimer)
// trackDeleteTransaction records a DELETE transaction for payloadSeqNum.
//
// The requestor's address and port are obtained by splitting params.Src; the
// transaction stores them together with the original message. The result of
// tracker.TrackTransaction is assigned to the named return value err.
// NOTE(review): the error check after SplitSource and the final return are on
// lines not shown in this excerpt.
412 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
413 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
415 xapp.Logger.Error("Failed to update routing-manager about the subscription delete request with reason: %s", err)
417 xactKey := TransactionKey{payloadSeqNum, DELETE}
418 xactValue := Transaction{*srcAddr, *srcPort, params}
419 err = c.tracker.TrackTransaction(xactKey, xactValue)
// handleSubscriptionDeleteResponse processes a subscription delete response
// message.
//
// Visible steps, in order: log the response and free its message buffer, read
// the sequence number from the payload, look up the DELETE transaction,
// restore SubId and the original Xid on the message, forward it with
// rmrReplyToSender, sleep 3 seconds, notify the routing manager with a DELETE
// action, release the sequence number and complete the DELETE transaction.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
423 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
424 xapp.Logger.Info("Subscription Delete Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
425 xapp.Rmr.Free(params.Mbuf)
428 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
430 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
433 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
435 // should here be IsValidSequenceNumber check?
436 // c.timerMap.StopTimer(payloadSeqNum)
438 var transaction Transaction
439 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
441 xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
442 xapp.Logger.Info("Further processing of this Subscription Delete Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
445 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// The reply carries the sequence number as SubId and the Xid of the original request.
447 params.SubId = int(payloadSeqNum)
448 params.Xid = transaction.OrigParams.Xid
449 xapp.Logger.Info("Forwarding Subscription Delete Response to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
450 err = c.rmrReplyToSender(params)
452 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// This goroutine pauses for 3 seconds before the routing manager is updated.
456 time.Sleep(3 * time.Second)
458 xapp.Logger.Info("Starting routing manager update")
459 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
460 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
462 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the registry reports the sequence
// number as released.
463 if c.registry.releaseSequenceNumber(payloadSeqNum) {
464 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
466 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
// handleSubscriptionDeleteFailure processes a subscription delete failure
// message.
//
// Visible steps, in order: log the failure and free its message buffer, read
// the sequence number from the payload, look up the DELETE transaction,
// restore SubId and the original Xid on the message, forward it with
// rmrReplyToSender, sleep 3 seconds, notify the routing manager with a DELETE
// action, release the sequence number and complete the DELETE transaction.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
// NOTE(review): none of the cases visible in controlLoop dispatches to this
// handler — confirm how it is reached.
473 func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err error) {
474 xapp.Logger.Info("Subscription Delete Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
475 xapp.Rmr.Free(params.Mbuf)
478 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
480 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
483 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
485 // should here be IsValidSequenceNumber check?
486 // c.timerMap.StopTimer(payloadSeqNum)
488 var transaction Transaction
489 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
491 xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
492 xapp.Logger.Info("Further processing of this Subscription Delete Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
495 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// The reply carries the sequence number as SubId and the Xid of the original request.
497 params.SubId = int(payloadSeqNum)
498 params.Xid = transaction.OrigParams.Xid
499 xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
500 err = c.rmrReplyToSender(params)
502 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// This goroutine pauses for 3 seconds before the routing manager is updated.
506 time.Sleep(3 * time.Second)
508 xapp.Logger.Info("Starting routing manager update")
509 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
510 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
512 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the registry reports the sequence
// number as released.
513 if c.registry.releaseSequenceNumber(payloadSeqNum) {
514 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
516 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
// handleSubscriptionDeleteRequestTimer is the callback registered with
// timerMap.StartTimer for subscription delete requests; subId is the sequence
// number whose timer expired.
//
// Visible steps, in order: log the expiry, complete the DELETE transaction for
// subId, build a message with a hard-coded type and payload that carries the
// original request's Meid and Xid, and forward it with rmrReplyToSender.
// NOTE(review): the error checks and early returns between these steps are on
// lines not shown in this excerpt.
523 func (c *Control) handleSubscriptionDeleteRequestTimer(subId uint16) {
524 xapp.Logger.Info("Subscription Delete Request timer expired. SubId: %v",subId)
526 transaction, err := c.tracker.completeTransaction(subId, DELETE)
528 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
531 xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
533 var params xapp.RMRParams
// The message type is a numeric literal; the trailing comment names the
// RICMessageTypes key it stands for.
534 params.Mtype = 12022 //xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]
535 params.SubId = int(subId)
536 params.Meid = transaction.OrigParams.Meid
537 params.Xid = transaction.OrigParams.Xid
// NOTE(review): the payload is the ASCII bytes of this hex string, not the
// hex-decoded octets — confirm that is what the receiver expects.
538 payload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0")
539 params.PayloadLen = len(payload)
540 params.Payload = payload
542 xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
// rmrReplyToSender takes *xapp.RMRParams, so the local value is passed by address.
543 err = c.rmrReplyToSender(&params)
545 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
549 time.Sleep(3 * time.Second)
550 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
// NOTE(review): payloadSeqNum is not declared anywhere in the visible lines of
// this function (the parameter is subId), and the DELETE transaction was
// already completed above. Confirm whether the statements below are live code
// or sit inside a commented-out region whose delimiters are elided here.
552 xapp.Logger.Info("Starting routing manager update")
553 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
554 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
556 xapp.Logger.Info("Deleting trancaction record")
557 if c.registry.releaseSequenceNumber(payloadSeqNum) {
558 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
560 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)