2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
28 "github.com/go-redis/redis/v7"
40 type ChannelNotificationCb func(channel string, payload ...string)
41 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
43 type intChannels struct {
44 addChannel chan string
45 removeChannel chan string
49 type sharedCbMap struct {
51 cbMap map[string]ChannelNotificationCb
59 clusterAddrList string
65 sentinel RedisSentinelCreateCb
74 type Subscriber interface {
75 Channel() <-chan *redis.Message
76 Subscribe(channels ...string) error
77 Unsubscribe(channels ...string) error
81 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
83 type RedisClient interface {
84 Command() *redis.CommandsInfoCmd
86 Subscribe(channels ...string) *redis.PubSub
87 MSet(pairs ...interface{}) *redis.StatusCmd
88 Do(args ...interface{}) *redis.Cmd
89 MGet(keys ...string) *redis.SliceCmd
90 Del(keys ...string) *redis.IntCmd
91 Keys(pattern string) *redis.StringSliceCmd
92 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
93 SAdd(key string, members ...interface{}) *redis.IntCmd
94 SRem(key string, members ...interface{}) *redis.IntCmd
95 SMembers(key string) *redis.StringSliceCmd
96 SIsMember(key string, member interface{}) *redis.BoolCmd
97 SCard(key string) *redis.IntCmd
98 PTTL(key string) *redis.DurationCmd
99 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
100 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
101 ScriptExists(scripts ...string) *redis.BoolSliceCmd
102 ScriptLoad(script string) *redis.StringCmd
103 Info(section ...string) *redis.StringCmd
106 var dbLogger *log.Logger
109 dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
110 redis.SetLogger(dbLogger)
113 func SetDbLogger(out io.Writer) {
114 dbLogger.SetOutput(out)
117 func checkResultAndError(result interface{}, err error) (bool, error) {
119 if err == redis.Nil {
130 func checkIntResultAndError(result interface{}, err error) (bool, error) {
134 if n, ok := result.(int64); ok {
138 } else if n, ok := result.(int); ok {
146 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
147 return client.Subscribe(channels...)
150 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
153 sentinel: sentinelCreateCb,
154 subscribe: subscribe,
156 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
158 addChannel: make(chan string),
159 removeChannel: make(chan string),
160 exit: make(chan bool),
169 func Create() []*DB {
171 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
174 func readConfig(osI OS) Config {
176 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
177 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
178 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
179 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
180 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
181 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
187 Getenv(key string, defValue string) string
192 func (osImpl) Getenv(key string, defValue string) string {
193 val := os.Getenv(key)
200 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
201 subscribe SubscribeFn,
202 sentinelCreateCb RedisSentinelCreateCb) []*DB {
203 cfg := readConfig(osI)
204 return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
207 func createDbClients(cfg Config, clientCreator RedisClientCreator,
208 subscribe SubscribeFn,
209 sentinelCreateCb RedisSentinelCreateCb) []*DB {
210 if cfg.clusterAddrList == "" {
211 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
216 addrList := strings.Split(cfg.clusterAddrList, ",")
217 for _, addr := range addrList {
218 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
219 dbs = append(dbs, db)
224 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
225 subscribe SubscribeFn,
226 sentinelCreateCb RedisSentinelCreateCb) *DB {
227 return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
230 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
231 subscribe SubscribeFn,
232 sentinelCreateCb RedisSentinelCreateCb) *DB {
233 var client RedisClient
235 if cfg.sentinelPort == "" {
236 client = clientCreator(hostName, cfg.port, "", false)
237 db = CreateDB(client, subscribe, nil, cfg, hostName)
239 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
240 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
246 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
248 sentinelAddress := addr + ":" + port
249 return redis.NewFailoverClient(
250 &redis.FailoverOptions{
251 MasterName: clusterName,
252 SentinelAddrs: []string{sentinelAddress},
258 redisAddress := addr + ":" + port
259 return redis.NewClient(&redis.Options{
261 Password: "", // no password set
262 DB: 0, // use default DB
268 func (db *DB) CheckCommands() {
269 commands, err := db.client.Command().Result()
271 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
272 "msetmpub", "delmpub"}
273 for _, v := range redisModuleCommands {
276 db.redisModules = false
280 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
284 func (db *DB) CloseDB() error {
285 return db.client.Close()
288 func (db *DB) UnsubscribeChannelDB(channels ...string) {
289 for _, v := range channels {
291 db.ch.removeChannel <- v
292 if db.sCbMap.Count() == 0 {
298 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
299 if db.sCbMap.Count() == 0 {
300 for _, v := range channels {
304 go func(sCbMap *sharedCbMap,
306 eventSeparator string,
308 channels ...string) {
309 sub := db.subscribe(db.client, channels...)
310 rxChannel := sub.Channel()
311 lCbMap := sCbMap.GetMapCopy()
314 case msg := <-rxChannel:
315 cb, ok := lCbMap[msg.Channel]
317 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
319 case channel := <-ch.addChannel:
320 lCbMap = sCbMap.GetMapCopy()
321 sub.Subscribe(channel)
322 case channel := <-ch.removeChannel:
323 lCbMap = sCbMap.GetMapCopy()
324 sub.Unsubscribe(channel)
325 case exit := <-ch.exit:
327 if err := sub.Close(); err != nil {
328 dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
334 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
337 for _, v := range channels {
339 db.ch.addChannel <- v
344 func (db *DB) MSet(pairs ...interface{}) error {
345 return db.client.MSet(pairs...).Err()
348 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
349 if !db.redisModules {
350 return errors.New("Redis deployment doesn't support MSETMPUB command")
352 command := make([]interface{}, 0)
353 command = append(command, "MSETMPUB")
354 command = append(command, len(pairs)/2)
355 command = append(command, len(channelsAndEvents)/2)
356 for _, d := range pairs {
357 command = append(command, d)
359 for _, d := range channelsAndEvents {
360 command = append(command, d)
362 _, err := db.client.Do(command...).Result()
366 func (db *DB) MGet(keys []string) ([]interface{}, error) {
367 return db.client.MGet(keys...).Result()
370 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
371 if !db.redisModules {
372 return errors.New("Redis deployment not supporting command DELMPUB")
374 command := make([]interface{}, 0)
375 command = append(command, "DELMPUB")
376 command = append(command, len(keys))
377 command = append(command, len(channelsAndEvents)/2)
378 for _, d := range keys {
379 command = append(command, d)
381 for _, d := range channelsAndEvents {
382 command = append(command, d)
384 _, err := db.client.Do(command...).Result()
389 func (db *DB) Del(keys []string) error {
390 _, err := db.client.Del(keys...).Result()
394 func (db *DB) Keys(pattern string) ([]string, error) {
395 return db.client.Keys(pattern).Result()
398 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
399 if !db.redisModules {
400 return false, errors.New("Redis deployment not supporting command")
403 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
406 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
407 if !db.redisModules {
408 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
410 capacity := 4 + len(channelsAndEvents)
411 command := make([]interface{}, 0, capacity)
412 command = append(command, "SETIEMPUB")
413 command = append(command, key)
414 command = append(command, newData)
415 command = append(command, oldData)
416 for _, ce := range channelsAndEvents {
417 command = append(command, ce)
419 return checkResultAndError(db.client.Do(command...).Result())
422 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
423 if !db.redisModules {
424 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
426 capacity := 3 + len(channelsAndEvents)
427 command := make([]interface{}, 0, capacity)
428 command = append(command, "SETNXMPUB")
429 command = append(command, key)
430 command = append(command, data)
431 for _, ce := range channelsAndEvents {
432 command = append(command, ce)
434 return checkResultAndError(db.client.Do(command...).Result())
436 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
437 return db.client.SetNX(key, data, expiration).Result()
440 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
441 if !db.redisModules {
442 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
444 capacity := 3 + len(channelsAndEvents)
445 command := make([]interface{}, 0, capacity)
446 command = append(command, "DELIEMPUB")
447 command = append(command, key)
448 command = append(command, data)
449 for _, ce := range channelsAndEvents {
450 command = append(command, ce)
452 return checkIntResultAndError(db.client.Do(command...).Result())
455 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
456 if !db.redisModules {
457 return false, errors.New("Redis deployment not supporting command")
459 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
462 func (db *DB) SAdd(key string, data ...interface{}) error {
463 _, err := db.client.SAdd(key, data...).Result()
467 func (db *DB) SRem(key string, data ...interface{}) error {
468 _, err := db.client.SRem(key, data...).Result()
472 func (db *DB) SMembers(key string) ([]string, error) {
473 result, err := db.client.SMembers(key).Result()
477 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
478 result, err := db.client.SIsMember(key, data).Result()
482 func (db *DB) SCard(key string) (int64, error) {
483 result, err := db.client.SCard(key).Result()
487 func (db *DB) PTTL(key string) (time.Duration, error) {
488 result, err := db.client.PTTL(key).Result()
492 func (db *DB) Info() (*DbInfo, error) {
494 resultStr, err := db.client.Info("all").Result()
499 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
500 err = readRedisInfoReplyFields(result, &info)
504 func lineContains(line, substr string) bool {
505 return strings.Contains(line, substr)
508 func getFieldValueStr(line, substr string) string {
509 if idx := strings.Index(line, substr); idx != -1 {
510 return line[idx+len(substr):]
515 func getUint32FromString(s string) uint32 {
516 if val, err := strconv.ParseUint(s, 10, 32); err == nil {
522 func getUint64FromString(s string) uint64 {
523 if val, err := strconv.ParseUint(s, 10, 64); err == nil {
529 func getFloatFromString(s string, bitSize int) float64 {
530 if val, err := strconv.ParseFloat(s, bitSize); err == nil {
536 func getFloat64FromString(s string) float64 {
537 return getFloatFromString(s, 64)
540 func getFloat32FromString(s string) float32 {
541 return float32(getFloatFromString(s, 32))
544 func getValueString(values string, key string) string {
545 slice := strings.Split(values, ",")
546 for _, s := range slice {
547 if lineContains(s, key) {
548 return getFieldValueStr(s, key)
554 func getCommandstatsValues(values string) (string, string, string) {
555 calls := getValueString(values, "calls=")
556 usec := getValueString(values, "usec=")
557 usecPerCall := getValueString(values, "usec_per_call=")
558 return calls, usec, usecPerCall
561 func updateCommandstatsValues(i interface{}, line, cmdstat string) {
562 stype := reflect.ValueOf(i).Elem()
563 values := getFieldValueStr(line, cmdstat)
564 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
566 callsField := stype.FieldByName("Calls")
567 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
569 usecField := stype.FieldByName("Usec")
570 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
572 usecPerCallField := stype.FieldByName("UsecPerCall")
573 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
576 func getKeyspaceValues(values string) (string, string, string) {
577 keys := getValueString(values, "keys=")
578 expires := getValueString(values, "expires=")
579 avgttl := getValueString(values, "avg_ttl=")
580 return keys, expires, avgttl
583 func updateKeyspaceValues(i interface{}, line, keyspace string) {
584 stype := reflect.ValueOf(i).Elem()
585 values := getFieldValueStr(line, keyspace)
586 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
588 keysField := stype.FieldByName("Keys")
589 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
591 expiresField := stype.FieldByName("Expires")
592 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
594 avgttlField := stype.FieldByName("AvgTtl")
595 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
598 func readRedisInfoReplyFields(input []string, info *DbInfo) error {
599 for _, line := range input {
601 case lineContains(line, "role:") && !lineContains(line, "_role:"):
602 if "master" == getFieldValueStr(line, "role:") {
603 info.Fields.PrimaryRole = true
605 case lineContains(line, "connected_slaves:"):
606 info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:"))
607 case lineContains(line, "uptime_in_days:"):
608 info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:"))
609 case lineContains(line, "connected_clients:"):
610 info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:"))
611 case lineContains(line, "client_recent_max_input_buffer:"):
612 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:"))
613 case lineContains(line, "client_recent_max_output_buffer:"):
614 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:"))
615 case lineContains(line, "used_memory:"):
616 info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:"))
617 case lineContains(line, "used_memory_human:"):
618 info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:")
619 case lineContains(line, "used_memory_rss:"):
620 info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:"))
621 case lineContains(line, "used_memory_rss_human:"):
622 info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:")
623 case lineContains(line, "used_memory_peak:"):
624 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:"))
625 case lineContains(line, "used_memory_peak_human:"):
626 info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:")
627 case lineContains(line, "used_memory_peak_perc:"):
628 info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:")
629 case lineContains(line, "mem_fragmentation_ratio:"):
630 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:"))
631 case lineContains(line, "mem_fragmentation_bytes:"):
632 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:"))
633 case lineContains(line, "total_connections_received:"):
634 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:"))
635 case lineContains(line, "total_commands_processed:"):
636 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:"))
637 case lineContains(line, "sync_full:"):
638 info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:"))
639 case lineContains(line, "sync_partial_ok:"):
640 info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:"))
641 case lineContains(line, "sync_partial_err:"):
642 info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:"))
643 case lineContains(line, "pubsub_channels:"):
644 info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:"))
645 case lineContains(line, "used_cpu_sys:"):
646 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:"))
647 case lineContains(line, "used_cpu_user:"):
648 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:"))
649 case lineContains(line, "cmdstat_replconf:"):
650 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:")
651 case lineContains(line, "cmdstat_keys:"):
652 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:")
653 case lineContains(line, "cmdstat_role:"):
654 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:")
655 case lineContains(line, "cmdstat_psync:"):
656 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:")
657 case lineContains(line, "cmdstat_mset:"):
658 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:")
659 case lineContains(line, "cmdstat_publish:"):
660 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:")
661 case lineContains(line, "cmdstat_info:"):
662 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:")
663 case lineContains(line, "cmdstat_ping:"):
664 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:")
665 case lineContains(line, "cmdstat_client:"):
666 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:")
667 case lineContains(line, "cmdstat_command:"):
668 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:")
669 case lineContains(line, "cmdstat_subscribe:"):
670 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:")
671 case lineContains(line, "cmdstat_monitor:"):
672 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:")
673 case lineContains(line, "cmdstat_config:"):
674 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:")
675 case lineContains(line, "cmdstat_slaveof:"):
676 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:")
677 case lineContains(line, "db0:"):
678 updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:")
684 func (db *DB) State() (*DbState, error) {
685 dbState := new(DbState)
686 if db.cfg.sentinelPort != "" {
687 //Establish connection to Redis sentinel. The reason why connection is done
688 //here instead of time of the SDL instance creation is that for the time being
689 //sentinel connection is needed only here to get state information and
690 //state information is needed only by 'sdlcli' hence it is not time critical
691 //and also we want to avoid opening unnecessary TCP connections towards Redis
692 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
693 sentinelClient := db.sentinel(&db.cfg, db.addr)
694 return sentinelClient.GetDbState()
696 info, err := db.Info()
698 dbState.PrimaryDbState.Err = err
701 return db.fillDbStateFromDbInfo(info)
705 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
707 if info.Fields.PrimaryRole == true {
709 PrimaryDbState: PrimaryDbState{
710 Fields: PrimaryDbStateFields{
720 cnt, err := strconv.Atoi(db.cfg.nodeCnt)
722 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
724 dbState.ConfigNodeCnt = cnt
727 return &dbState, dbState.Err
730 func createReplicaDbClient(host string) *DB {
731 cfg := readConfig(osImpl{})
732 cfg.sentinelPort = ""
733 cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
734 return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
737 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
738 dbStatisticsInfo := new(DbStatisticsInfo)
739 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
741 info, err := db.Info()
745 dbStatisticsInfo.Info = info
747 return dbStatisticsInfo, nil
750 func sentinelStatistics(db *DB) (*DbStatistics, error) {
751 dbState := new(DbState)
752 dbStatistics := new(DbStatistics)
753 dbStatisticsInfo := new(DbStatisticsInfo)
756 dbState, err = db.State()
761 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
762 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
764 if dbState.ReplicasDbState != nil {
765 for _, r := range dbState.ReplicasDbState.States {
766 replicaDb := createReplicaDbClient(r.GetAddress())
767 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
772 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
776 return dbStatistics, nil
779 func standaloneStatistics(db *DB) (*DbStatistics, error) {
780 dbStatistics := new(DbStatistics)
782 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
783 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
785 return dbStatistics, err
788 func (db *DB) Statistics() (*DbStatistics, error) {
789 if db.cfg.sentinelPort != "" {
790 return sentinelStatistics(db)
793 return standaloneStatistics(db)
796 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
798 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
799 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
800 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
804 if result == int64(1) {
807 return errors.New("Lock not held")
810 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
812 defer sCbMap.m.Unlock()
813 sCbMap.cbMap[channel] = cb
816 func (sCbMap *sharedCbMap) Remove(channel string) {
818 defer sCbMap.m.Unlock()
819 delete(sCbMap.cbMap, channel)
822 func (sCbMap *sharedCbMap) Count() int {
824 defer sCbMap.m.Unlock()
825 return len(sCbMap.cbMap)
828 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
830 defer sCbMap.m.Unlock()
831 mapCopy := make(map[string]ChannelNotificationCb, 0)
832 for i, v := range sCbMap.cbMap {