X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=4121bb9abe50ab371f8e3f7c6975ee70d2a20723;hb=bd724d66fdfee6b1a6e589a4081529dd535ca423;hp=278be2ada72cb2ceb3fda7754c7db3b41ee11d9d;hpb=4d1222c6d4954e2a2855833696dc3b44418714a6;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 278be2a..4121bb9 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -1,6 +1,6 @@ /* Copyright (c) 2019 AT&T Intellectual Property. - Copyright (c) 2018-2019 Nokia. + Copyright (c) 2018-2022 Nokia. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -38,6 +38,9 @@ import ( "time" ) +const EventSeparator = "___" +const NsSeparator = "," + type ChannelNotificationCb func(channel string, payload ...string) type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient @@ -53,12 +56,12 @@ type sharedCbMap struct { } type Config struct { - hostname string - port string - masterName string - sentinelPort string - clusterAddrList string - nodeCnt string + hostname string + ports []string + masterNames []string + sentinelPorts []string + clusterAddrs []string + nodeCnt string } type DB struct { @@ -69,8 +72,11 @@ type DB struct { redisModules bool sCbMap *sharedCbMap ch intChannels - cfg Config addr string + port string + sentinelPort string + masterName string + nodeCnt string } type Subscriber interface { @@ -151,7 +157,8 @@ func subscribeNotifications(ctx context.Context, client RedisClient, channels .. return client.Subscribe(ctx, channels...) } -func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB { +func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, + addr, port, sentinelPort, masterName, nodeCnt string) *DB { db := DB{ ctx: context.Background(), client: client, @@ -164,8 +171,11 @@ func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisS removeChannel: make(chan string), exit: make(chan bool), }, - cfg: cfg, - addr: sentinelAddr, + addr: addr, + sentinelPort: sentinelPort, + port: port, + masterName: masterName, + nodeCnt: nodeCnt, } return &db @@ -178,13 +188,23 @@ func Create() []*DB { func readConfig(osI OS) Config { cfg := Config{ - hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"), - port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"), - masterName: osI.Getenv("DBAAS_MASTER_NAME", ""), - sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""), - clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""), - nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), + hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"), + ports: strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","), + nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"), + } + + if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" { + cfg.clusterAddrs = strings.Split(addrStr, ",") + } else if cfg.hostname != "" { + cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname) } + if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" { + cfg.sentinelPorts = strings.Split(sntPortStr, ",") + } + if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" { + cfg.masterNames = strings.Split(nameStr, ",") + } + completeConfig(&cfg) return cfg } @@ -202,47 +222,61 @@ func (osImpl) Getenv(key string, defValue string) string { return val } -func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, - subscribe SubscribeFn, - sentinelCreateCb RedisSentinelCreateCb) []*DB { - cfg := readConfig(osI) - return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb) +func completeConfig(cfg *Config) { + if len(cfg.sentinelPorts) == 0 { + if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 { + for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ { + cfg.ports = append(cfg.ports, cfg.ports[i-1]) + } + } + } else { + if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) { + for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ { + cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1]) + } + } + if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 { + for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ { + cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1]) + } + } + } } -func createDbClients(cfg Config, clientCreator RedisClientCreator, +func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb) []*DB { - if cfg.clusterAddrList == "" { - return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)} - } - dbs := []*DB{} - - addrList := strings.Split(cfg.clusterAddrList, ",") - for _, addr := range addrList { - db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb) + cfg := readConfig(osI) + for i, addr := range cfg.clusterAddrs { + port := getListItem(cfg.ports, i) + sntPort := getListItem(cfg.sentinelPorts, i) + name := getListItem(cfg.masterNames, i) + db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt, + clientCreator, subscribe, sentinelCreateCb) dbs = append(dbs, db) } return dbs } -func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator, - subscribe SubscribeFn, - sentinelCreateCb RedisSentinelCreateCb) *DB { - return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb) +func getListItem(list []string, index int) string { + if index < len(list) { + return list[index] + } + return "" } -func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator, +func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb) *DB { var client RedisClient var db *DB - if cfg.sentinelPort == "" { - client = clientCreator(hostName, cfg.port, "", false) - db = CreateDB(client, subscribe, nil, cfg, hostName) + if sentinelPort == "" { + client = clientCreator(addr, port, "", false) + db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt) } else { - client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true) - db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName) + client = clientCreator(addr, sentinelPort, masterName, true) + db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt) } db.CheckCommands() return db @@ -290,28 +324,25 @@ func (db *DB) CloseDB() error { return db.client.Close() } -func (db *DB) UnsubscribeChannelDB(channels ...string) { +func (db *DB) UnsubscribeChannelDB(channels ...string) error { for _, v := range channels { db.sCbMap.Remove(v) db.ch.removeChannel <- v + errStr := <-db.ch.removeChannel + if errStr != "" { + return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr) + } if db.sCbMap.Count() == 0 { db.ch.exit <- true } } + return nil } -func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) { +func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error { if db.sCbMap.Count() == 0 { - for _, v := range channels { - db.sCbMap.Add(v, cb) - } - - go func(sCbMap *sharedCbMap, - channelPrefix, - eventSeparator string, - ch intChannels, - channels ...string) { - sub := db.subscribe(db.ctx, db.client, channels...) + go func(sCbMap *sharedCbMap, ch intChannels) { + sub := db.subscribe(db.ctx, db.client, "") rxChannel := sub.Channel() lCbMap := sCbMap.GetMapCopy() for { @@ -319,14 +350,23 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even case msg := <-rxChannel: cb, ok := lCbMap[msg.Channel] if ok { - cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...) + nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2) + cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...) } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() - sub.Subscribe(db.ctx, channel) + if err := sub.Subscribe(db.ctx, channel); err != nil { + ch.addChannel <- err.Error() + } else { + ch.addChannel <- "" + } case channel := <-ch.removeChannel: lCbMap = sCbMap.GetMapCopy() - sub.Unsubscribe(db.ctx, channel) + if err := sub.Unsubscribe(db.ctx, channel); err != nil { + ch.removeChannel <- err.Error() + } else { + ch.removeChannel <- "" + } case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { @@ -336,14 +376,17 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even } } } - }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...) - - } else { - for _, v := range channels { - db.sCbMap.Add(v, cb) - db.ch.addChannel <- v + }(db.sCbMap, db.ch) + } + for _, v := range channels { + db.sCbMap.Add(v, cb) + db.ch.addChannel <- v + errStr := <-db.ch.addChannel + if errStr != "" { + return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr) } } + return nil } func (db *DB) MSet(pairs ...interface{}) error { @@ -763,14 +806,14 @@ func readRedisInfoReplyFields(input []string, info *DbInfo) { func (db *DB) State() (*DbState, error) { dbState := new(DbState) - if db.cfg.sentinelPort != "" { + if db.sentinelPort != "" { //Establish connection to Redis sentinel. The reason why connection is done //here instead of time of the SDL instance creation is that for the time being //sentinel connection is needed only here to get state information and //state information is needed only by 'sdlcli' hence it is not time critical //and also we want to avoid opening unnecessary TCP connections towards Redis //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used. - sentinelClient := db.sentinel(&db.cfg, db.addr) + sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt) return sentinelClient.GetDbState() } else { info, err := db.Info() @@ -789,17 +832,17 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { PrimaryDbState: PrimaryDbState{ Fields: PrimaryDbStateFields{ Role: "master", - Ip: db.cfg.hostname, - Port: db.cfg.port, + Ip: db.addr, + Port: db.port, Flags: "master", }, }, } } - cnt, err := strconv.Atoi(db.cfg.nodeCnt) + cnt, err := strconv.Atoi(db.nodeCnt) if err != nil { - dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt) + dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt) } else { dbState.ConfigNodeCnt = cnt } @@ -807,16 +850,21 @@ func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) { return &dbState, dbState.Err } -func createReplicaDbClient(host string) *DB { - cfg := readConfig(osImpl{}) - cfg.sentinelPort = "" - cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host) - return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil) +func createReplicaDbClient(host string) (*DB, error) { + addr, port, err := net.SplitHostPort(host) + if err != nil { + return nil, err + } + return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil), err } func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) { + var err error dbStatisticsInfo := new(DbStatisticsInfo) - dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host) + dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, err = net.SplitHostPort(host) + if err != nil { + return nil, err + } info, err := db.Info() if err != nil { @@ -843,9 +891,14 @@ func sentinelStatistics(db *DB) (*DbStatistics, error) { if dbState.ReplicasDbState != nil { for _, r := range dbState.ReplicasDbState.States { - replicaDb := createReplicaDbClient(r.GetAddress()) + replicaDb, err := createReplicaDbClient(r.GetAddress()) + if err != nil { + return nil, err + } dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress()) - replicaDb.CloseDB() + if closeErr := replicaDb.CloseDB(); closeErr != nil { + return nil, closeErr + } if err != nil { return nil, err } @@ -859,14 +912,14 @@ func sentinelStatistics(db *DB) (*DbStatistics, error) { func standaloneStatistics(db *DB) (*DbStatistics, error) { dbStatistics := new(DbStatistics) - dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port)) + dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port)) dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo) return dbStatistics, err } func (db *DB) Statistics() (*DbStatistics, error) { - if db.cfg.sentinelPort != "" { + if db.sentinelPort != "" { return sentinelStatistics(db) }