0b9323975032c2156a15733e0f90fae987d97f7f
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "errors"
27         "fmt"
28         "github.com/go-redis/redis/v7"
29         "io"
30         "log"
31         "net"
32         "os"
33         "reflect"
34         "strconv"
35         "strings"
36         "sync"
37         "time"
38 )
39
40 type ChannelNotificationCb func(channel string, payload ...string)
41 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
42
43 type intChannels struct {
44         addChannel    chan string
45         removeChannel chan string
46         exit          chan bool
47 }
48
49 type sharedCbMap struct {
50         m     sync.Mutex
51         cbMap map[string]ChannelNotificationCb
52 }
53
54 type Config struct {
55         hostname        string
56         port            string
57         masterName      string
58         sentinelPort    string
59         clusterAddrList string
60         nodeCnt         string
61 }
62
63 type DB struct {
64         client       RedisClient
65         sentinel     RedisSentinelCreateCb
66         subscribe    SubscribeFn
67         redisModules bool
68         sCbMap       *sharedCbMap
69         ch           intChannels
70         cfg          Config
71         addr         string
72 }
73
74 type Subscriber interface {
75         Channel() <-chan *redis.Message
76         Subscribe(channels ...string) error
77         Unsubscribe(channels ...string) error
78         Close() error
79 }
80
81 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
82
83 type RedisClient interface {
84         Command() *redis.CommandsInfoCmd
85         Close() error
86         Subscribe(channels ...string) *redis.PubSub
87         MSet(pairs ...interface{}) *redis.StatusCmd
88         Do(args ...interface{}) *redis.Cmd
89         MGet(keys ...string) *redis.SliceCmd
90         Del(keys ...string) *redis.IntCmd
91         Keys(pattern string) *redis.StringSliceCmd
92         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
93         SAdd(key string, members ...interface{}) *redis.IntCmd
94         SRem(key string, members ...interface{}) *redis.IntCmd
95         SMembers(key string) *redis.StringSliceCmd
96         SIsMember(key string, member interface{}) *redis.BoolCmd
97         SCard(key string) *redis.IntCmd
98         PTTL(key string) *redis.DurationCmd
99         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
100         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
101         ScriptExists(scripts ...string) *redis.BoolSliceCmd
102         ScriptLoad(script string) *redis.StringCmd
103         Info(section ...string) *redis.StringCmd
104 }
105
106 var dbLogger *log.Logger
107
108 func init() {
109         dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
110         redis.SetLogger(dbLogger)
111 }
112
113 func SetDbLogger(out io.Writer) {
114         dbLogger.SetOutput(out)
115 }
116
117 func checkResultAndError(result interface{}, err error) (bool, error) {
118         if err != nil {
119                 if err == redis.Nil {
120                         return false, nil
121                 }
122                 return false, err
123         }
124         if result == "OK" {
125                 return true, nil
126         }
127         return false, nil
128 }
129
130 func checkIntResultAndError(result interface{}, err error) (bool, error) {
131         if err != nil {
132                 return false, err
133         }
134         if n, ok := result.(int64); ok {
135                 if n == 1 {
136                         return true, nil
137                 }
138         } else if n, ok := result.(int); ok {
139                 if n == 1 {
140                         return true, nil
141                 }
142         }
143         return false, nil
144 }
145
146 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
147         return client.Subscribe(channels...)
148 }
149
150 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
151         db := DB{
152                 client:       client,
153                 sentinel:     sentinelCreateCb,
154                 subscribe:    subscribe,
155                 redisModules: true,
156                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
157                 ch: intChannels{
158                         addChannel:    make(chan string),
159                         removeChannel: make(chan string),
160                         exit:          make(chan bool),
161                 },
162                 cfg:  cfg,
163                 addr: sentinelAddr,
164         }
165
166         return &db
167 }
168
169 func Create() []*DB {
170         osimpl := osImpl{}
171         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
172 }
173
174 func readConfig(osI OS) Config {
175         cfg := Config{
176                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
177                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
178                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
179                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
180                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
181                 nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
182         }
183         return cfg
184 }
185
186 type OS interface {
187         Getenv(key string, defValue string) string
188 }
189
190 type osImpl struct{}
191
192 func (osImpl) Getenv(key string, defValue string) string {
193         val := os.Getenv(key)
194         if val == "" {
195                 val = defValue
196         }
197         return val
198 }
199
200 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
201         subscribe SubscribeFn,
202         sentinelCreateCb RedisSentinelCreateCb) []*DB {
203         cfg := readConfig(osI)
204         return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
205 }
206
207 func createDbClients(cfg Config, clientCreator RedisClientCreator,
208         subscribe SubscribeFn,
209         sentinelCreateCb RedisSentinelCreateCb) []*DB {
210         if cfg.clusterAddrList == "" {
211                 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
212         }
213
214         dbs := []*DB{}
215
216         addrList := strings.Split(cfg.clusterAddrList, ",")
217         for _, addr := range addrList {
218                 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
219                 dbs = append(dbs, db)
220         }
221         return dbs
222 }
223
224 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
225         subscribe SubscribeFn,
226         sentinelCreateCb RedisSentinelCreateCb) *DB {
227         return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
228 }
229
230 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
231         subscribe SubscribeFn,
232         sentinelCreateCb RedisSentinelCreateCb) *DB {
233         var client RedisClient
234         var db *DB
235         if cfg.sentinelPort == "" {
236                 client = clientCreator(hostName, cfg.port, "", false)
237                 db = CreateDB(client, subscribe, nil, cfg, hostName)
238         } else {
239                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
240                 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
241         }
242         db.CheckCommands()
243         return db
244 }
245
246 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
247         if isHa == true {
248                 sentinelAddress := addr + ":" + port
249                 return redis.NewFailoverClient(
250                         &redis.FailoverOptions{
251                                 MasterName:    clusterName,
252                                 SentinelAddrs: []string{sentinelAddress},
253                                 PoolSize:      20,
254                                 MaxRetries:    2,
255                         },
256                 )
257         }
258         redisAddress := addr + ":" + port
259         return redis.NewClient(&redis.Options{
260                 Addr:       redisAddress,
261                 Password:   "", // no password set
262                 DB:         0,  // use default DB
263                 PoolSize:   20,
264                 MaxRetries: 2,
265         })
266 }
267
268 func (db *DB) CheckCommands() {
269         commands, err := db.client.Command().Result()
270         if err == nil {
271                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
272                         "msetmpub", "delmpub"}
273                 for _, v := range redisModuleCommands {
274                         _, ok := commands[v]
275                         if !ok {
276                                 db.redisModules = false
277                         }
278                 }
279         } else {
280                 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
281         }
282 }
283
284 func (db *DB) CloseDB() error {
285         return db.client.Close()
286 }
287
288 func (db *DB) UnsubscribeChannelDB(channels ...string) {
289         for _, v := range channels {
290                 db.sCbMap.Remove(v)
291                 db.ch.removeChannel <- v
292                 if db.sCbMap.Count() == 0 {
293                         db.ch.exit <- true
294                 }
295         }
296 }
297
298 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
299         if db.sCbMap.Count() == 0 {
300                 for _, v := range channels {
301                         db.sCbMap.Add(v, cb)
302                 }
303
304                 go func(sCbMap *sharedCbMap,
305                         channelPrefix,
306                         eventSeparator string,
307                         ch intChannels,
308                         channels ...string) {
309                         sub := db.subscribe(db.client, channels...)
310                         rxChannel := sub.Channel()
311                         lCbMap := sCbMap.GetMapCopy()
312                         for {
313                                 select {
314                                 case msg := <-rxChannel:
315                                         cb, ok := lCbMap[msg.Channel]
316                                         if ok {
317                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
318                                         }
319                                 case channel := <-ch.addChannel:
320                                         lCbMap = sCbMap.GetMapCopy()
321                                         sub.Subscribe(channel)
322                                 case channel := <-ch.removeChannel:
323                                         lCbMap = sCbMap.GetMapCopy()
324                                         sub.Unsubscribe(channel)
325                                 case exit := <-ch.exit:
326                                         if exit {
327                                                 if err := sub.Close(); err != nil {
328                                                         dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
329                                                 }
330                                                 return
331                                         }
332                                 }
333                         }
334                 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
335
336         } else {
337                 for _, v := range channels {
338                         db.sCbMap.Add(v, cb)
339                         db.ch.addChannel <- v
340                 }
341         }
342 }
343
344 func (db *DB) MSet(pairs ...interface{}) error {
345         return db.client.MSet(pairs...).Err()
346 }
347
348 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
349         if !db.redisModules {
350                 return errors.New("Redis deployment doesn't support MSETMPUB command")
351         }
352         command := make([]interface{}, 0)
353         command = append(command, "MSETMPUB")
354         command = append(command, len(pairs)/2)
355         command = append(command, len(channelsAndEvents)/2)
356         for _, d := range pairs {
357                 command = append(command, d)
358         }
359         for _, d := range channelsAndEvents {
360                 command = append(command, d)
361         }
362         _, err := db.client.Do(command...).Result()
363         return err
364 }
365
366 func (db *DB) MGet(keys []string) ([]interface{}, error) {
367         return db.client.MGet(keys...).Result()
368 }
369
370 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
371         if !db.redisModules {
372                 return errors.New("Redis deployment not supporting command DELMPUB")
373         }
374         command := make([]interface{}, 0)
375         command = append(command, "DELMPUB")
376         command = append(command, len(keys))
377         command = append(command, len(channelsAndEvents)/2)
378         for _, d := range keys {
379                 command = append(command, d)
380         }
381         for _, d := range channelsAndEvents {
382                 command = append(command, d)
383         }
384         _, err := db.client.Do(command...).Result()
385         return err
386
387 }
388
389 func (db *DB) Del(keys []string) error {
390         _, err := db.client.Del(keys...).Result()
391         return err
392 }
393
394 func (db *DB) Keys(pattern string) ([]string, error) {
395         return db.client.Keys(pattern).Result()
396 }
397
398 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
399         if !db.redisModules {
400                 return false, errors.New("Redis deployment not supporting command")
401         }
402
403         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
404 }
405
406 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
407         if !db.redisModules {
408                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
409         }
410         capacity := 4 + len(channelsAndEvents)
411         command := make([]interface{}, 0, capacity)
412         command = append(command, "SETIEMPUB")
413         command = append(command, key)
414         command = append(command, newData)
415         command = append(command, oldData)
416         for _, ce := range channelsAndEvents {
417                 command = append(command, ce)
418         }
419         return checkResultAndError(db.client.Do(command...).Result())
420 }
421
422 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
423         if !db.redisModules {
424                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
425         }
426         capacity := 3 + len(channelsAndEvents)
427         command := make([]interface{}, 0, capacity)
428         command = append(command, "SETNXMPUB")
429         command = append(command, key)
430         command = append(command, data)
431         for _, ce := range channelsAndEvents {
432                 command = append(command, ce)
433         }
434         return checkResultAndError(db.client.Do(command...).Result())
435 }
436 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
437         return db.client.SetNX(key, data, expiration).Result()
438 }
439
440 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
441         if !db.redisModules {
442                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
443         }
444         capacity := 3 + len(channelsAndEvents)
445         command := make([]interface{}, 0, capacity)
446         command = append(command, "DELIEMPUB")
447         command = append(command, key)
448         command = append(command, data)
449         for _, ce := range channelsAndEvents {
450                 command = append(command, ce)
451         }
452         return checkIntResultAndError(db.client.Do(command...).Result())
453 }
454
455 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
456         if !db.redisModules {
457                 return false, errors.New("Redis deployment not supporting command")
458         }
459         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
460 }
461
462 func (db *DB) SAdd(key string, data ...interface{}) error {
463         _, err := db.client.SAdd(key, data...).Result()
464         return err
465 }
466
467 func (db *DB) SRem(key string, data ...interface{}) error {
468         _, err := db.client.SRem(key, data...).Result()
469         return err
470 }
471
472 func (db *DB) SMembers(key string) ([]string, error) {
473         result, err := db.client.SMembers(key).Result()
474         return result, err
475 }
476
477 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
478         result, err := db.client.SIsMember(key, data).Result()
479         return result, err
480 }
481
482 func (db *DB) SCard(key string) (int64, error) {
483         result, err := db.client.SCard(key).Result()
484         return result, err
485 }
486
487 func (db *DB) PTTL(key string) (time.Duration, error) {
488         result, err := db.client.PTTL(key).Result()
489         return result, err
490 }
491
492 func (db *DB) Info() (*DbInfo, error) {
493         var info DbInfo
494         resultStr, err := db.client.Info("all").Result()
495         if err != nil {
496                 return &info, err
497         }
498
499         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
500         err = readRedisInfoReplyFields(result, &info)
501         return &info, err
502 }
503
504 func lineContains(line, substr string) bool {
505         return strings.Contains(line, substr)
506 }
507
508 func getFieldValueStr(line, substr string) string {
509         if idx := strings.Index(line, substr); idx != -1 {
510                 return line[idx+len(substr):]
511         }
512         return ""
513 }
514
515 func getUint32FromString(s string) uint32 {
516         if val, err := strconv.ParseUint(s, 10, 32); err == nil {
517                 return uint32(val)
518         }
519         return 0
520 }
521
522 func getUint64FromString(s string) uint64 {
523         if val, err := strconv.ParseUint(s, 10, 64); err == nil {
524                 return uint64(val)
525         }
526         return 0
527 }
528
529 func getFloatFromString(s string, bitSize int) float64 {
530         if val, err := strconv.ParseFloat(s, bitSize); err == nil {
531                 return val
532         }
533         return 0
534 }
535
536 func getFloat64FromString(s string) float64 {
537         return getFloatFromString(s, 64)
538 }
539
540 func getFloat32FromString(s string) float32 {
541         return float32(getFloatFromString(s, 32))
542 }
543
544 func getValueString(values string, key string) string {
545         slice := strings.Split(values, ",")
546         for _, s := range slice {
547                 if lineContains(s, key) {
548                         return getFieldValueStr(s, key)
549                 }
550         }
551         return ""
552 }
553
554 func getCommandstatsValues(values string) (string, string, string) {
555         calls := getValueString(values, "calls=")
556         usec := getValueString(values, "usec=")
557         usecPerCall := getValueString(values, "usec_per_call=")
558         return calls, usec, usecPerCall
559 }
560
561 func updateCommandstatsValues(i interface{}, line, cmdstat string) {
562         stype := reflect.ValueOf(i).Elem()
563         values := getFieldValueStr(line, cmdstat)
564         callsStr, usecStr, usecPerCallStr := getCommandstatsValues(values)
565
566         callsField := stype.FieldByName("Calls")
567         callsField.Set(reflect.ValueOf(getUint32FromString(callsStr)))
568
569         usecField := stype.FieldByName("Usec")
570         usecField.Set(reflect.ValueOf(getUint32FromString(usecStr)))
571
572         usecPerCallField := stype.FieldByName("UsecPerCall")
573         usecPerCallField.Set(reflect.ValueOf(getFloat32FromString(usecPerCallStr)))
574 }
575
576 func getKeyspaceValues(values string) (string, string, string) {
577         keys := getValueString(values, "keys=")
578         expires := getValueString(values, "expires=")
579         avgttl := getValueString(values, "avg_ttl=")
580         return keys, expires, avgttl
581 }
582
583 func updateKeyspaceValues(i interface{}, line, keyspace string) {
584         stype := reflect.ValueOf(i).Elem()
585         values := getFieldValueStr(line, keyspace)
586         keysStr, expiresStr, avgttlStr := getKeyspaceValues(values)
587
588         keysField := stype.FieldByName("Keys")
589         keysField.Set(reflect.ValueOf(getUint32FromString(keysStr)))
590
591         expiresField := stype.FieldByName("Expires")
592         expiresField.Set(reflect.ValueOf(getUint32FromString(expiresStr)))
593
594         avgttlField := stype.FieldByName("AvgTtl")
595         avgttlField.Set(reflect.ValueOf(getUint32FromString(avgttlStr)))
596 }
597
598 func readRedisInfoReplyFields(input []string, info *DbInfo) error {
599         for _, line := range input {
600                 switch {
601                 case lineContains(line, "role:") && !lineContains(line, "_role:"):
602                         if "master" == getFieldValueStr(line, "role:") {
603                                 info.Fields.PrimaryRole = true
604                         }
605                 case lineContains(line, "connected_slaves:"):
606                         info.Fields.ConnectedReplicaCnt = getUint32FromString(getFieldValueStr(line, "connected_slaves:"))
607                 case lineContains(line, "uptime_in_days:"):
608                         info.Fields.Server.UptimeInDays = getUint32FromString(getFieldValueStr(line, "uptime_in_days:"))
609                 case lineContains(line, "connected_clients:"):
610                         info.Fields.Clients.ConnectedClients = getUint32FromString(getFieldValueStr(line, "connected_clients:"))
611                 case lineContains(line, "client_recent_max_input_buffer:"):
612                         info.Fields.Clients.ClientRecentMaxInputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_input_buffer:"))
613                 case lineContains(line, "client_recent_max_output_buffer:"):
614                         info.Fields.Clients.ClientRecentMaxOutputBuffer = getUint32FromString(getFieldValueStr(line, "client_recent_max_output_buffer:"))
615                 case lineContains(line, "used_memory:"):
616                         info.Fields.Memory.UsedMemory = getUint64FromString(getFieldValueStr(line, "used_memory:"))
617                 case lineContains(line, "used_memory_human:"):
618                         info.Fields.Memory.UsedMemoryHuman = getFieldValueStr(line, "used_memory_human:")
619                 case lineContains(line, "used_memory_rss:"):
620                         info.Fields.Memory.UsedMemoryRss = getUint64FromString(getFieldValueStr(line, "used_memory_rss:"))
621                 case lineContains(line, "used_memory_rss_human:"):
622                         info.Fields.Memory.UsedMemoryRssHuman = getFieldValueStr(line, "used_memory_rss_human:")
623                 case lineContains(line, "used_memory_peak:"):
624                         info.Fields.Memory.UsedMemoryPeak = getUint64FromString(getFieldValueStr(line, "used_memory_peak:"))
625                 case lineContains(line, "used_memory_peak_human:"):
626                         info.Fields.Memory.UsedMemoryPeakHuman = getFieldValueStr(line, "used_memory_peak_human:")
627                 case lineContains(line, "used_memory_peak_perc:"):
628                         info.Fields.Memory.UsedMemoryPeakPerc = getFieldValueStr(line, "used_memory_peak_perc:")
629                 case lineContains(line, "mem_fragmentation_ratio:"):
630                         info.Fields.Memory.MemFragmentationRatio = getFloat32FromString(getFieldValueStr(line, "mem_fragmentation_ratio:"))
631                 case lineContains(line, "mem_fragmentation_bytes:"):
632                         info.Fields.Memory.MemFragmentationBytes = getUint32FromString(getFieldValueStr(line, "mem_fragmentation_bytes:"))
633                 case lineContains(line, "total_connections_received:"):
634                         info.Fields.Stats.TotalConnectionsReceived = getUint32FromString(getFieldValueStr(line, "total_connections_received:"))
635                 case lineContains(line, "total_commands_processed:"):
636                         info.Fields.Stats.TotalCommandsProcessed = getUint32FromString(getFieldValueStr(line, "total_commands_processed:"))
637                 case lineContains(line, "sync_full:"):
638                         info.Fields.Stats.SyncFull = getUint32FromString(getFieldValueStr(line, "sync_full:"))
639                 case lineContains(line, "sync_partial_ok:"):
640                         info.Fields.Stats.SyncPartialOk = getUint32FromString(getFieldValueStr(line, "sync_partial_ok:"))
641                 case lineContains(line, "sync_partial_err:"):
642                         info.Fields.Stats.SyncPartialErr = getUint32FromString(getFieldValueStr(line, "sync_partial_err:"))
643                 case lineContains(line, "pubsub_channels:"):
644                         info.Fields.Stats.PubsubChannels = getUint32FromString(getFieldValueStr(line, "pubsub_channels:"))
645                 case lineContains(line, "used_cpu_sys:"):
646                         info.Fields.Cpu.UsedCpuSys = getFloat64FromString(getFieldValueStr(line, "used_cpu_sys:"))
647                 case lineContains(line, "used_cpu_user:"):
648                         info.Fields.Cpu.UsedCpuUser = getFloat64FromString(getFieldValueStr(line, "used_cpu_user:"))
649                 case lineContains(line, "cmdstat_replconf:"):
650                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatReplconf, line, "cmdstat_replconf:")
651                 case lineContains(line, "cmdstat_keys:"):
652                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatKeys, line, "cmdstat_keys:")
653                 case lineContains(line, "cmdstat_role:"):
654                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatRole, line, "cmdstat_role:")
655                 case lineContains(line, "cmdstat_psync:"):
656                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPsync, line, "cmdstat_psync:")
657                 case lineContains(line, "cmdstat_mset:"):
658                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMset, line, "cmdstat_mset:")
659                 case lineContains(line, "cmdstat_publish:"):
660                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPublish, line, "cmdstat_publish:")
661                 case lineContains(line, "cmdstat_info:"):
662                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatInfo, line, "cmdstat_info:")
663                 case lineContains(line, "cmdstat_ping:"):
664                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatPing, line, "cmdstat_ping:")
665                 case lineContains(line, "cmdstat_client:"):
666                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatClient, line, "cmdstat_client:")
667                 case lineContains(line, "cmdstat_command:"):
668                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatCommand, line, "cmdstat_command:")
669                 case lineContains(line, "cmdstat_subscribe:"):
670                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSubscribe, line, "cmdstat_subscribe:")
671                 case lineContains(line, "cmdstat_monitor:"):
672                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatMonitor, line, "cmdstat_monitor:")
673                 case lineContains(line, "cmdstat_config:"):
674                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatConfig, line, "cmdstat_config:")
675                 case lineContains(line, "cmdstat_slaveof:"):
676                         updateCommandstatsValues(&info.Fields.Commandstats.CmdstatSlaveof, line, "cmdstat_slaveof:")
677                 case lineContains(line, "db0:"):
678                         updateKeyspaceValues(&info.Fields.Keyspace.Db, line, "db0:")
679                 }
680         }
681         return nil
682 }
683
684 func (db *DB) State() (*DbState, error) {
685         dbState := new(DbState)
686         if db.cfg.sentinelPort != "" {
687                 //Establish connection to Redis sentinel. The reason why connection is done
688                 //here instead of time of the SDL instance creation is that for the time being
689                 //sentinel connection is needed only here to get state information and
690                 //state information is needed only by 'sdlcli' hence it is not time critical
691                 //and also we want to avoid opening unnecessary TCP connections towards Redis
692                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
693                 sentinelClient := db.sentinel(&db.cfg, db.addr)
694                 return sentinelClient.GetDbState()
695         } else {
696                 info, err := db.Info()
697                 if err != nil {
698                         dbState.PrimaryDbState.Err = err
699                         return dbState, err
700                 }
701                 return db.fillDbStateFromDbInfo(info)
702         }
703 }
704
705 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
706         var dbState DbState
707         if info.Fields.PrimaryRole == true {
708                 dbState = DbState{
709                         PrimaryDbState: PrimaryDbState{
710                                 Fields: PrimaryDbStateFields{
711                                         Role:  "master",
712                                         Ip:    db.cfg.hostname,
713                                         Port:  db.cfg.port,
714                                         Flags: "master",
715                                 },
716                         },
717                 }
718         }
719
720         cnt, err := strconv.Atoi(db.cfg.nodeCnt)
721         if err != nil {
722                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
723         } else {
724                 dbState.ConfigNodeCnt = cnt
725         }
726
727         return &dbState, dbState.Err
728 }
729
730 func createReplicaDbClient(host string) *DB {
731         cfg := readConfig(osImpl{})
732         cfg.sentinelPort = ""
733         cfg.clusterAddrList, cfg.port, _ = net.SplitHostPort(host)
734         return createDbClient(cfg, cfg.clusterAddrList, newRedisClient, subscribeNotifications, nil)
735 }
736
737 func getStatisticsInfo(db *DB, host string) (*DbStatisticsInfo, error) {
738         dbStatisticsInfo := new(DbStatisticsInfo)
739         dbStatisticsInfo.IPAddr, dbStatisticsInfo.Port, _ = net.SplitHostPort(host)
740
741         info, err := db.Info()
742         if err != nil {
743                 return nil, err
744         }
745         dbStatisticsInfo.Info = info
746
747         return dbStatisticsInfo, nil
748 }
749
750 func sentinelStatistics(db *DB) (*DbStatistics, error) {
751         dbState := new(DbState)
752         dbStatistics := new(DbStatistics)
753         dbStatisticsInfo := new(DbStatisticsInfo)
754         var err error
755
756         dbState, err = db.State()
757         if err != nil {
758                 return nil, err
759         }
760
761         dbStatisticsInfo, err = getStatisticsInfo(db, dbState.PrimaryDbState.GetAddress())
762         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
763
764         if dbState.ReplicasDbState != nil {
765                 for _, r := range dbState.ReplicasDbState.States {
766                         replicaDb := createReplicaDbClient(r.GetAddress())
767                         dbStatisticsInfo, err = getStatisticsInfo(replicaDb, r.GetAddress())
768                         replicaDb.CloseDB()
769                         if err != nil {
770                                 return nil, err
771                         }
772                         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
773                 }
774         }
775
776         return dbStatistics, nil
777 }
778
779 func standaloneStatistics(db *DB) (*DbStatistics, error) {
780         dbStatistics := new(DbStatistics)
781
782         dbStatisticsInfo, err := getStatisticsInfo(db, net.JoinHostPort(db.cfg.hostname, db.cfg.port))
783         dbStatistics.Stats = append(dbStatistics.Stats, dbStatisticsInfo)
784
785         return dbStatistics, err
786 }
787
788 func (db *DB) Statistics() (*DbStatistics, error) {
789         if db.cfg.sentinelPort != "" {
790                 return sentinelStatistics(db)
791         }
792
793         return standaloneStatistics(db)
794 }
795
796 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
797
798 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
799         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
800         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
801         if err != nil {
802                 return err
803         }
804         if result == int64(1) {
805                 return nil
806         }
807         return errors.New("Lock not held")
808 }
809
810 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
811         sCbMap.m.Lock()
812         defer sCbMap.m.Unlock()
813         sCbMap.cbMap[channel] = cb
814 }
815
816 func (sCbMap *sharedCbMap) Remove(channel string) {
817         sCbMap.m.Lock()
818         defer sCbMap.m.Unlock()
819         delete(sCbMap.cbMap, channel)
820 }
821
822 func (sCbMap *sharedCbMap) Count() int {
823         sCbMap.m.Lock()
824         defer sCbMap.m.Unlock()
825         return len(sCbMap.cbMap)
826 }
827
828 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
829         sCbMap.m.Lock()
830         defer sCbMap.m.Unlock()
831         mapCopy := make(map[string]ChannelNotificationCb, 0)
832         for i, v := range sCbMap.cbMap {
833                 mapCopy[i] = v
834         }
835         return mapCopy
836 }