2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
33 "github.com/go-redis/redis"
// ChannelNotificationCb is the signature of a notification callback: it is
// invoked with a channel name followed by zero or more payload strings.
36 type ChannelNotificationCb func(channel string, payload ...string)
// intChannels groups the control channels used to talk to the notification
// receive loop. The remaining lines of the struct are not visible in this
// view.
38 type intChannels struct {
// addChannel carries channel names the receive loop should subscribe to.
39 addChannel chan string
// removeChannel carries channel names the receive loop should unsubscribe
// from.
40 removeChannel chan string
// cbMap maps a channel name to its notification callback; the receive loop
// looks callbacks up by the channel of each incoming message.
48 cbMap map[string]ChannelNotificationCb
// Subscriber is the subscription handle consumed by the notification receive
// loop. Only part of its method set is visible in this view.
52 type Subscriber interface {
// Channel returns the receive-only channel on which messages are delivered.
53 Channel() <-chan *redis.Message
// Subscribe is called by the receive loop for each name arriving on
// addChannel.
54 Subscribe(channels ...string) error
// Unsubscribe is called by the receive loop for each name arriving on
// removeChannel.
55 Unsubscribe(channels ...string) error
// SubscribeFn creates a Subscriber for the given channels on a RedisClient.
// It is stored in DB and called when the receive loop starts, which lets a
// different implementation be supplied through CreateDB.
59 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
// RedisClient lists the Redis client operations this file relies on. Every
// visible method returns a go-redis command object whose result is read by
// the caller. A *redis.Client is passed where a RedisClient is expected.
// One member line and the closing brace are not visible in this view.
61 type RedisClient interface {
62 Command() *redis.CommandsInfoCmd
64 Subscribe(channels ...string) *redis.PubSub
65 MSet(pairs ...interface{}) *redis.StatusCmd
// Do sends an arbitrary command; it is used for the custom module commands.
66 Do(args ...interface{}) *redis.Cmd
67 MGet(keys ...string) *redis.SliceCmd
68 Del(keys ...string) *redis.IntCmd
69 Keys(pattern string) *redis.StringSliceCmd
70 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
71 SAdd(key string, members ...interface{}) *redis.IntCmd
72 SRem(key string, members ...interface{}) *redis.IntCmd
73 SMembers(key string) *redis.StringSliceCmd
74 SIsMember(key string, member interface{}) *redis.BoolCmd
75 SCard(key string) *redis.IntCmd
76 PTTL(key string) *redis.DurationCmd
// The four script methods below are what luaRefresh.Run needs from the
// client it is given.
77 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
78 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
79 ScriptExists(scripts ...string) *redis.BoolSliceCmd
80 ScriptLoad(script string) *redis.StringCmd
// checkResultAndError reduces a command's (result, err) pair to (bool, error).
// It is applied to the replies of the SETIE, SETIEPUB and SETNXPUB commands.
// NOTE(review): the body is not visible in this view, so the exact mapping
// from result to the returned boolean is not documented here.
83 func checkResultAndError(result interface{}, err error) (bool, error) {
// checkIntResultAndError converts the (result, err) pair of a Redis command
// with an integer reply into a success flag.
//
// It returns (false, err) when err is non-nil, (true, nil) when result is the
// integer 1, and (false, nil) for any other result.
//
// The reply is matched with a type switch instead of an unchecked
// result.(int) assertion: go-redis decodes integer replies as int64, and an
// unchecked assertion to int panics for every other dynamic type. Plain int
// is still accepted so existing callers and mocks that supply int keep
// working.
func checkIntResultAndError(result interface{}, err error) (bool, error) {
	if err != nil {
		return false, err
	}
	switch n := result.(type) {
	case int64:
		return n == 1, nil
	case int:
		return n == 1, nil
	}
	// Unexpected reply type (including nil): report "not done" rather than
	// panicking.
	return false, nil
}
106 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
107 return client.Subscribe(channels...)
// CreateDB builds a DB around the given client and subscribe function.
// The visible initialisers store the subscribe function, create an empty
// callback map and create the control channels; the other lines of the
// literal and the return statement are not visible in this view.
110 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
113 subscribe: subscribe,
115 cbMap: make(map[string]ChannelNotificationCb, 0),
// The control channels are unbuffered, so a send blocks until the receive
// loop takes the value.
117 addChannel: make(chan string),
118 removeChannel: make(chan string),
119 exit: make(chan bool),
// Redis client construction driven by environment variables. The enclosing
// function header, the guarding conditions of the fallbacks and some option
// fields are not visible in this view.
127 var client *redis.Client
// Host of the database service.
128 hostname := os.Getenv("DBAAS_SERVICE_HOST")
// Fallback host (its guarding condition is not visible here).
130 hostname = "localhost"
// Port of the database service.
132 port := os.Getenv("DBAAS_SERVICE_PORT")
// Sentinel settings; an empty sentinel port selects a direct connection.
136 sentinelPort := os.Getenv("DBAAS_SERVICE_SENTINEL_PORT")
137 masterName := os.Getenv("DBAAS_MASTER_NAME")
138 if sentinelPort == "" {
// Direct connection to hostname:port.
139 redisAddress := hostname + ":" + port
140 client = redis.NewClient(&redis.Options{
142 Password: "", // no password set
143 DB: 0, // use default DB
// Failover client that reaches the named master through a single sentinel
// address. NOTE(review): presumably the else branch of the check above; the
// branching line is not visible here.
148 sentinelAddress := hostname + ":" + sentinelPort
149 client = redis.NewFailoverClient(&redis.FailoverOptions{
150 MasterName: masterName,
151 SentinelAddrs: []string{sentinelAddress},
// Wrap the client using the default pub/sub subscribe function.
156 db := CreateDB(client, subscribeNotifications)
// CheckCommands fetches the server's command table and walks the list of
// custom module commands below; on the visible path redisModules is set to
// false.
// NOTE(review): the condition guarding that assignment and the handling of
// err from Command().Result() are not visible in this view.
// NOTE(review): "deliepub" is not in the checked list although DelIEPub
// issues DELIEPUB — confirm whether it should be added.
161 func (db *DB) CheckCommands() {
162 commands, err := db.client.Command().Result()
164 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
165 "msetmpub", "delmpub"}
166 for _, v := range redisModuleCommands {
169 db.redisModules = false
177 func (db *DB) CloseDB() error {
178 return db.client.Close()
// UnsubscribeChannelDB asks the receive loop to unsubscribe from each of the
// given channels by sending its name on removeChannel.
// NOTE(review): removeChannel is unbuffered, so each send blocks until the
// receive loop reads it; presumably this blocks forever if no loop is
// running — confirm. Part of the body, including what happens once cbMap is
// empty, is not visible in this view.
181 func (db *DB) UnsubscribeChannelDB(channels ...string) {
182 for _, v := range channels {
183 db.ch.removeChannel <- v
185 if len(db.cbMap) == 0 {
// SubscribeChannelDB registers cb for the given channels.
//
// When no callbacks are registered yet (cbMap is empty) it starts a goroutine
// that creates the subscription and then serves incoming messages and control
// requests. Otherwise the channel names are sent to the already running loop
// through addChannel.
//
// For each message the loop looks up the callback by msg.Channel and calls it
// with the channel name stripped of channelPrefix and the payload split on
// eventSeparator.
//
// NOTE(review): several lines of the body are not visible in this view.
// NOTE(review): cbMap is shared with the goroutine by pointer and read there
// while other methods also access db.cbMap; no synchronisation is visible
// here — confirm.
191 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
192 if len(db.cbMap) == 0 {
193 for _, v := range channels {
197 go func(cbMap *map[string]ChannelNotificationCb,
199 eventSeparator string,
201 channels ...string) {
// Create the subscription through the injected subscribe function.
202 sub := db.subscribe(db.client, channels...)
203 rxChannel := sub.Channel()
// Incoming notification: dispatch to the callback registered for its channel.
206 case msg := <-rxChannel:
207 cb, ok := (*cbMap)[msg.Channel]
209 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
// Control requests; the error results of Subscribe/Unsubscribe are discarded.
211 case channel := <-ch.addChannel:
212 sub.Subscribe(channel)
213 case channel := <-ch.removeChannel:
214 sub.Unsubscribe(channel)
// Exit request: the subscription is closed.
215 case exit := <-ch.exit:
217 if err := sub.Close(); err != nil {
224 }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
// Hand each channel name to the running loop.
227 for _, v := range channels {
229 db.ch.addChannel <- v
234 func (db *DB) MSet(pairs ...interface{}) error {
235 return db.client.MSet(pairs...).Err()
238 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
239 if !db.redisModules {
240 return errors.New("Redis deployment doesn't support MSETMPUB command")
242 command := make([]interface{}, 0)
243 command = append(command, "MSETMPUB")
244 command = append(command, len(pairs)/2)
245 command = append(command, len(channelsAndEvents)/2)
246 for _, d := range pairs {
247 command = append(command, d)
249 for _, d := range channelsAndEvents {
250 command = append(command, d)
252 _, err := db.client.Do(command...).Result()
256 func (db *DB) MGet(keys []string) ([]interface{}, error) {
257 return db.client.MGet(keys...).Result()
260 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
261 if !db.redisModules {
262 return errors.New("Redis deployment not supporting command DELMPUB")
264 command := make([]interface{}, 0)
265 command = append(command, "DELMPUB")
266 command = append(command, len(keys))
267 command = append(command, len(channelsAndEvents)/2)
268 for _, d := range keys {
269 command = append(command, d)
271 for _, d := range channelsAndEvents {
272 command = append(command, d)
274 _, err := db.client.Do(command...).Result()
279 func (db *DB) Del(keys []string) error {
280 _, err := db.client.Del(keys...).Result()
284 func (db *DB) Keys(pattern string) ([]string, error) {
285 return db.client.Keys(pattern).Result()
288 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
289 if !db.redisModules {
290 return false, errors.New("Redis deployment not supporting command")
293 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
296 func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
297 if !db.redisModules {
298 return false, errors.New("Redis deployment not supporting command SETIEPUB")
300 return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
303 func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
304 if !db.redisModules {
305 return false, errors.New("Redis deployment not supporting command SETNXPUB")
307 return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
309 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
310 return db.client.SetNX(key, data, expiration).Result()
313 func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
314 if !db.redisModules {
315 return false, errors.New("Redis deployment not supporting command")
317 return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
320 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
321 if !db.redisModules {
322 return false, errors.New("Redis deployment not supporting command")
324 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
327 func (db *DB) SAdd(key string, data ...interface{}) error {
328 _, err := db.client.SAdd(key, data...).Result()
332 func (db *DB) SRem(key string, data ...interface{}) error {
333 _, err := db.client.SRem(key, data...).Result()
337 func (db *DB) SMembers(key string) ([]string, error) {
338 result, err := db.client.SMembers(key).Result()
342 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
343 result, err := db.client.SIsMember(key, data).Result()
347 func (db *DB) SCard(key string) (int64, error) {
348 result, err := db.client.SCard(key).Result()
352 func (db *DB) PTTL(key string) (time.Duration, error) {
353 result, err := db.client.PTTL(key).Result()
// luaRefresh is a Lua script that compares the value stored at KEYS[1] with
// ARGV[1]; when they are equal it calls PEXPIRE on KEYS[1] with ARGV[2] and
// returns that call's result, otherwise it returns 0.
357 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
359 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
360 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
361 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
365 if result == int64(1) {
368 return errors.New("Lock not held")