Handle CloseDB() error in statistics query
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
index 0b93239..7f66f0c 100644 (file)
@@ -1,6 +1,6 @@
 /*
    Copyright (c) 2019 AT&T Intellectual Property.
-   Copyright (c) 2018-2019 Nokia.
+   Copyright (c) 2018-2022 Nokia.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 package sdlgoredis
 
 import (
+       "context"
        "errors"
        "fmt"
-       "github.com/go-redis/redis/v7"
+       "github.com/go-redis/redis/v8"
        "io"
        "log"
        "net"
@@ -37,6 +38,9 @@ import (
        "time"
 )
 
+const EventSeparator = "___"
+const NsSeparator = ","
+
 type ChannelNotificationCb func(channel string, payload ...string)
 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
 
@@ -52,66 +56,72 @@ type sharedCbMap struct {
 }
 
 type Config struct {
-       hostname        string
-       port            string
-       masterName      string
-       sentinelPort    string
-       clusterAddrList string
-       nodeCnt         string
+       hostname      string
+       ports         []string
+       masterNames   []string
+       sentinelPorts []string
+       clusterAddrs  []string
+       nodeCnt       string
 }
 
 type DB struct {
+       ctx          context.Context
        client       RedisClient
        sentinel     RedisSentinelCreateCb
        subscribe    SubscribeFn
        redisModules bool
        sCbMap       *sharedCbMap
        ch           intChannels
-       cfg          Config
        addr         string
+       port         string
+       sentinelPort string
+       masterName   string
+       nodeCnt      string
 }
 
 type Subscriber interface {
-       Channel() <-chan *redis.Message
-       Subscribe(channels ...string) error
-       Unsubscribe(channels ...string) error
+       Channel(opts ...redis.ChannelOption) <-chan *redis.Message
+       Subscribe(ctx context.Context, channels ...string) error
+       Unsubscribe(ctx context.Context, channels ...string) error
        Close() error
 }
 
-type SubscribeFn func(client RedisClient, channels ...string) Subscriber
+type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
 
 type RedisClient interface {
-       Command() *redis.CommandsInfoCmd
+       Command(ctx context.Context) *redis.CommandsInfoCmd
        Close() error
-       Subscribe(channels ...string) *redis.PubSub
-       MSet(pairs ...interface{}) *redis.StatusCmd
-       Do(args ...interface{}) *redis.Cmd
-       MGet(keys ...string) *redis.SliceCmd
-       Del(keys ...string) *redis.IntCmd
-       Keys(pattern string) *redis.StringSliceCmd
-       SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
-       SAdd(key string, members ...interface{}) *redis.IntCmd
-       SRem(key string, members ...interface{}) *redis.IntCmd
-       SMembers(key string) *redis.StringSliceCmd
-       SIsMember(key string, member interface{}) *redis.BoolCmd
-       SCard(key string) *redis.IntCmd
-       PTTL(key string) *redis.DurationCmd
-       Eval(script string, keys []string, args ...interface{}) *redis.Cmd
-       EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
-       ScriptExists(scripts ...string) *redis.BoolSliceCmd
-       ScriptLoad(script string) *redis.StringCmd
-       Info(section ...string) *redis.StringCmd
-}
-
-var dbLogger *log.Logger
+       Subscribe(ctx context.Context, channels ...string) *redis.PubSub
+       MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
+       Do(ctx context.Context, args ...interface{}) *redis.Cmd
+       MGet(ctx context.Context, keys ...string) *redis.SliceCmd
+       Del(ctx context.Context, keys ...string) *redis.IntCmd
+       Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
+       SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
+       SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
+       SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
+       SMembers(ctx context.Context, key string) *redis.StringSliceCmd
+       SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
+       SCard(ctx context.Context, key string) *redis.IntCmd
+       PTTL(ctx context.Context, key string) *redis.DurationCmd
+       Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
+       EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
+       ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
+       ScriptLoad(ctx context.Context, script string) *redis.StringCmd
+       Info(ctx context.Context, section ...string) *redis.StringCmd
+}
+
+var dbLogger *logger
 
 func init() {
-       dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
+       dbLogger = &logger{
+               log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
+       }
        redis.SetLogger(dbLogger)
 }
 
 func SetDbLogger(out io.Writer) {
-       dbLogger.SetOutput(out)
+       dbLogger.log.SetOutput(out)
 }
 
 func checkResultAndError(result interface{}, err error) (bool, error) {
@@ -143,12 +153,14 @@ func checkIntResultAndError(result interface{}, err error) (bool, error) {
        return false, nil
 }
 
-func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
-       return client.Subscribe(channels...)
+func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
+       return client.Subscribe(ctx, channels...)
 }
 
-func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
+func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
+       addr, port, sentinelPort, masterName, nodeCnt string) *DB {
        db := DB{
+               ctx:          context.Background(),
                client:       client,
                sentinel:     sentinelCreateCb,
                subscribe:    subscribe,
@@ -159,8 +171,11 @@ func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisS
                        removeChannel: make(chan string),
                        exit:          make(chan bool),
                },
-               cfg:  cfg,
-               addr: sentinelAddr,
+               addr:         addr,
+               sentinelPort: sentinelPort,
+               port:         port,
+               masterName:   masterName,
+               nodeCnt:      nodeCnt,
        }
 
        return &db
@@ -173,13 +188,23 @@ func Create() []*DB {
 
 func readConfig(osI OS) Config {
        cfg := Config{
-               hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
-               port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
-               masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
-               sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
-               clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
-               nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
+               hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+               ports:    strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
+               nodeCnt:  osI.Getenv("DBAAS_NODE_COUNT", "1"),
+       }
+
+       if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
+               cfg.clusterAddrs = strings.Split(addrStr, ",")
+       } else if cfg.hostname != "" {
+               cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
        }
+       if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
+               cfg.sentinelPorts = strings.Split(sntPortStr, ",")
+       }
+       if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
+               cfg.masterNames = strings.Split(nameStr, ",")
+       }
+       completeConfig(&cfg)
        return cfg
 }
 
@@ -197,47 +222,61 @@ func (osImpl) Getenv(key string, defValue string) string {
        return val
 }
 
-func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
-       subscribe SubscribeFn,
-       sentinelCreateCb RedisSentinelCreateCb) []*DB {
-       cfg := readConfig(osI)
-       return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
+func completeConfig(cfg *Config) {
+       if len(cfg.sentinelPorts) == 0 {
+               if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
+                       for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
+                               cfg.ports = append(cfg.ports, cfg.ports[i-1])
+                       }
+               }
+       } else {
+               if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
+                       for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
+                               cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
+                       }
+               }
+               if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
+                       for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
+                               cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
+                       }
+               }
+       }
 }
 
-func createDbClients(cfg Config, clientCreator RedisClientCreator,
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
        subscribe SubscribeFn,
        sentinelCreateCb RedisSentinelCreateCb) []*DB {
-       if cfg.clusterAddrList == "" {
-               return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
-       }
-
        dbs := []*DB{}
-
-       addrList := strings.Split(cfg.clusterAddrList, ",")
-       for _, addr := range addrList {
-               db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
+       cfg := readConfig(osI)
+       for i, addr := range cfg.clusterAddrs {
+               port := getListItem(cfg.ports, i)
+               sntPort := getListItem(cfg.sentinelPorts, i)
+               name := getListItem(cfg.masterNames, i)
+               db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
+                       clientCreator, subscribe, sentinelCreateCb)
                dbs = append(dbs, db)
        }
        return dbs
 }
 
-func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
-       subscribe SubscribeFn,
-       sentinelCreateCb RedisSentinelCreateCb) *DB {
-       return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
+func getListItem(list []string, index int) string {
+       if index < len(list) {
+               return list[index]
+       }
+       return ""
 }
 
-func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
+func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
        subscribe SubscribeFn,
        sentinelCreateCb RedisSentinelCreateCb) *DB {
        var client RedisClient
        var db *DB
-       if cfg.sentinelPort == "" {
-               client = clientCreator(hostName, cfg.port, "", false)
-               db = CreateDB(client, subscribe, nil, cfg, hostName)
+       if sentinelPort == "" {
+               client = clientCreator(addr, port, "", false)
+               db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
        } else {
-               client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
-               db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
+               client = clientCreator(addr, sentinelPort, masterName, true)
+               db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
        }
        db.CheckCommands()
        return db
@@ -266,7 +305,7 @@ func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
 }
 
 func (db *DB) CheckCommands() {
-       commands, err := db.client.Command().Result()
+       commands, err := db.client.Command(db.ctx).Result()
        if err == nil {
                redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
                        "msetmpub", "delmpub"}
@@ -277,7 +316,7 @@ func (db *DB) CheckCommands() {
                        }
                }
        } else {
-               dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
+               dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
        }
 }
 
@@ -285,28 +324,25 @@ func (db *DB) CloseDB() error {
        return db.client.Close()
 }
 
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
        for _, v := range channels {
                db.sCbMap.Remove(v)
                db.ch.removeChannel <- v
+               errStr := <-db.ch.removeChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+               }
                if db.sCbMap.Count() == 0 {
                        db.ch.exit <- true
                }
        }
+       return nil
 }
 
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
        if db.sCbMap.Count() == 0 {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-               }
-
-               go func(sCbMap *sharedCbMap,
-                       channelPrefix,
-                       eventSeparator string,
-                       ch intChannels,
-                       channels ...string) {
-                       sub := db.subscribe(db.client, channels...)
+               go func(sCbMap *sharedCbMap, ch intChannels) {
+                       sub := db.subscribe(db.ctx, db.client, "")
                        rxChannel := sub.Channel()
                        lCbMap := sCbMap.GetMapCopy()
                        for {
@@ -314,35 +350,47 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even
                                case msg := <-rxChannel:
                                        cb, ok := lCbMap[msg.Channel]
                                        if ok {
-                                               cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+                                               nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
+                                               cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
                                        }
                                case channel := <-ch.addChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Subscribe(channel)
+                                       if err := sub.Subscribe(db.ctx, channel); err != nil {
+                                               ch.addChannel <- err.Error()
+                                       } else {
+                                               ch.addChannel <- ""
+                                       }
                                case channel := <-ch.removeChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Unsubscribe(channel)
+                                       if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+                                               ch.removeChannel <- err.Error()
+                                       } else {
+                                               ch.removeChannel <- ""
+                                       }
                                case exit := <-ch.exit:
                                        if exit {
                                                if err := sub.Close(); err != nil {
-                                                       dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
+                                                       dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
                                                }
                                                return
                                        }
                                }
                        }
-               }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
-
-       } else {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-                       db.ch.addChannel <- v
+               }(db.sCbMap, db.ch)
+       }
+       for _, v := range channels {
+               db.sCbMap.Add(v, cb)
+               db.ch.addChannel <- v
+               errStr := <-db.ch.addChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
                }
        }
+       return nil
 }
 
 func (db *DB) MSet(pairs ...interface{}) error {
-       return db.client.MSet(pairs...).Err()
+       return db.client.MSet(db.ctx, pairs...).Err()
 }
 
 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
@@ -359,12 +407,12 @@ func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
        for _, d := range channelsAndEvents {
                command = append(command, d)
        }
-       _, err := db.client.Do(command...).Result()
+       _, err := db.client.Do(db.ctx, command...).Result()
        return err
 }
 
 func (db *DB) MGet(keys []string) ([]interface{}, error) {
-       return db.client.MGet(keys...).Result()
+       return db.client.MGet(db.ctx, keys...).Result()
 }
 
 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
@@ -381,18 +429,18 @@ func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
        for _, d := range channelsAndEvents {
                command = append(command, d)
        }
-       _, err := db.client.Do(command...).Result()
+       _, err := db.client.Do(db.ctx, command...).Result()
        return err
 
 }
 
 func (db *DB) Del(keys []string) error {
-       _, err := db.client.Del(keys...).Result()
+       _, err := db.client.Del(db.ctx, keys...).Result()
        return err
 }
 
 func (db *DB) Keys(pattern string) ([]string, error) {
-       return db.client.Keys(pattern).Result()
+       return db.client.Keys(db.ctx, pattern).Result()
 }
 
 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
@@ -400,7 +448,7 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
                return false, errors.New("Redis deployment not supporting command")
        }
 
-       return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
+       return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
 }
 
 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
@@ -416,7 +464,7 @@ func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData
        for _, ce := range channelsAndEvents {
                command = append(command, ce)
        }
-       return checkResultAndError(db.client.Do(command...).Result())
+       return checkResultAndError(db.client.Do(db.ctx, command...).Result())
 }
 
 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
@@ -431,10 +479,10 @@ func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{})
        for _, ce := range channelsAndEvents {
                command = append(command, ce)
        }
-       return checkResultAndError(db.client.Do(command...).Result())
+       return checkResultAndError(db.client.Do(db.ctx, command...).Result())
 }
 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
-       return db.client.SetNX(key, data, expiration).Result()
+       return db.client.SetNX(db.ctx, key, data, expiration).Result()
 }
 
 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
@@ -449,56 +497,56 @@ func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{})
        for _, ce := range channelsAndEvents {
                command = append(command, ce)
        }
-       return checkIntResultAndError(db.client.Do(command...).Result())
+       return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
 }
 
 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
        if !db.redisModules {
                return false, errors.New("Redis deployment not supporting command")
        }
-       return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
+       return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
 }
 
 func (db *DB) SAdd(key string, data ...interface{}) error {
-       _, err := db.client.SAdd(key, data...).Result()
+       _, err := db.client.SAdd(db.ctx, key, data...).Result()
        return err
 }
 
 func (db *DB) SRem(key string, data ...interface{}) error {
-       _, err := db.client.SRem(key, data...).Result()
+       _, err := db.client.SRem(db.ctx, key, data...).Result()
        return err
 }
 
 func (db *DB) SMembers(key string) ([]string, error) {
-       result, err := db.client.SMembers(key).Result()
+       result, err := db.client.SMembers(db.ctx, key).Result()
        return result, err
 }
 
 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
-       result, err := db.client.SIsMember(key, data).Result()
+       result, err := db.client.SIsMember(db.ctx, key, data).Result()
        return result, err
 }
 
 func (db *DB) SCard(key string) (int64, error) {
-       result, err := db.client.SCard(key).Result()
+       result, err := db.client.SCard(db.ctx, key).Result()
        return result, err
 }
 
 func (db *DB) PTTL(key string) (time.Duration, error) {
-       result, err := db.client.PTTL(key).Result()
+       result, err := db.client.PTTL(db.ctx, key).Result()
        return result, err
 }
 
 func (db *DB) Info() (*DbInfo, error) {
        var info DbInfo
-       resultStr, err := db.client.Info("all").Result()
+       resultStr, err := db.client.Info(db.ctx, "all").Result()
        if err != nil {
                return &info, err
        }
 
        result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
-       err = readRedisInfoReplyFields(result, &info)
-       return &info, err
+       readRedisInfoReplyFields(result, &info)
+       return &info, nil
 }
 
 func lineContains(line, substr string) bool {
@@ -558,9 +606,8 @@ func getCommandstatsValues(values string) (string, string, string) {
        return calls, usec, usecPerCall
 }
 
-func updateCommandstatsValues(i interface{}, line, cmdstat string) {
+func updateCommandstatsValues(i interface{}, values string) {
        stype := reflect.ValueOf(i).Elem()
-       values := getFieldValueStr(line, cmdstat)
        callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
 
        callsField := stype.FieldByName("Calls")
@@ -580,9 +627,8 @@ func getKeyspaceValues(values string) (string, string, string) {
        return keys, expires, avgttl
 }
 
-func updateKeyspaceValues(i interface{}, line, keyspace string) {
+func updateKeyspaceValues(i interface{}, values string) {
        stype := reflect.ValueOf(i).Elem()
-       values := getFieldValueStr(line, keyspace)
        keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
 
        keysField := stype.FieldByName("Keys")
@@ -595,102 +641,179 @@ func updateKeyspaceValues(i interface{}, line, keyspace string) {
        avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
 }
 
-func readRedisInfoReplyFields(input []string, info *DbInfo) error {
+func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["uptime_in_days"]; ok {
+               info.Fields.Server.UptimeInDays = getUint32FromString(value)
+       }
+}
+
+func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["connected_clients"]; ok {
+               info.Fields.Clients.ConnectedClients = getUint32FromString(value)
+       }
+       if value, ok := config["client_recent_max_input_buffer"]; ok {
+               info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
+       }
+       if value, ok := config["client_recent_max_output_buffer"]; ok {
+               info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
+       }
+}
+
+func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["used_memory"]; ok {
+               info.Fields.Memory.UsedMemory = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_human"]; ok {
+               info.Fields.Memory.UsedMemoryHuman = value
+       }
+       if value, ok := config["used_memory_rss"]; ok {
+               info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_rss_human"]; ok {
+               info.Fields.Memory.UsedMemoryRssHuman = value
+       }
+       if value, ok := config["used_memory_peak"]; ok {
+               info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_peak_human"]; ok {
+               info.Fields.Memory.UsedMemoryPeakHuman = value
+       }
+       if value, ok := config["used_memory_peak_perc"]; ok {
+               info.Fields.Memory.UsedMemoryPeakPerc = value
+       }
+       if value, ok := config["mem_fragmentation_ratio"]; ok {
+               info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
+       }
+       if value, ok := config["mem_fragmentation_bytes"]; ok {
+               info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
+       }
+}
+
+func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["total_connections_received"]; ok {
+               info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
+       }
+       if value, ok := config["total_commands_processed"]; ok {
+               info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
+       }
+       if value, ok := config["sync_full"]; ok {
+               info.Fields.Stats.SyncFull = getUint32FromString(value)
+       }
+       if value, ok := config["sync_partial_ok"]; ok {
+               info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
+       }
+       if value, ok := config["sync_partial_err"]; ok {
+               info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
+       }
+       if value, ok := config["pubsub_channels"]; ok {
+               info.Fields.Stats.PubsubChannels = getUint32FromString(value)
+       }
+}
+
+func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["used_cpu_sys"]; ok {
+               info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
+       }
+       if value, ok := config["used_cpu_user"]; ok {
+               info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
+       }
+}
+
+func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
+       if values, ok := config["cmdstat_replconf"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
+       }
+       if values, ok := config["cmdstat_keys"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
+       }
+       if values, ok := config["cmdstat_role"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
+       }
+       if values, ok := config["cmdstat_psync"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
+       }
+       if values, ok := config["cmdstat_mset"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
+       }
+       if values, ok := config["cmdstat_publish"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
+       }
+       if values, ok := config["cmdstat_info"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
+       }
+       if values, ok := config["cmdstat_ping"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
+       }
+       if values, ok := config["cmdstat_client"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
+       }
+       if values, ok := config["cmdstat_command"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
+       }
+       if values, ok := config["cmdstat_subscribe"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
+       }
+       if values, ok := config["cmdstat_monitor"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
+       }
+       if values, ok := config["cmdstat_config"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
+       }
+       if values, ok := config["cmdstat_slaveof"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
+       }
+}
+
+func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
+       if values, ok := config["db0"]; ok {
+               updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
+       }
+}
+
+func getConfigInfo(input []string) ConfigInfo {
+       config := ConfigInfo{}
        for _, line := range input {
-               switch {
-               case lineContains(line, "role:") && !lineContains(line, "_role:"):
-                       if "master" == getFieldValueStr(line, "role:") {
-                               info.Fields.PrimaryRole = true
+               if i := strings.Index(line, ":"); i != -1 {
+                       if key := strings.TrimSpace(line[:i]); len(key) > 0 {
+                               if len(line) > i {
+                                       config[key] = strings.TrimSpace(line[i+1:])
+                               }
                        }
-               case lineContains(line, "connected_slaves:"):
-                       info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:"))
-               case lineContains(line, "uptime_in_days:"):
-                       info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:"))
-               case lineContains(line, "connected_clients:"):
-                       info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:"))
-               case lineContains(line, "client_recent_max_input_buffer:"):
-                       info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:"))
-               case lineContains(line, "client_recent_max_output_buffer:"):
-                       info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:"))
-               case lineContains(line, "used_memory:"):
-                       info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:"))
-               case lineContains(line, "used_memory_human:"):
-                       info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:")
-               case lineContains(line, "used_memory_rss:"):
-                       info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:"))
-               case lineContains(line, "used_memory_rss_human:"):
-                       info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:")
-               case lineContains(line, "used_memory_peak:"):
-                       info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:"))
-               case lineContains(line, "used_memory_peak_human:"):
-                       info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:")
-               case lineContains(line, "used_memory_peak_perc:"):
-                       info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:")
-               case lineContains(line, "mem_fragmentation_ratio:"):
-                       info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:"))
-               case lineContains(line, "mem_fragmentation_bytes:"):
-                       info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:"))
-               case lineContains(line, "total_connections_received:"):
-                       info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:"))
-               case lineContains(line, "total_commands_processed:"):
-                       info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:"))
-               case lineContains(line, "sync_full:"):
-                       info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:"))
-               case lineContains(line, "sync_partial_ok:"):
-                       info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:"))
-               case lineContains(line, "sync_partial_err:"):
-                       info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:"))
-               case lineContains(line, "pubsub_channels:"):
-                       info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:"))
-               case lineContains(line, "used_cpu_sys:"):
-                       info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:"))
-               case lineContains(line, "used_cpu_user:"):
-                       info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:"))
-               case lineContains(line, "cmdstat_replconf:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:")
-               case lineContains(line, "cmdstat_keys:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:")
-               case lineContains(line, "cmdstat_role:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:")
-               case lineContains(line, "cmdstat_psync:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:")
-               case lineContains(line, "cmdstat_mset:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:")
-               case lineContains(line, "cmdstat_publish:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:")
-               case lineContains(line, "cmdstat_info:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:")
-               case lineContains(line, "cmdstat_ping:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:")
-               case lineContains(line, "cmdstat_client:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:")
-               case lineContains(line, "cmdstat_command:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:")
-               case lineContains(line, "cmdstat_subscribe:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:")
-               case lineContains(line, "cmdstat_monitor:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:")
-               case lineContains(line, "cmdstat_config:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:")
-               case lineContains(line, "cmdstat_slaveof:"):
-                       updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:")
-               case lineContains(line, "db0:"):
-                       updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:")
                }
        }
-       return nil
+       return config
+}
+
+func readRedisInfoReplyFields(input []string, info *DbInfo) {
+       config := getConfigInfo(input)
+
+       if value, ok := config["role"]; ok {
+               if "master" == value {
+                       info.Fields.PrimaryRole = true
+               }
+       }
+       if value, ok := config["connected_slaves"]; ok {
+               info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
+       }
+       updateServerInfoFields(config, info)
+       updateClientInfoFields(config, info)
+       updateMemoryInfoFields(config, info)
+       updateStatsInfoFields(config, info)
+       updateCpuInfoFields(config, info)
+       updateCommandstatsInfoFields(config, info)
+       updateKeyspaceInfoFields(config, info)
 }
 
 func (db *DB) State() (*DbState, error) {
        dbState := new(DbState)
-       if db.cfg.sentinelPort != "" {
+       if db.sentinelPort != "" {
                //Establish connection to Redis sentinel. The reason why connection is done
                //here instead of time of the SDL instance creation is that for the time being
                //sentinel connection is needed only here to get state information and
                //state information is needed only by 'sdlcli' hence it is not time critical
                //and also we want to avoid opening unnecessary TCP connections towards Redis
                //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
-               sentinelClient := db.sentinel(&db.cfg, db.addr)
+               sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
                return sentinelClient.GetDbState()
        } else {
                info, err := db.Info()
@@ -709,17 +832,17 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
                        PrimaryDbState: PrimaryDbState{
                                Fields: PrimaryDbStateFields{
                                        Role:  "master",
-                                       Ip:    db.cfg.hostname,
-                                       Port:  db.cfg.port,
+                                       Ip:    db.addr,
+                                       Port:  db.port,
                                        Flags: "master",
                                },
                        },
                }
        }
 
-       cnt, err := strconv.Atoi(db.cfg.nodeCnt)
+       cnt, err := strconv.Atoi(db.nodeCnt)
        if err != nil {
-               dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
+               dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
        } else {
                dbState.ConfigNodeCnt = cnt
        }
@@ -728,10 +851,8 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
 }
 
 func createReplicaDbClient(host string) *DB {
-       cfg := readConfig(osImpl{})
-       cfg.sentinelPort = ""
-       cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
-       return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
+       addr, port, _ := net.SplitHostPort(host)
+       return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil)
 }
 
 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
@@ -765,7 +886,9 @@ func sentinelStatistics(db *DB) (*DbStatistics, error) {
                for _, r := range dbState.ReplicasDbState.States {
                        replicaDb := createReplicaDbClient(r.GetAddress())
                        dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
-                       replicaDb.CloseDB()
+                       if closeErr := replicaDb.CloseDB(); closeErr != nil {
+                               return nil, closeErr
+                       }
                        if err != nil {
                                return nil, err
                        }
@@ -779,14 +902,14 @@ func sentinelStatistics(db *DB) (*DbStatistics, error) {
 func standaloneStatistics(db *DB) (*DbStatistics, error) {
        dbStatistics := new(DbStatistics)
 
-       dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
+       dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
        dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
 
        return dbStatistics, err
 }
 
 func (db *DB) Statistics() (*DbStatistics, error) {
-       if db.cfg.sentinelPort != "" {
+       if db.sentinelPort != "" {
                return sentinelStatistics(db)
        }
 
@@ -797,7 +920,7 @@ var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then
 
 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
        expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
-       result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
+       result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
        if err != nil {
                return err
        }