Fix multi-namespace SDL event subscribe
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "context"
27         "errors"
28         "fmt"
29         "github.com/go-redis/redis/v8"
30         "io"
31         "log"
32         "net"
33         "os"
34         "reflect"
35         "strconv"
36         "strings"
37         "sync"
38         "time"
39 )
40
41 const EventSeparator = "___"
42 const NsSeparator = ","
43
44 type ChannelNotificationCb func(channel string, payload ...string)
45 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
46
47 type intChannels struct {
48         addChannel    chan string
49         removeChannel chan string
50         exit          chan bool
51 }
52
53 type sharedCbMap struct {
54         m     sync.Mutex
55         cbMap map[string]ChannelNotificationCb
56 }
57
58 type Config struct {
59         hostname        string
60         port            string
61         masterName      string
62         sentinelPort    string
63         clusterAddrList string
64         nodeCnt         string
65 }
66
67 type DB struct {
68         ctx          context.Context
69         client       RedisClient
70         sentinel     RedisSentinelCreateCb
71         subscribe    SubscribeFn
72         redisModules bool
73         sCbMap       *sharedCbMap
74         ch           intChannels
75         cfg          Config
76         addr         string
77 }
78
79 type Subscriber interface {
80         Channel(opts ...redis.ChannelOption) <-chan *redis.Message
81         Subscribe(ctx context.Context, channels ...string) error
82         Unsubscribe(ctx context.Context, channels ...string) error
83         Close() error
84 }
85
86 type SubscribeFn func(ctx context.Context, client RedisClient, channels ...string) Subscriber
87
88 type RedisClient interface {
89         Command(ctx context.Context) *redis.CommandsInfoCmd
90         Close() error
91         Subscribe(ctx context.Context, channels ...string) *redis.PubSub
92         MSet(ctx context.Context, pairs ...interface{}) *redis.StatusCmd
93         Do(ctx context.Context, args ...interface{}) *redis.Cmd
94         MGet(ctx context.Context, keys ...string) *redis.SliceCmd
95         Del(ctx context.Context, keys ...string) *redis.IntCmd
96         Keys(ctx context.Context, pattern string) *redis.StringSliceCmd
97         SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
98         SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
99         SRem(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
100         SMembers(ctx context.Context, key string) *redis.StringSliceCmd
101         SIsMember(ctx context.Context, key string, member interface{}) *redis.BoolCmd
102         SCard(ctx context.Context, key string) *redis.IntCmd
103         PTTL(ctx context.Context, key string) *redis.DurationCmd
104         Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
105         EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
106         ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
107         ScriptLoad(ctx context.Context, script string) *redis.StringCmd
108         Info(ctx context.Context, section ...string) *redis.StringCmd
109 }
110
111 var dbLogger *logger
112
113 func init() {
114         dbLogger = &logger{
115                 log: log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile),
116         }
117         redis.SetLogger(dbLogger)
118 }
119
120 func SetDbLogger(out io.Writer) {
121         dbLogger.log.SetOutput(out)
122 }
123
124 func checkResultAndError(result interface{}, err error) (bool, error) {
125         if err != nil {
126                 if err == redis.Nil {
127                         return false, nil
128                 }
129                 return false, err
130         }
131         if result == "OK" {
132                 return true, nil
133         }
134         return false, nil
135 }
136
137 func checkIntResultAndError(result interface{}, err error) (bool, error) {
138         if err != nil {
139                 return false, err
140         }
141         if n, ok := result.(int64); ok {
142                 if n == 1 {
143                         return true, nil
144                 }
145         } else if n, ok := result.(int); ok {
146                 if n == 1 {
147                         return true, nil
148                 }
149         }
150         return false, nil
151 }
152
153 func subscribeNotifications(ctx context.Context, client RedisClient, channels ...string) Subscriber {
154         return client.Subscribe(ctx, channels...)
155 }
156
157 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
158         db := DB{
159                 ctx:          context.Background(),
160                 client:       client,
161                 sentinel:     sentinelCreateCb,
162                 subscribe:    subscribe,
163                 redisModules: true,
164                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
165                 ch: intChannels{
166                         addChannel:    make(chan string),
167                         removeChannel: make(chan string),
168                         exit:          make(chan bool),
169                 },
170                 cfg:  cfg,
171                 addr: sentinelAddr,
172         }
173
174         return &db
175 }
176
177 func Create() []*DB {
178         osimpl := osImpl{}
179         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
180 }
181
182 func readConfig(osI OS) Config {
183         cfg := Config{
184                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
185                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
186                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
187                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
188                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
189                 nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
190         }
191         return cfg
192 }
193
194 type OS interface {
195         Getenv(key string, defValue string) string
196 }
197
198 type osImpl struct{}
199
200 func (osImpl) Getenv(key string, defValue string) string {
201         val := os.Getenv(key)
202         if val == "" {
203                 val = defValue
204         }
205         return val
206 }
207
208 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
209         subscribe SubscribeFn,
210         sentinelCreateCb RedisSentinelCreateCb) []*DB {
211         cfg := readConfig(osI)
212         return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
213 }
214
215 func createDbClients(cfg Config, clientCreator RedisClientCreator,
216         subscribe SubscribeFn,
217         sentinelCreateCb RedisSentinelCreateCb) []*DB {
218         if cfg.clusterAddrList == "" {
219                 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
220         }
221
222         dbs := []*DB{}
223
224         addrList := strings.Split(cfg.clusterAddrList, ",")
225         for _, addr := range addrList {
226                 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
227                 dbs = append(dbs, db)
228         }
229         return dbs
230 }
231
232 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
233         subscribe SubscribeFn,
234         sentinelCreateCb RedisSentinelCreateCb) *DB {
235         return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
236 }
237
238 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
239         subscribe SubscribeFn,
240         sentinelCreateCb RedisSentinelCreateCb) *DB {
241         var client RedisClient
242         var db *DB
243         if cfg.sentinelPort == "" {
244                 client = clientCreator(hostName, cfg.port, "", false)
245                 db = CreateDB(client, subscribe, nil, cfg, hostName)
246         } else {
247                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
248                 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
249         }
250         db.CheckCommands()
251         return db
252 }
253
254 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
255         if isHa == true {
256                 sentinelAddress := addr + ":" + port
257                 return redis.NewFailoverClient(
258                         &redis.FailoverOptions{
259                                 MasterName:    clusterName,
260                                 SentinelAddrs: []string{sentinelAddress},
261                                 PoolSize:      20,
262                                 MaxRetries:    2,
263                         },
264                 )
265         }
266         redisAddress := addr + ":" + port
267         return redis.NewClient(&redis.Options{
268                 Addr:       redisAddress,
269                 Password:   "", // no password set
270                 DB:         0,  // use default DB
271                 PoolSize:   20,
272                 MaxRetries: 2,
273         })
274 }
275
276 func (db *DB) CheckCommands() {
277         commands, err := db.client.Command(db.ctx).Result()
278         if err == nil {
279                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
280                         "msetmpub", "delmpub"}
281                 for _, v := range redisModuleCommands {
282                         _, ok := commands[v]
283                         if !ok {
284                                 db.redisModules = false
285                         }
286                 }
287         } else {
288                 dbLogger.Printf(db.ctx, "SDL DB commands checking failure: %s\n", err)
289         }
290 }
291
292 func (db *DB) CloseDB() error {
293         return db.client.Close()
294 }
295
296 func (db *DB) UnsubscribeChannelDB(channels ...string) {
297         for _, v := range channels {
298                 db.sCbMap.Remove(v)
299                 db.ch.removeChannel <- v
300                 if db.sCbMap.Count() == 0 {
301                         db.ch.exit <- true
302                 }
303         }
304 }
305
306 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
307         if db.sCbMap.Count() == 0 {
308                 for _, v := range channels {
309                         db.sCbMap.Add(v, cb)
310                 }
311
312                 go func(sCbMap *sharedCbMap,
313                         ch intChannels,
314                         channels ...string) {
315                         sub := db.subscribe(db.ctx, db.client, channels...)
316                         rxChannel := sub.Channel()
317                         lCbMap := sCbMap.GetMapCopy()
318                         for {
319                                 select {
320                                 case msg := <-rxChannel:
321                                         cb, ok := lCbMap[msg.Channel]
322                                         if ok {
323                                                 nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
324                                                 cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
325                                         }
326                                 case channel := <-ch.addChannel:
327                                         lCbMap = sCbMap.GetMapCopy()
328                                         sub.Subscribe(db.ctx, channel)
329                                 case channel := <-ch.removeChannel:
330                                         lCbMap = sCbMap.GetMapCopy()
331                                         sub.Unsubscribe(db.ctx, channel)
332                                 case exit := <-ch.exit:
333                                         if exit {
334                                                 if err := sub.Close(); err != nil {
335                                                         dbLogger.Printf(db.ctx, "SDL DB channel closing failure: %s\n", err)
336                                                 }
337                                                 return
338                                         }
339                                 }
340                         }
341                 }(db.sCbMap, db.ch, channels...)
342
343         } else {
344                 for _, v := range channels {
345                         db.sCbMap.Add(v, cb)
346                         db.ch.addChannel <- v
347                 }
348         }
349 }
350
351 func (db *DB) MSet(pairs ...interface{}) error {
352         return db.client.MSet(db.ctx, pairs...).Err()
353 }
354
355 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
356         if !db.redisModules {
357                 return errors.New("Redis deployment doesn't support MSETMPUB command")
358         }
359         command := make([]interface{}, 0)
360         command = append(command, "MSETMPUB")
361         command = append(command, len(pairs)/2)
362         command = append(command, len(channelsAndEvents)/2)
363         for _, d := range pairs {
364                 command = append(command, d)
365         }
366         for _, d := range channelsAndEvents {
367                 command = append(command, d)
368         }
369         _, err := db.client.Do(db.ctx, command...).Result()
370         return err
371 }
372
373 func (db *DB) MGet(keys []string) ([]interface{}, error) {
374         return db.client.MGet(db.ctx, keys...).Result()
375 }
376
377 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
378         if !db.redisModules {
379                 return errors.New("Redis deployment not supporting command DELMPUB")
380         }
381         command := make([]interface{}, 0)
382         command = append(command, "DELMPUB")
383         command = append(command, len(keys))
384         command = append(command, len(channelsAndEvents)/2)
385         for _, d := range keys {
386                 command = append(command, d)
387         }
388         for _, d := range channelsAndEvents {
389                 command = append(command, d)
390         }
391         _, err := db.client.Do(db.ctx, command...).Result()
392         return err
393
394 }
395
396 func (db *DB) Del(keys []string) error {
397         _, err := db.client.Del(db.ctx, keys...).Result()
398         return err
399 }
400
401 func (db *DB) Keys(pattern string) ([]string, error) {
402         return db.client.Keys(db.ctx, pattern).Result()
403 }
404
405 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
406         if !db.redisModules {
407                 return false, errors.New("Redis deployment not supporting command")
408         }
409
410         return checkResultAndError(db.client.Do(db.ctx, "SETIE", key, newData, oldData).Result())
411 }
412
413 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
414         if !db.redisModules {
415                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
416         }
417         capacity := 4 + len(channelsAndEvents)
418         command := make([]interface{}, 0, capacity)
419         command = append(command, "SETIEMPUB")
420         command = append(command, key)
421         command = append(command, newData)
422         command = append(command, oldData)
423         for _, ce := range channelsAndEvents {
424                 command = append(command, ce)
425         }
426         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
427 }
428
429 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
430         if !db.redisModules {
431                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
432         }
433         capacity := 3 + len(channelsAndEvents)
434         command := make([]interface{}, 0, capacity)
435         command = append(command, "SETNXMPUB")
436         command = append(command, key)
437         command = append(command, data)
438         for _, ce := range channelsAndEvents {
439                 command = append(command, ce)
440         }
441         return checkResultAndError(db.client.Do(db.ctx, command...).Result())
442 }
443 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
444         return db.client.SetNX(db.ctx, key, data, expiration).Result()
445 }
446
447 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
448         if !db.redisModules {
449                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
450         }
451         capacity := 3 + len(channelsAndEvents)
452         command := make([]interface{}, 0, capacity)
453         command = append(command, "DELIEMPUB")
454         command = append(command, key)
455         command = append(command, data)
456         for _, ce := range channelsAndEvents {
457                 command = append(command, ce)
458         }
459         return checkIntResultAndError(db.client.Do(db.ctx, command...).Result())
460 }
461
462 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
463         if !db.redisModules {
464                 return false, errors.New("Redis deployment not supporting command")
465         }
466         return checkIntResultAndError(db.client.Do(db.ctx, "DELIE", key, data).Result())
467 }
468
469 func (db *DB) SAdd(key string, data ...interface{}) error {
470         _, err := db.client.SAdd(db.ctx, key, data...).Result()
471         return err
472 }
473
474 func (db *DB) SRem(key string, data ...interface{}) error {
475         _, err := db.client.SRem(db.ctx, key, data...).Result()
476         return err
477 }
478
479 func (db *DB) SMembers(key string) ([]string, error) {
480         result, err := db.client.SMembers(db.ctx, key).Result()
481         return result, err
482 }
483
484 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
485         result, err := db.client.SIsMember(db.ctx, key, data).Result()
486         return result, err
487 }
488
489 func (db *DB) SCard(key string) (int64, error) {
490         result, err := db.client.SCard(db.ctx, key).Result()
491         return result, err
492 }
493
494 func (db *DB) PTTL(key string) (time.Duration, error) {
495         result, err := db.client.PTTL(db.ctx, key).Result()
496         return result, err
497 }
498
499 func (db *DB) Info() (*DbInfo, error) {
500         var info DbInfo
501         resultStr, err := db.client.Info(db.ctx, "all").Result()
502         if err != nil {
503                 return &info, err
504         }
505
506         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
507         readRedisInfoReplyFields(result, &info)
508         return &info, nil
509 }
510
511 func lineContains(line, substr string) bool {
512         return strings.Contains(line, substr)
513 }
514
515 func getFieldValueStr(line, substr string) string {
516         if idx := strings.Index(line, substr); idx != -1 {
517                 return line[idx+len(substr):]
518         }
519         return ""
520 }
521
522 func getUint32FromString(s string) uint32 {
523         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
524                 return uint32(val)
525         }
526         return 0
527 }
528
529 func getUint64FromString(s string) uint64 {
530         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
531                 return uint64(val)
532         }
533         return 0
534 }
535
536 func getFloatFromString(s string, bitSize int) float64 {
537         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
538                 return val
539         }
540         return 0
541 }
542
543 func getFloat64FromString(s string) float64 {
544         return getFloatFromString(s, 64)
545 }
546
547 func getFloat32FromString(s string) float32 {
548         return float32(getFloatFromString(s, 32))
549 }
550
551 func getValueString(values string, key string) string {
552         slice := strings.Split(values, ",")
553         for _, s := range slice {
554                 if lineContains(s, key) {
555                         return getFieldValueStr(s, key)
556                 }
557         }
558         return ""
559 }
560
561 func getCommandstatsValues(values string) (string, string, string) {
562         calls := getValueString(values, "calls=")
563         usec := getValueString(values, "usec=")
564         usecPerCall := getValueString(values, "usec_per_call=")
565         return calls, usec, usecPerCall
566 }
567
568 func updateCommandstatsValues(i interface{}, values string) {
569         stype := reflect.ValueOf(i).Elem()
570         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
571
572         callsField := stype.FieldByName("Calls")
573         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
574
575         usecField := stype.FieldByName("Usec")
576         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
577
578         usecPerCallField := stype.FieldByName("UsecPerCall")
579         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
580 }
581
582 func getKeyspaceValues(values string) (string, string, string) {
583         keys := getValueString(values, "keys=")
584         expires := getValueString(values, "expires=")
585         avgttl := getValueString(values, "avg_ttl=")
586         return keys, expires, avgttl
587 }
588
589 func updateKeyspaceValues(i interface{}, values string) {
590         stype := reflect.ValueOf(i).Elem()
591         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
592
593         keysField := stype.FieldByName("Keys")
594         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
595
596         expiresField := stype.FieldByName("Expires")
597         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
598
599         avgttlField := stype.FieldByName("AvgTtl")
600         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
601 }
602
603 func updateServerInfoFields(config ConfigInfo, info *DbInfo) {
604         if value, ok := config["uptime_in_days"]; ok {
605                 info.Fields.Server.UptimeInDays = getUint32FromString(value)
606         }
607 }
608
609 func updateClientInfoFields(config ConfigInfo, info *DbInfo) {
610         if value, ok := config["connected_clients"]; ok {
611                 info.Fields.Clients.ConnectedClients = getUint32FromString(value)
612         }
613         if value, ok := config["client_recent_max_input_buffer"]; ok {
614                 info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(value)
615         }
616         if value, ok := config["client_recent_max_output_buffer"]; ok {
617                 info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(value)
618         }
619 }
620
621 func updateMemoryInfoFields(config ConfigInfo, info *DbInfo) {
622         if value, ok := config["used_memory"]; ok {
623                 info.Fields.Memory.UsedMemory = getUint64FromString(value)
624         }
625         if value, ok := config["used_memory_human"]; ok {
626                 info.Fields.Memory.UsedMemoryHuman = value
627         }
628         if value, ok := config["used_memory_rss"]; ok {
629                 info.Fields.Memory.UsedMemoryRss = getUint64FromString(value)
630         }
631         if value, ok := config["used_memory_rss_human"]; ok {
632                 info.Fields.Memory.UsedMemoryRssHuman = value
633         }
634         if value, ok := config["used_memory_peak"]; ok {
635                 info.Fields.Memory.UsedMemoryPeak = getUint64FromString(value)
636         }
637         if value, ok := config["used_memory_peak_human"]; ok {
638                 info.Fields.Memory.UsedMemoryPeakHuman = value
639         }
640         if value, ok := config["used_memory_peak_perc"]; ok {
641                 info.Fields.Memory.UsedMemoryPeakPerc = value
642         }
643         if value, ok := config["mem_fragmentation_ratio"]; ok {
644                 info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(value)
645         }
646         if value, ok := config["mem_fragmentation_bytes"]; ok {
647                 info.Fields.Memory.MemFragmentationBytes = getUint32FromString(value)
648         }
649 }
650
651 func updateStatsInfoFields(config ConfigInfo, info *DbInfo) {
652         if value, ok := config["total_connections_received"]; ok {
653                 info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(value)
654         }
655         if value, ok := config["total_commands_processed"]; ok {
656                 info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(value)
657         }
658         if value, ok := config["sync_full"]; ok {
659                 info.Fields.Stats.SyncFull = getUint32FromString(value)
660         }
661         if value, ok := config["sync_partial_ok"]; ok {
662                 info.Fields.Stats.SyncPartialOk = getUint32FromString(value)
663         }
664         if value, ok := config["sync_partial_err"]; ok {
665                 info.Fields.Stats.SyncPartialErr = getUint32FromString(value)
666         }
667         if value, ok := config["pubsub_channels"]; ok {
668                 info.Fields.Stats.PubsubChannels = getUint32FromString(value)
669         }
670 }
671
672 func updateCpuInfoFields(config ConfigInfo, info *DbInfo) {
673         if value, ok := config["used_cpu_sys"]; ok {
674                 info.Fields.Cpu.UsedCpuSys = getFloat64FromString(value)
675         }
676         if value, ok := config["used_cpu_user"]; ok {
677                 info.Fields.Cpu.UsedCpuUser = getFloat64FromString(value)
678         }
679 }
680
681 func updateCommandstatsInfoFields(config ConfigInfo, info *DbInfo) {
682         if values, ok := config["cmdstat_replconf"]; ok {
683                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, values)
684         }
685         if values, ok := config["cmdstat_keys"]; ok {
686                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, values)
687         }
688         if values, ok := config["cmdstat_role"]; ok {
689                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, values)
690         }
691         if values, ok := config["cmdstat_psync"]; ok {
692                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, values)
693         }
694         if values, ok := config["cmdstat_mset"]; ok {
695                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, values)
696         }
697         if values, ok := config["cmdstat_publish"]; ok {
698                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, values)
699         }
700         if values, ok := config["cmdstat_info"]; ok {
701                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, values)
702         }
703         if values, ok := config["cmdstat_ping"]; ok {
704                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, values)
705         }
706         if values, ok := config["cmdstat_client"]; ok {
707                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, values)
708         }
709         if values, ok := config["cmdstat_command"]; ok {
710                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, values)
711         }
712         if values, ok := config["cmdstat_subscribe"]; ok {
713                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, values)
714         }
715         if values, ok := config["cmdstat_monitor"]; ok {
716                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, values)
717         }
718         if values, ok := config["cmdstat_config"]; ok {
719                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, values)
720         }
721         if values, ok := config["cmdstat_slaveof"]; ok {
722                 updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, values)
723         }
724 }
725
726 func updateKeyspaceInfoFields(config ConfigInfo, info *DbInfo) {
727         if values, ok := config["db0"]; ok {
728                 updateKeyspaceValues(&info.Fields.Keyspace.Db, values)
729         }
730 }
731
732 func getConfigInfo(input []string) ConfigInfo {
733         config := ConfigInfo{}
734         for _, line := range input {
735                 if i := strings.Index(line, ":"); i != -1 {
736                         if key := strings.TrimSpace(line[:i]); len(key) > 0 {
737                                 if len(line) > i {
738                                         config[key] = strings.TrimSpace(line[i+1:])
739                                 }
740                         }
741                 }
742         }
743         return config
744 }
745
746 func readRedisInfoReplyFields(input []string, info *DbInfo) {
747         config := getConfigInfo(input)
748
749         if value, ok := config["role"]; ok {
750                 if "master" == value {
751                         info.Fields.PrimaryRole = true
752                 }
753         }
754         if value, ok := config["connected_slaves"]; ok {
755                 info.Fields.ConnectedReplicaCnt = getUint32FromString(value)
756         }
757         updateServerInfoFields(config, info)
758         updateClientInfoFields(config, info)
759         updateMemoryInfoFields(config, info)
760         updateStatsInfoFields(config, info)
761         updateCpuInfoFields(config, info)
762         updateCommandstatsInfoFields(config, info)
763         updateKeyspaceInfoFields(config, info)
764 }
765
766 func (db *DB) State() (*DbState, error) {
767         dbState := new(DbState)
768         if db.cfg.sentinelPort != "" {
769                 //Establish connection to Redis sentinel. The reason why connection is done
770                 //here instead of time of the SDL instance creation is that for the time being
771                 //sentinel connection is needed only here to get state information and
772                 //state information is needed only by 'sdlcli' hence it is not time critical
773                 //and also we want to avoid opening unnecessary TCP connections towards Redis
774                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
775                 sentinelClient := db.sentinel(&db.cfg, db.addr)
776                 return sentinelClient.GetDbState()
777         } else {
778                 info, err := db.Info()
779                 if err != nil {
780                         dbState.PrimaryDbState.Err = err
781                         return dbState, err
782                 }
783                 return db.fillDbStateFromDbInfo(info)
784         }
785 }
786
787 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
788         var dbState DbState
789         if info.Fields.PrimaryRole == true {
790                 dbState = DbState{
791                         PrimaryDbState: PrimaryDbState{
792                                 Fields: PrimaryDbStateFields{
793                                         Role:  "master",
794                                         Ip:    db.cfg.hostname,
795                                         Port:  db.cfg.port,
796                                         Flags: "master",
797                                 },
798                         },
799                 }
800         }
801
802         cnt, err := strconv.Atoi(db.cfg.nodeCnt)
803         if err != nil {
804                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
805         } else {
806                 dbState.ConfigNodeCnt = cnt
807         }
808
809         return &dbState, dbState.Err
810 }
811
812 func createReplicaDbClient(host string) *DB {
813         cfg := readConfig(osImpl{})
814         cfg.sentinelPort = ""
815         cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
816         return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
817 }
818
819 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
820         dbStatisticsInfo := new(DbStatisticsInfo)
821         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
822
823         info, err := db.Info()
824         if err != nil {
825                 return nil, err
826         }
827         dbStatisticsInfo.Info = info
828
829         return dbStatisticsInfo, nil
830 }
831
832 func sentinelStatistics(db *DB) (*DbStatistics, error) {
833         dbState := new(DbState)
834         dbStatistics := new(DbStatistics)
835         dbStatisticsInfo := new(DbStatisticsInfo)
836         var err error
837
838         dbState, err = db.State()
839         if err != nil {
840                 return nil, err
841         }
842
843         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
844         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
845
846         if dbState.ReplicasDbState != nil {
847                 for _, r := range dbState.ReplicasDbState.States {
848                         replicaDb := createReplicaDbClient(r.GetAddress())
849                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
850                         replicaDb.CloseDB()
851                         if err != nil {
852                                 return nil, err
853                         }
854                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
855                 }
856         }
857
858         return dbStatistics, nil
859 }
860
861 func standaloneStatistics(db *DB) (*DbStatistics, error) {
862         dbStatistics := new(DbStatistics)
863
864         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
865         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
866
867         return dbStatistics, err
868 }
869
870 func (db *DB) Statistics() (*DbStatistics, error) {
871         if db.cfg.sentinelPort != "" {
872                 return sentinelStatistics(db)
873         }
874
875         return standaloneStatistics(db)
876 }
877
878 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
879
880 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
881         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
882         result, err := luaRefresh.Run(db.ctx, db.client, []string{key}, data, expirationStr).Result()
883         if err != nil {
884                 return err
885         }
886         if result == int64(1) {
887                 return nil
888         }
889         return errors.New("Lock not held")
890 }
891
892 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
893         sCbMap.m.Lock()
894         defer sCbMap.m.Unlock()
895         sCbMap.cbMap[channel] = cb
896 }
897
898 func (sCbMap *sharedCbMap) Remove(channel string) {
899         sCbMap.m.Lock()
900         defer sCbMap.m.Unlock()
901         delete(sCbMap.cbMap, channel)
902 }
903
904 func (sCbMap *sharedCbMap) Count() int {
905         sCbMap.m.Lock()
906         defer sCbMap.m.Unlock()
907         return len(sCbMap.cbMap)
908 }
909
910 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
911         sCbMap.m.Lock()
912         defer sCbMap.m.Unlock()
913         mapCopy := make(map[string]ChannelNotificationCb, 0)
914         for i, v := range sCbMap.cbMap {
915                 mapCopy[i] = v
916         }
917         return mapCopy
918 }