X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=0b9323975032c2156a15733e0f90fae987d97f7f;hb=refs%2Ftags%2Fv0.9.0;hp=cefe4be971fb826da9e43cbbdd192a3331ea2c92;hpb=ee834c59a83cdd10889b3439a281fe43b9f98839;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index cefe4be..0b93239 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -24,10 +24,13 @@ package sdlgoredis import ( "errors" + "fmt" "github.com/go-redis/redis/v7" "io" "log" + "net" "os" + "reflect" "strconv" "strings" "sync" @@ -54,6 +57,7 @@ type Config struct { masterName string sentinelPort string clusterAddrList string + nodeCnt string } type DB struct { @@ -174,6 +178,7 @@ func readConfig(osI OS) Config { masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), + nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), } return cfg } @@ -487,29 +492,197 @@ func (db *DB) PTTL(key string) (time.Duration, error) { func (db *DB) Info() (*DbInfo, error) { var info DbInfo resultStr, err := db.client.Info("all").Result() + if err != nil { + return &info, err + } + result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n") - readRedisInfoReplyFields(result, &info) + err = readRedisInfoReplyFields(result, &info) return &info, err } -func readRedisInfoReplyFields(input []string, info *DbInfo) { +func lineContains(line, substr string) bool { + return strings.Contains(line, substr) +} + +func getFieldValueStr(line, substr string) string { + if idx := strings.Index(line, substr); idx != -1 { + return line[idx+len(substr):] + } + return "" +} + +func getUint32FromString(s string) uint32 { + if val, err := strconv.ParseUint(s, 10, 32); err == nil { + return uint32(val) + } + return 0 +} + +func getUint64FromString(s string) uint64 { + if val, err := strconv.ParseUint(s, 10, 64); err == nil { + return uint64(val) + } + return 0 +} + +func getFloatFromString(s string, bitSize int) float64 { + if val, err := strconv.ParseFloat(s, bitSize); err == nil { + return val + } + return 0 +} + +func getFloat64FromString(s string) float64 { + return getFloatFromString(s, 64) +} + +func getFloat32FromString(s string) float32 { + return float32(getFloatFromString(s, 32)) +} + +func getValueString(values string, key string) string { + slice := strings.Split(values, ",") + for _, s := range slice { + if lineContains(s, key) { + return getFieldValueStr(s, key) + } + } + return "" +} + +func getCommandstatsValues(values string) (string, string, string) { + calls := getValueString(values, "calls=") + usec := getValueString(values, "usec=") + usecPerCall := getValueString(values, "usec_per_call=") + return calls, usec, usecPerCall +} + +func updateCommandstatsValues(i interface{}, line, cmdstat string) { + stype := reflect.ValueOf(i).Elem() + values := getFieldValueStr(line, cmdstat) + callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values) + + callsField := stype.FieldByName("Calls") + callsField.Set(reflect.ValueOf(getUint32FromString(callsStr))) + + usecField := stype.FieldByName("Usec") + usecField.Set(reflect.ValueOf(getUint32FromString(usecStr))) + + usecPerCallField := stype.FieldByName("UsecPerCall") + usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr))) +} + +func getKeyspaceValues(values string) (string, string, string) { + keys := getValueString(values, "keys=") + expires := getValueString(values, "expires=") + avgttl := getValueString(values, "avg_ttl=") + return keys, expires, avgttl +} + +func updateKeyspaceValues(i interface{}, line, keyspace string) { + stype := reflect.ValueOf(i).Elem() + values := getFieldValueStr(line, keyspace) + keysStr, expiresStr, avgttlStr := getKeyspaceValues(values) + + keysField := stype.FieldByName("Keys") + keysField.Set(reflect.ValueOf(getUint32FromString(keysStr))) + + expiresField := stype.FieldByName("Expires") + expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr))) + + avgttlField := stype.FieldByName("AvgTtl") + avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr))) +} + +func readRedisInfoReplyFields(input []string, info *DbInfo) error { for _, line := range input { - if idx := strings.Index(line, "role:"); idx != -1 { - roleStr := line[idx+len("role:"):] - if roleStr == "master" { + switch { + case lineContains(line, "role:") && !lineContains(line, "_role:"): + if "master" == getFieldValueStr(line, "role:") { info.Fields.PrimaryRole = true } - } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 { - cntStr := line[idx+len("connected_slaves:"):] - if cnt, err := strconv.ParseUint(cntStr, 10, 32); err == nil { - info.Fields.ConnectedReplicaCnt = uint32(cnt) - } - + case lineContains(line, "connected_slaves:"): + info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:")) + case lineContains(line, "uptime_in_days:"): + info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:")) + case lineContains(line, "connected_clients:"): + info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:")) + case lineContains(line, "client_recent_max_input_buffer:"): + info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:")) + case lineContains(line, "client_recent_max_output_buffer:"): + info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:")) + case lineContains(line, "used_memory:"): + info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:")) + case lineContains(line, "used_memory_human:"): + info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:") + case lineContains(line, "used_memory_rss:"): + info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:")) + case lineContains(line, "used_memory_rss_human:"): + info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:") + case lineContains(line, "used_memory_peak:"): + info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:")) + case lineContains(line, "used_memory_peak_human:"): + info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:") + case lineContains(line, "used_memory_peak_perc:"): + info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:") + case lineContains(line, "mem_fragmentation_ratio:"): + info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:")) + case lineContains(line, "mem_fragmentation_bytes:"): + info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:")) + case lineContains(line, "total_connections_received:"): + info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:")) + case lineContains(line, "total_commands_processed:"): + info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:")) + case lineContains(line, "sync_full:"): + info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:")) + case lineContains(line, "sync_partial_ok:"): + info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:")) + case lineContains(line, "sync_partial_err:"): + info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:")) + case lineContains(line, "pubsub_channels:"): + info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:")) + case lineContains(line, "used_cpu_sys:"): + info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:")) + case lineContains(line, "used_cpu_user:"): + info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:")) + case lineContains(line, "cmdstat_replconf:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:") + case lineContains(line, "cmdstat_keys:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:") + case lineContains(line, "cmdstat_role:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:") + case lineContains(line, "cmdstat_psync:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:") + case lineContains(line, "cmdstat_mset:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:") + case lineContains(line, "cmdstat_publish:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:") + case lineContains(line, "cmdstat_info:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:") + case lineContains(line, "cmdstat_ping:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:") + case lineContains(line, "cmdstat_client:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:") + case lineContains(line, "cmdstat_command:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:") + case lineContains(line, "cmdstat_subscribe:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:") + case lineContains(line, "cmdstat_monitor:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:") + case lineContains(line, "cmdstat_config:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:") + case lineContains(line, "cmdstat_slaveof:"): + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:") + case lineContains(line, "db0:"): + updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:") } } + return nil } func (db *DB) State() (*DbState, error) { + dbState := new(DbState) if db.cfg.sentinelPort != "" { //Establish connection to Redis sentinel. The reason why connection is done //here instead of time of the SDL instance creation is that for the time being @@ -520,29 +693,104 @@ func (db *DB) State() (*DbState, error) { sentinelClient := db.sentinel(&db.cfg, db.addr) return sentinelClient.GetDbState() } else { - var dbState DbState info, err := db.Info() if err != nil { - return &dbState, err + dbState.PrimaryDbState.Err = err + return dbState, err } - dbState = fillDbStateFromDbInfo(info) - return &dbState, err + return db.fillDbStateFromDbInfo(info) } } -func fillDbStateFromDbInfo(info *DbInfo) DbState { +func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { var dbState DbState if info.Fields.PrimaryRole == true { dbState = DbState{ PrimaryDbState: PrimaryDbState{ Fields: PrimaryDbStateFields{ Role: "master", + Ip: db.cfg.hostname, + Port: db.cfg.port, Flags: "master", }, }, } } - return dbState + + cnt, err := strconv.Atoi(db.cfg.nodeCnt) + if err != nil { + dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt) + } else { + dbState.ConfigNodeCnt = cnt + } + + return &dbState, dbState.Err +} + +func createReplicaDbClient(host string) *DB { + cfg := readConfig(osImpl{}) + cfg.sentinelPort = "" + cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host) + return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil) +} + +func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) { + dbStatisticsInfo := new(DbStatisticsInfo) + dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host) + + info, err := db.Info() + if err != nil { + return nil, err + } + dbStatisticsInfo.Info = info + + return dbStatisticsInfo, nil +} + +func sentinelStatistics(db *DB) (*DbStatistics, error) { + dbState := new(DbState) + dbStatistics := new(DbStatistics) + dbStatisticsInfo := new(DbStatisticsInfo) + var err error + + dbState, err = db.State() + if err != nil { + return nil, err + } + + dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress()) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + if dbState.ReplicasDbState != nil { + for _, r := range dbState.ReplicasDbState.States { + replicaDb := createReplicaDbClient(r.GetAddress()) + dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress()) + replicaDb.CloseDB() + if err != nil { + return nil, err + } + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + } + } + + return dbStatistics, nil +} + +func standaloneStatistics(db *DB) (*DbStatistics, error) { + dbStatistics := new(DbStatistics) + + dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port)) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + return dbStatistics, err +} + +func (db *DB) Statistics() (*DbStatistics, error) { + if db.cfg.sentinelPort != "" { + return sentinelStatistics(db) + } + + return standaloneStatistics(db) } var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)