Merge "Fix for DB read after VM restart and REST subscription query aded"
authorAnssi Mannila <anssi.mannila@nokia.com>
Tue, 3 Aug 2021 10:21:16 +0000 (10:21 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Tue, 3 Aug 2021 10:21:16 +0000 (10:21 +0000)
1  2 
pkg/control/control.go
pkg/control/ut_messaging_test.go

diff --combined pkg/control/control.go
@@@ -72,6 -72,8 +72,8 @@@ var e2tMaxSubReqTryCount uint64    // I
  var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
  var readSubsFromDb string
  var restDuplicateCtrl duplicateCtrl
+ var dbRetryForever string
+ var dbTryCount int
  
  type Control struct {
        *xapp.RMRClient
@@@ -126,6 -128,7 +128,7 @@@ func NewControl() *Control 
  
        // Register REST handler for testing support
        xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
+       xapp.Resource.InjectRoute("/ric/v1/restsubscriptions", c.GetAllRestSubscriptions, "GET")
        xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
  
        go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
        restDuplicateCtrl.Init()
  
        // Read subscriptions from db
-       xapp.Logger.Info("Reading subscriptions from db")
-       subIds, register, err := c.ReadAllSubscriptionsFromSdl()
-       if err != nil {
-               xapp.Logger.Error("%v", err)
-       } else {
-               c.registry.subIds = subIds
-               c.registry.register = register
-               c.HandleUncompletedSubscriptions(register)
-       }
+       c.ReadE2Subscriptions()
+       c.ReadRESTSubscriptions()
+       /*
+               xapp.Logger.Info("Reading subscriptions from db")
+               subIds, register, err := c.ReadAllSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+               } else {
+                       c.registry.subIds = subIds
+                       c.registry.register = register
+                       c.HandleUncompletedSubscriptions(register)
+               }
  
-       restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
-       if err != nil {
-               xapp.Logger.Error("%v", err)
-       } else {
-               c.registry.restSubscriptions = restSubscriptions
-       }
+               restSubscriptions, err := c.ReadAllRESTSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+               } else {
+                       c.registry.restSubscriptions = restSubscriptions
+               }
+       */
        return c
  }
  
@@@ -161,6 -169,60 +169,60 @@@ func (c *Control) SymptomDataHandler(w 
        xapp.Resource.SendSymptomDataJson(w, r, subscriptions, "platform/subscriptions.json")
  }
  
+ //-------------------------------------------------------------------
+ //
+ //-------------------------------------------------------------------
+ func (c *Control) GetAllRestSubscriptions(w http.ResponseWriter, r *http.Request) {
+       xapp.Logger.Info("GetAllRestSubscriptions() called")
+       response := c.registry.GetAllRestSubscriptions()
+       w.Write(response)
+ }
+ //-------------------------------------------------------------------
+ //
+ //-------------------------------------------------------------------
+ func (c *Control) ReadE2Subscriptions() error {
+       var err error
+       var subIds []uint32
+       var register map[uint32]*Subscription
+       for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
+               xapp.Logger.Info("Reading E2 subscriptions from db")
+               subIds, register, err = c.ReadAllSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+                       <-time.After(1 * time.Second)
+               } else {
+                       c.registry.subIds = subIds
+                       c.registry.register = register
+                       c.HandleUncompletedSubscriptions(register)
+                       return nil
+               }
+       }
+       xapp.Logger.Info("Continuing without retring")
+       return err
+ }
+ //-------------------------------------------------------------------
+ //
+ //-------------------------------------------------------------------
+ func (c *Control) ReadRESTSubscriptions() error {
+       var err error
+       var restSubscriptions map[string]*RESTSubscription
+       for i := 0; dbRetryForever == "true" || i < dbTryCount; i++ {
+               xapp.Logger.Info("Reading REST subscriptions from db")
+               restSubscriptions, err = c.ReadAllRESTSubscriptionsFromSdl()
+               if err != nil {
+                       xapp.Logger.Error("%v", err)
+                       <-time.After(1 * time.Second)
+               } else {
+                       c.registry.restSubscriptions = restSubscriptions
+                       return nil
+               }
+       }
+       xapp.Logger.Info("Continuing without retring")
+       return err
+ }
  //-------------------------------------------------------------------
  //
  //-------------------------------------------------------------------
@@@ -183,14 -245,6 +245,6 @@@ func (c *Control) ReadConfigParameters(
        }
        xapp.Logger.Info("e2tRecvMsgTimeout %v", e2tRecvMsgTimeout)
  
