X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=570dfa123760a5d49c65cf7a56021f66dfdb3e86;hb=refs%2Fchanges%2F40%2F6740%2F2;hp=72eaebe6d864dde4eb59a57d5c663505206572b7;hpb=33961a269bf51f6c713fcb00576cdc0b9ac98ee9;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 72eaebe..570dfa1 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -34,6 +34,7 @@ import ( ) type ChannelNotificationCb func(channel string, payload ...string) +type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient type intChannels struct { addChannel chan string @@ -46,6 +47,14 @@ type sharedCbMap struct { cbMap map[string]ChannelNotificationCb } +type Config struct { + hostname string + port string + masterName string + sentinelPort string + clusterAddrList string +} + type DB struct { client RedisClient subscribe SubscribeFn @@ -102,8 +111,14 @@ func checkIntResultAndError(result interface{}, err error) (bool, error) { if err != nil { return false, err } - if result.(int) == int(1) { - return true, nil + if n, ok := result.(int64); ok { + if n == 1 { + return true, nil + } + } else if n, ok := result.(int); ok { + if n == 1 { + return true, nil + } } return false, nil } @@ -128,41 +143,94 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { return &db } -func Create() *DB { - var client *redis.Client - hostname := os.Getenv("DBAAS_SERVICE_HOST") - if hostname == "" { - hostname = "localhost" +func Create() []*DB { + osimpl := osImpl{} + return ReadConfigAndCreateDbClients(osimpl, newRedisClient) +} + +func readConfig(osI OS) Config { + cfg := Config{ + hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"), + port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"), + masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), + sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), + clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), } - port := os.Getenv("DBAAS_SERVICE_PORT") - if port == "" { - port = "6379" + return cfg +} + +type OS interface { + Getenv(key string, defValue string) string +} + +type osImpl struct{} + +func (osImpl) Getenv(key string, defValue string) string { + val := os.Getenv(key) + if val == "" { + val = defValue } - sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT") - masterName := os.Getenv("DBAAS_MASTER_NAME") - if sentinelPort == "" { - redisAddress := hostname + ":" + port - client = redis.NewClient(&redis.Options{ - Addr: redisAddress, - Password: "", // no password set - DB: 0, // use default DB - PoolSize: 20, - MaxRetries: 2, - }) + return val +} + +func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB { + cfg := readConfig(osI) + return createDbClients(cfg, clientCreator) +} + +func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB { + if cfg.clusterAddrList == "" { + return []*DB{createLegacyDbClient(cfg, clientCreator)} + } + + dbs := []*DB{} + + addrList := strings.Split(cfg.clusterAddrList, ",") + for _, addr := range addrList { + db := createDbClient(cfg, addr, clientCreator) + dbs = append(dbs, db) + } + return dbs +} + +func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB { + return createDbClient(cfg, cfg.hostname, clientCreator) +} + +func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB { + var client RedisClient + if cfg.sentinelPort == "" { + client = clientCreator(hostName, cfg.port, "", false) } else { - sentinelAddress := hostname + ":" + sentinelPort - client = redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: masterName, - SentinelAddrs: []string{sentinelAddress}, - PoolSize: 20, - MaxRetries: 2, - }) + client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true) } db := CreateDB(client, subscribeNotifications) db.CheckCommands() return db } +func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient { + if isHa == true { + sentinelAddress := addr + ":" + port + return redis.NewFailoverClient( + &redis.FailoverOptions{ + MasterName: clusterName, + SentinelAddrs: []string{sentinelAddress}, + PoolSize: 20, + MaxRetries: 2, + }, + ) + } + redisAddress := addr + ":" + port + return redis.NewClient(&redis.Options{ + Addr: redisAddress, + Password: "", // no password set + DB: 0, // use default DB + PoolSize: 20, + MaxRetries: 2, + }) +} + func (db *DB) CheckCommands() { commands, err := db.client.Command().Result() if err == nil { @@ -301,28 +369,53 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result()) } -func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) { +func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command SETIEPUB") + return false, errors.New("Redis deployment not supporting command SETIEMPUB") + } + capacity := 4 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "SETIEMPUB") + command = append(command, key) + command = append(command, newData) + command = append(command, oldData) + for _, ce := range channelsAndEvents { + command = append(command, ce) } - return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result()) + return checkResultAndError(db.client.Do(command...).Result()) } -func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) { +func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command SETNXPUB") + return false, errors.New("Redis deployment not supporting command SETNXMPUB") } - return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result()) + capacity := 3 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "SETNXMPUB") + command = append(command, key) + command = append(command, data) + for _, ce := range channelsAndEvents { + command = append(command, ce) + } + return checkResultAndError(db.client.Do(command...).Result()) } func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) { return db.client.SetNX(key, data, expiration).Result() } -func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) { +func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command") + return false, errors.New("Redis deployment not supporting command DELIEMPUB") + } + capacity := 3 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "DELIEMPUB") + command = append(command, key) + command = append(command, data) + for _, ce := range channelsAndEvents { + command = append(command, ce) } - return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result()) + return checkIntResultAndError(db.client.Do(command...).Result()) } func (db *DB) DelIE(key string, data interface{}) (bool, error) {