Implement sentinel based DB capacity scaling 62/6062/1 dawn v0.7.0
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 13:38:33 +0000 (16:38 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 13:42:01 +0000 (16:42 +0300)
For time being SDL has supported standalone DBAAS DB and DBAAS HA DB deployment
with Redis sentinel. With this commit extent SDL functionality to support Redis
sentinel based DB cluster where we have multiple DBAAS Redis sentinel groups
and these groups can be used to spread out SDL DB write and read operations to
different DB instances.
Implement cluster DBAAS DB service addresses reading from environment variable
'DBAAS_CLUSTER_ADDR_LIST'.
Implement crc32 hash value calculation from namespace string and selection of
a DB instance from DB cluster based on calculated hash.

Issue-ID: RIC-699

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I8999c3c03d2d80d32a57d8ec051632b975bf9dc2

docs/release-notes.rst
internal/sdlgoredis/sdlgoredis.go
internal/sdlgoredis/sdlgoredis_test.go
sdl.go
sdl_private_fn_test.go
syncstorage.go

index d0e8f02..b0a7dd6 100644 (file)
@@ -30,6 +30,10 @@ This document provides the release notes of the sdlgo.
 Version history
 ---------------
 
+[0.7.0] - 2021-05-11
+
+* Add DB backend instance selection based on namespace value to balance DB load.
+
 [0.6.0] - 2021-05-11
 
 * Add SDL multi-namespace API 'SyncStorage'.
index 327946e..570dfa1 100644 (file)
@@ -34,6 +34,7 @@ import (
 )
 
 type ChannelNotificationCb func(channel string, payload ...string)
+type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
 
 type intChannels struct {
        addChannel    chan string
@@ -46,6 +47,14 @@ type sharedCbMap struct {
        cbMap map[string]ChannelNotificationCb
 }
 
+type Config struct {
+       hostname        string
+       port            string
+       masterName      string
+       sentinelPort    string
+       clusterAddrList string
+}
+
 type DB struct {
        client       RedisClient
        subscribe    SubscribeFn
@@ -134,41 +143,94 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
        return &db
 }
 
-func Create() *DB {
-       var client *redis.Client
-       hostname := os.Getenv("DBAAS_SERVICE_HOST")
-       if hostname == "" {
-               hostname = "localhost"
+func Create() []*DB {
+       osimpl := osImpl{}
+       return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
+}
+
+func readConfig(osI OS) Config {
+       cfg := Config{
+               hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+               port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
+               masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
+               sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
+               clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
+       }
+       return cfg
+}
+
+type OS interface {
+       Getenv(key string, defValue string) string
+}
+
+type osImpl struct{}
+
+func (osImpl) Getenv(key string, defValue string) string {
+       val := os.Getenv(key)
+       if val == "" {
+               val = defValue
+       }
+       return val
+}
+
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
+       cfg := readConfig(osI)
+       return createDbClients(cfg, clientCreator)
+}
+
+func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
+       if cfg.clusterAddrList == "" {
+               return []*DB{createLegacyDbClient(cfg, clientCreator)}
        }
-       port := os.Getenv("DBAAS_SERVICE_PORT")
-       if port == "" {
-               port = "6379"
+
+       dbs := []*DB{}
+
+       addrList := strings.Split(cfg.clusterAddrList, ",")
+       for _, addr := range addrList {
+               db := createDbClient(cfg, addr, clientCreator)
+               dbs = append(dbs, db)
        }
-       sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
-       masterName := os.Getenv("DBAAS_MASTER_NAME")
-       if sentinelPort == "" {
-               redisAddress := hostname + ":" + port
-               client = redis.NewClient(&redis.Options{
-                       Addr:       redisAddress,
-                       Password:   "", // no password set
-                       DB:         0,  // use default DB
-                       PoolSize:   20,
-                       MaxRetries: 2,
-               })
+       return dbs
+}
+
+func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
+       return createDbClient(cfg, cfg.hostname, clientCreator)
+}
+
+func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
+       var client RedisClient
+       if cfg.sentinelPort == "" {
+               client = clientCreator(hostName, cfg.port, "", false)
        } else {
-               sentinelAddress := hostname + ":" + sentinelPort
-               client = redis.NewFailoverClient(&redis.FailoverOptions{
-                       MasterName:    masterName,
-                       SentinelAddrs: []string{sentinelAddress},
-                       PoolSize:      20,
-                       MaxRetries:    2,
-               })
+               client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
        }
        db := CreateDB(client, subscribeNotifications)
        db.CheckCommands()
        return db
 }
 
+func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
+       if isHa == true {
+               sentinelAddress := addr + ":" + port
+               return redis.NewFailoverClient(
+                       &redis.FailoverOptions{
+                               MasterName:    clusterName,
+                               SentinelAddrs: []string{sentinelAddress},
+                               PoolSize:      20,
+                               MaxRetries:    2,
+                       },
+               )
+       }
+       redisAddress := addr + ":" + port
+       return redis.NewClient(&redis.Options{
+               Addr:       redisAddress,
+               Password:   "", // no password set
+               DB:         0,  // use default DB
+               PoolSize:   20,
+               MaxRetries: 2,
+       })
+}
+
 func (db *DB) CheckCommands() {
        commands, err := db.client.Command().Result()
        if err == nil {
index 039a208..469e8c4 100644 (file)
@@ -42,6 +42,10 @@ type pubSubMock struct {
        mock.Mock
 }
 
+type MockOS struct {
+       mock.Mock
+}
+
 func (m *pubSubMock) Channel() <-chan *redis.Message {
        return m.Called().Get(0).(chan *redis.Message)
 }
@@ -141,6 +145,11 @@ func setSubscribeNotifications() (*pubSubMock, sdlgoredis.SubscribeFn) {
        }
 }
 
+func (m *MockOS) Getenv(key string, defValue string) string {
+       a := m.Called(key, defValue)
+       return a.String(0)
+}
+
 func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
        mock := new(clientMock)
        pubSubMock, subscribeNotifications := setSubscribeNotifications()
@@ -171,6 +180,54 @@ func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
        return pubSubMock, mock, db
 }
 
+func setupEnv(host, port, msname, sntport, clsaddrlist string) ([]*clientMock, []*sdlgoredis.DB) {
+       var clmocks []*clientMock
+
+       dummyCommandInfo := redis.CommandInfo{
+               Name: "dummy",
+       }
+       cmdResult := make(map[string]*redis.CommandInfo, 0)
+
+       cmdResult = map[string]*redis.CommandInfo{
+               "dummy": &dummyCommandInfo,
+       }
+
+       osmock := new(MockOS)
+       osmock.On("Getenv", "DBAAS_SERVICE_HOST", "localhost").Return(host)
+       osmock.On("Getenv", "DBAAS_SERVICE_PORT", "6379").Return(port)
+       osmock.On("Getenv", "DBAAS_MASTER_NAME", "").Return(msname)
+       osmock.On("Getenv", "DBAAS_SERVICE_SENTINEL_PORT", "").Return(sntport)
+       osmock.On("Getenv", "DBAAS_CLUSTER_ADDR_LIST", "").Return(clsaddrlist)
+
+       clients := sdlgoredis.ReadConfigAndCreateDbClients(
+               osmock,
+               func(addr, port, clusterName string, isHa bool) sdlgoredis.RedisClient {
+                       clm := new(clientMock)
+                       clm.On("Command").Return(redis.NewCommandsInfoCmdResult(cmdResult, nil))
+                       clmocks = append(clmocks, clm)
+                       return clm
+               },
+       )
+
+       return clmocks, clients
+}
+
+func TestCloseDbSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       r.On("Close").Return(nil)
+       err := db.CloseDB()
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestCloseDbFailure(t *testing.T) {
+       _, r, db := setup(true)
+       r.On("Close").Return(errors.New("Some error"))
+       err := db.CloseDB()
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
 func TestMSetSuccessfully(t *testing.T) {
        _, r, db := setup(true)
        expectedKeysAndValues := []interface{}{"key1", "value1", "key2", 2}
@@ -848,3 +905,75 @@ func TestPExpireIELockNotHeld(t *testing.T) {
        assert.NotNil(t, err)
        r.AssertExpectations(t)
 }
+
+func TestClientStandaloneRedisLegacyEnv(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "", "",
+       )
+       assert.Equal(t, 1, len(rcls))
+       assert.Equal(t, 1, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+}
+
+func TestClientSentinelRedisLegacyEnv(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376", "",
+       )
+       assert.Equal(t, 1, len(rcls))
+       assert.Equal(t, 1, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+}
+
+func TestClientTwoStandaloneRedisEnvs(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "",
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+       )
+       assert.Equal(t, 2, len(rcls))
+       assert.Equal(t, 2, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+
+       expectedKeysAndValues = []interface{}{"key2", "value2"}
+       rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err = dbs[1].MSet("key2", "value2")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+       rcls[1].AssertExpectations(t)
+}
+
+func TestClientTwoSentinelRedisEnvs(t *testing.T) {
+       rcls, dbs := setupEnv(
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376",
+               "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+       )
+       assert.Equal(t, 2, len(rcls))
+       assert.Equal(t, 2, len(dbs))
+
+       expectedKeysAndValues := []interface{}{"key1", "value1"}
+       rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := dbs[0].MSet("key1", "value1")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+
+       expectedKeysAndValues = []interface{}{"key2", "value2"}
+       rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err = dbs[1].MSet("key2", "value2")
+       assert.Nil(t, err)
+       rcls[0].AssertExpectations(t)
+       rcls[1].AssertExpectations(t)
+}
diff --git a/sdl.go b/sdl.go
index afc4781..51cfb93 100644 (file)
--- a/sdl.go
+++ b/sdl.go
@@ -23,9 +23,8 @@
 package sdlgo
 
 import (
-       "time"
-
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
+       "time"
 )
 
 //SdlInstance provides an API to read, write and modify
@@ -40,7 +39,7 @@ type SdlInstance struct {
 //can use this exported data type to locally store a reference to database
 //instance returned from NewDabase() function.
 type Database struct {
-       instance iDatabase
+       instances []iDatabase
 }
 
 //NewDatabase creates a connection to database that will be used
@@ -48,9 +47,11 @@ type Database struct {
 //can be reused between multiple SDL instances in which case each instance
 //is using the same connection.
 func NewDatabase() *Database {
-       return &Database{
-               instance: sdlgoredis.Create(),
+       db := &Database{}
+       for _, v := range sdlgoredis.Create() {
+               db.instances = append(db.instances, v)
        }
+       return db
 }
 
 //NewSdlInstance creates a new sdl instance using the given namespace.
index 318d84f..c60a16f 100644 (file)
@@ -25,9 +25,12 @@ package sdlgo
 //NewSdlInstanceForTest is used in unit tests only in order to replace the
 //underlying redis implementation with mock
 func NewSdlInstanceForTest(NameSpace string, instance iDatabase) *SdlInstance {
+       db := &Database{}
+       db.instances = append(db.instances, instance)
+
        return &SdlInstance{
                nameSpace: NameSpace,
                nsPrefix:  "{" + NameSpace + "},",
-               storage:   newSyncStorage(&Database{instance: instance}),
+               storage:   newSyncStorage(db),
        }
 }
index 874a41e..f2f708e 100644 (file)
@@ -27,6 +27,7 @@ import (
        "encoding/base64"
        "errors"
        "fmt"
+       "hash/crc32"
        "io"
        "reflect"
        "strings"
@@ -44,7 +45,7 @@ type SyncStorage struct {
        eventSeparator string
        mutex          sync.Mutex
        tmp            []byte
-       iDatabase
+       db             *Database
 }
 
 //NewSyncStorage creates a new sdl instance.
@@ -56,10 +57,23 @@ func NewSyncStorage() *SyncStorage {
 func newSyncStorage(db *Database) *SyncStorage {
        return &SyncStorage{
                eventSeparator: "___",
-               iDatabase:      db.instance,
+               db:             db,
        }
 }
 
+//selectDbInstance Selects DB instance what provides DB services for the namespace
+func (s *SyncStorage) getDbBackend(ns string) iDatabase {
+       instanceCount := uint32(len(s.db.instances))
+       instanceID := getHash(ns) % instanceCount
+       return s.db.instances[instanceID]
+}
+
+//getHash Returns hash value calculated from the string
+func getHash(s string) uint32 {
+       tbl := crc32.MakeTable(crc32.IEEE)
+       return crc32.Checksum([]byte(s), tbl)
+}
+
 //SubscribeChannel lets you to subscribe for a events on a given channels.
 //SDL notifications are events that are published on a specific channels.
 //Both the channel and events are defined by the entity that is publishing
@@ -80,7 +94,7 @@ func newSyncStorage(db *Database) *SyncStorage {
 //events received from different channels, callbacks are called in series one by one.
 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+       s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
        return nil
 }
 
@@ -88,13 +102,19 @@ func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), ch
 //namespace.
 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
+       s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
        return nil
 }
 
 //Close connection to backend database.
 func (s *SyncStorage) Close() error {
-       return s.CloseDB()
+       var ret error
+       for _, db := range s.db.instances {
+               if err := db.CloseDB(); err != nil {
+                       ret = err
+               }
+       }
+       return ret
 }
 
 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
@@ -231,13 +251,13 @@ func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs
                return err
        }
        if len(channelsAndEvents) == 0 {
-               return s.MSet(keyAndData...)
+               return s.getDbBackend(ns).MSet(keyAndData...)
        }
        if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
+       return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
 }
 
 //Set function writes data to shared data layer storage. Writing is done
@@ -256,7 +276,7 @@ func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
        if err != nil {
                return err
        }
-       return s.MSet(keyAndData...)
+       return s.getDbBackend(ns).MSet(keyAndData...)
 }
 
 //Get function atomically reads one or more keys from SDL. The returned map has the
@@ -274,7 +294,7 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err
        for _, v := range keys {
                keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
        }
-       val, err := s.MGet(keysWithNs)
+       val, err := s.getDbBackend(ns).MGet(keysWithNs)
        if err != nil {
                return m, err
        }
@@ -291,20 +311,20 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err
 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.SetIE(nsPrefix+key, oldData, newData)
+               return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
        }
        if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
+       return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
 }
 
 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned.
 //Data is written under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
-       return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
+       return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
 }
 
 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
@@ -315,13 +335,13 @@ func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{})
 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.SetNX(nsPrefix+key, data, 0)
