Refactor long readRedisInfoReplyFields() function
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
index 5f64960..78c8b5a 100644 (file)
@@ -24,10 +24,13 @@ package sdlgoredis
 
 import (
        "errors"
+       "fmt"
        "github.com/go-redis/redis/v7"
        "io"
        "log"
+       "net"
        "os"
+       "reflect"
        "strconv"
        "strings"
        "sync"
@@ -54,6 +57,7 @@ type Config struct {
        masterName      string
        sentinelPort    string
        clusterAddrList string
+       nodeCnt         string
 }
 
 type DB struct {
@@ -174,6 +178,7 @@ func readConfig(osI OS) Config {
                masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
                sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
                clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
+               nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
        }
        return cfg
 }
@@ -487,28 +492,272 @@ func (db *DB) PTTL(key string) (time.Duration, error) {
 func (db *DB) Info() (*DbInfo, error) {
        var info DbInfo
        resultStr, err := db.client.Info("all").Result()
+       if err != nil {
+               return &info, err
+       }
+
        result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
        readRedisInfoReplyFields(result, &info)
-       return &info, err
+       return &info, nil
 }
 
-func readRedisInfoReplyFields(input []string, info *DbInfo) {
+func lineContains(line, substr string) bool {
+       return strings.Contains(line, substr)
+}
+
+func getFieldValueStr(line, substr string) string {
+       if idx := strings.Index(line, substr); idx != -1 {
+               return line[idx+len(substr):]
+       }
+       return ""
+}
+
+func getUint32FromString(s string) uint32 {
+       if val, err := strconv.ParseUint(s, 10, 32); err == nil {
+               return uint32(val)
+       }
+       return 0
+}
+
+func getUint64FromString(s string) uint64 {
+       if val, err := strconv.ParseUint(s, 10, 64); err == nil {
+               return uint64(val)
+       }
+       return 0
+}
+
+func getFloatFromString(s string, bitSize int) float64 {
+       if val, err := strconv.ParseFloat(s, bitSize); err == nil {
+               return val
+       }
+       return 0
+}
+
+func getFloat64FromString(s string) float64 {
+       return getFloatFromString(s, 64)
+}
+
+func getFloat32FromString(s string) float32 {
+       return float32(getFloatFromString(s, 32))
+}
+
+func getValueString(values string, key string) string {
+       slice := strings.Split(values, ",")
+       for _, s := range slice {
+               if lineContains(s, key) {
+                       return getFieldValueStr(s, key)
+               }
+       }
+       return ""
+}
+
+func getCommandstatsValues(values string) (string, string, string) {
+       calls := getValueString(values, "calls=")
+       usec := getValueString(values, "usec=")
+       usecPerCall := getValueString(values, "usec_per_call=")
+       return calls, usec, usecPerCall
+}
+
+func updateCommandstatsValues(i interface{}, values string) {
+       stype := reflect.ValueOf(i).Elem()
+       callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
+
+       callsField := stype.FieldByName("Calls")
+       callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
+
+       usecField := stype.FieldByName("Usec")
+       usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
+
+       usecPerCallField := stype.FieldByName("UsecPerCall")
+       usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
+}
+
+func getKeyspaceValues(values string) (string, string, string) {
+       keys := getValueString(values, "keys=")
+       expires := getValueString(values, "expires=")
+       avgttl := getValueString(values, "avg_ttl=")
+       return keys, expires, avgttl
+}
+
+func updateKeyspaceValues(i interface{}, values string) {
+       stype := reflect.ValueOf(i).Elem()
+       keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
+
+       keysField := stype.FieldByName("Keys")
+       keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
+
+       expiresField := stype.FieldByName("Expires")
+       expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
+
+       avgttlField := stype.FieldByName("AvgTtl")
+       avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
+}
+
+func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["uptime_in_days"]; ok {
+               info.Fields.Server.UptimeInDays = getUint32FromString(value)
+       }
+}
+
+func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["connected_clients"]; ok {
+               info.Fields.Clients.ConnectedClients = getUint32FromString(value)
+       }
+       if value, ok := config["client_recent_max_input_buffer"]; ok {
+               info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
+       }
+       if value, ok := config["client_recent_max_output_buffer"]; ok {
+               info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
+       }
+}
+
+func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["used_memory"]; ok {
+               info.Fields.Memory.UsedMemory = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_human"]; ok {
+               info.Fields.Memory.UsedMemoryHuman = value
+       }
+       if value, ok := config["used_memory_rss"]; ok {
+               info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_rss_human"]; ok {
+               info.Fields.Memory.UsedMemoryRssHuman = value
+       }
+       if value, ok := config["used_memory_peak"]; ok {
+               info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
+       }
+       if value, ok := config["used_memory_peak_human"]; ok {
+               info.Fields.Memory.UsedMemoryPeakHuman = value
+       }
+       if value, ok := config["used_memory_peak_perc"]; ok {
+               info.Fields.Memory.UsedMemoryPeakPerc = value
+       }
+       if value, ok := config["mem_fragmentation_ratio"]; ok {
+               info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
+       }
+       if value, ok := config["mem_fragmentation_bytes"]; ok {
+               info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
+       }
+}
+
+func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["total_connections_received"]; ok {
+               info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
+       }
+       if value, ok := config["total_commands_processed"]; ok {
+               info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
+       }
+       if value, ok := config["sync_full"]; ok {
+               info.Fields.Stats.SyncFull = getUint32FromString(value)
+       }
+       if value, ok := config["sync_partial_ok"]; ok {
+               info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
+       }
+       if value, ok := config["sync_partial_err"]; ok {
+               info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
+       }
+       if value, ok := config["pubsub_channels"]; ok {
+               info.Fields.Stats.PubsubChannels = getUint32FromString(value)
+       }
+}
+
+func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
+       if value, ok := config["used_cpu_sys"]; ok {
+               info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
+       }
+       if value, ok := config["used_cpu_user"]; ok {
+               info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
+       }
+}
+
+func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
+       if values, ok := config["cmdstat_replconf"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
+       }
+       if values, ok := config["cmdstat_keys"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
+       }
+       if values, ok := config["cmdstat_role"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
+       }
+       if values, ok := config["cmdstat_psync"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
+       }
+       if values, ok := config["cmdstat_mset"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
+       }
+       if values, ok := config["cmdstat_publish"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
+       }
+       if values, ok := config["cmdstat_info"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
+       }
+       if values, ok := config["cmdstat_ping"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
+       }
+       if values, ok := config["cmdstat_client"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
+       }
+       if values, ok := config["cmdstat_command"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
+       }
+       if values, ok := config["cmdstat_subscribe"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
+       }
+       if values, ok := config["cmdstat_monitor"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
+       }
+       if values, ok := config["cmdstat_config"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
+       }
+       if values, ok := config["cmdstat_slaveof"]; ok {
+               updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
+       }
+}
+
+func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
+       if values, ok := config["db0"]; ok {
+               updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
+       }
+}
+
+func getConfigInfo(input []string) ConfigInfo {
+       config := ConfigInfo{}
        for _, line := range input {
-               if idx := strings.Index(line, "role:"); idx != -1 {
-                       roleStr := line[idx+len("role:"):]
-                       if roleStr == "master" {
-                               info.Fields.MasterRole = true
-                       }
-               } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 {
-                       cntStr := line[idx+len("connected_slaves:"):]
-                       if cnt, err := strconv.ParseUint(cntStr, 10, 32); err == nil {
-                               info.Fields.ConnectedReplicaCnt = uint32(cnt)
+               if i := strings.Index(line, ":"); i != -1 {
+                       if key := strings.TrimSpace(line[:i]); len(key) > 0 {
+                               if len(line) > i {
+                                       config[key] = strings.TrimSpace(line[i+1:])
+                               }
                        }
                }
        }
+       return config
+}
+
+func readRedisInfoReplyFields(input []string, info *DbInfo) {
+       config := getConfigInfo(input)
+
+       if value, ok := config["role"]; ok {
+               if "master" == value {
+                       info.Fields.PrimaryRole = true
+               }
+       }
+       if value, ok := config["connected_slaves"]; ok {
+               info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
+       }
+       updateServerInfoFields(config, info)
+       updateClientInfoFields(config, info)
+       updateMemoryInfoFields(config, info)
+       updateStatsInfoFields(config, info)
+       updateCpuInfoFields(config, info)
+       updateCommandstatsInfoFields(config, info)
+       updateKeyspaceInfoFields(config, info)
 }
 
 func (db *DB) State() (*DbState, error) {
+       dbState := new(DbState)
        if db.cfg.sentinelPort != "" {
                //Establish connection to Redis sentinel. The reason why connection is done
                //here instead of time of the SDL instance creation is that for the time being
@@ -519,29 +768,104 @@ func (db *DB) State() (*DbState, error) {
                sentinelClient := db.sentinel(&db.cfg, db.addr)
                return sentinelClient.GetDbState()
        } else {
-               var dbState DbState
                info, err := db.Info()
                if err != nil {
-                       return &dbState, err
+                       dbState.PrimaryDbState.Err = err
+                       return dbState, err
                }
-               dbState = fillDbStateFromDbInfo(info)
-               return &dbState, err
+               return db.fillDbStateFromDbInfo(info)
        }
 }
 
-func fillDbStateFromDbInfo(info *DbInfo) DbState {
+func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
        var dbState DbState
-       if info.Fields.MasterRole == true {
+       if info.Fields.PrimaryRole == true {
                dbState = DbState{
-                       MasterDbState: MasterDbState{
-                               Fields: MasterDbStateFields{
+                       PrimaryDbState: PrimaryDbState{
+                               Fields: PrimaryDbStateFields{
                                        Role:  "master",
+                                       Ip:    db.cfg.hostname,
+                                       Port:  db.cfg.port,
                                        Flags: "master",
                                },
                        },
                }
        }
-       return dbState
+
+       cnt, err := strconv.Atoi(db.cfg.nodeCnt)
+       if err != nil {
+               dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
+       } else {
+               dbState.ConfigNodeCnt = cnt
+       }
+
+       return &dbState, dbState.Err
+}
+
+func createReplicaDbClient(host string) *DB {
+       cfg := readConfig(osImpl{})
+       cfg.sentinelPort = ""
+       cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
+       return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
+}
+
+func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
+       dbStatisticsInfo := new(DbStatisticsInfo)
+       dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
+
+       info, err := db.Info()
+       if err != nil {
+               return nil, err
+       }
+       dbStatisticsInfo.Info = info
+
+       return dbStatisticsInfo, nil
+}
+
+func sentinelStatistics(db *DB) (*DbStatistics, error) {
+       dbState := new(DbState)
+       dbStatistics := new(DbStatistics)
+       dbStatisticsInfo := new(DbStatisticsInfo)
+       var err error
+
+       dbState, err = db.State()
+       if err != nil {
+               return nil, err
+       }
+
+       dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
+       dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+
+       if dbState.ReplicasDbState != nil {
+               for _, r := range dbState.ReplicasDbState.States {
+                       replicaDb := createReplicaDbClient(r.GetAddress())
+                       dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
+                       replicaDb.CloseDB()
+                       if err != nil {
+                               return nil, err
+                       }
+                       dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+               }
+       }
+
+       return dbStatistics, nil
+}
+
+func standaloneStatistics(db *DB) (*DbStatistics, error) {
+       dbStatistics := new(DbStatistics)
+
+       dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
+       dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
+
+       return dbStatistics, err
+}
+
+func (db *DB) Statistics() (*DbStatistics, error) {
+       if db.cfg.sentinelPort != "" {
+               return sentinelStatistics(db)
+       }
+
+       return standaloneStatistics(db)
 }
 
 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)