Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "errors"
27         "fmt"
28         "github.com/go-redis/redis"
29         "os"
30         "strconv"
31         "strings"
32         "sync"
33         "time"
34 )
35
36 type ChannelNotificationCb func(channel string, payload ...string)
37 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
38
39 type intChannels struct {
40         addChannel    chan string
41         removeChannel chan string
42         exit          chan bool
43 }
44
45 type sharedCbMap struct {
46         m     sync.Mutex
47         cbMap map[string]ChannelNotificationCb
48 }
49
50 type Config struct {
51         hostname        string
52         port            string
53         masterName      string
54         sentinelPort    string
55         clusterAddrList string
56 }
57
58 type DB struct {
59         client       RedisClient
60         subscribe    SubscribeFn
61         redisModules bool
62         sCbMap       *sharedCbMap
63         ch           intChannels
64 }
65
66 type Subscriber interface {
67         Channel() <-chan *redis.Message
68         Subscribe(channels ...string) error
69         Unsubscribe(channels ...string) error
70         Close() error
71 }
72
73 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
74
75 type RedisClient interface {
76         Command() *redis.CommandsInfoCmd
77         Close() error
78         Subscribe(channels ...string) *redis.PubSub
79         MSet(pairs ...interface{}) *redis.StatusCmd
80         Do(args ...interface{}) *redis.Cmd
81         MGet(keys ...string) *redis.SliceCmd
82         Del(keys ...string) *redis.IntCmd
83         Keys(pattern string) *redis.StringSliceCmd
84         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
85         SAdd(key string, members ...interface{}) *redis.IntCmd
86         SRem(key string, members ...interface{}) *redis.IntCmd
87         SMembers(key string) *redis.StringSliceCmd
88         SIsMember(key string, member interface{}) *redis.BoolCmd
89         SCard(key string) *redis.IntCmd
90         PTTL(key string) *redis.DurationCmd
91         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
92         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
93         ScriptExists(scripts ...string) *redis.BoolSliceCmd
94         ScriptLoad(script string) *redis.StringCmd
95 }
96
97 func checkResultAndError(result interface{}, err error) (bool, error) {
98         if err != nil {
99                 if err == redis.Nil {
100                         return false, nil
101                 }
102                 return false, err
103         }
104         if result == "OK" {
105                 return true, nil
106         }
107         return false, nil
108 }
109
110 func checkIntResultAndError(result interface{}, err error) (bool, error) {
111         if err != nil {
112                 return false, err
113         }
114         if n, ok := result.(int64); ok {
115                 if n == 1 {
116                         return true, nil
117                 }
118         } else if n, ok := result.(int); ok {
119                 if n == 1 {
120                         return true, nil
121                 }
122         }
123         return false, nil
124 }
125
126 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
127         return client.Subscribe(channels...)
128 }
129
130 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
131         db := DB{
132                 client:       client,
133                 subscribe:    subscribe,
134                 redisModules: true,
135                 sCbMap:       &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
136                 ch: intChannels{
137                         addChannel:    make(chan string),
138                         removeChannel: make(chan string),
139                         exit:          make(chan bool),
140                 },
141         }
142
143         return &db
144 }
145
146 func Create() []*DB {
147         osimpl := osImpl{}
148         return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
149 }
150
151 func readConfig(osI OS) Config {
152         cfg := Config{
153                 hostname:        osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
154                 port:            osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
155                 masterName:      osI.Getenv("DBAAS_MASTER_NAME", ""),
156                 sentinelPort:    osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
157                 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
158         }
159         return cfg
160 }
161
162 type OS interface {
163         Getenv(key string, defValue string) string
164 }
165
166 type osImpl struct{}
167
168 func (osImpl) Getenv(key string, defValue string) string {
169         val := os.Getenv(key)
170         if val == "" {
171                 val = defValue
172         }
173         return val
174 }
175
176 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
177         cfg := readConfig(osI)
178         return createDbClients(cfg, clientCreator)
179 }
180
181 func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
182         if cfg.clusterAddrList == "" {
183                 return []*DB{createLegacyDbClient(cfg, clientCreator)}
184         }
185
186         dbs := []*DB{}
187
188         addrList := strings.Split(cfg.clusterAddrList, ",")
189         for _, addr := range addrList {
190                 db := createDbClient(cfg, addr, clientCreator)
191                 dbs = append(dbs, db)
192         }
193         return dbs
194 }
195
196 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
197         return createDbClient(cfg, cfg.hostname, clientCreator)
198 }
199
200 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
201         var client RedisClient
202         if cfg.sentinelPort == "" {
203                 client = clientCreator(hostName, cfg.port, "", false)
204         } else {
205                 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
206         }
207         db := CreateDB(client, subscribeNotifications)
208         db.CheckCommands()
209         return db
210 }
211
212 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
213         if isHa == true {
214                 sentinelAddress := addr + ":" + port
215                 return redis.NewFailoverClient(
216                         &redis.FailoverOptions{
217                                 MasterName:    clusterName,
218                                 SentinelAddrs: []string{sentinelAddress},
219                                 PoolSize:      20,
220                                 MaxRetries:    2,
221                         },
222                 )
223         }
224         redisAddress := addr + ":" + port
225         return redis.NewClient(&redis.Options{
226                 Addr:       redisAddress,
227                 Password:   "", // no password set
228                 DB:         0,  // use default DB
229                 PoolSize:   20,
230                 MaxRetries: 2,
231         })
232 }
233
234 func (db *DB) CheckCommands() {
235         commands, err := db.client.Command().Result()
236         if err == nil {
237                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
238                         "msetmpub", "delmpub"}
239                 for _, v := range redisModuleCommands {
240                         _, ok := commands[v]
241                         if !ok {
242                                 db.redisModules = false
243                         }
244                 }
245         } else {
246                 fmt.Println(err)
247         }
248 }
249
250 func (db *DB) CloseDB() error {
251         return db.client.Close()
252 }
253
254 func (db *DB) UnsubscribeChannelDB(channels ...string) {
255         for _, v := range channels {
256                 db.sCbMap.Remove(v)
257                 db.ch.removeChannel <- v
258                 if db.sCbMap.Count() == 0 {
259                         db.ch.exit <- true
260                 }
261         }
262 }
263
264 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
265         if db.sCbMap.Count() == 0 {
266                 for _, v := range channels {
267                         db.sCbMap.Add(v, cb)
268                 }
269
270                 go func(sCbMap *sharedCbMap,
271                         channelPrefix,
272                         eventSeparator string,
273                         ch intChannels,
274                         channels ...string) {
275                         sub := db.subscribe(db.client, channels...)
276                         rxChannel := sub.Channel()
277                         lCbMap := sCbMap.GetMapCopy()
278                         for {
279                                 select {
280                                 case msg := <-rxChannel:
281                                         cb, ok := lCbMap[msg.Channel]
282                                         if ok {
283                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
284                                         }
285                                 case channel := <-ch.addChannel:
286                                         lCbMap = sCbMap.GetMapCopy()
287                                         sub.Subscribe(channel)
288                                 case channel := <-ch.removeChannel:
289                                         lCbMap = sCbMap.GetMapCopy()
290                                         sub.Unsubscribe(channel)
291                                 case exit := <-ch.exit:
292                                         if exit {
293                                                 if err := sub.Close(); err != nil {
294                                                         fmt.Println(err)
295                                                 }
296                                                 return
297                                         }
298                                 }
299                         }
300                 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
301
302         } else {
303                 for _, v := range channels {
304                         db.sCbMap.Add(v, cb)
305                         db.ch.addChannel <- v
306                 }
307         }
308 }
309
310 func (db *DB) MSet(pairs ...interface{}) error {
311         return db.client.MSet(pairs...).Err()
312 }
313
314 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
315         if !db.redisModules {
316                 return errors.New("Redis deployment doesn't support MSETMPUB command")
317         }
318         command := make([]interface{}, 0)
319         command = append(command, "MSETMPUB")
320         command = append(command, len(pairs)/2)
321         command = append(command, len(channelsAndEvents)/2)
322         for _, d := range pairs {
323                 command = append(command, d)
324         }
325         for _, d := range channelsAndEvents {
326                 command = append(command, d)
327         }
328         _, err := db.client.Do(command...).Result()
329         return err
330 }
331
332 func (db *DB) MGet(keys []string) ([]interface{}, error) {
333         return db.client.MGet(keys...).Result()
334 }
335
336 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
337         if !db.redisModules {
338                 return errors.New("Redis deployment not supporting command DELMPUB")
339         }
340         command := make([]interface{}, 0)
341         command = append(command, "DELMPUB")
342         command = append(command, len(keys))
343         command = append(command, len(channelsAndEvents)/2)
344         for _, d := range keys {
345                 command = append(command, d)
346         }
347         for _, d := range channelsAndEvents {
348                 command = append(command, d)
349         }
350         _, err := db.client.Do(command...).Result()
351         return err
352
353 }
354
355 func (db *DB) Del(keys []string) error {
356         _, err := db.client.Del(keys...).Result()
357         return err
358 }
359
360 func (db *DB) Keys(pattern string) ([]string, error) {
361         return db.client.Keys(pattern).Result()
362 }
363
364 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
365         if !db.redisModules {
366                 return false, errors.New("Redis deployment not supporting command")
367         }
368
369         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
370 }
371
372 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
373         if !db.redisModules {
374                 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
375         }
376         capacity := 4 + len(channelsAndEvents)
377         command := make([]interface{}, 0, capacity)
378         command = append(command, "SETIEMPUB")
379         command = append(command, key)
380         command = append(command, newData)
381         command = append(command, oldData)
382         for _, ce := range channelsAndEvents {
383                 command = append(command, ce)
384         }
385         return checkResultAndError(db.client.Do(command...).Result())
386 }
387
388 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
389         if !db.redisModules {
390                 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
391         }
392         capacity := 3 + len(channelsAndEvents)
393         command := make([]interface{}, 0, capacity)
394         command = append(command, "SETNXMPUB")
395         command = append(command, key)
396         command = append(command, data)
397         for _, ce := range channelsAndEvents {
398                 command = append(command, ce)
399         }
400         return checkResultAndError(db.client.Do(command...).Result())
401 }
402 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
403         return db.client.SetNX(key, data, expiration).Result()
404 }
405
406 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
407         if !db.redisModules {
408                 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
409         }
410         capacity := 3 + len(channelsAndEvents)
411         command := make([]interface{}, 0, capacity)
412         command = append(command, "DELIEMPUB")
413         command = append(command, key)
414         command = append(command, data)
415         for _, ce := range channelsAndEvents {
416                 command = append(command, ce)
417         }
418         return checkIntResultAndError(db.client.Do(command...).Result())
419 }
420
421 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
422         if !db.redisModules {
423                 return false, errors.New("Redis deployment not supporting command")
424         }
425         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
426 }
427
428 func (db *DB) SAdd(key string, data ...interface{}) error {
429         _, err := db.client.SAdd(key, data...).Result()
430         return err
431 }
432
433 func (db *DB) SRem(key string, data ...interface{}) error {
434         _, err := db.client.SRem(key, data...).Result()
435         return err
436 }
437
438 func (db *DB) SMembers(key string) ([]string, error) {
439         result, err := db.client.SMembers(key).Result()
440         return result, err
441 }
442
443 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
444         result, err := db.client.SIsMember(key, data).Result()
445         return result, err
446 }
447
448 func (db *DB) SCard(key string) (int64, error) {
449         result, err := db.client.SCard(key).Result()
450         return result, err
451 }
452
453 func (db *DB) PTTL(key string) (time.Duration, error) {
454         result, err := db.client.PTTL(key).Result()
455         return result, err
456 }
457
458 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
459
460 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
461         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
462         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
463         if err != nil {
464                 return err
465         }
466         if result == int64(1) {
467                 return nil
468         }
469         return errors.New("Lock not held")
470 }
471
472 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
473         sCbMap.m.Lock()
474         defer sCbMap.m.Unlock()
475         sCbMap.cbMap[channel] = cb
476 }
477
478 func (sCbMap *sharedCbMap) Remove(channel string) {
479         sCbMap.m.Lock()
480         defer sCbMap.m.Unlock()
481         delete(sCbMap.cbMap, channel)
482 }
483
484 func (sCbMap *sharedCbMap) Count() int {
485         sCbMap.m.Lock()
486         defer sCbMap.m.Unlock()
487         return len(sCbMap.cbMap)
488 }
489
490 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
491         sCbMap.m.Lock()
492         defer sCbMap.m.Unlock()
493         mapCopy := make(map[string]ChannelNotificationCb, 0)
494         for i, v := range sCbMap.cbMap {
495                 mapCopy[i] = v
496         }
497         return mapCopy
498 }