X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=9ce34a07dc51596b3b07d56f6e09d2423e5db9fc;hb=47b842bf6afc45313a0edadc78f87bff06ddf2b4;hp=d5a92b64e1e322d6e6b924e1bd12d1cf9e688954;hpb=bf2f4122f6bf89fc572f6c4cad5f0c4108a996e0;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index d5a92b6..9ce34a0 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -44,6 +44,7 @@ type Control struct { tracker *Tracker timerMap *TimerMap rmrSendMutex sync.Mutex + msgCounter uint64 } type RMRMeid struct { @@ -98,6 +99,7 @@ func NewControl() *Control { rtmgrClient: &rtmgrClient, tracker: tracker, timerMap: timerMap, + msgCounter: 0, } } @@ -130,6 +132,7 @@ func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) { } func (c *Control) Consume(msg *xapp.RMRParams) (err error) { + c.msgCounter++ switch msg.Mtype { case xapp.RICMessageTypes["RIC_SUB_REQ"]: go c.handleSubscriptionRequest(msg) @@ -155,35 +158,34 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { params.Mbuf = nil /* Reserve a sequence number and set it in the payload */ - newSubId, isIdValid := c.registry.ReserveSequenceNumber() - if isIdValid != true { + subs := c.registry.ReserveSubscription() + if subs == nil { xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) return } - params.SubId = int(newSubId) - err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId) + params.SubId = int(subs.Seq) + err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq) if err != nil { xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } // Create transatcion record for every subscription request var forwardRespToXapp bool = true var responseReceived bool = false - transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp) + transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } @@ -194,7 +196,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction) if err != nil { xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - c.registry.releaseSequenceNumber(newSubId) + c.registry.releaseSequenceNumber(subs.Seq) return } @@ -204,7 +206,7 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { if err != nil { xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) } - c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) + c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable) return } @@ -421,13 +423,13 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum) if c.registry.IsValidSequenceNumber(payloadSeqNum) { - c.registry.deleteSubscription(payloadSeqNum) var forwardRespToXapp bool = true _, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) return } + c.registry.setSubscriptionToUnConfirmed(payloadSeqNum) } else { xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) return