2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
28 "github.com/go-redis/redis"
// ChannelNotificationCb is the callback signature used for channel
// notifications. In SubscribeChannelDB it is invoked with the message's
// channel name (channel prefix trimmed) and the message payload split on the
// event separator.
type ChannelNotificationCb func(channel string, payload ...string)
33 type intChannels struct {
34 addChannel chan string
35 removeChannel chan string
43 cbMap map[string]ChannelNotificationCb
47 type Subscriber interface {
48 Channel() <-chan *redis.Message
49 Subscribe(channels ...string) error
50 Unsubscribe(channels ...string) error
// SubscribeFn produces a Subscriber for the given channels using the supplied
// client. CreateDB takes one as a parameter; subscribeNotifications is the
// implementation passed to CreateDB within this file.
type SubscribeFn func(client RedisClient, channels ...string) Subscriber
56 type RedisClient interface {
57 Command() *redis.CommandsInfoCmd
59 Subscribe(channels ...string) *redis.PubSub
60 MSet(pairs ...interface{}) *redis.StatusCmd
61 Do(args ...interface{}) *redis.Cmd
62 MGet(keys ...string) *redis.SliceCmd
63 Del(keys ...string) *redis.IntCmd
64 Keys(pattern string) *redis.StringSliceCmd
65 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
66 SAdd(key string, members ...interface{}) *redis.IntCmd
67 SRem(key string, members ...interface{}) *redis.IntCmd
68 SMembers(key string) *redis.StringSliceCmd
69 SIsMember(key string, member interface{}) *redis.BoolCmd
70 SCard(key string) *redis.IntCmd
71 PTTL(key string) *redis.DurationCmd
72 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
73 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
74 ScriptExists(scripts ...string) *redis.BoolSliceCmd
75 ScriptLoad(script string) *redis.StringCmd
78 func checkResultAndError(result interface{}, err error) (bool, error) {
91 func checkIntResultAndError(result interface{}, err error) (bool, error) {
101 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
102 return client.Subscribe(channels...)
105 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
108 subscribe: subscribe,
110 cbMap: make(map[string]ChannelNotificationCb, 0),
112 addChannel: make(chan string),
113 removeChannel: make(chan string),
114 exit: make(chan bool),
122 hostname := os.Getenv("DBAAS_SERVICE_HOST")
124 hostname = "localhost"
126 port := os.Getenv("DBAAS_SERVICE_PORT")
130 redisAddress := hostname + ":" + port
131 client := redis.NewClient(&redis.Options{
133 Password: "", // no password set
134 DB: 0, // use default DB
138 db := CreateDB(client, subscribeNotifications)
143 func (db *DB) CheckCommands() {
144 commands, err := db.client.Command().Result()
146 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
147 "msetmpub", "delmpub"}
148 for _, v := range redisModuleCommands {
151 db.redisModules = false
159 func (db *DB) CloseDB() error {
160 return db.client.Close()
163 func (db *DB) UnsubscribeChannelDB(channels ...string) {
164 for _, v := range channels {
165 db.ch.removeChannel <- v
167 if len(db.cbMap) == 0 {
173 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
174 if len(db.cbMap) == 0 {
175 for _, v := range channels {
179 go func(cbMap *map[string]ChannelNotificationCb,
181 eventSeparator string,
183 channels ...string) {
184 sub := db.subscribe(db.client, channels...)
185 rxChannel := sub.Channel()
188 case msg := <-rxChannel:
189 cb, ok := (*cbMap)[msg.Channel]
191 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
193 case channel := <-ch.addChannel:
194 sub.Subscribe(channel)
195 case channel := <-ch.removeChannel:
196 sub.Unsubscribe(channel)
197 case exit := <-ch.exit:
199 if err := sub.Close(); err != nil {
206 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
209 for _, v := range channels {
211 db.ch.addChannel <- v
216 func (db *DB) MSet(pairs ...interface{}) error {
217 return db.client.MSet(pairs...).Err()
220 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
221 if !db.redisModules {
222 return errors.New("Redis deployment doesn't support MSETMPUB command")
224 command := make([]interface{}, 0)
225 command = append(command, "MSETMPUB")
226 command = append(command, len(pairs)/2)
227 command = append(command, len(channelsAndEvents)/2)
228 for _, d := range pairs {
229 command = append(command, d)
231 for _, d := range channelsAndEvents {
232 command = append(command, d)
234 _, err := db.client.Do(command...).Result()
238 func (db *DB) MGet(keys []string) ([]interface{}, error) {
239 return db.client.MGet(keys...).Result()
242 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
243 if !db.redisModules {
244 return errors.New("Redis deployment not supporting command DELMPUB")
246 command := make([]interface{}, 0)
247 command = append(command, "DELMPUB")
248 command = append(command, len(keys))
249 command = append(command, len(channelsAndEvents)/2)
250 for _, d := range keys {
251 command = append(command, d)
253 for _, d := range channelsAndEvents {
254 command = append(command, d)
256 _, err := db.client.Do(command...).Result()
261 func (db *DB) Del(keys []string) error {
262 _, err := db.client.Del(keys...).Result()
266 func (db *DB) Keys(pattern string) ([]string, error) {
267 return db.client.Keys(pattern).Result()
270 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
271 if !db.redisModules {
272 return false, errors.New("Redis deployment not supporting command")
275 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
278 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
279 if !db.redisModules {
280 return false, errors.New("Redis deployment not supporting command SETIEPUB")
282 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
285 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
286 if !db.redisModules {
287 return false, errors.New("Redis deployment not supporting command SETNXPUB")
289 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
291 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
292 return db.client.SetNX(key, data, expiration).Result()
295 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
296 if !db.redisModules {
297 return false, errors.New("Redis deployment not supporting command")
299 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
302 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
303 if !db.redisModules {
304 return false, errors.New("Redis deployment not supporting command")
306 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
309 func (db *DB) SAdd(key string, data ...interface{}) error {
310 _, err := db.client.SAdd(key, data...).Result()
314 func (db *DB) SRem(key string, data ...interface{}) error {
315 _, err := db.client.SRem(key, data...).Result()
319 func (db *DB) SMembers(key string) ([]string, error) {
320 result, err := db.client.SMembers(key).Result()
324 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
325 result, err := db.client.SIsMember(key, data).Result()
329 func (db *DB) SCard(key string) (int64, error) {
330 result, err := db.client.SCard(key).Result()
334 func (db *DB) PTTL(key string) (time.Duration, error) {
335 result, err := db.client.PTTL(key).Result()
// luaRefresh is a Lua script that calls PEXPIRE on KEYS[1] with ARGV[2] only
// when the value currently stored at KEYS[1] equals ARGV[1]; otherwise the
// script returns 0. It is run by PExpireIE.
var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
341 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
342 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
343 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
347 if result == int64(1) {
350 return errors.New("Lock not held")