/*
Copyright (c) 2019 AT&T Intellectual Property.
- Copyright (c) 2018-2019 Nokia.
+ Copyright (c) 2018-2022 Nokia.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
"time"
)
+const EventSeparator = "___"
+const NsSeparator = ","
+
type ChannelNotificationCb func(channel string, payload ...string)
type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
}
type Config struct {
- hostname string
- port string
- masterName string
- sentinelPort string
- clusterAddrList string
- nodeCnt string
+ hostname string
+ ports []string
+ masterNames []string
+ sentinelPorts []string
+ clusterAddrs []string
+ nodeCnt string
}
type DB struct {
redisModules bool
sCbMap *sharedCbMap
ch intChannels
- cfg Config
addr string
+ port string
+ sentinelPort string
+ masterName string
+ nodeCnt string
}
type Subscriber interface {
return client.Subscribe(ctx, channels...)
}
-func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
+func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
+ addr, port, sentinelPort, masterName, nodeCnt string) *DB {
db := DB{
ctx: context.Background(),
client: client,
removeChannel: make(chan string),
exit: make(chan bool),
},
- cfg: cfg,
- addr: sentinelAddr,
+ addr: addr,
+ sentinelPort: sentinelPort,
+ port: port,
+ masterName: masterName,
+ nodeCnt: nodeCnt,
}
return &db
func readConfig(osI OS) Config {
cfg := Config{
- hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
- port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
- masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
- sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
- clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
- nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
+ hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
+ ports: strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
+ nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
+ }
+
+ if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
+ cfg.clusterAddrs = strings.Split(addrStr, ",")
+ } else if cfg.hostname != "" {
+ cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
}
+ if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
+ cfg.sentinelPorts = strings.Split(sntPortStr, ",")
+ }
+ if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
+ cfg.masterNames = strings.Split(nameStr, ",")
+ }
+ completeConfig(&cfg)
return cfg
}
return val
}
-func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
- subscribe SubscribeFn,
- sentinelCreateCb RedisSentinelCreateCb) []*DB {
- cfg := readConfig(osI)
- return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
+func completeConfig(cfg *Config) {
+ if len(cfg.sentinelPorts) == 0 {
+ if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
+ for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
+ cfg.ports = append(cfg.ports, cfg.ports[i-1])
+ }
+ }
+ } else {
+ if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
+ for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
+ cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
+ }
+ }
+ if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
+ for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
+ cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
+ }
+ }
+ }
}
-func createDbClients(cfg Config, clientCreator RedisClientCreator,
+func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
subscribe SubscribeFn,
sentinelCreateCb RedisSentinelCreateCb) []*DB {
- if cfg.clusterAddrList == "" {
- return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
- }
-
dbs := []*DB{}
-
- addrList := strings.Split(cfg.clusterAddrList, ",")
- for _, addr := range addrList {
- db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
+ cfg := readConfig(osI)
+ for i, addr := range cfg.clusterAddrs {
+ port := getListItem(cfg.ports, i)
+ sntPort := getListItem(cfg.sentinelPorts, i)
+ name := getListItem(cfg.masterNames, i)
+ db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
+ clientCreator, subscribe, sentinelCreateCb)
dbs = append(dbs, db)
}
return dbs
}
-func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
- subscribe SubscribeFn,
- sentinelCreateCb RedisSentinelCreateCb) *DB {
- return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
+func getListItem(list []string, index int) string {
+ if index < len(list) {
+ return list[index]
+ }
+ return ""
}
-func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
+func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
subscribe SubscribeFn,
sentinelCreateCb RedisSentinelCreateCb) *DB {
var client RedisClient
var db *DB
- if cfg.sentinelPort == "" {
- client = clientCreator(hostName, cfg.port, "", false)
- db = CreateDB(client, subscribe, nil, cfg, hostName)
+ if sentinelPort == "" {
+ client = clientCreator(addr, port, "", false)
+ db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
} else {
- client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
- db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
+ client = clientCreator(addr, sentinelPort, masterName, true)
+ db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
}
db.CheckCommands()
return db
return db.client.Close()
}
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
for _, v := range channels {
db.sCbMap.Remove(v)
db.ch.removeChannel <- v
+ errStr := <-db.ch.removeChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+ }
if db.sCbMap.Count() == 0 {
db.ch.exit <- true
}
}
+ return nil
}
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
if db.sCbMap.Count() == 0 {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- }
-
- go func(sCbMap *sharedCbMap,
- channelPrefix,
- eventSeparator string,
- ch intChannels,
- channels ...string) {
- sub := db.subscribe(db.ctx, db.client, channels...)
+ go func(sCbMap *sharedCbMap, ch intChannels) {
+ sub := db.subscribe(db.ctx, db.client, "")
rxChannel := sub.Channel()
lCbMap := sCbMap.GetMapCopy()
for {
case msg := <-rxChannel:
cb, ok := lCbMap[msg.Channel]
if ok {
- cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+ nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
+ cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
}
case channel := <-ch.addChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Subscribe(db.ctx, channel)
+ if err := sub.Subscribe(db.ctx, channel); err != nil {
+ ch.addChannel <- err.Error()
+ } else {
+ ch.addChannel <- ""
+ }
case channel := <-ch.removeChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Unsubscribe(db.ctx, channel)
+ if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+ ch.removeChannel <- err.Error()
+ } else {
+ ch.removeChannel <- ""
+ }
case exit := <-ch.exit:
if exit {
if err := sub.Close(); err != nil {
}
}
}
- }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
-
- } else {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- db.ch.addChannel <- v
+ }(db.sCbMap, db.ch)
+ }
+ for _, v := range channels {
+ db.sCbMap.Add(v, cb)
+ db.ch.addChannel <- v
+ errStr := <-db.ch.addChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
}
}
+ return nil
}
func (db *DB) MSet(pairs ...interface{}) error {
func (db *DB) State() (*DbState, error) {
dbState := new(DbState)
- if db.cfg.sentinelPort != "" {
+ if db.sentinelPort != "" {
//Establish connection to Redis sentinel. The reason why connection is done
//here instead of time of the SDL instance creation is that for the time being
//sentinel connection is needed only here to get state information and
//state information is needed only by 'sdlcli' hence it is not time critical
//and also we want to avoid opening unnecessary TCP connections towards Redis
//sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
- sentinelClient := db.sentinel(&db.cfg, db.addr)
+ sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
return sentinelClient.GetDbState()
} else {
info, err := db.Info()
PrimaryDbState: PrimaryDbState{
Fields: PrimaryDbStateFields{
Role: "master",
- Ip: db.cfg.hostname,
- Port: db.cfg.port,
+ Ip: db.addr,
+ Port: db.port,
Flags: "master",
},
},
}
}
- cnt, err := strconv.Atoi(db.cfg.nodeCnt)
+ cnt, err := strconv.Atoi(db.nodeCnt)
if err != nil {
- dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
+ dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
} else {
dbState.ConfigNodeCnt = cnt
}
return &dbState, dbState.Err
}
-func createReplicaDbClient(host string) *DB {
- cfg := readConfig(osImpl{})
- cfg.sentinelPort = ""
- cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
- return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
+func createReplicaDbClient(host string) (*DB, error) {
+ addr, port, err := net.SplitHostPort(host)
+ if err != nil {
+ return nil, err
+ }
+ return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil), err
}
func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
+ var err error
dbStatisticsInfo := new(DbStatisticsInfo)
- dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
+ dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, err = net.SplitHostPort(host)
+ if err != nil {
+ return nil, err
+ }
info, err := db.Info()
if err != nil {
if dbState.ReplicasDbState != nil {
for _, r := range dbState.ReplicasDbState.States {
- replicaDb := createReplicaDbClient(r.GetAddress())
+ replicaDb, err := createReplicaDbClient(r.GetAddress())
+ if err != nil {
+ return nil, err
+ }
dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
- replicaDb.CloseDB()
+ if closeErr := replicaDb.CloseDB(); closeErr != nil {
+ return nil, closeErr
+ }
if err != nil {
return nil, err
}
func standaloneStatistics(db *DB) (*DbStatistics, error) {
dbStatistics := new(DbStatistics)
- dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
+ dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
return dbStatistics, err
}
func (db *DB) Statistics() (*DbStatistics, error) {
- if db.cfg.sentinelPort != "" {
+ if db.sentinelPort != "" {
return sentinelStatistics(db)
}