Add support for notifications
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgoredis
19
20 import (
21         "errors"
22         "fmt"
23         "os"
24         "strings"
25
26         "github.com/go-redis/redis"
27 )
28
29 type ChannelNotificationCb func(channel string, payload ...string)
30
31 type intChannels struct {
32         addChannel    chan string
33         removeChannel chan string
34         exit          chan bool
35 }
36
37 type DB struct {
38         client       *redis.Client
39         redisModules bool
40         cbMap        map[string]ChannelNotificationCb
41         ch           intChannels
42 }
43
44 func checkResultAndError(result interface{}, err error) (bool, error) {
45         if err != nil {
46                 if err == redis.Nil {
47                         return false, nil
48                 }
49                 return false, err
50         }
51         if result == "OK" {
52                 return true, nil
53         } else {
54                 return false, nil
55         }
56 }
57
58 func checkIntResultAndError(result interface{}, err error) (bool, error) {
59         if err != nil {
60                 return false, err
61         }
62         if result.(int64) == 1 {
63                 return true, nil
64         } else {
65                 return false, nil
66         }
67
68 }
69
70 func Create() *DB {
71         hostname := os.Getenv("DBAAS_SERVICE_HOST")
72         if hostname == "" {
73                 hostname = "localhost"
74         }
75         port := os.Getenv("DBAAS_SERVICE_PORT")
76         if port == "" {
77                 port = "6379"
78         }
79         redisAddress := hostname + ":" + port
80         client := redis.NewClient(&redis.Options{
81                 Addr:     redisAddress,
82                 Password: "", // no password set
83                 DB:       0,  // use default DB
84                 PoolSize: 20,
85         })
86
87         db := DB{
88                 client:       client,
89                 redisModules: true,
90                 cbMap:        make(map[string]ChannelNotificationCb, 0),
91                 ch: intChannels{
92                         addChannel:    make(chan string),
93                         removeChannel: make(chan string),
94                         exit:          make(chan bool),
95                 },
96         }
97
98         commands, err := db.client.Command().Result()
99         if err == nil {
100                 redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
101                 for _, v := range redisModuleCommands {
102                         _, ok := commands[v]
103                         if !ok {
104                                 db.redisModules = false
105                         }
106                 }
107         } else {
108                 fmt.Println(err)
109         }
110         return &db
111 }
112
113 func (db *DB) CloseDB() error {
114         return db.client.Close()
115 }
116
117 func (db *DB) UnsubscribeChannelDB(channels ...string) {
118         for _, v := range channels {
119                 db.ch.removeChannel <- v
120                 delete(db.cbMap, v)
121                 if len(db.cbMap) == 0 {
122                         db.ch.exit <- true
123                 }
124         }
125 }
126
127 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
128         if len(db.cbMap) == 0 {
129                 for _, v := range channels {
130                         db.cbMap[v] = cb
131                 }
132
133                 go func(cbMap *map[string]ChannelNotificationCb,
134                         channelPrefix,
135                         eventSeparator string,
136                         ch intChannels,
137                         channels ...string) {
138                         sub := db.client.Subscribe(channels...)
139                         rxChannel := sub.Channel()
140                         for {
141                                 select {
142                                 case msg := <-rxChannel:
143                                         cb, ok := (*cbMap)[msg.Channel]
144                                         if ok {
145                                                 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
146                                         }
147                                 case channel := <-ch.addChannel:
148                                         sub.Subscribe(channel)
149                                 case channel := <-ch.removeChannel:
150                                         sub.Unsubscribe(channel)
151                                 case exit := <-ch.exit:
152                                         if exit {
153                                                 if err := sub.Close(); err != nil {
154                                                         fmt.Println(err)
155                                                 }
156                                                 return
157                                         }
158                                 }
159                         }
160                 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
161
162         } else {
163                 for _, v := range channels {
164                         db.cbMap[v] = cb
165                         db.ch.addChannel <- v
166                 }
167         }
168 }
169
170 func (db *DB) MSet(pairs ...interface{}) error {
171         return db.client.MSet(pairs...).Err()
172 }
173
174 func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
175         if !db.redisModules {
176                 return errors.New("Redis deployment doesn't support MSETPUB command")
177         }
178         command := make([]interface{}, 0)
179         command = append(command, "MSETPUB")
180         for _, d := range pairs {
181                 command = append(command, d)
182         }
183         command = append(command, channel, message)
184         _, err := db.client.Do(command...).Result()
185         return err
186 }
187
188 func (db *DB) MGet(keys []string) ([]interface{}, error) {
189         return db.client.MGet(keys...).Result()
190 }
191
192 func (db *DB) DelPub(channel, message string, keys []string) error {
193         if !db.redisModules {
194                 return errors.New("Redis deployment not supporting command DELPUB")
195         }
196         command := make([]interface{}, 0)
197         command = append(command, "DELPUB")
198         for _, d := range keys {
199                 command = append(command, d)
200         }
201         command = append(command, channel, message)
202         _, err := db.client.Do(command...).Result()
203         return err
204 }
205
206 func (db *DB) Del(keys []string) error {
207         _, err := db.client.Del(keys...).Result()
208         return err
209 }
210
211 func (db *DB) Keys(pattern string) ([]string, error) {
212         return db.client.Keys(pattern).Result()
213 }
214
215 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
216         if !db.redisModules {
217                 return false, errors.New("Redis deployment not supporting command")
218         }
219
220         return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
221 }
222
223 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
224         if !db.redisModules {
225                 return false, errors.New("Redis deployment not supporting command SETIEPUB")
226         }
227         return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
228 }
229
230 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
231         if !db.redisModules {
232                 return false, errors.New("Redis deployment not supporting command SETNXPUB")
233         }
234         return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
235 }
236 func (db *DB) SetNX(key string, data interface{}) (bool, error) {
237         return db.client.SetNX(key, data, 0).Result()
238 }
239
240 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
241         if !db.redisModules {
242                 return false, errors.New("Redis deployment not supporting command")
243         }
244         return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
245 }
246
247 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
248         if !db.redisModules {
249                 return false, errors.New("Redis deployment not supporting command")
250         }
251         return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
252 }