2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
29 "github.com/go-redis/redis/v8"
// Separators used when decoding notification traffic.
const (
	// EventSeparator splits one published payload into the individual
	// events handed to a notification callback.
	EventSeparator = "___"
	// NsSeparator splits a Redis channel name into its leading prefix and
	// the channel name passed to the notification callback.
	NsSeparator = ","
)
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
// intChannels groups the control channels read by the subscription goroutine.
47 type intChannels struct {
// addChannel carries names of channels the goroutine should subscribe to.
48 addChannel chan string
// removeChannel carries names of channels the goroutine should unsubscribe from.
49 removeChannel chan string
// sharedCbMap is the channel-to-callback registry shared between the public
// subscribe/unsubscribe calls and the subscription goroutine.
53 type sharedCbMap struct {
// cbMap maps a channel name to the callback registered for it.
55 cbMap map[string]ChannelNotificationCb
63 clusterAddrList string
70 sentinel RedisSentinelCreateCb
// Subscriber is the subset of a Redis pub/sub handle used by this package.
79 type Subscriber interface {
// Channel returns the Go channel on which received messages are delivered.
80 Channel(opts ...redis.ChannelOption) <-chan *redis.Message
// Subscribe adds the given channels to the subscription.
81 Subscribe(ctx context.Context, channels ...string) error
// Unsubscribe removes the given channels from the subscription.
82 Unsubscribe(ctx context.Context, channels ...string) error
86 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
// RedisClient lists the Redis client operations this package relies on. The
// method signatures mirror the go-redis v8 command API, so a go-redis client
// can be used directly and alternative implementations can be substituted.
88 type RedisClient interface {
89 Command(ctx context.Context) *redis.CommandsInfoCmd
91 Subscribe(ctx context.Context, channels ...string) *redis.PubSub
92 MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
// Do issues an arbitrary command; it is used for the module commands below.
93 Do(ctx context.Context, args ...interface{}) *redis.Cmd
94 MGet(ctx context.Context, keys ...string) *redis.SliceCmd
95 Del(ctx context.Context, keys ...string) *redis.IntCmd
96 Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
97 SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
98 SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
99 SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
100 SMembers(ctx context.Context, key string) *redis.StringSliceCmd
101 SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
102 SCard(ctx context.Context, key string) *redis.IntCmd
103 PTTL(ctx context.Context, key string) *redis.DurationCmd
// Eval, EvalSha, ScriptExists and ScriptLoad let a redis.Script run on this client.
104 Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
105 EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
106 ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
107 ScriptLoad(ctx context.Context, script string) *redis.StringCmd
108 Info(ctx context.Context, section ...string) *redis.StringCmd
115 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
117 redis.SetLogger(dbLogger)
120 func SetDbLogger(out io.Writer) {
121 dbLogger.log.SetOutput(out)
// checkResultAndError reduces a Redis reply and its error to a (success, error)
// pair; redis.Nil is special-cased.
// NOTE(review): only part of this function is visible here. The redis.Nil test
// uses ==, so a wrapped redis.Nil would not match — consider errors.Is.
124 func checkResultAndError(result interface{}, err error) (bool, error) {
126 if err == redis.Nil {
// checkIntResultAndError reduces an integer Redis reply and its error to a
// (success, error) pair. The reply is accepted as either int64 or int.
// NOTE(review): the comparisons made on n are not visible in this view.
137 func checkIntResultAndError(result interface{}, err error) (bool, error) {
141 if n, ok := result.(int64); ok {
145 } else if n, ok := result.(int); ok {
153 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
154 return client.Subscribe(ctx, channels...)
// CreateDB assembles a DB from an already created client, the subscribe
// function, the sentinel factory and the configuration.
// The visible initialisers give the DB a background context, an empty callback
// registry and unbuffered add/remove/exit control channels.
// NOTE(review): how sentinelAddr is stored is not visible in this view.
157 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
159 ctx: context.Background(),
161 sentinel: sentinelCreateCb,
162 subscribe: subscribe,
164 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
166 addChannel: make(chan string),
167 removeChannel: make(chan string),
168 exit: make(chan bool),
177 func Create() []*DB {
179 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
182 func readConfig(osI OS) Config {
184 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
185 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
186 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
187 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
188 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
189 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
195 Getenv(key string, defValue string) string
// Getenv reads key from the process environment via os.Getenv.
// NOTE(review): the handling of defValue is not visible in this view.
200 func (osImpl) Getenv(key string, defValue string) string {
201 val := os.Getenv(key)
208 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
209 subscribe SubscribeFn,
210 sentinelCreateCb RedisSentinelCreateCb) []*DB {
211 cfg := readConfig(osI)
212 return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
215 func createDbClients(cfg Config, clientCreator RedisClientCreator,
216 subscribe SubscribeFn,
217 sentinelCreateCb RedisSentinelCreateCb) []*DB {
218 if cfg.clusterAddrList == "" {
219 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
224 addrList := strings.Split(cfg.clusterAddrList, ",")
225 for _, addr := range addrList {
226 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
227 dbs = append(dbs, db)
232 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
233 subscribe SubscribeFn,
234 sentinelCreateCb RedisSentinelCreateCb) *DB {
235 return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
// createDbClient creates the Redis client and DB for hostName.
// Without a sentinel port it creates a plain client on cfg.port and a DB with
// no sentinel factory; with one it creates a high-availability client on the
// sentinel port for cfg.masterName and a DB that keeps sentinelCreateCb.
// NOTE(review): the statements after the DB is created are not visible here.
238 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
239 subscribe SubscribeFn,
240 sentinelCreateCb RedisSentinelCreateCb) *DB {
241 var client RedisClient
243 if cfg.sentinelPort == "" {
244 client = clientCreator(hostName, cfg.port, "", false)
245 db = CreateDB(client, subscribe, nil, cfg, hostName)
247 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
248 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
// newRedisClient is the production RedisClientCreator. It returns either a
// sentinel-backed failover client for master clusterName or a plain client,
// in both cases addressed as addr:port.
// NOTE(review): the address is built by string concatenation, so an IPv6
// literal would lack brackets — net.JoinHostPort would handle that.
254 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
256 sentinelAddress := addr + ":" + port
257 return redis.NewFailoverClient(
258 &redis.FailoverOptions{
259 MasterName: clusterName,
260 SentinelAddrs: []string{sentinelAddress},
266 redisAddress := addr + ":" + port
267 return redis.NewClient(&redis.Options{
269 Password: "", // no password set
270 DB: 0, // use default DB
// CheckCommands asks the server for its command list and walks the module
// commands this package uses; db.redisModules is cleared inside that walk
// (presumably when a command is missing — the condition is not visible here).
// A failure to fetch the command list is logged.
276 func (db *DB) CheckCommands() {
277 commands, err := db.client.Command(db.ctx).Result()
279 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
280 "msetmpub", "delmpub"}
281 for _, v := range redisModuleCommands {
284 db.redisModules = false
288 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
292 func (db *DB) CloseDB() error {
293 return db.client.Close()
// UnsubscribeChannelDB hands each channel to the subscription goroutine for
// unsubscription and then checks whether the callback registry is empty.
// NOTE(review): the send on removeChannel is unbuffered, so it blocks until
// the subscription goroutine receives it. The registry update and the action
// taken when the registry is empty are not visible in this view.
296 func (db *DB) UnsubscribeChannelDB(channels ...string) {
297 for _, v := range channels {
299 db.ch.removeChannel <- v
300 if db.sCbMap.Count() == 0 {
// SubscribeChannelDB registers cb for channels.
// When no callbacks are registered yet it starts the subscription goroutine,
// which owns the Redis subscription and dispatches incoming messages;
// otherwise the channels are passed to the goroutine already running.
// NOTE(review): several lines of this function are not visible in this view.
306 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
307 if db.sCbMap.Count() == 0 {
308 for _, v := range channels {
312 go func(sCbMap *sharedCbMap,
314 channels ...string) {
315 sub := db.subscribe(db.ctx, db.client, channels...)
316 rxChannel := sub.Channel()
// The goroutine works on a private copy of the registry and refreshes it
// whenever a channel is added or removed.
317 lCbMap := sCbMap.GetMapCopy()
320 case msg := <-rxChannel:
321 cb, ok := lCbMap[msg.Channel]
// The callback receives the part of the channel name after the first
// NsSeparator, and the payload split into events.
// NOTE(review): nSChNames[1] assumes the channel name contains NsSeparator;
// without it the index is out of range.
323 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
324 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
326 case channel := <-ch.addChannel:
327 lCbMap = sCbMap.GetMapCopy()
// NOTE(review): the error returned by Subscribe is discarded.
328 sub.Subscribe(db.ctx, channel)
329 case channel := <-ch.removeChannel:
330 lCbMap = sCbMap.GetMapCopy()
// NOTE(review): the error returned by Unsubscribe is discarded.
331 sub.Unsubscribe(db.ctx, channel)
332 case exit := <-ch.exit:
334 if err := sub.Close(); err != nil {
335 dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
341 }(db.sCbMap, db.ch, channels...)
344 for _, v := range channels {
// Unbuffered send: blocks until the subscription goroutine picks it up.
346 db.ch.addChannel <- v
351 func (db *DB) MSet(pairs ...interface{}) error {
352 return db.client.MSet(db.ctx, pairs...).Err()
355 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
356 if !db.redisModules {
357 return errors.New("Redis deployment doesn't support MSETMPUB command")
359 command := make([]interface{}, 0)
360 command = append(command, "MSETMPUB")
361 command = append(command, len(pairs)/2)
362 command = append(command, len(channelsAndEvents)/2)
363 for _, d := range pairs {
364 command = append(command, d)
366 for _, d := range channelsAndEvents {
367 command = append(command, d)
369 _, err := db.client.Do(db.ctx, command...).Result()
373 func (db *DB) MGet(keys []string) ([]interface{}, error) {
374 return db.client.MGet(db.ctx, keys...).Result()
377 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
378 if !db.redisModules {
379 return errors.New("Redis deployment not supporting command DELMPUB")
381 command := make([]interface{}, 0)
382 command = append(command, "DELMPUB")
383 command = append(command, len(keys))
384 command = append(command, len(channelsAndEvents)/2)
385 for _, d := range keys {
386 command = append(command, d)
388 for _, d := range channelsAndEvents {
389 command = append(command, d)
391 _, err := db.client.Do(db.ctx, command...).Result()
396 func (db *DB) Del(keys []string) error {
397 _, err := db.client.Del(db.ctx, keys...).Result()
401 func (db *DB) Keys(pattern string) ([]string, error) {
402 return db.client.Keys(db.ctx, pattern).Result()
405 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
406 if !db.redisModules {
407 return false, errors.New("Redis deployment not supporting command")
410 return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
413 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
414 if !db.redisModules {
415 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
417 capacity := 4 + len(channelsAndEvents)
418 command := make([]interface{}, 0, capacity)
419 command = append(command, "SETIEMPUB")
420 command = append(command, key)
421 command = append(command, newData)
422 command = append(command, oldData)
423 for _, ce := range channelsAndEvents {
424 command = append(command, ce)
426 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
429 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
430 if !db.redisModules {
431 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
433 capacity := 3 + len(channelsAndEvents)
434 command := make([]interface{}, 0, capacity)
435 command = append(command, "SETNXMPUB")
436 command = append(command, key)
437 command = append(command, data)
438 for _, ce := range channelsAndEvents {
439 command = append(command, ce)
441 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
443 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
444 return db.client.SetNX(db.ctx, key, data, expiration).Result()
447 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
448 if !db.redisModules {
449 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
451 capacity := 3 + len(channelsAndEvents)
452 command := make([]interface{}, 0, capacity)
453 command = append(command, "DELIEMPUB")
454 command = append(command, key)
455 command = append(command, data)
456 for _, ce := range channelsAndEvents {
457 command = append(command, ce)
459 return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
462 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
463 if !db.redisModules {
464 return false, errors.New("Redis deployment not supporting command")
466 return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
469 func (db *DB) SAdd(key string, data ...interface{}) error {
470 _, err := db.client.SAdd(db.ctx, key, data...).Result()
474 func (db *DB) SRem(key string, data ...interface{}) error {
475 _, err := db.client.SRem(db.ctx, key, data...).Result()
479 func (db *DB) SMembers(key string) ([]string, error) {
480 result, err := db.client.SMembers(db.ctx, key).Result()
484 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
485 result, err := db.client.SIsMember(db.ctx, key, data).Result()
489 func (db *DB) SCard(key string) (int64, error) {
490 result, err := db.client.SCard(db.ctx, key).Result()
494 func (db *DB) PTTL(key string) (time.Duration, error) {
495 result, err := db.client.PTTL(db.ctx, key).Result()
// Info runs "INFO all" and parses the reply into a DbInfo.
// NOTE(review): the error path after the INFO call is not visible in this view.
499 func (db *DB) Info() (*DbInfo, error) {
501 resultStr, err := db.client.Info(db.ctx, "all").Result()
// Normalise CRLF line endings to LF before splitting the reply into lines.
506 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
507 readRedisInfoReplyFields(result, &info)
// lineContains reports whether substr occurs anywhere in line.
func lineContains(line, substr string) bool {
	return strings.Index(line, substr) >= 0
}
// getFieldValueStr returns the text following the first occurrence of substr
// in line, or "" when substr does not occur.
func getFieldValueStr(line, substr string) string {
	pos := strings.Index(line, substr)
	if pos < 0 {
		return ""
	}
	return line[pos+len(substr):]
}
// getUint32FromString parses s as a base-10 unsigned 32-bit integer.
// It returns 0 when s is not a valid number or does not fit in 32 bits.
func getUint32FromString(s string) uint32 {
	parsed, err := strconv.ParseUint(s, 10, 32)
	if err != nil {
		return 0
	}
	return uint32(parsed)
}
// getUint64FromString parses s as a base-10 unsigned 64-bit integer.
// It returns 0 when s is not a valid number or does not fit in 64 bits.
func getUint64FromString(s string) uint64 {
	parsed, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return 0
	}
	return parsed
}
// getFloatFromString parses s as a floating-point number of the given bit
// size (32 or 64) and returns it as float64; it returns 0 on a parse error.
func getFloatFromString(s string, bitSize int) float64 {
	parsed, err := strconv.ParseFloat(s, bitSize)
	if err != nil {
		return 0
	}
	return parsed
}
// getFloat64FromString parses s as a 64-bit float; it returns 0 on a parse
// error.
func getFloat64FromString(s string) float64 {
	parsed, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return 0
	}
	return parsed
}
// getFloat32FromString parses s as a 32-bit float; it returns 0 on a parse
// error.
func getFloat32FromString(s string) float32 {
	parsed, err := strconv.ParseFloat(s, 32)
	if err != nil {
		return 0
	}
	return float32(parsed)
}
// getValueString extracts the value for key (written with its trailing "=",
// e.g. "calls=") from a comma-separated "name=value" list.
//
// A field that starts with key is preferred, so "calls=" is not satisfied by
// a field such as "rejected_calls=..." that merely contains it. When no field
// starts with key, the first field containing key anywhere is used, which
// keeps the previous behaviour for input with stray leading characters.
// It returns "" when key is not found.
func getValueString(values string, key string) string {
	fields := strings.Split(values, ",")
	for _, field := range fields {
		if strings.HasPrefix(field, key) {
			return field[len(key):]
		}
	}
	for _, field := range fields {
		if pos := strings.Index(field, key); pos != -1 {
			return field[pos+len(key):]
		}
	}
	return ""
}
561 func getCommandstatsValues(values string) (string, string, string) {
562 calls := getValueString(values, "calls=")
563 usec := getValueString(values, "usec=")
564 usecPerCall := getValueString(values, "usec_per_call=")
565 return calls, usec, usecPerCall
568 func updateCommandstatsValues(i interface{}, values string) {
569 stype := reflect.ValueOf(i).Elem()
570 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
572 callsField := stype.FieldByName("Calls")
573 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
575 usecField := stype.FieldByName("Usec")
576 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
578 usecPerCallField := stype.FieldByName("UsecPerCall")
579 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
582 func getKeyspaceValues(values string) (string, string, string) {
583 keys := getValueString(values, "keys=")
584 expires := getValueString(values, "expires=")
585 avgttl := getValueString(values, "avg_ttl=")
586 return keys, expires, avgttl
589 func updateKeyspaceValues(i interface{}, values string) {
590 stype := reflect.ValueOf(i).Elem()
591 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
593 keysField := stype.FieldByName("Keys")
594 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
596 expiresField := stype.FieldByName("Expires")
597 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
599 avgttlField := stype.FieldByName("AvgTtl")
600 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
603 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
604 if value, ok := config["uptime_in_days"]; ok {
605 info.Fields.Server.UptimeInDays = getUint32FromString(value)
609 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
610 if value, ok := config["connected_clients"]; ok {
611 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
613 if value, ok := config["client_recent_max_input_buffer"]; ok {
614 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
616 if value, ok := config["client_recent_max_output_buffer"]; ok {
617 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
621 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
622 if value, ok := config["used_memory"]; ok {
623 info.Fields.Memory.UsedMemory = getUint64FromString(value)
625 if value, ok := config["used_memory_human"]; ok {
626 info.Fields.Memory.UsedMemoryHuman = value
628 if value, ok := config["used_memory_rss"]; ok {
629 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
631 if value, ok := config["used_memory_rss_human"]; ok {
632 info.Fields.Memory.UsedMemoryRssHuman = value
634 if value, ok := config["used_memory_peak"]; ok {
635 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
637 if value, ok := config["used_memory_peak_human"]; ok {
638 info.Fields.Memory.UsedMemoryPeakHuman = value
640 if value, ok := config["used_memory_peak_perc"]; ok {
641 info.Fields.Memory.UsedMemoryPeakPerc = value
643 if value, ok := config["mem_fragmentation_ratio"]; ok {
644 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
646 if value, ok := config["mem_fragmentation_bytes"]; ok {
647 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
651 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
652 if value, ok := config["total_connections_received"]; ok {
653 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
655 if value, ok := config["total_commands_processed"]; ok {
656 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
658 if value, ok := config["sync_full"]; ok {
659 info.Fields.Stats.SyncFull = getUint32FromString(value)
661 if value, ok := config["sync_partial_ok"]; ok {
662 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
664 if value, ok := config["sync_partial_err"]; ok {
665 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
667 if value, ok := config["pubsub_channels"]; ok {
668 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
672 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
673 if value, ok := config["used_cpu_sys"]; ok {
674 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
676 if value, ok := config["used_cpu_user"]; ok {
677 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
681 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
682 if values, ok := config["cmdstat_replconf"]; ok {
683 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
685 if values, ok := config["cmdstat_keys"]; ok {
686 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
688 if values, ok := config["cmdstat_role"]; ok {
689 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
691 if values, ok := config["cmdstat_psync"]; ok {
692 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
694 if values, ok := config["cmdstat_mset"]; ok {
695 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
697 if values, ok := config["cmdstat_publish"]; ok {
698 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
700 if values, ok := config["cmdstat_info"]; ok {
701 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
703 if values, ok := config["cmdstat_ping"]; ok {
704 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
706 if values, ok := config["cmdstat_client"]; ok {
707 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
709 if values, ok := config["cmdstat_command"]; ok {
710 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
712 if values, ok := config["cmdstat_subscribe"]; ok {
713 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
715 if values, ok := config["cmdstat_monitor"]; ok {
716 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
718 if values, ok := config["cmdstat_config"]; ok {
719 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
721 if values, ok := config["cmdstat_slaveof"]; ok {
722 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
726 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
727 if values, ok := config["db0"]; ok {
728 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
// getConfigInfo turns "key:value" lines into a ConfigInfo lookup table.
// Each line is split at its first ':'; key and value are whitespace-trimmed.
// Lines without ':' or with an empty key are ignored.
// NOTE(review): one statement between the key check and the assignment is not
// visible in this view.
732 func getConfigInfo(input []string) ConfigInfo {
733 config := ConfigInfo{}
734 for _, line := range input {
735 if i := strings.Index(line, ":"); i != -1 {
736 if key := strings.TrimSpace(line[:i]); len(key) > 0 {
738 config[key] = strings.TrimSpace(line[i+1:])
746 func readRedisInfoReplyFields(input []string, info *DbInfo) {
747 config := getConfigInfo(input)
749 if value, ok := config["role"]; ok {
750 if "master" == value {
751 info.Fields.PrimaryRole = true
754 if value, ok := config["connected_slaves"]; ok {
755 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
757 updateServerInfoFields(config, info)
758 updateClientInfoFields(config, info)
759 updateMemoryInfoFields(config, info)
760 updateStatsInfoFields(config, info)
761 updateCpuInfoFields(config, info)
762 updateCommandstatsInfoFields(config, info)
763 updateKeyspaceInfoFields(config, info)
// State reports the database state.
// With a sentinel port configured the state is obtained from a sentinel
// client created on demand; otherwise it is derived from the INFO reply of
// the directly connected server.
// NOTE(review): the sentinel client created here is not closed in the visible
// lines, and the return on an Info error is not visible in this view.
766 func (db *DB) State() (*DbState, error) {
767 dbState := new(DbState)
768 if db.cfg.sentinelPort != "" {
769 //Establish connection to Redis sentinel. The reason why connection is done
770 //here instead of time of the SDL instance creation is that for the time being
771 //sentinel connection is needed only here to get state information and
772 //state information is needed only by 'sdlcli' hence it is not time critical
773 //and also we want to avoid opening unnecessary TCP connections towards Redis
774 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
775 sentinelClient := db.sentinel(&db.cfg, db.addr)
776 return sentinelClient.GetDbState()
778 info, err := db.Info()
// The Info error is recorded on the primary state.
780 dbState.PrimaryDbState.Err = err
783 return db.fillDbStateFromDbInfo(info)
// fillDbStateFromDbInfo builds a DbState from a parsed INFO reply.
// Primary state fields are filled when info reports the primary role, and
// the configured node count (DBAAS_NODE_COUNT) is converted to an integer.
// A conversion failure is recorded in dbState.Err, which is also the
// returned error.
// NOTE(review): several lines of this function are not visible in this view.
787 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
789 if info.Fields.PrimaryRole == true {
791 PrimaryDbState: PrimaryDbState{
792 Fields: PrimaryDbStateFields{
802 cnt, err := strconv.Atoi(db.cfg.nodeCnt)
// NOTE(review): the Atoi error itself is not included in the message.
804 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
806 dbState.ConfigNodeCnt = cnt
809 return &dbState, dbState.Err
812 func createReplicaDbClient(host string) *DB {
813 cfg := readConfig(osImpl{})
814 cfg.sentinelPort = ""
815 cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
816 return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
// getStatisticsInfo collects the INFO statistics of db and labels them with
// the IP address and port parsed from host ("host:port").
// NOTE(review): the SplitHostPort error is ignored, and the handling of an
// Info error is not visible in this view.
819 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
820 dbStatisticsInfo := new(DbStatisticsInfo)
821 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
823 info, err := db.Info()
827 dbStatisticsInfo.Info = info
829 return dbStatisticsInfo, nil
// sentinelStatistics gathers statistics for a sentinel-managed deployment:
// first for the primary reported by db.State(), then for every replica listed
// in the state, each queried through its own direct client.
// NOTE(review): the replica clients created in the loop are not closed in the
// visible lines, and the error handling is not visible in this view.
832 func sentinelStatistics(db *DB) (*DbStatistics, error) {
833 dbState := new(DbState)
834 dbStatistics := new(DbStatistics)
835 dbStatisticsInfo := new(DbStatisticsInfo)
838 dbState, err = db.State()
// The primary is queried through the existing db client.
843 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
844 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
846 if dbState.ReplicasDbState != nil {
847 for _, r := range dbState.ReplicasDbState.States {
848 replicaDb := createReplicaDbClient(r.GetAddress())
849 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
854 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
858 return dbStatistics, nil
861 func standaloneStatistics(db *DB) (*DbStatistics, error) {
862 dbStatistics := new(DbStatistics)
864 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
865 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
867 return dbStatistics, err
870 func (db *DB) Statistics() (*DbStatistics, error) {
871 if db.cfg.sentinelPort != "" {
872 return sentinelStatistics(db)
875 return standaloneStatistics(db)
878 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
880 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
881 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
882 result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
886 if result == int64(1) {
889 return errors.New("Lock not held")
892 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
894 defer sCbMap.m.Unlock()
895 sCbMap.cbMap[channel] = cb
898 func (sCbMap *sharedCbMap) Remove(channel string) {
900 defer sCbMap.m.Unlock()
901 delete(sCbMap.cbMap, channel)
904 func (sCbMap *sharedCbMap) Count() int {
906 defer sCbMap.m.Unlock()
907 return len(sCbMap.cbMap)
910 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
912 defer sCbMap.m.Unlock()
913 mapCopy := make(map[string]ChannelNotificationCb, 0)
914 for i, v := range sCbMap.cbMap {