)
type ChannelNotificationCb func(channel string, payload ...string)
+type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
type intChannels struct {
addChannel chan string
cbMap map[string]ChannelNotificationCb
}
+type Config struct {
+ hostname string
+ port string
+ masterName string
+ sentinelPort string
+ clusterAddrList string
+}
+
type DB struct {
client RedisClient
subscribe SubscribeFn
if err != nil {
return false, err
}
- if result.(int) == int(1) {
- return true, nil
+ if n, ok := result.(int64); ok {
+ if n == 1 {
+ return true, nil
+ }
+ } else if n, ok := result.(int); ok {
+ if n == 1 {
+ return true, nil
+ }
}
return false, nil
}
return &db
}
-func Create() *DB {
- var client *redis.Client
- hostname := os.Getenv("DBAAS_SERVICE_HOST")
- if hostname == "" {
- hostname = "localhost"
+func Create() []*DB {
+ osimpl := osImpl{}
+ return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
+}
+
+func readConfig(osI OS) Config {
+ cfg := Config{
+ hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+ port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
+ masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
+ sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
+ clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
}
- port := os.Getenv("DBAAS_SERVICE_PORT")
- if port == "" {
- port = "6379"
+ return cfg
+}
+
+type OS interface {
+ Getenv(key string, defValue string) string
+}
+
+type osImpl struct{}
+
+func (osImpl) Getenv(key string, defValue string) string {
+ val := os.Getenv(key)
+ if val == "" {
+ val = defValue
}
- sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
- masterName := os.Getenv("DBAAS_MASTER_NAME")
- if sentinelPort == "" {
- redisAddress := hostname + ":" + port
- client = redis.NewClient(&redis.Options{
- Addr: redisAddress,
- Password: "", // no password set
- DB: 0, // use default DB
- PoolSize: 20,
- MaxRetries: 2,
- })
+ return val
+}
+
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
+ cfg := readConfig(osI)
+ return createDbClients(cfg, clientCreator)
+}
+
+func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
+ if cfg.clusterAddrList == "" {
+ return []*DB{createLegacyDbClient(cfg, clientCreator)}
+ }
+
+ dbs := []*DB{}
+
+ addrList := strings.Split(cfg.clusterAddrList, ",")
+ for _, addr := range addrList {
+ db := createDbClient(cfg, addr, clientCreator)
+ dbs = append(dbs, db)
+ }
+ return dbs
+}
+
+func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
+ return createDbClient(cfg, cfg.hostname, clientCreator)
+}
+
+func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
+ var client RedisClient
+ if cfg.sentinelPort == "" {
+ client = clientCreator(hostName, cfg.port, "", false)
} else {
- sentinelAddress := hostname + ":" + sentinelPort
- client = redis.NewFailoverClient(&redis.FailoverOptions{
- MasterName: masterName,
- SentinelAddrs: []string{sentinelAddress},
- PoolSize: 20,
- MaxRetries: 2,
- })
+ client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
}
db := CreateDB(client, subscribeNotifications)
db.CheckCommands()
return db
}
+func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
+ if isHa == true {
+ sentinelAddress := addr + ":" + port
+ return redis.NewFailoverClient(
+ &redis.FailoverOptions{
+ MasterName: clusterName,
+ SentinelAddrs: []string{sentinelAddress},
+ PoolSize: 20,
+ MaxRetries: 2,
+ },
+ )
+ }
+ redisAddress := addr + ":" + port
+ return redis.NewClient(&redis.Options{
+ Addr: redisAddress,
+ Password: "", // no password set
+ DB: 0, // use default DB
+ PoolSize: 20,
+ MaxRetries: 2,
+ })
+}
+
func (db *DB) CheckCommands() {
commands, err := db.client.Command().Result()
if err == nil {
return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
}
-func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
+func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
if !db.redisModules {
- return false, errors.New("Redis deployment not supporting command SETIEPUB")
+ return false, errors.New("Redis deployment not supporting command SETIEMPUB")
+ }
+ capacity := 4 + len(channelsAndEvents)
+ command := make([]interface{}, 0, capacity)
+ command = append(command, "SETIEMPUB")
+ command = append(command, key)
+ command = append(command, newData)
+ command = append(command, oldData)
+ for _, ce := range channelsAndEvents {
+ command = append(command, ce)
}
- return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
+ return checkResultAndError(db.client.Do(command...).Result())
}
-func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
+func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
if !db.redisModules {
- return false, errors.New("Redis deployment not supporting command SETNXPUB")
+ return false, errors.New("Redis deployment not supporting command SETNXMPUB")
}
- return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
+ capacity := 3 + len(channelsAndEvents)
+ command := make([]interface{}, 0, capacity)
+ command = append(command, "SETNXMPUB")
+ command = append(command, key)
+ command = append(command, data)
+ for _, ce := range channelsAndEvents {
+ command = append(command, ce)
+ }
+ return checkResultAndError(db.client.Do(command...).Result())
}
func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
return db.client.SetNX(key, data, expiration).Result()
}
-func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
+func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
if !db.redisModules {
- return false, errors.New("Redis deployment not supporting command")
+ return false, errors.New("Redis deployment not supporting command DELIEMPUB")
+ }
+ capacity := 3 + len(channelsAndEvents)
+ command := make([]interface{}, 0, capacity)
+ command = append(command, "DELIEMPUB")
+ command = append(command, key)
+ command = append(command, data)
+ for _, ce := range channelsAndEvents {
+ command = append(command, ce)
}
- return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
+ return checkIntResultAndError(db.client.Do(command...).Result())
}
func (db *DB) DelIE(key string, data interface{}) (bool, error) {