Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis_test.go
index 962704c..469e8c4 100644 (file)
    limitations under the License.
 */
 
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ */
+
 package sdlgoredis_test
 
 import (
@@ -37,6 +42,10 @@ type pubSubMock struct {
        mock.Mock
 }
 
+type MockOS struct {
+       mock.Mock
+}
+
 func (m *pubSubMock) Channel() <-chan *redis.Message {
        return m.Called().Get(0).(chan *redis.Message)
 }
@@ -136,6 +145,11 @@ func setSubscribeNotifications() (*pubSubMock, sdlgoredis.SubscribeFn) {
        }
 }
 
+func (m *MockOS) Getenv(key string, defValue string) string {
+       a := m.Called(key, defValue)
+       return a.String(0)
+}
+
 func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
        mock := new(clientMock)
        pubSubMock, subscribeNotifications := setSubscribeNotifications()
@@ -166,6 +180,54 @@ func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
        return pubSubMock, mock, db
 }
 
+func setupEnv(host, port, msname, sntport, clsaddrlist string) ([]*clientMock, []*sdlgoredis.DB) {
+       var clmocks []*clientMock
+
+       dummyCommandInfo := redis.CommandInfo{
+               Name: "dummy",
+       }
+       cmdResult := make(map[string]*redis.CommandInfo, 0)
+
+       cmdResult = map[string]*redis.CommandInfo{
+               "dummy": &dummyCommandInfo,
+       }
+
+       osmock := new(MockOS)
+       osmock.On("Getenv", "DBAAS_SERVICE_HOST", "localhost").Return(host)
+       osmock.On("Getenv", "DBAAS_SERVICE_PORT", "6379").Return(port)
+       osmock.On("Getenv", "DBAAS_MASTER_NAME", "").Return(msname)
+       osmock.On("Getenv", "DBAAS_SERVICE_SENTINEL_PORT", "").Return(sntport)
+       osmock.On("Getenv", "DBAAS_CLUSTER_ADDR_LIST", "").Return(clsaddrlist)
+
+       clients := sdlgoredis.ReadConfigAndCreateDbClients(
+               osmock,
+               func(addr, port, clusterName string, isHa bool) sdlgoredis.RedisClient {
+                       clm := new(clientMock)
+                       clm.On("Command").Return(redis.NewCommandsInfoCmdResult(cmdResult, nil))
+                       clmocks = append(clmocks, clm)
+                       return clm
+               },
+       )
+
+       return clmocks, clients
+}
+
+func TestCloseDbSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       r.On("Close").Return(nil)
+       err := db.CloseDB()
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestCloseDbFailure(t *testing.T) {
+       _, r, db := setup(true)
+       r.On("Close").Return(errors.New("Some error"))
+       err := db.CloseDB()
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
 func TestMSetSuccessfully(t *testing.T) {
        _, r, db := setup(true)
        expectedKeysAndValues := []interface{}{"key1", "value1", "key2", 2}
@@ -348,9 +410,9 @@ func TestSetIECommandMissing(t *testing.T) {
 
 func TestSetIEPubKeyExists(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       expectedMessage := []interface{}{"SETIEMPUB", "key", "newdata", "olddata", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult("OK", nil))
-       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       result, err := db.SetIEPub([]string{"channel", "message"}, "key", "olddata", "newdata")
        assert.True(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -358,9 +420,9 @@ func TestSetIEPubKeyExists(t *testing.T) {
 
 func TestSetIEPubKeyDoesntExists(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       expectedMessage := []interface{}{"SETIEMPUB", "key", "newdata", "olddata", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, nil))
-       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       result, err := db.SetIEPub([]string{"channel", "message"}, "key", "olddata", "newdata")
        assert.False(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -368,9 +430,9 @@ func TestSetIEPubKeyDoesntExists(t *testing.T) {
 
 func TestSetIEPubFailure(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       expectedMessage := []interface{}{"SETIEMPUB", "key", "newdata", "olddata", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, errors.New("Some error")))
-       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       result, err := db.SetIEPub([]string{"channel", "message"}, "key", "olddata", "newdata")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -378,9 +440,9 @@ func TestSetIEPubFailure(t *testing.T) {
 
 func TestSetIEPubCommandMissing(t *testing.T) {
        _, r, db := setup(false)
-       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       expectedMessage := []interface{}{"SETIEMPUB", "key", "newdata", "olddata", "channel", "message"}
        r.AssertNotCalled(t, "Do", expectedMessage)
-       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       result, err := db.SetIEPub([]string{"channel", "message"}, "key", "olddata", "newdata")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -388,9 +450,9 @@ func TestSetIEPubCommandMissing(t *testing.T) {
 
 func TestSetNXPubKeyDoesntExist(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"SETNXMPUB", "key", "data", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult("OK", nil))
-       result, err := db.SetNXPub("channel", "message", "key", "data")
+       result, err := db.SetNXPub([]string{"channel", "message"}, "key", "data")
        assert.True(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -398,9 +460,9 @@ func TestSetNXPubKeyDoesntExist(t *testing.T) {
 
 func TestSetNXPubKeyExists(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"SETNXMPUB", "key", "data", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, nil))
-       result, err := db.SetNXPub("channel", "message", "key", "data")
+       result, err := db.SetNXPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -408,9 +470,9 @@ func TestSetNXPubKeyExists(t *testing.T) {
 
 func TestSetNXPubFailure(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"SETNXMPUB", "key", "data", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, errors.New("Some error")))
-       result, err := db.SetNXPub("channel", "message", "key", "data")
+       result, err := db.SetNXPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -418,9 +480,9 @@ func TestSetNXPubFailure(t *testing.T) {
 
 func TestSetNXPubCommandMissing(t *testing.T) {
        _, r, db := setup(false)
-       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"SETNXMPUB", "key", "data", "channel", "message"}
        r.AssertNotCalled(t, "Do", expectedMessage)
-       result, err := db.SetNXPub("channel", "message", "key", "data")
+       result, err := db.SetNXPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -462,9 +524,9 @@ func TestSetNXFailure(t *testing.T) {
 
 func TestDelIEPubKeyDoesntExist(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
-       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, nil))
-       result, err := db.DelIEPub("channel", "message", "key", "data")
+       expectedMessage := []interface{}{"DELIEMPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(0), nil))
+       result, err := db.DelIEPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -472,9 +534,19 @@ func TestDelIEPubKeyDoesntExist(t *testing.T) {
 
 func TestDelIEPubKeyExists(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"DELIEMPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(1), nil))
+       result, err := db.DelIEPub([]string{"channel", "message"}, "key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEPubKeyExistsIntTypeRedisValue(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIEMPUB", "key", "data", "channel", "message"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(1, nil))
-       result, err := db.DelIEPub("channel", "message", "key", "data")
+       result, err := db.DelIEPub([]string{"channel", "message"}, "key", "data")
        assert.True(t, result)
        assert.Nil(t, err)
        r.AssertExpectations(t)
@@ -482,9 +554,9 @@ func TestDelIEPubKeyExists(t *testing.T) {
 
 func TestDelIEPubFailure(t *testing.T) {
        _, r, db := setup(true)
-       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
-       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, errors.New("Some error")))
-       result, err := db.DelIEPub("channel", "message", "key", "data")
+       expectedMessage := []interface{}{"DELIEMPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(0), errors.New("Some error")))
+       result, err := db.DelIEPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -492,9 +564,9 @@ func TestDelIEPubFailure(t *testing.T) {
 
 func TestDelIEPubCommandMissing(t *testing.T) {
        _, r, db := setup(false)
-       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       expectedMessage := []interface{}{"DELIEMPUB", "key", "data", "channel", "message"}
        r.AssertNotCalled(t, "Do", expectedMessage)
-       result, err := db.DelIEPub("channel", "message", "key", "data")
+       result, err := db.DelIEPub([]string{"channel", "message"}, "key", "data")
        assert.False(t, result)
        assert.NotNil(t, err)
        r.AssertExpectations(t)
@@ -503,7 +575,7 @@ func TestDelIEPubCommandMissing(t *testing.T) {
 func TestDelIEKeyDoesntExist(t *testing.T) {
        _, r, db := setup(true)
        expectedMessage := []interface{}{"DELIE", "key", "data"}
-       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, nil))
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(0), nil))
        result, err := db.DelIE("key", "data")
        assert.False(t, result)
        assert.Nil(t, err)
@@ -511,6 +583,16 @@ func TestDelIEKeyDoesntExist(t *testing.T) {
 }
 
 func TestDelIEKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIE", "key", "data"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(1), nil))
+       result, err := db.DelIE("key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEKeyExistsIntTypeRedisValue(t *testing.T) {
        _, r, db := setup(true)
        expectedMessage := []interface{}{"DELIE", "key", "data"}
        r.On("Do", expectedMessage).Return(redis.NewCmdResult(1, nil))
@@ -523,7 +605,7 @@ func TestDelIEKeyExists(t *testing.T) {
 func TestDelIEFailure(t *testing.T) {
        _, r, db := setup(true)
        expectedMessage := []interface{}{"DELIE", "key", "data"}
-       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, errors.New("Some error")))
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(int64(0), errors.New("Some error")))
        result, err := db.DelIE("key", "data")
        assert.False(t, result)
        assert.NotNil(t, err)
@@ -710,6 +792,7 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
                receivedChannel2 = channel
        }, "{prefix}", "---", "{prefix}channel2")
 
+       time.Sleep(1 * time.Second)
        db.UnsubscribeChannelDB("{prefix}channel1")
        ch <- &msg2
        db.UnsubscribeChannelDB("{prefix}channel2")
@@ -721,6 +804,42 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        ps.AssertExpectations(t)
 }
 
+func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
+       ps, r, db := setup(true)
+       ch := make(chan *redis.Message)
+       msg := redis.Message{
+               Channel: "{prefix}channel",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       ps.On("Channel").Return(ch)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Close").Return(nil)
+       count := 0
+       receivedChannel := ""
+
+       db.SubscribeChannelDB(func(channel string, payload ...string) {
+               count++
+               receivedChannel = channel
+       }, "{prefix}", "---", "{prefix}channel")
+       ch <- &msg
+       db.UnsubscribeChannelDB("{prefix}channel")
+       time.Sleep(1 * time.Second)
+
+       db.SubscribeChannelDB(func(channel string, payload ...string) {
+               count++
+               receivedChannel = channel
+       }, "{prefix}", "---", "{prefix}channel")
+       ch <- &msg
+       db.UnsubscribeChannelDB("{prefix}channel")
+
+       time.Sleep(1 * time.Second)
+       assert.Equal(t, 2, count)
+       assert.Equal(t, "channel", receivedChannel)
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
 func TestPTTLSuccessfully(t *testing.T) {
        _, r, db := setup(true)
        expectedKey := "key"
@@ -786,3 +905,75 @@ func TestPExpireIELockNotHeld(t *testing.T) {
        assert.NotNil(t, err)
        r.AssertExpectations(t)
 }
+
+func TestClientStandaloneRedisLegacyEnv(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "", "",
+       )
+       assert.Equal(t, 1, len(rcls))
+       assert.Equal(t, 1, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+}
+
+func TestClientSentinelRedisLegacyEnv(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376", "",
+       )
+       assert.Equal(t, 1, len(rcls))
+       assert.Equal(t, 1, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+}
+
+func TestClientTwoStandaloneRedisEnvs(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "",
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+       )
+       assert.Equal(t, 2, len(rcls))
+       assert.Equal(t, 2, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+
+       expectedKeysAndValues = []interface{}{"key2", "value2"}
+       rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err = dbs[1].MSet("key2", "value2")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+       rcls[1].AssertExpectations(t)
+}
+
+func TestClientTwoSentinelRedisEnvs(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376",
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+       )
+       assert.Equal(t, 2, len(rcls))
+       assert.Equal(t, 2, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+
+       expectedKeysAndValues = []interface{}{"key2", "value2"}
+       rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err = dbs[1].MSet("key2", "value2")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+       rcls[1].AssertExpectations(t)
+}