tracker *Tracker
timerMap *TimerMap
rmrSendMutex sync.Mutex
+ msgCounter uint64
}
type RMRMeid struct {
rtmgrClient: &rtmgrClient,
tracker: tracker,
timerMap: timerMap,
+ msgCounter: 0,
}
}
}
func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+ c.msgCounter++
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
go c.handleSubscriptionRequest(msg)
params.Mbuf = nil
/* Reserve a sequence number and set it in the payload */
- newSubId, isIdValid := c.registry.ReserveSequenceNumber()
- if isIdValid != true {
+ subs := c.registry.ReserveSubscription()
+ if subs == nil {
xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
return
}
- params.SubId = int(newSubId)
- err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
+ params.SubId = int(subs.Seq)
+ err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
if err != nil {
xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
if err != nil {
xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
// Create transatcion record for every subscription request
var forwardRespToXapp bool = true
var responseReceived bool = false
- transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
+ transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
if err != nil {
xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
if err != nil {
xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
}
- c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
+ c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable)
return
}
xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
if c.registry.IsValidSequenceNumber(payloadSeqNum) {
- c.registry.deleteSubscription(payloadSeqNum)
var forwardRespToXapp bool = true
_, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp)
if err != nil {
xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
return
}
+ c.registry.setSubscriptionToUnConfirmed(payloadSeqNum)
} else {
xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
return