"github.com/go-redis/redis/v7"
"io"
"log"
+ "net"
"os"
+ "reflect"
"strconv"
"strings"
"sync"
return &info, err
}
+func lineContains(line, substr string) bool {
+ return strings.Contains(line, substr)
+}
+
+func getFieldValueStr(line, substr string) string {
+ if idx := strings.Index(line, substr); idx != -1 {
+ return line[idx+len(substr):]
+ }
+ return ""
+}
+
+func getUint32FromString(s string) uint32 {
+ if val, err := strconv.ParseUint(s, 10, 32); err == nil {
+ return uint32(val)
+ }
+ return 0
+}
+
+func getUint64FromString(s string) uint64 {
+ if val, err := strconv.ParseUint(s, 10, 64); err == nil {
+ return uint64(val)
+ }
+ return 0
+}
+
+func getFloatFromString(s string, bitSize int) float64 {
+ if val, err := strconv.ParseFloat(s, bitSize); err == nil {
+ return val
+ }
+ return 0
+}
+
+func getFloat64FromString(s string) float64 {
+ return getFloatFromString(s, 64)
+}
+
+func getFloat32FromString(s string) float32 {
+ return float32(getFloatFromString(s, 32))
+}
+
+func getValueString(values string, key string) string {
+ slice := strings.Split(values, ",")
+ for _, s := range slice {
+ if lineContains(s, key) {
+ return getFieldValueStr(s, key)
+ }
+ }
+ return ""
+}
+
+func getCommandstatsValues(values string) (string, string, string) {
+ calls := getValueString(values, "calls=")
+ usec := getValueString(values, "usec=")
+ usecPerCall := getValueString(values, "usec_per_call=")
+ return calls, usec, usecPerCall
+}
+
+func updateCommandstatsValues(i interface{}, line, cmdstat string) {
+ stype := reflect.ValueOf(i).Elem()
+ values := getFieldValueStr(line, cmdstat)
+ callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
+
+ callsField := stype.FieldByName("Calls")
+ callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
+
+ usecField := stype.FieldByName("Usec")
+ usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
+
+ usecPerCallField := stype.FieldByName("UsecPerCall")
+ usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
+}
+
+func getKeyspaceValues(values string) (string, string, string) {
+ keys := getValueString(values, "keys=")
+ expires := getValueString(values, "expires=")
+ avgttl := getValueString(values, "avg_ttl=")
+ return keys, expires, avgttl
+}
+
+func updateKeyspaceValues(i interface{}, line, keyspace string) {
+ stype := reflect.ValueOf(i).Elem()
+ values := getFieldValueStr(line, keyspace)
+ keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
+
+ keysField := stype.FieldByName("Keys")
+ keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
+
+ expiresField := stype.FieldByName("Expires")
+ expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
+
+ avgttlField := stype.FieldByName("AvgTtl")
+ avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
+}
+
func readRedisInfoReplyFields(input []string, info *DbInfo) error {
for _, line := range input {
- if idx := strings.Index(line, "role:"); idx != -1 {
- roleStr := line[idx+len("role:"):]
- if roleStr == "master" {
+ switch {
+ case lineContains(line, "role:") && !lineContains(line, "_role:"):
+ if "master" == getFieldValueStr(line, "role:") {
info.Fields.PrimaryRole = true
}
- } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 {
- cntStr := line[idx+len("connected_slaves:"):]
- cnt, err := strconv.ParseUint(cntStr, 10, 32)
- if err != nil {
- return fmt.Errorf("Info reply error: %s", err.Error())
- }
- info.Fields.ConnectedReplicaCnt = uint32(cnt)
+ case lineContains(line, "connected_slaves:"):
+ info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:"))
+ case lineContains(line, "uptime_in_days:"):
+ info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:"))
+ case lineContains(line, "connected_clients:"):
+ info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:"))
+ case lineContains(line, "client_recent_max_input_buffer:"):
+ info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:"))
+ case lineContains(line, "client_recent_max_output_buffer:"):
+ info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:"))
+ case lineContains(line, "used_memory:"):
+ info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:"))
+ case lineContains(line, "used_memory_human:"):
+ info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:")
+ case lineContains(line, "used_memory_rss:"):
+ info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:"))
+ case lineContains(line, "used_memory_rss_human:"):
+ info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:")
+ case lineContains(line, "used_memory_peak:"):
+ info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:"))
+ case lineContains(line, "used_memory_peak_human:"):
+ info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:")
+ case lineContains(line, "used_memory_peak_perc:"):
+ info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:")
+ case lineContains(line, "mem_fragmentation_ratio:"):
+ info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:"))
+ case lineContains(line, "mem_fragmentation_bytes:"):
+ info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:"))
+ case lineContains(line, "total_connections_received:"):
+ info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:"))
+ case lineContains(line, "total_commands_processed:"):
+ info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:"))
+ case lineContains(line, "sync_full:"):
+ info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:"))
+ case lineContains(line, "sync_partial_ok:"):
+ info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:"))
+ case lineContains(line, "sync_partial_err:"):
+ info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:"))
+ case lineContains(line, "pubsub_channels:"):
+ info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:"))
+ case lineContains(line, "used_cpu_sys:"):
+ info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:"))
+ case lineContains(line, "used_cpu_user:"):
+ info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:"))
+ case lineContains(line, "cmdstat_replconf:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:")
+ case lineContains(line, "cmdstat_keys:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:")
+ case lineContains(line, "cmdstat_role:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:")
+ case lineContains(line, "cmdstat_psync:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:")
+ case lineContains(line, "cmdstat_mset:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:")
+ case lineContains(line, "cmdstat_publish:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:")
+ case lineContains(line, "cmdstat_info:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:")
+ case lineContains(line, "cmdstat_ping:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:")
+ case lineContains(line, "cmdstat_client:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:")
+ case lineContains(line, "cmdstat_command:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:")
+ case lineContains(line, "cmdstat_subscribe:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:")
+ case lineContains(line, "cmdstat_monitor:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:")
+ case lineContains(line, "cmdstat_config:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:")
+ case lineContains(line, "cmdstat_slaveof:"):
+ updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:")
+ case lineContains(line, "db0:"):
+ updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:")
}
}
return nil
PrimaryDbState: PrimaryDbState{
Fields: PrimaryDbStateFields{
Role: "master",
+ Ip: db.cfg.hostname,
+ Port: db.cfg.port,
Flags: "master",
},
},
return &dbState, dbState.Err
}
+func createReplicaDbClient(host string) *DB {
+ cfg := readConfig(osImpl{})
+ cfg.sentinelPort = ""
+ cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
+ return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
+}
+
+func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
+ dbStatisticsInfo := new(DbStatisticsInfo)
+ dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
+
+ info, err := db.Info()
+ if err != nil {
+ return nil, err
+ }
+ dbStatisticsInfo.Info = info
+
+ return dbStatisticsInfo, nil
+}
+
+func sentinelStatistics(db *DB) (*DbStatistics, error) {
+ dbState := new(DbState)
+ dbStatistics := new(DbStatistics)
+ dbStatisticsInfo := new(DbStatisticsInfo)
+ var err error
+
+ dbState, err = db.State()
+ if err != nil {
+ return nil, err
+ }
+
+ dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
+ dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+
+ if dbState.ReplicasDbState != nil {
+ for _, r := range dbState.ReplicasDbState.States {
+ replicaDb := createReplicaDbClient(r.GetAddress())
+ dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
+ replicaDb.CloseDB()
+ if err != nil {
+ return nil, err
+ }
+ dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+ }
+ }
+
+ return dbStatistics, nil
+}
+
+func standaloneStatistics(db *DB) (*DbStatistics, error) {
+ dbStatistics := new(DbStatistics)
+
+ dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
+ dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+
+ return dbStatistics, err
+}
+
+func (db *DB) Statistics() (*DbStatistics, error) {
+ if db.cfg.sentinelPort != "" {
+ return sentinelStatistics(db)
+ }
+
+ return standaloneStatistics(db)
+}
+
var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {