Add support for CI jobs
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgoredis
19
20 import (
21         "errors"
22         "fmt"
23         "os"
24         "strconv"
25         "strings"
26         "time"
27
28         "github.com/go-redis/redis"
29 )
30
31 type ChannelNotificationCb func(channel string, payload ...string)
32
33 type intChannels struct {
34         addChannel    chan string
35         removeChannel chan string
36         exit          chan bool
37 }
38
39 type DB struct {
40         client       RedisClient
41         subscribe    SubscribeFn
42         redisModules bool
43         cbMap        map[string]ChannelNotificationCb
44         ch           intChannels
45 }
46
47 type Subscriber interface {
48         Channel() <-chan *redis.Message
49         Subscribe(channels ...string) error
50         Unsubscribe(channels ...string) error
51         Close() error
52 }
53
54 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
55
56 type RedisClient interface {
57         Command() *redis.CommandsInfoCmd
58         Close() error
59         Subscribe(channels ...string) *redis.PubSub
60         MSet(pairs ...interface{}) *redis.StatusCmd
61         Do(args ...interface{}) *redis.Cmd
62         MGet(keys ...string) *redis.SliceCmd
63         Del(keys ...string) *redis.IntCmd
64         Keys(pattern string) *redis.StringSliceCmd
65         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
66         SAdd(key string, members ...interface{}) *redis.IntCmd
67         SRem(key string, members ...interface{}) *redis.IntCmd
68         SMembers(key string) *redis.StringSliceCmd
69         SIsMember(key string, member interface{}) *redis.BoolCmd
70         SCard(key string) *redis.IntCmd
71         PTTL(key string) *redis.DurationCmd
72         Eval(script string, keys []string, args ...interface{}) *redis.Cmd
73         EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
74         ScriptExists(scripts ...string) *redis.BoolSliceCmd
75         ScriptLoad(script string) *redis.StringCmd
76 }
77
78 func checkResultAndError(result interface{}, err error) (bool, error) {
79         if err != nil {
80                 if err == redis.Nil {
81                         return false, nil
82                 }
83                 return false, err
84         }
85         if result == "OK" {
86                 return true, nil
87         }
88         return false, nil
89 }
90
91 func checkIntResultAndError(result interface{}, err error) (bool, error) {
92         if err != nil {
93                 return false, err
94         }
95         if result.(int) == int(1) {
96                 return true, nil
97         }
98         return false, nil
99 }
100
101 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
102         return client.Subscribe(channels...)
103 }
104
105 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
106         db := DB{
107                 client:       client,
108                 subscribe:    subscribe,
109                 redisModules: true,
110                 cbMap:        make(map[string]ChannelNotificationCb, 0),
111                 ch: intChannels{
112                         addChannel:    make(chan string),
113                         removeChannel: make(chan string),
114                         exit:          make(chan bool),
115                 },
116         }
117
118         return &db
119 }
120
121 func Create() *DB {
122         var client *redis.Client
123         hostname := os.Getenv("DBAAS_SERVICE_HOST")
124         if hostname == "" {
125                 hostname = "localhost"
126         }
127         port := os.Getenv("DBAAS_SERVICE_PORT")
128         if port == "" {
129                 port = "6379"
130         }
131         sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
132         masterName := os.Getenv("DBAAS_MASTER_NAME")
133         if sentinelPort == "" {
134                 redisAddress := hostname + ":" + port
135                 client = redis.NewClient(&redis.Options{
136                         Addr:       redisAddress,
137                         Password:   "", // no password set
138                         DB:         0,  // use default DB
139                         PoolSize:   20,
140                         MaxRetries: 2,
141                 })
142         } else {
143                 sentinelAddress := hostname + ":" + sentinelPort
144                 client = redis.NewFailoverClient(&redis.FailoverOptions{
145                         MasterName:    masterName,
146                         SentinelAddrs: []string{sentinelAddress},
147                         PoolSize:      20,
148                         MaxRetries:    2,
149                 })
150         }
151         db := CreateDB(client, subscribeNotifications)
152         db.CheckCommands()
153         return db
154 }
155
156 func (db *DB) CheckCommands() {
157         commands, err := db.client.Command().Result()
158         if err == nil {
159                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
160                         "msetmpub", "delmpub"}
161                 for _, v := range redisModuleCommands {
162                         _, ok := commands[v]
163                         if !ok {
164                                 db.redisModules = false
165                         }
166                 }
167         } else {
168                 fmt.Println(err)
169         }
170 }
171
172 func (db *DB) CloseDB() error {
173         return db.client.Close()
174 }
175
176 func (db *DB) UnsubscribeChannelDB(channels ...string) {
177         for _, v := range channels {
178                 db.ch.removeChannel <- v
179                 delete(db.cbMap, v)
180                 if len(db.cbMap) == 0 {
181                         db.ch.exit <- true
182                 }
183         }
184 }
185
186 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
187         if len(db.cbMap) == 0 {
188                 for _, v := range channels {
189                         db.cbMap[v] = cb
190                 }
191
192                 go func(cbMap *map[string]ChannelNotificationCb,
193                         channelPrefix,
194                         eventSeparator string,
195                         ch intChannels,
196                         channels ...string) {
197                         sub := db.subscribe(db.client, channels...)
198                         rxChannel := sub.Channel()
199                         for {
200                                 select {
201                                 case msg := <-rxChannel:
202                                         cb, ok := (*cbMap)[msg.Channel]
203                                         if ok {
204                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
205                                         }
206                                 case channel := <-ch.addChannel:
207                                         sub.Subscribe(channel)
208                                 case channel := <-ch.removeChannel:
209                                         sub.Unsubscribe(channel)
210                                 case exit := <-ch.exit:
211                                         if exit {
212                                                 if err := sub.Close(); err != nil {
213                                                         fmt.Println(err)
214                                                 }
215                                                 return
216                                         }
217                                 }
218                         }
219                 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
220
221         } else {
222                 for _, v := range channels {
223                         db.cbMap[v] = cb
224                         db.ch.addChannel <- v
225                 }
226         }
227 }
228
229 func (db *DB) MSet(pairs ...interface{}) error {
230         return db.client.MSet(pairs...).Err()
231 }
232
233 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
234         if !db.redisModules {
235                 return errors.New("Redis deployment doesn't support MSETMPUB command")
236         }
237         command := make([]interface{}, 0)
238         command = append(command, "MSETMPUB")
239         command = append(command, len(pairs)/2)
240         command = append(command, len(channelsAndEvents)/2)
241         for _, d := range pairs {
242                 command = append(command, d)
243         }
244         for _, d := range channelsAndEvents {
245                 command = append(command, d)
246         }
247         _, err := db.client.Do(command...).Result()
248         return err
249 }
250
251 func (db *DB) MGet(keys []string) ([]interface{}, error) {
252         return db.client.MGet(keys...).Result()
253 }
254
255 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
256         if !db.redisModules {
257                 return errors.New("Redis deployment not supporting command DELMPUB")
258         }
259         command := make([]interface{}, 0)
260         command = append(command, "DELMPUB")
261         command = append(command, len(keys))
262         command = append(command, len(channelsAndEvents)/2)
263         for _, d := range keys {
264                 command = append(command, d)
265         }
266         for _, d := range channelsAndEvents {
267                 command = append(command, d)
268         }
269         _, err := db.client.Do(command...).Result()
270         return err
271
272 }
273
274 func (db *DB) Del(keys []string) error {
275         _, err := db.client.Del(keys...).Result()
276         return err
277 }
278
279 func (db *DB) Keys(pattern string) ([]string, error) {
280         return db.client.Keys(pattern).Result()
281 }
282
283 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
284         if !db.redisModules {
285                 return false, errors.New("Redis deployment not supporting command")
286         }
287
288         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
289 }
290
291 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
292         if !db.redisModules {
293                 return false, errors.New("Redis deployment not supporting command SETIEPUB")
294         }
295         return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
296 }
297
298 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
299         if !db.redisModules {
300                 return false, errors.New("Redis deployment not supporting command SETNXPUB")
301         }
302         return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
303 }
304 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
305         return db.client.SetNX(key, data, expiration).Result()
306 }
307
308 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
309         if !db.redisModules {
310                 return false, errors.New("Redis deployment not supporting command")
311         }
312         return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
313 }
314
315 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
316         if !db.redisModules {
317                 return false, errors.New("Redis deployment not supporting command")
318         }
319         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
320 }
321
322 func (db *DB) SAdd(key string, data ...interface{}) error {
323         _, err := db.client.SAdd(key, data...).Result()
324         return err
325 }
326
327 func (db *DB) SRem(key string, data ...interface{}) error {
328         _, err := db.client.SRem(key, data...).Result()
329         return err
330 }
331
332 func (db *DB) SMembers(key string) ([]string, error) {
333         result, err := db.client.SMembers(key).Result()
334         return result, err
335 }
336
337 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
338         result, err := db.client.SIsMember(key, data).Result()
339         return result, err
340 }
341
342 func (db *DB) SCard(key string) (int64, error) {
343         result, err := db.client.SCard(key).Result()
344         return result, err
345 }
346
347 func (db *DB) PTTL(key string) (time.Duration, error) {
348         result, err := db.client.PTTL(key).Result()
349         return result, err
350 }
351
352 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
353
354 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
355         expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
356         result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
357         if err != nil {
358                 return err
359         }
360         if result == int64(1) {
361                 return nil
362         }
363         return errors.New("Lock not held")
364 }