2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
29 "github.com/go-redis/redis/v8"
41 type ChannelNotificationCb func(channel string, payload ...string)
42 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
44 type intChannels struct {
45 addChannel chan string
46 removeChannel chan string
50 type sharedCbMap struct {
52 cbMap map[string]ChannelNotificationCb
60 clusterAddrList string
67 sentinel RedisSentinelCreateCb
76 type Subscriber interface {
77 Channel(opts ...redis.ChannelOption) <-chan *redis.Message
78 Subscribe(ctx context.Context, channels ...string) error
79 Unsubscribe(ctx context.Context, channels ...string) error
83 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
85 type RedisClient interface {
86 Command(ctx context.Context) *redis.CommandsInfoCmd
88 Subscribe(ctx context.Context, channels ...string) *redis.PubSub
89 MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
90 Do(ctx context.Context, args ...interface{}) *redis.Cmd
91 MGet(ctx context.Context, keys ...string) *redis.SliceCmd
92 Del(ctx context.Context, keys ...string) *redis.IntCmd
93 Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
94 SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
95 SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
96 SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
97 SMembers(ctx context.Context, key string) *redis.StringSliceCmd
98 SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
99 SCard(ctx context.Context, key string) *redis.IntCmd
100 PTTL(ctx context.Context, key string) *redis.DurationCmd
101 Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
102 EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
103 ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
104 ScriptLoad(ctx context.Context, script string) *redis.StringCmd
105 Info(ctx context.Context, section ...string) *redis.StringCmd
112 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
114 redis.SetLogger(dbLogger)
117 func SetDbLogger(out io.Writer) {
118 dbLogger.log.SetOutput(out)
121 func checkResultAndError(result interface{}, err error) (bool, error) {
123 if err == redis.Nil {
134 func checkIntResultAndError(result interface{}, err error) (bool, error) {
138 if n, ok := result.(int64); ok {
142 } else if n, ok := result.(int); ok {
150 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
151 return client.Subscribe(ctx, channels...)
154 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
156 ctx: context.Background(),
158 sentinel: sentinelCreateCb,
159 subscribe: subscribe,
161 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
163 addChannel: make(chan string),
164 removeChannel: make(chan string),
165 exit: make(chan bool),
174 func Create() []*DB {
176 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
179 func readConfig(osI OS) Config {
181 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
182 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
183 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
184 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
185 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
186 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
192 Getenv(key string, defValue string) string
197 func (osImpl) Getenv(key string, defValue string) string {
198 val := os.Getenv(key)
205 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
206 subscribe SubscribeFn,
207 sentinelCreateCb RedisSentinelCreateCb) []*DB {
208 cfg := readConfig(osI)
209 return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
212 func createDbClients(cfg Config, clientCreator RedisClientCreator,
213 subscribe SubscribeFn,
214 sentinelCreateCb RedisSentinelCreateCb) []*DB {
215 if cfg.clusterAddrList == "" {
216 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
221 addrList := strings.Split(cfg.clusterAddrList, ",")
222 for _, addr := range addrList {
223 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
224 dbs = append(dbs, db)
229 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
230 subscribe SubscribeFn,
231 sentinelCreateCb RedisSentinelCreateCb) *DB {
232 return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
235 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
236 subscribe SubscribeFn,
237 sentinelCreateCb RedisSentinelCreateCb) *DB {
238 var client RedisClient
240 if cfg.sentinelPort == "" {
241 client = clientCreator(hostName, cfg.port, "", false)
242 db = CreateDB(client, subscribe, nil, cfg, hostName)
244 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
245 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
251 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
253 sentinelAddress := addr + ":" + port
254 return redis.NewFailoverClient(
255 &redis.FailoverOptions{
256 MasterName: clusterName,
257 SentinelAddrs: []string{sentinelAddress},
263 redisAddress := addr + ":" + port
264 return redis.NewClient(&redis.Options{
266 Password: "", // no password set
267 DB: 0, // use default DB
273 func (db *DB) CheckCommands() {
274 commands, err := db.client.Command(db.ctx).Result()
276 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
277 "msetmpub", "delmpub"}
278 for _, v := range redisModuleCommands {
281 db.redisModules = false
285 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
289 func (db *DB) CloseDB() error {
290 return db.client.Close()
293 func (db *DB) UnsubscribeChannelDB(channels ...string) {
294 for _, v := range channels {
296 db.ch.removeChannel <- v
297 if db.sCbMap.Count() == 0 {
303 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
304 if db.sCbMap.Count() == 0 {
305 for _, v := range channels {
309 go func(sCbMap *sharedCbMap,
311 eventSeparator string,
313 channels ...string) {
314 sub := db.subscribe(db.ctx, db.client, channels...)
315 rxChannel := sub.Channel()
316 lCbMap := sCbMap.GetMapCopy()
319 case msg := <-rxChannel:
320 cb, ok := lCbMap[msg.Channel]
322 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
324 case channel := <-ch.addChannel:
325 lCbMap = sCbMap.GetMapCopy()
326 sub.Subscribe(db.ctx, channel)
327 case channel := <-ch.removeChannel:
328 lCbMap = sCbMap.GetMapCopy()
329 sub.Unsubscribe(db.ctx, channel)
330 case exit := <-ch.exit:
332 if err := sub.Close(); err != nil {
333 dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
339 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
342 for _, v := range channels {
344 db.ch.addChannel <- v
349 func (db *DB) MSet(pairs ...interface{}) error {
350 return db.client.MSet(db.ctx, pairs...).Err()
353 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
354 if !db.redisModules {
355 return errors.New("Redis deployment doesn't support MSETMPUB command")
357 command := make([]interface{}, 0)
358 command = append(command, "MSETMPUB")
359 command = append(command, len(pairs)/2)
360 command = append(command, len(channelsAndEvents)/2)
361 for _, d := range pairs {
362 command = append(command, d)
364 for _, d := range channelsAndEvents {
365 command = append(command, d)
367 _, err := db.client.Do(db.ctx, command...).Result()
371 func (db *DB) MGet(keys []string) ([]interface{}, error) {
372 return db.client.MGet(db.ctx, keys...).Result()
375 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
376 if !db.redisModules {
377 return errors.New("Redis deployment not supporting command DELMPUB")
379 command := make([]interface{}, 0)
380 command = append(command, "DELMPUB")
381 command = append(command, len(keys))
382 command = append(command, len(channelsAndEvents)/2)
383 for _, d := range keys {
384 command = append(command, d)
386 for _, d := range channelsAndEvents {
387 command = append(command, d)
389 _, err := db.client.Do(db.ctx, command...).Result()
394 func (db *DB) Del(keys []string) error {
395 _, err := db.client.Del(db.ctx, keys...).Result()
399 func (db *DB) Keys(pattern string) ([]string, error) {
400 return db.client.Keys(db.ctx, pattern).Result()
403 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
404 if !db.redisModules {
405 return false, errors.New("Redis deployment not supporting command")
408 return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
411 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
412 if !db.redisModules {
413 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
415 capacity := 4 + len(channelsAndEvents)
416 command := make([]interface{}, 0, capacity)
417 command = append(command, "SETIEMPUB")
418 command = append(command, key)
419 command = append(command, newData)
420 command = append(command, oldData)
421 for _, ce := range channelsAndEvents {
422 command = append(command, ce)
424 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
427 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
428 if !db.redisModules {
429 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
431 capacity := 3 + len(channelsAndEvents)
432 command := make([]interface{}, 0, capacity)
433 command = append(command, "SETNXMPUB")
434 command = append(command, key)
435 command = append(command, data)
436 for _, ce := range channelsAndEvents {
437 command = append(command, ce)
439 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
441 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
442 return db.client.SetNX(db.ctx, key, data, expiration).Result()
445 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
446 if !db.redisModules {
447 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
449 capacity := 3 + len(channelsAndEvents)
450 command := make([]interface{}, 0, capacity)
451 command = append(command, "DELIEMPUB")
452 command = append(command, key)
453 command = append(command, data)
454 for _, ce := range channelsAndEvents {
455 command = append(command, ce)
457 return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
460 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
461 if !db.redisModules {
462 return false, errors.New("Redis deployment not supporting command")
464 return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
467 func (db *DB) SAdd(key string, data ...interface{}) error {
468 _, err := db.client.SAdd(db.ctx, key, data...).Result()
472 func (db *DB) SRem(key string, data ...interface{}) error {
473 _, err := db.client.SRem(db.ctx, key, data...).Result()
477 func (db *DB) SMembers(key string) ([]string, error) {
478 result, err := db.client.SMembers(db.ctx, key).Result()
482 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
483 result, err := db.client.SIsMember(db.ctx, key, data).Result()
487 func (db *DB) SCard(key string) (int64, error) {
488 result, err := db.client.SCard(db.ctx, key).Result()
492 func (db *DB) PTTL(key string) (time.Duration, error) {
493 result, err := db.client.PTTL(db.ctx, key).Result()
497 func (db *DB) Info() (*DbInfo, error) {
499 resultStr, err := db.client.Info(db.ctx, "all").Result()
504 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
505 readRedisInfoReplyFields(result, &info)
509 func lineContains(line, substr string) bool {
510 return strings.Contains(line, substr)
513 func getFieldValueStr(line, substr string) string {
514 if idx := strings.Index(line, substr); idx != -1 {
515 return line[idx+len(substr):]
520 func getUint32FromString(s string) uint32 {
521 if val, err := strconv.ParseUint(s, 10, 32); err == nil {
527 func getUint64FromString(s string) uint64 {
528 if val, err := strconv.ParseUint(s, 10, 64); err == nil {
534 func getFloatFromString(s string, bitSize int) float64 {
535 if val, err := strconv.ParseFloat(s, bitSize); err == nil {
541 func getFloat64FromString(s string) float64 {
542 return getFloatFromString(s, 64)
545 func getFloat32FromString(s string) float32 {
546 return float32(getFloatFromString(s, 32))
549 func getValueString(values string, key string) string {
550 slice := strings.Split(values, ",")
551 for _, s := range slice {
552 if lineContains(s, key) {
553 return getFieldValueStr(s, key)
559 func getCommandstatsValues(values string) (string, string, string) {
560 calls := getValueString(values, "calls=")
561 usec := getValueString(values, "usec=")
562 usecPerCall := getValueString(values, "usec_per_call=")
563 return calls, usec, usecPerCall
566 func updateCommandstatsValues(i interface{}, values string) {
567 stype := reflect.ValueOf(i).Elem()
568 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
570 callsField := stype.FieldByName("Calls")
571 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
573 usecField := stype.FieldByName("Usec")
574 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
576 usecPerCallField := stype.FieldByName("UsecPerCall")
577 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
580 func getKeyspaceValues(values string) (string, string, string) {
581 keys := getValueString(values, "keys=")
582 expires := getValueString(values, "expires=")
583 avgttl := getValueString(values, "avg_ttl=")
584 return keys, expires, avgttl
587 func updateKeyspaceValues(i interface{}, values string) {
588 stype := reflect.ValueOf(i).Elem()
589 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
591 keysField := stype.FieldByName("Keys")
592 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
594 expiresField := stype.FieldByName("Expires")
595 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
597 avgttlField := stype.FieldByName("AvgTtl")
598 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
601 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
602 if value, ok := config["uptime_in_days"]; ok {
603 info.Fields.Server.UptimeInDays = getUint32FromString(value)
607 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
608 if value, ok := config["connected_clients"]; ok {
609 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
611 if value, ok := config["client_recent_max_input_buffer"]; ok {
612 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
614 if value, ok := config["client_recent_max_output_buffer"]; ok {
615 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
619 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
620 if value, ok := config["used_memory"]; ok {
621 info.Fields.Memory.UsedMemory = getUint64FromString(value)
623 if value, ok := config["used_memory_human"]; ok {
624 info.Fields.Memory.UsedMemoryHuman = value
626 if value, ok := config["used_memory_rss"]; ok {
627 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
629 if value, ok := config["used_memory_rss_human"]; ok {
630 info.Fields.Memory.UsedMemoryRssHuman = value
632 if value, ok := config["used_memory_peak"]; ok {
633 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
635 if value, ok := config["used_memory_peak_human"]; ok {
636 info.Fields.Memory.UsedMemoryPeakHuman = value
638 if value, ok := config["used_memory_peak_perc"]; ok {
639 info.Fields.Memory.UsedMemoryPeakPerc = value
641 if value, ok := config["mem_fragmentation_ratio"]; ok {
642 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
644 if value, ok := config["mem_fragmentation_bytes"]; ok {
645 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
649 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
650 if value, ok := config["total_connections_received"]; ok {
651 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
653 if value, ok := config["total_commands_processed"]; ok {
654 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
656 if value, ok := config["sync_full"]; ok {
657 info.Fields.Stats.SyncFull = getUint32FromString(value)
659 if value, ok := config["sync_partial_ok"]; ok {
660 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
662 if value, ok := config["sync_partial_err"]; ok {
663 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
665 if value, ok := config["pubsub_channels"]; ok {
666 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
670 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
671 if value, ok := config["used_cpu_sys"]; ok {
672 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
674 if value, ok := config["used_cpu_user"]; ok {
675 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
679 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
680 if values, ok := config["cmdstat_replconf"]; ok {
681 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
683 if values, ok := config["cmdstat_keys"]; ok {
684 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
686 if values, ok := config["cmdstat_role"]; ok {
687 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
689 if values, ok := config["cmdstat_psync"]; ok {
690 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
692 if values, ok := config["cmdstat_mset"]; ok {
693 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
695 if values, ok := config["cmdstat_publish"]; ok {
696 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
698 if values, ok := config["cmdstat_info"]; ok {
699 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
701 if values, ok := config["cmdstat_ping"]; ok {
702 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
704 if values, ok := config["cmdstat_client"]; ok {
705 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
707 if values, ok := config["cmdstat_command"]; ok {
708 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
710 if values, ok := config["cmdstat_subscribe"]; ok {
711 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
713 if values, ok := config["cmdstat_monitor"]; ok {
714 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
716 if values, ok := config["cmdstat_config"]; ok {
717 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
719 if values, ok := config["cmdstat_slaveof"]; ok {
720 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
724 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
725 if values, ok := config["db0"]; ok {
726 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
730 func getConfigInfo(input []string) ConfigInfo {
731 config := ConfigInfo{}
732 for _, line := range input {
733 if i := strings.Index(line, ":"); i != -1 {
734 if key := strings.TrimSpace(line[:i]); len(key) > 0 {
736 config[key] = strings.TrimSpace(line[i+1:])
744 func readRedisInfoReplyFields(input []string, info *DbInfo) {
745 config := getConfigInfo(input)
747 if value, ok := config["role"]; ok {
748 if "master" == value {
749 info.Fields.PrimaryRole = true
752 if value, ok := config["connected_slaves"]; ok {
753 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
755 updateServerInfoFields(config, info)
756 updateClientInfoFields(config, info)
757 updateMemoryInfoFields(config, info)
758 updateStatsInfoFields(config, info)
759 updateCpuInfoFields(config, info)
760 updateCommandstatsInfoFields(config, info)
761 updateKeyspaceInfoFields(config, info)
764 func (db *DB) State() (*DbState, error) {
765 dbState := new(DbState)
766 if db.cfg.sentinelPort != "" {
767 //Establish connection to Redis sentinel. The reason why connection is done
768 //here instead of time of the SDL instance creation is that for the time being
769 //sentinel connection is needed only here to get state information and
770 //state information is needed only by 'sdlcli' hence it is not time critical
771 //and also we want to avoid opening unnecessary TCP connections towards Redis
772 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
773 sentinelClient := db.sentinel(&db.cfg, db.addr)
774 return sentinelClient.GetDbState()
776 info, err := db.Info()
778 dbState.PrimaryDbState.Err = err
781 return db.fillDbStateFromDbInfo(info)
785 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
787 if info.Fields.PrimaryRole == true {
789 PrimaryDbState: PrimaryDbState{
790 Fields: PrimaryDbStateFields{
800 cnt, err := strconv.Atoi(db.cfg.nodeCnt)
802 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
804 dbState.ConfigNodeCnt = cnt
807 return &dbState, dbState.Err
810 func createReplicaDbClient(host string) *DB {
811 cfg := readConfig(osImpl{})
812 cfg.sentinelPort = ""
813 cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
814 return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
817 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
818 dbStatisticsInfo := new(DbStatisticsInfo)
819 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
821 info, err := db.Info()
825 dbStatisticsInfo.Info = info
827 return dbStatisticsInfo, nil
830 func sentinelStatistics(db *DB) (*DbStatistics, error) {
831 dbState := new(DbState)
832 dbStatistics := new(DbStatistics)
833 dbStatisticsInfo := new(DbStatisticsInfo)
836 dbState, err = db.State()
841 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
842 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
844 if dbState.ReplicasDbState != nil {
845 for _, r := range dbState.ReplicasDbState.States {
846 replicaDb := createReplicaDbClient(r.GetAddress())
847 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
852 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
856 return dbStatistics, nil
859 func standaloneStatistics(db *DB) (*DbStatistics, error) {
860 dbStatistics := new(DbStatistics)
862 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
863 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
865 return dbStatistics, err
868 func (db *DB) Statistics() (*DbStatistics, error) {
869 if db.cfg.sentinelPort != "" {
870 return sentinelStatistics(db)
873 return standaloneStatistics(db)
876 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
878 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
879 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
880 result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
884 if result == int64(1) {
887 return errors.New("Lock not held")
890 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
892 defer sCbMap.m.Unlock()
893 sCbMap.cbMap[channel] = cb
896 func (sCbMap *sharedCbMap) Remove(channel string) {
898 defer sCbMap.m.Unlock()
899 delete(sCbMap.cbMap, channel)
902 func (sCbMap *sharedCbMap) Count() int {
904 defer sCbMap.m.Unlock()
905 return len(sCbMap.cbMap)
908 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
910 defer sCbMap.m.Unlock()
911 mapCopy := make(map[string]ChannelNotificationCb, 0)
912 for i, v := range sCbMap.cbMap {