From 8046c70a77be2de39ffda0092b1d86008145d81a Mon Sep 17 00:00:00 2001 From: Anssi Mannila Date: Thu, 2 Jan 2020 13:39:05 +0200 Subject: [PATCH] RICPLT-2801, RICPLT-2802 Change-Id: I750c3e404cbe9f1c0d8daa4f4ae30bc5f3964b8c Signed-off-by: Anssi Mannila --- go.mod | 1 + go.sum | 4 + pkg/control/control.go | 356 ++++++++++++++++++++++++++++--------------------- pkg/control/e2ap.go | 99 +++++++++++--- pkg/control/timer.go | 51 +++++-- pkg/control/tracker.go | 52 +++++++- 6 files changed, 376 insertions(+), 187 deletions(-) mode change 100644 => 100755 pkg/control/control.go mode change 100644 => 100755 pkg/control/timer.go diff --git a/go.mod b/go.mod index 6c5594a..73f1fd0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ go 1.12 module gerrit.o-ran-sc.org/r/ric-plt/submgr require ( + gerrit.o-ran-sc.org/r/ric-plt/e2ap v0.0.0-00010101000000-000000000000 gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.23 github.com/go-openapi/errors v0.19.2 github.com/go-openapi/runtime v0.19.7 diff --git a/go.sum b/go.sum index cf898c4..7de3fb3 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,7 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.7.1 h1:Dw4jY2nghMMRsh1ol8dv1axHkDwMQK2DHerMNJsIpJU= @@ -141,6 +142,7 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -190,6 +192,7 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= +github.com/spf13/viper v1.5.0 h1:GpsTwfsQ27oS/Aha/6d1oD7tpKIqWnOA6tgOX9HHkt4= github.com/spf13/viper v1.5.0/go.mod h1:AkYRkVJF8TkSG/xet6PzXX+l39KhhXa2pdqVSxnTcn4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -199,6 +202,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/pkg/control/control.go b/pkg/control/control.go old mode 100644 new mode 100755 index cb085de..d5a92b6 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -19,8 +19,6 @@ package control -import "C" - import ( "errors" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" @@ -35,7 +33,9 @@ import ( ) var subReqTime time.Duration = 5 * time.Second -var SubDelReqTime time.Duration = 5 * time.Second +var subDelReqTime time.Duration = 5 * time.Second +var maxSubReqTryCount uint64 = 2 // Initial try + retry +var maxSubDelReqTryCount uint64 = 2 // Initial try + retry type Control struct { e2ap *E2ap @@ -62,7 +62,7 @@ const ( ) func init() { - xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v4") + xapp.Logger.Info("SUBMGR") viper.AutomaticEnv() viper.SetEnvPrefix("submgr") viper.AllowEmptyEnv(true) @@ -158,12 +158,14 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { newSubId, isIdValid := c.registry.ReserveSequenceNumber() if isIdValid != true { xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) + c.registry.releaseSequenceNumber(newSubId) return } + params.SubId = int(newSubId) err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) if err != nil { - xapp.Logger.Error("SubReq: Unable to set Sequence Number in Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) c.registry.releaseSequenceNumber(newSubId) return } @@ -175,33 +177,34 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { return } - /* Create transatcion records for every subscription request */ - transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params) + // Create transatcion record for every subscription request + var forwardRespToXapp bool = true + var responseReceived bool = false + transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid) c.registry.releaseSequenceNumber(newSubId) return } - /* Update routing manager about the new subscription*/ + // Update routing manager about the new subscription subRouteAction := transaction.SubRouteInfo() xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { - xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this SubReq msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + c.registry.releaseSequenceNumber(newSubId) return } // Setting new subscription ID in the RMR header - params.SubId = int(newSubId) - xapp.Logger.Info("Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid) + xapp.Logger.Info("SubReq: Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid) err = c.rmrSend(params) if err != nil { xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } else { - c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, c.handleSubscriptionRequestTimer) } + c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable) return } @@ -213,7 +216,7 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) return } xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum) @@ -225,13 +228,19 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) - c.registry.setSubscriptionToConfirmed(payloadSeqNum) - transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, CREATE) if err != nil { - xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Info("SubResp: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum) return } - xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) + + if responseReceived == true { + // Subscription timer already received + return + } + xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v.", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port) + + c.registry.setSubscriptionToConfirmed(payloadSeqNum) params.SubId = int(payloadSeqNum) params.Xid = transaction.OrigParams.Xid @@ -242,8 +251,8 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) - transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE) + xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port) + _, err = c.tracker.completeTransaction(payloadSeqNum, CREATE) if err != nil { xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -258,28 +267,24 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) return } xapp.Logger.Info("SubFail: Received payloadSeqNum: %v", payloadSeqNum) c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) - transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, CREATE) if err != nil { - xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum) return } - xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - - xapp.Logger.Info("Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + if responseReceived == true { + // Subscription timer already received + return } + xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port) time.Sleep(3 * time.Second) @@ -292,7 +297,7 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) if c.registry.releaseSequenceNumber(payloadSeqNum) { - transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE) + _, err = c.tracker.completeTransaction(payloadSeqNum, CREATE) if err != nil { xapp.Logger.Error("SubFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -304,72 +309,81 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { return } -func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int) { - newSubId := uint16(nbrId) - xapp.Logger.Info("SubReq timer expired. newSubId: %v", newSubId) - // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something - // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something - // c.sendSubscriptionFailure(newSubId, causeContent, causeVal) -} - -/* -func (c *Control) sendSubscriptionFailure(subId uint16, causeContent uint8, causeVal uint8) { +func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) { + subId := uint16(nbrId) + xapp.Logger.Info("handleSubTimer: SubReq timer expired. subId: %v, tryCount: %v", subId, tryCount) - transaction, err := c.tracker.completeTransaction(subId, CREATE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(subId, CREATE) if err != nil { - xapp.Logger.Error("SendSubFail: Failed to delete transaction record. Err:%v. SubId: %v", err, subId) + xapp.Logger.Info("handleSubTimer: Dropping this timer action. Err: %v SubId: %v", err, subId) return } - xapp.Logger.Info("SendSubFail: SubId: %v, Xid %v, Meid: %v", subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) - var params xapp.RMRParams - params.Mtype = 12012 //xapp.RICMessageTypes["RIC_SUB_FAILURE"] - params.SubId = int(subId) - params.Meid = transaction.OrigParams.Meid - params.Xid = transaction.OrigParams.Xid + if responseReceived == true { + // Subscription Response or Failure already received + return + } -// newPayload, packErr := c.e2ap.PackSubscriptionFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal) -// if packErr != nil { -// xapp.Logger.Error("SendSubFail: PackSubscriptionFailure() due to %v", packErr) -// return -// } + if tryCount < maxSubReqTryCount { + xapp.Logger.Info("handleSubTimer: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) + // Set possible to handle new response for the subId + err = c.tracker.RetryTransaction(subId, CREATE) + if err != nil { + xapp.Logger.Error("handleSubDelTimer: Failed to retry transaction record. Dropping timer action. Err %v, SubId: %v", err, transaction.OrigParams.SubId) + return + } - newPayload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0") // Temporary solution + err = c.rmrSend(transaction.OrigParams) + if err != nil { + xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid) + } - params.PayloadLen = len(newPayload) - params.Payload = newPayload + tryCount++ + c.timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionRequestTimer) + return + } - xapp.Logger.Info("SendSubFail: Forwarding failure to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(¶ms) + var subDelReqPayload []byte + subDelReqPayload, err = c.e2ap.PackSubscriptionDeleteRequest(transaction.OrigParams.Payload, subId) if err != nil { - xapp.Logger.Error("SendSubFail: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("handleSubTimer: Packing SubDelReq failed. Err: %v", err) + return } - time.Sleep(3 * time.Second) + // Cancel failed subscription + var params xapp.RMRParams + params.Mtype = 12020 // RIC SUBSCRIPTION DELETE + params.SubId = int(subId) + params.Xid = transaction.OrigParams.Xid + params.Meid = transaction.OrigParams.Meid + params.Src = transaction.OrigParams.Src + params.PayloadLen = len(subDelReqPayload) + params.Payload = subDelReqPayload + params.Mbuf = nil - xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port) + // Delete CREATE transaction + _, err = c.tracker.completeTransaction(subId, CREATE) + if err != nil { + xapp.Logger.Error("handleSubTimer: Failed to delete create transaction record. Dropping this timer action. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid) + return + } - xapp.Logger.Info("SubReqTimer: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) - subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId} - err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + // Create DELETE transaction + var forwardRespToXapp bool = false + _, err = c.trackDeleteTransaction(¶ms, subId, forwardRespToXapp) if err != nil { - xapp.Logger.Error("SendSubFail: Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("handleSubTimer: Failed to create delete transaction record. Dropping this timer action. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid) return } - xapp.Logger.Info("SendSubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) - if c.registry.releaseSequenceNumber(subId) { - transaction, err = c.tracker.completeTransaction(subId, CREATE) - if err != nil { - xapp.Logger.Error("SendSubFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - return - } - } else { - xapp.Logger.Error("SendSubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) + xapp.Logger.Info("handleSubTimer: Sending SubDelReq to E2T: Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid) + c.rmrSend(¶ms) + if err != nil { + xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) } + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } -*/ func (act Action) String() string { actions := [...]string{ @@ -401,14 +415,15 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) return } xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum) if c.registry.IsValidSequenceNumber(payloadSeqNum) { c.registry.deleteSubscription(payloadSeqNum) - _, err = c.trackDeleteTransaction(params, payloadSeqNum) + var forwardRespToXapp bool = true + _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -422,18 +437,18 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { c.rmrSend(params) if err != nil { xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } else { - c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum), subReqTime, c.handleSubscriptionDeleteRequestTimer) } + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } -func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (transaction *Transaction, err error) { +func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16, forwardRespToXapp bool) (transaction *Transaction, err error) { srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { - xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("Failed to split source address. Err: %s, SubId: %v, Xid: %s", err, payloadSeqNum, params.Xid) } - transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params) + var respReceived bool = false + transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params, respReceived, forwardRespToXapp) return } @@ -444,30 +459,36 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) return } xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum) c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) - transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, DELETE) if err != nil { - xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Info("SubDelResp: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum) return } - xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - // return + if responseReceived == true { + // Subscription Delete timer already received + return } + xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port) - time.Sleep(3 * time.Second) + if transaction.ForwardRespToXapp == true { + params.SubId = int(payloadSeqNum) + params.Xid = transaction.OrigParams.Xid + xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) + err = c.rmrReplyToSender(params) + if err != nil { + xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + } + + time.Sleep(3 * time.Second) + } xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum} @@ -479,7 +500,7 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) if c.registry.releaseSequenceNumber(payloadSeqNum) { - transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE) + _, err = c.tracker.completeTransaction(payloadSeqNum, DELETE) if err != nil { xapp.Logger.Error("SubDelResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -498,34 +519,53 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId) + xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) return } xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum) c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) - transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(payloadSeqNum, DELETE) if err != nil { - xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId) + xapp.Logger.Info("SubDelFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum) return } - xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port) - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("Failed to send SubDelFail to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - // return + if responseReceived == true { + // Subscription Delete timer already received + return } + xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", payloadSeqNum, transaction.Xappkey.Addr, transaction.Xappkey.Port) - time.Sleep(3 * time.Second) + if transaction.ForwardRespToXapp == true { + var subDelRespPayload []byte + subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, payloadSeqNum) + if err != nil { + xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err) + return + } + + params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE + params.SubId = int(payloadSeqNum) + params.Xid = transaction.OrigParams.Xid + params.Meid = transaction.OrigParams.Meid + params.Src = transaction.OrigParams.Src + params.PayloadLen = len(subDelRespPayload) + params.Payload = subDelRespPayload + params.Mbuf = nil + xapp.Logger.Info("SubDelFail: Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) + err = c.rmrReplyToSender(params) + if err != nil { + xapp.Logger.Error("SubDelFail: Failed to send SubDelFail to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + } + + time.Sleep(3 * time.Second) + } xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum} - c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -533,7 +573,7 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) if c.registry.releaseSequenceNumber(payloadSeqNum) { - transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE) + _, err = c.tracker.completeTransaction(payloadSeqNum, DELETE) if err != nil { xapp.Logger.Error("SubDelFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return @@ -545,68 +585,84 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { return } -func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int) { - newSubId := uint16(nbrId) - xapp.Logger.Info("SubDelReq timer expired. newSubId: %v", newSubId) - // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something - // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something - // c.sendSubscriptionDeleteFailure(newSubId, causeContent, causeVal) -} +func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) { + subId := uint16(nbrId) + xapp.Logger.Info("handleSubDelTimer: SubDelReq timer expired. subId: %v, tryCount: %v", subId, tryCount) -/* -func (c *Control) sendSubscriptionDeleteFailure(subId uint16, causeContent uint8, causeVal uint8) { - transaction, err := c.tracker.completeTransaction(subId, DELETE) + transaction, responseReceived, err := c.tracker.CheckResponseReceived(subId, DELETE) if err != nil { - xapp.Logger.Error("SendSubDelFail: Failed to delete transaction record. Err: %v, newSubId: %v", err, subId) + xapp.Logger.Info("handleSubTimer: Dropping this timer action. Err: %v SubId: %v", err, subId) return } - xapp.Logger.Info("SendSubDelFail: SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) - - var params xapp.RMRParams - params.Mtype = 12022 //xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"] - params.SubId = int(subId) - params.Meid = transaction.OrigParams.Meid - params.Xid = transaction.OrigParams.Xid -// newPayload, packErr := c.e2ap.PackSubscriptionDeleteFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal) -// if packErr != nil { -// xapp.Logger.Error("SendSubDelFail: PackSubscriptionDeleteFailure(). Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)) -// return -// } + if responseReceived == true { + // Subscription Delete Response or Failure already received + return + } - newPayload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0") // Temporary solution + if tryCount < maxSubDelReqTryCount { + xapp.Logger.Info("handleSubDelTimer: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) + // Set possible to handle new response for the subId + err = c.tracker.RetryTransaction(subId, DELETE) + if err != nil { + xapp.Logger.Error("handleSubDelTimer: Failed to retry transaction record. Dropping timer action. Err %v, SubId: %v", err, transaction.OrigParams.SubId) + return + } - params.PayloadLen = len(newPayload) - params.Payload = newPayload + err = c.rmrSend(transaction.OrigParams) + if err != nil { + xapp.Logger.Error("handleSubDelTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid) + } - xapp.Logger.Info("SendSubDelFail: Forwarding failure to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(¶ms) - if err != nil { - xapp.Logger.Error("SendSubDelFail: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + tryCount++ + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) + return } - time.Sleep(3 * time.Second) + var params xapp.RMRParams + if transaction.ForwardRespToXapp == true { + var subDelRespPayload []byte + subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, subId) + if err != nil { + xapp.Logger.Error("handleSubDelTimer: Unable to pack payload. Dropping this timer action. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subId, transaction.OrigParams.Xid, transaction.OrigParams.Payload) + return + } + + params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE + params.SubId = int(subId) + params.Meid = transaction.OrigParams.Meid + params.Xid = transaction.OrigParams.Xid + params.Src = transaction.OrigParams.Src + params.PayloadLen = len(subDelRespPayload) + params.Payload = subDelRespPayload + params.Mbuf = nil + + xapp.Logger.Info("handleSubDelTimer: Sending SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) + err = c.rmrReplyToSender(¶ms) + if err != nil { + xapp.Logger.Error("handleSubDelTimer: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + } - xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port) + time.Sleep(3 * time.Second) + } - xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid) + xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid) subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId} err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { - xapp.Logger.Error("SendSubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid) return } - xapp.Logger.Info("SendSubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) + xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid) if c.registry.releaseSequenceNumber(subId) { - transaction, err = c.tracker.completeTransaction(subId, DELETE) + _, err = c.tracker.completeTransaction(subId, DELETE) if err != nil { - xapp.Logger.Error("SendSubDelFail: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("handleSubDelTimer: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid) return } } else { - xapp.Logger.Error("SendSubDelFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) + xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid) } return } -*/ diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go index 209b13c..084c7eb 100644 --- a/pkg/control/e2ap.go +++ b/pkg/control/e2ap.go @@ -28,9 +28,14 @@ import "C" import ( "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap_wrapper" + "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer" "unsafe" ) +var packerif e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer() + type E2ap struct { } @@ -41,28 +46,28 @@ func (c *E2ap) GetSubscriptionRequestSequenceNumber(payload []byte) (subId uint1 cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return } -// Used by submgr, rco test stub +// Used by submgr, xapp test stub func (c *E2ap) SetSubscriptionRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) { cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } -// Used by submgr, rco test stub +// Used by submgr, xapp test stub func (c *E2ap) GetSubscriptionResponseSequenceNumber(payload []byte) (subId uint16, err error) { cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_response_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Response Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return @@ -73,7 +78,7 @@ func (c *E2ap) SetSubscriptionResponseSequenceNumber(payload []byte, newSubscrip cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_response_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Response Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } @@ -85,30 +90,30 @@ func (c *E2ap) GetSubscriptionDeleteRequestSequenceNumber(payload []byte) (subId cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return } -// Used by rco test stub +// Used by xapp test stub func (c *E2ap) SetSubscriptionDeleteRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) { cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } /* RICsubscriptionDeleteResponse */ -// Used by submgr, rco test stub +// Used by submgr, e2t test stub func (c *E2ap) GetSubscriptionDeleteResponseSequenceNumber(payload []byte) (subId uint16, err error) { cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Response Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return @@ -119,7 +124,7 @@ func (c *E2ap) SetSubscriptionDeleteResponseSequenceNumber(payload []byte, newSu cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Response Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } @@ -131,18 +136,18 @@ func (c *E2ap) GetSubscriptionFailureSequenceNumber(payload []byte) (subId uint1 cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return } -// Used by submgr +// Used by e2t test stub func (c *E2ap) SetSubscriptionFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) { cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } @@ -154,7 +159,7 @@ func (c *E2ap) GetSubscriptionDeleteFailureSequenceNumber(payload []byte) (subId cptr := unsafe.Pointer(&payload[0]) cret := C.e2ap_get_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload))) if cret < 0 { - return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret) + return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret) } subId = uint16(cret) return @@ -165,7 +170,67 @@ func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSub cptr := unsafe.Pointer(&payload[0]) size := C.e2ap_set_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid)) if size < 0 { - return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", size) + return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload. Erroxappde: %v", size) } return } + +// Used by submgr +func (c *E2ap) PackSubscriptionDeleteResponse(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) { + e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest() + packedData := &packer.PackedData{} + packedData.Buf = payload + err = e2SubDelReq.UnPack(packedData) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelResp: UnPack() failed: %s", err.Error()) + } + getErr, subDelReq := e2SubDelReq.Get() + if getErr != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelResp: Get() failed: %s", getErr.Error()) + } + + e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse() + subDelResp := e2ap.E2APSubscriptionDeleteResponse{} + subDelResp.RequestId.Id = subDelReq.RequestId.Id + subDelResp.RequestId.Seq = uint32(newSubscriptionid) + subDelResp.FunctionId = subDelReq.FunctionId + err = e2SubDelResp.Set(&subDelResp) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelResp: Set() failed: %s", err.Error()) + } + err, packedData = e2SubDelResp.Pack(nil) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelResp: Pack() failed: %s", err.Error()) + } + return packedData.Buf, nil +} + +// Used by submgr +func (c *E2ap) PackSubscriptionDeleteRequest(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) { + e2SubReq := packerif.NewPackerSubscriptionRequest() + packedData := &packer.PackedData{} + packedData.Buf = payload + err = e2SubReq.UnPack(packedData) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelReq: UnPack() failed: %s", err.Error()) + } + getErr, subReq := e2SubReq.Get() + if getErr != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelReq: Get() failed: %s", getErr.Error()) + } + + e2SubDel := packerif.NewPackerSubscriptionDeleteRequest() + subDelReq := e2ap.E2APSubscriptionDeleteRequest{} + subDelReq.RequestId.Id = subReq.RequestId.Id + subDelReq.RequestId.Seq = uint32(newSubscriptionid) + subDelReq.FunctionId = subReq.FunctionId + err = e2SubDel.Set(&subDelReq) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelReq: Set() failed: %s", err.Error()) + } + err, packedData = e2SubDel.Pack(nil) + if err != nil { + return make([]byte, 0), fmt.Errorf("PackSubDelReq: Pack() failed: %s", err.Error()) + } + return packedData.Buf, nil +} diff --git a/pkg/control/timer.go b/pkg/control/timer.go old mode 100644 new mode 100755 index c8385a6..07327b4 --- a/pkg/control/timer.go +++ b/pkg/control/timer.go @@ -21,13 +21,15 @@ Timer takes four parameters: 1) strId string 'string format timerMap key' 2) nbrId int 'numeric format timerMap key' 3) timerDuration time.Duration 'timer duration' - 4) timerFunction func(string, int) 'function to be executed when timer expires' + 4) tryCount uint64 'tryCount' + 5) timerFunction func(string, int) 'function to be executed when timer expires' Timer function is put inside in-build time.AfterFunc() Go function, where it is run inside own Go routine when the timer expires. Timer are two key values. Both are used always, but the other one can be left - "empty", i.e. strId = "" or nbrId = 0. Fourth parameter, the timer function is bare function name without - any function parameters and parenthesis! Filling first parameter strId with related name can improve - code readability and robustness, even the numeric Id would be enough from functionality point of view. + "empty", i.e. strId = "" or nbrId = 0. Fourth parameter is for tryCount. Fifth parameter, the timer + function is bare function name without any function parameters and parenthesis! Filling first parameter + strId with related name can improve code readability and robustness, even the numeric Id would be enough + from functionality point of view. TimerStart() function starts the timer. If TimerStart() function is called again with same key values while earlier started timer is still in the timerMap, i.e. it has not been stopped or the timer has not @@ -52,24 +54,45 @@ Timer takes four parameters: 1) subReqTime := 2 * time.Second subId := 123 - timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, handleSubscriptionRequestTimer) + var tryCount uint64 = 1 + timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, FirstTry, handleSubscriptionRequestTimer) timerMap.StopTimer("RIC_SUB_REQ", int(subId)) + + StartTimer() retry example. 2) + subReqTime := 2 * time.Second + subId := 123 + var tryCount uint64 = 1 + timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, FirstTry, handleSubscriptionRequestTimer) + timerMap.StopTimer("RIC_SUB_REQ", int(subId)) + + 3) subReqTime := 2 * time.Second strId := "1UHSUwNqxiVgUWXvC4zFaatpZFF" - timerMap.StartTimer(strId, 0, subReqTime, handleSubscriptionRequestTimer) + var tryCount uint64 = 1 + timerMap.StartTimer(strId, 0, subReqTime, FirstTry, handleSubscriptionRequestTimer) timerMap.StopTimer(strId, 0) - 3) + 4) subReqTime := 2 * time.Second strId := "1UHSUwNqxiVgUWXvC4zFaatpZFF" - timerMap.StartTimer(RIC_SUB_REQ_" + strId, 0, subReqTime, handleSubscriptionRequestTimer) + var tryCount uint64 = 1 + timerMap.StartTimer(RIC_SUB_REQ_" + strId, 0, subReqTime, FirstTry, handleSubscriptionRequestTimer) timerMap.timerMap.StopTimer("RIC_SUB_REQ_" + strId, 0) Timer function example. This is run if any of the above started timer expires. - func handleSubscriptionRequestTimer1(strId string, nbrId int) { - fmt.Printf("Subscription Request timer expired. Name: %v, SubId: %v\n",strId, nbrId) + func handleSubscriptionRequestTimer1(strId string, nbrId int, tryCount uint64) { + fmt.Printf("Subscription Request timer expired. Name: %v, SubId: %v, tryCount: %v\n",strId, nbrId, tryCount) + ... + + // Retry + .... + + tryCount++ + timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, tryCount, handleSubscriptionRequestTimer) + ... + } */ @@ -81,6 +104,8 @@ import ( "time" ) +const FirstTry = 1 + type TimerKey struct { strId string nbrId int @@ -101,7 +126,7 @@ func (t *TimerMap) Init() { t.timer = make(map[TimerKey]TimerInfo) } -func (t *TimerMap) StartTimer(strId string, nbrId int, expireAfterTime time.Duration, timerFunction func(srtId string, nbrId int)) bool { +func (t *TimerMap) StartTimer(strId string, nbrId int, expireAfterTime time.Duration, tryCount uint64, timerFunction func(srtId string, nbrId int, tryCount uint64)) bool { t.mutex.Lock() defer t.mutex.Unlock() if timerFunction == nil { @@ -119,9 +144,9 @@ func (t *TimerMap) StartTimer(strId string, nbrId int, expireAfterTime time.Dura delete(t.timer, timerKey) } - // Store in timerMap in-build Go "timer", timer function executor, and the function to be executed when the timer expires + // Store in timerMap in-build Go "timer", timer function executor and the function to be executed when the timer expires t.timer[timerKey] = TimerInfo{timerAddress: time.AfterFunc(expireAfterTime, func() { t.timerFunctionExecutor(strId, nbrId) }), - timerFunctionAddress: func() { timerFunction(strId, nbrId) }} + timerFunctionAddress: func() { timerFunction(strId, nbrId, tryCount) }} return true } diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 65f816e..584b331 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -37,10 +37,12 @@ type TransactionXappKey struct { } type Transaction struct { - tracker *Tracker // tracker instance - Key TransactionKey // action key - Xappkey TransactionXappKey // transaction key - OrigParams *xapp.RMRParams // request orginal params + tracker *Tracker // tracker instance + Key TransactionKey // action key + Xappkey TransactionXappKey // transaction key + OrigParams *xapp.RMRParams // request orginal params + RespReceived bool + ForwardRespToXapp bool } func (t *Transaction) SubRouteInfo() SubRouteInfo { @@ -65,10 +67,10 @@ func (t *Tracker) Init() { Checks if a tranascation with similar type has been ongoing. If not then creates one. Returns error if there is similar transatcion ongoing. */ -func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams) (*Transaction, error) { +func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) { key := TransactionKey{subID, act} xappkey := TransactionXappKey{addr, port, params.Xid} - trans := &Transaction{t, key, xappkey, params} + trans := &Transaction{t, key, xappkey, params, respReceived, forwardRespToXapp} t.mutex.Lock() defer t.mutex.Unlock() if _, ok := t.transactionTable[key]; ok { @@ -87,7 +89,7 @@ func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port u } /* -Retreives the transaction table entry for the given request. +Retreives the transaction table entry for the given request. Controls that only one response is sent to xapp. Returns error in case the transaction cannot be found. */ func (t *Tracker) RetriveTransaction(subID uint16, act Action) (*Transaction, error) { @@ -119,3 +121,39 @@ func (t *Tracker) completeTransaction(subID uint16, act Action) (*Transaction, e err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act) return nil, err } + +/* +Makes possible to to detect has response already received from BTS +Returns error in case the transaction cannot be found. +*/ +func (t *Tracker) CheckResponseReceived(subID uint16, act Action) (*Transaction, bool, error) { + key := TransactionKey{subID, act} + t.mutex.Lock() + defer t.mutex.Unlock() + if trans, ok := t.transactionTable[key]; ok { + if trans.RespReceived == false { + trans.RespReceived = true + // This is used to control that only one response action (success response, failure or timer) is excecuted for the transaction + return trans, false, nil + } + return trans, true, nil + } + err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act) + return nil, false, err +} + +/* +Makes possible to receive response to retransmitted request to BTS +Returns error in case the transaction cannot be found. +*/ +func (t *Tracker) RetryTransaction(subID uint16, act Action) error { + key := TransactionKey{subID, act} + t.mutex.Lock() + defer t.mutex.Unlock() + if trans, ok := t.transactionTable[key]; ok { + trans.RespReceived = false + return nil + } + err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act) + return err +} -- 2.16.6