2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2022 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
29 "github.com/go-redis/redis/v8"
41 const EventSeparator = "___"
42 const NsSeparator = ","
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
47 type intChannels struct {
48 addChannel chan string
49 removeChannel chan string
53 type sharedCbMap struct {
55 cbMap map[string]ChannelNotificationCb
62 sentinelPorts []string
70 sentinel RedisSentinelCreateCb
82 type Subscriber interface {
83 Channel(opts ...redis.ChannelOption) <-chan *redis.Message
84 Subscribe(ctx context.Context, channels ...string) error
85 Unsubscribe(ctx context.Context, channels ...string) error
89 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
91 type RedisClient interface {
92 Command(ctx context.Context) *redis.CommandsInfoCmd
94 Subscribe(ctx context.Context, channels ...string) *redis.PubSub
95 MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
96 Do(ctx context.Context, args ...interface{}) *redis.Cmd
97 MGet(ctx context.Context, keys ...string) *redis.SliceCmd
98 Del(ctx context.Context, keys ...string) *redis.IntCmd
99 Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
100 SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
101 SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
102 SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
103 SMembers(ctx context.Context, key string) *redis.StringSliceCmd
104 SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
105 SCard(ctx context.Context, key string) *redis.IntCmd
106 PTTL(ctx context.Context, key string) *redis.DurationCmd
107 Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
108 EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
109 ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
110 ScriptLoad(ctx context.Context, script string) *redis.StringCmd
111 Info(ctx context.Context, section ...string) *redis.StringCmd
118 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
120 redis.SetLogger(dbLogger)
123 func SetDbLogger(out io.Writer) {
124 dbLogger.log.SetOutput(out)
127 func checkResultAndError(result interface{}, err error) (bool, error) {
129 if err == redis.Nil {
140 func checkIntResultAndError(result interface{}, err error) (bool, error) {
144 if n, ok := result.(int64); ok {
148 } else if n, ok := result.(int); ok {
156 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
157 return client.Subscribe(ctx, channels...)
160 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
161 addr, port, sentinelPort, masterName, nodeCnt string) *DB {
163 ctx: context.Background(),
165 sentinel: sentinelCreateCb,
166 subscribe: subscribe,
168 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
170 addChannel: make(chan string),
171 removeChannel: make(chan string),
172 exit: make(chan bool),
175 sentinelPort: sentinelPort,
177 masterName: masterName,
184 func Create() []*DB {
186 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
189 func readConfig(osI OS) Config {
191 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
192 ports: strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
193 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
196 if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
197 cfg.clusterAddrs = strings.Split(addrStr, ",")
198 } else if cfg.hostname != "" {
199 cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
201 if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
202 cfg.sentinelPorts = strings.Split(sntPortStr, ",")
204 if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
205 cfg.masterNames = strings.Split(nameStr, ",")
212 Getenv(key string, defValue string) string
217 func (osImpl) Getenv(key string, defValue string) string {
218 val := os.Getenv(key)
225 func completeConfig(cfg *Config) {
226 if len(cfg.sentinelPorts) == 0 {
227 if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
228 for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
229 cfg.ports = append(cfg.ports, cfg.ports[i-1])
233 if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
234 for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
235 cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
238 if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
239 for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
240 cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
246 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
247 subscribe SubscribeFn,
248 sentinelCreateCb RedisSentinelCreateCb) []*DB {
250 cfg := readConfig(osI)
251 for i, addr := range cfg.clusterAddrs {
252 port := getListItem(cfg.ports, i)
253 sntPort := getListItem(cfg.sentinelPorts, i)
254 name := getListItem(cfg.masterNames, i)
255 db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
256 clientCreator, subscribe, sentinelCreateCb)
257 dbs = append(dbs, db)
262 func getListItem(list []string, index int) string {
263 if index < len(list) {
269 func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
270 subscribe SubscribeFn,
271 sentinelCreateCb RedisSentinelCreateCb) *DB {
272 var client RedisClient
274 if sentinelPort == "" {
275 client = clientCreator(addr, port, "", false)
276 db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
278 client = clientCreator(addr, sentinelPort, masterName, true)
279 db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
285 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
287 sentinelAddress := addr + ":" + port
288 return redis.NewFailoverClient(
289 &redis.FailoverOptions{
290 MasterName: clusterName,
291 SentinelAddrs: []string{sentinelAddress},
297 redisAddress := addr + ":" + port
298 return redis.NewClient(&redis.Options{
300 Password: "", // no password set
301 DB: 0, // use default DB
307 func (db *DB) CheckCommands() {
308 commands, err := db.client.Command(db.ctx).Result()
310 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
311 "msetmpub", "delmpub"}
312 for _, v := range redisModuleCommands {
315 db.redisModules = false
319 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
323 func (db *DB) CloseDB() error {
324 return db.client.Close()
327 func (db *DB) UnsubscribeChannelDB(channels ...string) {
328 for _, v := range channels {
330 db.ch.removeChannel <- v
331 if db.sCbMap.Count() == 0 {
337 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
338 if db.sCbMap.Count() == 0 {
339 for _, v := range channels {
343 go func(sCbMap *sharedCbMap,
345 channels ...string) {
346 sub := db.subscribe(db.ctx, db.client, channels...)
347 rxChannel := sub.Channel()
348 lCbMap := sCbMap.GetMapCopy()
351 case msg := <-rxChannel:
352 cb, ok := lCbMap[msg.Channel]
354 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
355 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
357 case channel := <-ch.addChannel:
358 lCbMap = sCbMap.GetMapCopy()
359 sub.Subscribe(db.ctx, channel)
360 case channel := <-ch.removeChannel:
361 lCbMap = sCbMap.GetMapCopy()
362 sub.Unsubscribe(db.ctx, channel)
363 case exit := <-ch.exit:
365 if err := sub.Close(); err != nil {
366 dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
372 }(db.sCbMap, db.ch, channels...)
375 for _, v := range channels {
377 db.ch.addChannel <- v
382 func (db *DB) MSet(pairs ...interface{}) error {
383 return db.client.MSet(db.ctx, pairs...).Err()
386 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
387 if !db.redisModules {
388 return errors.New("Redis deployment doesn't support MSETMPUB command")
390 command := make([]interface{}, 0)
391 command = append(command, "MSETMPUB")
392 command = append(command, len(pairs)/2)
393 command = append(command, len(channelsAndEvents)/2)
394 for _, d := range pairs {
395 command = append(command, d)
397 for _, d := range channelsAndEvents {
398 command = append(command, d)
400 _, err := db.client.Do(db.ctx, command...).Result()
404 func (db *DB) MGet(keys []string) ([]interface{}, error) {
405 return db.client.MGet(db.ctx, keys...).Result()
408 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
409 if !db.redisModules {
410 return errors.New("Redis deployment not supporting command DELMPUB")
412 command := make([]interface{}, 0)
413 command = append(command, "DELMPUB")
414 command = append(command, len(keys))
415 command = append(command, len(channelsAndEvents)/2)
416 for _, d := range keys {
417 command = append(command, d)
419 for _, d := range channelsAndEvents {
420 command = append(command, d)
422 _, err := db.client.Do(db.ctx, command...).Result()
427 func (db *DB) Del(keys []string) error {
428 _, err := db.client.Del(db.ctx, keys...).Result()
432 func (db *DB) Keys(pattern string) ([]string, error) {
433 return db.client.Keys(db.ctx, pattern).Result()
436 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
437 if !db.redisModules {
438 return false, errors.New("Redis deployment not supporting command")
441 return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
444 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
445 if !db.redisModules {
446 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
448 capacity := 4 + len(channelsAndEvents)
449 command := make([]interface{}, 0, capacity)
450 command = append(command, "SETIEMPUB")
451 command = append(command, key)
452 command = append(command, newData)
453 command = append(command, oldData)
454 for _, ce := range channelsAndEvents {
455 command = append(command, ce)
457 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
460 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
461 if !db.redisModules {
462 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
464 capacity := 3 + len(channelsAndEvents)
465 command := make([]interface{}, 0, capacity)
466 command = append(command, "SETNXMPUB")
467 command = append(command, key)
468 command = append(command, data)
469 for _, ce := range channelsAndEvents {
470 command = append(command, ce)
472 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
474 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
475 return db.client.SetNX(db.ctx, key, data, expiration).Result()
478 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
479 if !db.redisModules {
480 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
482 capacity := 3 + len(channelsAndEvents)
483 command := make([]interface{}, 0, capacity)
484 command = append(command, "DELIEMPUB")
485 command = append(command, key)
486 command = append(command, data)
487 for _, ce := range channelsAndEvents {
488 command = append(command, ce)
490 return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
493 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
494 if !db.redisModules {
495 return false, errors.New("Redis deployment not supporting command")
497 return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
500 func (db *DB) SAdd(key string, data ...interface{}) error {
501 _, err := db.client.SAdd(db.ctx, key, data...).Result()
505 func (db *DB) SRem(key string, data ...interface{}) error {
506 _, err := db.client.SRem(db.ctx, key, data...).Result()
510 func (db *DB) SMembers(key string) ([]string, error) {
511 result, err := db.client.SMembers(db.ctx, key).Result()
515 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
516 result, err := db.client.SIsMember(db.ctx, key, data).Result()
520 func (db *DB) SCard(key string) (int64, error) {
521 result, err := db.client.SCard(db.ctx, key).Result()
525 func (db *DB) PTTL(key string) (time.Duration, error) {
526 result, err := db.client.PTTL(db.ctx, key).Result()
530 func (db *DB) Info() (*DbInfo, error) {
532 resultStr, err := db.client.Info(db.ctx, "all").Result()
537 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
538 readRedisInfoReplyFields(result, &info)
542 func lineContains(line, substr string) bool {
543 return strings.Contains(line, substr)
546 func getFieldValueStr(line, substr string) string {
547 if idx := strings.Index(line, substr); idx != -1 {
548 return line[idx+len(substr):]
553 func getUint32FromString(s string) uint32 {
554 if val, err := strconv.ParseUint(s, 10, 32); err == nil {
560 func getUint64FromString(s string) uint64 {
561 if val, err := strconv.ParseUint(s, 10, 64); err == nil {
567 func getFloatFromString(s string, bitSize int) float64 {
568 if val, err := strconv.ParseFloat(s, bitSize); err == nil {
574 func getFloat64FromString(s string) float64 {
575 return getFloatFromString(s, 64)
578 func getFloat32FromString(s string) float32 {
579 return float32(getFloatFromString(s, 32))
582 func getValueString(values string, key string) string {
583 slice := strings.Split(values, ",")
584 for _, s := range slice {
585 if lineContains(s, key) {
586 return getFieldValueStr(s, key)
592 func getCommandstatsValues(values string) (string, string, string) {
593 calls := getValueString(values, "calls=")
594 usec := getValueString(values, "usec=")
595 usecPerCall := getValueString(values, "usec_per_call=")
596 return calls, usec, usecPerCall
599 func updateCommandstatsValues(i interface{}, values string) {
600 stype := reflect.ValueOf(i).Elem()
601 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
603 callsField := stype.FieldByName("Calls")
604 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
606 usecField := stype.FieldByName("Usec")
607 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
609 usecPerCallField := stype.FieldByName("UsecPerCall")
610 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
613 func getKeyspaceValues(values string) (string, string, string) {
614 keys := getValueString(values, "keys=")
615 expires := getValueString(values, "expires=")
616 avgttl := getValueString(values, "avg_ttl=")
617 return keys, expires, avgttl
620 func updateKeyspaceValues(i interface{}, values string) {
621 stype := reflect.ValueOf(i).Elem()
622 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
624 keysField := stype.FieldByName("Keys")
625 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
627 expiresField := stype.FieldByName("Expires")
628 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
630 avgttlField := stype.FieldByName("AvgTtl")
631 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
634 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
635 if value, ok := config["uptime_in_days"]; ok {
636 info.Fields.Server.UptimeInDays = getUint32FromString(value)
640 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
641 if value, ok := config["connected_clients"]; ok {
642 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
644 if value, ok := config["client_recent_max_input_buffer"]; ok {
645 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
647 if value, ok := config["client_recent_max_output_buffer"]; ok {
648 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
652 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
653 if value, ok := config["used_memory"]; ok {
654 info.Fields.Memory.UsedMemory = getUint64FromString(value)
656 if value, ok := config["used_memory_human"]; ok {
657 info.Fields.Memory.UsedMemoryHuman = value
659 if value, ok := config["used_memory_rss"]; ok {
660 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
662 if value, ok := config["used_memory_rss_human"]; ok {
663 info.Fields.Memory.UsedMemoryRssHuman = value
665 if value, ok := config["used_memory_peak"]; ok {
666 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
668 if value, ok := config["used_memory_peak_human"]; ok {
669 info.Fields.Memory.UsedMemoryPeakHuman = value
671 if value, ok := config["used_memory_peak_perc"]; ok {
672 info.Fields.Memory.UsedMemoryPeakPerc = value
674 if value, ok := config["mem_fragmentation_ratio"]; ok {
675 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
677 if value, ok := config["mem_fragmentation_bytes"]; ok {
678 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
682 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
683 if value, ok := config["total_connections_received"]; ok {
684 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
686 if value, ok := config["total_commands_processed"]; ok {
687 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
689 if value, ok := config["sync_full"]; ok {
690 info.Fields.Stats.SyncFull = getUint32FromString(value)
692 if value, ok := config["sync_partial_ok"]; ok {
693 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
695 if value, ok := config["sync_partial_err"]; ok {
696 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
698 if value, ok := config["pubsub_channels"]; ok {
699 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
703 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
704 if value, ok := config["used_cpu_sys"]; ok {
705 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
707 if value, ok := config["used_cpu_user"]; ok {
708 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
712 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
713 if values, ok := config["cmdstat_replconf"]; ok {
714 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
716 if values, ok := config["cmdstat_keys"]; ok {
717 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
719 if values, ok := config["cmdstat_role"]; ok {
720 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
722 if values, ok := config["cmdstat_psync"]; ok {
723 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
725 if values, ok := config["cmdstat_mset"]; ok {
726 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
728 if values, ok := config["cmdstat_publish"]; ok {
729 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
731 if values, ok := config["cmdstat_info"]; ok {
732 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
734 if values, ok := config["cmdstat_ping"]; ok {
735 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
737 if values, ok := config["cmdstat_client"]; ok {
738 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
740 if values, ok := config["cmdstat_command"]; ok {
741 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
743 if values, ok := config["cmdstat_subscribe"]; ok {
744 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
746 if values, ok := config["cmdstat_monitor"]; ok {
747 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
749 if values, ok := config["cmdstat_config"]; ok {
750 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
752 if values, ok := config["cmdstat_slaveof"]; ok {
753 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
757 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
758 if values, ok := config["db0"]; ok {
759 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
763 func getConfigInfo(input []string) ConfigInfo {
764 config := ConfigInfo{}
765 for _, line := range input {
766 if i := strings.Index(line, ":"); i != -1 {
767 if key := strings.TrimSpace(line[:i]); len(key) > 0 {
769 config[key] = strings.TrimSpace(line[i+1:])
777 func readRedisInfoReplyFields(input []string, info *DbInfo) {
778 config := getConfigInfo(input)
780 if value, ok := config["role"]; ok {
781 if "master" == value {
782 info.Fields.PrimaryRole = true
785 if value, ok := config["connected_slaves"]; ok {
786 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
788 updateServerInfoFields(config, info)
789 updateClientInfoFields(config, info)
790 updateMemoryInfoFields(config, info)
791 updateStatsInfoFields(config, info)
792 updateCpuInfoFields(config, info)
793 updateCommandstatsInfoFields(config, info)
794 updateKeyspaceInfoFields(config, info)
797 func (db *DB) State() (*DbState, error) {
798 dbState := new(DbState)
799 if db.sentinelPort != "" {
800 //Establish connection to Redis sentinel. The reason why connection is done
801 //here instead of time of the SDL instance creation is that for the time being
802 //sentinel connection is needed only here to get state information and
803 //state information is needed only by 'sdlcli' hence it is not time critical
804 //and also we want to avoid opening unnecessary TCP connections towards Redis
805 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
806 sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
807 return sentinelClient.GetDbState()
809 info, err := db.Info()
811 dbState.PrimaryDbState.Err = err
814 return db.fillDbStateFromDbInfo(info)
818 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
820 if info.Fields.PrimaryRole == true {
822 PrimaryDbState: PrimaryDbState{
823 Fields: PrimaryDbStateFields{
833 cnt, err := strconv.Atoi(db.nodeCnt)
835 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
837 dbState.ConfigNodeCnt = cnt
840 return &dbState, dbState.Err
843 func createReplicaDbClient(host string) *DB {
844 addr, port, _ := net.SplitHostPort(host)
845 return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil)
848 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
849 dbStatisticsInfo := new(DbStatisticsInfo)
850 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
852 info, err := db.Info()
856 dbStatisticsInfo.Info = info
858 return dbStatisticsInfo, nil
861 func sentinelStatistics(db *DB) (*DbStatistics, error) {
862 dbState := new(DbState)
863 dbStatistics := new(DbStatistics)
864 dbStatisticsInfo := new(DbStatisticsInfo)
867 dbState, err = db.State()
872 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
873 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
875 if dbState.ReplicasDbState != nil {
876 for _, r := range dbState.ReplicasDbState.States {
877 replicaDb := createReplicaDbClient(r.GetAddress())
878 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
883 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
887 return dbStatistics, nil
890 func standaloneStatistics(db *DB) (*DbStatistics, error) {
891 dbStatistics := new(DbStatistics)
893 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
894 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
896 return dbStatistics, err
899 func (db *DB) Statistics() (*DbStatistics, error) {
900 if db.sentinelPort != "" {
901 return sentinelStatistics(db)
904 return standaloneStatistics(db)
907 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
909 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
910 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
911 result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
915 if result == int64(1) {
918 return errors.New("Lock not held")
921 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
923 defer sCbMap.m.Unlock()
924 sCbMap.cbMap[channel] = cb
927 func (sCbMap *sharedCbMap) Remove(channel string) {
929 defer sCbMap.m.Unlock()
930 delete(sCbMap.cbMap, channel)
933 func (sCbMap *sharedCbMap) Count() int {
935 defer sCbMap.m.Unlock()
936 return len(sCbMap.cbMap)
939 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
941 defer sCbMap.m.Unlock()
942 mapCopy := make(map[string]ChannelNotificationCb, 0)
943 for i, v := range sCbMap.cbMap {