2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2022 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
29 "github.com/go-redis/redis/v8"
41 const EventSeparator = "___"
42 const NsSeparator = ","
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
47 type intChannels struct {
48 addChannel chan string
49 removeChannel chan string
53 type sharedCbMap struct {
55 cbMap map[string]ChannelNotificationCb
62 sentinelPorts []string
70 sentinel RedisSentinelCreateCb
82 type Subscriber interface {
83 Channel(opts ...redis.ChannelOption) <-chan *redis.Message
84 Subscribe(ctx context.Context, channels ...string) error
85 Unsubscribe(ctx context.Context, channels ...string) error
89 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
91 type RedisClient interface {
92 Command(ctx context.Context) *redis.CommandsInfoCmd
94 Subscribe(ctx context.Context, channels ...string) *redis.PubSub
95 MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
96 Do(ctx context.Context, args ...interface{}) *redis.Cmd
97 MGet(ctx context.Context, keys ...string) *redis.SliceCmd
98 Del(ctx context.Context, keys ...string) *redis.IntCmd
99 Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
100 SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
101 SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
102 SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
103 SMembers(ctx context.Context, key string) *redis.StringSliceCmd
104 SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
105 SCard(ctx context.Context, key string) *redis.IntCmd
106 PTTL(ctx context.Context, key string) *redis.DurationCmd
107 Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
108 EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
109 ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
110 ScriptLoad(ctx context.Context, script string) *redis.StringCmd
111 Info(ctx context.Context, section ...string) *redis.StringCmd
118 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
120 redis.SetLogger(dbLogger)
123 func SetDbLogger(out io.Writer) {
124 dbLogger.log.SetOutput(out)
127 func checkResultAndError(result interface{}, err error) (bool, error) {
129 if err == redis.Nil {
140 func checkIntResultAndError(result interface{}, err error) (bool, error) {
144 if n, ok := result.(int64); ok {
148 } else if n, ok := result.(int); ok {
156 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
157 return client.Subscribe(ctx, channels...)
160 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
161 addr, port, sentinelPort, masterName, nodeCnt string) *DB {
163 ctx: context.Background(),
165 sentinel: sentinelCreateCb,
166 subscribe: subscribe,
168 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
170 addChannel: make(chan string),
171 removeChannel: make(chan string),
172 exit: make(chan bool),
175 sentinelPort: sentinelPort,
177 masterName: masterName,
184 func Create() []*DB {
186 return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
189 func readConfig(osI OS) Config {
191 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
192 ports: strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
193 nodeCnt: osI.Getenv("DBAAS_NODE_COUNT", "1"),
196 if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
197 cfg.clusterAddrs = strings.Split(addrStr, ",")
198 } else if cfg.hostname != "" {
199 cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
201 if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
202 cfg.sentinelPorts = strings.Split(sntPortStr, ",")
204 if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
205 cfg.masterNames = strings.Split(nameStr, ",")
212 Getenv(key string, defValue string) string
217 func (osImpl) Getenv(key string, defValue string) string {
218 val := os.Getenv(key)
225 func completeConfig(cfg *Config) {
226 if len(cfg.sentinelPorts) == 0 {
227 if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
228 for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
229 cfg.ports = append(cfg.ports, cfg.ports[i-1])
233 if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
234 for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
235 cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
238 if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
239 for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
240 cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
246 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
247 subscribe SubscribeFn,
248 sentinelCreateCb RedisSentinelCreateCb) []*DB {
250 cfg := readConfig(osI)
251 for i, addr := range cfg.clusterAddrs {
252 port := getListItem(cfg.ports, i)
253 sntPort := getListItem(cfg.sentinelPorts, i)
254 name := getListItem(cfg.masterNames, i)
255 db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
256 clientCreator, subscribe, sentinelCreateCb)
257 dbs = append(dbs, db)
262 func getListItem(list []string, index int) string {
263 if index < len(list) {
269 func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
270 subscribe SubscribeFn,
271 sentinelCreateCb RedisSentinelCreateCb) *DB {
272 var client RedisClient
274 if sentinelPort == "" {
275 client = clientCreator(addr, port, "", false)
276 db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
278 client = clientCreator(addr, sentinelPort, masterName, true)
279 db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
285 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
287 sentinelAddress := addr + ":" + port
288 return redis.NewFailoverClient(
289 &redis.FailoverOptions{
290 MasterName: clusterName,
291 SentinelAddrs: []string{sentinelAddress},
297 redisAddress := addr + ":" + port
298 return redis.NewClient(&redis.Options{
300 Password: "", // no password set
301 DB: 0, // use default DB
307 func (db *DB) CheckCommands() {
308 commands, err := db.client.Command(db.ctx).Result()
310 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
311 "msetmpub", "delmpub"}
312 for _, v := range redisModuleCommands {
315 db.redisModules = false
319 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
323 func (db *DB) CloseDB() error {
324 return db.client.Close()
327 func (db *DB) UnsubscribeChannelDB(channels ...string) error {
328 for _, v := range channels {
330 db.ch.removeChannel <- v
331 errStr := <-db.ch.removeChannel
333 return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
335 if db.sCbMap.Count() == 0 {
342 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
343 if db.sCbMap.Count() == 0 {
344 go func(sCbMap *sharedCbMap, ch intChannels) {
345 sub := db.subscribe(db.ctx, db.client, "")
346 rxChannel := sub.Channel()
347 lCbMap := sCbMap.GetMapCopy()
350 case msg := <-rxChannel:
351 cb, ok := lCbMap[msg.Channel]
353 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
354 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
356 case channel := <-ch.addChannel:
357 lCbMap = sCbMap.GetMapCopy()
358 if err := sub.Subscribe(db.ctx, channel); err != nil {
359 ch.addChannel <- err.Error()
363 case channel := <-ch.removeChannel:
364 lCbMap = sCbMap.GetMapCopy()
365 if err := sub.Unsubscribe(db.ctx, channel); err != nil {
366 ch.removeChannel <- err.Error()
368 ch.removeChannel <- ""
370 case exit := <-ch.exit:
372 if err := sub.Close(); err != nil {
373 dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
381 for _, v := range channels {
383 db.ch.addChannel <- v
384 errStr := <-db.ch.addChannel
386 return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
392 func (db *DB) MSet(pairs ...interface{}) error {
393 return db.client.MSet(db.ctx, pairs...).Err()
396 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
397 if !db.redisModules {
398 return errors.New("Redis deployment doesn't support MSETMPUB command")
400 command := make([]interface{}, 0)
401 command = append(command, "MSETMPUB")
402 command = append(command, len(pairs)/2)
403 command = append(command, len(channelsAndEvents)/2)
404 for _, d := range pairs {
405 command = append(command, d)
407 for _, d := range channelsAndEvents {
408 command = append(command, d)
410 _, err := db.client.Do(db.ctx, command...).Result()
414 func (db *DB) MGet(keys []string) ([]interface{}, error) {
415 return db.client.MGet(db.ctx, keys...).Result()
418 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
419 if !db.redisModules {
420 return errors.New("Redis deployment not supporting command DELMPUB")
422 command := make([]interface{}, 0)
423 command = append(command, "DELMPUB")
424 command = append(command, len(keys))
425 command = append(command, len(channelsAndEvents)/2)
426 for _, d := range keys {
427 command = append(command, d)
429 for _, d := range channelsAndEvents {
430 command = append(command, d)
432 _, err := db.client.Do(db.ctx, command...).Result()
437 func (db *DB) Del(keys []string) error {
438 _, err := db.client.Del(db.ctx, keys...).Result()
442 func (db *DB) Keys(pattern string) ([]string, error) {
443 return db.client.Keys(db.ctx, pattern).Result()
446 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
447 if !db.redisModules {
448 return false, errors.New("Redis deployment not supporting command")
451 return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
454 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
455 if !db.redisModules {
456 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
458 capacity := 4 + len(channelsAndEvents)
459 command := make([]interface{}, 0, capacity)
460 command = append(command, "SETIEMPUB")
461 command = append(command, key)
462 command = append(command, newData)
463 command = append(command, oldData)
464 for _, ce := range channelsAndEvents {
465 command = append(command, ce)
467 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
470 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
471 if !db.redisModules {
472 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
474 capacity := 3 + len(channelsAndEvents)
475 command := make([]interface{}, 0, capacity)
476 command = append(command, "SETNXMPUB")
477 command = append(command, key)
478 command = append(command, data)
479 for _, ce := range channelsAndEvents {
480 command = append(command, ce)
482 return checkResultAndError(db.client.Do(db.ctx, command...).Result())
484 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
485 return db.client.SetNX(db.ctx, key, data, expiration).Result()
488 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
489 if !db.redisModules {
490 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
492 capacity := 3 + len(channelsAndEvents)
493 command := make([]interface{}, 0, capacity)
494 command = append(command, "DELIEMPUB")
495 command = append(command, key)
496 command = append(command, data)
497 for _, ce := range channelsAndEvents {
498 command = append(command, ce)
500 return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
503 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
504 if !db.redisModules {
505 return false, errors.New("Redis deployment not supporting command")
507 return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
510 func (db *DB) SAdd(key string, data ...interface{}) error {
511 _, err := db.client.SAdd(db.ctx, key, data...).Result()
515 func (db *DB) SRem(key string, data ...interface{}) error {
516 _, err := db.client.SRem(db.ctx, key, data...).Result()
520 func (db *DB) SMembers(key string) ([]string, error) {
521 result, err := db.client.SMembers(db.ctx, key).Result()
525 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
526 result, err := db.client.SIsMember(db.ctx, key, data).Result()
530 func (db *DB) SCard(key string) (int64, error) {
531 result, err := db.client.SCard(db.ctx, key).Result()
535 func (db *DB) PTTL(key string) (time.Duration, error) {
536 result, err := db.client.PTTL(db.ctx, key).Result()
540 func (db *DB) Info() (*DbInfo, error) {
542 resultStr, err := db.client.Info(db.ctx, "all").Result()
547 result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
548 readRedisInfoReplyFields(result, &info)
552 func lineContains(line, substr string) bool {
553 return strings.Contains(line, substr)
556 func getFieldValueStr(line, substr string) string {
557 if idx := strings.Index(line, substr); idx != -1 {
558 return line[idx+len(substr):]
563 func getUint32FromString(s string) uint32 {
564 if val, err := strconv.ParseUint(s, 10, 32); err == nil {
570 func getUint64FromString(s string) uint64 {
571 if val, err := strconv.ParseUint(s, 10, 64); err == nil {
577 func getFloatFromString(s string, bitSize int) float64 {
578 if val, err := strconv.ParseFloat(s, bitSize); err == nil {
584 func getFloat64FromString(s string) float64 {
585 return getFloatFromString(s, 64)
588 func getFloat32FromString(s string) float32 {
589 return float32(getFloatFromString(s, 32))
592 func getValueString(values string, key string) string {
593 slice := strings.Split(values, ",")
594 for _, s := range slice {
595 if lineContains(s, key) {
596 return getFieldValueStr(s, key)
602 func getCommandstatsValues(values string) (string, string, string) {
603 calls := getValueString(values, "calls=")
604 usec := getValueString(values, "usec=")
605 usecPerCall := getValueString(values, "usec_per_call=")
606 return calls, usec, usecPerCall
609 func updateCommandstatsValues(i interface{}, values string) {
610 stype := reflect.ValueOf(i).Elem()
611 callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
613 callsField := stype.FieldByName("Calls")
614 callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
616 usecField := stype.FieldByName("Usec")
617 usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
619 usecPerCallField := stype.FieldByName("UsecPerCall")
620 usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
623 func getKeyspaceValues(values string) (string, string, string) {
624 keys := getValueString(values, "keys=")
625 expires := getValueString(values, "expires=")
626 avgttl := getValueString(values, "avg_ttl=")
627 return keys, expires, avgttl
630 func updateKeyspaceValues(i interface{}, values string) {
631 stype := reflect.ValueOf(i).Elem()
632 keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
634 keysField := stype.FieldByName("Keys")
635 keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
637 expiresField := stype.FieldByName("Expires")
638 expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
640 avgttlField := stype.FieldByName("AvgTtl")
641 avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
644 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
645 if value, ok := config["uptime_in_days"]; ok {
646 info.Fields.Server.UptimeInDays = getUint32FromString(value)
650 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
651 if value, ok := config["connected_clients"]; ok {
652 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
654 if value, ok := config["client_recent_max_input_buffer"]; ok {
655 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
657 if value, ok := config["client_recent_max_output_buffer"]; ok {
658 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
662 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
663 if value, ok := config["used_memory"]; ok {
664 info.Fields.Memory.UsedMemory = getUint64FromString(value)
666 if value, ok := config["used_memory_human"]; ok {
667 info.Fields.Memory.UsedMemoryHuman = value
669 if value, ok := config["used_memory_rss"]; ok {
670 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
672 if value, ok := config["used_memory_rss_human"]; ok {
673 info.Fields.Memory.UsedMemoryRssHuman = value
675 if value, ok := config["used_memory_peak"]; ok {
676 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
678 if value, ok := config["used_memory_peak_human"]; ok {
679 info.Fields.Memory.UsedMemoryPeakHuman = value
681 if value, ok := config["used_memory_peak_perc"]; ok {
682 info.Fields.Memory.UsedMemoryPeakPerc = value
684 if value, ok := config["mem_fragmentation_ratio"]; ok {
685 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
687 if value, ok := config["mem_fragmentation_bytes"]; ok {
688 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
692 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
693 if value, ok := config["total_connections_received"]; ok {
694 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
696 if value, ok := config["total_commands_processed"]; ok {
697 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
699 if value, ok := config["sync_full"]; ok {
700 info.Fields.Stats.SyncFull = getUint32FromString(value)
702 if value, ok := config["sync_partial_ok"]; ok {
703 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
705 if value, ok := config["sync_partial_err"]; ok {
706 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
708 if value, ok := config["pubsub_channels"]; ok {
709 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
713 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
714 if value, ok := config["used_cpu_sys"]; ok {
715 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
717 if value, ok := config["used_cpu_user"]; ok {
718 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
722 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
723 if values, ok := config["cmdstat_replconf"]; ok {
724 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
726 if values, ok := config["cmdstat_keys"]; ok {
727 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
729 if values, ok := config["cmdstat_role"]; ok {
730 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
732 if values, ok := config["cmdstat_psync"]; ok {
733 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
735 if values, ok := config["cmdstat_mset"]; ok {
736 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
738 if values, ok := config["cmdstat_publish"]; ok {
739 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
741 if values, ok := config["cmdstat_info"]; ok {
742 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
744 if values, ok := config["cmdstat_ping"]; ok {
745 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
747 if values, ok := config["cmdstat_client"]; ok {
748 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
750 if values, ok := config["cmdstat_command"]; ok {
751 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
753 if values, ok := config["cmdstat_subscribe"]; ok {
754 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
756 if values, ok := config["cmdstat_monitor"]; ok {
757 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
759 if values, ok := config["cmdstat_config"]; ok {
760 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
762 if values, ok := config["cmdstat_slaveof"]; ok {
763 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
767 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
768 if values, ok := config["db0"]; ok {
769 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
773 func getConfigInfo(input []string) ConfigInfo {
774 config := ConfigInfo{}
775 for _, line := range input {
776 if i := strings.Index(line, ":"); i != -1 {
777 if key := strings.TrimSpace(line[:i]); len(key) > 0 {
779 config[key] = strings.TrimSpace(line[i+1:])
787 func readRedisInfoReplyFields(input []string, info *DbInfo) {
788 config := getConfigInfo(input)
790 if value, ok := config["role"]; ok {
791 if "master" == value {
792 info.Fields.PrimaryRole = true
795 if value, ok := config["connected_slaves"]; ok {
796 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
798 updateServerInfoFields(config, info)
799 updateClientInfoFields(config, info)
800 updateMemoryInfoFields(config, info)
801 updateStatsInfoFields(config, info)
802 updateCpuInfoFields(config, info)
803 updateCommandstatsInfoFields(config, info)
804 updateKeyspaceInfoFields(config, info)
807 func (db *DB) State() (*DbState, error) {
808 dbState := new(DbState)
809 if db.sentinelPort != "" {
810 //Establish connection to Redis sentinel. The reason why connection is done
811 //here instead of time of the SDL instance creation is that for the time being
812 //sentinel connection is needed only here to get state information and
813 //state information is needed only by 'sdlcli' hence it is not time critical
814 //and also we want to avoid opening unnecessary TCP connections towards Redis
815 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
816 sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
817 return sentinelClient.GetDbState()
819 info, err := db.Info()
821 dbState.PrimaryDbState.Err = err
824 return db.fillDbStateFromDbInfo(info)
828 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
830 if info.Fields.PrimaryRole == true {
832 PrimaryDbState: PrimaryDbState{
833 Fields: PrimaryDbStateFields{
843 cnt, err := strconv.Atoi(db.nodeCnt)
845 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
847 dbState.ConfigNodeCnt = cnt
850 return &dbState, dbState.Err
853 func createReplicaDbClient(host string) (*DB, error) {
854 addr, port, err := net.SplitHostPort(host)
858 return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil), err
861 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
863 dbStatisticsInfo := new(DbStatisticsInfo)
864 dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, err = net.SplitHostPort(host)
869 info, err := db.Info()
873 dbStatisticsInfo.Info = info
875 return dbStatisticsInfo, nil
878 func sentinelStatistics(db *DB) (*DbStatistics, error) {
879 dbState := new(DbState)
880 dbStatistics := new(DbStatistics)
881 dbStatisticsInfo := new(DbStatisticsInfo)
884 dbState, err = db.State()
889 dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
890 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
892 if dbState.ReplicasDbState != nil {
893 for _, r := range dbState.ReplicasDbState.States {
894 replicaDb, err := createReplicaDbClient(r.GetAddress())
898 dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
899 if closeErr := replicaDb.CloseDB(); closeErr != nil {
905 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
909 return dbStatistics, nil
912 func standaloneStatistics(db *DB) (*DbStatistics, error) {
913 dbStatistics := new(DbStatistics)
915 dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
916 dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
918 return dbStatistics, err
921 func (db *DB) Statistics() (*DbStatistics, error) {
922 if db.sentinelPort != "" {
923 return sentinelStatistics(db)
926 return standaloneStatistics(db)
929 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
931 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
932 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
933 result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
937 if result == int64(1) {
940 return errors.New("Lock not held")
943 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
945 defer sCbMap.m.Unlock()
946 sCbMap.cbMap[channel] = cb
949 func (sCbMap *sharedCbMap) Remove(channel string) {
951 defer sCbMap.m.Unlock()
952 delete(sCbMap.cbMap, channel)
955 func (sCbMap *sharedCbMap) Count() int {
957 defer sCbMap.m.Unlock()
958 return len(sCbMap.cbMap)
961 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
963 defer sCbMap.m.Unlock()
964 mapCopy := make(map[string]ChannelNotificationCb, 0)
965 for i, v := range sCbMap.cbMap {