From: Anssi Mannila Date: Thu, 12 Dec 2019 08:47:47 +0000 (+0200) Subject: RICPLT-2911 Add missing error branches and do some cleanup. X-Git-Tag: 0.4.0~64 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=90fa02136dc7e56db75ce353e69571eb3cb0eb69;p=ric-plt%2Fsubmgr.git RICPLT-2911 Add missing error branches and do some cleanup. Change-Id: If5260eac94d2a60320811b7dc0ebc19f0969ba11 Signed-off-by: Anssi Mannila --- diff --git a/pkg/control/control.go b/pkg/control/control.go index d27844e..8a06174 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -30,13 +30,10 @@ import ( "github.com/go-openapi/strfmt" "github.com/spf13/viper" "math/rand" - "strconv" "time" "sync" ) -var rmrSendMutex = &sync.Mutex{} - var subReqTime time.Duration = 2 * time.Second var SubDelReqTime time.Duration = 2 * time.Second @@ -45,8 +42,8 @@ type Control struct { registry *Registry rtmgrClient *RtmgrClient tracker *Tracker - rcChan chan *xapp.RMRParams timerMap *TimerMap + rmrSendMutex *sync.Mutex } type RMRMeid struct { @@ -60,11 +57,12 @@ var seedSN uint16 const ( CREATE Action = 0 MERGE Action = 1 + NONE Action = 2 DELETE Action = 3 ) func init() { - xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v2") + xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v4") viper.AutomaticEnv() viper.SetEnvPrefix("submgr") viper.AllowEmptyEnv(true) @@ -89,32 +87,28 @@ func NewControl() Control { timerMap := new(TimerMap) timerMap.Init() + rmrSendMutex := &sync.Mutex{} + transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"}) client := rtmgrclient.New(transport, strfmt.Default) handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second) deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second) rtmgrClient := RtmgrClient{client, handle, deleteHandle} - return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams),timerMap} + return Control{new(E2ap), registry, &rtmgrClient, tracker, timerMap, rmrSendMutex} } func (c *Control) Run() { - go c.controlLoop() xapp.Run(c) } -func (c *Control) Consume(rp *xapp.RMRParams) (err error) { - c.rcChan <- rp - return -} - func (c *Control) rmrSend(params *xapp.RMRParams) (err error) { status := false i := 1 for ; i <= 10 && status == false; i++ { - rmrSendMutex.Lock() + c.rmrSendMutex.Lock() status = xapp.Rmr.Send(params, false) - rmrSendMutex.Unlock() + c.rmrSendMutex.Unlock() if status == false { xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid) time.Sleep(500 * time.Millisecond) @@ -124,13 +118,6 @@ func (c *Control) rmrSend(params *xapp.RMRParams) (err error) { err = errors.New("rmr.Send() failed") xapp.Rmr.Free(params.Mbuf) } - - /* - if !xapp.Rmr.Send(params, false) { - err = errors.New("rmr.Send() failed") - xapp.Rmr.Free(params.Mbuf) - } - */ return } @@ -139,28 +126,25 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) { return } -func (c *Control) controlLoop() { - for { - msg := <-c.rcChan - switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: - go c.handleSubscriptionRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_RESP"]: - go c.handleSubscriptionResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_FAILURE"]: - go c.handleSubscriptionFailure(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]: - go c.handleSubscriptionDeleteRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]: - go c.handleSubscriptionDeleteResponse(msg) - default: - err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded") - xapp.Logger.Error("Unknown message type: %v", err) - } +func (c *Control) Consume(msg *xapp.RMRParams) (err error) { + switch msg.Mtype { + case xapp.RICMessageTypes["RIC_SUB_REQ"]: + go c.handleSubscriptionRequest(msg) + case xapp.RICMessageTypes["RIC_SUB_RESP"]: + go c.handleSubscriptionResponse(msg) + case xapp.RICMessageTypes["RIC_SUB_FAILURE"]: + go c.handleSubscriptionFailure(msg) + case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]: + go c.handleSubscriptionDeleteRequest(msg) + case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]: + go c.handleSubscriptionDeleteResponse(msg) + default: + xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype) } + return nil } -func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) { +func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { xapp.Logger.Info("Subscription Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil @@ -168,13 +152,13 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) /* Reserve a sequence number and set it in the payload */ newSubId, isIdValid := c.registry.ReserveSequenceNumber() if isIdValid != true { - xapp.Logger.Info("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) + xapp.Logger.Error("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) return } - err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) + err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) if err != nil { - err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to set Subscription Sequence Number in Payload. Dropping this Subscription Request message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } @@ -189,16 +173,18 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) xactValue := Transaction{*srcAddr, *srcPort, params} err = c.tracker.TrackTransaction(xactKey, xactValue) if err != nil { - xapp.Logger.Error("Failed to create a Subscription Request transaction record due to %v", err) + xapp.Logger.Error("Failed to create a Subscription Request transaction record. Err: %v", err) return } /* Update routing manager about the new subscription*/ subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId} xapp.Logger.Info("Starting routing manager update") - c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) - - //time.Sleep(3 * time.Second) + err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + xapp.Logger.Error("Failed to update routing manager. Dropping this Subscription Request message. Err: %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return + } // Setting new subscription ID in the RMR header params.SubId = int(newSubId) @@ -213,20 +199,20 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) return } -func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) { +func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { xapp.Logger.Info("Subscription Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload) if err != nil { - err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Responsemessage. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Received payloadSeqNum: %v",payloadSeqNum) if !c.registry.IsValidSequenceNumber(payloadSeqNum) { - err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.") + xapp.Logger.Error("Unknown payloadSeqNum. Dropping this Subscription Response message. PayloadSeqNum: %v, SubId: %v, Xid: %s", payloadSeqNum, params.SubId, params.Xid) return } @@ -236,8 +222,7 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) if err != nil { - xapp.Logger.Error("Failed to retrive transaction record. Err: %v", err) - xapp.Logger.Info("Further processing of this Subscription Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) + xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Response message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Retrieved old subId...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) @@ -260,27 +245,24 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) return } -func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) { +func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { xapp.Logger.Info("Subscription Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload) if err != nil { - err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Failure message. Err: v%, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum) - // should here be IsValidSequenceNumber check? - // c.timerMap.StopTimer(payloadSeqNum) var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE) if err != nil { - xapp.Logger.Error("Failed to retrive transaction record. Err %v", err) - xapp.Logger.Info("Further processing of this Subscription Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) + xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Failure message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) @@ -298,7 +280,10 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) xapp.Logger.Info("Starting routing manager update") subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} - c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + xapp.Logger.Error("Failed to update routing manager. Err: %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + } xapp.Logger.Info("Deleting trancaction record") if c.registry.releaseSequenceNumber(payloadSeqNum) { @@ -307,6 +292,9 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err) return } + } else { + xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) + return } return } @@ -361,6 +349,7 @@ func (act Action) String() string { actions := [...]string{ "CREATE", "MERGE", + "NONE", "DELETE", } @@ -379,26 +368,29 @@ func (act Action) valid() bool { } } -func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) { +func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Info("Subscription Delete Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload) if err != nil { - err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Request message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum) if c.registry.IsValidSequenceNumber(payloadSeqNum) { c.registry.deleteSubscription(payloadSeqNum) - trackErr := c.trackDeleteTransaction(params, payloadSeqNum) - if trackErr != nil { - xapp.Logger.Error("Failed to create a Subscription Delete Request transaction record due to %v", trackErr) - return trackErr + err = c.trackDeleteTransaction(params, payloadSeqNum) + if err != nil { + xapp.Logger.Error("Failed to create transaction record. Dropping this Subscription Delete Request message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return } + } else { + xapp.Logger.Error("Not valid sequence number. Dropping this Subscription Delete Request message. SubId: %v, Xid: %s", params.SubId, params.Xid) + return } - + xapp.Logger.Info("Forwarding Delete Subscription Request to E2T: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid) c.rmrSend(params) if err != nil { @@ -427,19 +419,17 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload) if err != nil { - err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Response message. Err: %, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum) - // should here be IsValidSequenceNumber check? // c.timerMap.StopTimer(payloadSeqNum) var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) if err != nil { - xapp.Logger.Error("Failed to retrive transaction record. Err %v", err) - xapp.Logger.Info("Further processing of this Subscription Delete Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) + xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Delete Response message. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) @@ -457,7 +447,11 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err xapp.Logger.Info("Starting routing manager update") subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} - c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + xapp.Logger.Error("Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return + } xapp.Logger.Info("Deleting trancaction record") if c.registry.releaseSequenceNumber(payloadSeqNum) { @@ -466,30 +460,31 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err) return } + } else { + xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) + return } return } -func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err error) { +func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { xapp.Logger.Info("Subscription Delete Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid) xapp.Rmr.Free(params.Mbuf) params.Mbuf = nil payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload) if err != nil { - err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + xapp.Logger.Error("Unable to get Subscription Sequence Number from Payload. Dropping this Subscription Delete Failure message. Err: %, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum) - // should here be IsValidSequenceNumber check? // c.timerMap.StopTimer(payloadSeqNum) var transaction Transaction transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE) if err != nil { - xapp.Logger.Error("Failed to retrive transaction record. Err %v", err) - xapp.Logger.Info("Further processing of this Subscription Delete Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid) + xapp.Logger.Error("Failed to retrive transaction record. Dropping this Subscription Delete Failure message. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort) @@ -508,6 +503,10 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err e xapp.Logger.Info("Starting routing manager update") subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum} c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) + if err != nil { + xapp.Logger.Error("Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return + } xapp.Logger.Info("Deleting trancaction record") if c.registry.releaseSequenceNumber(payloadSeqNum) { @@ -516,6 +515,9 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err e xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err) return } + } else { + xapp.Logger.Error("Failed to release sequency number. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return } return }