Fix sdlcli healthcheck DBAAS status in SEP install
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "errors"
27         "fmt"
28         "github.com/go-redis/redis/v7"
29         "io"
30         "log"
31         "os"
32         "strconv"
33         "strings"
34         "sync"
35         "time"
36 )
37
38 type ChannelNotificationCb func(channel string, payload ...string)
39 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
40
41 type intChannels struct {
42         addChannel    chan string
43         removeChannel chan string
44         exit          chan bool
45 }
46
47 type sharedCbMap struct {
48         m     sync.Mutex
49         cbMap map[string]ChannelNotificationCb
50 }
51
52 type Config struct {
53         hostname        string
54         port            string
55         masterName      string
56         sentinelPort    string
57         clusterAddrList string
58         nodeCnt         string
59 }
60
61 type DB struct {
62         client       RedisClient
63         sentinel     RedisSentinelCreateCb
64         subscribe    SubscribeFn
65         redisModules bool
66         sCbMap       *sharedCbMap
67         ch           intChannels
68         cfg          Config
69         addr         string
70 }
71
72 type Subscriber interface {
73         Channel() <-chan *redis.Message
74         Subscribe(channels ...string) error
75         Unsubscribe(channels ...string) error
76         Close() error
77 }
78
79 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
80
81 type RedisClient interface {
82         Command() *redis.CommandsInfoCmd
83         Close() error
84         Subscribe(channels ...string) *redis.PubSub
85         MSet(pairs ...interface{}) *redis.StatusCmd
86         Do(args ...interface{}) *redis.Cmd
87         MGet(keys ...string) *redis.SliceCmd
88         Del(keys ...string) *redis.IntCmd
89         Keys(pattern string) *redis.StringSliceCmd
90         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
91         SAdd(key string, members ...interface{}) *redis.IntCmd
92         SRem(key string, members ...interface{}) *redis.IntCmd
93         SMembers(key string) *redis.StringSliceCmd
94         SIsMember(key string, member interface{}) *redis.BoolCmd
95         SCard(key string) *redis.IntCmd
96         PTTL(key string) *redis.DurationCmd
97         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
98         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
99         ScriptExists(scripts ...string) *redis.BoolSliceCmd
100         ScriptLoad(script string) *redis.StringCmd
101         Info(section ...string) *redis.StringCmd
102 }
103
104 var dbLogger *log.Logger
105
106 func init() {
107         dbLogger = log.New(os.Stdout, "database: ", log.LstdFlags|log.Lshortfile)
108         redis.SetLogger(dbLogger)
109 }
110
111 func SetDbLogger(out io.Writer) {
112         dbLogger.SetOutput(out)
113 }
114
115 func checkResultAndError(result interface{}, err error) (bool, error) {
116         if err != nil {
117                 if err == redis.Nil {
118                         return false, nil
119                 }
120                 return false, err
121         }
122         if result == "OK" {
123                 return true, nil
124         }
125         return false, nil
126 }
127
128 func checkIntResultAndError(result interface{}, err error) (bool, error) {
129         if err != nil {
130                 return false, err
131         }
132         if n, ok := result.(int64); ok {
133                 if n == 1 {
134                         return true, nil
135                 }
136         } else if n, ok := result.(int); ok {
137                 if n == 1 {
138                         return true, nil
139                 }
140         }
141         return false, nil
142 }
143
144 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
145         return client.Subscribe(channels...)
146 }
147
148 func CreateDB(client RedisClient, subscribe SubscribeFn, sentinelCreateCb RedisSentinelCreateCb, cfg Config, sentinelAddr string) *DB {
149         db := DB{
150                 client:       client,
151                 sentinel:     sentinelCreateCb,
152                 subscribe:    subscribe,
153                 redisModules: true,
154                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
155                 ch: intChannels{
156                         addChannel:    make(chan string),
157                         removeChannel: make(chan string),
158                         exit:          make(chan bool),
159                 },
160                 cfg:  cfg,
161                 addr: sentinelAddr,
162         }
163
164         return &db
165 }
166
167 func Create() []*DB {
168         osimpl := osImpl{}
169         return ReadConfigAndCreateDbClients(osimpl, newRedisClient, subscribeNotifications, newRedisSentinel)
170 }
171
172 func readConfig(osI OS) Config {
173         cfg := Config{
174                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
175                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
176                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
177                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
178                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
179                 nodeCnt:         osI.Getenv("DBAAS_NODE_COUNT", "1"),
180         }
181         return cfg
182 }
183
184 type OS interface {
185         Getenv(key string, defValue string) string
186 }
187
188 type osImpl struct{}
189
190 func (osImpl) Getenv(key string, defValue string) string {
191         val := os.Getenv(key)
192         if val == "" {
193                 val = defValue
194         }
195         return val
196 }
197
198 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator,
199         subscribe SubscribeFn,
200         sentinelCreateCb RedisSentinelCreateCb) []*DB {
201         cfg := readConfig(osI)
202         return createDbClients(cfg, clientCreator, subscribe, sentinelCreateCb)
203 }
204
205 func createDbClients(cfg Config, clientCreator RedisClientCreator,
206         subscribe SubscribeFn,
207         sentinelCreateCb RedisSentinelCreateCb) []*DB {
208         if cfg.clusterAddrList == "" {
209                 return []*DB{createLegacyDbClient(cfg, clientCreator, subscribe, sentinelCreateCb)}
210         }
211
212         dbs := []*DB{}
213
214         addrList := strings.Split(cfg.clusterAddrList, ",")
215         for _, addr := range addrList {
216                 db := createDbClient(cfg, addr, clientCreator, subscribe, sentinelCreateCb)
217                 dbs = append(dbs, db)
218         }
219         return dbs
220 }
221
222 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator,
223         subscribe SubscribeFn,
224         sentinelCreateCb RedisSentinelCreateCb) *DB {
225         return createDbClient(cfg, cfg.hostname, clientCreator, subscribe, sentinelCreateCb)
226 }
227
228 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator,
229         subscribe SubscribeFn,
230         sentinelCreateCb RedisSentinelCreateCb) *DB {
231         var client RedisClient
232         var db *DB
233         if cfg.sentinelPort == "" {
234                 client = clientCreator(hostName, cfg.port, "", false)
235                 db = CreateDB(client, subscribe, nil, cfg, hostName)
236         } else {
237                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
238                 db = CreateDB(client, subscribe, sentinelCreateCb, cfg, hostName)
239         }
240         db.CheckCommands()
241         return db
242 }
243
244 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
245         if isHa == true {
246                 sentinelAddress := addr + ":" + port
247                 return redis.NewFailoverClient(
248                         &redis.FailoverOptions{
249                                 MasterName:    clusterName,
250                                 SentinelAddrs: []string{sentinelAddress},
251                                 PoolSize:      20,
252                                 MaxRetries:    2,
253                         },
254                 )
255         }
256         redisAddress := addr + ":" + port
257         return redis.NewClient(&redis.Options{
258                 Addr:       redisAddress,
259                 Password:   "", // no password set
260                 DB:         0,  // use default DB
261                 PoolSize:   20,
262                 MaxRetries: 2,
263         })
264 }
265
266 func (db *DB) CheckCommands() {
267         commands, err := db.client.Command().Result()
268         if err == nil {
269                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
270                         "msetmpub", "delmpub"}
271                 for _, v := range redisModuleCommands {
272                         _, ok := commands[v]
273                         if !ok {
274                                 db.redisModules = false
275                         }
276                 }
277         } else {
278                 dbLogger.Printf("SDL DB commands checking failure: %s\n", err)
279         }
280 }
281
282 func (db *DB) CloseDB() error {
283         return db.client.Close()
284 }
285
286 func (db *DB) UnsubscribeChannelDB(channels ...string) {
287         for _, v := range channels {
288                 db.sCbMap.Remove(v)
289                 db.ch.removeChannel <- v
290                 if db.sCbMap.Count() == 0 {
291                         db.ch.exit <- true
292                 }
293         }
294 }
295
296 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
297         if db.sCbMap.Count() == 0 {
298                 for _, v := range channels {
299                         db.sCbMap.Add(v, cb)
300                 }
301
302                 go func(sCbMap *sharedCbMap,
303                         channelPrefix,
304                         eventSeparator string,
305                         ch intChannels,
306                         channels ...string) {
307                         sub := db.subscribe(db.client, channels...)
308                         rxChannel := sub.Channel()
309                         lCbMap := sCbMap.GetMapCopy()
310                         for {
311                                 select {
312                                 case msg := <-rxChannel:
313                                         cb, ok := lCbMap[msg.Channel]
314                                         if ok {
315                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
316                                         }
317                                 case channel := <-ch.addChannel:
318                                         lCbMap = sCbMap.GetMapCopy()
319                                         sub.Subscribe(channel)
320                                 case channel := <-ch.removeChannel:
321                                         lCbMap = sCbMap.GetMapCopy()
322                                         sub.Unsubscribe(channel)
323                                 case exit := <-ch.exit:
324                                         if exit {
325                                                 if err := sub.Close(); err != nil {
326                                                         dbLogger.Printf("SDL DB channel closing failure: %s\n", err)
327                                                 }
328                                                 return
329                                         }
330                                 }
331                         }
332                 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
333
334         } else {
335                 for _, v := range channels {
336                         db.sCbMap.Add(v, cb)
337                         db.ch.addChannel <- v
338                 }
339         }
340 }
341
342 func (db *DB) MSet(pairs ...interface{}) error {
343         return db.client.MSet(pairs...).Err()
344 }
345
346 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
347         if !db.redisModules {
348                 return errors.New("Redis deployment doesn't support MSETMPUB command")
349         }
350         command := make([]interface{}, 0)
351         command = append(command, "MSETMPUB")
352         command = append(command, len(pairs)/2)
353         command = append(command, len(channelsAndEvents)/2)
354         for _, d := range pairs {
355                 command = append(command, d)
356         }
357         for _, d := range channelsAndEvents {
358                 command = append(command, d)
359         }
360         _, err := db.client.Do(command...).Result()
361         return err
362 }
363
364 func (db *DB) MGet(keys []string) ([]interface{}, error) {
365         return db.client.MGet(keys...).Result()
366 }
367
368 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
369         if !db.redisModules {
370                 return errors.New("Redis deployment not supporting command DELMPUB")
371         }
372         command := make([]interface{}, 0)
373         command = append(command, "DELMPUB")
374         command = append(command, len(keys))
375         command = append(command, len(channelsAndEvents)/2)
376         for _, d := range keys {
377                 command = append(command, d)
378         }
379         for _, d := range channelsAndEvents {
380                 command = append(command, d)
381         }
382         _, err := db.client.Do(command...).Result()
383         return err
384
385 }
386
387 func (db *DB) Del(keys []string) error {
388         _, err := db.client.Del(keys...).Result()
389         return err
390 }
391
392 func (db *DB) Keys(pattern string) ([]string, error) {
393         return db.client.Keys(pattern).Result()
394 }
395
396 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
397         if !db.redisModules {
398                 return false, errors.New("Redis deployment not supporting command")
399         }
400
401         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
402 }
403
404 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
405         if !db.redisModules {
406                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
407         }
408         capacity := 4 + len(channelsAndEvents)
409         command := make([]interface{}, 0, capacity)
410         command = append(command, "SETIEMPUB")
411         command = append(command, key)
412         command = append(command, newData)
413         command = append(command, oldData)
414         for _, ce := range channelsAndEvents {
415                 command = append(command, ce)
416         }
417         return checkResultAndError(db.client.Do(command...).Result())
418 }
419
420 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
421         if !db.redisModules {
422                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
423         }
424         capacity := 3 + len(channelsAndEvents)
425         command := make([]interface{}, 0, capacity)
426         command = append(command, "SETNXMPUB")
427         command = append(command, key)
428         command = append(command, data)
429         for _, ce := range channelsAndEvents {
430                 command = append(command, ce)
431         }
432         return checkResultAndError(db.client.Do(command...).Result())
433 }
434 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
435         return db.client.SetNX(key, data, expiration).Result()
436 }
437
438 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
439         if !db.redisModules {
440                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
441         }
442         capacity := 3 + len(channelsAndEvents)
443         command := make([]interface{}, 0, capacity)
444         command = append(command, "DELIEMPUB")
445         command = append(command, key)
446         command = append(command, data)
447         for _, ce := range channelsAndEvents {
448                 command = append(command, ce)
449         }
450         return checkIntResultAndError(db.client.Do(command...).Result())
451 }
452
453 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
454         if !db.redisModules {
455                 return false, errors.New("Redis deployment not supporting command")
456         }
457         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
458 }
459
460 func (db *DB) SAdd(key string, data ...interface{}) error {
461         _, err := db.client.SAdd(key, data...).Result()
462         return err
463 }
464
465 func (db *DB) SRem(key string, data ...interface{}) error {
466         _, err := db.client.SRem(key, data...).Result()
467         return err
468 }
469
470 func (db *DB) SMembers(key string) ([]string, error) {
471         result, err := db.client.SMembers(key).Result()
472         return result, err
473 }
474
475 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
476         result, err := db.client.SIsMember(key, data).Result()
477         return result, err
478 }
479
480 func (db *DB) SCard(key string) (int64, error) {
481         result, err := db.client.SCard(key).Result()
482         return result, err
483 }
484
485 func (db *DB) PTTL(key string) (time.Duration, error) {
486         result, err := db.client.PTTL(key).Result()
487         return result, err
488 }
489
490 func (db *DB) Info() (*DbInfo, error) {
491         var info DbInfo
492         resultStr, err := db.client.Info("all").Result()
493         if err != nil {
494                 return &info, err
495         }
496
497         result := strings.Split(strings.ReplaceAll(resultStr, "\r\n", "\n"), "\n")
498         err = readRedisInfoReplyFields(result, &info)
499         return &info, err
500 }
501
502 func readRedisInfoReplyFields(input []string, info *DbInfo) error {
503         for _, line := range input {
504                 if idx := strings.Index(line, "role:"); idx != -1 {
505                         roleStr := line[idx+len("role:"):]
506                         if roleStr == "master" {
507                                 info.Fields.PrimaryRole = true
508                         }
509                 } else if idx := strings.Index(line, "connected_slaves:"); idx != -1 {
510                         cntStr := line[idx+len("connected_slaves:"):]
511                         cnt, err := strconv.ParseUint(cntStr, 10, 32)
512                         if err != nil {
513                                 return fmt.Errorf("Info reply error: %s", err.Error())
514                         }
515                         info.Fields.ConnectedReplicaCnt = uint32(cnt)
516                 }
517         }
518         return nil
519 }
520
521 func (db *DB) State() (*DbState, error) {
522         dbState := new(DbState)
523         if db.cfg.sentinelPort != "" {
524                 //Establish connection to Redis sentinel. The reason why connection is done
525                 //here instead of time of the SDL instance creation is that for the time being
526                 //sentinel connection is needed only here to get state information and
527                 //state information is needed only by 'sdlcli' hence it is not time critical
528                 //and also we want to avoid opening unnecessary TCP connections towards Redis
529                 //sentinel for every SDL instance. Now it is done only when 'sdlcli' is used.
530                 sentinelClient := db.sentinel(&db.cfg, db.addr)
531                 return sentinelClient.GetDbState()
532         } else {
533                 info, err := db.Info()
534                 if err != nil {
535                         dbState.PrimaryDbState.Err = err
536                         return dbState, err
537                 }
538                 return db.fillDbStateFromDbInfo(info)
539         }
540 }
541
542 func (db *DB) fillDbStateFromDbInfo(info *DbInfo) (*DbState, error) {
543         var dbState DbState
544         if info.Fields.PrimaryRole == true {
545                 dbState = DbState{
546                         PrimaryDbState: PrimaryDbState{
547                                 Fields: PrimaryDbStateFields{
548                                         Role:  "master",
549                                         Flags: "master",
550                                 },
551                         },
552                 }
553         }
554
555         cnt, err := strconv.Atoi(db.cfg.nodeCnt)
556         if err != nil {
557                 dbState.Err = fmt.Errorf("DBAAS_NODE_COUNT configuration value '%s' conversion to integer failed", db.cfg.nodeCnt)
558         } else {
559                 dbState.ConfigNodeCnt = cnt
560         }
561
562         return &dbState, dbState.Err
563 }
564
565 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
566
567 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
568         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
569         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
570         if err != nil {
571                 return err
572         }
573         if result == int64(1) {
574                 return nil
575         }
576         return errors.New("Lock not held")
577 }
578
579 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
580         sCbMap.m.Lock()
581         defer sCbMap.m.Unlock()
582         sCbMap.cbMap[channel] = cb
583 }
584
585 func (sCbMap *sharedCbMap) Remove(channel string) {
586         sCbMap.m.Lock()
587         defer sCbMap.m.Unlock()
588         delete(sCbMap.cbMap, channel)
589 }
590
591 func (sCbMap *sharedCbMap) Count() int {
592         sCbMap.m.Lock()
593         defer sCbMap.m.Unlock()
594         return len(sCbMap.cbMap)
595 }
596
597 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
598         sCbMap.m.Lock()
599         defer sCbMap.m.Unlock()
600         mapCopy := make(map[string]ChannelNotificationCb, 0)
601         for i, v := range sCbMap.cbMap {
602                 mapCopy[i] = v
603         }
604         return mapCopy
605 }