)
type ChannelNotificationCb func(channel string, payload ...string)
+type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
type intChannels struct {
addChannel chan string
cbMap map[string]ChannelNotificationCb
}
+type Config struct {
+ hostname string
+ port string
+ masterName string
+ sentinelPort string
+ clusterAddrList string
+}
+
type DB struct {
client RedisClient
subscribe SubscribeFn
return &db
}
-func Create() *DB {
- var client *redis.Client
- hostname := os.Getenv("DBAAS_SERVICE_HOST")
- if hostname == "" {
- hostname = "localhost"
+func Create() []*DB {
+ osimpl := osImpl{}
+ return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
+}
+
+func readConfig(osI OS) Config {
+ cfg := Config{
+ hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+ port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
+ masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
+ sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
+ clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
+ }
+ return cfg
+}
+
+type OS interface {
+ Getenv(key string, defValue string) string
+}
+
+type osImpl struct{}
+
+func (osImpl) Getenv(key string, defValue string) string {
+ val := os.Getenv(key)
+ if val == "" {
+ val = defValue
+ }
+ return val
+}
+
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
+ cfg := readConfig(osI)
+ return createDbClients(cfg, clientCreator)
+}
+
+func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
+ if cfg.clusterAddrList == "" {
+ return []*DB{createLegacyDbClient(cfg, clientCreator)}
}
- port := os.Getenv("DBAAS_SERVICE_PORT")
- if port == "" {
- port = "6379"
+
+ dbs := []*DB{}
+
+ addrList := strings.Split(cfg.clusterAddrList, ",")
+ for _, addr := range addrList {
+ db := createDbClient(cfg, addr, clientCreator)
+ dbs = append(dbs, db)
}
- sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
- masterName := os.Getenv("DBAAS_MASTER_NAME")
- if sentinelPort == "" {
- redisAddress := hostname + ":" + port
- client = redis.NewClient(&redis.Options{
- Addr: redisAddress,
- Password: "", // no password set
- DB: 0, // use default DB
- PoolSize: 20,
- MaxRetries: 2,
- })
+ return dbs
+}
+
+func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
+ return createDbClient(cfg, cfg.hostname, clientCreator)
+}
+
+func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
+ var client RedisClient
+ if cfg.sentinelPort == "" {
+ client = clientCreator(hostName, cfg.port, "", false)
} else {
- sentinelAddress := hostname + ":" + sentinelPort
- client = redis.NewFailoverClient(&redis.FailoverOptions{
- MasterName: masterName,
- SentinelAddrs: []string{sentinelAddress},
- PoolSize: 20,
- MaxRetries: 2,
- })
+ client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
}
db := CreateDB(client, subscribeNotifications)
db.CheckCommands()
return db
}
+func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
+ if isHa == true {
+ sentinelAddress := addr + ":" + port
+ return redis.NewFailoverClient(
+ &redis.FailoverOptions{
+ MasterName: clusterName,
+ SentinelAddrs: []string{sentinelAddress},
+ PoolSize: 20,
+ MaxRetries: 2,
+ },
+ )
+ }
+ redisAddress := addr + ":" + port
+ return redis.NewClient(&redis.Options{
+ Addr: redisAddress,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ PoolSize: 20,
+ MaxRetries: 2,
+ })
+}
+
func (db *DB) CheckCommands() {
commands, err := db.client.Command().Result()
if err == nil {
mock.Mock
}
+type MockOS struct {
+ mock.Mock
+}
+
func (m *pubSubMock) Channel() <-chan *redis.Message {
return m.Called().Get(0).(chan *redis.Message)
}
}
}
+func (m *MockOS) Getenv(key string, defValue string) string {
+ a := m.Called(key, defValue)
+ return a.String(0)
+}
+
func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
mock := new(clientMock)
pubSubMock, subscribeNotifications := setSubscribeNotifications()
return pubSubMock, mock, db
}
+func setupEnv(host, port, msname, sntport, clsaddrlist string) ([]*clientMock, []*sdlgoredis.DB) {
+ var clmocks []*clientMock
+
+ dummyCommandInfo := redis.CommandInfo{
+ Name: "dummy",
+ }
+ cmdResult := make(map[string]*redis.CommandInfo, 0)
+
+ cmdResult = map[string]*redis.CommandInfo{
+ "dummy": &dummyCommandInfo,
+ }
+
+ osmock := new(MockOS)
+ osmock.On("Getenv", "DBAAS_SERVICE_HOST", "localhost").Return(host)
+ osmock.On("Getenv", "DBAAS_SERVICE_PORT", "6379").Return(port)
+ osmock.On("Getenv", "DBAAS_MASTER_NAME", "").Return(msname)
+ osmock.On("Getenv", "DBAAS_SERVICE_SENTINEL_PORT", "").Return(sntport)
+ osmock.On("Getenv", "DBAAS_CLUSTER_ADDR_LIST", "").Return(clsaddrlist)
+
+ clients := sdlgoredis.ReadConfigAndCreateDbClients(
+ osmock,
+ func(addr, port, clusterName string, isHa bool) sdlgoredis.RedisClient {
+ clm := new(clientMock)
+ clm.On("Command").Return(redis.NewCommandsInfoCmdResult(cmdResult, nil))
+ clmocks = append(clmocks, clm)
+ return clm
+ },
+ )
+
+ return clmocks, clients
+}
+
+func TestCloseDbSuccessfully(t *testing.T) {
+ _, r, db := setup(true)
+ r.On("Close").Return(nil)
+ err := db.CloseDB()
+ assert.Nil(t, err)
+ r.AssertExpectations(t)
+}
+
+func TestCloseDbFailure(t *testing.T) {
+ _, r, db := setup(true)
+ r.On("Close").Return(errors.New("Some error"))
+ err := db.CloseDB()
+ assert.NotNil(t, err)
+ r.AssertExpectations(t)
+}
+
func TestMSetSuccessfully(t *testing.T) {
_, r, db := setup(true)
expectedKeysAndValues := []interface{}{"key1", "value1", "key2", 2}
assert.NotNil(t, err)
r.AssertExpectations(t)
}
+
+func TestClientStandaloneRedisLegacyEnv(t *testing.T) {
+ rcls, dbs := setupEnv(
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "", "",
+ )
+ assert.Equal(t, 1, len(rcls))
+ assert.Equal(t, 1, len(dbs))
+
+ expectedKeysAndValues := []interface{}{"key1", "value1"}
+ rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err := dbs[0].MSet("key1", "value1")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+}
+
+func TestClientSentinelRedisLegacyEnv(t *testing.T) {
+ rcls, dbs := setupEnv(
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376", "",
+ )
+ assert.Equal(t, 1, len(rcls))
+ assert.Equal(t, 1, len(dbs))
+
+ expectedKeysAndValues := []interface{}{"key1", "value1"}
+ rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err := dbs[0].MSet("key1", "value1")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+}
+
+func TestClientTwoStandaloneRedisEnvs(t *testing.T) {
+ rcls, dbs := setupEnv(
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "", "",
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+ )
+ assert.Equal(t, 2, len(rcls))
+ assert.Equal(t, 2, len(dbs))
+
+ expectedKeysAndValues := []interface{}{"key1", "value1"}
+ rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err := dbs[0].MSet("key1", "value1")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+
+ expectedKeysAndValues = []interface{}{"key2", "value2"}
+ rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err = dbs[1].MSet("key2", "value2")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+ rcls[1].AssertExpectations(t)
+}
+
+func TestClientTwoSentinelRedisEnvs(t *testing.T) {
+ rcls, dbs := setupEnv(
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt", "6376", "dbaasmaster", "26376",
+ "service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt",
+ )
+ assert.Equal(t, 2, len(rcls))
+ assert.Equal(t, 2, len(dbs))
+
+ expectedKeysAndValues := []interface{}{"key1", "value1"}
+ rcls[0].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err := dbs[0].MSet("key1", "value1")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+
+ expectedKeysAndValues = []interface{}{"key2", "value2"}
+ rcls[1].On("MSet", expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+ err = dbs[1].MSet("key2", "value2")
+ assert.Nil(t, err)
+ rcls[0].AssertExpectations(t)
+ rcls[1].AssertExpectations(t)
+}
"encoding/base64"
"errors"
"fmt"
+ "hash/crc32"
"io"
"reflect"
"strings"
eventSeparator string
mutex sync.Mutex
tmp []byte
- iDatabase
+ db *Database
}
//NewSyncStorage creates a new sdl instance.
func newSyncStorage(db *Database) *SyncStorage {
return &SyncStorage{
eventSeparator: "___",
- iDatabase: db.instance,
+ db: db,
}
}
+//selectDbInstance Selects DB instance what provides DB services for the namespace
+func (s *SyncStorage) getDbBackend(ns string) iDatabase {
+ instanceCount := uint32(len(s.db.instances))
+ instanceID := getHash(ns) % instanceCount
+ return s.db.instances[instanceID]
+}
+
+//getHash Returns hash value calculated from the string
+func getHash(s string) uint32 {
+ tbl := crc32.MakeTable(crc32.IEEE)
+ return crc32.Checksum([]byte(s), tbl)
+}
+
//SubscribeChannel lets you to subscribe for a events on a given channels.
//SDL notifications are events that are published on a specific channels.
//Both the channel and events are defined by the entity that is publishing
//events received from different channels, callbacks are called in series one by one.
func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
nsPrefix := getNsPrefix(ns)
- s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+ s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
return nil
}
//namespace.
func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
nsPrefix := getNsPrefix(ns)
- s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
+ s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
return nil
}
//Close connection to backend database.
func (s *SyncStorage) Close() error {
- return s.CloseDB()
+ var ret error
+ for _, db := range s.db.instances {
+ if err := db.CloseDB(); err != nil {
+ ret = err
+ }
+ }
+ return ret
}
func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
return err
}
if len(channelsAndEvents) == 0 {
- return s.MSet(keyAndData...)
+ return s.getDbBackend(ns).MSet(keyAndData...)
}
if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
return err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
+ return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
}
//Set function writes data to shared data layer storage. Writing is done
if err != nil {
return err
}
- return s.MSet(keyAndData...)
+ return s.getDbBackend(ns).MSet(keyAndData...)
}
//Get function atomically reads one or more keys from SDL. The returned map has the
for _, v := range keys {
keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
}
- val, err := s.MGet(keysWithNs)
+ val, err := s.getDbBackend(ns).MGet(keysWithNs)
if err != nil {
return m, err
}
func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
nsPrefix := getNsPrefix(ns)
if len(channelsAndEvents) == 0 {
- return s.SetIE(nsPrefix+key, oldData, newData)
+ return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
}
if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
return false, err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
+ return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
}
//SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
//If replace was done successfully, true will be returned.
//Data is written under the namespace what is given as a parameter for this function.
func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
- return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
+ return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
}
//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
nsPrefix := getNsPrefix(ns)
if len(channelsAndEvents) == 0 {
- return s.SetNX(nsPrefix+key, data, 0)
+ return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
}
if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
return false, err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
+ return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
}
//SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
//is done atomically.
//Data is written under the namespace what is given as a parameter for this function.
func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
- return s.SetNX(getNsPrefix(ns)+key, data, 0)
+ return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
}
//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
keysWithNs = append(keysWithNs, nsPrefix+v)
}
if len(channelsAndEvents) == 0 {
- return s.Del(keysWithNs)
+ return s.getDbBackend(ns).Del(keysWithNs)
}
if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
return err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
+ return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
}
//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
for _, v := range keys {
keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
}
- err := s.Del(keysWithNs)
+ err := s.getDbBackend(ns).Del(keysWithNs)
return err
}
func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
nsPrefix := getNsPrefix(ns)
if len(channelsAndEvents) == 0 {
- return s.DelIE(nsPrefix+key, data)
+ return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
}
if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
return false, err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
+ return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
}
//RemoveIf removes data from SDL conditionally. If existing data matches given data,
//key and data are removed from SDL. If remove was done successfully, true is returned.
//Data is removed under the namespace what is given as a parameter for this function.
func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
- status, err := s.DelIE(getNsPrefix(ns)+key, data)
+ status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
if err != nil {
return false, err
}
//given namespace exists, thus operation is not guaranteed to be atomic or isolated.
func (s *SyncStorage) GetAll(ns string) ([]string, error) {
nsPrefix := getNsPrefix(ns)
- keys, err := s.Keys(nsPrefix + "*")
+ keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
var retVal []string
if err != nil {
return retVal, err
//RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
//it is not guaranteed that all keys are removed.
func (s *SyncStorage) RemoveAll(ns string) error {
- keys, err := s.Keys(getNsPrefix(ns) + "*")
+ keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
if err != nil {
return err
}
if (keys != nil) && (len(keys) != 0) {
- err = s.Del(keys)
+ err = s.getDbBackend(ns).Del(keys)
}
return err
}
//not guaranteed that all keys are removed.
func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
nsPrefix := getNsPrefix(ns)
- keys, err := s.Keys(nsPrefix + "*")
+ keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
if err != nil {
return err
}
if (keys != nil) && (len(keys) != 0) {
if len(channelsAndEvents) == 0 {
- return s.Del(keys)
+ return s.getDbBackend(ns).Del(keys)
}
if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
return err
}
channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
- err = s.DelMPub(channelsAndEventsPrepared, keys)
+ err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
}
return err
}
//unique. It is possible to add the same member several times without the
//need to check if it already exists.
func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
- return s.SAdd(getNsPrefix(ns)+group, member...)
+ return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
}
//RemoveMember removes members from a group under given namespace.
func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
- return s.SRem(getNsPrefix(ns)+group, member...)
+ return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
}
//RemoveGroup removes the whole group along with it's members under given namespace.
func (s *SyncStorage) RemoveGroup(ns string, group string) error {
- return s.Del([]string{getNsPrefix(ns) + group})
+ return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
}
//GetMembers returns all the members from a group under given namespace.
func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
- retVal, err := s.SMembers(getNsPrefix(ns) + group)
+ retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
if err != nil {
return []string{}, err
}
//IsMember returns true if given member is found from a group under given namespace.
func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
- retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
+ retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
if err != nil {
return false, err
}
//GroupSize returns the number of members in a group under given namespace.
func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
- retVal, err := s.SCard(getNsPrefix(ns) + group)
+ retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
if err != nil {
return 0, err
}
var retryTimer *time.Timer
for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
- ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
+ ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
if err != nil {
return nil, err
} else if ok {
//is already expired or some other instance is keeping the lock (lock taken after
//expiration), an error is returned.
func (l *SyncStorageLock) ReleaseResource(ns string) error {
- ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
+ ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
if err != nil {
return err
//resource lock (if the lock still exists) under given namespace. The old
//remaining expiration time is overwritten with the given new expiration time.
func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
- err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
+ err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
return err
}
//CheckResource returns the expiration time left for a resource under given
//namespace. If the resource doesn't exist, -2 is returned.
func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
- result, err := s.PTTL(getNsPrefix(ns) + resource)
+ result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
if err != nil {
return 0, err
}