+               return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
        }
        if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
+       return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
 }
 
 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
@@ -329,7 +349,7 @@ func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []st
 //is done atomically.
 //Data is written under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
-       return s.SetNX(getNsPrefix(ns)+key, data, 0)
+       return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
 }
 
 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -349,13 +369,13 @@ func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, ke
                keysWithNs = append(keysWithNs, nsPrefix+v)
        }
        if len(channelsAndEvents) == 0 {
-               return s.Del(keysWithNs)
+               return s.getDbBackend(ns).Del(keysWithNs)
        }
        if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
+       return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
 }
 
 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -369,7 +389,7 @@ func (s *SyncStorage) Remove(ns string, keys []string) error {
        for _, v := range keys {
                keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
        }
-       err := s.Del(keysWithNs)
+       err := s.getDbBackend(ns).Del(keysWithNs)
        return err
 }
 
@@ -380,20 +400,20 @@ func (s *SyncStorage) Remove(ns string, keys []string) error {
 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.DelIE(nsPrefix+key, data)
+               return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
        }
        if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
+       return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
 }
 
 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 //Data is removed under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
-       status, err := s.DelIE(getNsPrefix(ns)+key, data)
+       status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
        if err != nil {
                return false, err
        }
@@ -404,7 +424,7 @@ func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, e
 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
        nsPrefix := getNsPrefix(ns)
