2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
26 "github.com/go-redis/redis"
// ChannelNotificationCb is the callback type accepted by SubscribeChannelDB.
// The subscription loop invokes it with the channel name (after trimming the
// channel prefix) and the message payload split on the event separator.
type ChannelNotificationCb func(channel string, payload ...string)
31 type intChannels struct {
32 addChannel chan string
33 removeChannel chan string
40 cbMap map[string]ChannelNotificationCb
44 func checkResultAndError(result interface{}, err error) (bool, error) {
58 func checkIntResultAndError(result interface{}, err error) (bool, error) {
62 if result.(int64) == 1 {
71 hostname := os.Getenv("DBAAS_SERVICE_HOST")
73 hostname = "localhost"
75 port := os.Getenv("DBAAS_SERVICE_PORT")
79 redisAddress := hostname + ":" + port
80 client := redis.NewClient(&redis.Options{
82 Password: "", // no password set
83 DB: 0, // use default DB
90 cbMap: make(map[string]ChannelNotificationCb, 0),
92 addChannel: make(chan string),
93 removeChannel: make(chan string),
94 exit: make(chan bool),
98 commands, err := db.client.Command().Result()
100 redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
101 for _, v := range redisModuleCommands {
104 db.redisModules = false
113 func (db *DB) CloseDB() error {
114 return db.client.Close()
117 func (db *DB) UnsubscribeChannelDB(channels ...string) {
118 for _, v := range channels {
119 db.ch.removeChannel <- v
121 if len(db.cbMap) == 0 {
127 func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
128 if len(db.cbMap) == 0 {
129 for _, v := range channels {
133 go func(cbMap *map[string]ChannelNotificationCb,
135 eventSeparator string,
137 channels ...string) {
138 sub := db.client.Subscribe(channels...)
139 rxChannel := sub.Channel()
142 case msg := <-rxChannel:
143 cb, ok := (*cbMap)[msg.Channel]
145 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
147 case channel := <-ch.addChannel:
148 sub.Subscribe(channel)
149 case channel := <-ch.removeChannel:
150 sub.Unsubscribe(channel)
151 case exit := <-ch.exit:
153 if err := sub.Close(); err != nil {
160 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
163 for _, v := range channels {
165 db.ch.addChannel <- v
170 func (db *DB) MSet(pairs ...interface{}) error {
171 return db.client.MSet(pairs...).Err()
174 func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
175 if !db.redisModules {
176 return errors.New("Redis deployment doesn't support MSETPUB command")
178 command := make([]interface{}, 0)
179 command = append(command, "MSETPUB")
180 for _, d := range pairs {
181 command = append(command, d)
183 command = append(command, channel, message)
184 _, err := db.client.Do(command...).Result()
188 func (db *DB) MGet(keys []string) ([]interface{}, error) {
189 return db.client.MGet(keys...).Result()
192 func (db *DB) DelPub(channel, message string, keys []string) error {
193 if !db.redisModules {
194 return errors.New("Redis deployment not supporting command DELPUB")
196 command := make([]interface{}, 0)
197 command = append(command, "DELPUB")
198 for _, d := range keys {
199 command = append(command, d)
201 command = append(command, channel, message)
202 _, err := db.client.Do(command...).Result()
206 func (db *DB) Del(keys []string) error {
207 _, err := db.client.Del(keys...).Result()
211 func (db *DB) Keys(pattern string) ([]string, error) {
212 return db.client.Keys(pattern).Result()
215 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
216 if !db.redisModules {
217 return false, errors.New("Redis deployment not supporting command")
220 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
223 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
224 if !db.redisModules {
225 return false, errors.New("Redis deployment not supporting command SETIEPUB")
227 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
230 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
231 if !db.redisModules {
232 return false, errors.New("Redis deployment not supporting command SETNXPUB")
234 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
236 func (db *DB) SetNX(key string, data interface{}) (bool, error) {
237 return db.client.SetNX(key, data, 0).Result()
240 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
241 if !db.redisModules {
242 return false, errors.New("Redis deployment not supporting command")
244 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
247 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
248 if !db.redisModules {
249 return false, errors.New("Redis deployment not supporting command")
251 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())