574e7b0a601a9c9b8513a64c56de14fa8c947b84
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgoredis
24
25 import (
26         "errors"
27         "fmt"
28         "os"
29         "strconv"
30         "strings"
31         "time"
32
33         "github.com/go-redis/redis"
34 )
35
36 type ChannelNotificationCb func(channel string, payload ...string)
37
38 type intChannels struct {
39         addChannel    chan string
40         removeChannel chan string
41         exit          chan bool
42 }
43
44 type DB struct {
45         client       RedisClient
46         subscribe    SubscribeFn
47         redisModules bool
48         cbMap        map[string]ChannelNotificationCb
49         ch           intChannels
50 }
51
52 type Subscriber interface {
53         Channel() <-chan *redis.Message
54         Subscribe(channels ...string) error
55         Unsubscribe(channels ...string) error
56         Close() error
57 }
58
59 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
60
61 type RedisClient interface {
62         Command() *redis.CommandsInfoCmd
63         Close() error
64         Subscribe(channels ...string) *redis.PubSub
65         MSet(pairs ...interface{}) *redis.StatusCmd
66         Do(args ...interface{}) *redis.Cmd
67         MGet(keys ...string) *redis.SliceCmd
68         Del(keys ...string) *redis.IntCmd
69         Keys(pattern string) *redis.StringSliceCmd
70         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
71         SAdd(key string, members ...interface{}) *redis.IntCmd
72         SRem(key string, members ...interface{}) *redis.IntCmd
73         SMembers(key string) *redis.StringSliceCmd
74         SIsMember(key string, member interface{}) *redis.BoolCmd
75         SCard(key string) *redis.IntCmd
76         PTTL(key string) *redis.DurationCmd
77         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
78         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
79         ScriptExists(scripts ...string) *redis.BoolSliceCmd
80         ScriptLoad(script string) *redis.StringCmd
81 }
82
83 func checkResultAndError(result interface{}, err error) (bool, error) {
84         if err != nil {
85                 if err == redis.Nil {
86                         return false, nil
87                 }
88                 return false, err
89         }
90         if result == "OK" {
91                 return true, nil
92         }
93         return false, nil
94 }
95
96 func checkIntResultAndError(result interface{}, err error) (bool, error) {
97         if err != nil {
98                 return false, err
99         }
100         if result.(int) == int(1) {
101                 return true, nil
102         }
103         return false, nil
104 }
105
106 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
107         return client.Subscribe(channels...)
108 }
109
110 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
111         db := DB{
112                 client:       client,
113                 subscribe:    subscribe,
114                 redisModules: true,
115                 cbMap:        make(map[string]ChannelNotificationCb, 0),
116                 ch: intChannels{
117                         addChannel:    make(chan string),
118                         removeChannel: make(chan string),
119                         exit:          make(chan bool),
120                 },
121         }
122
123         return &db
124 }
125
126 func Create() *DB {
127         var client *redis.Client
128         hostname := os.Getenv("DBAAS_SERVICE_HOST")
129         if hostname == "" {
130                 hostname = "localhost"
131         }
132         port := os.Getenv("DBAAS_SERVICE_PORT")
133         if port == "" {
134                 port = "6379"
135         }
136         sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
137         masterName := os.Getenv("DBAAS_MASTER_NAME")
138         if sentinelPort == "" {
139                 redisAddress := hostname + ":" + port
140                 client = redis.NewClient(&redis.Options{
141                         Addr:       redisAddress,
142                         Password:   "", // no password set
143                         DB:         0,  // use default DB
144                         PoolSize:   20,
145                         MaxRetries: 2,
146                 })
147         } else {
148                 sentinelAddress := hostname + ":" + sentinelPort
149                 client = redis.NewFailoverClient(&redis.FailoverOptions{
150                         MasterName:    masterName,
151                         SentinelAddrs: []string{sentinelAddress},
152                         PoolSize:      20,
153                         MaxRetries:    2,
154                 })
155         }
156         db := CreateDB(client, subscribeNotifications)
157         db.CheckCommands()
158         return db
159 }
160
161 func (db *DB) CheckCommands() {
162         commands, err := db.client.Command().Result()
163         if err == nil {
164                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
165                         "msetmpub", "delmpub"}
166                 for _, v := range redisModuleCommands {
167                         _, ok := commands[v]
168                         if !ok {
169                                 db.redisModules = false
170                         }
171                 }
172         } else {
173                 fmt.Println(err)
174         }
175 }
176
177 func (db *DB) CloseDB() error {
178         return db.client.Close()
179 }
180
181 func (db *DB) UnsubscribeChannelDB(channels ...string) {
182         for _, v := range channels {
183                 db.ch.removeChannel <- v
184                 delete(db.cbMap, v)
185                 if len(db.cbMap) == 0 {
186                         db.ch.exit <- true
187                 }
188         }
189 }
190
191 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
192         if len(db.cbMap) == 0 {
193                 for _, v := range channels {
194                         db.cbMap[v] = cb
195                 }
196
197                 go func(cbMap *map[string]ChannelNotificationCb,
198                         channelPrefix,
199                         eventSeparator string,
200                         ch intChannels,
201                         channels ...string) {
202                         sub := db.subscribe(db.client, channels...)
203                         rxChannel := sub.Channel()
204                         for {
205                                 select {
206                                 case msg := <-rxChannel:
207                                         cb, ok := (*cbMap)[msg.Channel]
208                                         if ok {
209                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
210                                         }
211                                 case channel := <-ch.addChannel:
212                                         sub.Subscribe(channel)
213                                 case channel := <-ch.removeChannel:
214                                         sub.Unsubscribe(channel)
215                                 case exit := <-ch.exit:
216                                         if exit {
217                                                 if err := sub.Close(); err != nil {
218                                                         fmt.Println(err)
219                                                 }
220                                                 return
221                                         }
222                                 }
223                         }
224                 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
225
226         } else {
227                 for _, v := range channels {
228                         db.cbMap[v] = cb
229                         db.ch.addChannel <- v
230                 }
231         }
232 }
233
234 func (db *DB) MSet(pairs ...interface{}) error {
235         return db.client.MSet(pairs...).Err()
236 }
237
238 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
239         if !db.redisModules {
240                 return errors.New("Redis deployment doesn't support MSETMPUB command")
241         }
242         command := make([]interface{}, 0)
243         command = append(command, "MSETMPUB")
244         command = append(command, len(pairs)/2)
245         command = append(command, len(channelsAndEvents)/2)
246         for _, d := range pairs {
247                 command = append(command, d)
248         }
249         for _, d := range channelsAndEvents {
250                 command = append(command, d)
251         }
252         _, err := db.client.Do(command...).Result()
253         return err
254 }
255
256 func (db *DB) MGet(keys []string) ([]interface{}, error) {
257         return db.client.MGet(keys...).Result()
258 }
259
260 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
261         if !db.redisModules {
262                 return errors.New("Redis deployment not supporting command DELMPUB")
263         }
264         command := make([]interface{}, 0)
265         command = append(command, "DELMPUB")
266         command = append(command, len(keys))
267         command = append(command, len(channelsAndEvents)/2)
268         for _, d := range keys {
269                 command = append(command, d)
270         }
271         for _, d := range channelsAndEvents {
272                 command = append(command, d)
273         }
274         _, err := db.client.Do(command...).Result()
275         return err
276
277 }
278
279 func (db *DB) Del(keys []string) error {
280         _, err := db.client.Del(keys...).Result()
281         return err
282 }
283
284 func (db *DB) Keys(pattern string) ([]string, error) {
285         return db.client.Keys(pattern).Result()
286 }
287
288 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
289         if !db.redisModules {
290                 return false, errors.New("Redis deployment not supporting command")
291         }
292
293         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
294 }
295
296 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
297         if !db.redisModules {
298                 return false, errors.New("Redis deployment not supporting command SETIEPUB")
299         }
300         return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
301 }
302
303 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
304         if !db.redisModules {
305                 return false, errors.New("Redis deployment not supporting command SETNXPUB")
306         }
307         return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
308 }
309 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
310         return db.client.SetNX(key, data, expiration).Result()
311 }
312
313 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
314         if !db.redisModules {
315                 return false, errors.New("Redis deployment not supporting command")
316         }
317         return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
318 }
319
320 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
321         if !db.redisModules {
322                 return false, errors.New("Redis deployment not supporting command")
323         }
324         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
325 }
326
327 func (db *DB) SAdd(key string, data ...interface{}) error {
328         _, err := db.client.SAdd(key, data...).Result()
329         return err
330 }
331
332 func (db *DB) SRem(key string, data ...interface{}) error {
333         _, err := db.client.SRem(key, data...).Result()
334         return err
335 }
336
337 func (db *DB) SMembers(key string) ([]string, error) {
338         result, err := db.client.SMembers(key).Result()
339         return result, err
340 }
341
342 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
343         result, err := db.client.SIsMember(key, data).Result()
344         return result, err
345 }
346
347 func (db *DB) SCard(key string) (int64, error) {
348         result, err := db.client.SCard(key).Result()
349         return result, err
350 }
351
352 func (db *DB) PTTL(key string) (time.Duration, error) {
353         result, err := db.client.PTTL(key).Result()
354         return result, err
355 }
356
357 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
358
359 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
360         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
361         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
362         if err != nil {
363                 return err
364         }
365         if result == int64(1) {
366                 return nil
367         }
368         return errors.New("Lock not held")
369 }