2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
27 "github.com/go-redis/redis"
// ChannelNotificationCb is the signature of a notification callback: it is
// invoked with a channel name followed by zero or more payload strings.
30 type ChannelNotificationCb func(channel string, payload ...string)
// intChannels groups the Go channels used to hand subscription changes to the
// listener goroutine started by SubscribeChannelDB.
32 type intChannels struct {
// addChannel carries channel names that the listener passes to Subscribe.
33 addChannel chan string
// removeChannel carries channel names that the listener passes to Unsubscribe.
34 removeChannel chan string
// cbMap holds notification callbacks; the listener goroutine looks a callback
// up by the channel name of each received message.
42 cbMap map[string]ChannelNotificationCb
// Subscriber is the pub/sub handle produced by a SubscribeFn and driven by the
// listener goroutine in SubscribeChannelDB.
46 type Subscriber interface {
// Channel returns the receive-only channel from which messages are read.
47 Channel() <-chan *redis.Message
// Subscribe takes channel names to add and reports failure as an error.
48 Subscribe(channels ...string) error
// Unsubscribe takes channel names to drop and reports failure as an error.
49 Unsubscribe(channels ...string) error
// SubscribeFn creates a Subscriber for the given channels on client. CreateDB
// takes one as a parameter, so the caller decides how subscriptions are made.
53 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
// RedisClient lists the Redis client operations this file relies on. Each
// method returns a go-redis command object whose result or error is read by
// the DB methods below. The client built with redis.NewClient is passed to
// CreateDB as a RedisClient.
55 type RedisClient interface {
56 Command() *redis.CommandsInfoCmd
58 Subscribe(channels ...string) *redis.PubSub
59 MSet(pairs ...interface{}) *redis.StatusCmd
// Do sends an arbitrary command; the DB methods use it for the custom
// commands (MSETMPUB, DELMPUB, SETIE, SETIEPUB, SETNXPUB, DELIE, DELIEPUB).
60 Do(args ...interface{}) *redis.Cmd
61 MGet(keys ...string) *redis.SliceCmd
62 Del(keys ...string) *redis.IntCmd
63 Keys(pattern string) *redis.StringSliceCmd
64 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
65 SAdd(key string, members ...interface{}) *redis.IntCmd
66 SRem(key string, members ...interface{}) *redis.IntCmd
67 SMembers(key string) *redis.StringSliceCmd
68 SIsMember(key string, member interface{}) *redis.BoolCmd
69 SCard(key string) *redis.IntCmd
// checkResultAndError takes the (result, err) pair produced by a command's
// Result() call and yields a (bool, error) pair. SetIE, SetIEPub and SetNXPub
// pass their Do(...).Result() values straight through it.
72 func checkResultAndError(result interface{}, err error) (bool, error) {
// checkIntResultAndError takes the (result, err) pair produced by a command's
// Result() call and yields a (bool, error) pair. DelIEPub and DelIE pass their
// Do(...).Result() values straight through it.
85 func checkIntResultAndError(result interface{}, err error) (bool, error) {
95 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
96 return client.Subscribe(channels...)
// CreateDB returns a *DB that uses the given subscribe function. The visible
// initialisation gives it an empty callback map and freshly made control
// channels.
99 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
102 subscribe: subscribe,
// No callbacks are registered initially.
104 cbMap: make(map[string]ChannelNotificationCb, 0),
// The control channels are unbuffered, so a send blocks until it is received.
106 addChannel: make(chan string),
107 removeChannel: make(chan string),
108 exit: make(chan bool),
// The Redis host is read from the DBAAS_SERVICE_HOST environment variable.
116 hostname := os.Getenv("DBAAS_SERVICE_HOST")
// NOTE(review): "localhost" looks like the fallback for an unset variable, but
// the guarding condition is not visible here — confirm.
118 hostname = "localhost"
// The Redis port is read from the DBAAS_SERVICE_PORT environment variable.
120 port := os.Getenv("DBAAS_SERVICE_PORT")
// The client address has the form "<hostname>:<port>".
124 redisAddress := hostname + ":" + port
125 client := redis.NewClient(&redis.Options{
127 Password: "", // no password set
128 DB: 0, // use default DB
// Wrap the client in a DB that creates subscriptions via subscribeNotifications.
131 db := CreateDB(client, subscribeNotifications)
// CheckCommands fetches the command information reported by the client and
// walks the list of custom command names this file issues through Do. The
// visible code sets db.redisModules to false inside that loop.
136 func (db *DB) CheckCommands() {
// Ask the client for the command information.
137 commands, err := db.client.Command().Result()
// Lower-case names of the custom commands to look for.
139 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
140 "msetmpub", "delmpub"}
141 for _, v := range redisModuleCommands {
// NOTE(review): presumably reached only when v is absent from commands — the
// condition is not visible here; confirm.
144 db.redisModules = false
152 func (db *DB) CloseDB() error {
153 return db.client.Close()
// UnsubscribeChannelDB sends each given channel name on db.ch.removeChannel,
// where the listener goroutine picks it up and calls Unsubscribe.
156 func (db *DB) UnsubscribeChannelDB(channels ...string) {
157 for _, v := range channels {
// removeChannel is unbuffered (see CreateDB), so this send blocks until the
// listener goroutine receives it.
158 db.ch.removeChannel <- v
// NOTE(review): what happens once no callbacks remain is not visible here —
// confirm against the full source.
160 if len(db.cbMap) == 0 {
// SubscribeChannelDB arranges for cb to be called for notifications on the
// given channels.
//
// When db.cbMap is empty, the visible code starts a listener goroutine that
// creates a subscription through db.subscribe and then services received
// messages and the add/remove/exit control channels. The trailing loop sends
// each channel name on db.ch.addChannel. (NOTE(review): presumably that loop
// is the path taken when a listener is already running — the branch structure
// is not visible here; confirm.)
//
// channelPrefix is stripped from the channel name before it is handed to the
// callback; eventSeparator splits the message payload into the callback's
// payload arguments.
166 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
167 if len(db.cbMap) == 0 {
168 for _, v := range channels {
// NOTE(review): the goroutine receives a pointer to db.cbMap and reads the map
// while the owning DB stays usable from other goroutines; if the map is
// written elsewhere without synchronisation this is a data race — confirm.
172 go func(cbMap *map[string]ChannelNotificationCb,
174 eventSeparator string,
176 channels ...string) {
// Create the subscription and obtain its message channel.
177 sub := db.subscribe(db.client, channels...)
178 rxChannel := sub.Channel()
181 case msg := <-rxChannel:
// Find the callback registered under the message's channel name.
182 cb, ok := (*cbMap)[msg.Channel]
// The callback gets the channel name without channelPrefix and the payload
// split on eventSeparator.
184 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
186 case channel := <-ch.addChannel:
// NOTE(review): the error returned by Subscribe is discarded.
187 sub.Subscribe(channel)
188 case channel := <-ch.removeChannel:
// NOTE(review): the error returned by Unsubscribe is discarded.
189 sub.Unsubscribe(channel)
190 case exit := <-ch.exit:
192 if err := sub.Close(); err != nil {
199 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
202 for _, v := range channels {
// addChannel is unbuffered (see CreateDB), so this send blocks until the
// listener goroutine receives it.
204 db.ch.addChannel <- v
209 func (db *DB) MSet(pairs ...interface{}) error {
210 return db.client.MSet(pairs...).Err()
213 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
214 if !db.redisModules {
215 return errors.New("Redis deployment doesn't support MSETMPUB command")
217 command := make([]interface{}, 0)
218 command = append(command, "MSETMPUB")
219 command = append(command, len(pairs)/2)
220 command = append(command, len(channelsAndEvents)/2)
221 for _, d := range pairs {
222 command = append(command, d)
224 for _, d := range channelsAndEvents {
225 command = append(command, d)
227 _, err := db.client.Do(command...).Result()
231 func (db *DB) MGet(keys []string) ([]interface{}, error) {
232 return db.client.MGet(keys...).Result()
235 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
236 if !db.redisModules {
237 return errors.New("Redis deployment not supporting command DELMPUB")
239 command := make([]interface{}, 0)
240 command = append(command, "DELMPUB")
241 command = append(command, len(keys))
242 command = append(command, len(channelsAndEvents)/2)
243 for _, d := range keys {
244 command = append(command, d)
246 for _, d := range channelsAndEvents {
247 command = append(command, d)
249 _, err := db.client.Do(command...).Result()
254 func (db *DB) Del(keys []string) error {
255 _, err := db.client.Del(keys...).Result()
259 func (db *DB) Keys(pattern string) ([]string, error) {
260 return db.client.Keys(pattern).Result()
263 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
264 if !db.redisModules {
265 return false, errors.New("Redis deployment not supporting command")
268 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
271 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
272 if !db.redisModules {
273 return false, errors.New("Redis deployment not supporting command SETIEPUB")
275 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
278 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
279 if !db.redisModules {
280 return false, errors.New("Redis deployment not supporting command SETNXPUB")
282 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
284 func (db *DB) SetNX(key string, data interface{}) (bool, error) {
285 return db.client.SetNX(key, data, 0).Result()
288 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
289 if !db.redisModules {
290 return false, errors.New("Redis deployment not supporting command")
292 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
295 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
296 if !db.redisModules {
297 return false, errors.New("Redis deployment not supporting command")
299 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
302 func (db *DB) SAdd(key string, data ...interface{}) error {
303 _, err := db.client.SAdd(key, data...).Result()
307 func (db *DB) SRem(key string, data ...interface{}) error {
308 _, err := db.client.SRem(key, data...).Result()
312 func (db *DB) SMembers(key string) ([]string, error) {
313 result, err := db.client.SMembers(key).Result()
317 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
318 result, err := db.client.SIsMember(key, data).Result()
322 func (db *DB) SCard(key string) (int64, error) {
323 result, err := db.client.SCard(key).Result()