Fixes for subscription merge release cases
[ric-plt/submgr.git] / pkg / control / control.go
index 4c09ee2..68688e2 100755 (executable)
@@ -25,6 +25,7 @@ import (
        "os"
        "strconv"
        "strings"
+       "sync"
        "time"
 
        "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
@@ -213,6 +214,8 @@ func (c *Control) ReadE2Subscriptions() error {
 //
 //-------------------------------------------------------------------
 func (c *Control) ReadRESTSubscriptions() error {
+
+       xapp.Logger.Debug("ReadRESTSubscriptions()")
        var err error
        var restSubscriptions map[string]*RESTSubscription
        for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
@@ -222,6 +225,12 @@ func (c *Control) ReadRESTSubscriptions() error {
                        xapp.Logger.Error("%v", err)
                        <-time.After(1 * time.Second)
                } else {
+                       // Fix REST subscriptions ongoing status after restart
+                       for restSubId, restSubscription := range restSubscriptions {
+                               restSubscription.SubReqOngoing = false
+                               restSubscription.SubDelReqOngoing = false
+                               c.WriteRESTSubscriptionToSdl(restSubId, restSubscription)
+                       }
                        c.registry.restSubscriptions = restSubscriptions
                        return nil
                }
@@ -522,7 +531,7 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription
        clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string, xAppRmrEndpoint string, md5sum string, e2SubscriptionDirectives *E2SubscriptionDirectives) {
 
        c.SubscriptionProcessingStartDelay()
-       xapp.Logger.Debug("E2 SubscriptionRequest count =%v ", len(subReqList.E2APSubscriptionRequests))
+       xapp.Logger.Debug("E2 SubscriptionRequest count = %v ", len(subReqList.E2APSubscriptionRequests))
 
        var xAppEventInstanceID int64
        var e2EventInstanceID int64
@@ -551,6 +560,10 @@ func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription
                trans.Release()
 
                if err != nil {
+                       if err.Error() == "TEST: restart event received" {
+                               // This is just for UT cases. Stop here subscription processing
+                               return
+                       }
                        c.sendUnsuccesfullResponseNotification(restSubId, restSubscription, xAppEventInstanceID, err, clientEndpoint, trans, errorInfo)
                } else {
                        e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
@@ -601,7 +614,7 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e
        // Wake subs request
        //
        subs.OngoingReqCount++
-       go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
+       go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, 0)
        event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
        subs.OngoingReqCount--
 
@@ -627,6 +640,10 @@ func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e
                case *SDLWriteErrortEvent:
                        err = fmt.Errorf("SDL write failure")
                        errorInfo = themsg.ErrorInfo
+               case *SubmgrRestartTestEvent:
+                       err = fmt.Errorf("TEST: restart event received")
+                       xapp.Logger.Debug("%s", err)
+                       return nil, &errorInfo, err
                default:
                        err = fmt.Errorf("Unexpected E2 subscription response received")
                        errorInfo.SetInfo(err.Error(), models.SubscriptionInstanceErrorSourceE2Node, "")
@@ -759,7 +776,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
        go func() {
                xapp.Logger.Debug("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
                for _, instanceId := range restSubscription.InstanceIds {
-                       xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+                       xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId, 0)
 
                        if err != nil {
                                xapp.Logger.Error("%s", err.Error())
@@ -780,7 +797,7 @@ func (c *Control) RESTSubscriptionDeleteHandler(restSubId string) int {
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
-func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32, waitRouteCleanupTime time.Duration) (int64, error) {
 
        var xAppEventInstanceID int64
        subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
@@ -808,7 +825,7 @@ func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string,
        // Wake subs delete
        //
        subs.OngoingDelCount++
-       go c.handleSubscriptionDelete(subs, trans)
+       go c.handleSubscriptionDelete(subs, trans, waitRouteCleanupTime)
        trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
        subs.OngoingDelCount--
 
@@ -990,7 +1007,7 @@ func (c *Control) wakeSubscriptionRequest(subs *Subscription, trans *Transaction
 
        e2SubscriptionDirectives, _ := c.GetE2SubscriptionDirectives(nil)
        subs.OngoingReqCount++
-       go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives)
+       go c.handleSubscriptionCreate(subs, trans, e2SubscriptionDirectives, waitRouteCleanup_ms)
        event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
        subs.OngoingReqCount--
        var err error
@@ -1061,7 +1078,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
        // Wake subs delete
        //
        subs.OngoingDelCount++
-       go c.handleSubscriptionDelete(subs, trans)
+       go c.handleSubscriptionDelete(subs, trans, waitRouteCleanup_ms)
        trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
        subs.OngoingDelCount--
 
@@ -1091,7 +1108,7 @@ func (c *Control) handleXAPPSubscriptionDeleteRequest(params *xapp.RMRParams) {
 //-------------------------------------------------------------------
 // SUBS CREATE Handling
 //-------------------------------------------------------------------
-func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives) {
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp, e2SubscriptionDirectives *E2SubscriptionDirectives, waitRouteCleanupTime time.Duration) {
 
        var event interface{} = nil
        var removeSubscriptionFromDb bool = false
@@ -1110,38 +1127,43 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran
                        subRfMsg, valid = subs.SetCachedResponse(event, true)
                        subs.SubRespRcvd = true
                case *e2ap.E2APSubscriptionFailure:
-                       removeSubscriptionFromDb = true
                        subRfMsg, valid = subs.SetCachedResponse(event, false)
                        xapp.Logger.Debug("SUBS-SubReq: internal delete due failure event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
                case *SubmgrRestartTestEvent:
-                       // This simulates that no response has been received and after restart subscriptions are restored from db
+                       // This is used to simulate that no response has been received and after restart, subscriptions are restored from db
                        xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
+                       subRfMsg, valid = subs.SetCachedResponse(event, false)
+                       parentTrans.SendEvent(subRfMsg, 0)
+                       return
                case *PackSubscriptionRequestErrortEvent, *SDLWriteErrortEvent:
-                       removeSubscriptionFromDb = true
                        subRfMsg, valid = subs.SetCachedResponse(event, false)
                default:
                        // Timer expiry
                        if subs.PolicyUpdate == false {
                                xapp.Logger.Debug("SUBS-SubReq: internal delete due default event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
-                               removeSubscriptionFromDb = true
                                subRfMsg, valid = subs.SetCachedResponse(nil, false)
                                c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+                       } else {
+                               subRfMsg, valid = subs.SetCachedResponse(nil, true)
                        }
                }
                xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
        } else {
                xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
        }
+       if valid == false {
+               removeSubscriptionFromDb = true
+       }
 
        err := c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
        if err != nil {
-               subRfMsg, valid = subs.SetCachedResponse(event, false)
+               valid = false
                c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
        }
 
        //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
        if valid == false {
-               c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
+               c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
        }
 
        parentTrans.SendEvent(subRfMsg, 0)
@@ -1151,7 +1173,7 @@ func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Tran
 // SUBS DELETE Handling
 //-------------------------------------------------------------------
 
-func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp, waitRouteCleanupTime time.Duration) {
 
        trans := c.tracker.NewSubsTransaction(subs)
        subs.WaitTransactionTurn(trans)
@@ -1172,8 +1194,7 @@ func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Tran
        //Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
        //  If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
        //  RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
-       c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanup_ms, c)
-       c.registry.UpdateSubscriptionToDb(subs, c)
+       c.registry.RemoveFromSubscription(subs, parentTrans, waitRouteCleanupTime, c)
        parentTrans.SendEvent(nil, 0)
 }
 
@@ -1298,6 +1319,7 @@ func (c *Control) handleE2TSubscriptionResponse(params *xapp.RMRParams) {
                xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
                return
        }
+       xapp.Logger.Debug("SUBS-SubResp: Sending event, trans= %v", trans)
        sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
        if sendOk == false {
                err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
@@ -1501,6 +1523,11 @@ func (c *Control) RemoveRESTSubscriptionFromDb(restSubId string) {
 
 func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
 
+       if c.UTTesting == true {
+               // Reqistry mutex is not locked after real restart but it can be when restart is simulated in unit tests
+               c.registry.mutex = new(sync.Mutex)
+       }
+
        const ricRequestorId = 123
        xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)