2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
28 "github.com/go-redis/redis"
// ChannelNotificationCb is the callback signature invoked when a message
// arrives on a subscribed channel. The subscription goroutine calls it with
// the channel name (channel prefix trimmed) and the message payload split on
// the event separator.
31 type ChannelNotificationCb func(channel string, payload ...string)
// intChannels groups the channels used to send control requests to the
// subscription goroutine.
33 type intChannels struct {
// addChannel carries names of channels the goroutine should subscribe to.
34 addChannel chan string
// removeChannel carries names of channels the goroutine should unsubscribe from.
35 removeChannel chan string
// cbMap holds the notification callback for each channel; the subscription
// goroutine looks callbacks up by the channel name of a received message.
43 cbMap map[string]ChannelNotificationCb
// Subscriber is the subscription handle used by the subscription goroutine:
// it delivers received messages and lets the set of subscribed channels be
// changed while the subscription is open.
47 type Subscriber interface {
// Channel returns the receive-only channel on which messages are delivered.
48 Channel() <-chan *redis.Message
// Subscribe is called with a channel name received on addChannel.
49 Subscribe(channels ...string) error
// Unsubscribe is called with a channel name received on removeChannel.
50 Unsubscribe(channels ...string) error
// SubscribeFn is the signature of the function used to open a subscription
// for the given channels on client. It is supplied to CreateDB, which lets
// callers substitute their own implementation for subscribeNotifications.
54 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
// RedisClient lists the Redis client operations this file depends on. The
// method signatures use go-redis command result types, so a go-redis client
// (see redis.NewClient) can be passed wherever a RedisClient is expected.
56 type RedisClient interface {
// Command is used by CheckCommands to inspect the commands the server offers.
57 Command() *redis.CommandsInfoCmd
// Subscribe opens a pub/sub subscription for the given channels.
59 Subscribe(channels ...string) *redis.PubSub
60 MSet(pairs ...interface{}) *redis.StatusCmd
// Do sends an arbitrary command; this file uses it for the module commands
// (SETIE, SETIEPUB, SETNXPUB, DELIE, DELIEPUB, MSETMPUB, DELMPUB).
61 Do(args ...interface{}) *redis.Cmd
62 MGet(keys ...string) *redis.SliceCmd
63 Del(keys ...string) *redis.IntCmd
64 Keys(pattern string) *redis.StringSliceCmd
65 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
// Set operations.
66 SAdd(key string, members ...interface{}) *redis.IntCmd
67 SRem(key string, members ...interface{}) *redis.IntCmd
68 SMembers(key string) *redis.StringSliceCmd
69 SIsMember(key string, member interface{}) *redis.BoolCmd
70 SCard(key string) *redis.IntCmd
71 PTTL(key string) *redis.DurationCmd
// Script operations. NOTE(review): presumably required so the client can be
// handed to redis.Script.Run (see luaRefresh) — confirm.
72 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
73 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
74 ScriptExists(scripts ...string) *redis.BoolSliceCmd
75 ScriptLoad(script string) *redis.StringCmd
// checkResultAndError turns the (result, err) pair produced by a command's
// Result() call into the (bool, error) pair returned by SetIE, SetIEPub and
// SetNXPub.
// NOTE(review): how individual result values and errors map onto the
// returned bool is not described here — confirm against the body.
78 func checkResultAndError(result interface{}, err error) (bool, error) {
// checkIntResultAndError turns the (result, err) pair produced by a command's
// Result() call into the (bool, error) pair returned by DelIE and DelIEPub.
// NOTE(review): presumably interprets an integer reply, as the name suggests —
// confirm against the body.
91 func checkIntResultAndError(result interface{}, err error) (bool, error) {
101 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
102 return client.Subscribe(channels...)
// CreateDB builds a DB for the given client and subscribe function and
// returns a pointer to it. The callback map starts empty and the control
// channels to the subscription goroutine are created unbuffered, so a send on
// them blocks until the goroutine receives.
105 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
108 subscribe: subscribe,
// No callbacks are registered initially.
110 cbMap: make(map[string]ChannelNotificationCb, 0),
112 addChannel: make(chan string),
113 removeChannel: make(chan string),
114 exit: make(chan bool),
// The Redis host is read from the DBAAS_SERVICE_HOST environment variable.
122 hostname := os.Getenv("DBAAS_SERVICE_HOST")
// NOTE(review): presumably the fallback used when DBAAS_SERVICE_HOST is
// empty — confirm against the surrounding condition.
124 hostname = "localhost"
// The Redis port is read from the DBAAS_SERVICE_PORT environment variable.
126 port := os.Getenv("DBAAS_SERVICE_PORT")
// The client address has the form "host:port".
130 redisAddress := hostname + ":" + port
131 client := redis.NewClient(&redis.Options{
133 Password: "", // no password set
134 DB: 0, // use default DB
// Wrap the client in a DB that subscribes through subscribeNotifications.
137 db := CreateDB(client, subscribeNotifications)
// CheckCommands asks the server for its command table and walks the list of
// module commands this file relies on; db.redisModules is set to false from
// inside that loop.
// NOTE(review): presumably the flag is cleared when one of the listed
// commands is absent from the server's reply — confirm.
// NOTE(review): "deliepub" is not in the list although DelIEPub issues
// DELIEPUB — confirm whether it should be checked as well.
142 func (db *DB) CheckCommands() {
143 commands, err := db.client.Command().Result()
// Lower-case names of the module commands to look for.
145 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
146 "msetmpub", "delmpub"}
147 for _, v := range redisModuleCommands {
150 db.redisModules = false
158 func (db *DB) CloseDB() error {
159 return db.client.Close()
// UnsubscribeChannelDB asks the subscription goroutine to unsubscribe from
// each of the given channels by sending the channel name on
// db.ch.removeChannel. The send is on an unbuffered channel (see CreateDB),
// so it blocks until the goroutine receives it.
162 func (db *DB) UnsubscribeChannelDB(channels ...string) {
163 for _, v := range channels {
164 db.ch.removeChannel <- v
// NOTE(review): presumably the point where the subscription goroutine is told
// to exit once no callbacks remain — confirm.
166 if len(db.cbMap) == 0 {
// SubscribeChannelDB registers interest in the given channels.
//
// When no callbacks are registered yet (db.cbMap is empty) a goroutine is
// started that opens the subscription through db.subscribe and then serves
// four kinds of events: received messages, requests to add a channel,
// requests to remove a channel, and an exit request.
//
// For a received message the callback stored under the message's channel name
// is invoked with the channel name minus channelPrefix and the payload split
// on eventSeparator.
//
// NOTE(review): the goroutine reads the callback map through a pointer while
// other methods access db.cbMap; no synchronisation is visible — confirm this
// is safe.
172 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
// First subscription: no callbacks registered so far.
173 if len(db.cbMap) == 0 {
174 for _, v := range channels {
178 go func(cbMap *map[string]ChannelNotificationCb,
180 eventSeparator string,
182 channels ...string) {
// Open the subscription for the initial channels and take its message stream.
183 sub := db.subscribe(db.client, channels...)
184 rxChannel := sub.Channel()
187 case msg := <-rxChannel:
// Callbacks are looked up by the full channel name as received.
188 cb, ok := (*cbMap)[msg.Channel]
190 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
192 case channel := <-ch.addChannel:
// NOTE(review): the error returned by Subscribe is discarded.
193 sub.Subscribe(channel)
194 case channel := <-ch.removeChannel:
// NOTE(review): the error returned by Unsubscribe is discarded.
195 sub.Unsubscribe(channel)
196 case exit := <-ch.exit:
198 if err := sub.Close(); err != nil {
205 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
// NOTE(review): presumably the path taken when the goroutine is already
// running — each channel is handed to it through addChannel; confirm.
208 for _, v := range channels {
210 db.ch.addChannel <- v
215 func (db *DB) MSet(pairs ...interface{}) error {
216 return db.client.MSet(pairs...).Err()
219 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
220 if !db.redisModules {
221 return errors.New("Redis deployment doesn't support MSETMPUB command")
223 command := make([]interface{}, 0)
224 command = append(command, "MSETMPUB")
225 command = append(command, len(pairs)/2)
226 command = append(command, len(channelsAndEvents)/2)
227 for _, d := range pairs {
228 command = append(command, d)
230 for _, d := range channelsAndEvents {
231 command = append(command, d)
233 _, err := db.client.Do(command...).Result()
237 func (db *DB) MGet(keys []string) ([]interface{}, error) {
238 return db.client.MGet(keys...).Result()
241 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
242 if !db.redisModules {
243 return errors.New("Redis deployment not supporting command DELMPUB")
245 command := make([]interface{}, 0)
246 command = append(command, "DELMPUB")
247 command = append(command, len(keys))
248 command = append(command, len(channelsAndEvents)/2)
249 for _, d := range keys {
250 command = append(command, d)
252 for _, d := range channelsAndEvents {
253 command = append(command, d)
255 _, err := db.client.Do(command...).Result()
260 func (db *DB) Del(keys []string) error {
261 _, err := db.client.Del(keys...).Result()
265 func (db *DB) Keys(pattern string) ([]string, error) {
266 return db.client.Keys(pattern).Result()
269 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
270 if !db.redisModules {
271 return false, errors.New("Redis deployment not supporting command")
274 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
277 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
278 if !db.redisModules {
279 return false, errors.New("Redis deployment not supporting command SETIEPUB")
281 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
284 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
285 if !db.redisModules {
286 return false, errors.New("Redis deployment not supporting command SETNXPUB")
288 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
290 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
291 return db.client.SetNX(key, data, expiration).Result()
294 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
295 if !db.redisModules {
296 return false, errors.New("Redis deployment not supporting command")
298 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
301 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
302 if !db.redisModules {
303 return false, errors.New("Redis deployment not supporting command")
305 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
308 func (db *DB) SAdd(key string, data ...interface{}) error {
309 _, err := db.client.SAdd(key, data...).Result()
313 func (db *DB) SRem(key string, data ...interface{}) error {
314 _, err := db.client.SRem(key, data...).Result()
318 func (db *DB) SMembers(key string) ([]string, error) {
319 result, err := db.client.SMembers(key).Result()
323 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
324 result, err := db.client.SIsMember(key, data).Result()
328 func (db *DB) SCard(key string) (int64, error) {
329 result, err := db.client.SCard(key).Result()
333 func (db *DB) PTTL(key string) (time.Duration, error) {
334 result, err := db.client.PTTL(key).Result()
// luaRefresh is a server-side Lua script that calls PEXPIRE on KEYS[1] with
// ARGV[2] only when the value currently stored under KEYS[1] equals ARGV[1].
// It returns the PEXPIRE reply in that case and 0 otherwise.
338 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
// PExpireIE runs the luaRefresh script for key, passing data as ARGV[1] and
// expiration, converted to whole milliseconds, as ARGV[2].
//
// The script result is compared against int64(1); the final statement
// returns the error "Lock not held".
// NOTE(review): presumably err from Run is returned first and a result of 1
// yields nil — confirm.
340 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
// The script receives its expiry argument as a decimal millisecond count.
341 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
342 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
// The comparison is against int64(1), i.e. the script's PEXPIRE branch.
346 if result == int64(1) {
349 return errors.New("Lock not held")