-       // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
-       // value 100ms used currently only in unittests.
-       waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
-       if waitRouteCleanup_ms == 0 {
-               waitRouteCleanup_ms = 5000 * 1000000
-       }
-       xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
        e2tMaxSubReqTryCount = viper.GetUint64("controls.e2tMaxSubReqTryCount")
        if e2tMaxSubReqTryCount == 0 {
                e2tMaxSubReqTryCount = 1
                readSubsFromDb = "true"
        }
        xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
+       dbTryCount = viper.GetInt("controls.dbTryCount")
+       if dbTryCount == 0 {
+               dbTryCount = 200
+       }
+       xapp.Logger.Info("dbTryCount %v", dbTryCount)
+       dbRetryForever = viper.GetString("controls.dbRetryForever")
+       if dbRetryForever == "" {
+               dbRetryForever = "true"
+       }
+       xapp.Logger.Info("dbRetryForever %v", dbRetryForever)
        c.LoggerLevel = viper.GetUint32("logger.level")
        if c.LoggerLevel == 0 {
                c.LoggerLevel = 3
        }
+       xapp.Logger.Info("LoggerLevel %v", c.LoggerLevel)
+       // Internal cfg parameter, used to define a wait time for RMR route clean-up. None default
+       // value 100ms used currently only in unittests.
+       waitRouteCleanup_ms = viper.GetDuration("controls.waitRouteCleanup_ms") * 1000000
+       if waitRouteCleanup_ms == 0 {
+               waitRouteCleanup_ms = 5000 * 1000000
+       }
+       xapp.Logger.Info("waitRouteCleanup %v", waitRouteCleanup_ms)
  }
  
  //-------------------------------------------------------------------
