2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
28 "github.com/go-redis/redis/v7"
// ChannelNotificationCb is the callback signature used for channel
// notifications: it receives a channel name and the payload strings.
38 type ChannelNotificationCb func(channel string, payload ...string)
// RedisClientCreator is the factory signature for building a RedisClient
// from an address, a port, a cluster (master) name and a high-availability
// flag.
39 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
// intChannels groups the internal Go channels used to talk to the
// subscription goroutine.
41 type intChannels struct {
// addChannel carries names of channels to subscribe to.
42 addChannel chan string
// removeChannel carries names of channels to unsubscribe from.
43 removeChannel chan string
// sharedCbMap holds the notification callbacks shared between API callers
// and the subscription goroutine.
47 type sharedCbMap struct {
// cbMap maps a channel name to the callback registered for it.
49 cbMap map[string]ChannelNotificationCb
57 clusterAddrList string
63 sentinel RedisSentinelCreateCb
// Subscriber abstracts a pub/sub subscription: a message stream plus the
// ability to add and remove channels on it.
72 type Subscriber interface {
// Channel returns the receive-only Go channel that delivers messages.
73 Channel() <-chan *redis.Message
// Subscribe adds the given channels to the subscription.
74 Subscribe(channels ...string) error
// Unsubscribe removes the given channels from the subscription.
75 Unsubscribe(channels ...string) error
// SubscribeFn is the signature of the function that opens a Subscriber for
// the given channels on a RedisClient.
79 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
// RedisClient lists the Redis client operations this package relies on.
// All methods return go-redis command objects, so it can be satisfied by a
// go-redis client or by a test double.
81 type RedisClient interface {
82 Command() *redis.CommandsInfoCmd
84 Subscribe(channels ...string) *redis.PubSub
85 MSet(pairs ...interface{}) *redis.StatusCmd
// Do sends an arbitrary command; it is used for the Redis module commands.
86 Do(args ...interface{}) *redis.Cmd
87 MGet(keys ...string) *redis.SliceCmd
88 Del(keys ...string) *redis.IntCmd
89 Keys(pattern string) *redis.StringSliceCmd
90 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
91 SAdd(key string, members ...interface{}) *redis.IntCmd
92 SRem(key string, members ...interface{}) *redis.IntCmd
93 SMembers(key string) *redis.StringSliceCmd
94 SIsMember(key string, member interface{}) *redis.BoolCmd
95 SCard(key string) *redis.IntCmd
96 PTTL(key string) *redis.DurationCmd
97 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
98 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
99 ScriptExists(scripts ...string) *redis.BoolSliceCmd
100 ScriptLoad(script string) *redis.StringCmd
101 Info(section ...string) *redis.StringCmd
// dbLogger is the package-level logger used for database diagnostics.
104 var dbLogger *log.Logger
// The logger writes to stdout with the "database: " prefix, standard
// timestamp flags and short file names; the same logger is also handed to
// the go-redis library.
107 dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
108 redis.SetLogger(dbLogger)
// SetDbLogger redirects the output of the package database logger to out.
111 func SetDbLogger(out io.Writer) {
112 dbLogger.SetOutput(out)
// checkResultAndError converts the (result, err) pair of a Redis command
// reply into a (bool, error) pair.
115 func checkResultAndError(result interface{}, err error) (bool, error) {
// redis.Nil is singled out from other errors.
// NOTE(review): presumably a nil reply means "condition not met" rather
// than a failure - confirm against callers.
117 if err == redis.Nil {
// checkIntResultAndError converts the (result, err) pair of a Redis command
// reply carrying an integer into a (bool, error) pair.
128 func checkIntResultAndError(result interface{}, err error) (bool, error) {
// The reply may arrive with dynamic type int64 or int, so both are handled.
132 if n, ok := result.(int64); ok {
136 } else if n, ok := result.(int); ok {
// subscribeNotifications is a SubscribeFn that delegates directly to the
// client's own Subscribe method.
144 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
145 return client.Subscribe(channels...)
// CreateDB builds a DB instance around an already created Redis client.
// The subscribe function and the sentinel creation callback are stored in
// the instance for later use.
148 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
151 sentinel: sentinelCreateCb,
152 subscribe: subscribe,
// The callback map starts out empty.
154 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
// All internal channels are unbuffered, so a send blocks until it is received.
156 addChannel: make(chan string),
157 removeChannel: make(chan string),
158 exit: make(chan bool),
// Create builds the DB clients using the default implementations:
// newRedisClient, subscribeNotifications and newRedisSentinel, with the
// configuration read through osimpl.
167 func Create() []*DB {
169 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
// readConfig collects the database configuration from DBAAS_* settings
// queried through osI. The second argument of each Getenv call is the
// default value handed to osI.
172 func readConfig(osI OS) Config {
174 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
175 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
176 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
177 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
// Comma-separated list of cluster addresses.
178 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
// Kept as a string here; it is converted to an integer where it is used.
179 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
// Getenv looks up the setting named key; defValue is the caller-supplied
// default value.
185 Getenv(key string, defValue string) string
// Getenv reads key from the process environment via os.Getenv.
// NOTE(review): defValue is presumably returned when the variable is unset
// or empty - confirm.
190 func (osImpl) Getenv(key string, defValue string) string {
191 val := os.Getenv(key)
// ReadConfigAndCreateDbClients reads the configuration through osI and
// creates the DB clients from it. The client creator, subscribe function
// and sentinel creation callback are passed in by the caller, which lets
// them be replaced.
198 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
199 subscribe SubscribeFn,
200 sentinelCreateCb RedisSentinelCreateCb) []*DB {
201 cfg := readConfig(osI)
202 return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
// createDbClients creates the DB clients described by cfg.
205 func createDbClients(cfg Config, clientCreator RedisClientCreator,
206 subscribe SubscribeFn,
207 sentinelCreateCb RedisSentinelCreateCb) []*DB {
// Without a cluster address list a single legacy client is created.
208 if cfg.clusterAddrList == "" {
209 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
// Otherwise one client is created per comma-separated address. Entries are
// used as-is: no whitespace trimming or empty-entry filtering is done.
214 addrList := strings.Split(cfg.clusterAddrList, ",")
215 for _, addr := range addrList {
216 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
217 dbs = append(dbs, db)
// createLegacyDbClient creates a single DB client that uses cfg.hostname as
// its address.
222 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
223 subscribe SubscribeFn,
224 sentinelCreateCb RedisSentinelCreateCb) *DB {
225 return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
// createDbClient creates the Redis client and the DB instance for one host.
228 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
229 subscribe SubscribeFn,
230 sentinelCreateCb RedisSentinelCreateCb) *DB {
231 var client RedisClient
// No sentinel port configured: the client uses cfg.port with isHa=false and
// the DB gets no sentinel creation callback.
233 if cfg.sentinelPort == "" {
234 client = clientCreator(hostName, cfg.port, "", false)
235 db = CreateDB(client, subscribe, nil, cfg, hostName)
// Sentinel port configured: the client is created against the sentinel port
// with the configured master name and isHa=true.
237 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
238 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
// newRedisClient creates a go-redis client for addr:port. Two variants are
// built: a failover (sentinel) client and a plain client.
// NOTE(review): the variant is presumably selected by isHa - confirm.
244 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
// Failover variant: addr:port is the sentinel address and clusterName is
// the monitored master name.
246 sentinelAddress := addr + ":" + port
247 return redis.NewFailoverClient(
248 &redis.FailoverOptions{
249 MasterName: clusterName,
250 SentinelAddrs: []string{sentinelAddress},
// Plain variant: addr:port is the Redis server address.
256 redisAddress := addr + ":" + port
257 return redis.NewClient(&redis.Options{
259 Password: "", // no password set
260 DB: 0, // use default DB
// CheckCommands queries the command list of the Redis server and looks for
// the SDL Redis module commands.
266 func (db *DB) CheckCommands() {
267 commands, err := db.client.Command().Result()
// Module commands looked up in the server's command list.
269 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
270 "msetmpub", "delmpub"}
271 for _, v := range redisModuleCommands {
// NOTE(review): presumably reached when a module command is missing, which
// disables the module-based operations of this DB - confirm.
274 db.redisModules = false
// A failure of the command query itself is only logged.
278 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
// CloseDB closes the underlying Redis client and returns its close error.
282 func (db *DB) CloseDB() error {
283 return db.client.Close()
// UnsubscribeChannelDB asks the subscription goroutine to unsubscribe from
// each of the given channels.
286 func (db *DB) UnsubscribeChannelDB(channels ...string) {
287 for _, v := range channels {
// The send blocks until the subscription goroutine receives it.
289 db.ch.removeChannel <- v
// The count is re-checked after every removal so the case where no
// callbacks remain can be handled.
290 if db.sCbMap.Count() == 0 {
// SubscribeChannelDB registers interest in the given channels and delivers
// their notifications to cb.
//
// channelPrefix is stripped from the channel name before cb is called, and
// the message payload is split on eventSeparator into the callback's
// payload arguments.
296 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
// Nothing registered yet: a receiver goroutine is started for the initial
// set of channels.
297 if db.sCbMap.Count() == 0 {
298 for _, v := range channels {
302 go func(sCbMap *sharedCbMap,
304 eventSeparator string,
306 channels ...string) {
307 sub := db.subscribe(db.client, channels...)
308 rxChannel := sub.Channel()
// The goroutine dispatches from its own copy of the callback map and
// refreshes that copy whenever a channel is added or removed.
309 lCbMap := sCbMap.GetMapCopy()
312 case msg := <-rxChannel:
313 cb, ok := lCbMap[msg.Channel]
315 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
// NOTE(review): the error returned by sub.Subscribe is not checked.
317 case channel := <-ch.addChannel:
318 lCbMap = sCbMap.GetMapCopy()
319 sub.Subscribe(channel)
// NOTE(review): the error returned by sub.Unsubscribe is not checked.
320 case channel := <-ch.removeChannel:
321 lCbMap = sCbMap.GetMapCopy()
322 sub.Unsubscribe(channel)
// Exit request: the subscription is closed; a close failure is only logged.
323 case exit := <-ch.exit:
325 if err := sub.Close(); err != nil {
326 dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
332 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
// Channels are handed to the receiver goroutine one by one; each send
// blocks until the goroutine receives it.
335 for _, v := range channels {
337 db.ch.addChannel <- v
// MSet passes the given key/value arguments to the client's MSet and
// returns only the command error.
342 func (db *DB) MSet(pairs ...interface{}) error {
343 return db.client.MSet(pairs...).Err()
346 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
347 if !db.redisModules {
348 return errors.New("Redis deployment doesn't support MSETMPUB command")
350 command := make([]interface{}, 0)
351 command = append(command, "MSETMPUB")
352 command = append(command, len(pairs)/2)
353 command = append(command, len(channelsAndEvents)/2)
354 for _, d := range pairs {
355 command = append(command, d)
357 for _, d := range channelsAndEvents {
358 command = append(command, d)
360 _, err := db.client.Do(command...).Result()
// MGet passes keys to the client's MGet and returns its result slice and
// error unchanged.
364 func (db *DB) MGet(keys []string) ([]interface{}, error) {
365 return db.client.MGet(keys...).Result()
368 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
369 if !db.redisModules {
370 return errors.New("Redis deployment not supporting command DELMPUB")
372 command := make([]interface{}, 0)
373 command = append(command, "DELMPUB")
374 command = append(command, len(keys))
375 command = append(command, len(channelsAndEvents)/2)
376 for _, d := range keys {
377 command = append(command, d)
379 for _, d := range channelsAndEvents {
380 command = append(command, d)
382 _, err := db.client.Do(command...).Result()
// Del passes keys to the client's Del; the returned count is discarded.
387 func (db *DB) Del(keys []string) error {
388 _, err := db.client.Del(keys...).Result()
// Keys passes pattern to the client's Keys and returns its result and
// error unchanged.
392 func (db *DB) Keys(pattern string) ([]string, error) {
393 return db.client.Keys(pattern).Result()
// SetIE sends the SETIE Redis module command for key. It fails right away
// when the module commands are not available.
396 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
397 if !db.redisModules {
398 return false, errors.New("Redis deployment not supporting command")
// Note the wire order: the new value precedes the old value, the reverse of
// this method's parameter order.
401 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
// SetIEPub builds and sends the SETIEMPUB Redis module command:
// SETIEMPUB <key> <newData> <oldData> <channelsAndEvents...>.
// It fails right away when the module commands are not available.
404 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
405 if !db.redisModules {
406 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
// Command name, key, new value and old value, plus every channel/event string.
408 capacity := 4 + len(channelsAndEvents)
409 command := make([]interface{}, 0, capacity)
410 command = append(command, "SETIEMPUB")
411 command = append(command, key)
// The new value precedes the old value on the wire.
412 command = append(command, newData)
413 command = append(command, oldData)
414 for _, ce := range channelsAndEvents {
415 command = append(command, ce)
417 return checkResultAndError(db.client.Do(command...).Result())
// SetNXPub builds and sends the SETNXMPUB Redis module command:
// SETNXMPUB <key> <data> <channelsAndEvents...>.
// It fails right away when the module commands are not available.
420 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
421 if !db.redisModules {
422 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
// Command name, key and value, plus every channel/event string.
424 capacity := 3 + len(channelsAndEvents)
425 command := make([]interface{}, 0, capacity)
426 command = append(command, "SETNXMPUB")
427 command = append(command, key)
428 command = append(command, data)
429 for _, ce := range channelsAndEvents {
430 command = append(command, ce)
432 return checkResultAndError(db.client.Do(command...).Result())
// SetNX passes key, data and expiration to the client's SetNX and returns
// its result and error unchanged.
434 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
435 return db.client.SetNX(key, data, expiration).Result()
// DelIEPub builds and sends the DELIEMPUB Redis module command:
// DELIEMPUB <key> <data> <channelsAndEvents...>.
// It fails right away when the module commands are not available. The
// reply is interpreted as an integer result.
438 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
439 if !db.redisModules {
440 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
// Command name, key and value, plus every channel/event string.
442 capacity := 3 + len(channelsAndEvents)
443 command := make([]interface{}, 0, capacity)
444 command = append(command, "DELIEMPUB")
445 command = append(command, key)
446 command = append(command, data)
447 for _, ce := range channelsAndEvents {
448 command = append(command, ce)
450 return checkIntResultAndError(db.client.Do(command...).Result())
// DelIE sends the DELIE Redis module command for key and data. It fails
// right away when the module commands are not available. The reply is
// interpreted as an integer result.
453 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
454 if !db.redisModules {
455 return false, errors.New("Redis deployment not supporting command")
457 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
// SAdd passes key and data to the client's SAdd; the returned count is
// discarded.
460 func (db *DB) SAdd(key string, data ...interface{}) error {
461 _, err := db.client.SAdd(key, data...).Result()
// SRem passes key and data to the client's SRem; the returned count is
// discarded.
465 func (db *DB) SRem(key string, data ...interface{}) error {
466 _, err := db.client.SRem(key, data...).Result()
// SMembers queries the client's SMembers for key.
470 func (db *DB) SMembers(key string) ([]string, error) {
471 result, err := db.client.SMembers(key).Result()
// SIsMember queries the client's SIsMember for key and data.
475 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
476 result, err := db.client.SIsMember(key, data).Result()
// SCard queries the client's SCard for key.
480 func (db *DB) SCard(key string) (int64, error) {
481 result, err := db.client.SCard(key).Result()
// PTTL queries the client's PTTL for key.
485 func (db *DB) PTTL(key string) (time.Duration, error) {
486 result, err := db.client.PTTL(key).Result()
// Info requests the "all" INFO section from Redis and parses the reply
// into a DbInfo.
490 func (db *DB) Info() (*DbInfo, error) {
492 resultStr, err := db.client.Info("all").Result()
// CRLF line endings are normalized to LF before the reply is split into
// lines for parsing.
497 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
498 err = readRedisInfoReplyFields(result, &info)
502 func readRedisInfoReplyFields(input []string, info *DbInfo) error {
503 for _, line := range input {
504 if idx := strings.Index(line, "role:"); idx != -1 {
505 roleStr := line[idx+len("role:"):]
506 if roleStr == "master" {
507 info.Fields.PrimaryRole = true
509 } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 {
510 cntStr := line[idx+len("connected_slaves:"):]
511 cnt, err := strconv.ParseUint(cntStr, 10, 32)
513 return fmt.Errorf("Info reply error: %s", err.Error())
515 info.Fields.ConnectedReplicaCnt = uint32(cnt)
// State returns the state of the database. With a sentinel port configured
// the state comes from a sentinel client created on demand; otherwise it is
// derived from the Redis INFO reply.
521 func (db *DB) State() (*DbState, error) {
522 dbState := new(DbState)
523 if db.cfg.sentinelPort != "" {
524 //Establish connection to Redis sentinel. The reason why connection is done
525 //here instead of time of the SDL instance creation is that for the time being
526 //sentinel connection is needed only here to get state information and
527 //state information is needed only by 'sdlcli' hence it is not time critical
528 //and also we want to avoid opening unnecessary TCP connections towards Redis
529 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
530 sentinelClient := db.sentinel(&db.cfg, db.addr)
531 return sentinelClient.GetDbState()
// No sentinel configured: the state is built from the INFO reply. An INFO
// failure is recorded in the primary DB state's Err field.
533 info, err := db.Info()
535 dbState.PrimaryDbState.Err = err
538 return db.fillDbStateFromDbInfo(info)
// fillDbStateFromDbInfo builds a DbState from a previously read DbInfo.
542 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
// Primary-specific state is filled in when the INFO reply reported the
// primary role.
544 if info.Fields.PrimaryRole == true {
546 PrimaryDbState: PrimaryDbState{
547 Fields: PrimaryDbStateFields{
// The configured node count (DBAAS_NODE_COUNT) is kept as a string in the
// configuration and is converted to an integer here.
555 cnt, err := strconv.Atoi(db.cfg.nodeCnt)
557 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
559 dbState.ConfigNodeCnt = cnt
// dbState.Err doubles as the returned error value.
562 return &dbState, dbState.Err
// luaRefresh is a Lua script that compares the value stored at KEYS[1] with
// ARGV[1]; on a match it returns the result of PEXPIRE on the key with
// ARGV[2] as the timeout, otherwise it returns 0.
565 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
// PExpireIE runs the luaRefresh script for key, passing data and the
// expiration as script arguments.
567 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
// The expiration is sent in whole milliseconds as a decimal string; the
// integer division drops any sub-millisecond remainder.
568 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
569 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
// A script reply of 1 is checked for explicitly.
// NOTE(review): the "Lock not held" error below presumably covers every
// other reply - confirm.
573 if result == int64(1) {
576 return errors.New("Lock not held")
// Add stores cb as the callback for channel, replacing any callback already
// stored for that channel. The lock m is released on return.
579 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
581 defer sCbMap.m.Unlock()
582 sCbMap.cbMap[channel] = cb
// Remove deletes the callback stored for channel; deleting a channel that
// is not present is a no-op. The lock m is released on return.
585 func (sCbMap *sharedCbMap) Remove(channel string) {
587 defer sCbMap.m.Unlock()
588 delete(sCbMap.cbMap, channel)
// Count returns the number of channels that currently have a callback
// stored. The lock m is released on return.
591 func (sCbMap *sharedCbMap) Count() int {
593 defer sCbMap.m.Unlock()
594 return len(sCbMap.cbMap)
597 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
599 defer sCbMap.m.Unlock()
600 mapCopy := make(map[string]ChannelNotificationCb, 0)
601 for i, v := range sCbMap.cbMap {