Support multiple event publishing
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgoredis
19
20 import (
21         "errors"
22         "fmt"
23         "os"
24         "strings"
25         "time"
26
27         "github.com/go-redis/redis"
28 )
29
30 type ChannelNotificationCb func(channel string, payload ...string)
31
32 type intChannels struct {
33         addChannel    chan string
34         removeChannel chan string
35         exit          chan bool
36 }
37
38 type DB struct {
39         client       RedisClient
40         subscribe    SubscribeFn
41         redisModules bool
42         cbMap        map[string]ChannelNotificationCb
43         ch           intChannels
44 }
45
46 type Subscriber interface {
47         Channel() <-chan *redis.Message
48         Subscribe(channels ...string) error
49         Unsubscribe(channels ...string) error
50         Close() error
51 }
52
53 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
54
55 type RedisClient interface {
56         Command() *redis.CommandsInfoCmd
57         Close() error
58         Subscribe(channels ...string) *redis.PubSub
59         MSet(pairs ...interface{}) *redis.StatusCmd
60         Do(args ...interface{}) *redis.Cmd
61         MGet(keys ...string) *redis.SliceCmd
62         Del(keys ...string) *redis.IntCmd
63         Keys(pattern string) *redis.StringSliceCmd
64         SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
65         SAdd(key string, members ...interface{}) *redis.IntCmd
66         SRem(key string, members ...interface{}) *redis.IntCmd
67         SMembers(key string) *redis.StringSliceCmd
68         SIsMember(key string, member interface{}) *redis.BoolCmd
69         SCard(key string) *redis.IntCmd
70 }
71
72 func checkResultAndError(result interface{}, err error) (bool, error) {
73         if err != nil {
74                 if err == redis.Nil {
75                         return false, nil
76                 }
77                 return false, err
78         }
79         if result == "OK" {
80                 return true, nil
81         }
82         return false, nil
83 }
84
85 func checkIntResultAndError(result interface{}, err error) (bool, error) {
86         if err != nil {
87                 return false, err
88         }
89         if result == 1 {
90                 return true, nil
91         }
92         return false, nil
93 }
94
95 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
96         return client.Subscribe(channels...)
97 }
98
99 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
100         db := DB{
101                 client:       client,
102                 subscribe:    subscribe,
103                 redisModules: true,
104                 cbMap:        make(map[string]ChannelNotificationCb, 0),
105                 ch: intChannels{
106                         addChannel:    make(chan string),
107                         removeChannel: make(chan string),
108                         exit:          make(chan bool),
109                 },
110         }
111
112         return &db
113 }
114
115 func Create() *DB {
116         hostname := os.Getenv("DBAAS_SERVICE_HOST")
117         if hostname == "" {
118                 hostname = "localhost"
119         }
120         port := os.Getenv("DBAAS_SERVICE_PORT")
121         if port == "" {
122                 port = "6379"
123         }
124         redisAddress := hostname + ":" + port
125         client := redis.NewClient(&redis.Options{
126                 Addr:     redisAddress,
127                 Password: "", // no password set
128                 DB:       0,  // use default DB
129                 PoolSize: 20,
130         })
131         db := CreateDB(client, subscribeNotifications)
132         db.CheckCommands()
133         return db
134 }
135
136 func (db *DB) CheckCommands() {
137         commands, err := db.client.Command().Result()
138         if err == nil {
139                 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
140                         "msetmpub", "delmpub"}
141                 for _, v := range redisModuleCommands {
142                         _, ok := commands[v]
143                         if !ok {
144                                 db.redisModules = false
145                         }
146                 }
147         } else {
148                 fmt.Println(err)
149         }
150 }
151
152 func (db *DB) CloseDB() error {
153         return db.client.Close()
154 }
155
156 func (db *DB) UnsubscribeChannelDB(channels ...string) {
157         for _, v := range channels {
158                 db.ch.removeChannel <- v
159                 delete(db.cbMap, v)
160                 if len(db.cbMap) == 0 {
161                         db.ch.exit <- true
162                 }
163         }
164 }
165
166 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
167         if len(db.cbMap) == 0 {
168                 for _, v := range channels {
169                         db.cbMap[v] = cb
170                 }
171
172                 go func(cbMap *map[string]ChannelNotificationCb,
173                         channelPrefix,
174                         eventSeparator string,
175                         ch intChannels,
176                         channels ...string) {
177                         sub := db.subscribe(db.client, channels...)
178                         rxChannel := sub.Channel()
179                         for {
180                                 select {
181                                 case msg := <-rxChannel:
182                                         cb, ok := (*cbMap)[msg.Channel]
183                                         if ok {
184                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
185                                         }
186                                 case channel := <-ch.addChannel:
187                                         sub.Subscribe(channel)
188                                 case channel := <-ch.removeChannel:
189                                         sub.Unsubscribe(channel)
190                                 case exit := <-ch.exit:
191                                         if exit {
192                                                 if err := sub.Close(); err != nil {
193                                                         fmt.Println(err)
194                                                 }
195                                                 return
196                                         }
197                                 }
198                         }
199                 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
200
201         } else {
202                 for _, v := range channels {
203                         db.cbMap[v] = cb
204                         db.ch.addChannel <- v
205                 }
206         }
207 }
208
209 func (db *DB) MSet(pairs ...interface{}) error {
210         return db.client.MSet(pairs...).Err()
211 }
212
213 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
214         if !db.redisModules {
215                 return errors.New("Redis deployment doesn't support MSETMPUB command")
216         }
217         command := make([]interface{}, 0)
218         command = append(command, "MSETMPUB")
219         command = append(command, len(pairs)/2)
220         command = append(command, len(channelsAndEvents)/2)
221         for _, d := range pairs {
222                 command = append(command, d)
223         }
224         for _, d := range channelsAndEvents {
225                 command = append(command, d)
226         }
227         _, err := db.client.Do(command...).Result()
228         return err
229 }
230
231 func (db *DB) MGet(keys []string) ([]interface{}, error) {
232         return db.client.MGet(keys...).Result()
233 }
234
235 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
236         if !db.redisModules {
237                 return errors.New("Redis deployment not supporting command DELMPUB")
238         }
239         command := make([]interface{}, 0)
240         command = append(command, "DELMPUB")
241         command = append(command, len(keys))
242         command = append(command, len(channelsAndEvents)/2)
243         for _, d := range keys {
244                 command = append(command, d)
245         }
246         for _, d := range channelsAndEvents {
247                 command = append(command, d)
248         }
249         _, err := db.client.Do(command...).Result()
250         return err
251
252 }
253
254 func (db *DB) Del(keys []string) error {
255         _, err := db.client.Del(keys...).Result()
256         return err
257 }
258
259 func (db *DB) Keys(pattern string) ([]string, error) {
260         return db.client.Keys(pattern).Result()
261 }
262
263 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
264         if !db.redisModules {
265                 return false, errors.New("Redis deployment not supporting command")
266         }
267
268         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
269 }
270
271 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
272         if !db.redisModules {
273                 return false, errors.New("Redis deployment not supporting command SETIEPUB")
274         }
275         return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
276 }
277
278 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
279         if !db.redisModules {
280                 return false, errors.New("Redis deployment not supporting command SETNXPUB")
281         }
282         return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
283 }
284 func (db *DB) SetNX(key string, data interface{}) (bool, error) {
285         return db.client.SetNX(key, data, 0).Result()
286 }
287
288 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
289         if !db.redisModules {
290                 return false, errors.New("Redis deployment not supporting command")
291         }
292         return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
293 }
294
295 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
296         if !db.redisModules {
297                 return false, errors.New("Redis deployment not supporting command")
298         }
299         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
300 }
301
302 func (db *DB) SAdd(key string, data ...interface{}) error {
303         _, err := db.client.SAdd(key, data...).Result()
304         return err
305 }
306
307 func (db *DB) SRem(key string, data ...interface{}) error {
308         _, err := db.client.SRem(key, data...).Result()
309         return err
310 }
311
312 func (db *DB) SMembers(key string) ([]string, error) {
313         result, err := db.client.SMembers(key).Result()
314         return result, err
315 }
316
317 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
318         result, err := db.client.SIsMember(key, data).Result()
319         return result, err
320 }
321
322 func (db *DB) SCard(key string) (int64, error) {
323         result, err := db.client.SCard(key).Result()
324         return result, err
325 }