2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
28 "github.com/go-redis/redis"
36 type ChannelNotificationCb func(channel string, payload ...string)
38 type intChannels struct {
39 addChannel chan string
40 removeChannel chan string
44 type sharedCbMap struct {
46 cbMap map[string]ChannelNotificationCb
57 type Subscriber interface {
58 Channel() <-chan *redis.Message
59 Subscribe(channels ...string) error
60 Unsubscribe(channels ...string) error
64 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
66 type RedisClient interface {
67 Command() *redis.CommandsInfoCmd
69 Subscribe(channels ...string) *redis.PubSub
70 MSet(pairs ...interface{}) *redis.StatusCmd
71 Do(args ...interface{}) *redis.Cmd
72 MGet(keys ...string) *redis.SliceCmd
73 Del(keys ...string) *redis.IntCmd
74 Keys(pattern string) *redis.StringSliceCmd
75 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
76 SAdd(key string, members ...interface{}) *redis.IntCmd
77 SRem(key string, members ...interface{}) *redis.IntCmd
78 SMembers(key string) *redis.StringSliceCmd
79 SIsMember(key string, member interface{}) *redis.BoolCmd
80 SCard(key string) *redis.IntCmd
81 PTTL(key string) *redis.DurationCmd
82 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
83 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
84 ScriptExists(scripts ...string) *redis.BoolSliceCmd
85 ScriptLoad(script string) *redis.StringCmd
88 func checkResultAndError(result interface{}, err error) (bool, error) {
101 func checkIntResultAndError(result interface{}, err error) (bool, error) {
105 if result.(int) == int(1) {
111 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
112 return client.Subscribe(channels...)
115 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
118 subscribe: subscribe,
120 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
122 addChannel: make(chan string),
123 removeChannel: make(chan string),
124 exit: make(chan bool),
132 var client *redis.Client
133 hostname := os.Getenv("DBAAS_SERVICE_HOST")
135 hostname = "localhost"
137 port := os.Getenv("DBAAS_SERVICE_PORT")
141 sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
142 masterName := os.Getenv("DBAAS_MASTER_NAME")
143 if sentinelPort == "" {
144 redisAddress := hostname + ":" + port
145 client = redis.NewClient(&redis.Options{
147 Password: "", // no password set
148 DB: 0, // use default DB
153 sentinelAddress := hostname + ":" + sentinelPort
154 client = redis.NewFailoverClient(&redis.FailoverOptions{
155 MasterName: masterName,
156 SentinelAddrs: []string{sentinelAddress},
161 db := CreateDB(client, subscribeNotifications)
166 func (db *DB) CheckCommands() {
167 commands, err := db.client.Command().Result()
169 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
170 "msetmpub", "delmpub"}
171 for _, v := range redisModuleCommands {
174 db.redisModules = false
182 func (db *DB) CloseDB() error {
183 return db.client.Close()
186 func (db *DB) UnsubscribeChannelDB(channels ...string) {
187 for _, v := range channels {
189 db.ch.removeChannel <- v
190 if db.sCbMap.Count() == 0 {
196 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
197 if db.sCbMap.Count() == 0 {
198 for _, v := range channels {
202 go func(sCbMap *sharedCbMap,
204 eventSeparator string,
206 channels ...string) {
207 sub := db.subscribe(db.client, channels...)
208 rxChannel := sub.Channel()
209 lCbMap := sCbMap.GetMapCopy()
212 case msg := <-rxChannel:
213 cb, ok := lCbMap[msg.Channel]
215 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
217 case channel := <-ch.addChannel:
218 lCbMap = sCbMap.GetMapCopy()
219 sub.Subscribe(channel)
220 case channel := <-ch.removeChannel:
221 lCbMap = sCbMap.GetMapCopy()
222 sub.Unsubscribe(channel)
223 case exit := <-ch.exit:
225 if err := sub.Close(); err != nil {
232 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
235 for _, v := range channels {
237 db.ch.addChannel <- v
242 func (db *DB) MSet(pairs ...interface{}) error {
243 return db.client.MSet(pairs...).Err()
246 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
247 if !db.redisModules {
248 return errors.New("Redis deployment doesn't support MSETMPUB command")
250 command := make([]interface{}, 0)
251 command = append(command, "MSETMPUB")
252 command = append(command, len(pairs)/2)
253 command = append(command, len(channelsAndEvents)/2)
254 for _, d := range pairs {
255 command = append(command, d)
257 for _, d := range channelsAndEvents {
258 command = append(command, d)
260 _, err := db.client.Do(command...).Result()
264 func (db *DB) MGet(keys []string) ([]interface{}, error) {
265 return db.client.MGet(keys...).Result()
268 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
269 if !db.redisModules {
270 return errors.New("Redis deployment not supporting command DELMPUB")
272 command := make([]interface{}, 0)
273 command = append(command, "DELMPUB")
274 command = append(command, len(keys))
275 command = append(command, len(channelsAndEvents)/2)
276 for _, d := range keys {
277 command = append(command, d)
279 for _, d := range channelsAndEvents {
280 command = append(command, d)
282 _, err := db.client.Do(command...).Result()
287 func (db *DB) Del(keys []string) error {
288 _, err := db.client.Del(keys...).Result()
292 func (db *DB) Keys(pattern string) ([]string, error) {
293 return db.client.Keys(pattern).Result()
296 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
297 if !db.redisModules {
298 return false, errors.New("Redis deployment not supporting command")
301 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
304 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
305 if !db.redisModules {
306 return false, errors.New("Redis deployment not supporting command SETIEPUB")
308 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
311 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
312 if !db.redisModules {
313 return false, errors.New("Redis deployment not supporting command SETNXPUB")
315 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
317 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
318 return db.client.SetNX(key, data, expiration).Result()
321 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
322 if !db.redisModules {
323 return false, errors.New("Redis deployment not supporting command")
325 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
328 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
329 if !db.redisModules {
330 return false, errors.New("Redis deployment not supporting command")
332 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
335 func (db *DB) SAdd(key string, data ...interface{}) error {
336 _, err := db.client.SAdd(key, data...).Result()
340 func (db *DB) SRem(key string, data ...interface{}) error {
341 _, err := db.client.SRem(key, data...).Result()
345 func (db *DB) SMembers(key string) ([]string, error) {
346 result, err := db.client.SMembers(key).Result()
350 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
351 result, err := db.client.SIsMember(key, data).Result()
355 func (db *DB) SCard(key string) (int64, error) {
356 result, err := db.client.SCard(key).Result()
360 func (db *DB) PTTL(key string) (time.Duration, error) {
361 result, err := db.client.PTTL(key).Result()
365 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
367 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
368 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
369 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
373 if result == int64(1) {
376 return errors.New("Lock not held")
379 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
381 defer sCbMap.m.Unlock()
382 sCbMap.cbMap[channel] = cb
385 func (sCbMap *sharedCbMap) Remove(channel string) {
387 defer sCbMap.m.Unlock()
388 delete(sCbMap.cbMap, channel)
391 func (sCbMap *sharedCbMap) Count() int {
393 defer sCbMap.m.Unlock()
394 return len(sCbMap.cbMap)
397 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
399 defer sCbMap.m.Unlock()
400 mapCopy := make(map[string]ChannelNotificationCb, 0)
401 for i, v := range sCbMap.cbMap {