X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=78c8b5a6bac9854d3d6ea6ead3975bb8a9f8c120;hb=refs%2Ftags%2Fv0.9.4;hp=570dfa123760a5d49c65cf7a56021f66dfdb3e86;hpb=4123ade6080f3f5e3f3bf44e30faeacc531f32d0;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 570dfa1..78c8b5a 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -25,8 +25,12 @@ package sdlgoredis import ( "errors" "fmt" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v7" + "io" + "log" + "net" "os" + "reflect" "strconv" "strings" "sync" @@ -53,14 +57,18 @@ type Config struct { masterName string sentinelPort string clusterAddrList string + nodeCnt string } type DB struct { client RedisClient + sentinel RedisSentinelCreateCb subscribe SubscribeFn redisModules bool sCbMap *sharedCbMap ch intChannels + cfg Config + addr string } type Subscriber interface { @@ -92,6 +100,18 @@ type RedisClient interface { EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(scripts ...string) *redis.BoolSliceCmd ScriptLoad(script string) *redis.StringCmd + Info(section ...string) *redis.StringCmd +} + +var dbLogger *log.Logger + +func init() { + dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile) + redis.SetLogger(dbLogger) +} + +func SetDbLogger(out io.Writer) { + dbLogger.SetOutput(out) } func checkResultAndError(result interface{}, err error) (bool, error) { @@ -127,9 +147,10 @@ func subscribeNotifications(client RedisClient, channels ...string) Subscriber { return client.Subscribe(channels...) } -func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { +func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB { db := DB{ client: client, + sentinel: sentinelCreateCb, subscribe: subscribe, redisModules: true, sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)}, @@ -138,6 +159,8 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { removeChannel: make(chan string), exit: make(chan bool), }, + cfg: cfg, + addr: sentinelAddr, } return &db @@ -145,7 +168,7 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { func Create() []*DB { osimpl := osImpl{} - return ReadConfigAndCreateDbClients(osimpl, newRedisClient) + return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel) } func readConfig(osI OS) Config { @@ -155,6 +178,7 @@ func readConfig(osI OS) Config { masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), + nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), } return cfg } @@ -173,38 +197,48 @@ func (osImpl) Getenv(key string, defValue string) string { return val } -func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB { +func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) []*DB { cfg := readConfig(osI) - return createDbClients(cfg, clientCreator) + return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb) } -func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB { +func createDbClients(cfg Config, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) []*DB { if cfg.clusterAddrList == "" { - return []*DB{createLegacyDbClient(cfg, clientCreator)} + return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)} } dbs := []*DB{} addrList := strings.Split(cfg.clusterAddrList, ",") for _, addr := range addrList { - db := createDbClient(cfg, addr, clientCreator) + db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb) dbs = append(dbs, db) } return dbs } -func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB { - return createDbClient(cfg, cfg.hostname, clientCreator) +func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) *DB { + return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb) } -func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB { +func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) *DB { var client RedisClient + var db *DB if cfg.sentinelPort == "" { client = clientCreator(hostName, cfg.port, "", false) + db = CreateDB(client, subscribe, nil, cfg, hostName) } else { client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true) + db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName) } - db := CreateDB(client, subscribeNotifications) db.CheckCommands() return db } @@ -243,7 +277,7 @@ func (db *DB) CheckCommands() { } } } else { - fmt.Println(err) + dbLogger.Printf("SDL DB commands checking failure: %s\n", err) } } @@ -291,7 +325,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { - fmt.Println(err) + dbLogger.Printf("SDL DB channel closing failure: %s\n", err) } return } @@ -455,6 +489,385 @@ func (db *DB) PTTL(key string) (time.Duration, error) { return result, err } +func (db *DB) Info() (*DbInfo, error) { + var info DbInfo + resultStr, err := db.client.Info("all").Result() + if err != nil { + return &info, err + } + + result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n") + readRedisInfoReplyFields(result, &info) + return &info, nil +} + +func lineContains(line, substr string) bool { + return strings.Contains(line, substr) +} + +func getFieldValueStr(line, substr string) string { + if idx := strings.Index(line, substr); idx != -1 { + return line[idx+len(substr):] + } + return "" +} + +func getUint32FromString(s string) uint32 { + if val, err := strconv.ParseUint(s, 10, 32); err == nil { + return uint32(val) + } + return 0 +} + +func getUint64FromString(s string) uint64 { + if val, err := strconv.ParseUint(s, 10, 64); err == nil { + return uint64(val) + } + return 0 +} + +func getFloatFromString(s string, bitSize int) float64 { + if val, err := strconv.ParseFloat(s, bitSize); err == nil { + return val + } + return 0 +} + +func getFloat64FromString(s string) float64 { + return getFloatFromString(s, 64) +} + +func getFloat32FromString(s string) float32 { + return float32(getFloatFromString(s, 32)) +} + +func getValueString(values string, key string) string { + slice := strings.Split(values, ",") + for _, s := range slice { + if lineContains(s, key) { + return getFieldValueStr(s, key) + } + } + return "" +} + +func getCommandstatsValues(values string) (string, string, string) { + calls := getValueString(values, "calls=") + usec := getValueString(values, "usec=") + usecPerCall := getValueString(values, "usec_per_call=") + return calls, usec, usecPerCall +} + +func updateCommandstatsValues(i interface{}, values string) { + stype := reflect.ValueOf(i).Elem() + callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values) + + callsField := stype.FieldByName("Calls") + callsField.Set(reflect.ValueOf(getUint32FromString(callsStr))) + + usecField := stype.FieldByName("Usec") + usecField.Set(reflect.ValueOf(getUint32FromString(usecStr))) + + usecPerCallField := stype.FieldByName("UsecPerCall") + usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr))) +} + +func getKeyspaceValues(values string) (string, string, string) { + keys := getValueString(values, "keys=") + expires := getValueString(values, "expires=") + avgttl := getValueString(values, "avg_ttl=") + return keys, expires, avgttl +} + +func updateKeyspaceValues(i interface{}, values string) { + stype := reflect.ValueOf(i).Elem() + keysStr, expiresStr, avgttlStr := getKeyspaceValues(values) + + keysField := stype.FieldByName("Keys") + keysField.Set(reflect.ValueOf(getUint32FromString(keysStr))) + + expiresField := stype.FieldByName("Expires") + expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr))) + + avgttlField := stype.FieldByName("AvgTtl") + avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr))) +} + +func updateServerInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["uptime_in_days"]; ok { + info.Fields.Server.UptimeInDays = getUint32FromString(value) + } +} + +func updateClientInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["connected_clients"]; ok { + info.Fields.Clients.ConnectedClients = getUint32FromString(value) + } + if value, ok := config["client_recent_max_input_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value) + } + if value, ok := config["client_recent_max_output_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value) + } +} + +func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_memory"]; ok { + info.Fields.Memory.UsedMemory = getUint64FromString(value) + } + if value, ok := config["used_memory_human"]; ok { + info.Fields.Memory.UsedMemoryHuman = value + } + if value, ok := config["used_memory_rss"]; ok { + info.Fields.Memory.UsedMemoryRss = getUint64FromString(value) + } + if value, ok := config["used_memory_rss_human"]; ok { + info.Fields.Memory.UsedMemoryRssHuman = value + } + if value, ok := config["used_memory_peak"]; ok { + info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value) + } + if value, ok := config["used_memory_peak_human"]; ok { + info.Fields.Memory.UsedMemoryPeakHuman = value + } + if value, ok := config["used_memory_peak_perc"]; ok { + info.Fields.Memory.UsedMemoryPeakPerc = value + } + if value, ok := config["mem_fragmentation_ratio"]; ok { + info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value) + } + if value, ok := config["mem_fragmentation_bytes"]; ok { + info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value) + } +} + +func updateStatsInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["total_connections_received"]; ok { + info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value) + } + if value, ok := config["total_commands_processed"]; ok { + info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value) + } + if value, ok := config["sync_full"]; ok { + info.Fields.Stats.SyncFull = getUint32FromString(value) + } + if value, ok := config["sync_partial_ok"]; ok { + info.Fields.Stats.SyncPartialOk = getUint32FromString(value) + } + if value, ok := config["sync_partial_err"]; ok { + info.Fields.Stats.SyncPartialErr = getUint32FromString(value) + } + if value, ok := config["pubsub_channels"]; ok { + info.Fields.Stats.PubsubChannels = getUint32FromString(value) + } +} + +func updateCpuInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_cpu_sys"]; ok { + info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value) + } + if value, ok := config["used_cpu_user"]; ok { + info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value) + } +} + +func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["cmdstat_replconf"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values) + } + if values, ok := config["cmdstat_keys"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values) + } + if values, ok := config["cmdstat_role"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values) + } + if values, ok := config["cmdstat_psync"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values) + } + if values, ok := config["cmdstat_mset"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values) + } + if values, ok := config["cmdstat_publish"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values) + } + if values, ok := config["cmdstat_info"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values) + } + if values, ok := config["cmdstat_ping"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values) + } + if values, ok := config["cmdstat_client"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values) + } + if values, ok := config["cmdstat_command"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values) + } + if values, ok := config["cmdstat_subscribe"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values) + } + if values, ok := config["cmdstat_monitor"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values) + } + if values, ok := config["cmdstat_config"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values) + } + if values, ok := config["cmdstat_slaveof"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values) + } +} + +func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["db0"]; ok { + updateKeyspaceValues(&info.Fields.Keyspace.Db, values) + } +} + +func getConfigInfo(input []string) ConfigInfo { + config := ConfigInfo{} + for _, line := range input { + if i := strings.Index(line, ":"); i != -1 { + if key := strings.TrimSpace(line[:i]); len(key) > 0 { + if len(line) > i { + config[key] = strings.TrimSpace(line[i+1:]) + } + } + } + } + return config +} + +func readRedisInfoReplyFields(input []string, info *DbInfo) { + config := getConfigInfo(input) + + if value, ok := config["role"]; ok { + if "master" == value { + info.Fields.PrimaryRole = true + } + } + if value, ok := config["connected_slaves"]; ok { + info.Fields.ConnectedReplicaCnt = getUint32FromString(value) + } + updateServerInfoFields(config, info) + updateClientInfoFields(config, info) + updateMemoryInfoFields(config, info) + updateStatsInfoFields(config, info) + updateCpuInfoFields(config, info) + updateCommandstatsInfoFields(config, info) + updateKeyspaceInfoFields(config, info) +} + +func (db *DB) State() (*DbState, error) { + dbState := new(DbState) + if db.cfg.sentinelPort != "" { + //Establish connection to Redis sentinel. The reason why connection is done + //here instead of time of the SDL instance creation is that for the time being + //sentinel connection is needed only here to get state information and + //state information is needed only by 'sdlcli' hence it is not time critical + //and also we want to avoid opening unnecessary TCP connections towards Redis + //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used. + sentinelClient := db.sentinel(&db.cfg, db.addr) + return sentinelClient.GetDbState() + } else { + info, err := db.Info() + if err != nil { + dbState.PrimaryDbState.Err = err + return dbState, err + } + return db.fillDbStateFromDbInfo(info) + } +} + +func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { + var dbState DbState + if info.Fields.PrimaryRole == true { + dbState = DbState{ + PrimaryDbState: PrimaryDbState{ + Fields: PrimaryDbStateFields{ + Role: "master", + Ip: db.cfg.hostname, + Port: db.cfg.port, + Flags: "master", + }, + }, + } + } + + cnt, err := strconv.Atoi(db.cfg.nodeCnt) + if err != nil { + dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt) + } else { + dbState.ConfigNodeCnt = cnt + } + + return &dbState, dbState.Err +} + +func createReplicaDbClient(host string) *DB { + cfg := readConfig(osImpl{}) + cfg.sentinelPort = "" + cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host) + return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil) +} + +func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) { + dbStatisticsInfo := new(DbStatisticsInfo) + dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host) + + info, err := db.Info() + if err != nil { + return nil, err + } + dbStatisticsInfo.Info = info + + return dbStatisticsInfo, nil +} + +func sentinelStatistics(db *DB) (*DbStatistics, error) { + dbState := new(DbState) + dbStatistics := new(DbStatistics) + dbStatisticsInfo := new(DbStatisticsInfo) + var err error + + dbState, err = db.State() + if err != nil { + return nil, err + } + + dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress()) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + if dbState.ReplicasDbState != nil { + for _, r := range dbState.ReplicasDbState.States { + replicaDb := createReplicaDbClient(r.GetAddress()) + dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress()) + replicaDb.CloseDB() + if err != nil { + return nil, err + } + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + } + } + + return dbStatistics, nil +} + +func standaloneStatistics(db *DB) (*DbStatistics, error) { + dbStatistics := new(DbStatistics) + + dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port)) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + return dbStatistics, err +} + +func (db *DB) Statistics() (*DbStatistics, error) { + if db.cfg.sentinelPort != "" { + return sentinelStatistics(db) + } + + return standaloneStatistics(db) +} + var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`) func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {