import (
"errors"
- "fmt"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v7"
+ "io"
+ "log"
"os"
"strconv"
"strings"
type DB struct {
client RedisClient
+ sentinel RedisSentinelCreateCb
subscribe SubscribeFn
redisModules bool
sCbMap *sharedCbMap
ch intChannels
+ cfg Config
+ addr string
}
type Subscriber interface {
EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
ScriptExists(scripts ...string) *redis.BoolSliceCmd
ScriptLoad(script string) *redis.StringCmd
+ Info(section ...string) *redis.StringCmd
+}
+
+var dbLogger *log.Logger
+
+func init() {
+ dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
+ redis.SetLogger(dbLogger)
+}
+
+func SetDbLogger(out io.Writer) {
+ dbLogger.SetOutput(out)
}
func checkResultAndError(result interface{}, err error) (bool, error) {
return client.Subscribe(channels...)
}
-func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
+func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
db := DB{
client: client,
+ sentinel: sentinelCreateCb,
subscribe: subscribe,
redisModules: true,
sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
removeChannel: make(chan string),
exit: make(chan bool),
},
+ cfg: cfg,
+ addr: sentinelAddr,
}
return &db
func Create() []*DB {
osimpl := osImpl{}
- return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
+ return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
}
func readConfig(osI OS) Config {
return val
}
-func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
+ subscribe SubscribeFn,
+ sentinelCreateCb RedisSentinelCreateCb) []*DB {
cfg := readConfig(osI)
- return createDbClients(cfg, clientCreator)
+ return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
}
-func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
+func createDbClients(cfg Config, clientCreator RedisClientCreator,
+ subscribe SubscribeFn,
+ sentinelCreateCb RedisSentinelCreateCb) []*DB {
if cfg.clusterAddrList == "" {
- return []*DB{createLegacyDbClient(cfg, clientCreator)}
+ return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
}
dbs := []*DB{}
addrList := strings.Split(cfg.clusterAddrList, ",")
for _, addr := range addrList {
- db := createDbClient(cfg, addr, clientCreator)
+ db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
dbs = append(dbs, db)
}
return dbs
}
-func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
- return createDbClient(cfg, cfg.hostname, clientCreator)
+func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
+ subscribe SubscribeFn,
+ sentinelCreateCb RedisSentinelCreateCb) *DB {
+ return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
}
-func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
+func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
+ subscribe SubscribeFn,
+ sentinelCreateCb RedisSentinelCreateCb) *DB {
var client RedisClient
+ var db *DB
if cfg.sentinelPort == "" {
client = clientCreator(hostName, cfg.port, "", false)
+ db = CreateDB(client, subscribe, nil, cfg, hostName)
} else {
client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
+ db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
}
- db := CreateDB(client, subscribeNotifications)
db.CheckCommands()
return db
}
}
}
} else {
- fmt.Println(err)
+ dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
}
}
case exit := <-ch.exit:
if exit {
if err := sub.Close(); err != nil {
- fmt.Println(err)
+ dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
}
return
}
return result, err
}
+func (db *DB) Info() (*DbInfo, error) {
+ var info DbInfo
+ resultStr, err := db.client.Info("all").Result()
+ result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
+ readRedisInfoReplyFields(result, &info)
+ return &info, err
+}
+
+func readRedisInfoReplyFields(input []string, info *DbInfo) {
+ for _, line := range input {
+ if idx := strings.Index(line, "role:"); idx != -1 {
+ roleStr := line[idx+len("role:"):]
+ if roleStr == "master" {
+ info.Fields.MasterRole = true
+ }
+ } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 {
+ cntStr := line[idx+len("connected_slaves:"):]
+ if cnt, err := strconv.ParseUint(cntStr, 10, 32); err == nil {
+ info.Fields.ConnectedReplicaCnt = uint32(cnt)
+ }
+ }
+ }
+}
+
+func (db *DB) State() (*DbState, error) {
+ if db.cfg.sentinelPort != "" {
+ //Establish connection to Redis sentinel. The reason why connection is done
+ //here instead of time of the SDL instance creation is that for the time being
+ //sentinel connection is needed only here to get state information and
+ //state information is needed only by 'sdlcli' hence it is not time critical
+ //and also we want to avoid opening unnecessary TCP connections towards Redis
+ //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
+ sentinelClient := db.sentinel(&db.cfg, db.addr)
+ return sentinelClient.GetDbState()
+ } else {
+ var dbState DbState
+ info, err := db.Info()
+ if err != nil {
+ return &dbState, err
+ }
+ dbState = fillDbStateFromDbInfo(info)
+ return &dbState, err
+ }
+}
+
+func fillDbStateFromDbInfo(info *DbInfo) DbState {
+ var dbState DbState
+ if info.Fields.MasterRole == true {
+ dbState = DbState{
+ MasterDbState: MasterDbState{
+ Fields: MasterDbStateFields{
+ Role: "master",
+ Flags: "master",
+ },
+ },
+ }
+ }
+ return dbState
+}
+
var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {