Bump Redis client version to v8.11.4
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "context"
27         "errors"
28         "fmt"
29         "github.com/go-redis/redis/v8"
30         "io"
31         "log"
32         "net"
33         "os"
34         "reflect"
35         "strconv"
36         "strings"
37         "sync"
38         "time"
39 )
40
41 type ChannelNotificationCb func(channel string, payload ...string)
42 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
43
44 type intChannels struct {
45         addChannel    chan string
46         removeChannel chan string
47         exit          chan bool
48 }
49
50 type sharedCbMap struct {
51         m     sync.Mutex
52         cbMap map[string]ChannelNotificationCb
53 }
54
55 type Config struct {
56         hostname        string
57         port            string
58         masterName      string
59         sentinelPort    string
60         clusterAddrList string
61         nodeCnt         string
62 }
63
64 type DB struct {
65         ctx          context.Context
66         client       RedisClient
67         sentinel     RedisSentinelCreateCb
68         subscribe    SubscribeFn
69         redisModules bool
70         sCbMap       *sharedCbMap
71         ch           intChannels
72         cfg          Config
73         addr         string
74 }
75
76 type Subscriber interface {
77         Channel(opts ...redis.ChannelOption) <-chan *redis.Message
78         Subscribe(ctx context.Context, channels ...string) error
79         Unsubscribe(ctx context.Context, channels ...string) error
80         Close() error
81 }
82
83 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
84
85 type RedisClient interface {
86         Command(ctx context.Context) *redis.CommandsInfoCmd
87         Close() error
88         Subscribe(ctx context.Context, channels ...string) *redis.PubSub
89         MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
90         Do(ctx context.Context, args ...interface{}) *redis.Cmd
91         MGet(ctx context.Context, keys ...string) *redis.SliceCmd
92         Del(ctx context.Context, keys ...string) *redis.IntCmd
93         Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
94         SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
95         SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
96         SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
97         SMembers(ctx context.Context, key string) *redis.StringSliceCmd
98         SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
99         SCard(ctx context.Context, key string) *redis.IntCmd
100         PTTL(ctx context.Context, key string) *redis.DurationCmd
101         Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
102         EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
103         ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
104         ScriptLoad(ctx context.Context, script string) *redis.StringCmd
105         Info(ctx context.Context, section ...string) *redis.StringCmd
106 }
107
108 var dbLogger *logger
109
110 func init() {
111         dbLogger = &logger{
112                 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
113         }
114         redis.SetLogger(dbLogger)
115 }
116
117 func SetDbLogger(out io.Writer) {
118         dbLogger.log.SetOutput(out)
119 }
120
121 func checkResultAndError(result interface{}, err error) (bool, error) {
122         if err != nil {
123                 if err == redis.Nil {
124                         return false, nil
125                 }
126                 return false, err
127         }
128         if result == "OK" {
129                 return true, nil
130         }
131         return false, nil
132 }
133
134 func checkIntResultAndError(result interface{}, err error) (bool, error) {
135         if err != nil {
136                 return false, err
137         }
138         if n, ok := result.(int64); ok {
139                 if n == 1 {
140                         return true, nil
141                 }
142         } else if n, ok := result.(int); ok {
143                 if n == 1 {
144                         return true, nil
145                 }
146         }
147         return false, nil
148 }
149
150 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
151         return client.Subscribe(ctx, channels...)
152 }
153
154 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
155         db := DB{
156                 ctx:          context.Background(),
157                 client:       client,
158                 sentinel:     sentinelCreateCb,
159                 subscribe:    subscribe,
160                 redisModules: true,
161                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
162                 ch: intChannels{
163                         addChannel:    make(chan string),
164                         removeChannel: make(chan string),
165                         exit:          make(chan bool),
166                 },
167                 cfg:  cfg,
168                 addr: sentinelAddr,
169         }
170
171         return &db
172 }
173
174 func Create() []*DB {
175         osimpl := osImpl{}
176         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
177 }
178
179 func readConfig(osI OS) Config {
180         cfg := Config{
181                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
182                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
183                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
184                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
185                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
186                 nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
187         }
188         return cfg
189 }
190
191 type OS interface {
192         Getenv(key string, defValue string) string
193 }
194
195 type osImpl struct{}
196
197 func (osImpl) Getenv(key string, defValue string) string {
198         val := os.Getenv(key)
199         if val == "" {
200                 val = defValue
201         }
202         return val
203 }
204
205 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
206         subscribe SubscribeFn,
207         sentinelCreateCb RedisSentinelCreateCb) []*DB {
208         cfg := readConfig(osI)
209         return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
210 }
211
212 func createDbClients(cfg Config, clientCreator RedisClientCreator,
213         subscribe SubscribeFn,
214         sentinelCreateCb RedisSentinelCreateCb) []*DB {
215         if cfg.clusterAddrList == "" {
216                 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
217         }
218
219         dbs := []*DB{}
220
221         addrList := strings.Split(cfg.clusterAddrList, ",")
222         for _, addr := range addrList {
223                 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
224                 dbs = append(dbs, db)
225         }
226         return dbs
227 }
228
229 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
230         subscribe SubscribeFn,
231         sentinelCreateCb RedisSentinelCreateCb) *DB {
232         return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
233 }
234
235 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
236         subscribe SubscribeFn,
237         sentinelCreateCb RedisSentinelCreateCb) *DB {
238         var client RedisClient
239         var db *DB
240         if cfg.sentinelPort == "" {
241                 client = clientCreator(hostName, cfg.port, "", false)
242                 db = CreateDB(client, subscribe, nil, cfg, hostName)
243         } else {
244                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
245                 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
246         }
247         db.CheckCommands()
248         return db
249 }
250
251 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
252         if isHa == true {
253                 sentinelAddress := addr + ":" + port
254                 return redis.NewFailoverClient(
255                         &redis.FailoverOptions{
256                                 MasterName:    clusterName,
257                                 SentinelAddrs: []string{sentinelAddress},
258                                 PoolSize:      20,
259                                 MaxRetries:    2,
260                         },
261                 )
262         }
263         redisAddress := addr + ":" + port
264         return redis.NewClient(&redis.Options{
265                 Addr:       redisAddress,
266                 Password:   "", // no password set
267                 DB:         0,  // use default DB
268                 PoolSize:   20,
269                 MaxRetries: 2,
270         })
271 }
272
273 func (db *DB) CheckCommands() {
274         commands, err := db.client.Command(db.ctx).Result()
275         if err == nil {
276                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
277                         "msetmpub", "delmpub"}
278                 for _, v := range redisModuleCommands {
279                         _, ok := commands[v]
280                         if !ok {
281                                 db.redisModules = false
282                         }
283                 }
284         } else {
285                 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
286         }
287 }
288
289 func (db *DB) CloseDB() error {
290         return db.client.Close()
291 }
292
293 func (db *DB) UnsubscribeChannelDB(channels ...string) {
294         for _, v := range channels {
295                 db.sCbMap.Remove(v)
296                 db.ch.removeChannel <- v
297                 if db.sCbMap.Count() == 0 {
298                         db.ch.exit <- true
299                 }
300         }
301 }
302
303 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
304         if db.sCbMap.Count() == 0 {
305                 for _, v := range channels {
306                         db.sCbMap.Add(v, cb)
307                 }
308
309                 go func(sCbMap *sharedCbMap,
310                         channelPrefix,
311                         eventSeparator string,
312                         ch intChannels,
313                         channels ...string) {
314                         sub := db.subscribe(db.ctx, db.client, channels...)
315                         rxChannel := sub.Channel()
316                         lCbMap := sCbMap.GetMapCopy()
317                         for {
318                                 select {
319                                 case msg := <-rxChannel:
320                                         cb, ok := lCbMap[msg.Channel]
321                                         if ok {
322                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
323                                         }
324                                 case channel := <-ch.addChannel:
325                                         lCbMap = sCbMap.GetMapCopy()
326                                         sub.Subscribe(db.ctx, channel)
327                                 case channel := <-ch.removeChannel:
328                                         lCbMap = sCbMap.GetMapCopy()
329                                         sub.Unsubscribe(db.ctx, channel)
330                                 case exit := <-ch.exit:
331                                         if exit {
332                                                 if err := sub.Close(); err != nil {
333                                                         dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
334                                                 }
335                                                 return
336                                         }
337                                 }
338                         }
339                 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
340
341         } else {
342                 for _, v := range channels {
343                         db.sCbMap.Add(v, cb)
344                         db.ch.addChannel <- v
345                 }
346         }
347 }
348
349 func (db *DB) MSet(pairs ...interface{}) error {
350         return db.client.MSet(db.ctx, pairs...).Err()
351 }
352
353 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
354         if !db.redisModules {
355                 return errors.New("Redis deployment doesn't support MSETMPUB command")
356         }
357         command := make([]interface{}, 0)
358         command = append(command, "MSETMPUB")
359         command = append(command, len(pairs)/2)
360         command = append(command, len(channelsAndEvents)/2)
361         for _, d := range pairs {
362                 command = append(command, d)
363         }
364         for _, d := range channelsAndEvents {
365                 command = append(command, d)
366         }
367         _, err := db.client.Do(db.ctx, command...).Result()
368         return err
369 }
370
371 func (db *DB) MGet(keys []string) ([]interface{}, error) {
372         return db.client.MGet(db.ctx, keys...).Result()
373 }
374
375 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
376         if !db.redisModules {
377                 return errors.New("Redis deployment not supporting command DELMPUB")
378         }
379         command := make([]interface{}, 0)
380         command = append(command, "DELMPUB")
381         command = append(command, len(keys))
382         command = append(command, len(channelsAndEvents)/2)
383         for _, d := range keys {
384                 command = append(command, d)
385         }
386         for _, d := range channelsAndEvents {
387                 command = append(command, d)
388         }
389         _, err := db.client.Do(db.ctx, command...).Result()
390         return err
391
392 }
393
394 func (db *DB) Del(keys []string) error {
395         _, err := db.client.Del(db.ctx, keys...).Result()
396         return err
397 }
398
399 func (db *DB) Keys(pattern string) ([]string, error) {
400         return db.client.Keys(db.ctx, pattern).Result()
401 }
402
403 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
404         if !db.redisModules {
405                 return false, errors.New("Redis deployment not supporting command")
406         }
407
408         return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
409 }
410
411 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
412         if !db.redisModules {
413                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
414         }
415         capacity := 4 + len(channelsAndEvents)
416         command := make([]interface{}, 0, capacity)
417         command = append(command, "SETIEMPUB")
418         command = append(command, key)
419         command = append(command, newData)
420         command = append(command, oldData)
421         for _, ce := range channelsAndEvents {
422                 command = append(command, ce)
423         }
424         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
425 }
426
427 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
428         if !db.redisModules {
429                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
430         }
431         capacity := 3 + len(channelsAndEvents)
432         command := make([]interface{}, 0, capacity)
433         command = append(command, "SETNXMPUB")
434         command = append(command, key)
435         command = append(command, data)
436         for _, ce := range channelsAndEvents {
437                 command = append(command, ce)
438         }
439         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
440 }
441 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
442         return db.client.SetNX(db.ctx, key, data, expiration).Result()
443 }
444
445 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
446         if !db.redisModules {
447                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
448         }
449         capacity := 3 + len(channelsAndEvents)
450         command := make([]interface{}, 0, capacity)
451         command = append(command, "DELIEMPUB")
452         command = append(command, key)
453         command = append(command, data)
454         for _, ce := range channelsAndEvents {
455                 command = append(command, ce)
456         }
457         return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
458 }
459
460 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
461         if !db.redisModules {
462                 return false, errors.New("Redis deployment not supporting command")
463         }
464         return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
465 }
466
467 func (db *DB) SAdd(key string, data ...interface{}) error {
468         _, err := db.client.SAdd(db.ctx, key, data...).Result()
469         return err
470 }
471
472 func (db *DB) SRem(key string, data ...interface{}) error {
473         _, err := db.client.SRem(db.ctx, key, data...).Result()
474         return err
475 }
476
477 func (db *DB) SMembers(key string) ([]string, error) {
478         result, err := db.client.SMembers(db.ctx, key).Result()
479         return result, err
480 }
481
482 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
483         result, err := db.client.SIsMember(db.ctx, key, data).Result()
484         return result, err
485 }
486
487 func (db *DB) SCard(key string) (int64, error) {
488         result, err := db.client.SCard(db.ctx, key).Result()
489         return result, err
490 }
491
492 func (db *DB) PTTL(key string) (time.Duration, error) {
493         result, err := db.client.PTTL(db.ctx, key).Result()
494         return result, err
495 }
496
497 func (db *DB) Info() (*DbInfo, error) {
498         var info DbInfo
499         resultStr, err := db.client.Info(db.ctx, "all").Result()
500         if err != nil {
501                 return &info, err
502         }
503
504         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
505         readRedisInfoReplyFields(result, &info)
506         return &info, nil
507 }
508
509 func lineContains(line, substr string) bool {
510         return strings.Contains(line, substr)
511 }
512
513 func getFieldValueStr(line, substr string) string {
514         if idx := strings.Index(line, substr); idx != -1 {
515                 return line[idx+len(substr):]
516         }
517         return ""
518 }
519
520 func getUint32FromString(s string) uint32 {
521         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
522                 return uint32(val)
523         }
524         return 0
525 }
526
527 func getUint64FromString(s string) uint64 {
528         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
529                 return uint64(val)
530         }
531         return 0
532 }
533
534 func getFloatFromString(s string, bitSize int) float64 {
535         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
536                 return val
537         }
538         return 0
539 }
540
541 func getFloat64FromString(s string) float64 {
542         return getFloatFromString(s, 64)
543 }
544
545 func getFloat32FromString(s string) float32 {
546         return float32(getFloatFromString(s, 32))
547 }
548
549 func getValueString(values string, key string) string {
550         slice := strings.Split(values, ",")
551         for _, s := range slice {
552                 if lineContains(s, key) {
553                         return getFieldValueStr(s, key)
554                 }
555         }
556         return ""
557 }
558
559 func getCommandstatsValues(values string) (string, string, string) {
560         calls := getValueString(values, "calls=")
561         usec := getValueString(values, "usec=")
562         usecPerCall := getValueString(values, "usec_per_call=")
563         return calls, usec, usecPerCall
564 }
565
566 func updateCommandstatsValues(i interface{}, values string) {
567         stype := reflect.ValueOf(i).Elem()
568         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
569
570         callsField := stype.FieldByName("Calls")
571         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
572
573         usecField := stype.FieldByName("Usec")
574         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
575
576         usecPerCallField := stype.FieldByName("UsecPerCall")
577         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
578 }
579
580 func getKeyspaceValues(values string) (string, string, string) {
581         keys := getValueString(values, "keys=")
582         expires := getValueString(values, "expires=")
583         avgttl := getValueString(values, "avg_ttl=")
584         return keys, expires, avgttl
585 }
586
587 func updateKeyspaceValues(i interface{}, values string) {
588         stype := reflect.ValueOf(i).Elem()
589         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
590
591         keysField := stype.FieldByName("Keys")
592         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
593
594         expiresField := stype.FieldByName("Expires")
595         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
596
597         avgttlField := stype.FieldByName("AvgTtl")
598         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
599 }
600
601 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
602         if value, ok := config["uptime_in_days"]; ok {
603                 info.Fields.Server.UptimeInDays = getUint32FromString(value)
604         }
605 }
606
607 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
608         if value, ok := config["connected_clients"]; ok {
609                 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
610         }
611         if value, ok := config["client_recent_max_input_buffer"]; ok {
612                 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
613         }
614         if value, ok := config["client_recent_max_output_buffer"]; ok {
615                 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
616         }
617 }
618
619 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
620         if value, ok := config["used_memory"]; ok {
621                 info.Fields.Memory.UsedMemory = getUint64FromString(value)
622         }
623         if value, ok := config["used_memory_human"]; ok {
624                 info.Fields.Memory.UsedMemoryHuman = value
625         }
626         if value, ok := config["used_memory_rss"]; ok {
627                 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
628         }
629         if value, ok := config["used_memory_rss_human"]; ok {
630                 info.Fields.Memory.UsedMemoryRssHuman = value
631         }
632         if value, ok := config["used_memory_peak"]; ok {
633                 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
634         }
635         if value, ok := config["used_memory_peak_human"]; ok {
636                 info.Fields.Memory.UsedMemoryPeakHuman = value
637         }
638         if value, ok := config["used_memory_peak_perc"]; ok {
639                 info.Fields.Memory.UsedMemoryPeakPerc = value
640         }
641         if value, ok := config["mem_fragmentation_ratio"]; ok {
642                 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
643         }
644         if value, ok := config["mem_fragmentation_bytes"]; ok {
645                 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
646         }
647 }
648
649 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
650         if value, ok := config["total_connections_received"]; ok {
651                 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
652         }
653         if value, ok := config["total_commands_processed"]; ok {
654                 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
655         }
656         if value, ok := config["sync_full"]; ok {
657                 info.Fields.Stats.SyncFull = getUint32FromString(value)
658         }
659         if value, ok := config["sync_partial_ok"]; ok {
660                 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
661         }
662         if value, ok := config["sync_partial_err"]; ok {
663                 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
664         }
665         if value, ok := config["pubsub_channels"]; ok {
666                 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
667         }
668 }
669
670 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
671         if value, ok := config["used_cpu_sys"]; ok {
672                 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
673         }
674         if value, ok := config["used_cpu_user"]; ok {
675                 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
676         }
677 }
678
679 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
680         if values, ok := config["cmdstat_replconf"]; ok {
681                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
682         }
683         if values, ok := config["cmdstat_keys"]; ok {
684                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
685         }
686         if values, ok := config["cmdstat_role"]; ok {
687                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
688         }
689         if values, ok := config["cmdstat_psync"]; ok {
690                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
691         }
692         if values, ok := config["cmdstat_mset"]; ok {
693                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
694         }
695         if values, ok := config["cmdstat_publish"]; ok {
696                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
697         }
698         if values, ok := config["cmdstat_info"]; ok {
699                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
700         }
701         if values, ok := config["cmdstat_ping"]; ok {
702                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
703         }
704         if values, ok := config["cmdstat_client"]; ok {
705                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
706         }
707         if values, ok := config["cmdstat_command"]; ok {
708                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
709         }
710         if values, ok := config["cmdstat_subscribe"]; ok {
711                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
712         }
713         if values, ok := config["cmdstat_monitor"]; ok {
714                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
715         }
716         if values, ok := config["cmdstat_config"]; ok {
717                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
718         }
719         if values, ok := config["cmdstat_slaveof"]; ok {
720                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
721         }
722 }
723
724 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
725         if values, ok := config["db0"]; ok {
726                 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
727         }
728 }
729
730 func getConfigInfo(input []string) ConfigInfo {
731         config := ConfigInfo{}
732         for _, line := range input {
733                 if i := strings.Index(line, ":"); i != -1 {
734                         if key := strings.TrimSpace(line[:i]); len(key) > 0 {
735                                 if len(line) > i {
736                                         config[key] = strings.TrimSpace(line[i+1:])
737                                 }
738                         }
739                 }
740         }
741         return config
742 }
743
744 func readRedisInfoReplyFields(input []string, info *DbInfo) {
745         config := getConfigInfo(input)
746
747         if value, ok := config["role"]; ok {
748                 if "master" == value {
749                         info.Fields.PrimaryRole = true
750                 }
751         }
752         if value, ok := config["connected_slaves"]; ok {
753                 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
754         }
755         updateServerInfoFields(config, info)
756         updateClientInfoFields(config, info)
757         updateMemoryInfoFields(config, info)
758         updateStatsInfoFields(config, info)
759         updateCpuInfoFields(config, info)
760         updateCommandstatsInfoFields(config, info)
761         updateKeyspaceInfoFields(config, info)
762 }
763
764 func (db *DB) State() (*DbState, error) {
765         dbState := new(DbState)
766         if db.cfg.sentinelPort != "" {
767                 //Establish connection to Redis sentinel. The reason why connection is done
768                 //here instead of time of the SDL instance creation is that for the time being
769                 //sentinel connection is needed only here to get state information and
770                 //state information is needed only by 'sdlcli' hence it is not time critical
771                 //and also we want to avoid opening unnecessary TCP connections towards Redis
772                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
773                 sentinelClient := db.sentinel(&db.cfg, db.addr)
774                 return sentinelClient.GetDbState()
775         } else {
776                 info, err := db.Info()
777                 if err != nil {
778                         dbState.PrimaryDbState.Err = err
779                         return dbState, err
780                 }
781                 return db.fillDbStateFromDbInfo(info)
782         }
783 }
784
785 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
786         var dbState DbState
787         if info.Fields.PrimaryRole == true {
788                 dbState = DbState{
789                         PrimaryDbState: PrimaryDbState{
790                                 Fields: PrimaryDbStateFields{
791                                         Role:  "master",
792                                         Ip:    db.cfg.hostname,
793                                         Port:  db.cfg.port,
794                                         Flags: "master",
795                                 },
796                         },
797                 }
798         }
799
800         cnt, err := strconv.Atoi(db.cfg.nodeCnt)
801         if err != nil {
802                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
803         } else {
804                 dbState.ConfigNodeCnt = cnt
805         }
806
807         return &dbState, dbState.Err
808 }
809
810 func createReplicaDbClient(host string) *DB {
811         cfg := readConfig(osImpl{})
812         cfg.sentinelPort = ""
813         cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
814         return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
815 }
816
817 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
818         dbStatisticsInfo := new(DbStatisticsInfo)
819         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
820
821         info, err := db.Info()
822         if err != nil {
823                 return nil, err
824         }
825         dbStatisticsInfo.Info = info
826
827         return dbStatisticsInfo, nil
828 }
829
830 func sentinelStatistics(db *DB) (*DbStatistics, error) {
831         dbState := new(DbState)
832         dbStatistics := new(DbStatistics)
833         dbStatisticsInfo := new(DbStatisticsInfo)
834         var err error
835
836         dbState, err = db.State()
837         if err != nil {
838                 return nil, err
839         }
840
841         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
842         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
843
844         if dbState.ReplicasDbState != nil {
845                 for _, r := range dbState.ReplicasDbState.States {
846                         replicaDb := createReplicaDbClient(r.GetAddress())
847                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
848                         replicaDb.CloseDB()
849                         if err != nil {
850                                 return nil, err
851                         }
852                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
853                 }
854         }
855
856         return dbStatistics, nil
857 }
858
859 func standaloneStatistics(db *DB) (*DbStatistics, error) {
860         dbStatistics := new(DbStatistics)
861
862         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
863         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
864
865         return dbStatistics, err
866 }
867
868 func (db *DB) Statistics() (*DbStatistics, error) {
869         if db.cfg.sentinelPort != "" {
870                 return sentinelStatistics(db)
871         }
872
873         return standaloneStatistics(db)
874 }
875
876 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
877
878 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
879         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
880         result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
881         if err != nil {
882                 return err
883         }
884         if result == int64(1) {
885                 return nil
886         }
887         return errors.New("Lock not held")
888 }
889
890 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
891         sCbMap.m.Lock()
892         defer sCbMap.m.Unlock()
893         sCbMap.cbMap[channel] = cb
894 }
895
896 func (sCbMap *sharedCbMap) Remove(channel string) {
897         sCbMap.m.Lock()
898         defer sCbMap.m.Unlock()
899         delete(sCbMap.cbMap, channel)
900 }
901
902 func (sCbMap *sharedCbMap) Count() int {
903         sCbMap.m.Lock()
904         defer sCbMap.m.Unlock()
905         return len(sCbMap.cbMap)
906 }
907
908 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
909         sCbMap.m.Lock()
910         defer sCbMap.m.Unlock()
911         mapCopy := make(map[string]ChannelNotificationCb, 0)
912         for i, v := range sCbMap.cbMap {
913                 mapCopy[i] = v
914         }
915         return mapCopy
916 }