2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
28 "github.com/go-redis/redis/v7"
// ChannelNotificationCb is the callback signature for channel notifications:
// it receives the channel name followed by the event payload strings.
40 type ChannelNotificationCb func(channel string, payload ...string)
// RedisClientCreator is a factory that builds a RedisClient from an address,
// a port, a cluster (master) name and a flag requesting the HA variant.
41 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
// intChannels groups the internal Go channels over which channel names are
// passed to the subscription goroutine started by SubscribeChannelDB.
43 type intChannels struct {
// addChannel carries names of Redis channels to subscribe to.
44 addChannel chan string
// removeChannel carries names of Redis channels to unsubscribe from.
45 removeChannel chan string
// sharedCbMap is the registry of notification callbacks; its methods
// (Add, Remove, Count, GetMapCopy) release the mutex m through a deferred Unlock.
49 type sharedCbMap struct {
// cbMap maps a channel name to the callback registered for it.
51 cbMap map[string]ChannelNotificationCb
// clusterAddrList is filled from DBAAS_CLUSTER_ADDR_LIST by readConfig and is
// split on "," by createDbClients to create one DB client per address.
59 clusterAddrList string
// sentinel is the factory stored by CreateDB and invoked by State to obtain a
// sentinel client.
65 sentinel RedisSentinelCreateCb
// Subscriber is the pub/sub handle used by the subscription goroutine.
74 type Subscriber interface {
// Channel returns the receive-only Go channel on which messages arrive.
75 Channel() <-chan *redis.Message
// Subscribe adds the given Redis channels to the subscription.
76 Subscribe(channels ...string) error
// Unsubscribe removes the given Redis channels from the subscription.
77 Unsubscribe(channels ...string) error
// SubscribeFn opens a subscription on client for the given channels and
// returns it as a Subscriber.
81 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
// RedisClient is the set of go-redis client methods that DB relies on;
// CreateDB accepts any implementation of it.
83 type RedisClient interface {
84 Command() *redis.CommandsInfoCmd
86 Subscribe(channels ...string) *redis.PubSub
87 MSet(pairs ...interface{}) *redis.StatusCmd
// Do sends an arbitrary command; DB uses it for the module commands
// (SETIE, DELIE, MSETMPUB, ...).
88 Do(args ...interface{}) *redis.Cmd
89 MGet(keys ...string) *redis.SliceCmd
90 Del(keys ...string) *redis.IntCmd
91 Keys(pattern string) *redis.StringSliceCmd
92 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
93 SAdd(key string, members ...interface{}) *redis.IntCmd
94 SRem(key string, members ...interface{}) *redis.IntCmd
95 SMembers(key string) *redis.StringSliceCmd
96 SIsMember(key string, member interface{}) *redis.BoolCmd
97 SCard(key string) *redis.IntCmd
98 PTTL(key string) *redis.DurationCmd
99 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
100 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
101 ScriptExists(scripts ...string) *redis.BoolSliceCmd
102 ScriptLoad(script string) *redis.StringCmd
103 Info(section ...string) *redis.StringCmd
// dbLogger is the package-wide logger; SetDbLogger can redirect its output.
106 var dbLogger *log.Logger
// Default: log to stdout with a "database: " prefix, date/time and short
// file:line information.
109 dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
// Make go-redis log through the same logger.
110 redis.SetLogger(dbLogger)
// SetDbLogger redirects the output of the package logger to out.
113 func SetDbLogger(out io.Writer) {
114 dbLogger.SetOutput(out)
// checkResultAndError maps the (result, err) pair returned by a Redis command
// onto a (bool, error) outcome.
// NOTE(review): only the redis.Nil test is visible in this view; presumably a
// nil reply is reported as "condition not met" rather than as a failure.
// Confirm against the complete body.
117 func checkResultAndError(result interface{}, err error) (bool, error) {
// redis.Nil is the go-redis marker for a nil reply, as opposed to a real error.
119 if err == redis.Nil {
// checkIntResultAndError maps the (result, err) pair of an integer-reply
// command onto a (bool, error) outcome.
// NOTE(review): what n is compared against is not visible in this view.
130 func checkIntResultAndError(result interface{}, err error) (bool, error) {
// The reply is accepted both as int64 and as plain int.
134 if n, ok := result.(int64); ok {
138 } else if n, ok := result.(int); ok {
// subscribeNotifications is the SubscribeFn passed in by Create: it simply
// subscribes to the channels through the client itself.
146 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
147 return client.Subscribe(channels...)
// CreateDB builds a DB around an already constructed client.
//
// subscribe and sentinelCreateCb are stored on the DB for later use (see
// SubscribeChannelDB and State).
// NOTE(review): how client, cfg and sentinelAddr are stored is on lines not
// visible in this view.
150 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
153 sentinel: sentinelCreateCb,
154 subscribe: subscribe,
// The callback registry starts out empty.
156 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
// All three control channels are unbuffered, so a send blocks until the
// receiver takes the value.
158 addChannel: make(chan string),
159 removeChannel: make(chan string),
160 exit: make(chan bool),
// Create returns the DB clients produced by ReadConfigAndCreateDbClients when
// it is given the package's own client, subscribe and sentinel factories.
169 func Create() []*DB {
171 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
// readConfig collects the DBAAS_* settings through osI.Getenv. The second
// argument of each call is the default handed to Getenv.
174 func readConfig(osI OS) Config {
176 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
177 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
// Empty defaults: createDbClient treats an empty sentinelPort as "no sentinel"
// and createDbClients treats an empty clusterAddrList as "single host".
178 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
179 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
180 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
// Kept as a string; fillDbStateFromDbInfo converts it with strconv.Atoi.
181 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
// Getenv looks up key; defValue is the caller-supplied default.
// NOTE(review): presumably defValue is returned when key is unset or empty.
// Verify against the implementation.
187 Getenv(key string, defValue string) string
// Getenv reads key from the process environment via os.Getenv.
// NOTE(review): the handling of defValue is on lines not visible in this view;
// presumably it replaces an empty value. Confirm.
192 func (osImpl) Getenv(key string, defValue string) string {
193 val := os.Getenv(key)
// ReadConfigAndCreateDbClients reads the configuration through osI and builds
// the DB clients with the supplied client, subscribe and sentinel factories.
200 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
201 subscribe SubscribeFn,
202 sentinelCreateCb RedisSentinelCreateCb) []*DB {
203 cfg := readConfig(osI)
204 return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
// createDbClients returns one DB per configured database address.
207 func createDbClients(cfg Config, clientCreator RedisClientCreator,
208 subscribe SubscribeFn,
209 sentinelCreateCb RedisSentinelCreateCb) []*DB {
// No address list configured: fall back to a single client for cfg.hostname.
210 if cfg.clusterAddrList == "" {
211 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
// Otherwise the list is comma-separated and each entry gets its own client.
216 addrList := strings.Split(cfg.clusterAddrList, ",")
217 for _, addr := range addrList {
218 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
219 dbs = append(dbs, db)
// createLegacyDbClient creates the single DB client used when no cluster
// address list is configured; it connects to cfg.hostname.
224 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
225 subscribe SubscribeFn,
226 sentinelCreateCb RedisSentinelCreateCb) *DB {
227 return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
// createDbClient creates the Redis client for hostName and wraps it in a DB.
230 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
231 subscribe SubscribeFn,
232 sentinelCreateCb RedisSentinelCreateCb) *DB {
233 var client RedisClient
// Without a sentinel port: plain client on cfg.port, and the DB gets no
// sentinel factory (nil).
235 if cfg.sentinelPort == "" {
236 client = clientCreator(hostName, cfg.port, "", false)
237 db = CreateDB(client, subscribe, nil, cfg, hostName)
// With a sentinel port: HA client addressed at the sentinel port and
// identified by cfg.masterName.
239 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
240 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
// newRedisClient is the RedisClientCreator passed in by Create. It builds
// either a sentinel-backed failover client or a plain client for addr:port.
// NOTE(review): the condition selecting between the two (presumably isHa) is
// on a line not visible in this view.
246 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
// Failover variant: addr:port is the single sentinel address and
// clusterName is the master name.
248 sentinelAddress := addr + ":" + port
249 return redis.NewFailoverClient(
250 &redis.FailoverOptions{
251 MasterName: clusterName,
252 SentinelAddrs: []string{sentinelAddress},
// Plain variant: addr:port is the Redis server itself.
258 redisAddress := addr + ":" + port
259 return redis.NewClient(&redis.Options{
261 Password: "", // no password set
262 DB: 0, // use default DB
// CheckCommands queries the server's command table and records in
// db.redisModules whether the module commands this package needs are there.
// NOTE(review): the lookup of each name in the command table is on lines not
// visible in this view.
268 func (db *DB) CheckCommands() {
269 commands, err := db.client.Command().Result()
// Commands the module-based operations (SetIE, DelIE, MSetMPub, ...) depend on.
271 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
272 "msetmpub", "delmpub"}
273 for _, v := range redisModuleCommands {
276 db.redisModules = false
// A failed COMMAND query is only logged; nothing is returned to the caller.
280 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
// CloseDB closes the underlying Redis client and returns its error, if any.
284 func (db *DB) CloseDB() error {
285 return db.client.Close()
// UnsubscribeChannelDB asks the subscription goroutine to drop the given
// channels by sending each name on db.ch.removeChannel.
// NOTE(review): lines not visible in this view presumably remove the callback
// before the send and signal db.ch.exit once no callbacks remain. Confirm.
288 func (db *DB) UnsubscribeChannelDB(channels ...string) {
289 for _, v := range channels {
// removeChannel is unbuffered (see CreateDB): this blocks until the
// subscription goroutine receives the name.
291 db.ch.removeChannel <- v
292 if db.sCbMap.Count() == 0 {
// SubscribeChannelDB arranges for cb to be invoked for events published on
// the given channels.
//
// When no callback is registered yet (Count() == 0) a receiver goroutine is
// started; it owns the Redis subscription and dispatches incoming messages.
// The final loop hands channel names to that goroutine via db.ch.addChannel.
// NOTE(review): several short lines are missing from this view (loop bodies,
// the for/select header, the else branch). The callback registration itself
// is therefore presumed, not shown. Confirm against the full file.
298 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
299 if db.sCbMap.Count() == 0 {
300 for _, v := range channels {
// Everything the goroutine needs is passed in as arguments.
304 go func(sCbMap *sharedCbMap,
306 eventSeparator string,
308 channels ...string) {
309 sub := db.subscribe(db.client, channels...)
310 rxChannel := sub.Channel()
// Work on a private snapshot of the registry; it is refreshed on every
// add/remove request below.
311 lCbMap := sCbMap.GetMapCopy()
314 case msg := <-rxChannel:
315 cb, ok := lCbMap[msg.Channel]
// The callback sees the channel name without channelPrefix and the payload
// split on eventSeparator.
317 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
// The error results of Subscribe/Unsubscribe are not checked here.
319 case channel := <-ch.addChannel:
320 lCbMap = sCbMap.GetMapCopy()
321 sub.Subscribe(channel)
322 case channel := <-ch.removeChannel:
323 lCbMap = sCbMap.GetMapCopy()
324 sub.Unsubscribe(channel)
325 case exit := <-ch.exit:
// A failure to close the subscription is logged, not propagated.
327 if err := sub.Close(); err != nil {
328 dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
334 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
337 for _, v := range channels {
// Unbuffered send: blocks until the receiver goroutine picks the name up.
339 db.ch.addChannel <- v
// MSet writes the given key/value pairs with the client's MSet call and
// returns that command's error, if any.
344 func (db *DB) MSet(pairs ...interface{}) error {
345 return db.client.MSet(pairs...).Err()
// MSetMPub writes key/value pairs and publishes events with the MSETMPUB
// module command. It fails immediately when db.redisModules is false.
//
// Command layout: MSETMPUB <pair count> <channel/event count> <pairs...>
// <channelsAndEvents...>.
348 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
349 if !db.redisModules {
350 return errors.New("Redis deployment doesn't support MSETMPUB command")
352 command := make([]interface{}, 0)
353 command = append(command, "MSETMPUB")
// Both counts use integer division, so an odd-length list is counted
// rounded down while all of its elements are still appended below.
354 command = append(command, len(pairs)/2)
355 command = append(command, len(channelsAndEvents)/2)
356 for _, d := range pairs {
357 command = append(command, d)
359 for _, d := range channelsAndEvents {
360 command = append(command, d)
362 _, err := db.client.Do(command...).Result()
// MGet fetches the values of keys with the client's MGet call and returns
// the reply slice together with the command error.
366 func (db *DB) MGet(keys []string) ([]interface{}, error) {
367 return db.client.MGet(keys...).Result()
// DelMPub deletes keys and publishes events with the DELMPUB module command.
// It fails immediately when db.redisModules is false.
//
// Command layout: DELMPUB <key count> <channel/event count> <keys...>
// <channelsAndEvents...>.
370 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
371 if !db.redisModules {
372 return errors.New("Redis deployment not supporting command DELMPUB")
374 command := make([]interface{}, 0)
375 command = append(command, "DELMPUB")
376 command = append(command, len(keys))
// Integer division: an odd-length channelsAndEvents is counted rounded down.
377 command = append(command, len(channelsAndEvents)/2)
378 for _, d := range keys {
379 command = append(command, d)
381 for _, d := range channelsAndEvents {
382 command = append(command, d)
384 _, err := db.client.Do(command...).Result()
// Del deletes keys; the count reported by Redis is discarded.
389 func (db *DB) Del(keys []string) error {
390 _, err := db.client.Del(keys...).Result()
// Keys returns the key names matching pattern, as reported by the client's
// Keys call.
394 func (db *DB) Keys(pattern string) ([]string, error) {
395 return db.client.Keys(pattern).Result()
// SetIE issues the SETIE module command for key and interprets the reply
// with checkResultAndError. It fails immediately when db.redisModules is
// false.
398 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
399 if !db.redisModules {
400 return false, errors.New("Redis deployment not supporting command")
// Note the argument order on the wire: new value first, then the old value.
403 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
// SetIEPub issues the SETIEMPUB module command: the same arguments as SetIE
// followed by the channel/event entries. It fails immediately when
// db.redisModules is false.
406 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
407 if !db.redisModules {
408 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
// 4 fixed arguments (command, key, new value, old value) plus the events.
410 capacity := 4 + len(channelsAndEvents)
411 command := make([]interface{}, 0, capacity)
412 command = append(command, "SETIEMPUB")
413 command = append(command, key)
// New value precedes the old value on the wire.
414 command = append(command, newData)
415 command = append(command, oldData)
416 for _, ce := range channelsAndEvents {
417 command = append(command, ce)
419 return checkResultAndError(db.client.Do(command...).Result())
// SetNXPub issues the SETNXMPUB module command for key and data, followed by
// the channel/event entries, and interprets the reply with
// checkResultAndError. It fails immediately when db.redisModules is false.
422 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
423 if !db.redisModules {
424 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
// 3 fixed arguments (command, key, value) plus the events.
426 capacity := 3 + len(channelsAndEvents)
427 command := make([]interface{}, 0, capacity)
428 command = append(command, "SETNXMPUB")
429 command = append(command, key)
430 command = append(command, data)
431 for _, ce := range channelsAndEvents {
432 command = append(command, ce)
434 return checkResultAndError(db.client.Do(command...).Result())
// SetNX forwards to the client's SetNX with the given expiration and returns
// its boolean result and error unchanged. Unlike the *Pub variants it does
// not consult db.redisModules.
436 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
437 return db.client.SetNX(key, data, expiration).Result()
// DelIEPub issues the DELIEMPUB module command for key and data, followed by
// the channel/event entries, and interprets the integer reply with
// checkIntResultAndError. It fails immediately when db.redisModules is false.
440 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
441 if !db.redisModules {
442 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
// 3 fixed arguments (command, key, value) plus the events.
444 capacity := 3 + len(channelsAndEvents)
445 command := make([]interface{}, 0, capacity)
446 command = append(command, "DELIEMPUB")
447 command = append(command, key)
448 command = append(command, data)
449 for _, ce := range channelsAndEvents {
450 command = append(command, ce)
452 return checkIntResultAndError(db.client.Do(command...).Result())
// DelIE issues the DELIE module command for key and data and interprets the
// integer reply with checkIntResultAndError. It fails immediately when
// db.redisModules is false.
455 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
456 if !db.redisModules {
457 return false, errors.New("Redis deployment not supporting command")
459 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
// SAdd adds data to the set stored at key; the count reported by Redis is
// discarded.
462 func (db *DB) SAdd(key string, data ...interface{}) error {
463 _, err := db.client.SAdd(key, data...).Result()
// SRem removes data from the set stored at key; the count reported by Redis
// is discarded.
467 func (db *DB) SRem(key string, data ...interface{}) error {
468 _, err := db.client.SRem(key, data...).Result()
// SMembers queries the members of the set stored at key.
472 func (db *DB) SMembers(key string) ([]string, error) {
473 result, err := db.client.SMembers(key).Result()
// SIsMember queries whether data is a member of the set stored at key.
477 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
478 result, err := db.client.SIsMember(key, data).Result()
// SCard queries the number of members in the set stored at key.
482 func (db *DB) SCard(key string) (int64, error) {
483 result, err := db.client.SCard(key).Result()
// PTTL queries the remaining time to live of key as a time.Duration.
487 func (db *DB) PTTL(key string) (time.Duration, error) {
488 result, err := db.client.PTTL(key).Result()
// Info runs INFO with the "all" section and parses the reply into a DbInfo.
// NOTE(review): the error handling after the INFO call is on lines not
// visible in this view.
492 func (db *DB) Info() (*DbInfo, error) {
494 resultStr, err := db.client.Info("all").Result()
// Normalise CRLF line endings to LF, then split the reply into lines.
499 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
500 readRedisInfoReplyFields(result, &info)
// lineContains reports whether substr occurs anywhere within line.
// An empty substr is always considered present.
func lineContains(line, substr string) bool {
	found := strings.Index(line, substr) >= 0
	return found
}
// getFieldValueStr returns the part of line that follows the first
// occurrence of substr, or "" when substr does not occur in line.
func getFieldValueStr(line, substr string) string {
	pos := strings.Index(line, substr)
	if pos < 0 {
		return ""
	}
	return line[pos+len(substr):]
}
// getUint32FromString parses s as a base-10 unsigned integer that fits in 32
// bits. Any parse failure (empty, non-numeric, negative, out of range)
// yields 0.
func getUint32FromString(s string) uint32 {
	parsed, err := strconv.ParseUint(s, 10, 32)
	if err != nil {
		return 0
	}
	return uint32(parsed)
}
// getUint64FromString parses s as a base-10 unsigned 64-bit integer.
// Any parse failure (empty, non-numeric, negative, out of range) yields 0.
func getUint64FromString(s string) uint64 {
	parsed, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return 0
	}
	return parsed
}
// getFloatFromString parses s as a floating-point number of the given
// bitSize (32 or 64, as accepted by strconv.ParseFloat) and returns it as a
// float64. Any parse failure, including an out-of-range value, yields 0.
func getFloatFromString(s string, bitSize int) float64 {
	parsed, err := strconv.ParseFloat(s, bitSize)
	if err != nil {
		return 0
	}
	return parsed
}
// getFloat64FromString parses s through getFloatFromString with a bit size
// of 64.
536 func getFloat64FromString(s string) float64 {
537 return getFloatFromString(s, 64)
// getFloat32FromString parses s through getFloatFromString with a bit size
// of 32 and converts the result to float32.
540 func getFloat32FromString(s string) float32 {
541 return float32(getFloatFromString(s, 32))
// getValueString extracts the value of a "key=value" entry from a
// comma-separated list such as "calls=1,usec=2,usec_per_call=2.00".
//
// key must include the trailing '=' (e.g. "calls="). Each entry is trimmed of
// surrounding whitespace and matched by prefix, so a lookup for "calls=" is
// not satisfied by an entry such as "rejected_calls=0" that merely contains
// the key text. The value of the first matching entry is returned; "" is
// returned when no entry matches.
func getValueString(values string, key string) string {
	for _, entry := range strings.Split(values, ",") {
		entry = strings.TrimSpace(entry)
		if strings.HasPrefix(entry, key) {
			return entry[len(key):]
		}
	}
	return ""
}
// getCommandstatsValues picks the "calls=", "usec=" and "usec_per_call="
// values out of one comma-separated cmdstat value string and returns them,
// still as strings, in that order.
554 func getCommandstatsValues(values string) (string, string, string) {
555 calls := getValueString(values, "calls=")
556 usec := getValueString(values, "usec=")
557 usecPerCall := getValueString(values, "usec_per_call=")
558 return calls, usec, usecPerCall
// updateCommandstatsValues parses a cmdstat value string and stores the
// numbers into the struct that i points to, using reflection.
//
// i must be a pointer to a struct with settable fields named Calls and Usec
// (assigned a uint32) and UsecPerCall (assigned a float32); reflect's Set
// panics if a field is missing or has a different type.
561 func updateCommandstatsValues(i interface{}, values string) {
// Elem dereferences the pointer so that the fields are settable.
562 stype := reflect.ValueOf(i).Elem()
563 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
565 callsField := stype.FieldByName("Calls")
566 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
568 usecField := stype.FieldByName("Usec")
569 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
571 usecPerCallField := stype.FieldByName("UsecPerCall")
572 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
// getKeyspaceValues picks the "keys=", "expires=" and "avg_ttl=" values out
// of one comma-separated keyspace value string and returns them, still as
// strings, in that order.
575 func getKeyspaceValues(values string) (string, string, string) {
576 keys := getValueString(values, "keys=")
577 expires := getValueString(values, "expires=")
578 avgttl := getValueString(values, "avg_ttl=")
579 return keys, expires, avgttl
// updateKeyspaceValues parses a keyspace value string and stores the numbers
// into the struct that i points to, using reflection.
//
// i must be a pointer to a struct with settable fields named Keys, Expires
// and AvgTtl, each assigned a uint32; reflect's Set panics if a field is
// missing or has a different type.
582 func updateKeyspaceValues(i interface{}, values string) {
// Elem dereferences the pointer so that the fields are settable.
583 stype := reflect.ValueOf(i).Elem()
584 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
586 keysField := stype.FieldByName("Keys")
587 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
589 expiresField := stype.FieldByName("Expires")
590 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
592 avgttlField := stype.FieldByName("AvgTtl")
593 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
// updateServerInfoFields copies "uptime_in_days" from config into
// info.Fields.Server when the entry is present.
596 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
597 if value, ok := config["uptime_in_days"]; ok {
598 info.Fields.Server.UptimeInDays = getUint32FromString(value)
// updateClientInfoFields copies the client-related entries present in config
// into info.Fields.Clients. Absent entries leave their field untouched.
602 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
603 if value, ok := config["connected_clients"]; ok {
604 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
606 if value, ok := config["client_recent_max_input_buffer"]; ok {
607 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
609 if value, ok := config["client_recent_max_output_buffer"]; ok {
610 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
// updateMemoryInfoFields copies the memory-related entries present in config
// into info.Fields.Memory. Byte counters are parsed as numbers; the *_human
// and *_perc entries are stored as the strings they arrive as.
614 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
615 if value, ok := config["used_memory"]; ok {
616 info.Fields.Memory.UsedMemory = getUint64FromString(value)
618 if value, ok := config["used_memory_human"]; ok {
619 info.Fields.Memory.UsedMemoryHuman = value
621 if value, ok := config["used_memory_rss"]; ok {
622 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
624 if value, ok := config["used_memory_rss_human"]; ok {
625 info.Fields.Memory.UsedMemoryRssHuman = value
627 if value, ok := config["used_memory_peak"]; ok {
628 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
630 if value, ok := config["used_memory_peak_human"]; ok {
631 info.Fields.Memory.UsedMemoryPeakHuman = value
633 if value, ok := config["used_memory_peak_perc"]; ok {
634 info.Fields.Memory.UsedMemoryPeakPerc = value
636 if value, ok := config["mem_fragmentation_ratio"]; ok {
637 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
// NOTE(review): parsed as unsigned 32-bit while the other byte counters use
// 64 bits; a negative or larger value would be stored as 0. Verify the
// value range reported by the server.
639 if value, ok := config["mem_fragmentation_bytes"]; ok {
640 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
// updateStatsInfoFields copies the statistics entries present in config into
// info.Fields.Stats. All values are parsed as unsigned 32-bit integers, so a
// value that does not fit is stored as 0.
644 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
645 if value, ok := config["total_connections_received"]; ok {
646 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
648 if value, ok := config["total_commands_processed"]; ok {
649 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
651 if value, ok := config["sync_full"]; ok {
652 info.Fields.Stats.SyncFull = getUint32FromString(value)
654 if value, ok := config["sync_partial_ok"]; ok {
655 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
657 if value, ok := config["sync_partial_err"]; ok {
658 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
660 if value, ok := config["pubsub_channels"]; ok {
661 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
// updateCpuInfoFields copies "used_cpu_sys" and "used_cpu_user" from config
// into info.Fields.Cpu as float64 values when the entries are present.
665 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
666 if value, ok := config["used_cpu_sys"]; ok {
667 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
669 if value, ok := config["used_cpu_user"]; ok {
670 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
// updateCommandstatsInfoFields fills info.Fields.Commandstats from the
// "cmdstat_*" entries present in config. Each entry's value string is parsed
// by updateCommandstatsValues into the matching struct. Only the commands
// listed here are recorded.
674 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
675 if values, ok := config["cmdstat_replconf"]; ok {
676 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
678 if values, ok := config["cmdstat_keys"]; ok {
679 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
681 if values, ok := config["cmdstat_role"]; ok {
682 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
684 if values, ok := config["cmdstat_psync"]; ok {
685 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
687 if values, ok := config["cmdstat_mset"]; ok {
688 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
690 if values, ok := config["cmdstat_publish"]; ok {
691 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
693 if values, ok := config["cmdstat_info"]; ok {
694 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
696 if values, ok := config["cmdstat_ping"]; ok {
697 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
699 if values, ok := config["cmdstat_client"]; ok {
700 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
702 if values, ok := config["cmdstat_command"]; ok {
703 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
705 if values, ok := config["cmdstat_subscribe"]; ok {
706 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
708 if values, ok := config["cmdstat_monitor"]; ok {
709 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
711 if values, ok := config["cmdstat_config"]; ok {
712 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
714 if values, ok := config["cmdstat_slaveof"]; ok {
715 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
// updateKeyspaceInfoFields fills info.Fields.Keyspace.Db from the "db0"
// entry of config when present; no other database index is read.
719 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
720 if values, ok := config["db0"]; ok {
721 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
// getConfigInfo turns the lines of an INFO reply into a key/value table.
// Lines without ':' and lines whose key part is empty after trimming are
// ignored.
// NOTE(review): one line between the key check and the assignment is not
// visible in this view; it may add a further condition.
725 func getConfigInfo(input []string) ConfigInfo {
726 config := ConfigInfo{}
727 for _, line := range input {
// Split at the first ':' only, so the value part may itself contain colons.
728 if i := strings.Index(line, ":"); i != -1 {
729 if key := strings.TrimSpace(line[:i]); len(key) > 0 {
// A key that occurs again later in the input overwrites the earlier value.
731 config[key] = strings.TrimSpace(line[i+1:])
// readRedisInfoReplyFields parses the lines of an INFO reply and fills info
// with the role, the replica count and the per-section fields.
739 func readRedisInfoReplyFields(input []string, info *DbInfo) {
740 config := getConfigInfo(input)
// PrimaryRole is only ever set to true here; any other role value leaves it
// as it was.
742 if value, ok := config["role"]; ok {
743 if "master" == value {
744 info.Fields.PrimaryRole = true
747 if value, ok := config["connected_slaves"]; ok {
748 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
750 updateServerInfoFields(config, info)
751 updateClientInfoFields(config, info)
752 updateMemoryInfoFields(config, info)
753 updateStatsInfoFields(config, info)
754 updateCpuInfoFields(config, info)
755 updateCommandstatsInfoFields(config, info)
756 updateKeyspaceInfoFields(config, info)
// State reports the database state. With a sentinel port configured the
// state comes from a freshly created sentinel client; otherwise it is
// derived from the INFO reply of the database itself.
759 func (db *DB) State() (*DbState, error) {
760 dbState := new(DbState)
761 if db.cfg.sentinelPort != "" {
762 //Establish connection to Redis sentinel. The reason why connection is done
763 //here instead of time of the SDL instance creation is that for the time being
764 //sentinel connection is needed only here to get state information and
765 //state information is needed only by 'sdlcli' hence it is not time critical
766 //and also we want to avoid opening unnecessary TCP connections towards Redis
767 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
768 sentinelClient := db.sentinel(&db.cfg, db.addr)
769 return sentinelClient.GetDbState()
771 info, err := db.Info()
// An INFO failure is recorded on the primary's state entry.
// NOTE(review): the surrounding error check and return are on lines not
// visible in this view.
773 dbState.PrimaryDbState.Err = err
776 return db.fillDbStateFromDbInfo(info)
// fillDbStateFromDbInfo builds a DbState from an already parsed INFO reply
// and from the configured node count.
// NOTE(review): most of the struct literal and the else branch are on lines
// not visible in this view.
780 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
782 if info.Fields.PrimaryRole == true {
784 PrimaryDbState: PrimaryDbState{
785 Fields: PrimaryDbStateFields{
// nodeCnt is kept as a string in the configuration (see readConfig).
795 cnt, err := strconv.Atoi(db.cfg.nodeCnt)
797 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
799 dbState.ConfigNodeCnt = cnt
// The same error is available both in dbState.Err and as the second result.
802 return &dbState, dbState.Err
// createReplicaDbClient creates a DB client that talks directly to the
// replica at host ("address:port").
805 func createReplicaDbClient(host string) *DB {
806 cfg := readConfig(osImpl{})
// Clearing sentinelPort makes createDbClient build a plain (non-HA) client.
807 cfg.sentinelPort = ""
// cfg.clusterAddrList is used here to hold the replica's host part. The
// error from SplitHostPort is discarded, so a malformed host leaves both
// values empty.
808 cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
809 return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
// getStatisticsInfo collects the INFO data of db and labels it with the
// address and port taken from host.
// NOTE(review): the handling of an INFO error is on lines not visible in
// this view.
812 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
813 dbStatisticsInfo := new(DbStatisticsInfo)
// The error from SplitHostPort is discarded; a malformed host leaves IPAddr
// and Port empty.
814 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
816 info, err := db.Info()
820 dbStatisticsInfo.Info = info
822 return dbStatisticsInfo, nil
// sentinelStatistics gathers statistics for a sentinel-managed deployment:
// first for the primary reported by db.State, then for every replica listed
// in the state.
// NOTE(review): the error checks, and whatever is done with each temporary
// replica client after use, are on lines not visible in this view.
825 func sentinelStatistics(db *DB) (*DbStatistics, error) {
826 dbState := new(DbState)
827 dbStatistics := new(DbStatistics)
828 dbStatisticsInfo := new(DbStatisticsInfo)
831 dbState, err = db.State()
836 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
837 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
839 if dbState.ReplicasDbState != nil {
840 for _, r := range dbState.ReplicasDbState.States {
// A dedicated client is created per replica to query it directly.
841 replicaDb := createReplicaDbClient(r.GetAddress())
842 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
847 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
851 return dbStatistics, nil
// standaloneStatistics gathers statistics for a deployment without sentinel:
// a single entry for the database at cfg.hostname:cfg.port.
854 func standaloneStatistics(db *DB) (*DbStatistics, error) {
855 dbStatistics := new(DbStatistics)
857 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
// NOTE(review): the entry is appended before err is inspected, so on failure
// Stats holds whatever getStatisticsInfo returned alongside the error.
858 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
860 return dbStatistics, err
// Statistics returns database statistics, choosing the sentinel-based
// collection when a sentinel port is configured and the standalone one
// otherwise.
863 func (db *DB) Statistics() (*DbStatistics, error) {
864 if db.cfg.sentinelPort != "" {
865 return sentinelStatistics(db)
868 return standaloneStatistics(db)
// luaRefresh is the Lua script run by PExpireIE: when the value stored at
// KEYS[1] equals ARGV[1] it applies PEXPIRE with ARGV[2] (milliseconds) and
// returns that command's result; otherwise it returns 0.
871 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
// PExpireIE sets a new expiration on key, but only if key currently holds
// data; the check and the update happen inside the luaRefresh script.
// NOTE(review): the error check after Run and the success return are on
// lines not visible in this view.
873 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
// The script expects the expiration as a decimal millisecond count.
874 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
875 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
// 1 is the script result when the value matched and PEXPIRE was applied.
879 if result == int64(1) {
882 return errors.New("Lock not held")
// Add registers cb as the callback for channel, replacing any earlier one.
// NOTE(review): the Lock call matching the deferred Unlock is on a line not
// visible in this view.
885 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
887 defer sCbMap.m.Unlock()
888 sCbMap.cbMap[channel] = cb
// Remove deletes the callback registered for channel; removing an unknown
// channel is a no-op.
// NOTE(review): the Lock call matching the deferred Unlock is on a line not
// visible in this view.
891 func (sCbMap *sharedCbMap) Remove(channel string) {
893 defer sCbMap.m.Unlock()
894 delete(sCbMap.cbMap, channel)
// Count returns the number of channels that currently have a callback.
// NOTE(review): the Lock call matching the deferred Unlock is on a line not
// visible in this view.
897 func (sCbMap *sharedCbMap) Count() int {
899 defer sCbMap.m.Unlock()
900 return len(sCbMap.cbMap)
903 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
905 defer sCbMap.m.Unlock()
906 mapCopy := make(map[string]ChannelNotificationCb, 0)
907 for i, v := range sCbMap.cbMap {