-       keys, err := s.Keys(nsPrefix + "*")
+       keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
        var retVal []string
        if err != nil {
                return retVal, err
@@ -418,12 +438,12 @@ func (s *SyncStorage) GetAll(ns string) ([]string, error) {
 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
 //it is not guaranteed that all keys are removed.
 func (s *SyncStorage) RemoveAll(ns string) error {
-       keys, err := s.Keys(getNsPrefix(ns) + "*")
+       keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
        if err != nil {
                return err
        }
        if (keys != nil) && (len(keys) != 0) {
-               err = s.Del(keys)
+               err = s.getDbBackend(ns).Del(keys)
        }
        return err
 }
@@ -433,19 +453,19 @@ func (s *SyncStorage) RemoveAll(ns string) error {
 //not guaranteed that all keys are removed.
 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
        nsPrefix := getNsPrefix(ns)
-       keys, err := s.Keys(nsPrefix + "*")
+       keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
        if err != nil {
                return err
        }
        if (keys != nil) && (len(keys) != 0) {
                if len(channelsAndEvents) == 0 {
-                       return s.Del(keys)
+                       return s.getDbBackend(ns).Del(keys)
                }
                if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
                        return err
                }
                channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-               err = s.DelMPub(channelsAndEventsPrepared, keys)
+               err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
        }
        return err
 }
@@ -456,22 +476,22 @@ func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string)
 //unique. It is possible to add the same member several times without the
 //need to check if it already exists.
 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
