Fix for DB read after VM restart and REST subscription query aded 52/6552/2
authorAnssi Mannila <anssi.mannila@nokia.com>
Tue, 3 Aug 2021 08:57:11 +0000 (11:57 +0300)
committerAnssi Mannila <anssi.mannila@nokia.com>
Tue, 3 Aug 2021 10:07:59 +0000 (13:07 +0300)
Change-Id: I975ba41ceafee16470fdfacc7c823815dfc6ad49
Signed-off-by: Anssi Mannila <anssi.mannila@nokia.com>
config/submgr-config.yaml
pkg/control/control.go
pkg/control/registry.go
pkg/control/ut_messaging_test.go
test/config-file.json

index 25eb868..53d2161 100644 (file)
@@ -23,3 +23,5 @@
   "e2tMaxSubReqTryCount": 2
   "e2tMaxSubDelReqTryCount": 2
   "readSubsFromDb": "true"
+  "dbTryCount": 200
+  "dbRetryForever": "true"
index 4df7676..0b39f2c 100755 (executable)
@@ -72,6 +72,8 @@ var e2tMaxSubReqTryCount uint64    // Initial try + retry
 var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
 var readSubsFromDb string
 var restDuplicateCtrl duplicateCtrl
+var dbRetryForever string
+var dbTryCount int
 
 type Control struct {
        *xapp.RMRClient
@@ -126,6 +128,7 @@ func NewControl() *Control {
 
        // Register REST handler for testing support
        xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
+       xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
        xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
 
        go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
@@ -137,22 +140,27 @@ func NewControl() *Control {
        restDuplicateCtrl.Init()
 
        // Read subscriptions from db
-       xapp.Logger.Info("Reading subscriptions from db")
-       subIds, register, err := c.ReadAllSubscriptionsFromSdl()
-       if err != nil {
-               xapp.Logger.Error("%v", err)
-       } else {
-               c.registry.subIds = subIds
-               c.registry.register = register
-               c.HandleUncompletedSubscriptions(register)
-       }
+       c.ReadE2Subscriptions()
+       c.ReadRESTSubscriptions()
 
-       restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
-       if err != nil {
-               xapp.Logger.Error("%v", err)
-       } else {
-               c.registry.restSubscriptions = restSubscriptions
-       }
+       /*
+               xapp.Logger.Info("Reading subscriptions from db")
+               subIds, register, err := c.ReadAllSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+               } else {
+                       c.registry.subIds = subIds
+                       c.registry.register = register
+                       c.HandleUncompletedSubscriptions(register)
+               }
+
+               restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+               } else {
+                       c.registry.restSubscriptions = restSubscriptions
+               }
+       */
        return c
 }
 
@@ -161,6 +169,60 @@ func (c *Control) SymptomDataHandler(w http.ResponseWriter, r *http.Request) {
        xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
 }
 
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
+       xapp.Logger.Info("GetAllRestSubscriptions() called")
+       response := c.registry.GetAllRestSubscriptions()
+       w.Write(response)
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) ReadE2Subscriptions() error {
+       var err error
+       var subIds []uint32
+       var register map[uint32]*Subscription
+       for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
+               xapp.Logger.Info("Reading E2 subscriptions from db")
+               subIds, register, err = c.ReadAllSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+                       <-time.After(1 * time.Second)
+               } else {
+                       c.registry.subIds = subIds
+                       c.registry.register = register
+                       c.HandleUncompletedSubscriptions(register)
+                       return nil
+               }
+       }
+       xapp.Logger.Info("Continuing without retring")
+       return err
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) ReadRESTSubscriptions() error {
+       var err error
+       var restSubscriptions map[string]*RESTSubscription
+       for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
+               xapp.Logger.Info("Reading REST subscriptions from db")
+               restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+                       <-time.After(1 * time.Second)
+               } else {
+                       c.registry.restSubscriptions = restSubscriptions
+                       return nil
+               }
+       }
+       xapp.Logger.Info("Continuing without retring")
+       return err
+}
+
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
@@ -183,14 +245,6 @@ func (c *Control) ReadConfigParameters(f string) {
        }
        xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
 
-       // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
-       // value 100ms used currently only in unittests.
-       waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
-       if waitRouteCleanup_ms == 0 {
-               waitRouteCleanup_ms = 5000 * 1000000
-       }
-       xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
-
        e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
        if e2tMaxSubReqTryCount == 0 {
                e2tMaxSubReqTryCount = 1
@@ -208,10 +262,32 @@ func (c *Control) ReadConfigParameters(f string) {
                readSubsFromDb = "true"
        }
        xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
+
+       dbTryCount = viper.GetInt("controls.dbTryCount")
+       if dbTryCount == 0 {
+               dbTryCount = 200
+       }
+       xapp.Logger.Info("dbTryCount %v", dbTryCount)
+
+       dbRetryForever = viper.GetString("controls.dbRetryForever")
+       if dbRetryForever == "" {
+               dbRetryForever = "true"
+       }
+       xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
+
        c.LoggerLevel = viper.GetUint32("logger.level")
        if c.LoggerLevel == 0 {
                c.LoggerLevel = 3
        }
+       xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
+
+       // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
+       // value 100ms used currently only in unittests.
+       waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
+       if waitRouteCleanup_ms == 0 {
+               waitRouteCleanup_ms = 5000 * 1000000
+       }
+       xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
 }
 
 //-------------------------------------------------------------------
index 2495d3b..72a2100 100644 (file)
@@ -20,6 +20,7 @@
 package control
 
 import (
+       "encoding/json"
        "fmt"
        "sync"
        "time"
@@ -104,6 +105,16 @@ func (r *Registry) Initialize() {
        }
 }
 
+func (r *Registry) GetAllRestSubscriptions() []byte {
+       r.mutex.Lock()
+       defer r.mutex.Unlock()
+       restSubscriptionsJson, err := json.Marshal(r.restSubscriptions)
+       if err != nil {
+               xapp.Logger.Error("GetAllRestSubscriptions(): %v", err)
+       }
+       return restSubscriptionsJson
+}
+
 func (r *Registry) CreateRESTSubscription(restSubId *string, xAppRmrEndPoint *string, maid *string) (*RESTSubscription, error) {
        r.mutex.Lock()
        defer r.mutex.Unlock()
index 63ffaaf..a19ccab 100644 (file)
@@ -2370,6 +2370,11 @@ func TestPostEmptyDb(t *testing.T) {
        mainCtrl.sendPostRequest(t, "localhost:8080", "/ric/v1/test/emptydb")
 }
 
+func TestGetRestSubscriptions(t *testing.T) {
+
+       mainCtrl.sendGetRequest(t, "localhost:8080", "/ric/v1/restsubscriptions")
+}
+
 //-----------------------------------------------------------------------------
 // TestRESTSubReqAndRouteNok
 //
index 93dd1cd..fb4fb36 100644 (file)
       "e2tRecvMsgTimeout_ms": 2000,
       "e2tMaxSubReqTryCount": 2,
       "e2tMaxSubDelReqTryCount": 2,
-      "waitRouteCleanup_ms": 100,
       "readSubsFromDb": "true",
+      "dbTryCount": 2,
+      "dbRetryForever": "false",
+      "waitRouteCleanup_ms": 100,
       "subscription": {
           "host": "localhost:8088",
           "timeout": 2