Handle Subscribe() and Unsubscribe() error
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2022 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "context"
27         "errors"
28         "fmt"
29         "github.com/go-redis/redis/v8"
30         "io"
31         "log"
32         "net"
33         "os"
34         "reflect"
35         "strconv"
36         "strings"
37         "sync"
38         "time"
39 )
40
41 const EventSeparator = "___"
42 const NsSeparator = ","
43
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
46
47 type intChannels struct {
48         addChannel    chan string
49         removeChannel chan string
50         exit          chan bool
51 }
52
53 type sharedCbMap struct {
54         m     sync.Mutex
55         cbMap map[string]ChannelNotificationCb
56 }
57
58 type Config struct {
59         hostname      string
60         ports         []string
61         masterNames   []string
62         sentinelPorts []string
63         clusterAddrs  []string
64         nodeCnt       string
65 }
66
67 type DB struct {
68         ctx          context.Context
69         client       RedisClient
70         sentinel     RedisSentinelCreateCb
71         subscribe    SubscribeFn
72         redisModules bool
73         sCbMap       *sharedCbMap
74         ch           intChannels
75         addr         string
76         port         string
77         sentinelPort string
78         masterName   string
79         nodeCnt      string
80 }
81
82 type Subscriber interface {
83         Channel(opts ...redis.ChannelOption) <-chan *redis.Message
84         Subscribe(ctx context.Context, channels ...string) error
85         Unsubscribe(ctx context.Context, channels ...string) error
86         Close() error
87 }
88
89 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
90
91 type RedisClient interface {
92         Command(ctx context.Context) *redis.CommandsInfoCmd
93         Close() error
94         Subscribe(ctx context.Context, channels ...string) *redis.PubSub
95         MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
96         Do(ctx context.Context, args ...interface{}) *redis.Cmd
97         MGet(ctx context.Context, keys ...string) *redis.SliceCmd
98         Del(ctx context.Context, keys ...string) *redis.IntCmd
99         Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
100         SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
101         SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
102         SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
103         SMembers(ctx context.Context, key string) *redis.StringSliceCmd
104         SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
105         SCard(ctx context.Context, key string) *redis.IntCmd
106         PTTL(ctx context.Context, key string) *redis.DurationCmd
107         Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
108         EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
109         ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
110         ScriptLoad(ctx context.Context, script string) *redis.StringCmd
111         Info(ctx context.Context, section ...string) *redis.StringCmd
112 }
113
114 var dbLogger *logger
115
116 func init() {
117         dbLogger = &logger{
118                 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
119         }
120         redis.SetLogger(dbLogger)
121 }
122
123 func SetDbLogger(out io.Writer) {
124         dbLogger.log.SetOutput(out)
125 }
126
127 func checkResultAndError(result interface{}, err error) (bool, error) {
128         if err != nil {
129                 if err == redis.Nil {
130                         return false, nil
131                 }
132                 return false, err
133         }
134         if result == "OK" {
135                 return true, nil
136         }
137         return false, nil
138 }
139
140 func checkIntResultAndError(result interface{}, err error) (bool, error) {
141         if err != nil {
142                 return false, err
143         }
144         if n, ok := result.(int64); ok {
145                 if n == 1 {
146                         return true, nil
147                 }
148         } else if n, ok := result.(int); ok {
149                 if n == 1 {
150                         return true, nil
151                 }
152         }
153         return false, nil
154 }
155
156 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
157         return client.Subscribe(ctx, channels...)
158 }
159
160 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
161         addr, port, sentinelPort, masterName, nodeCnt string) *DB {
162         db := DB{
163                 ctx:          context.Background(),
164                 client:       client,
165                 sentinel:     sentinelCreateCb,
166                 subscribe:    subscribe,
167                 redisModules: true,
168                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
169                 ch: intChannels{
170                         addChannel:    make(chan string),
171                         removeChannel: make(chan string),
172                         exit:          make(chan bool),
173                 },
174                 addr:         addr,
175                 sentinelPort: sentinelPort,
176                 port:         port,
177                 masterName:   masterName,
178                 nodeCnt:      nodeCnt,
179         }
180
181         return &db
182 }
183
184 func Create() []*DB {
185         osimpl := osImpl{}
186         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
187 }
188
189 func readConfig(osI OS) Config {
190         cfg := Config{
191                 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
192                 ports:    strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
193                 nodeCnt:  osI.Getenv("DBAAS_NODE_COUNT", "1"),
194         }
195
196         if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
197                 cfg.clusterAddrs = strings.Split(addrStr, ",")
198         } else if cfg.hostname != "" {
199                 cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
200         }
201         if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
202                 cfg.sentinelPorts = strings.Split(sntPortStr, ",")
203         }
204         if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
205                 cfg.masterNames = strings.Split(nameStr, ",")
206         }
207         completeConfig(&cfg)
208         return cfg
209 }
210
211 type OS interface {
212         Getenv(key string, defValue string) string
213 }
214
215 type osImpl struct{}
216
217 func (osImpl) Getenv(key string, defValue string) string {
218         val := os.Getenv(key)
219         if val == "" {
220                 val = defValue
221         }
222         return val
223 }
224
225 func completeConfig(cfg *Config) {
226         if len(cfg.sentinelPorts) == 0 {
227                 if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
228                         for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
229                                 cfg.ports = append(cfg.ports, cfg.ports[i-1])
230                         }
231                 }
232         } else {
233                 if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
234                         for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
235                                 cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
236                         }
237                 }
238                 if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
239                         for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
240                                 cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
241                         }
242                 }
243         }
244 }
245
246 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
247         subscribe SubscribeFn,
248         sentinelCreateCb RedisSentinelCreateCb) []*DB {
249         dbs := []*DB{}
250         cfg := readConfig(osI)
251         for i, addr := range cfg.clusterAddrs {
252                 port := getListItem(cfg.ports, i)
253                 sntPort := getListItem(cfg.sentinelPorts, i)
254                 name := getListItem(cfg.masterNames, i)
255                 db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
256                         clientCreator, subscribe, sentinelCreateCb)
257                 dbs = append(dbs, db)
258         }
259         return dbs
260 }
261
262 func getListItem(list []string, index int) string {
263         if index < len(list) {
264                 return list[index]
265         }
266         return ""
267 }
268
269 func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
270         subscribe SubscribeFn,
271         sentinelCreateCb RedisSentinelCreateCb) *DB {
272         var client RedisClient
273         var db *DB
274         if sentinelPort == "" {
275                 client = clientCreator(addr, port, "", false)
276                 db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
277         } else {
278                 client = clientCreator(addr, sentinelPort, masterName, true)
279                 db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
280         }
281         db.CheckCommands()
282         return db
283 }
284
285 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
286         if isHa == true {
287                 sentinelAddress := addr + ":" + port
288                 return redis.NewFailoverClient(
289                         &redis.FailoverOptions{
290                                 MasterName:    clusterName,
291                                 SentinelAddrs: []string{sentinelAddress},
292                                 PoolSize:      20,
293                                 MaxRetries:    2,
294                         },
295                 )
296         }
297         redisAddress := addr + ":" + port
298         return redis.NewClient(&redis.Options{
299                 Addr:       redisAddress,
300                 Password:   "", // no password set
301                 DB:         0,  // use default DB
302                 PoolSize:   20,
303                 MaxRetries: 2,
304         })
305 }
306
307 func (db *DB) CheckCommands() {
308         commands, err := db.client.Command(db.ctx).Result()
309         if err == nil {
310                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
311                         "msetmpub", "delmpub"}
312                 for _, v := range redisModuleCommands {
313                         _, ok := commands[v]
314                         if !ok {
315                                 db.redisModules = false
316                         }
317                 }
318         } else {
319                 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
320         }
321 }
322
323 func (db *DB) CloseDB() error {
324         return db.client.Close()
325 }
326
327 func (db *DB) UnsubscribeChannelDB(channels ...string) error {
328         for _, v := range channels {
329                 db.sCbMap.Remove(v)
330                 db.ch.removeChannel <- v
331                 errStr := <-db.ch.removeChannel
332                 if errStr != "" {
333                         return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
334                 }
335                 if db.sCbMap.Count() == 0 {
336                         db.ch.exit <- true
337                 }
338         }
339         return nil
340 }
341
342 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
343         if db.sCbMap.Count() == 0 {
344                 go func(sCbMap *sharedCbMap, ch intChannels) {
345                         sub := db.subscribe(db.ctx, db.client, "")
346                         rxChannel := sub.Channel()
347                         lCbMap := sCbMap.GetMapCopy()
348                         for {
349                                 select {
350                                 case msg := <-rxChannel:
351                                         cb, ok := lCbMap[msg.Channel]
352                                         if ok {
353                                                 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
354                                                 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
355                                         }
356                                 case channel := <-ch.addChannel:
357                                         lCbMap = sCbMap.GetMapCopy()
358                                         if err := sub.Subscribe(db.ctx, channel); err != nil {
359                                                 ch.addChannel <- err.Error()
360                                         } else {
361                                                 ch.addChannel <- ""
362                                         }
363                                 case channel := <-ch.removeChannel:
364                                         lCbMap = sCbMap.GetMapCopy()
365                                         if err := sub.Unsubscribe(db.ctx, channel); err != nil {
366                                                 ch.removeChannel <- err.Error()
367                                         } else {
368                                                 ch.removeChannel <- ""
369                                         }
370                                 case exit := <-ch.exit:
371                                         if exit {
372                                                 if err := sub.Close(); err != nil {
373                                                         dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
374                                                 }
375                                                 return
376                                         }
377                                 }
378                         }
379                 }(db.sCbMap, db.ch)
380         }
381         for _, v := range channels {
382                 db.sCbMap.Add(v, cb)
383                 db.ch.addChannel <- v
384                 errStr := <-db.ch.addChannel
385                 if errStr != "" {
386                         return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
387                 }
388         }
389         return nil
390 }
391
392 func (db *DB) MSet(pairs ...interface{}) error {
393         return db.client.MSet(db.ctx, pairs...).Err()
394 }
395
396 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
397         if !db.redisModules {
398                 return errors.New("Redis deployment doesn't support MSETMPUB command")
399         }
400         command := make([]interface{}, 0)
401         command = append(command, "MSETMPUB")
402         command = append(command, len(pairs)/2)
403         command = append(command, len(channelsAndEvents)/2)
404         for _, d := range pairs {
405                 command = append(command, d)
406         }
407         for _, d := range channelsAndEvents {
408                 command = append(command, d)
409         }
410         _, err := db.client.Do(db.ctx, command...).Result()
411         return err
412 }
413
414 func (db *DB) MGet(keys []string) ([]interface{}, error) {
415         return db.client.MGet(db.ctx, keys...).Result()
416 }
417
418 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
419         if !db.redisModules {
420                 return errors.New("Redis deployment not supporting command DELMPUB")
421         }
422         command := make([]interface{}, 0)
423         command = append(command, "DELMPUB")
424         command = append(command, len(keys))
425         command = append(command, len(channelsAndEvents)/2)
426         for _, d := range keys {
427                 command = append(command, d)
428         }
429         for _, d := range channelsAndEvents {
430                 command = append(command, d)
431         }
432         _, err := db.client.Do(db.ctx, command...).Result()
433         return err
434
435 }
436
437 func (db *DB) Del(keys []string) error {
438         _, err := db.client.Del(db.ctx, keys...).Result()
439         return err
440 }
441
442 func (db *DB) Keys(pattern string) ([]string, error) {
443         return db.client.Keys(db.ctx, pattern).Result()
444 }
445
446 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
447         if !db.redisModules {
448                 return false, errors.New("Redis deployment not supporting command")
449         }
450
451         return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
452 }
453
454 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
455         if !db.redisModules {
456                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
457         }
458         capacity := 4 + len(channelsAndEvents)
459         command := make([]interface{}, 0, capacity)
460         command = append(command, "SETIEMPUB")
461         command = append(command, key)
462         command = append(command, newData)
463         command = append(command, oldData)
464         for _, ce := range channelsAndEvents {
465                 command = append(command, ce)
466         }
467         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
468 }
469
470 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
471         if !db.redisModules {
472                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
473         }
474         capacity := 3 + len(channelsAndEvents)
475         command := make([]interface{}, 0, capacity)
476         command = append(command, "SETNXMPUB")
477         command = append(command, key)
478         command = append(command, data)
479         for _, ce := range channelsAndEvents {
480                 command = append(command, ce)
481         }
482         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
483 }
484 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
485         return db.client.SetNX(db.ctx, key, data, expiration).Result()
486 }
487
488 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
489         if !db.redisModules {
490                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
491         }
492         capacity := 3 + len(channelsAndEvents)
493         command := make([]interface{}, 0, capacity)
494         command = append(command, "DELIEMPUB")
495         command = append(command, key)
496         command = append(command, data)
497         for _, ce := range channelsAndEvents {
498                 command = append(command, ce)
499         }
500         return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
501 }
502
503 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
504         if !db.redisModules {
505                 return false, errors.New("Redis deployment not supporting command")
506         }
507         return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
508 }
509
510 func (db *DB) SAdd(key string, data ...interface{}) error {
511         _, err := db.client.SAdd(db.ctx, key, data...).Result()
512         return err
513 }
514
515 func (db *DB) SRem(key string, data ...interface{}) error {
516         _, err := db.client.SRem(db.ctx, key, data...).Result()
517         return err
518 }
519
520 func (db *DB) SMembers(key string) ([]string, error) {
521         result, err := db.client.SMembers(db.ctx, key).Result()
522         return result, err
523 }
524
525 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
526         result, err := db.client.SIsMember(db.ctx, key, data).Result()
527         return result, err
528 }
529
530 func (db *DB) SCard(key string) (int64, error) {
531         result, err := db.client.SCard(db.ctx, key).Result()
532         return result, err
533 }
534
535 func (db *DB) PTTL(key string) (time.Duration, error) {
536         result, err := db.client.PTTL(db.ctx, key).Result()
537         return result, err
538 }
539
540 func (db *DB) Info() (*DbInfo, error) {
541         var info DbInfo
542         resultStr, err := db.client.Info(db.ctx, "all").Result()
543         if err != nil {
544                 return &info, err
545         }
546
547         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
548         readRedisInfoReplyFields(result, &info)
549         return &info, nil
550 }
551
552 func lineContains(line, substr string) bool {
553         return strings.Contains(line, substr)
554 }
555
556 func getFieldValueStr(line, substr string) string {
557         if idx := strings.Index(line, substr); idx != -1 {
558                 return line[idx+len(substr):]
559         }
560         return ""
561 }
562
563 func getUint32FromString(s string) uint32 {
564         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
565                 return uint32(val)
566         }
567         return 0
568 }
569
570 func getUint64FromString(s string) uint64 {
571         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
572                 return uint64(val)
573         }
574         return 0
575 }
576
577 func getFloatFromString(s string, bitSize int) float64 {
578         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
579                 return val
580         }
581         return 0
582 }
583
584 func getFloat64FromString(s string) float64 {
585         return getFloatFromString(s, 64)
586 }
587
588 func getFloat32FromString(s string) float32 {
589         return float32(getFloatFromString(s, 32))
590 }
591
592 func getValueString(values string, key string) string {
593         slice := strings.Split(values, ",")
594         for _, s := range slice {
595                 if lineContains(s, key) {
596                         return getFieldValueStr(s, key)
597                 }
598         }
599         return ""
600 }
601
602 func getCommandstatsValues(values string) (string, string, string) {
603         calls := getValueString(values, "calls=")
604         usec := getValueString(values, "usec=")
605         usecPerCall := getValueString(values, "usec_per_call=")
606         return calls, usec, usecPerCall
607 }
608
609 func updateCommandstatsValues(i interface{}, values string) {
610         stype := reflect.ValueOf(i).Elem()
611         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
612
613         callsField := stype.FieldByName("Calls")
614         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
615
616         usecField := stype.FieldByName("Usec")
617         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
618
619         usecPerCallField := stype.FieldByName("UsecPerCall")
620         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
621 }
622
623 func getKeyspaceValues(values string) (string, string, string) {
624         keys := getValueString(values, "keys=")
625         expires := getValueString(values, "expires=")
626         avgttl := getValueString(values, "avg_ttl=")
627         return keys, expires, avgttl
628 }
629
630 func updateKeyspaceValues(i interface{}, values string) {
631         stype := reflect.ValueOf(i).Elem()
632         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
633
634         keysField := stype.FieldByName("Keys")
635         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
636
637         expiresField := stype.FieldByName("Expires")
638         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
639
640         avgttlField := stype.FieldByName("AvgTtl")
641         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
642 }
643
644 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
645         if value, ok := config["uptime_in_days"]; ok {
646                 info.Fields.Server.UptimeInDays = getUint32FromString(value)
647         }
648 }
649
650 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
651         if value, ok := config["connected_clients"]; ok {
652                 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
653         }
654         if value, ok := config["client_recent_max_input_buffer"]; ok {
655                 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
656         }
657         if value, ok := config["client_recent_max_output_buffer"]; ok {
658                 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
659         }
660 }
661
662 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
663         if value, ok := config["used_memory"]; ok {
664                 info.Fields.Memory.UsedMemory = getUint64FromString(value)
665         }
666         if value, ok := config["used_memory_human"]; ok {
667                 info.Fields.Memory.UsedMemoryHuman = value
668         }
669         if value, ok := config["used_memory_rss"]; ok {
670                 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
671         }
672         if value, ok := config["used_memory_rss_human"]; ok {
673                 info.Fields.Memory.UsedMemoryRssHuman = value
674         }
675         if value, ok := config["used_memory_peak"]; ok {
676                 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
677         }
678         if value, ok := config["used_memory_peak_human"]; ok {
679                 info.Fields.Memory.UsedMemoryPeakHuman = value
680         }
681         if value, ok := config["used_memory_peak_perc"]; ok {
682                 info.Fields.Memory.UsedMemoryPeakPerc = value
683         }
684         if value, ok := config["mem_fragmentation_ratio"]; ok {
685                 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
686         }
687         if value, ok := config["mem_fragmentation_bytes"]; ok {
688                 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
689         }
690 }
691
692 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
693         if value, ok := config["total_connections_received"]; ok {
694                 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
695         }
696         if value, ok := config["total_commands_processed"]; ok {
697                 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
698         }
699         if value, ok := config["sync_full"]; ok {
700                 info.Fields.Stats.SyncFull = getUint32FromString(value)
701         }
702         if value, ok := config["sync_partial_ok"]; ok {
703                 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
704         }
705         if value, ok := config["sync_partial_err"]; ok {
706                 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
707         }
708         if value, ok := config["pubsub_channels"]; ok {
709                 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
710         }
711 }
712
713 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
714         if value, ok := config["used_cpu_sys"]; ok {
715                 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
716         }
717         if value, ok := config["used_cpu_user"]; ok {
718                 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
719         }
720 }
721
722 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
723         if values, ok := config["cmdstat_replconf"]; ok {
724                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
725         }
726         if values, ok := config["cmdstat_keys"]; ok {
727                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
728         }
729         if values, ok := config["cmdstat_role"]; ok {
730                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
731         }
732         if values, ok := config["cmdstat_psync"]; ok {
733                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
734         }
735         if values, ok := config["cmdstat_mset"]; ok {
736                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
737         }
738         if values, ok := config["cmdstat_publish"]; ok {
739                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
740         }
741         if values, ok := config["cmdstat_info"]; ok {
742                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
743         }
744         if values, ok := config["cmdstat_ping"]; ok {
745                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
746         }
747         if values, ok := config["cmdstat_client"]; ok {
748                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
749         }
750         if values, ok := config["cmdstat_command"]; ok {
751                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
752         }
753         if values, ok := config["cmdstat_subscribe"]; ok {
754                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
755         }
756         if values, ok := config["cmdstat_monitor"]; ok {
757                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
758         }
759         if values, ok := config["cmdstat_config"]; ok {
760                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
761         }
762         if values, ok := config["cmdstat_slaveof"]; ok {
763                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
764         }
765 }
766
767 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
768         if values, ok := config["db0"]; ok {
769                 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
770         }
771 }
772
773 func getConfigInfo(input []string) ConfigInfo {
774         config := ConfigInfo{}
775         for _, line := range input {
776                 if i := strings.Index(line, ":"); i != -1 {
777                         if key := strings.TrimSpace(line[:i]); len(key) > 0 {
778                                 if len(line) > i {
779                                         config[key] = strings.TrimSpace(line[i+1:])
780                                 }
781                         }
782                 }
783         }
784         return config
785 }
786
787 func readRedisInfoReplyFields(input []string, info *DbInfo) {
788         config := getConfigInfo(input)
789
790         if value, ok := config["role"]; ok {
791                 if "master" == value {
792                         info.Fields.PrimaryRole = true
793                 }
794         }
795         if value, ok := config["connected_slaves"]; ok {
796                 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
797         }
798         updateServerInfoFields(config, info)
799         updateClientInfoFields(config, info)
800         updateMemoryInfoFields(config, info)
801         updateStatsInfoFields(config, info)
802         updateCpuInfoFields(config, info)
803         updateCommandstatsInfoFields(config, info)
804         updateKeyspaceInfoFields(config, info)
805 }
806
807 func (db *DB) State() (*DbState, error) {
808         dbState := new(DbState)
809         if db.sentinelPort != "" {
810                 //Establish connection to Redis sentinel. The reason why connection is done
811                 //here instead of time of the SDL instance creation is that for the time being
812                 //sentinel connection is needed only here to get state information and
813                 //state information is needed only by 'sdlcli' hence it is not time critical
814                 //and also we want to avoid opening unnecessary TCP connections towards Redis
815                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
816                 sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
817                 return sentinelClient.GetDbState()
818         } else {
819                 info, err := db.Info()
820                 if err != nil {
821                         dbState.PrimaryDbState.Err = err
822                         return dbState, err
823                 }
824                 return db.fillDbStateFromDbInfo(info)
825         }
826 }
827
828 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
829         var dbState DbState
830         if info.Fields.PrimaryRole == true {
831                 dbState = DbState{
832                         PrimaryDbState: PrimaryDbState{
833                                 Fields: PrimaryDbStateFields{
834                                         Role:  "master",
835                                         Ip:    db.addr,
836                                         Port:  db.port,
837                                         Flags: "master",
838                                 },
839                         },
840                 }
841         }
842
843         cnt, err := strconv.Atoi(db.nodeCnt)
844         if err != nil {
845                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
846         } else {
847                 dbState.ConfigNodeCnt = cnt
848         }
849
850         return &dbState, dbState.Err
851 }
852
853 func createReplicaDbClient(host string) *DB {
854         addr, port, _ := net.SplitHostPort(host)
855         return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil)
856 }
857
858 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
859         dbStatisticsInfo := new(DbStatisticsInfo)
860         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
861
862         info, err := db.Info()
863         if err != nil {
864                 return nil, err
865         }
866         dbStatisticsInfo.Info = info
867
868         return dbStatisticsInfo, nil
869 }
870
871 func sentinelStatistics(db *DB) (*DbStatistics, error) {
872         dbState := new(DbState)
873         dbStatistics := new(DbStatistics)
874         dbStatisticsInfo := new(DbStatisticsInfo)
875         var err error
876
877         dbState, err = db.State()
878         if err != nil {
879                 return nil, err
880         }
881
882         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
883         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
884
885         if dbState.ReplicasDbState != nil {
886                 for _, r := range dbState.ReplicasDbState.States {
887                         replicaDb := createReplicaDbClient(r.GetAddress())
888                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
889                         replicaDb.CloseDB()
890                         if err != nil {
891                                 return nil, err
892                         }
893                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
894                 }
895         }
896
897         return dbStatistics, nil
898 }
899
900 func standaloneStatistics(db *DB) (*DbStatistics, error) {
901         dbStatistics := new(DbStatistics)
902
903         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
904         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
905
906         return dbStatistics, err
907 }
908
909 func (db *DB) Statistics() (*DbStatistics, error) {
910         if db.sentinelPort != "" {
911                 return sentinelStatistics(db)
912         }
913
914         return standaloneStatistics(db)
915 }
916
917 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
918
919 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
920         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
921         result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
922         if err != nil {
923                 return err
924         }
925         if result == int64(1) {
926                 return nil
927         }
928         return errors.New("Lock not held")
929 }
930
931 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
932         sCbMap.m.Lock()
933         defer sCbMap.m.Unlock()
934         sCbMap.cbMap[channel] = cb
935 }
936
937 func (sCbMap *sharedCbMap) Remove(channel string) {
938         sCbMap.m.Lock()
939         defer sCbMap.m.Unlock()
940         delete(sCbMap.cbMap, channel)
941 }
942
943 func (sCbMap *sharedCbMap) Count() int {
944         sCbMap.m.Lock()
945         defer sCbMap.m.Unlock()
946         return len(sCbMap.cbMap)
947 }
948
949 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
950         sCbMap.m.Lock()
951         defer sCbMap.m.Unlock()
952         mapCopy := make(map[string]ChannelNotificationCb, 0)
953         for i, v := range sCbMap.cbMap {
954                 mapCopy[i] = v
955         }
956         return mapCopy
957 }