-       return s.SAdd(getNsPrefix(ns)+group, member...)
+       return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
 }
 
 //RemoveMember removes members from a group under given namespace.
 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
-       return s.SRem(getNsPrefix(ns)+group, member...)
+       return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
 }
 
 //RemoveGroup removes the whole group along with it's members under given namespace.
 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
-       return s.Del([]string{getNsPrefix(ns) + group})
+       return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
 }
 
 //GetMembers returns all the members from a group under given namespace.
 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
-       retVal, err := s.SMembers(getNsPrefix(ns) + group)
+       retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
        if err != nil {
                return []string{}, err
        }
@@ -480,7 +500,7 @@ func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
 
 //IsMember returns true if given member is found from a group under given namespace.
 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
-       retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
+       retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
        if err != nil {
                return false, err
        }
@@ -489,7 +509,7 @@ func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (boo
 
 //GroupSize returns the number of members in a group under given namespace.
 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
-       retVal, err := s.SCard(getNsPrefix(ns) + group)
+       retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
        if err != nil {
                return 0, err
        }
@@ -523,7 +543,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D
 
        var retryTimer *time.Timer
        for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
-               ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
+               ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
                if err != nil {
                        return nil, err
                } else if ok {
@@ -547,7 +567,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D
 //is already expired or some other instance is keeping the lock (lock taken after
 //expiration), an error is returned.
 func (l *SyncStorageLock) ReleaseResource(ns string) error {
-       ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
+       ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
 
        if err != nil {
                return err
@@ -562,14 +582,14 @@ func (l *SyncStorageLock) ReleaseResource(ns string) error {
 //resource lock (if the lock still exists) under given namespace. The old
 //remaining expiration time is overwritten with the given new expiration time.
 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
-       err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
+       err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
        return err
 }
 
 //CheckResource returns the expiration time left for a resource under given
 //namespace. If the resource doesn't exist, -2 is returned.
 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
-       result, err := s.PTTL(getNsPrefix(ns) + resource)
+       result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
        if err != nil {
                return 0, err
        }