@@@ -544,7 -620,7 +620,7 @@@ func (c *Control) SubscriptionDeleteHan
  
        xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
        go func() {
 -              xapp.Logger.Info("Deleteting instances = %v", restSubscription.InstanceIds)
 +              xapp.Logger.Info("Deleteting handler: processing instances = %v", restSubscription.InstanceIds)
                for _, instanceId := range restSubscription.InstanceIds {
                        xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
  
@@@ -1276,10 -1352,6 +1352,10 @@@ func (c *Control) PrintRESTSubscription
  
        fmt.Println("CRESTSubscriptionRequest")
  
 +      if p == nil {
 +              return
 +      }
 +
        if p.SubscriptionID != "" {
                fmt.Println("  SubscriptionID = ", p.SubscriptionID)
        } else {
@@@ -2370,6 -2370,11 +2370,11 @@@ func TestPostEmptyDb(t *testing.T) 
        mainCtrl.sendPostRequest(t, "localhost:8080", "/ric/v1/test/emptydb")
  }
  
+ func TestGetRestSubscriptions(t *testing.T) {
+       mainCtrl.sendGetRequest(t, "localhost:8080", "/ric/v1/restsubscriptions")
+ }
  //-----------------------------------------------------------------------------
  // TestRESTSubReqAndRouteNok
  //
@@@ -2898,15 -2903,12 +2903,15 @@@ func TestRESTSubReqRetransmissionV4(t *
        params2.SetSubscriptionID(&restSubId)
  
        xapp.Subscription.SetResponseCB(xappConn1.SubscriptionRespHandler)
 -      xappConn1.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn1.ExpectAnyNotification(t)
        // Resend the original request with an additional e2 subscription (detail), this time with restsubsid
        restSubId_resend := xappConn1.SendRESTSubsReq(t, params2)
 +      e2SubsId1 := xappConn1.WaitAnyRESTNotification(t)
 +      assert.Equal(t, e2SubsId, e2SubsId1)
  
        crereq2, cremsg2 := e2termConn1.RecvSubsReq(t)
  
 +      xappConn1.DecrementRequestCount()
        xappConn1.ExpectRESTNotification(t, restSubId_resend)
        e2termConn1.SendSubsResp(t, crereq2, cremsg2)
        e2SubsId2 := xappConn1.WaitRESTNotification(t, restSubId_resend)
        xapp.Subscription.SetResponseCB(xappConn1.SubscriptionRespHandler)
        params = xappConn1.GetRESTSubsReqReportParams(subReqCount)
        params.SetSubscriptionID(&restSubId)
 -      xappConn1.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn1.ExpectAnyNotification(t)
        // Resend the original request again with only one e2 subscription (detail), this time with restsubsid
        restSubId_resend2 := xappConn1.SendRESTSubsReq(t, params)
        assert.Equal(t, restSubId_resend, restSubId_resend2)
  
 +      e2SubsId1 = xappConn1.WaitAnyRESTNotification(t)
 +      assert.Equal(t, e2SubsId, e2SubsId1)
 +
        // Delete both e2 subscriptions
        xappConn1.SendRESTSubsDelReq(t, &restSubId)
        e2SubsIds := []uint32{e2SubsId, e2SubsId2}
@@@ -3019,15 -3018,10 +3024,15 @@@ func TestRESTSubReqRetransmissionV5(t *
        params2.SetSubscriptionID(&restSubId)
  
        xapp.Subscription.SetResponseCB(xappConn1.SubscriptionRespHandler)
 -      xappConn1.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn1.ExpectAnyNotification(t)
        // Resend the original request with an additional e2 subscription (detail), this time with restsubsid
        restSubId_resend := xappConn1.SendRESTSubsReq(t, params2)
  
 +      e2SubsId1 := xappConn1.WaitAnyRESTNotification(t)
 +      assert.Equal(t, e2SubsId, e2SubsId1)
 +      // The first E2 subscription returns immediately, manually decrement expected request count for the remaining request handling
 +      xappConn1.DecrementRequestCount()
 +
        crereq2, cremsg2 := e2termConn1.RecvSubsReq(t)
  
        xappConn1.ExpectRESTNotification(t, restSubId_resend)
  
        xapp.Subscription.SetResponseCB(xappConn1.SubscriptionRespHandler)
        params = xappConn1.GetRESTSubsReqReportParams(subReqCount)
 -      xappConn1.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn1.ExpectAnyNotification(t)
        // Resend the original request again with only one e2 subscription (detail), WITHOUT restsubsid
        // md5sum shall find the original request
        restSubId_resend2 := xappConn1.SendRESTSubsReq(t, params)
        assert.Equal(t, restSubId_resend, restSubId_resend2)
  
 +      e2SubsId1 = xappConn1.WaitAnyRESTNotification(t)
 +      assert.Equal(t, e2SubsId, e2SubsId1)
 +
        // Delete both e2 subscriptions
        xappConn1.SendRESTSubsDelReq(t, &restSubId)
        e2SubsIds := []uint32{e2SubsId, e2SubsId2}
@@@ -3151,13 -3142,10 +3156,13 @@@ func TestRESTSubReqRetransmissionV6(t *
        params2.SetSubscriptionID(&restSubId)
  
        xapp.Subscription.SetResponseCB(xappConn1.SubscriptionRespHandler)
 -      xappConn1.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn1.ExpectAnyNotification(t)
        // Resend the original request with an additional e2 subscription (detail), this time with restsubsid
        restSubId_resend := xappConn1.SendRESTSubsReq(t, params2)
  
 +      e2SubsId1 := xappConn1.WaitAnyRESTNotification(t)
 +      assert.Equal(t, e2SubsId, e2SubsId1)
 +
        crereq2, cremsg2 := e2termConn1.RecvSubsReq(t)
  
        xappConn1.ExpectRESTNotification(t, restSubId_resend)
@@@ -3955,12 -3943,12 +3960,12 @@@ func TestRESTSubReqAndSubDelOkSameActio
        params.SetMeid("RAN_NAME_1")
  
        xapp.Subscription.SetResponseCB(xappConn2.SubscriptionRespHandler)
 -      xappConn2.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn2.ExpectAnyNotification(t)
        waiter := rtmgrHttp.AllocNextSleep(10, true)
        restSubId2 := xappConn2.SendRESTSubsReq(t, params)
        waiter.WaitResult(t)
        xapp.Logger.Info("Send REST subscriber request for subscriberId : %v", restSubId2)
 -      e2SubsId2 := <-xappConn2.RESTNotification
 +      e2SubsId2 := xappConn2.WaitAnyRESTNotification(t)
        xapp.Logger.Info("REST notification received e2SubsId=%v", e2SubsId2)
  
        queryXappSubscription(t, int64(e2SubsId1), "RAN_NAME_1", []string{"localhost:13560", "localhost:13660"})
@@@ -4935,10 -4923,10 +4940,10 @@@ func TestRESTSubReqAndSubDelOkSameActio
        params = xappConn2.GetRESTSubsReqReportParams(subReqCount)
        params.SetMeid("RAN_NAME_1")
        xapp.Subscription.SetResponseCB(xappConn2.SubscriptionRespHandler)
 -      xappConn2.WaitRESTNotificationForAnySubscriptionId(t)
 +      xappConn2.ExpectAnyNotification(t)
        restSubId2 := xappConn2.SendRESTSubsReq(t, params)
        xapp.Logger.Info("Send REST subscriber request for subscriberId : %v", restSubId2)
 -      e2SubsId2 := <-xappConn2.RESTNotification
 +      e2SubsId2 := xappConn2.WaitAnyRESTNotification(t)
        xapp.Logger.Info("REST notification received e2SubsId=%v", e2SubsId2)
  
        queryXappSubscription(t, int64(e2SubsId1), "RAN_NAME_1", []string{"localhost:13560", "localhost:13660"})