/*
   Copyright (c) 2019 AT&T Intellectual Property.
   Copyright (c) 2018-2019 Nokia.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
28 "github.com/go-redis/redis"
// ChannelNotificationCb is the signature of the per-channel callbacks kept in
// cbMap. It is invoked with a channel name and the event payload, passed as
// individual strings.
type ChannelNotificationCb func(channel string, payload ...string)
33 type intChannels struct {
34 addChannel chan string
35 removeChannel chan string
43 cbMap map[string]ChannelNotificationCb
47 type Subscriber interface {
48 Channel() <-chan *redis.Message
49 Subscribe(channels ...string) error
50 Unsubscribe(channels ...string) error
54 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
56 type RedisClient interface {
57 Command() *redis.CommandsInfoCmd
59 Subscribe(channels ...string) *redis.PubSub
60 MSet(pairs ...interface{}) *redis.StatusCmd
61 Do(args ...interface{}) *redis.Cmd
62 MGet(keys ...string) *redis.SliceCmd
63 Del(keys ...string) *redis.IntCmd
64 Keys(pattern string) *redis.StringSliceCmd
65 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
66 SAdd(key string, members ...interface{}) *redis.IntCmd
67 SRem(key string, members ...interface{}) *redis.IntCmd
68 SMembers(key string) *redis.StringSliceCmd
69 SIsMember(key string, member interface{}) *redis.BoolCmd
70 SCard(key string) *redis.IntCmd
71 PTTL(key string) *redis.DurationCmd
72 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
73 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
74 ScriptExists(scripts ...string) *redis.BoolSliceCmd
75 ScriptLoad(script string) *redis.StringCmd
// checkResultAndError turns a (reply, error) pair from the client into a
// (bool, error) result. SetIE, SetIEPub and SetNXPub pass the outcome of their
// module command through it.
// NOTE(review): which reply values map to true, and how individual errors are
// treated, is decided in the body — confirm before relying on it.
78 func checkResultAndError(result interface{}, err error) (bool, error) {
// checkIntResultAndError converts the reply of a command that answers with an
// integer into a success flag.
//
// A non-nil err is returned unchanged together with false. Otherwise the flag
// is true only when result is int64(1). A result that is not an int64 yields
// false and an error rather than a panic from an unchecked type assertion.
func checkIntResultAndError(result interface{}, err error) (bool, error) {
	if err != nil {
		return false, err
	}
	n, ok := result.(int64)
	if !ok {
		return false, errors.New("unexpected non-integer result from Redis")
	}
	return n == 1, nil
}
101 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
102 return client.Subscribe(channels...)
105 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
108 subscribe: subscribe,
110 cbMap: make(map[string]ChannelNotificationCb, 0),
112 addChannel: make(chan string),
113 removeChannel: make(chan string),
114 exit: make(chan bool),
// client is assigned by one of the two branches below: a direct client or a
// sentinel-managed failover client.
122 var client *redis.Client
// Connection parameters are read from the environment.
123 hostname := os.Getenv("DBAAS_SERVICE_HOST")
// Fallback host. NOTE(review): presumably applied only when
// DBAAS_SERVICE_HOST is empty — confirm the guarding condition.
125 hostname = "localhost"
127 port := os.Getenv("DBAAS_SERVICE_PORT")
131 sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
132 masterName := os.Getenv("DBAAS_MASTER_NAME")
// No sentinel port configured: connect straight to hostname:port.
133 if sentinelPort == "" {
134 redisAddress := hostname + ":" + port
135 client = redis.NewClient(&redis.Options{
137 Password: "", // no password set
138 DB: 0, // use default DB
// Sentinel port configured: the failover client asks the sentinel at
// hostname:sentinelPort for the master registered as masterName.
143 sentinelAddress := hostname + ":" + sentinelPort
144 client = redis.NewFailoverClient(&redis.FailoverOptions{
145 MasterName: masterName,
146 SentinelAddrs: []string{sentinelAddress},
// Wrap the client in a DB whose subscriptions go through client.Subscribe.
151 db := CreateDB(client, subscribeNotifications)
// CheckCommands fetches the server's command table with COMMAND and walks the
// list of module commands this package uses; db.redisModules is set to false
// inside that loop.
// NOTE(review): presumably the flag is cleared only when a listed command is
// absent from the table, and the COMMAND error is handled separately —
// confirm against the full body.
// NOTE(review): "deliepub", which DelIEPub sends, is not in the list below.
156 func (db *DB) CheckCommands() {
157 commands, err := db.client.Command().Result()
// Lower-case names of the module commands issued by the methods of DB.
159 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
160 "msetmpub", "delmpub"}
161 for _, v := range redisModuleCommands {
// Once false, the module-backed methods return an error instead of running.
164 db.redisModules = false
172 func (db *DB) CloseDB() error {
173 return db.client.Close()
// UnsubscribeChannelDB hands each given channel name to the notification
// goroutine over db.ch.removeChannel, which makes the goroutine call
// Unsubscribe on its subscription. Afterwards it checks whether any callbacks
// remain registered.
// NOTE(review): presumably the callback for each channel is dropped from
// db.cbMap and the goroutine is told to exit once the map is empty — confirm.
// NOTE(review): db.cbMap is also read by the notification goroutine and no
// lock is visible here; check for a data race.
176 func (db *DB) UnsubscribeChannelDB(channels ...string) {
177 for _, v := range channels {
// removeChannel is unbuffered, so this blocks until the goroutine receives v.
178 db.ch.removeChannel <- v
180 if len(db.cbMap) == 0 {
// SubscribeChannelDB arranges for notifications on the given channels.
//
// When no callbacks are registered yet (len(db.cbMap) == 0) it starts a
// goroutine that creates a subscription through db.subscribe and then serves
// four kinds of events: received messages, channel additions, channel
// removals and exit. A received message is looked up in cbMap by its full
// channel name; the callback found there is called with channelPrefix trimmed
// from the name and the payload split on eventSeparator.
//
// The second loop over channels sends each name to the goroutine through
// db.ch.addChannel.
// NOTE(review): presumably cb is stored in db.cbMap for every channel, and the
// addChannel loop runs only when the goroutine already exists — confirm.
// NOTE(review): the goroutine reads the map through a pointer while callers
// modify it, and no lock is visible here; check for a data race.
186 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
187 if len(db.cbMap) == 0 {
188 for _, v := range channels {
192 go func(cbMap *map[string]ChannelNotificationCb,
194 eventSeparator string,
196 channels ...string) {
// The subscription lives inside the goroutine for its whole lifetime.
197 sub := db.subscribe(db.client, channels...)
198 rxChannel := sub.Channel()
// Message published on a subscribed channel.
201 case msg := <-rxChannel:
202 cb, ok := (*cbMap)[msg.Channel]
204 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
// Request to subscribe to one more channel.
// NOTE(review): the error returned by Subscribe is not checked.
206 case channel := <-ch.addChannel:
207 sub.Subscribe(channel)
// Request to drop a channel.
// NOTE(review): the error returned by Unsubscribe is not checked.
208 case channel := <-ch.removeChannel:
209 sub.Unsubscribe(channel)
// Exit request; the subscription is closed.
210 case exit := <-ch.exit:
212 if err := sub.Close(); err != nil {
219 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
222 for _, v := range channels {
// addChannel is unbuffered, so this blocks until the goroutine receives v.
224 db.ch.addChannel <- v
229 func (db *DB) MSet(pairs ...interface{}) error {
230 return db.client.MSet(pairs...).Err()
233 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
234 if !db.redisModules {
235 return errors.New("Redis deployment doesn't support MSETMPUB command")
237 command := make([]interface{}, 0)
238 command = append(command, "MSETMPUB")
239 command = append(command, len(pairs)/2)
240 command = append(command, len(channelsAndEvents)/2)
241 for _, d := range pairs {
242 command = append(command, d)
244 for _, d := range channelsAndEvents {
245 command = append(command, d)
247 _, err := db.client.Do(command...).Result()
251 func (db *DB) MGet(keys []string) ([]interface{}, error) {
252 return db.client.MGet(keys...).Result()
255 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
256 if !db.redisModules {
257 return errors.New("Redis deployment not supporting command DELMPUB")
259 command := make([]interface{}, 0)
260 command = append(command, "DELMPUB")
261 command = append(command, len(keys))
262 command = append(command, len(channelsAndEvents)/2)
263 for _, d := range keys {
264 command = append(command, d)
266 for _, d := range channelsAndEvents {
267 command = append(command, d)
269 _, err := db.client.Do(command...).Result()
274 func (db *DB) Del(keys []string) error {
275 _, err := db.client.Del(keys...).Result()
279 func (db *DB) Keys(pattern string) ([]string, error) {
280 return db.client.Keys(pattern).Result()
283 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
284 if !db.redisModules {
285 return false, errors.New("Redis deployment not supporting command")
288 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
291 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
292 if !db.redisModules {
293 return false, errors.New("Redis deployment not supporting command SETIEPUB")
295 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
298 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
299 if !db.redisModules {
300 return false, errors.New("Redis deployment not supporting command SETNXPUB")
302 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
304 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
305 return db.client.SetNX(key, data, expiration).Result()
308 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
309 if !db.redisModules {
310 return false, errors.New("Redis deployment not supporting command")
312 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
315 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
316 if !db.redisModules {
317 return false, errors.New("Redis deployment not supporting command")
319 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
322 func (db *DB) SAdd(key string, data ...interface{}) error {
323 _, err := db.client.SAdd(key, data...).Result()
327 func (db *DB) SRem(key string, data ...interface{}) error {
328 _, err := db.client.SRem(key, data...).Result()
332 func (db *DB) SMembers(key string) ([]string, error) {
333 result, err := db.client.SMembers(key).Result()
337 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
338 result, err := db.client.SIsMember(key, data).Result()
342 func (db *DB) SCard(key string) (int64, error) {
343 result, err := db.client.SCard(key).Result()
347 func (db *DB) PTTL(key string) (time.Duration, error) {
348 result, err := db.client.PTTL(key).Result()
352 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
354 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
355 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
356 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
360 if result == int64(1) {
363 return errors.New("Lock not held")