2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
28 "github.com/go-redis/redis"
36 type ChannelNotificationCb func(channel string, payload ...string)
37 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
39 type intChannels struct {
40 addChannel chan string
41 removeChannel chan string
45 type sharedCbMap struct {
47 cbMap map[string]ChannelNotificationCb
55 clusterAddrList string
66 type Subscriber interface {
67 Channel() <-chan *redis.Message
68 Subscribe(channels ...string) error
69 Unsubscribe(channels ...string) error
73 type SubscribeFn func(client RedisClient, channels ...string) Subscriber
75 type RedisClient interface {
76 Command() *redis.CommandsInfoCmd
78 Subscribe(channels ...string) *redis.PubSub
79 MSet(pairs ...interface{}) *redis.StatusCmd
80 Do(args ...interface{}) *redis.Cmd
81 MGet(keys ...string) *redis.SliceCmd
82 Del(keys ...string) *redis.IntCmd
83 Keys(pattern string) *redis.StringSliceCmd
84 SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
85 SAdd(key string, members ...interface{}) *redis.IntCmd
86 SRem(key string, members ...interface{}) *redis.IntCmd
87 SMembers(key string) *redis.StringSliceCmd
88 SIsMember(key string, member interface{}) *redis.BoolCmd
89 SCard(key string) *redis.IntCmd
90 PTTL(key string) *redis.DurationCmd
91 Eval(script string, keys []string, args ...interface{}) *redis.Cmd
92 EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
93 ScriptExists(scripts ...string) *redis.BoolSliceCmd
94 ScriptLoad(script string) *redis.StringCmd
97 func checkResultAndError(result interface{}, err error) (bool, error) {
110 func checkIntResultAndError(result interface{}, err error) (bool, error) {
114 if n, ok := result.(int64); ok {
118 } else if n, ok := result.(int); ok {
126 func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
127 return client.Subscribe(channels...)
130 func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
133 subscribe: subscribe,
135 sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)},
137 addChannel: make(chan string),
138 removeChannel: make(chan string),
139 exit: make(chan bool),
146 func Create() []*DB {
148 return ReadConfigAndCreateDbClients(osimpl, newRedisClient)
151 func readConfig(osI OS) Config {
153 hostname: osI.Getenv("DBAAS_SERVICE_HOST", "localhost"),
154 port: osI.Getenv("DBAAS_SERVICE_PORT", "6379"),
155 masterName: osI.Getenv("DBAAS_MASTER_NAME", ""),
156 sentinelPort: osI.Getenv("DBAAS_SERVICE_SENTINEL_PORT", ""),
157 clusterAddrList: osI.Getenv("DBAAS_CLUSTER_ADDR_LIST", ""),
163 Getenv(key string, defValue string) string
168 func (osImpl) Getenv(key string, defValue string) string {
169 val := os.Getenv(key)
176 func ReadConfigAndCreateDbClients(osI OS, clientCreator RedisClientCreator) []*DB {
177 cfg := readConfig(osI)
178 return createDbClients(cfg, clientCreator)
181 func createDbClients(cfg Config, clientCreator RedisClientCreator) []*DB {
182 if cfg.clusterAddrList == "" {
183 return []*DB{createLegacyDbClient(cfg, clientCreator)}
188 addrList := strings.Split(cfg.clusterAddrList, ",")
189 for _, addr := range addrList {
190 db := createDbClient(cfg, addr, clientCreator)
191 dbs = append(dbs, db)
196 func createLegacyDbClient(cfg Config, clientCreator RedisClientCreator) *DB {
197 return createDbClient(cfg, cfg.hostname, clientCreator)
200 func createDbClient(cfg Config, hostName string, clientCreator RedisClientCreator) *DB {
201 var client RedisClient
202 if cfg.sentinelPort == "" {
203 client = clientCreator(hostName, cfg.port, "", false)
205 client = clientCreator(hostName, cfg.sentinelPort, cfg.masterName, true)
207 db := CreateDB(client, subscribeNotifications)
212 func newRedisClient(addr, port, clusterName string, isHa bool) RedisClient {
214 sentinelAddress := addr + ":" + port
215 return redis.NewFailoverClient(
216 &redis.FailoverOptions{
217 MasterName: clusterName,
218 SentinelAddrs: []string{sentinelAddress},
224 redisAddress := addr + ":" + port
225 return redis.NewClient(&redis.Options{
227 Password: "", // no password set
228 DB: 0, // use default DB
234 func (db *DB) CheckCommands() {
235 commands, err := db.client.Command().Result()
237 redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
238 "msetmpub", "delmpub"}
239 for _, v := range redisModuleCommands {
242 db.redisModules = false
250 func (db *DB) CloseDB() error {
251 return db.client.Close()
254 func (db *DB) UnsubscribeChannelDB(channels ...string) {
255 for _, v := range channels {
257 db.ch.removeChannel <- v
258 if db.sCbMap.Count() == 0 {
264 func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
265 if db.sCbMap.Count() == 0 {
266 for _, v := range channels {
270 go func(sCbMap *sharedCbMap,
272 eventSeparator string,
274 channels ...string) {
275 sub := db.subscribe(db.client, channels...)
276 rxChannel := sub.Channel()
277 lCbMap := sCbMap.GetMapCopy()
280 case msg := <-rxChannel:
281 cb, ok := lCbMap[msg.Channel]
283 cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
285 case channel := <-ch.addChannel:
286 lCbMap = sCbMap.GetMapCopy()
287 sub.Subscribe(channel)
288 case channel := <-ch.removeChannel:
289 lCbMap = sCbMap.GetMapCopy()
290 sub.Unsubscribe(channel)
291 case exit := <-ch.exit:
293 if err := sub.Close(); err != nil {
300 }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
303 for _, v := range channels {
305 db.ch.addChannel <- v
310 func (db *DB) MSet(pairs ...interface{}) error {
311 return db.client.MSet(pairs...).Err()
314 func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
315 if !db.redisModules {
316 return errors.New("Redis deployment doesn't support MSETMPUB command")
318 command := make([]interface{}, 0)
319 command = append(command, "MSETMPUB")
320 command = append(command, len(pairs)/2)
321 command = append(command, len(channelsAndEvents)/2)
322 for _, d := range pairs {
323 command = append(command, d)
325 for _, d := range channelsAndEvents {
326 command = append(command, d)
328 _, err := db.client.Do(command...).Result()
332 func (db *DB) MGet(keys []string) ([]interface{}, error) {
333 return db.client.MGet(keys...).Result()
336 func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
337 if !db.redisModules {
338 return errors.New("Redis deployment not supporting command DELMPUB")
340 command := make([]interface{}, 0)
341 command = append(command, "DELMPUB")
342 command = append(command, len(keys))
343 command = append(command, len(channelsAndEvents)/2)
344 for _, d := range keys {
345 command = append(command, d)
347 for _, d := range channelsAndEvents {
348 command = append(command, d)
350 _, err := db.client.Do(command...).Result()
355 func (db *DB) Del(keys []string) error {
356 _, err := db.client.Del(keys...).Result()
360 func (db *DB) Keys(pattern string) ([]string, error) {
361 return db.client.Keys(pattern).Result()
364 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
365 if !db.redisModules {
366 return false, errors.New("Redis deployment not supporting command")
369 return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
372 func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
373 if !db.redisModules {
374 return false, errors.New("Redis deployment not supporting command SETIEMPUB")
376 capacity := 4 + len(channelsAndEvents)
377 command := make([]interface{}, 0, capacity)
378 command = append(command, "SETIEMPUB")
379 command = append(command, key)
380 command = append(command, newData)
381 command = append(command, oldData)
382 for _, ce := range channelsAndEvents {
383 command = append(command, ce)
385 return checkResultAndError(db.client.Do(command...).Result())
388 func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
389 if !db.redisModules {
390 return false, errors.New("Redis deployment not supporting command SETNXMPUB")
392 capacity := 3 + len(channelsAndEvents)
393 command := make([]interface{}, 0, capacity)
394 command = append(command, "SETNXMPUB")
395 command = append(command, key)
396 command = append(command, data)
397 for _, ce := range channelsAndEvents {
398 command = append(command, ce)
400 return checkResultAndError(db.client.Do(command...).Result())
402 func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) {
403 return db.client.SetNX(key, data, expiration).Result()
406 func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) {
407 if !db.redisModules {
408 return false, errors.New("Redis deployment not supporting command DELIEMPUB")
410 capacity := 3 + len(channelsAndEvents)
411 command := make([]interface{}, 0, capacity)
412 command = append(command, "DELIEMPUB")
413 command = append(command, key)
414 command = append(command, data)
415 for _, ce := range channelsAndEvents {
416 command = append(command, ce)
418 return checkIntResultAndError(db.client.Do(command...).Result())
421 func (db *DB) DelIE(key string, data interface{}) (bool, error) {
422 if !db.redisModules {
423 return false, errors.New("Redis deployment not supporting command")
425 return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
428 func (db *DB) SAdd(key string, data ...interface{}) error {
429 _, err := db.client.SAdd(key, data...).Result()
433 func (db *DB) SRem(key string, data ...interface{}) error {
434 _, err := db.client.SRem(key, data...).Result()
438 func (db *DB) SMembers(key string) ([]string, error) {
439 result, err := db.client.SMembers(key).Result()
443 func (db *DB) SIsMember(key string, data interface{}) (bool, error) {
444 result, err := db.client.SIsMember(key, data).Result()
448 func (db *DB) SCard(key string) (int64, error) {
449 result, err := db.client.SCard(key).Result()
453 func (db *DB) PTTL(key string) (time.Duration, error) {
454 result, err := db.client.PTTL(key).Result()
458 var luaRefresh = redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`)
460 func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) error {
461 expirationStr := strconv.FormatInt(int64(expiration/time.Millisecond), 10)
462 result, err := luaRefresh.Run(db.client, []string{key}, data, expirationStr).Result()
466 if result == int64(1) {
469 return errors.New("Lock not held")
472 func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) {
474 defer sCbMap.m.Unlock()
475 sCbMap.cbMap[channel] = cb
478 func (sCbMap *sharedCbMap) Remove(channel string) {
480 defer sCbMap.m.Unlock()
481 delete(sCbMap.cbMap, channel)
484 func (sCbMap *sharedCbMap) Count() int {
486 defer sCbMap.m.Unlock()
487 return len(sCbMap.cbMap)
490 func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb {
492 defer sCbMap.m.Unlock()
493 mapCopy := make(map[string]ChannelNotificationCb, 0)
494 for i, v := range sCbMap.cbMap {