"os"
"strconv"
"strings"
+ "sync"
"time"
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
//
//-------------------------------------------------------------------
func (c *Control) ReadRESTSubscriptions() error {
+
+ xapp.Logger.Debug("ReadRESTSubscriptions()")
var err error
var restSubscriptions map[string]*RESTSubscription
for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
xapp.Logger.Error("%v", err)
<-time.After(1 * time.Second)
} else {
+ // Fix REST subscriptions ongoing status after restart
+ for restSubId, restSubscription := range restSubscriptions {
+ restSubscription.SubReqOngoing = false
+ restSubscription.SubDelReqOngoing = false
+ c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
+ }
c.registry.restSubscriptions = restSubscriptions
return nil
}
if duplicate {
err := fmt.Errorf("Retransmission blocker direct ACK for request of restSubsId %s restSubId MD5sum %s as retransmission", restSubId, md5sum)
xapp.Logger.Debug("%s", err)
+ c.registry.DeleteRESTSubscription(&restSubId)
c.UpdateCounter(cRestSubRespToXapp)
return &subResp, common.SubscribeCreatedCode
}
e2SubscriptionDirectives, err := c.GetE2SubscriptionDirectives(p)
if err != nil {
xapp.Logger.Error("%s", err)
+ c.registry.DeleteRESTSubscription(&restSubId)
return nil, common.SubscribeBadRequestCode
}
go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId, xAppRmrEndpoint, md5sum, e2SubscriptionDirectives)
clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
c.SubscriptionProcessingStartDelay()
- xapp.Logger.Debug("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
+ xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
var xAppEventInstanceID int64
var e2EventInstanceID int64
trans.Release()
if err != nil {
+ if err.Error() == "TEST: restart event received" {
+ // This is just for UT cases. Stop here subscription processing
+ return
+ }
c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
} else {
e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
// Wake subs request
//
subs.OngoingReqCount++
- go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
+ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
subs.OngoingReqCount--
c.RemoveSubscriptionFromDb(subs)
err = fmt.Errorf("E2 interface down")
errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
- return nil, &errorInfo, err
}
case *e2ap.E2APSubscriptionFailure:
err = fmt.Errorf("E2 SubscriptionFailure received")
errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
- return nil, &errorInfo, err
case *PackSubscriptionRequestErrortEvent:
err = fmt.Errorf("E2 SubscriptionRequest pack failure")
- return nil, &themsg.ErrorInfo, err
+ errorInfo = themsg.ErrorInfo
case *SDLWriteErrortEvent:
err = fmt.Errorf("SDL write failure")
- return nil, &themsg.ErrorInfo, err
+ errorInfo = themsg.ErrorInfo
+ case *SubmgrRestartTestEvent:
+ err = fmt.Errorf("TEST: restart event received")
+ xapp.Logger.Debug("%s", err)
+ return nil, &errorInfo, err
default:
err = fmt.Errorf("Unexpected E2 subscription response received")
errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
}
xapp.Logger.Error("XAPP-SubReq E2 subscription failed %s", idstring(err, trans, subs))
-
c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
return nil, &errorInfo, err
}
c.UpdateCounter(cRestSubFailNotifToXapp)
xapp.Subscription.Notify(resp, *clientEndpoint)
+ // E2 is down. Delete completely processed request safely now
if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
c.registry.DeleteRESTSubscription(restSubId)
c.RemoveRESTSubscriptionFromDb(*restSubId)
c.UpdateCounter(cRestSubNotifToXapp)
xapp.Subscription.Notify(resp, *clientEndpoint)
+ // E2 is down. Delete completely processed request safely now
if c.e2IfState.IsE2ConnectionUp(&restSubscription.Meid) == false && restSubscription.SubReqOngoing == false {
c.registry.DeleteRESTSubscription(restSubId)
c.RemoveRESTSubscriptionFromDb(*restSubId)
go func() {
xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
for _, instanceId := range restSubscription.InstanceIds {
- xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+ xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
if err != nil {
xapp.Logger.Error("%s", err.Error())
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
-func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
var xAppEventInstanceID int64
subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
// Wake subs delete
//
subs.OngoingDelCount++
- go c.handleSubscriptionDelete(subs, trans)
+ go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
subs.OngoingDelCount--
e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
subs.OngoingReqCount++
- go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
+ go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
subs.OngoingReqCount--
var err error
// Wake subs delete
//
subs.OngoingDelCount++
- go c.handleSubscriptionDelete(subs, trans)
+ go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
subs.OngoingDelCount--
//-------------------------------------------------------------------
// SUBS CREATE Handling
//-------------------------------------------------------------------
-func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
var event interface{} = nil
var removeSubscriptionFromDb bool = false
subRfMsg, valid = subs.SetCachedResponse(event, true)
subs.SubRespRcvd = true
case *e2ap.E2APSubscriptionFailure:
- removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(event, false)
xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
case *SubmgrRestartTestEvent:
- // This simulates that no response has been received and after restart subscriptions are restored from db
+ // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
+ subRfMsg, valid = subs.SetCachedResponse(event, false)
+ parentTrans.SendEvent(subRfMsg, 0)
+ return
case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
subRfMsg, valid = subs.SetCachedResponse(event, false)
default:
// Timer expiry
if subs.PolicyUpdate == false {
xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
- removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(nil, false)
c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ } else {
+ subRfMsg, valid = subs.SetCachedResponse(nil, true)
}
}
xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
} else {
xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
}
+ if valid == false {
+ removeSubscriptionFromDb = true
+ }
err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
if err != nil {
- subRfMsg, valid = subs.SetCachedResponse(event, false)
+ valid = false
c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
}
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
if valid == false {
- c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
}
parentTrans.SendEvent(subRfMsg, 0)
// SUBS DELETE Handling
//-------------------------------------------------------------------
-func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
trans := c.tracker.NewSubsTransaction(subs)
subs.WaitTransactionTurn(trans)
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
// RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
- c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
- c.registry.UpdateSubscriptionToDb(subs, c)
+ c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
parentTrans.SendEvent(nil, 0)
}
subReqMsg.RequestId.Id = ricRequestorId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
if err != nil {
- xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
+ xapp.Logger.Error("SUBS-SubReq ASN1 pack error: %s", idstring(err, trans, subs, parentTrans))
return &PackSubscriptionRequestErrortEvent{
ErrorInfo{
ErrorSource: models.SubscriptionInstanceErrorSourceASN1,
xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
return
}
+ xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
if sendOk == false {
err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
+ if c.UTTesting == true {
+ // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
+ c.registry.mutex = new(sync.Mutex)
+ }
+
const ricRequestorId = 123
xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)