X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=278be2ada72cb2ceb3fda7754c7db3b41ee11d9d;hb=0e96b4aef53285c0a61b8ce7d1769c04df7b6061;hp=0b9323975032c2156a15733e0f90fae987d97f7f;hpb=74b5e054dc40ed47cbbbb28f1fc7c80b50efcae6;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 0b93239..278be2a 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -23,9 +23,10 @@ package sdlgoredis import ( + "context" "errors" "fmt" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "io" "log" "net" @@ -61,6 +62,7 @@ type Config struct { } type DB struct { + ctx context.Context client RedisClient sentinel RedisSentinelCreateCb subscribe SubscribeFn @@ -72,46 +74,48 @@ type DB struct { } type Subscriber interface { - Channel() <-chan *redis.Message - Subscribe(channels ...string) error - Unsubscribe(channels ...string) error + Channel(opts ...redis.ChannelOption) <-chan *redis.Message + Subscribe(ctx context.Context, channels ...string) error + Unsubscribe(ctx context.Context, channels ...string) error Close() error } -type SubscribeFn func(client RedisClient, channels ...string) Subscriber +type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber type RedisClient interface { - Command() *redis.CommandsInfoCmd + Command(ctx context.Context) *redis.CommandsInfoCmd Close() error - Subscribe(channels ...string) *redis.PubSub - MSet(pairs ...interface{}) *redis.StatusCmd - Do(args ...interface{}) *redis.Cmd - MGet(keys ...string) *redis.SliceCmd - Del(keys ...string) *redis.IntCmd - Keys(pattern string) *redis.StringSliceCmd - SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd - SAdd(key string, members ...interface{}) *redis.IntCmd - SRem(key string, members ...interface{}) *redis.IntCmd - SMembers(key string) *redis.StringSliceCmd - SIsMember(key string, member interface{}) *redis.BoolCmd - SCard(key string) *redis.IntCmd - PTTL(key string) *redis.DurationCmd - Eval(script string, keys []string, args ...interface{}) *redis.Cmd - EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd - ScriptExists(scripts ...string) *redis.BoolSliceCmd - ScriptLoad(script string) *redis.StringCmd - Info(section ...string) *redis.StringCmd -} - -var dbLogger *log.Logger + Subscribe(ctx context.Context, channels ...string) *redis.PubSub + MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd + Do(ctx context.Context, args ...interface{}) *redis.Cmd + MGet(ctx context.Context, keys ...string) *redis.SliceCmd + Del(ctx context.Context, keys ...string) *redis.IntCmd + Keys(ctx context.Context, pattern string) *redis.StringSliceCmd + SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd + SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd + SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd + SMembers(ctx context.Context, key string) *redis.StringSliceCmd + SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd + SCard(ctx context.Context, key string) *redis.IntCmd + PTTL(ctx context.Context, key string) *redis.DurationCmd + Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd + EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd + ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd + ScriptLoad(ctx context.Context, script string) *redis.StringCmd + Info(ctx context.Context, section ...string) *redis.StringCmd +} + +var dbLogger *logger func init() { - dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile) + dbLogger = &logger{ + log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile), + } redis.SetLogger(dbLogger) } func SetDbLogger(out io.Writer) { - dbLogger.SetOutput(out) + dbLogger.log.SetOutput(out) } func checkResultAndError(result interface{}, err error) (bool, error) { @@ -143,12 +147,13 @@ func checkIntResultAndError(result interface{}, err error) (bool, error) { return false, nil } -func subscribeNotifications(client RedisClient, channels ...string) Subscriber { - return client.Subscribe(channels...) +func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber { + return client.Subscribe(ctx, channels...) } func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB { db := DB{ + ctx: context.Background(), client: client, sentinel: sentinelCreateCb, subscribe: subscribe, @@ -266,7 +271,7 @@ func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient { } func (db *DB) CheckCommands() { - commands, err := db.client.Command().Result() + commands, err := db.client.Command(db.ctx).Result() if err == nil { redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub", "msetmpub", "delmpub"} @@ -277,7 +282,7 @@ func (db *DB) CheckCommands() { } } } else { - dbLogger.Printf("SDL DB commands checking failure: %s\n", err) + dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err) } } @@ -306,7 +311,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even eventSeparator string, ch intChannels, channels ...string) { - sub := db.subscribe(db.client, channels...) + sub := db.subscribe(db.ctx, db.client, channels...) rxChannel := sub.Channel() lCbMap := sCbMap.GetMapCopy() for { @@ -318,14 +323,14 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() - sub.Subscribe(channel) + sub.Subscribe(db.ctx, channel) case channel := <-ch.removeChannel: lCbMap = sCbMap.GetMapCopy() - sub.Unsubscribe(channel) + sub.Unsubscribe(db.ctx, channel) case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { - dbLogger.Printf("SDL DB channel closing failure: %s\n", err) + dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err) } return } @@ -342,7 +347,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even } func (db *DB) MSet(pairs ...interface{}) error { - return db.client.MSet(pairs...).Err() + return db.client.MSet(db.ctx, pairs...).Err() } func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error { @@ -359,12 +364,12 @@ func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error { for _, d := range channelsAndEvents { command = append(command, d) } - _, err := db.client.Do(command...).Result() + _, err := db.client.Do(db.ctx, command...).Result() return err } func (db *DB) MGet(keys []string) ([]interface{}, error) { - return db.client.MGet(keys...).Result() + return db.client.MGet(db.ctx, keys...).Result() } func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error { @@ -381,18 +386,18 @@ func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error { for _, d := range channelsAndEvents { command = append(command, d) } - _, err := db.client.Do(command...).Result() + _, err := db.client.Do(db.ctx, command...).Result() return err } func (db *DB) Del(keys []string) error { - _, err := db.client.Del(keys...).Result() + _, err := db.client.Del(db.ctx, keys...).Result() return err } func (db *DB) Keys(pattern string) ([]string, error) { - return db.client.Keys(pattern).Result() + return db.client.Keys(db.ctx, pattern).Result() } func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { @@ -400,7 +405,7 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { return false, errors.New("Redis deployment not supporting command") } - return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result()) + return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result()) } func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { @@ -416,7 +421,7 @@ func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkResultAndError(db.client.Do(command...).Result()) + return checkResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { @@ -431,10 +436,10 @@ func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkResultAndError(db.client.Do(command...).Result()) + return checkResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) { - return db.client.SetNX(key, data, expiration).Result() + return db.client.SetNX(db.ctx, key, data, expiration).Result() } func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { @@ -449,56 +454,56 @@ func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkIntResultAndError(db.client.Do(command...).Result()) + return checkIntResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) DelIE(key string, data interface{}) (bool, error) { if !db.redisModules { return false, errors.New("Redis deployment not supporting command") } - return checkIntResultAndError(db.client.Do("DELIE", key, data).Result()) + return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result()) } func (db *DB) SAdd(key string, data ...interface{}) error { - _, err := db.client.SAdd(key, data...).Result() + _, err := db.client.SAdd(db.ctx, key, data...).Result() return err } func (db *DB) SRem(key string, data ...interface{}) error { - _, err := db.client.SRem(key, data...).Result() + _, err := db.client.SRem(db.ctx, key, data...).Result() return err } func (db *DB) SMembers(key string) ([]string, error) { - result, err := db.client.SMembers(key).Result() + result, err := db.client.SMembers(db.ctx, key).Result() return result, err } func (db *DB) SIsMember(key string, data interface{}) (bool, error) { - result, err := db.client.SIsMember(key, data).Result() + result, err := db.client.SIsMember(db.ctx, key, data).Result() return result, err } func (db *DB) SCard(key string) (int64, error) { - result, err := db.client.SCard(key).Result() + result, err := db.client.SCard(db.ctx, key).Result() return result, err } func (db *DB) PTTL(key string) (time.Duration, error) { - result, err := db.client.PTTL(key).Result() + result, err := db.client.PTTL(db.ctx, key).Result() return result, err } func (db *DB) Info() (*DbInfo, error) { var info DbInfo - resultStr, err := db.client.Info("all").Result() + resultStr, err := db.client.Info(db.ctx, "all").Result() if err != nil { return &info, err } result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n") - err = readRedisInfoReplyFields(result, &info) - return &info, err + readRedisInfoReplyFields(result, &info) + return &info, nil } func lineContains(line, substr string) bool { @@ -558,9 +563,8 @@ func getCommandstatsValues(values string) (string, string, string) { return calls, usec, usecPerCall } -func updateCommandstatsValues(i interface{}, line, cmdstat string) { +func updateCommandstatsValues(i interface{}, values string) { stype := reflect.ValueOf(i).Elem() - values := getFieldValueStr(line, cmdstat) callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values) callsField := stype.FieldByName("Calls") @@ -580,9 +584,8 @@ func getKeyspaceValues(values string) (string, string, string) { return keys, expires, avgttl } -func updateKeyspaceValues(i interface{}, line, keyspace string) { +func updateKeyspaceValues(i interface{}, values string) { stype := reflect.ValueOf(i).Elem() - values := getFieldValueStr(line, keyspace) keysStr, expiresStr, avgttlStr := getKeyspaceValues(values) keysField := stype.FieldByName("Keys") @@ -595,90 +598,167 @@ func updateKeyspaceValues(i interface{}, line, keyspace string) { avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr))) } -func readRedisInfoReplyFields(input []string, info *DbInfo) error { +func updateServerInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["uptime_in_days"]; ok { + info.Fields.Server.UptimeInDays = getUint32FromString(value) + } +} + +func updateClientInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["connected_clients"]; ok { + info.Fields.Clients.ConnectedClients = getUint32FromString(value) + } + if value, ok := config["client_recent_max_input_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value) + } + if value, ok := config["client_recent_max_output_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value) + } +} + +func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_memory"]; ok { + info.Fields.Memory.UsedMemory = getUint64FromString(value) + } + if value, ok := config["used_memory_human"]; ok { + info.Fields.Memory.UsedMemoryHuman = value + } + if value, ok := config["used_memory_rss"]; ok { + info.Fields.Memory.UsedMemoryRss = getUint64FromString(value) + } + if value, ok := config["used_memory_rss_human"]; ok { + info.Fields.Memory.UsedMemoryRssHuman = value + } + if value, ok := config["used_memory_peak"]; ok { + info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value) + } + if value, ok := config["used_memory_peak_human"]; ok { + info.Fields.Memory.UsedMemoryPeakHuman = value + } + if value, ok := config["used_memory_peak_perc"]; ok { + info.Fields.Memory.UsedMemoryPeakPerc = value + } + if value, ok := config["mem_fragmentation_ratio"]; ok { + info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value) + } + if value, ok := config["mem_fragmentation_bytes"]; ok { + info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value) + } +} + +func updateStatsInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["total_connections_received"]; ok { + info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value) + } + if value, ok := config["total_commands_processed"]; ok { + info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value) + } + if value, ok := config["sync_full"]; ok { + info.Fields.Stats.SyncFull = getUint32FromString(value) + } + if value, ok := config["sync_partial_ok"]; ok { + info.Fields.Stats.SyncPartialOk = getUint32FromString(value) + } + if value, ok := config["sync_partial_err"]; ok { + info.Fields.Stats.SyncPartialErr = getUint32FromString(value) + } + if value, ok := config["pubsub_channels"]; ok { + info.Fields.Stats.PubsubChannels = getUint32FromString(value) + } +} + +func updateCpuInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_cpu_sys"]; ok { + info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value) + } + if value, ok := config["used_cpu_user"]; ok { + info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value) + } +} + +func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["cmdstat_replconf"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values) + } + if values, ok := config["cmdstat_keys"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values) + } + if values, ok := config["cmdstat_role"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values) + } + if values, ok := config["cmdstat_psync"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values) + } + if values, ok := config["cmdstat_mset"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values) + } + if values, ok := config["cmdstat_publish"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values) + } + if values, ok := config["cmdstat_info"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values) + } + if values, ok := config["cmdstat_ping"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values) + } + if values, ok := config["cmdstat_client"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values) + } + if values, ok := config["cmdstat_command"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values) + } + if values, ok := config["cmdstat_subscribe"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values) + } + if values, ok := config["cmdstat_monitor"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values) + } + if values, ok := config["cmdstat_config"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values) + } + if values, ok := config["cmdstat_slaveof"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values) + } +} + +func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["db0"]; ok { + updateKeyspaceValues(&info.Fields.Keyspace.Db, values) + } +} + +func getConfigInfo(input []string) ConfigInfo { + config := ConfigInfo{} for _, line := range input { - switch { - case lineContains(line, "role:") && !lineContains(line, "_role:"): - if "master" == getFieldValueStr(line, "role:") { - info.Fields.PrimaryRole = true + if i := strings.Index(line, ":"); i != -1 { + if key := strings.TrimSpace(line[:i]); len(key) > 0 { + if len(line) > i { + config[key] = strings.TrimSpace(line[i+1:]) + } } - case lineContains(line, "connected_slaves:"): - info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:")) - case lineContains(line, "uptime_in_days:"): - info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:")) - case lineContains(line, "connected_clients:"): - info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:")) - case lineContains(line, "client_recent_max_input_buffer:"): - info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:")) - case lineContains(line, "client_recent_max_output_buffer:"): - info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:")) - case lineContains(line, "used_memory:"): - info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:")) - case lineContains(line, "used_memory_human:"): - info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:") - case lineContains(line, "used_memory_rss:"): - info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:")) - case lineContains(line, "used_memory_rss_human:"): - info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:") - case lineContains(line, "used_memory_peak:"): - info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:")) - case lineContains(line, "used_memory_peak_human:"): - info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:") - case lineContains(line, "used_memory_peak_perc:"): - info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:") - case lineContains(line, "mem_fragmentation_ratio:"): - info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:")) - case lineContains(line, "mem_fragmentation_bytes:"): - info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:")) - case lineContains(line, "total_connections_received:"): - info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:")) - case lineContains(line, "total_commands_processed:"): - info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:")) - case lineContains(line, "sync_full:"): - info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:")) - case lineContains(line, "sync_partial_ok:"): - info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:")) - case lineContains(line, "sync_partial_err:"): - info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:")) - case lineContains(line, "pubsub_channels:"): - info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:")) - case lineContains(line, "used_cpu_sys:"): - info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:")) - case lineContains(line, "used_cpu_user:"): - info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:")) - case lineContains(line, "cmdstat_replconf:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:") - case lineContains(line, "cmdstat_keys:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:") - case lineContains(line, "cmdstat_role:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:") - case lineContains(line, "cmdstat_psync:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:") - case lineContains(line, "cmdstat_mset:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:") - case lineContains(line, "cmdstat_publish:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:") - case lineContains(line, "cmdstat_info:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:") - case lineContains(line, "cmdstat_ping:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:") - case lineContains(line, "cmdstat_client:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:") - case lineContains(line, "cmdstat_command:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:") - case lineContains(line, "cmdstat_subscribe:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:") - case lineContains(line, "cmdstat_monitor:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:") - case lineContains(line, "cmdstat_config:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:") - case lineContains(line, "cmdstat_slaveof:"): - updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:") - case lineContains(line, "db0:"): - updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:") } } - return nil + return config +} + +func readRedisInfoReplyFields(input []string, info *DbInfo) { + config := getConfigInfo(input) + + if value, ok := config["role"]; ok { + if "master" == value { + info.Fields.PrimaryRole = true + } + } + if value, ok := config["connected_slaves"]; ok { + info.Fields.ConnectedReplicaCnt = getUint32FromString(value) + } + updateServerInfoFields(config, info) + updateClientInfoFields(config, info) + updateMemoryInfoFields(config, info) + updateStatsInfoFields(config, info) + updateCpuInfoFields(config, info) + updateCommandstatsInfoFields(config, info) + updateKeyspaceInfoFields(config, info) } func (db *DB) State() (*DbState, error) { @@ -797,7 +877,7 @@ var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error { expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10) - result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result() + result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result() if err != nil { return err }