Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
index 327946e..570dfa1 100644 (file)
@@ -34,6 +34,7 @@ import (
 )
 
 type ChannelNotificationCb func(channel string, payload ...string)
+type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
 
 type intChannels struct {
        addChannel    chan string
@@ -46,6 +47,14 @@ type sharedCbMap struct {
        cbMap map[string]ChannelNotificationCb
 }
 
+type Config struct {
+       hostname        string
+       port            string
+       masterName      string
+       sentinelPort    string
+       clusterAddrList string
+}
+
 type DB struct {
        client       RedisClient
        subscribe    SubscribeFn
@@ -134,41 +143,94 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
        return &db
 }
 
-func Create() *DB {
-       var client *redis.Client
-       hostname := os.Getenv("DBAAS_SERVICE_HOST")
-       if hostname == "" {
-               hostname = "localhost"
+func Create() []*DB {
+       osimpl := osImpl{}
+       return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
+}
+
+func readConfig(osI OS) Config {
+       cfg := Config{
+               hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+               port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
+               masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
+               sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
+               clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
+       }
+       return cfg
+}
+
+type OS interface {
+       Getenv(key string, defValue string) string
+}
+
+type osImpl struct{}
+
+func (osImpl) Getenv(key string, defValue string) string {
+       val := os.Getenv(key)
+       if val == "" {
+               val = defValue
+       }
+       return val
+}
+
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
+       cfg := readConfig(osI)
+       return createDbClients(cfg, clientCreator)
+}
+
+func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
+       if cfg.clusterAddrList == "" {
+               return []*DB{createLegacyDbClient(cfg, clientCreator)}
        }
-       port := os.Getenv("DBAAS_SERVICE_PORT")
-       if port == "" {
-               port = "6379"
+
+       dbs := []*DB{}
+
+       addrList := strings.Split(cfg.clusterAddrList, ",")
+       for _, addr := range addrList {
+               db := createDbClient(cfg, addr, clientCreator)
+               dbs = append(dbs, db)
        }
-       sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
-       masterName := os.Getenv("DBAAS_MASTER_NAME")
-       if sentinelPort == "" {
-               redisAddress := hostname + ":" + port
-               client = redis.NewClient(&redis.Options{
-                       Addr:       redisAddress,
-                       Password:   "", // no password set
-                       DB:         0,  // use default DB
-                       PoolSize:   20,
-                       MaxRetries: 2,
-               })
+       return dbs
+}
+
+func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
+       return createDbClient(cfg, cfg.hostname, clientCreator)
+}
+
+func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
+       var client RedisClient
+       if cfg.sentinelPort == "" {
+               client = clientCreator(hostName, cfg.port, "", false)
        } else {
-               sentinelAddress := hostname + ":" + sentinelPort
-               client = redis.NewFailoverClient(&redis.FailoverOptions{
-                       MasterName:    masterName,
-                       SentinelAddrs: []string{sentinelAddress},
-                       PoolSize:      20,
-                       MaxRetries:    2,
-               })
+               client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
        }
        db := CreateDB(client, subscribeNotifications)
        db.CheckCommands()
        return db
 }
 
+func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
+       if isHa == true {
+               sentinelAddress := addr + ":" + port
+               return redis.NewFailoverClient(
+                       &redis.FailoverOptions{
+                               MasterName:    clusterName,
+                               SentinelAddrs: []string{sentinelAddress},
+                               PoolSize:      20,
+                               MaxRetries:    2,
+                       },
+               )
+       }
+       redisAddress := addr + ":" + port
+       return redis.NewClient(&redis.Options{
+               Addr:       redisAddress,
+               Password:   "", // no password set
+               DB:         0,  // use default DB
+               PoolSize:   20,
+               MaxRetries: 2,
+       })
+}
+
 func (db *DB) CheckCommands() {
        commands, err := db.client.Command().Result()
        if err == nil {