X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=7f66f0cb68dc4b660fac700e7458164eaf9027c7;hb=refs%2Fchanges%2F48%2F9148%2F1;hp=77dce092fd1ef6f4d457582f8cb4b4966101a5b8;hpb=6f724aa3950cdc01246351644348fdc0f6e9ca4a;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 77dce09..7f66f0c 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -1,6 +1,6 @@ /* Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2018-2019 Nokia. + Copyright (c) 2018-2022 Nokia. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -23,18 +23,24 @@ package sdlgoredis import ( + "context" "errors" "fmt" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "io" "log" + "net" "os" + "reflect" "strconv" "strings" "sync" "time" ) +const EventSeparator = "___" +const NsSeparator = "," + type ChannelNotificationCb func(channel string, payload ...string) type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient @@ -50,66 +56,72 @@ type sharedCbMap struct { } type Config struct { - hostname string - port string - masterName string - sentinelPort string - clusterAddrList string - nodeCnt string + hostname string + ports []string + masterNames []string + sentinelPorts []string + clusterAddrs []string + nodeCnt string } type DB struct { + ctx context.Context client RedisClient sentinel RedisSentinelCreateCb subscribe SubscribeFn redisModules bool sCbMap *sharedCbMap ch intChannels - cfg Config addr string + port string + sentinelPort string + masterName string + nodeCnt string } type Subscriber interface { - Channel() <-chan *redis.Message - Subscribe(channels ...string) error - Unsubscribe(channels ...string) error + Channel(opts ...redis.ChannelOption) <-chan *redis.Message + Subscribe(ctx context.Context, channels ...string) error + Unsubscribe(ctx context.Context, channels ...string) error Close() error } -type SubscribeFn func(client RedisClient, channels ...string) Subscriber +type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber type RedisClient interface { - Command() *redis.CommandsInfoCmd + Command(ctx context.Context) *redis.CommandsInfoCmd Close() error - Subscribe(channels ...string) *redis.PubSub - MSet(pairs ...interface{}) *redis.StatusCmd - Do(args ...interface{}) *redis.Cmd - MGet(keys ...string) *redis.SliceCmd - Del(keys ...string) *redis.IntCmd - Keys(pattern string) *redis.StringSliceCmd - SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd - SAdd(key string, members ...interface{}) *redis.IntCmd - SRem(key string, members ...interface{}) *redis.IntCmd - SMembers(key string) *redis.StringSliceCmd - SIsMember(key string, member interface{}) *redis.BoolCmd - SCard(key string) *redis.IntCmd - PTTL(key string) *redis.DurationCmd - Eval(script string, keys []string, args ...interface{}) *redis.Cmd - EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd - ScriptExists(scripts ...string) *redis.BoolSliceCmd - ScriptLoad(script string) *redis.StringCmd - Info(section ...string) *redis.StringCmd -} - -var dbLogger *log.Logger + Subscribe(ctx context.Context, channels ...string) *redis.PubSub + MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd + Do(ctx context.Context, args ...interface{}) *redis.Cmd + MGet(ctx context.Context, keys ...string) *redis.SliceCmd + Del(ctx context.Context, keys ...string) *redis.IntCmd + Keys(ctx context.Context, pattern string) *redis.StringSliceCmd + SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd + SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd + SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd + SMembers(ctx context.Context, key string) *redis.StringSliceCmd + SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd + SCard(ctx context.Context, key string) *redis.IntCmd + PTTL(ctx context.Context, key string) *redis.DurationCmd + Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd + EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd + ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd + ScriptLoad(ctx context.Context, script string) *redis.StringCmd + Info(ctx context.Context, section ...string) *redis.StringCmd +} + +var dbLogger *logger func init() { - dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile) + dbLogger = &logger{ + log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile), + } redis.SetLogger(dbLogger) } func SetDbLogger(out io.Writer) { - dbLogger.SetOutput(out) + dbLogger.log.SetOutput(out) } func checkResultAndError(result interface{}, err error) (bool, error) { @@ -141,12 +153,14 @@ func checkIntResultAndError(result interface{}, err error) (bool, error) { return false, nil } -func subscribeNotifications(client RedisClient, channels ...string) Subscriber { - return client.Subscribe(channels...) +func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber { + return client.Subscribe(ctx, channels...) } -func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB { +func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, + addr, port, sentinelPort, masterName, nodeCnt string) *DB { db := DB{ + ctx: context.Background(), client: client, sentinel: sentinelCreateCb, subscribe: subscribe, @@ -157,8 +171,11 @@ func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisS removeChannel: make(chan string), exit: make(chan bool), }, - cfg: cfg, - addr: sentinelAddr, + addr: addr, + sentinelPort: sentinelPort, + port: port, + masterName: masterName, + nodeCnt: nodeCnt, } return &db @@ -171,13 +188,23 @@ func Create() []*DB { func readConfig(osI OS) Config { cfg := Config{ - hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"), - port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"), - masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), - sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), - clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), - nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), + hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"), + ports: strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","), + nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), + } + + if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" { + cfg.clusterAddrs = strings.Split(addrStr, ",") + } else if cfg.hostname != "" { + cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname) + } + if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" { + cfg.sentinelPorts = strings.Split(sntPortStr, ",") + } + if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" { + cfg.masterNames = strings.Split(nameStr, ",") } + completeConfig(&cfg) return cfg } @@ -195,47 +222,61 @@ func (osImpl) Getenv(key string, defValue string) string { return val } -func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, - subscribe SubscribeFn, - sentinelCreateCb RedisSentinelCreateCb) []*DB { - cfg := readConfig(osI) - return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb) +func completeConfig(cfg *Config) { + if len(cfg.sentinelPorts) == 0 { + if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 { + for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ { + cfg.ports = append(cfg.ports, cfg.ports[i-1]) + } + } + } else { + if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) { + for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ { + cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1]) + } + } + if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 { + for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ { + cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1]) + } + } + } } -func createDbClients(cfg Config, clientCreator RedisClientCreator, +func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb) []*DB { - if cfg.clusterAddrList == "" { - return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)} - } - dbs := []*DB{} - - addrList := strings.Split(cfg.clusterAddrList, ",") - for _, addr := range addrList { - db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb) + cfg := readConfig(osI) + for i, addr := range cfg.clusterAddrs { + port := getListItem(cfg.ports, i) + sntPort := getListItem(cfg.sentinelPorts, i) + name := getListItem(cfg.masterNames, i) + db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt, + clientCreator, subscribe, sentinelCreateCb) dbs = append(dbs, db) } return dbs } -func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator, - subscribe SubscribeFn, - sentinelCreateCb RedisSentinelCreateCb) *DB { - return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb) +func getListItem(list []string, index int) string { + if index < len(list) { + return list[index] + } + return "" } -func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator, +func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb) *DB { var client RedisClient var db *DB - if cfg.sentinelPort == "" { - client = clientCreator(hostName, cfg.port, "", false) - db = CreateDB(client, subscribe, nil, cfg, hostName) + if sentinelPort == "" { + client = clientCreator(addr, port, "", false) + db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt) } else { - client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true) - db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName) + client = clientCreator(addr, sentinelPort, masterName, true) + db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt) } db.CheckCommands() return db @@ -264,7 +305,7 @@ func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient { } func (db *DB) CheckCommands() { - commands, err := db.client.Command().Result() + commands, err := db.client.Command(db.ctx).Result() if err == nil { redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub", "msetmpub", "delmpub"} @@ -275,7 +316,7 @@ func (db *DB) CheckCommands() { } } } else { - dbLogger.Printf("SDL DB commands checking failure: %s\n", err) + dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err) } } @@ -283,28 +324,25 @@ func (db *DB) CloseDB() error { return db.client.Close() } -func (db *DB) UnsubscribeChannelDB(channels ...string) { +func (db *DB) UnsubscribeChannelDB(channels ...string) error { for _, v := range channels { db.sCbMap.Remove(v) db.ch.removeChannel <- v + errStr := <-db.ch.removeChannel + if errStr != "" { + return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr) + } if db.sCbMap.Count() == 0 { db.ch.exit <- true } } + return nil } -func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) { +func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error { if db.sCbMap.Count() == 0 { - for _, v := range channels { - db.sCbMap.Add(v, cb) - } - - go func(sCbMap *sharedCbMap, - channelPrefix, - eventSeparator string, - ch intChannels, - channels ...string) { - sub := db.subscribe(db.client, channels...) + go func(sCbMap *sharedCbMap, ch intChannels) { + sub := db.subscribe(db.ctx, db.client, "") rxChannel := sub.Channel() lCbMap := sCbMap.GetMapCopy() for { @@ -312,35 +350,47 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even case msg := <-rxChannel: cb, ok := lCbMap[msg.Channel] if ok { - cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...) + nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2) + cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...) } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() - sub.Subscribe(channel) + if err := sub.Subscribe(db.ctx, channel); err != nil { + ch.addChannel <- err.Error() + } else { + ch.addChannel <- "" + } case channel := <-ch.removeChannel: lCbMap = sCbMap.GetMapCopy() - sub.Unsubscribe(channel) + if err := sub.Unsubscribe(db.ctx, channel); err != nil { + ch.removeChannel <- err.Error() + } else { + ch.removeChannel <- "" + } case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { - dbLogger.Printf("SDL DB channel closing failure: %s\n", err) + dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err) } return } } } - }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...) - - } else { - for _, v := range channels { - db.sCbMap.Add(v, cb) - db.ch.addChannel <- v + }(db.sCbMap, db.ch) + } + for _, v := range channels { + db.sCbMap.Add(v, cb) + db.ch.addChannel <- v + errStr := <-db.ch.addChannel + if errStr != "" { + return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr) } } + return nil } func (db *DB) MSet(pairs ...interface{}) error { - return db.client.MSet(pairs...).Err() + return db.client.MSet(db.ctx, pairs...).Err() } func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error { @@ -357,12 +407,12 @@ func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error { for _, d := range channelsAndEvents { command = append(command, d) } - _, err := db.client.Do(command...).Result() + _, err := db.client.Do(db.ctx, command...).Result() return err } func (db *DB) MGet(keys []string) ([]interface{}, error) { - return db.client.MGet(keys...).Result() + return db.client.MGet(db.ctx, keys...).Result() } func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error { @@ -379,18 +429,18 @@ func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error { for _, d := range channelsAndEvents { command = append(command, d) } - _, err := db.client.Do(command...).Result() + _, err := db.client.Do(db.ctx, command...).Result() return err } func (db *DB) Del(keys []string) error { - _, err := db.client.Del(keys...).Result() + _, err := db.client.Del(db.ctx, keys...).Result() return err } func (db *DB) Keys(pattern string) ([]string, error) { - return db.client.Keys(pattern).Result() + return db.client.Keys(db.ctx, pattern).Result() } func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { @@ -398,7 +448,7 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { return false, errors.New("Redis deployment not supporting command") } - return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result()) + return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result()) } func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { @@ -414,7 +464,7 @@ func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkResultAndError(db.client.Do(command...).Result()) + return checkResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { @@ -429,10 +479,10 @@ func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkResultAndError(db.client.Do(command...).Result()) + return checkResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) { - return db.client.SetNX(key, data, expiration).Result() + return db.client.SetNX(db.ctx, key, data, expiration).Result() } func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { @@ -447,87 +497,323 @@ func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) for _, ce := range channelsAndEvents { command = append(command, ce) } - return checkIntResultAndError(db.client.Do(command...).Result()) + return checkIntResultAndError(db.client.Do(db.ctx, command...).Result()) } func (db *DB) DelIE(key string, data interface{}) (bool, error) { if !db.redisModules { return false, errors.New("Redis deployment not supporting command") } - return checkIntResultAndError(db.client.Do("DELIE", key, data).Result()) + return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result()) } func (db *DB) SAdd(key string, data ...interface{}) error { - _, err := db.client.SAdd(key, data...).Result() + _, err := db.client.SAdd(db.ctx, key, data...).Result() return err } func (db *DB) SRem(key string, data ...interface{}) error { - _, err := db.client.SRem(key, data...).Result() + _, err := db.client.SRem(db.ctx, key, data...).Result() return err } func (db *DB) SMembers(key string) ([]string, error) { - result, err := db.client.SMembers(key).Result() + result, err := db.client.SMembers(db.ctx, key).Result() return result, err } func (db *DB) SIsMember(key string, data interface{}) (bool, error) { - result, err := db.client.SIsMember(key, data).Result() + result, err := db.client.SIsMember(db.ctx, key, data).Result() return result, err } func (db *DB) SCard(key string) (int64, error) { - result, err := db.client.SCard(key).Result() + result, err := db.client.SCard(db.ctx, key).Result() return result, err } func (db *DB) PTTL(key string) (time.Duration, error) { - result, err := db.client.PTTL(key).Result() + result, err := db.client.PTTL(db.ctx, key).Result() return result, err } func (db *DB) Info() (*DbInfo, error) { var info DbInfo - resultStr, err := db.client.Info("all").Result() + resultStr, err := db.client.Info(db.ctx, "all").Result() if err != nil { return &info, err } result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n") - err = readRedisInfoReplyFields(result, &info) - return &info, err + readRedisInfoReplyFields(result, &info) + return &info, nil +} + +func lineContains(line, substr string) bool { + return strings.Contains(line, substr) +} + +func getFieldValueStr(line, substr string) string { + if idx := strings.Index(line, substr); idx != -1 { + return line[idx+len(substr):] + } + return "" +} + +func getUint32FromString(s string) uint32 { + if val, err := strconv.ParseUint(s, 10, 32); err == nil { + return uint32(val) + } + return 0 } -func readRedisInfoReplyFields(input []string, info *DbInfo) error { +func getUint64FromString(s string) uint64 { + if val, err := strconv.ParseUint(s, 10, 64); err == nil { + return uint64(val) + } + return 0 +} + +func getFloatFromString(s string, bitSize int) float64 { + if val, err := strconv.ParseFloat(s, bitSize); err == nil { + return val + } + return 0 +} + +func getFloat64FromString(s string) float64 { + return getFloatFromString(s, 64) +} + +func getFloat32FromString(s string) float32 { + return float32(getFloatFromString(s, 32)) +} + +func getValueString(values string, key string) string { + slice := strings.Split(values, ",") + for _, s := range slice { + if lineContains(s, key) { + return getFieldValueStr(s, key) + } + } + return "" +} + +func getCommandstatsValues(values string) (string, string, string) { + calls := getValueString(values, "calls=") + usec := getValueString(values, "usec=") + usecPerCall := getValueString(values, "usec_per_call=") + return calls, usec, usecPerCall +} + +func updateCommandstatsValues(i interface{}, values string) { + stype := reflect.ValueOf(i).Elem() + callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values) + + callsField := stype.FieldByName("Calls") + callsField.Set(reflect.ValueOf(getUint32FromString(callsStr))) + + usecField := stype.FieldByName("Usec") + usecField.Set(reflect.ValueOf(getUint32FromString(usecStr))) + + usecPerCallField := stype.FieldByName("UsecPerCall") + usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr))) +} + +func getKeyspaceValues(values string) (string, string, string) { + keys := getValueString(values, "keys=") + expires := getValueString(values, "expires=") + avgttl := getValueString(values, "avg_ttl=") + return keys, expires, avgttl +} + +func updateKeyspaceValues(i interface{}, values string) { + stype := reflect.ValueOf(i).Elem() + keysStr, expiresStr, avgttlStr := getKeyspaceValues(values) + + keysField := stype.FieldByName("Keys") + keysField.Set(reflect.ValueOf(getUint32FromString(keysStr))) + + expiresField := stype.FieldByName("Expires") + expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr))) + + avgttlField := stype.FieldByName("AvgTtl") + avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr))) +} + +func updateServerInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["uptime_in_days"]; ok { + info.Fields.Server.UptimeInDays = getUint32FromString(value) + } +} + +func updateClientInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["connected_clients"]; ok { + info.Fields.Clients.ConnectedClients = getUint32FromString(value) + } + if value, ok := config["client_recent_max_input_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value) + } + if value, ok := config["client_recent_max_output_buffer"]; ok { + info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value) + } +} + +func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_memory"]; ok { + info.Fields.Memory.UsedMemory = getUint64FromString(value) + } + if value, ok := config["used_memory_human"]; ok { + info.Fields.Memory.UsedMemoryHuman = value + } + if value, ok := config["used_memory_rss"]; ok { + info.Fields.Memory.UsedMemoryRss = getUint64FromString(value) + } + if value, ok := config["used_memory_rss_human"]; ok { + info.Fields.Memory.UsedMemoryRssHuman = value + } + if value, ok := config["used_memory_peak"]; ok { + info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value) + } + if value, ok := config["used_memory_peak_human"]; ok { + info.Fields.Memory.UsedMemoryPeakHuman = value + } + if value, ok := config["used_memory_peak_perc"]; ok { + info.Fields.Memory.UsedMemoryPeakPerc = value + } + if value, ok := config["mem_fragmentation_ratio"]; ok { + info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value) + } + if value, ok := config["mem_fragmentation_bytes"]; ok { + info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value) + } +} + +func updateStatsInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["total_connections_received"]; ok { + info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value) + } + if value, ok := config["total_commands_processed"]; ok { + info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value) + } + if value, ok := config["sync_full"]; ok { + info.Fields.Stats.SyncFull = getUint32FromString(value) + } + if value, ok := config["sync_partial_ok"]; ok { + info.Fields.Stats.SyncPartialOk = getUint32FromString(value) + } + if value, ok := config["sync_partial_err"]; ok { + info.Fields.Stats.SyncPartialErr = getUint32FromString(value) + } + if value, ok := config["pubsub_channels"]; ok { + info.Fields.Stats.PubsubChannels = getUint32FromString(value) + } +} + +func updateCpuInfoFields(config ConfigInfo, info *DbInfo) { + if value, ok := config["used_cpu_sys"]; ok { + info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value) + } + if value, ok := config["used_cpu_user"]; ok { + info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value) + } +} + +func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["cmdstat_replconf"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values) + } + if values, ok := config["cmdstat_keys"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values) + } + if values, ok := config["cmdstat_role"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values) + } + if values, ok := config["cmdstat_psync"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values) + } + if values, ok := config["cmdstat_mset"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values) + } + if values, ok := config["cmdstat_publish"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values) + } + if values, ok := config["cmdstat_info"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values) + } + if values, ok := config["cmdstat_ping"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values) + } + if values, ok := config["cmdstat_client"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values) + } + if values, ok := config["cmdstat_command"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values) + } + if values, ok := config["cmdstat_subscribe"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values) + } + if values, ok := config["cmdstat_monitor"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values) + } + if values, ok := config["cmdstat_config"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values) + } + if values, ok := config["cmdstat_slaveof"]; ok { + updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values) + } +} + +func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) { + if values, ok := config["db0"]; ok { + updateKeyspaceValues(&info.Fields.Keyspace.Db, values) + } +} + +func getConfigInfo(input []string) ConfigInfo { + config := ConfigInfo{} for _, line := range input { - if idx := strings.Index(line, "role:"); idx != -1 { - roleStr := line[idx+len("role:"):] - if roleStr == "master" { - info.Fields.PrimaryRole = true - } - } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 { - cntStr := line[idx+len("connected_slaves:"):] - cnt, err := strconv.ParseUint(cntStr, 10, 32) - if err != nil { - return fmt.Errorf("Info reply error: %s", err.Error()) + if i := strings.Index(line, ":"); i != -1 { + if key := strings.TrimSpace(line[:i]); len(key) > 0 { + if len(line) > i { + config[key] = strings.TrimSpace(line[i+1:]) + } } - info.Fields.ConnectedReplicaCnt = uint32(cnt) } } - return nil + return config +} + +func readRedisInfoReplyFields(input []string, info *DbInfo) { + config := getConfigInfo(input) + + if value, ok := config["role"]; ok { + if "master" == value { + info.Fields.PrimaryRole = true + } + } + if value, ok := config["connected_slaves"]; ok { + info.Fields.ConnectedReplicaCnt = getUint32FromString(value) + } + updateServerInfoFields(config, info) + updateClientInfoFields(config, info) + updateMemoryInfoFields(config, info) + updateStatsInfoFields(config, info) + updateCpuInfoFields(config, info) + updateCommandstatsInfoFields(config, info) + updateKeyspaceInfoFields(config, info) } func (db *DB) State() (*DbState, error) { dbState := new(DbState) - if db.cfg.sentinelPort != "" { + if db.sentinelPort != "" { //Establish connection to Redis sentinel. The reason why connection is done //here instead of time of the SDL instance creation is that for the time being //sentinel connection is needed only here to get state information and //state information is needed only by 'sdlcli' hence it is not time critical //and also we want to avoid opening unnecessary TCP connections towards Redis //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used. - sentinelClient := db.sentinel(&db.cfg, db.addr) + sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt) return sentinelClient.GetDbState() } else { info, err := db.Info() @@ -546,15 +832,17 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { PrimaryDbState: PrimaryDbState{ Fields: PrimaryDbStateFields{ Role: "master", + Ip: db.addr, + Port: db.port, Flags: "master", }, }, } } - cnt, err := strconv.Atoi(db.cfg.nodeCnt) + cnt, err := strconv.Atoi(db.nodeCnt) if err != nil { - dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt) + dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt) } else { dbState.ConfigNodeCnt = cnt } @@ -562,11 +850,77 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { return &dbState, dbState.Err } +func createReplicaDbClient(host string) *DB { + addr, port, _ := net.SplitHostPort(host) + return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil) +} + +func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) { + dbStatisticsInfo := new(DbStatisticsInfo) + dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host) + + info, err := db.Info() + if err != nil { + return nil, err + } + dbStatisticsInfo.Info = info + + return dbStatisticsInfo, nil +} + +func sentinelStatistics(db *DB) (*DbStatistics, error) { + dbState := new(DbState) + dbStatistics := new(DbStatistics) + dbStatisticsInfo := new(DbStatisticsInfo) + var err error + + dbState, err = db.State() + if err != nil { + return nil, err + } + + dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress()) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + if dbState.ReplicasDbState != nil { + for _, r := range dbState.ReplicasDbState.States { + replicaDb := createReplicaDbClient(r.GetAddress()) + dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress()) + if closeErr := replicaDb.CloseDB(); closeErr != nil { + return nil, closeErr + } + if err != nil { + return nil, err + } + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + } + } + + return dbStatistics, nil +} + +func standaloneStatistics(db *DB) (*DbStatistics, error) { + dbStatistics := new(DbStatistics) + + dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port)) + dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) + + return dbStatistics, err +} + +func (db *DB) Statistics() (*DbStatistics, error) { + if db.sentinelPort != "" { + return sentinelStatistics(db) + } + + return standaloneStatistics(db) +} + var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`) func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error { expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10) - result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result() + result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result() if err != nil { return err }