20ed0bf18e94853fcc60b6466bcd29646a8b91bc
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2022 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "context"
27         "errors"
28         "fmt"
29         "github.com/go-redis/redis/v8"
30         "io"
31         "log"
32         "net"
33         "os"
34         "reflect"
35         "strconv"
36         "strings"
37         "sync"
38         "time"
39 )
40
41 const EventSeparator = "___"
42 const NsSeparator = ","
43
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
46
47 type intChannels struct {
48         addChannel    chan string
49         removeChannel chan string
50         exit          chan bool
51 }
52
53 type sharedCbMap struct {
54         m     sync.Mutex
55         cbMap map[string]ChannelNotificationCb
56 }
57
58 type Config struct {
59         hostname      string
60         ports         []string
61         masterNames   []string
62         sentinelPorts []string
63         clusterAddrs  []string
64         nodeCnt       string
65 }
66
67 type DB struct {
68         ctx          context.Context
69         client       RedisClient
70         sentinel     RedisSentinelCreateCb
71         subscribe    SubscribeFn
72         redisModules bool
73         sCbMap       *sharedCbMap
74         ch           intChannels
75         addr         string
76         port         string
77         sentinelPort string
78         masterName   string
79         nodeCnt      string
80 }
81
82 type Subscriber interface {
83         Channel(opts ...redis.ChannelOption) <-chan *redis.Message
84         Subscribe(ctx context.Context, channels ...string) error
85         Unsubscribe(ctx context.Context, channels ...string) error
86         Close() error
87 }
88
89 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
90
91 type RedisClient interface {
92         Command(ctx context.Context) *redis.CommandsInfoCmd
93         Close() error
94         Subscribe(ctx context.Context, channels ...string) *redis.PubSub
95         MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
96         Do(ctx context.Context, args ...interface{}) *redis.Cmd
97         MGet(ctx context.Context, keys ...string) *redis.SliceCmd
98         Del(ctx context.Context, keys ...string) *redis.IntCmd
99         Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
100         SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
101         SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
102         SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
103         SMembers(ctx context.Context, key string) *redis.StringSliceCmd
104         SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
105         SCard(ctx context.Context, key string) *redis.IntCmd
106         PTTL(ctx context.Context, key string) *redis.DurationCmd
107         Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
108         EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
109         ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
110         ScriptLoad(ctx context.Context, script string) *redis.StringCmd
111         Info(ctx context.Context, section ...string) *redis.StringCmd
112 }
113
114 var dbLogger *logger
115
116 func init() {
117         dbLogger = &logger{
118                 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
119         }
120         redis.SetLogger(dbLogger)
121 }
122
123 func SetDbLogger(out io.Writer) {
124         dbLogger.log.SetOutput(out)
125 }
126
127 func checkResultAndError(result interface{}, err error) (bool, error) {
128         if err != nil {
129                 if err == redis.Nil {
130                         return false, nil
131                 }
132                 return false, err
133         }
134         if result == "OK" {
135                 return true, nil
136         }
137         return false, nil
138 }
139
140 func checkIntResultAndError(result interface{}, err error) (bool, error) {
141         if err != nil {
142                 return false, err
143         }
144         if n, ok := result.(int64); ok {
145                 if n == 1 {
146                         return true, nil
147                 }
148         } else if n, ok := result.(int); ok {
149                 if n == 1 {
150                         return true, nil
151                 }
152         }
153         return false, nil
154 }
155
156 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
157         return client.Subscribe(ctx, channels...)
158 }
159
160 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb,
161         addr, port, sentinelPort, masterName, nodeCnt string) *DB {
162         db := DB{
163                 ctx:          context.Background(),
164                 client:       client,
165                 sentinel:     sentinelCreateCb,
166                 subscribe:    subscribe,
167                 redisModules: true,
168                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
169                 ch: intChannels{
170                         addChannel:    make(chan string),
171                         removeChannel: make(chan string),
172                         exit:          make(chan bool),
173                 },
174                 addr:         addr,
175                 sentinelPort: sentinelPort,
176                 port:         port,
177                 masterName:   masterName,
178                 nodeCnt:      nodeCnt,
179         }
180
181         return &db
182 }
183
184 func Create() []*DB {
185         osimpl := osImpl{}
186         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
187 }
188
189 func readConfig(osI OS) Config {
190         cfg := Config{
191                 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
192                 ports:    strings.Split(osI.Getenv("DBAAS_SERVICE_PORT", "6379"), ","),
193                 nodeCnt:  osI.Getenv("DBAAS_NODE_COUNT", "1"),
194         }
195
196         if addrStr := osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""); addrStr != "" {
197                 cfg.clusterAddrs = strings.Split(addrStr, ",")
198         } else if cfg.hostname != "" {
199                 cfg.clusterAddrs = append(cfg.clusterAddrs, cfg.hostname)
200         }
201         if sntPortStr := osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""); sntPortStr != "" {
202                 cfg.sentinelPorts = strings.Split(sntPortStr, ",")
203         }
204         if nameStr := osI.Getenv("DBAAS_MASTER_NAME", ""); nameStr != "" {
205                 cfg.masterNames = strings.Split(nameStr, ",")
206         }
207         completeConfig(&cfg)
208         return cfg
209 }
210
211 type OS interface {
212         Getenv(key string, defValue string) string
213 }
214
215 type osImpl struct{}
216
217 func (osImpl) Getenv(key string, defValue string) string {
218         val := os.Getenv(key)
219         if val == "" {
220                 val = defValue
221         }
222         return val
223 }
224
225 func completeConfig(cfg *Config) {
226         if len(cfg.sentinelPorts) == 0 {
227                 if len(cfg.clusterAddrs) > len(cfg.ports) && len(cfg.ports) > 0 {
228                         for i := len(cfg.ports); i < len(cfg.clusterAddrs); i++ {
229                                 cfg.ports = append(cfg.ports, cfg.ports[i-1])
230                         }
231                 }
232         } else {
233                 if len(cfg.clusterAddrs) > len(cfg.sentinelPorts) {
234                         for i := len(cfg.sentinelPorts); i < len(cfg.clusterAddrs); i++ {
235                                 cfg.sentinelPorts = append(cfg.sentinelPorts, cfg.sentinelPorts[i-1])
236                         }
237                 }
238                 if len(cfg.clusterAddrs) > len(cfg.masterNames) && len(cfg.masterNames) > 0 {
239                         for i := len(cfg.masterNames); i < len(cfg.clusterAddrs); i++ {
240                                 cfg.masterNames = append(cfg.masterNames, cfg.masterNames[i-1])
241                         }
242                 }
243         }
244 }
245
246 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
247         subscribe SubscribeFn,
248         sentinelCreateCb RedisSentinelCreateCb) []*DB {
249         dbs := []*DB{}
250         cfg := readConfig(osI)
251         for i, addr := range cfg.clusterAddrs {
252                 port := getListItem(cfg.ports, i)
253                 sntPort := getListItem(cfg.sentinelPorts, i)
254                 name := getListItem(cfg.masterNames, i)
255                 db := createDbClient(addr, port, sntPort, name, cfg.nodeCnt,
256                         clientCreator, subscribe, sentinelCreateCb)
257                 dbs = append(dbs, db)
258         }
259         return dbs
260 }
261
262 func getListItem(list []string, index int) string {
263         if index < len(list) {
264                 return list[index]
265         }
266         return ""
267 }
268
269 func createDbClient(addr, port, sentinelPort, masterName, nodeCnt string, clientCreator RedisClientCreator,
270         subscribe SubscribeFn,
271         sentinelCreateCb RedisSentinelCreateCb) *DB {
272         var client RedisClient
273         var db *DB
274         if sentinelPort == "" {
275                 client = clientCreator(addr, port, "", false)
276                 db = CreateDB(client, subscribe, nil, addr, port, sentinelPort, masterName, nodeCnt)
277         } else {
278                 client = clientCreator(addr, sentinelPort, masterName, true)
279                 db = CreateDB(client, subscribe, sentinelCreateCb, addr, port, sentinelPort, masterName, nodeCnt)
280         }
281         db.CheckCommands()
282         return db
283 }
284
285 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
286         if isHa == true {
287                 sentinelAddress := addr + ":" + port
288                 return redis.NewFailoverClient(
289                         &redis.FailoverOptions{
290                                 MasterName:    clusterName,
291                                 SentinelAddrs: []string{sentinelAddress},
292                                 PoolSize:      20,
293                                 MaxRetries:    2,
294                         },
295                 )
296         }
297         redisAddress := addr + ":" + port
298         return redis.NewClient(&redis.Options{
299                 Addr:       redisAddress,
300                 Password:   "", // no password set
301                 DB:         0,  // use default DB
302                 PoolSize:   20,
303                 MaxRetries: 2,
304         })
305 }
306
307 func (db *DB) CheckCommands() {
308         commands, err := db.client.Command(db.ctx).Result()
309         if err == nil {
310                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
311                         "msetmpub", "delmpub"}
312                 for _, v := range redisModuleCommands {
313                         _, ok := commands[v]
314                         if !ok {
315                                 db.redisModules = false
316                         }
317                 }
318         } else {
319                 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
320         }
321 }
322
323 func (db *DB) CloseDB() error {
324         return db.client.Close()
325 }
326
327 func (db *DB) UnsubscribeChannelDB(channels ...string) {
328         for _, v := range channels {
329                 db.sCbMap.Remove(v)
330                 db.ch.removeChannel <- v
331                 if db.sCbMap.Count() == 0 {
332                         db.ch.exit <- true
333                 }
334         }
335 }
336
337 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
338         if db.sCbMap.Count() == 0 {
339                 for _, v := range channels {
340                         db.sCbMap.Add(v, cb)
341                 }
342
343                 go func(sCbMap *sharedCbMap,
344                         ch intChannels,
345                         channels ...string) {
346                         sub := db.subscribe(db.ctx, db.client, channels...)
347                         rxChannel := sub.Channel()
348                         lCbMap := sCbMap.GetMapCopy()
349                         for {
350                                 select {
351                                 case msg := <-rxChannel:
352                                         cb, ok := lCbMap[msg.Channel]
353                                         if ok {
354                                                 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
355                                                 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
356                                         }
357                                 case channel := <-ch.addChannel:
358                                         lCbMap = sCbMap.GetMapCopy()
359                                         sub.Subscribe(db.ctx, channel)
360                                 case channel := <-ch.removeChannel:
361                                         lCbMap = sCbMap.GetMapCopy()
362                                         sub.Unsubscribe(db.ctx, channel)
363                                 case exit := <-ch.exit:
364                                         if exit {
365                                                 if err := sub.Close(); err != nil {
366                                                         dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
367                                                 }
368                                                 return
369                                         }
370                                 }
371                         }
372                 }(db.sCbMap, db.ch, channels...)
373
374         } else {
375                 for _, v := range channels {
376                         db.sCbMap.Add(v, cb)
377                         db.ch.addChannel <- v
378                 }
379         }
380 }
381
382 func (db *DB) MSet(pairs ...interface{}) error {
383         return db.client.MSet(db.ctx, pairs...).Err()
384 }
385
386 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
387         if !db.redisModules {
388                 return errors.New("Redis deployment doesn't support MSETMPUB command")
389         }
390         command := make([]interface{}, 0)
391         command = append(command, "MSETMPUB")
392         command = append(command, len(pairs)/2)
393         command = append(command, len(channelsAndEvents)/2)
394         for _, d := range pairs {
395                 command = append(command, d)
396         }
397         for _, d := range channelsAndEvents {
398                 command = append(command, d)
399         }
400         _, err := db.client.Do(db.ctx, command...).Result()
401         return err
402 }
403
404 func (db *DB) MGet(keys []string) ([]interface{}, error) {
405         return db.client.MGet(db.ctx, keys...).Result()
406 }
407
408 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
409         if !db.redisModules {
410                 return errors.New("Redis deployment not supporting command DELMPUB")
411         }
412         command := make([]interface{}, 0)
413         command = append(command, "DELMPUB")
414         command = append(command, len(keys))
415         command = append(command, len(channelsAndEvents)/2)
416         for _, d := range keys {
417                 command = append(command, d)
418         }
419         for _, d := range channelsAndEvents {
420                 command = append(command, d)
421         }
422         _, err := db.client.Do(db.ctx, command...).Result()
423         return err
424
425 }
426
427 func (db *DB) Del(keys []string) error {
428         _, err := db.client.Del(db.ctx, keys...).Result()
429         return err
430 }
431
432 func (db *DB) Keys(pattern string) ([]string, error) {
433         return db.client.Keys(db.ctx, pattern).Result()
434 }
435
436 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
437         if !db.redisModules {
438                 return false, errors.New("Redis deployment not supporting command")
439         }
440
441         return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
442 }
443
444 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
445         if !db.redisModules {
446                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
447         }
448         capacity := 4 + len(channelsAndEvents)
449         command := make([]interface{}, 0, capacity)
450         command = append(command, "SETIEMPUB")
451         command = append(command, key)
452         command = append(command, newData)
453         command = append(command, oldData)
454         for _, ce := range channelsAndEvents {
455                 command = append(command, ce)
456         }
457         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
458 }
459
460 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
461         if !db.redisModules {
462                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
463         }
464         capacity := 3 + len(channelsAndEvents)
465         command := make([]interface{}, 0, capacity)
466         command = append(command, "SETNXMPUB")
467         command = append(command, key)
468         command = append(command, data)
469         for _, ce := range channelsAndEvents {
470                 command = append(command, ce)
471         }
472         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
473 }
474 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
475         return db.client.SetNX(db.ctx, key, data, expiration).Result()
476 }
477
478 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
479         if !db.redisModules {
480                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
481         }
482         capacity := 3 + len(channelsAndEvents)
483         command := make([]interface{}, 0, capacity)
484         command = append(command, "DELIEMPUB")
485         command = append(command, key)
486         command = append(command, data)
487         for _, ce := range channelsAndEvents {
488                 command = append(command, ce)
489         }
490         return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
491 }
492
493 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
494         if !db.redisModules {
495                 return false, errors.New("Redis deployment not supporting command")
496         }
497         return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
498 }
499
500 func (db *DB) SAdd(key string, data ...interface{}) error {
501         _, err := db.client.SAdd(db.ctx, key, data...).Result()
502         return err
503 }
504
505 func (db *DB) SRem(key string, data ...interface{}) error {
506         _, err := db.client.SRem(db.ctx, key, data...).Result()
507         return err
508 }
509
510 func (db *DB) SMembers(key string) ([]string, error) {
511         result, err := db.client.SMembers(db.ctx, key).Result()
512         return result, err
513 }
514
515 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
516         result, err := db.client.SIsMember(db.ctx, key, data).Result()
517         return result, err
518 }
519
520 func (db *DB) SCard(key string) (int64, error) {
521         result, err := db.client.SCard(db.ctx, key).Result()
522         return result, err
523 }
524
525 func (db *DB) PTTL(key string) (time.Duration, error) {
526         result, err := db.client.PTTL(db.ctx, key).Result()
527         return result, err
528 }
529
530 func (db *DB) Info() (*DbInfo, error) {
531         var info DbInfo
532         resultStr, err := db.client.Info(db.ctx, "all").Result()
533         if err != nil {
534                 return &info, err
535         }
536
537         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
538         readRedisInfoReplyFields(result, &info)
539         return &info, nil
540 }
541
542 func lineContains(line, substr string) bool {
543         return strings.Contains(line, substr)
544 }
545
546 func getFieldValueStr(line, substr string) string {
547         if idx := strings.Index(line, substr); idx != -1 {
548                 return line[idx+len(substr):]
549         }
550         return ""
551 }
552
553 func getUint32FromString(s string) uint32 {
554         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
555                 return uint32(val)
556         }
557         return 0
558 }
559
560 func getUint64FromString(s string) uint64 {
561         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
562                 return uint64(val)
563         }
564         return 0
565 }
566
567 func getFloatFromString(s string, bitSize int) float64 {
568         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
569                 return val
570         }
571         return 0
572 }
573
574 func getFloat64FromString(s string) float64 {
575         return getFloatFromString(s, 64)
576 }
577
578 func getFloat32FromString(s string) float32 {
579         return float32(getFloatFromString(s, 32))
580 }
581
582 func getValueString(values string, key string) string {
583         slice := strings.Split(values, ",")
584         for _, s := range slice {
585                 if lineContains(s, key) {
586                         return getFieldValueStr(s, key)
587                 }
588         }
589         return ""
590 }
591
592 func getCommandstatsValues(values string) (string, string, string) {
593         calls := getValueString(values, "calls=")
594         usec := getValueString(values, "usec=")
595         usecPerCall := getValueString(values, "usec_per_call=")
596         return calls, usec, usecPerCall
597 }
598
599 func updateCommandstatsValues(i interface{}, values string) {
600         stype := reflect.ValueOf(i).Elem()
601         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
602
603         callsField := stype.FieldByName("Calls")
604         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
605
606         usecField := stype.FieldByName("Usec")
607         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
608
609         usecPerCallField := stype.FieldByName("UsecPerCall")
610         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
611 }
612
613 func getKeyspaceValues(values string) (string, string, string) {
614         keys := getValueString(values, "keys=")
615         expires := getValueString(values, "expires=")
616         avgttl := getValueString(values, "avg_ttl=")
617         return keys, expires, avgttl
618 }
619
620 func updateKeyspaceValues(i interface{}, values string) {
621         stype := reflect.ValueOf(i).Elem()
622         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
623
624         keysField := stype.FieldByName("Keys")
625         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
626
627         expiresField := stype.FieldByName("Expires")
628         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
629
630         avgttlField := stype.FieldByName("AvgTtl")
631         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
632 }
633
634 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
635         if value, ok := config["uptime_in_days"]; ok {
636                 info.Fields.Server.UptimeInDays = getUint32FromString(value)
637         }
638 }
639
640 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
641         if value, ok := config["connected_clients"]; ok {
642                 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
643         }
644         if value, ok := config["client_recent_max_input_buffer"]; ok {
645                 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
646         }
647         if value, ok := config["client_recent_max_output_buffer"]; ok {
648                 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
649         }
650 }
651
652 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
653         if value, ok := config["used_memory"]; ok {
654                 info.Fields.Memory.UsedMemory = getUint64FromString(value)
655         }
656         if value, ok := config["used_memory_human"]; ok {
657                 info.Fields.Memory.UsedMemoryHuman = value
658         }
659         if value, ok := config["used_memory_rss"]; ok {
660                 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
661         }
662         if value, ok := config["used_memory_rss_human"]; ok {
663                 info.Fields.Memory.UsedMemoryRssHuman = value
664         }
665         if value, ok := config["used_memory_peak"]; ok {
666                 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
667         }
668         if value, ok := config["used_memory_peak_human"]; ok {
669                 info.Fields.Memory.UsedMemoryPeakHuman = value
670         }
671         if value, ok := config["used_memory_peak_perc"]; ok {
672                 info.Fields.Memory.UsedMemoryPeakPerc = value
673         }
674         if value, ok := config["mem_fragmentation_ratio"]; ok {
675                 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
676         }
677         if value, ok := config["mem_fragmentation_bytes"]; ok {
678                 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
679         }
680 }
681
682 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
683         if value, ok := config["total_connections_received"]; ok {
684                 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
685         }
686         if value, ok := config["total_commands_processed"]; ok {
687                 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
688         }
689         if value, ok := config["sync_full"]; ok {
690                 info.Fields.Stats.SyncFull = getUint32FromString(value)
691         }
692         if value, ok := config["sync_partial_ok"]; ok {
693                 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
694         }
695         if value, ok := config["sync_partial_err"]; ok {
696                 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
697         }
698         if value, ok := config["pubsub_channels"]; ok {
699                 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
700         }
701 }
702
703 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
704         if value, ok := config["used_cpu_sys"]; ok {
705                 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
706         }
707         if value, ok := config["used_cpu_user"]; ok {
708                 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
709         }
710 }
711
712 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
713         if values, ok := config["cmdstat_replconf"]; ok {
714                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
715         }
716         if values, ok := config["cmdstat_keys"]; ok {
717                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
718         }
719         if values, ok := config["cmdstat_role"]; ok {
720                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
721         }
722         if values, ok := config["cmdstat_psync"]; ok {
723                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
724         }
725         if values, ok := config["cmdstat_mset"]; ok {
726                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
727         }
728         if values, ok := config["cmdstat_publish"]; ok {
729                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
730         }
731         if values, ok := config["cmdstat_info"]; ok {
732                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
733         }
734         if values, ok := config["cmdstat_ping"]; ok {
735                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
736         }
737         if values, ok := config["cmdstat_client"]; ok {
738                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
739         }
740         if values, ok := config["cmdstat_command"]; ok {
741                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
742         }
743         if values, ok := config["cmdstat_subscribe"]; ok {
744                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
745         }
746         if values, ok := config["cmdstat_monitor"]; ok {
747                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
748         }
749         if values, ok := config["cmdstat_config"]; ok {
750                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
751         }
752         if values, ok := config["cmdstat_slaveof"]; ok {
753                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
754         }
755 }
756
757 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
758         if values, ok := config["db0"]; ok {
759                 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
760         }
761 }
762
763 func getConfigInfo(input []string) ConfigInfo {
764         config := ConfigInfo{}
765         for _, line := range input {
766                 if i := strings.Index(line, ":"); i != -1 {
767                         if key := strings.TrimSpace(line[:i]); len(key) > 0 {
768                                 if len(line) > i {
769                                         config[key] = strings.TrimSpace(line[i+1:])
770                                 }
771                         }
772                 }
773         }
774         return config
775 }
776
777 func readRedisInfoReplyFields(input []string, info *DbInfo) {
778         config := getConfigInfo(input)
779
780         if value, ok := config["role"]; ok {
781                 if "master" == value {
782                         info.Fields.PrimaryRole = true
783                 }
784         }
785         if value, ok := config["connected_slaves"]; ok {
786                 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
787         }
788         updateServerInfoFields(config, info)
789         updateClientInfoFields(config, info)
790         updateMemoryInfoFields(config, info)
791         updateStatsInfoFields(config, info)
792         updateCpuInfoFields(config, info)
793         updateCommandstatsInfoFields(config, info)
794         updateKeyspaceInfoFields(config, info)
795 }
796
797 func (db *DB) State() (*DbState, error) {
798         dbState := new(DbState)
799         if db.sentinelPort != "" {
800                 //Establish connection to Redis sentinel. The reason why connection is done
801                 //here instead of time of the SDL instance creation is that for the time being
802                 //sentinel connection is needed only here to get state information and
803                 //state information is needed only by 'sdlcli' hence it is not time critical
804                 //and also we want to avoid opening unnecessary TCP connections towards Redis
805                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
806                 sentinelClient := db.sentinel(db.addr, db.sentinelPort, db.masterName, db.nodeCnt)
807                 return sentinelClient.GetDbState()
808         } else {
809                 info, err := db.Info()
810                 if err != nil {
811                         dbState.PrimaryDbState.Err = err
812                         return dbState, err
813                 }
814                 return db.fillDbStateFromDbInfo(info)
815         }
816 }
817
818 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
819         var dbState DbState
820         if info.Fields.PrimaryRole == true {
821                 dbState = DbState{
822                         PrimaryDbState: PrimaryDbState{
823                                 Fields: PrimaryDbStateFields{
824                                         Role:  "master",
825                                         Ip:    db.addr,
826                                         Port:  db.port,
827                                         Flags: "master",
828                                 },
829                         },
830                 }
831         }
832
833         cnt, err := strconv.Atoi(db.nodeCnt)
834         if err != nil {
835                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.nodeCnt)
836         } else {
837                 dbState.ConfigNodeCnt = cnt
838         }
839
840         return &dbState, dbState.Err
841 }
842
843 func createReplicaDbClient(host string) *DB {
844         addr, port, _ := net.SplitHostPort(host)
845         return createDbClient(addr, port, "", "", "", newRedisClient, subscribeNotifications, nil)
846 }
847
848 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
849         dbStatisticsInfo := new(DbStatisticsInfo)
850         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
851
852         info, err := db.Info()
853         if err != nil {
854                 return nil, err
855         }
856         dbStatisticsInfo.Info = info
857
858         return dbStatisticsInfo, nil
859 }
860
861 func sentinelStatistics(db *DB) (*DbStatistics, error) {
862         dbState := new(DbState)
863         dbStatistics := new(DbStatistics)
864         dbStatisticsInfo := new(DbStatisticsInfo)
865         var err error
866
867         dbState, err = db.State()
868         if err != nil {
869                 return nil, err
870         }
871
872         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
873         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
874
875         if dbState.ReplicasDbState != nil {
876                 for _, r := range dbState.ReplicasDbState.States {
877                         replicaDb := createReplicaDbClient(r.GetAddress())
878                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
879                         replicaDb.CloseDB()
880                         if err != nil {
881                                 return nil, err
882                         }
883                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
884                 }
885         }
886
887         return dbStatistics, nil
888 }
889
890 func standaloneStatistics(db *DB) (*DbStatistics, error) {
891         dbStatistics := new(DbStatistics)
892
893         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.addr, db.port))
894         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
895
896         return dbStatistics, err
897 }
898
899 func (db *DB) Statistics() (*DbStatistics, error) {
900         if db.sentinelPort != "" {
901                 return sentinelStatistics(db)
902         }
903
904         return standaloneStatistics(db)
905 }
906
907 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
908
909 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
910         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
911         result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
912         if err != nil {
913                 return err
914         }
915         if result == int64(1) {
916                 return nil
917         }
918         return errors.New("Lock not held")
919 }
920
921 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
922         sCbMap.m.Lock()
923         defer sCbMap.m.Unlock()
924         sCbMap.cbMap[channel] = cb
925 }
926
927 func (sCbMap *sharedCbMap) Remove(channel string) {
928         sCbMap.m.Lock()
929         defer sCbMap.m.Unlock()
930         delete(sCbMap.cbMap, channel)
931 }
932
933 func (sCbMap *sharedCbMap) Count() int {
934         sCbMap.m.Lock()
935         defer sCbMap.m.Unlock()
936         return len(sCbMap.cbMap)
937 }
938
939 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
940         sCbMap.m.Lock()
941         defer sCbMap.m.Unlock()
942         mapCopy := make(map[string]ChannelNotificationCb, 0)
943         for i, v := range sCbMap.cbMap {
944                 mapCopy[i] = v
945         }
946         return mapCopy
947 }