Refactor long readRedisInfoReplyFields() function
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "errors"
27         "fmt"
28         "github.com/go-redis/redis/v7"
29         "io"
30         "log"
31         "net"
32         "os"
33         "reflect"
34         "strconv"
35         "strings"
36         "sync"
37         "time"
38 )
39
40 type ChannelNotificationCb func(channel string, payload ...string)
41 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
42
43 type intChannels struct {
44         addChannel    chan string
45         removeChannel chan string
46         exit          chan bool
47 }
48
49 type sharedCbMap struct {
50         m     sync.Mutex
51         cbMap map[string]ChannelNotificationCb
52 }
53
54 type Config struct {
55         hostname        string
56         port            string
57         masterName      string
58         sentinelPort    string
59         clusterAddrList string
60         nodeCnt         string
61 }
62
63 type DB struct {
64         client       RedisClient
65         sentinel     RedisSentinelCreateCb
66         subscribe    SubscribeFn
67         redisModules bool
68         sCbMap       *sharedCbMap
69         ch           intChannels
70         cfg          Config
71         addr         string
72 }
73
74 type Subscriber interface {
75         Channel() <-chan *redis.Message
76         Subscribe(channels ...string) error
77         Unsubscribe(channels ...string) error
78         Close() error
79 }
80
81 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
82
83 type RedisClient interface {
84         Command() *redis.CommandsInfoCmd
85         Close() error
86         Subscribe(channels ...string) *redis.PubSub
87         MSet(pairs ...interface{}) *redis.StatusCmd
88         Do(args ...interface{}) *redis.Cmd
89         MGet(keys ...string) *redis.SliceCmd
90         Del(keys ...string) *redis.IntCmd
91         Keys(pattern string) *redis.StringSliceCmd
92         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
93         SAdd(key string, members ...interface{}) *redis.IntCmd
94         SRem(key string, members ...interface{}) *redis.IntCmd
95         SMembers(key string) *redis.StringSliceCmd
96         SIsMember(key string, member interface{}) *redis.BoolCmd
97         SCard(key string) *redis.IntCmd
98         PTTL(key string) *redis.DurationCmd
99         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
100         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
101         ScriptExists(scripts ...string) *redis.BoolSliceCmd
102         ScriptLoad(script string) *redis.StringCmd
103         Info(section ...string) *redis.StringCmd
104 }
105
106 var dbLogger *log.Logger
107
108 func init() {
109         dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
110         redis.SetLogger(dbLogger)
111 }
112
113 func SetDbLogger(out io.Writer) {
114         dbLogger.SetOutput(out)
115 }
116
117 func checkResultAndError(result interface{}, err error) (bool, error) {
118         if err != nil {
119                 if err == redis.Nil {
120                         return false, nil
121                 }
122                 return false, err
123         }
124         if result == "OK" {
125                 return true, nil
126         }
127         return false, nil
128 }
129
130 func checkIntResultAndError(result interface{}, err error) (bool, error) {
131         if err != nil {
132                 return false, err
133         }
134         if n, ok := result.(int64); ok {
135                 if n == 1 {
136                         return true, nil
137                 }
138         } else if n, ok := result.(int); ok {
139                 if n == 1 {
140                         return true, nil
141                 }
142         }
143         return false, nil
144 }
145
146 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
147         return client.Subscribe(channels...)
148 }
149
150 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
151         db := DB{
152                 client:       client,
153                 sentinel:     sentinelCreateCb,
154                 subscribe:    subscribe,
155                 redisModules: true,
156                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
157                 ch: intChannels{
158                         addChannel:    make(chan string),
159                         removeChannel: make(chan string),
160                         exit:          make(chan bool),
161                 },
162                 cfg:  cfg,
163                 addr: sentinelAddr,
164         }
165
166         return &db
167 }
168
169 func Create() []*DB {
170         osimpl := osImpl{}
171         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
172 }
173
174 func readConfig(osI OS) Config {
175         cfg := Config{
176                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
177                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
178                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
179                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
180                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
181                 nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
182         }
183         return cfg
184 }
185
186 type OS interface {
187         Getenv(key string, defValue string) string
188 }
189
190 type osImpl struct{}
191
192 func (osImpl) Getenv(key string, defValue string) string {
193         val := os.Getenv(key)
194         if val == "" {
195                 val = defValue
196         }
197         return val
198 }
199
200 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
201         subscribe SubscribeFn,
202         sentinelCreateCb RedisSentinelCreateCb) []*DB {
203         cfg := readConfig(osI)
204         return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
205 }
206
207 func createDbClients(cfg Config, clientCreator RedisClientCreator,
208         subscribe SubscribeFn,
209         sentinelCreateCb RedisSentinelCreateCb) []*DB {
210         if cfg.clusterAddrList == "" {
211                 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
212         }
213
214         dbs := []*DB{}
215
216         addrList := strings.Split(cfg.clusterAddrList, ",")
217         for _, addr := range addrList {
218                 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
219                 dbs = append(dbs, db)
220         }
221         return dbs
222 }
223
224 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
225         subscribe SubscribeFn,
226         sentinelCreateCb RedisSentinelCreateCb) *DB {
227         return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
228 }
229
230 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
231         subscribe SubscribeFn,
232         sentinelCreateCb RedisSentinelCreateCb) *DB {
233         var client RedisClient
234         var db *DB
235         if cfg.sentinelPort == "" {
236                 client = clientCreator(hostName, cfg.port, "", false)
237                 db = CreateDB(client, subscribe, nil, cfg, hostName)
238         } else {
239                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
240                 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
241         }
242         db.CheckCommands()
243         return db
244 }
245
246 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
247         if isHa == true {
248                 sentinelAddress := addr + ":" + port
249                 return redis.NewFailoverClient(
250                         &redis.FailoverOptions{
251                                 MasterName:    clusterName,
252                                 SentinelAddrs: []string{sentinelAddress},
253                                 PoolSize:      20,
254                                 MaxRetries:    2,
255                         },
256                 )
257         }
258         redisAddress := addr + ":" + port
259         return redis.NewClient(&redis.Options{
260                 Addr:       redisAddress,
261                 Password:   "", // no password set
262                 DB:         0,  // use default DB
263                 PoolSize:   20,
264                 MaxRetries: 2,
265         })
266 }
267
268 func (db *DB) CheckCommands() {
269         commands, err := db.client.Command().Result()
270         if err == nil {
271                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
272                         "msetmpub", "delmpub"}
273                 for _, v := range redisModuleCommands {
274                         _, ok := commands[v]
275                         if !ok {
276                                 db.redisModules = false
277                         }
278                 }
279         } else {
280                 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
281         }
282 }
283
284 func (db *DB) CloseDB() error {
285         return db.client.Close()
286 }
287
288 func (db *DB) UnsubscribeChannelDB(channels ...string) {
289         for _, v := range channels {
290                 db.sCbMap.Remove(v)
291                 db.ch.removeChannel <- v
292                 if db.sCbMap.Count() == 0 {
293                         db.ch.exit <- true
294                 }
295         }
296 }
297
298 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
299         if db.sCbMap.Count() == 0 {
300                 for _, v := range channels {
301                         db.sCbMap.Add(v, cb)
302                 }
303
304                 go func(sCbMap *sharedCbMap,
305                         channelPrefix,
306                         eventSeparator string,
307                         ch intChannels,
308                         channels ...string) {
309                         sub := db.subscribe(db.client, channels...)
310                         rxChannel := sub.Channel()
311                         lCbMap := sCbMap.GetMapCopy()
312                         for {
313                                 select {
314                                 case msg := <-rxChannel:
315                                         cb, ok := lCbMap[msg.Channel]
316                                         if ok {
317                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
318                                         }
319                                 case channel := <-ch.addChannel:
320                                         lCbMap = sCbMap.GetMapCopy()
321                                         sub.Subscribe(channel)
322                                 case channel := <-ch.removeChannel:
323                                         lCbMap = sCbMap.GetMapCopy()
324                                         sub.Unsubscribe(channel)
325                                 case exit := <-ch.exit:
326                                         if exit {
327                                                 if err := sub.Close(); err != nil {
328                                                         dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
329                                                 }
330                                                 return
331                                         }
332                                 }
333                         }
334                 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
335
336         } else {
337                 for _, v := range channels {
338                         db.sCbMap.Add(v, cb)
339                         db.ch.addChannel <- v
340                 }
341         }
342 }
343
344 func (db *DB) MSet(pairs ...interface{}) error {
345         return db.client.MSet(pairs...).Err()
346 }
347
348 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
349         if !db.redisModules {
350                 return errors.New("Redis deployment doesn't support MSETMPUB command")
351         }
352         command := make([]interface{}, 0)
353         command = append(command, "MSETMPUB")
354         command = append(command, len(pairs)/2)
355         command = append(command, len(channelsAndEvents)/2)
356         for _, d := range pairs {
357                 command = append(command, d)
358         }
359         for _, d := range channelsAndEvents {
360                 command = append(command, d)
361         }
362         _, err := db.client.Do(command...).Result()
363         return err
364 }
365
366 func (db *DB) MGet(keys []string) ([]interface{}, error) {
367         return db.client.MGet(keys...).Result()
368 }
369
370 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
371         if !db.redisModules {
372                 return errors.New("Redis deployment not supporting command DELMPUB")
373         }
374         command := make([]interface{}, 0)
375         command = append(command, "DELMPUB")
376         command = append(command, len(keys))
377         command = append(command, len(channelsAndEvents)/2)
378         for _, d := range keys {
379                 command = append(command, d)
380         }
381         for _, d := range channelsAndEvents {
382                 command = append(command, d)
383         }
384         _, err := db.client.Do(command...).Result()
385         return err
386
387 }
388
389 func (db *DB) Del(keys []string) error {
390         _, err := db.client.Del(keys...).Result()
391         return err
392 }
393
394 func (db *DB) Keys(pattern string) ([]string, error) {
395         return db.client.Keys(pattern).Result()
396 }
397
398 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
399         if !db.redisModules {
400                 return false, errors.New("Redis deployment not supporting command")
401         }
402
403         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
404 }
405
406 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
407         if !db.redisModules {
408                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
409         }
410         capacity := 4 + len(channelsAndEvents)
411         command := make([]interface{}, 0, capacity)
412         command = append(command, "SETIEMPUB")
413         command = append(command, key)
414         command = append(command, newData)
415         command = append(command, oldData)
416         for _, ce := range channelsAndEvents {
417                 command = append(command, ce)
418         }
419         return checkResultAndError(db.client.Do(command...).Result())
420 }
421
422 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
423         if !db.redisModules {
424                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
425         }
426         capacity := 3 + len(channelsAndEvents)
427         command := make([]interface{}, 0, capacity)
428         command = append(command, "SETNXMPUB")
429         command = append(command, key)
430         command = append(command, data)
431         for _, ce := range channelsAndEvents {
432                 command = append(command, ce)
433         }
434         return checkResultAndError(db.client.Do(command...).Result())
435 }
436 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
437         return db.client.SetNX(key, data, expiration).Result()
438 }
439
440 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
441         if !db.redisModules {
442                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
443         }
444         capacity := 3 + len(channelsAndEvents)
445         command := make([]interface{}, 0, capacity)
446         command = append(command, "DELIEMPUB")
447         command = append(command, key)
448         command = append(command, data)
449         for _, ce := range channelsAndEvents {
450                 command = append(command, ce)
451         }
452         return checkIntResultAndError(db.client.Do(command...).Result())
453 }
454
455 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
456         if !db.redisModules {
457                 return false, errors.New("Redis deployment not supporting command")
458         }
459         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
460 }
461
462 func (db *DB) SAdd(key string, data ...interface{}) error {
463         _, err := db.client.SAdd(key, data...).Result()
464         return err
465 }
466
467 func (db *DB) SRem(key string, data ...interface{}) error {
468         _, err := db.client.SRem(key, data...).Result()
469         return err
470 }
471
472 func (db *DB) SMembers(key string) ([]string, error) {
473         result, err := db.client.SMembers(key).Result()
474         return result, err
475 }
476
477 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
478         result, err := db.client.SIsMember(key, data).Result()
479         return result, err
480 }
481
482 func (db *DB) SCard(key string) (int64, error) {
483         result, err := db.client.SCard(key).Result()
484         return result, err
485 }
486
487 func (db *DB) PTTL(key string) (time.Duration, error) {
488         result, err := db.client.PTTL(key).Result()
489         return result, err
490 }
491
492 func (db *DB) Info() (*DbInfo, error) {
493         var info DbInfo
494         resultStr, err := db.client.Info("all").Result()
495         if err != nil {
496                 return &info, err
497         }
498
499         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
500         readRedisInfoReplyFields(result, &info)
501         return &info, nil
502 }
503
504 func lineContains(line, substr string) bool {
505         return strings.Contains(line, substr)
506 }
507
508 func getFieldValueStr(line, substr string) string {
509         if idx := strings.Index(line, substr); idx != -1 {
510                 return line[idx+len(substr):]
511         }
512         return ""
513 }
514
515 func getUint32FromString(s string) uint32 {
516         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
517                 return uint32(val)
518         }
519         return 0
520 }
521
522 func getUint64FromString(s string) uint64 {
523         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
524                 return uint64(val)
525         }
526         return 0
527 }
528
529 func getFloatFromString(s string, bitSize int) float64 {
530         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
531                 return val
532         }
533         return 0
534 }
535
536 func getFloat64FromString(s string) float64 {
537         return getFloatFromString(s, 64)
538 }
539
540 func getFloat32FromString(s string) float32 {
541         return float32(getFloatFromString(s, 32))
542 }
543
544 func getValueString(values string, key string) string {
545         slice := strings.Split(values, ",")
546         for _, s := range slice {
547                 if lineContains(s, key) {
548                         return getFieldValueStr(s, key)
549                 }
550         }
551         return ""
552 }
553
554 func getCommandstatsValues(values string) (string, string, string) {
555         calls := getValueString(values, "calls=")
556         usec := getValueString(values, "usec=")
557         usecPerCall := getValueString(values, "usec_per_call=")
558         return calls, usec, usecPerCall
559 }
560
561 func updateCommandstatsValues(i interface{}, values string) {
562         stype := reflect.ValueOf(i).Elem()
563         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
564
565         callsField := stype.FieldByName("Calls")
566         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
567
568         usecField := stype.FieldByName("Usec")
569         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
570
571         usecPerCallField := stype.FieldByName("UsecPerCall")
572         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
573 }
574
575 func getKeyspaceValues(values string) (string, string, string) {
576         keys := getValueString(values, "keys=")
577         expires := getValueString(values, "expires=")
578         avgttl := getValueString(values, "avg_ttl=")
579         return keys, expires, avgttl
580 }
581
582 func updateKeyspaceValues(i interface{}, values string) {
583         stype := reflect.ValueOf(i).Elem()
584         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
585
586         keysField := stype.FieldByName("Keys")
587         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
588
589         expiresField := stype.FieldByName("Expires")
590         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
591
592         avgttlField := stype.FieldByName("AvgTtl")
593         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
594 }
595
596 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
597         if value, ok := config["uptime_in_days"]; ok {
598                 info.Fields.Server.UptimeInDays = getUint32FromString(value)
599         }
600 }
601
602 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
603         if value, ok := config["connected_clients"]; ok {
604                 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
605         }
606         if value, ok := config["client_recent_max_input_buffer"]; ok {
607                 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
608         }
609         if value, ok := config["client_recent_max_output_buffer"]; ok {
610                 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
611         }
612 }
613
614 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
615         if value, ok := config["used_memory"]; ok {
616                 info.Fields.Memory.UsedMemory = getUint64FromString(value)
617         }
618         if value, ok := config["used_memory_human"]; ok {
619                 info.Fields.Memory.UsedMemoryHuman = value
620         }
621         if value, ok := config["used_memory_rss"]; ok {
622                 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
623         }
624         if value, ok := config["used_memory_rss_human"]; ok {
625                 info.Fields.Memory.UsedMemoryRssHuman = value
626         }
627         if value, ok := config["used_memory_peak"]; ok {
628                 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
629         }
630         if value, ok := config["used_memory_peak_human"]; ok {
631                 info.Fields.Memory.UsedMemoryPeakHuman = value
632         }
633         if value, ok := config["used_memory_peak_perc"]; ok {
634                 info.Fields.Memory.UsedMemoryPeakPerc = value
635         }
636         if value, ok := config["mem_fragmentation_ratio"]; ok {
637                 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
638         }
639         if value, ok := config["mem_fragmentation_bytes"]; ok {
640                 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
641         }
642 }
643
644 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
645         if value, ok := config["total_connections_received"]; ok {
646                 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
647         }
648         if value, ok := config["total_commands_processed"]; ok {
649                 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
650         }
651         if value, ok := config["sync_full"]; ok {
652                 info.Fields.Stats.SyncFull = getUint32FromString(value)
653         }
654         if value, ok := config["sync_partial_ok"]; ok {
655                 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
656         }
657         if value, ok := config["sync_partial_err"]; ok {
658                 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
659         }
660         if value, ok := config["pubsub_channels"]; ok {
661                 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
662         }
663 }
664
665 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
666         if value, ok := config["used_cpu_sys"]; ok {
667                 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
668         }
669         if value, ok := config["used_cpu_user"]; ok {
670                 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
671         }
672 }
673
674 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
675         if values, ok := config["cmdstat_replconf"]; ok {
676                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
677         }
678         if values, ok := config["cmdstat_keys"]; ok {
679                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
680         }
681         if values, ok := config["cmdstat_role"]; ok {
682                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
683         }
684         if values, ok := config["cmdstat_psync"]; ok {
685                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
686         }
687         if values, ok := config["cmdstat_mset"]; ok {
688                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
689         }
690         if values, ok := config["cmdstat_publish"]; ok {
691                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
692         }
693         if values, ok := config["cmdstat_info"]; ok {
694                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
695         }
696         if values, ok := config["cmdstat_ping"]; ok {
697                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
698         }
699         if values, ok := config["cmdstat_client"]; ok {
700                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
701         }
702         if values, ok := config["cmdstat_command"]; ok {
703                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
704         }
705         if values, ok := config["cmdstat_subscribe"]; ok {
706                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
707         }
708         if values, ok := config["cmdstat_monitor"]; ok {
709                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
710         }
711         if values, ok := config["cmdstat_config"]; ok {
712                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
713         }
714         if values, ok := config["cmdstat_slaveof"]; ok {
715                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
716         }
717 }
718
719 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
720         if values, ok := config["db0"]; ok {
721                 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
722         }
723 }
724
725 func getConfigInfo(input []string) ConfigInfo {
726         config := ConfigInfo{}
727         for _, line := range input {
728                 if i := strings.Index(line, ":"); i != -1 {
729                         if key := strings.TrimSpace(line[:i]); len(key) > 0 {
730                                 if len(line) > i {
731                                         config[key] = strings.TrimSpace(line[i+1:])
732                                 }
733                         }
734                 }
735         }
736         return config
737 }
738
739 func readRedisInfoReplyFields(input []string, info *DbInfo) {
740         config := getConfigInfo(input)
741
742         if value, ok := config["role"]; ok {
743                 if "master" == value {
744                         info.Fields.PrimaryRole = true
745                 }
746         }
747         if value, ok := config["connected_slaves"]; ok {
748                 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
749         }
750         updateServerInfoFields(config, info)
751         updateClientInfoFields(config, info)
752         updateMemoryInfoFields(config, info)
753         updateStatsInfoFields(config, info)
754         updateCpuInfoFields(config, info)
755         updateCommandstatsInfoFields(config, info)
756         updateKeyspaceInfoFields(config, info)
757 }
758
759 func (db *DB) State() (*DbState, error) {
760         dbState := new(DbState)
761         if db.cfg.sentinelPort != "" {
762                 //Establish connection to Redis sentinel. The reason why connection is done
763                 //here instead of time of the SDL instance creation is that for the time being
764                 //sentinel connection is needed only here to get state information and
765                 //state information is needed only by 'sdlcli' hence it is not time critical
766                 //and also we want to avoid opening unnecessary TCP connections towards Redis
767                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
768                 sentinelClient := db.sentinel(&db.cfg, db.addr)
769                 return sentinelClient.GetDbState()
770         } else {
771                 info, err := db.Info()
772                 if err != nil {
773                         dbState.PrimaryDbState.Err = err
774                         return dbState, err
775                 }
776                 return db.fillDbStateFromDbInfo(info)
777         }
778 }
779
780 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
781         var dbState DbState
782         if info.Fields.PrimaryRole == true {
783                 dbState = DbState{
784                         PrimaryDbState: PrimaryDbState{
785                                 Fields: PrimaryDbStateFields{
786                                         Role:  "master",
787                                         Ip:    db.cfg.hostname,
788                                         Port:  db.cfg.port,
789                                         Flags: "master",
790                                 },
791                         },
792                 }
793         }
794
795         cnt, err := strconv.Atoi(db.cfg.nodeCnt)
796         if err != nil {
797                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
798         } else {
799                 dbState.ConfigNodeCnt = cnt
800         }
801
802         return &dbState, dbState.Err
803 }
804
805 func createReplicaDbClient(host string) *DB {
806         cfg := readConfig(osImpl{})
807         cfg.sentinelPort = ""
808         cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
809         return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
810 }
811
812 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
813         dbStatisticsInfo := new(DbStatisticsInfo)
814         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
815
816         info, err := db.Info()
817         if err != nil {
818                 return nil, err
819         }
820         dbStatisticsInfo.Info = info
821
822         return dbStatisticsInfo, nil
823 }
824
825 func sentinelStatistics(db *DB) (*DbStatistics, error) {
826         dbState := new(DbState)
827         dbStatistics := new(DbStatistics)
828         dbStatisticsInfo := new(DbStatisticsInfo)
829         var err error
830
831         dbState, err = db.State()
832         if err != nil {
833                 return nil, err
834         }
835
836         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
837         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
838
839         if dbState.ReplicasDbState != nil {
840                 for _, r := range dbState.ReplicasDbState.States {
841                         replicaDb := createReplicaDbClient(r.GetAddress())
842                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
843                         replicaDb.CloseDB()
844                         if err != nil {
845                                 return nil, err
846                         }
847                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
848                 }
849         }
850
851         return dbStatistics, nil
852 }
853
854 func standaloneStatistics(db *DB) (*DbStatistics, error) {
855         dbStatistics := new(DbStatistics)
856
857         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
858         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
859
860         return dbStatistics, err
861 }
862
863 func (db *DB) Statistics() (*DbStatistics, error) {
864         if db.cfg.sentinelPort != "" {
865                 return sentinelStatistics(db)
866         }
867
868         return standaloneStatistics(db)
869 }
870
871 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
872
873 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
874         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
875         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
876         if err != nil {
877                 return err
878         }
879         if result == int64(1) {
880                 return nil
881         }
882         return errors.New("Lock not held")
883 }
884
885 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
886         sCbMap.m.Lock()
887         defer sCbMap.m.Unlock()
888         sCbMap.cbMap[channel] = cb
889 }
890
891 func (sCbMap *sharedCbMap) Remove(channel string) {
892         sCbMap.m.Lock()
893         defer sCbMap.m.Unlock()
894         delete(sCbMap.cbMap, channel)
895 }
896
897 func (sCbMap *sharedCbMap) Count() int {
898         sCbMap.m.Lock()
899         defer sCbMap.m.Unlock()
900         return len(sCbMap.cbMap)
901 }
902
903 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
904         sCbMap.m.Lock()
905         defer sCbMap.m.Unlock()
906         mapCopy := make(map[string]ChannelNotificationCb, 0)
907         for i, v := range sCbMap.cbMap {
908                 mapCopy[i] = v
909         }
910         return mapCopy
911 }