X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=77dce092fd1ef6f4d457582f8cb4b4966101a5b8;hb=6f724aa3950cdc01246351644348fdc0f6e9ca4a;hp=570dfa123760a5d49c65cf7a56021f66dfdb3e86;hpb=4123ade6080f3f5e3f3bf44e30faeacc531f32d0;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 570dfa1..77dce09 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -25,7 +25,9 @@ package sdlgoredis import ( "errors" "fmt" - "github.com/go-redis/redis" + "github.com/go-redis/redis/v7" + "io" + "log" "os" "strconv" "strings" @@ -53,14 +55,18 @@ type Config struct { masterName string sentinelPort string clusterAddrList string + nodeCnt string } type DB struct { client RedisClient + sentinel RedisSentinelCreateCb subscribe SubscribeFn redisModules bool sCbMap *sharedCbMap ch intChannels + cfg Config + addr string } type Subscriber interface { @@ -92,6 +98,18 @@ type RedisClient interface { EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(scripts ...string) *redis.BoolSliceCmd ScriptLoad(script string) *redis.StringCmd + Info(section ...string) *redis.StringCmd +} + +var dbLogger *log.Logger + +func init() { + dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile) + redis.SetLogger(dbLogger) +} + +func SetDbLogger(out io.Writer) { + dbLogger.SetOutput(out) } func checkResultAndError(result interface{}, err error) (bool, error) { @@ -127,9 +145,10 @@ func subscribeNotifications(client RedisClient, channels ...string) Subscriber { return client.Subscribe(channels...) } -func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { +func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB { db := DB{ client: client, + sentinel: sentinelCreateCb, subscribe: subscribe, redisModules: true, sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)}, @@ -138,6 +157,8 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { removeChannel: make(chan string), exit: make(chan bool), }, + cfg: cfg, + addr: sentinelAddr, } return &db @@ -145,7 +166,7 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { func Create() []*DB { osimpl := osImpl{} - return ReadConfigAndCreateDbClients(osimpl, newRedisClient) + return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel) } func readConfig(osI OS) Config { @@ -155,6 +176,7 @@ func readConfig(osI OS) Config { masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), + nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), } return cfg } @@ -173,38 +195,48 @@ func (osImpl) Getenv(key string, defValue string) string { return val } -func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB { +func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) []*DB { cfg := readConfig(osI) - return createDbClients(cfg, clientCreator) + return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb) } -func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB { +func createDbClients(cfg Config, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) []*DB { if cfg.clusterAddrList == "" { - return []*DB{createLegacyDbClient(cfg, clientCreator)} + return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)} } dbs := []*DB{} addrList := strings.Split(cfg.clusterAddrList, ",") for _, addr := range addrList { - db := createDbClient(cfg, addr, clientCreator) + db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb) dbs = append(dbs, db) } return dbs } -func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB { - return createDbClient(cfg, cfg.hostname, clientCreator) +func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) *DB { + return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb) } -func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB { +func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator, + subscribe SubscribeFn, + sentinelCreateCb RedisSentinelCreateCb) *DB { var client RedisClient + var db *DB if cfg.sentinelPort == "" { client = clientCreator(hostName, cfg.port, "", false) + db = CreateDB(client, subscribe, nil, cfg, hostName) } else { client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true) + db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName) } - db := CreateDB(client, subscribeNotifications) db.CheckCommands() return db } @@ -243,7 +275,7 @@ func (db *DB) CheckCommands() { } } } else { - fmt.Println(err) + dbLogger.Printf("SDL DB commands checking failure: %s\n", err) } } @@ -291,7 +323,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { - fmt.Println(err) + dbLogger.Printf("SDL DB channel closing failure: %s\n", err) } return } @@ -455,6 +487,81 @@ func (db *DB) PTTL(key string) (time.Duration, error) { return result, err } +func (db *DB) Info() (*DbInfo, error) { + var info DbInfo + resultStr, err := db.client.Info("all").Result() + if err != nil { + return &info, err + } + + result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n") + err = readRedisInfoReplyFields(result, &info) + return &info, err +} + +func readRedisInfoReplyFields(input []string, info *DbInfo) error { + for _, line := range input { + if idx := strings.Index(line, "role:"); idx != -1 { + roleStr := line[idx+len("role:"):] + if roleStr == "master" { + info.Fields.PrimaryRole = true + } + } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 { + cntStr := line[idx+len("connected_slaves:"):] + cnt, err := strconv.ParseUint(cntStr, 10, 32) + if err != nil { + return fmt.Errorf("Info reply error: %s", err.Error()) + } + info.Fields.ConnectedReplicaCnt = uint32(cnt) + } + } + return nil +} + +func (db *DB) State() (*DbState, error) { + dbState := new(DbState) + if db.cfg.sentinelPort != "" { + //Establish connection to Redis sentinel. The reason why connection is done + //here instead of time of the SDL instance creation is that for the time being + //sentinel connection is needed only here to get state information and + //state information is needed only by 'sdlcli' hence it is not time critical + //and also we want to avoid opening unnecessary TCP connections towards Redis + //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used. + sentinelClient := db.sentinel(&db.cfg, db.addr) + return sentinelClient.GetDbState() + } else { + info, err := db.Info() + if err != nil { + dbState.PrimaryDbState.Err = err + return dbState, err + } + return db.fillDbStateFromDbInfo(info) + } +} + +func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { + var dbState DbState + if info.Fields.PrimaryRole == true { + dbState = DbState{ + PrimaryDbState: PrimaryDbState{ + Fields: PrimaryDbStateFields{ + Role: "master", + Flags: "master", + }, + }, + } + } + + cnt, err := strconv.Atoi(db.cfg.nodeCnt) + if err != nil { + dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt) + } else { + dbState.ConfigNodeCnt = cnt + } + + return &dbState, dbState.Err +} + var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`) func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {