2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
27 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
28 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
29 httptransport "github.com/go-openapi/runtime/client"
30 "github.com/go-openapi/strfmt"
31 "github.com/spf13/viper"
// Timeouts handed to timerMap.StartTimer elsewhere in this file: subReqTime
// for subscription requests and SubDelReqTime for subscription delete
// requests. Both are initialised to two seconds.
// NOTE(review): one name is exported and the other is not — confirm whether
// that difference is intentional.
37 var subReqTime time.Duration = 2 * time.Second
38 var SubDelReqTime time.Duration = 2 * time.Second
// rtmgrClient is the client the handlers call SplitSource and
// SubscriptionRequestUpdate on.
43 rtmgrClient *RtmgrClient
// rmrSendMutex is held around each xapp.Rmr.Send call made by rmrSend.
46 rmrSendMutex *sync.Mutex
// Startup statements: log a fixed banner, configure viper, choose seedSN and
// log it.
// NOTE(review): the enclosing function header and the branch that separates
// the two seedSN assignments below are not visible in this view.
65 xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v4")
// Use "submgr" as viper's environment-variable prefix and treat variables
// that are set but empty as valid values.
67 viper.SetEnvPrefix("submgr")
68 viper.AllowEmptyEnv(true)
// Seed taken from the "seed_sn" setting; the uint16 conversion keeps only the
// low 16 bits of the configured integer.
69 seedSN = uint16(viper.GetInt("seed_sn"))
// Time-seeded pseudo-random seed. rand.Intn(65535) yields 0..65534, so the
// value 65535 is never selected.
71 rand.Seed(time.Now().UnixNano())
72 seedSN = uint16(rand.Intn(65535))
77 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
// NewControl assembles a Control value: a Registry initialised with seedSN, a
// Tracker, a TimerMap, a mutex used to serialise RMR sends, and a routing
// manager client. The Control is returned by value.
// NOTE(review): some lines of this function are not visible in this view (any
// extra initialisation of tracker or timerMap would be among them) — confirm
// before editing.
80 func NewControl() Control {
81 registry := new(Registry)
82 registry.Initialize(seedSN)
84 tracker := new(Tracker)
87 timerMap := new(TimerMap)
90 rmrSendMutex := &sync.Mutex{}
// The routing manager endpoint is read from the viper keys rtmgr.HostAddr,
// rtmgr.port and rtmgr.baseUrl; "http" is the only scheme offered.
92 transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
93 client := rtmgrclient.New(transport, strfmt.Default)
// Both request parameter sets are created with a 10 second timeout.
94 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
95 deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
96 rtmgrClient := RtmgrClient{client, handle, deleteHandle}
// Positional literal: the argument order has to match the field order of
// Control.
98 return Control{new(E2ap), registry, &rtmgrClient, tracker, timerMap, rmrSendMutex}
// Run is a method on *Control that takes no arguments and returns nothing.
// NOTE(review): its body is not visible in this view, so what it starts or
// blocks on is deliberately not described here.
101 func (c *Control) Run() {
// rmrSend transmits params with xapp.Rmr.Send, looping while status is false
// and i is at most 10. Every attempt is made with rmrSendMutex held. The
// visible lines also log a retry message, sleep 500 ms, set err to
// "rmr.Send() failed" and free params.Mbuf.
// NOTE(review): the declarations of i and status and the conditions guarding
// the retry log, the error assignment and the Free call are not visible in
// this view — presumably they only run when a send failed; confirm.
105 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
108 for ; i <= 10 && status == false; i++ {
// Hold the mutex only for the duration of the send itself.
109 c.rmrSendMutex.Lock()
110 status = xapp.Rmr.Send(params, false)
111 c.rmrSendMutex.Unlock()
113 xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid)
114 time.Sleep(500 * time.Millisecond)
118 err = errors.New("rmr.Send() failed")
// NOTE(review): the handlers in this file also free params.Mbuf on entry, so
// this may be a second Free of the same buffer — confirm it is safe.
119 xapp.Rmr.Free(params.Mbuf)
// rmrReplyToSender takes an RMR message and returns an error. The handlers in
// this file call it right after logging that they are forwarding a message
// to the requestor.
// NOTE(review): its body is not visible in this view, so how the message is
// sent is not described here.
124 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
// Consume hands an incoming RMR message to the handler that matches its
// message type. Each recognised type is handled on its own goroutine; the
// last visible line logs that an unknown type is being discarded.
// NOTE(review): the switch header, the default label and the return are not
// visible in this view.
// NOTE(review): no case for RIC_SUB_DEL_FAILURE is visible although
// handleSubscriptionDeleteFailure is defined in this file — confirm whether
// that handler is wired up.
129 func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
131 case xapp.RICMessageTypes["RIC_SUB_REQ"]:
132 go c.handleSubscriptionRequest(msg)
133 case xapp.RICMessageTypes["RIC_SUB_RESP"]:
134 go c.handleSubscriptionResponse(msg)
135 case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
136 go c.handleSubscriptionFailure(msg)
137 case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
138 go c.handleSubscriptionDeleteRequest(msg)
139 case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
// NOTE(review): this handler declares an error result, which a go statement
// discards.
140 go c.handleSubscriptionDeleteResponse(msg)
142 xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
// handleSubscriptionRequest processes a subscription request. Visible steps,
// in order: reserve a new sequence number, write it into the payload, split
// the sender's source into address and port, record a CREATE transaction,
// send a CREATE route update to the routing manager, put the new id in the
// RMR header, forward the message with rmrSend and start a timer that calls
// handleSubscriptionRequestTimer after subReqTime.
// NOTE(review): the `if err != nil` guards and early returns between these
// steps are not visible in this view.
// NOTE(review): no visible error path releases the reserved sequence number
// — confirm whether it can leak on failure.
147 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
148 xapp.Logger.Info("Subscription Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
// NOTE(review): the buffer is freed here, yet params is later stored in the
// transaction record and passed to rmrSend — confirm nothing downstream
// still needs params.Mbuf.
149 xapp.Rmr.Free(params.Mbuf)
152 /* Reserve a sequence number and set it in the payload */
153 newSubId, isIdValid := c.registry.ReserveSequenceNumber()
154 if isIdValid != true {
155 xapp.Logger.Error("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
159 err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
// NOTE(review): "v%" in the message below looks like a typo for "%v"; as
// written the arguments do not line up with the verbs.
161 xapp.Logger.Error("Unable to set Subscription Sequence Number in Payload. Dropping this Subscription Request message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
165 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
// NOTE(review): the message mentions a routing-manager update, but the call
// that produced err is SplitSource.
167 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
171 /* Create transaction records for every subscription request */
172 xactKey := TransactionKey{newSubId, CREATE}
173 xactValue := Transaction{*srcAddr, *srcPort, params}
174 err = c.tracker.TrackTransaction(xactKey, xactValue)
176 xapp.Logger.Error("Failed to create a Subscription Request transaction record. Err: %v", err)
180 /* Update routing manager about the new subscription */
181 subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
182 xapp.Logger.Info("Starting routing manager update")
183 err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
185 xapp.Logger.Error("Failed to update routing manager. Dropping this Subscription Request message. Err: %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
189 // Setting new subscription ID in the RMR header
190 params.SubId = int(newSubId)
191 xapp.Logger.Info("Forwarding Subscription Request to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
192 err = c.rmrSend(params)
194 xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Arm the response timeout for this subscription id.
196 c.timerMap.StartTimer(newSubId, subReqTime, c.handleSubscriptionRequestTimer)
198 xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
// handleSubscriptionResponse processes a subscription response. Visible
// steps, in order: read the sequence number from the payload, check it
// against the registry, mark the subscription confirmed, look up the CREATE
// transaction, copy the sequence number and the original Xid into the
// message, reply to the requestor and complete the CREATE transaction.
// NOTE(review): the `if err != nil` guards and early returns between these
// steps are not visible in this view.
202 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
203 xapp.Logger.Info("Subscription Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
204 xapp.Rmr.Free(params.Mbuf)
207 payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
// NOTE(review): "v%" below looks like a typo for "%v".
209 xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Responsemessage. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
213 xapp.Logger.Info("Received payloadSeqNum: %v",payloadSeqNum)
214 if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
215 xapp.Logger.Error("Unknown payloadSeqNum. Dropping this Subscription Response message. PayloadSeqNum: %v, SubId: %v, Xid: %s", payloadSeqNum, params.SubId, params.Xid)
// NOTE(review): with StopTimer disabled, nothing visible here cancels the
// timer started by handleSubscriptionRequest.
219 // c.timerMap.StopTimer(payloadSeqNum)
221 c.registry.setSubscriptionToConfirmed(payloadSeqNum)
222 var transaction Transaction
223 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
// NOTE(review): "v%" below looks like a typo for "%v".
225 xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Response message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
228 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Retrieved old subId...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// Restore the requestor's transaction id before replying.
230 params.SubId = int(payloadSeqNum)
231 params.Xid = transaction.OrigParams.Xid
233 xapp.Logger.Info("Forwarding Subscription Response to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
234 err = c.rmrReplyToSender(params)
236 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
239 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
240 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
242 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
// handleSubscriptionFailure processes a subscription failure. Visible steps,
// in order: read the sequence number from the payload, look up the CREATE
// transaction, copy the sequence number and the original Xid into the
// message, reply to the requestor, sleep three seconds, send a route update
// to the routing manager, release the sequence number and complete the
// CREATE transaction.
// NOTE(review): the `if err != nil` guards, early returns and the else
// branch around the last Error call are not visible in this view.
248 func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
249 xapp.Logger.Info("Subscription Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
250 xapp.Rmr.Free(params.Mbuf)
253 payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
// NOTE(review): "v%" below looks like a typo for "%v".
255 xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Failure message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
258 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
260 // c.timerMap.StopTimer(payloadSeqNum)
262 var transaction Transaction
263 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
265 xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Failure message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
268 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// Restore the requestor's transaction id before replying.
270 params.SubId = int(payloadSeqNum)
271 params.Xid = transaction.OrigParams.Xid
273 xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
274 err = c.rmrReplyToSender(params)
276 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Fixed three second pause before the route update; it blocks this goroutine.
279 time.Sleep(3 * time.Second)
281 xapp.Logger.Info("Starting routing manager update")
// NOTE(review): this sends a CREATE action after a failure, whereas the
// other post-reply route updates in this file send DELETE — confirm which
// action is intended here.
282 subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
283 err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
285 xapp.Logger.Error("Failed to update routing manager. Err: %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
288 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the sequence number was released.
289 if c.registry.releaseSequenceNumber(payloadSeqNum) {
290 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
292 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
296 xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
// handleSubscriptionRequestTimer is the callback handleSubscriptionRequest
// registers with timerMap.StartTimer. Visible steps, in order: complete the
// CREATE transaction for subId, build a failure message addressed with the
// original Meid and Xid, reply to the requestor, sleep three seconds, send a
// DELETE route update, release the sequence number and complete the
// transaction again.
// NOTE(review): interior lines are missing from this view (guards, returns,
// possibly comment delimiters). Several visible lines would not compile as
// written, so part of this body may be commented out in the real file —
// confirm before relying on this description.
302 func (c *Control) handleSubscriptionRequestTimer(subId uint16) {
303 xapp.Logger.Info("Subscription Request timer expired. SubId: %v",subId)
305 transaction, err := c.tracker.completeTransaction(subId, CREATE)
307 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
310 xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
312 var params xapp.RMRParams
// The message type is a hard-coded number; the trailing comment names the
// table entry it stands for.
313 params.Mtype = 12012 //xapp.RICMessageTypes["RIC_SUB_FAILURE"]
314 params.SubId = int(subId)
315 params.Meid = transaction.OrigParams.Meid
316 params.Xid = transaction.OrigParams.Xid
// NOTE(review): []byte("...") yields the ASCII bytes of the hex text, not the
// decoded binary, and PayloadLen is the character count — confirm that is
// what the receiver expects.
317 payload := []byte("40C9408098000003EA7E00050000010016EA6300020021EA6E00808180EA6F000400000000EA6F000400010040EA6F000400020080EA6F0004000300C0EA6F000400040100EA6F000400050140EA6F000400060180EA6F0004000701C0EA6F000400080200EA6F000400090240EA6F0004000A0280EA6F0004000B02C0EA6F0004000C0300EA6F0004000D0340EA6F0004000E0380EA6F0004000F03C0")
318 params.PayloadLen = len(payload)
319 params.Payload = payload
321 xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
// NOTE(review): "¶ms" looks like a mis-encoded "&params".
322 err = c.rmrReplyToSender(¶ms)
324 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
328 time.Sleep(3 * time.Second)
330 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
332 xapp.Logger.Info("Starting routing manager update")
// NOTE(review): payloadSeqNum is not declared in any visible line of this
// function (the parameter is subId). The result of the update call is also
// not checked.
333 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
334 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
336 xapp.Logger.Info("Deleting trancaction record")
// NOTE(review): the CREATE transaction was already completed at the top of
// this function, so this second completeTransaction call looks redundant.
337 if c.registry.releaseSequenceNumber(payloadSeqNum) {
338 transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
340 xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
// String returns a string for an Action. The visible lines declare a table
// of action names and test whether act lies outside the range CREATE..DELETE.
// NOTE(review): the table entries and both return statements are not visible
// in this view.
348 func (act Action) String() string {
349 actions := [...]string{
356 if act < CREATE || act > DELETE {
// valid returns a bool for an Action; CREATE, MERGE and DELETE share one
// case arm.
// NOTE(review): the returned values are not visible in this view —
// presumably true for these three and false otherwise; confirm.
362 func (act Action) valid() bool {
364 case CREATE, MERGE, DELETE:
// handleSubscriptionDeleteRequest processes a subscription delete request.
// Visible steps, in order: read the sequence number from the payload; when
// the registry accepts it, delete the subscription from the registry and
// record a DELETE transaction; log that the request is being forwarded to
// E2T; start a timer that calls handleSubscriptionDeleteRequestTimer after
// SubDelReqTime.
// NOTE(review): the `if err != nil` guards, the else branch, the early
// returns and the send call itself are not visible in this view.
371 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
372 xapp.Logger.Info("Subscription Delete Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
373 xapp.Rmr.Free(params.Mbuf)
376 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
378 xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Request message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
381 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
382 if c.registry.IsValidSequenceNumber(payloadSeqNum) {
// The registry entry is removed before the request is forwarded.
383 c.registry.deleteSubscription(payloadSeqNum)
384 err = c.trackDeleteTransaction(params, payloadSeqNum)
386 xapp.Logger.Error("Failed to create transaction record. Dropping this Subscription Delete Request message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
390 xapp.Logger.Error("Not valid sequence number. Dropping this Subscription Delete Request message. SubId: %v, Xid: %s", params.SubId, params.Xid)
394 xapp.Logger.Info("Forwarding Delete Subscription Request to E2T: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
397 xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Arm the response timeout for this subscription id.
399 c.timerMap.StartTimer(payloadSeqNum, SubDelReqTime, c.handleSubscriptionDeleteRequestTimer)
// trackDeleteTransaction splits the sender's source into address and port and
// records a DELETE transaction for payloadSeqNum that holds the address, the
// port and the request itself.
// NOTE(review): the `if err != nil` guard and the return statements are not
// visible in this view.
404 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
405 srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
// NOTE(review): the message mentions a routing-manager update, but the call
// that produced err is SplitSource.
407 xapp.Logger.Error("Failed to update routing-manager about the subscription delete request with reason: %s", err)
409 xactKey := TransactionKey{payloadSeqNum, DELETE}
410 xactValue := Transaction{*srcAddr, *srcPort, params}
411 err = c.tracker.TrackTransaction(xactKey, xactValue)
// handleSubscriptionDeleteResponse processes a subscription delete response.
// Visible steps, in order: read the sequence number from the payload, look
// up the DELETE transaction, copy the sequence number and the original Xid
// into the message, reply to the requestor, sleep three seconds, send a
// DELETE route update, release the sequence number and complete the DELETE
// transaction.
// NOTE(review): the `if err != nil` guards, the else branch and the return
// statements are not visible in this view.
// NOTE(review): this is the only handler that declares an error result; its
// caller Consume starts it with a go statement, which discards the result.
415 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
416 xapp.Logger.Info("Subscription Delete Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
417 xapp.Rmr.Free(params.Mbuf)
420 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
// NOTE(review): "Err: %," below is missing its verb letter; "%v" was
// probably intended.
422 xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Response message. Err: %, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
425 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
427 // c.timerMap.StopTimer(payloadSeqNum)
429 var transaction Transaction
430 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
432 xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Delete Response message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
435 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// Restore the requestor's transaction id before replying.
437 params.SubId = int(payloadSeqNum)
438 params.Xid = transaction.OrigParams.Xid
439 xapp.Logger.Info("Forwarding Subscription Delete Response to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
440 err = c.rmrReplyToSender(params)
442 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Fixed three second pause before the route update; it blocks this goroutine.
446 time.Sleep(3 * time.Second)
448 xapp.Logger.Info("Starting routing manager update")
449 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
450 err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
452 xapp.Logger.Error("Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
456 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the sequence number was released.
457 if c.registry.releaseSequenceNumber(payloadSeqNum) {
458 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
460 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
464 xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
// handleSubscriptionDeleteFailure processes a subscription delete failure.
// Visible steps, in order: read the sequence number from the payload, look
// up the DELETE transaction, copy the sequence number and the original Xid
// into the message, reply to the requestor, sleep three seconds, send a
// DELETE route update, release the sequence number and complete the DELETE
// transaction.
// NOTE(review): the `if err != nil` guards, the else branch and the early
// returns are not visible in this view.
// NOTE(review): no visible case in Consume dispatches to this handler —
// confirm whether it is reachable.
470 func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
471 xapp.Logger.Info("Subscription Delete Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
472 xapp.Rmr.Free(params.Mbuf)
475 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
// NOTE(review): "Err: %," below is missing its verb letter; "%v" was
// probably intended.
477 xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Failure message. Err: %, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
480 xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
482 // c.timerMap.StopTimer(payloadSeqNum)
484 var transaction Transaction
485 transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
487 xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Delete Failure message. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
490 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
// Restore the requestor's transaction id before replying.
492 params.SubId = int(payloadSeqNum)
493 params.Xid = transaction.OrigParams.Xid
494 xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
495 err = c.rmrReplyToSender(params)
497 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// Fixed three second pause before the route update; it blocks this goroutine.
501 time.Sleep(3 * time.Second)
503 xapp.Logger.Info("Starting routing manager update")
504 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
// NOTE(review): the result of this call is not assigned, so the err logged
// on the next line is still the value left by rmrReplyToSender.
505 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
507 xapp.Logger.Error("Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
511 xapp.Logger.Info("Deleting trancaction record")
// The transaction is completed only when the sequence number was released.
512 if c.registry.releaseSequenceNumber(payloadSeqNum) {
513 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
515 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
// NOTE(review): two verbs but three arguments — err is printed as SubId and
// params.Xid is left over.
519 xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
// handleSubscriptionDeleteRequestTimer is the callback
// handleSubscriptionDeleteRequest registers with timerMap.StartTimer.
// Visible steps, in order: complete the DELETE transaction for subId, build
// a delete-failure message addressed with the original Meid and Xid, reply
// to the requestor, sleep three seconds, send a DELETE route update, release
// the sequence number and complete the transaction again.
// NOTE(review): interior lines are missing from this view (guards, returns,
// possibly comment delimiters). Several visible lines would not compile as
// written, so part of this body may be commented out in the real file —
// confirm before relying on this description.
525 func (c *Control) handleSubscriptionDeleteRequestTimer(subId uint16) {
526 xapp.Logger.Info("Subscription Delete Request timer expired. SubId: %v",subId)
528 transaction, err := c.tracker.completeTransaction(subId, DELETE)
530 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
533 xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
535 var params xapp.RMRParams
// The message type is a hard-coded number; the trailing comment names the
// table entry it stands for.
536 params.Mtype = 12022 //xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]
537 params.SubId = int(subId)
538 params.Meid = transaction.OrigParams.Meid
539 params.Xid = transaction.OrigParams.Xid
// NOTE(review): []byte("...") yields the ASCII bytes of the hex text, not the
// decoded binary, and PayloadLen is the character count — confirm that is
// what the receiver expects.
540 payload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0")
541 params.PayloadLen = len(payload)
542 params.Payload = payload
544 xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
// NOTE(review): "¶ms" looks like a mis-encoded "&params".
545 err = c.rmrReplyToSender(¶ms)
547 xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
551 time.Sleep(3 * time.Second)
552 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
554 xapp.Logger.Info("Starting routing manager update")
// NOTE(review): payloadSeqNum is not declared in any visible line of this
// function (the parameter is subId). The result of the update call is also
// not checked.
555 subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
556 c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
558 xapp.Logger.Info("Deleting trancaction record")
// NOTE(review): the DELETE transaction was already completed at the top of
// this function, so this second completeTransaction call looks redundant.
559 if c.registry.releaseSequenceNumber(payloadSeqNum) {
560 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
562 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)