"fmt"
"os"
"strings"
+ "time"
"github.com/go-redis/redis"
)
}
type DB struct {
- client *redis.Client
+ client RedisClient
+ subscribe SubscribeFn
redisModules bool
cbMap map[string]ChannelNotificationCb
ch intChannels
}
+type Subscriber interface {
+ Channel() <-chan *redis.Message
+ Subscribe(channels ...string) error
+ Unsubscribe(channels ...string) error
+ Close() error
+}
+
+type SubscribeFn func(client RedisClient, channels ...string) Subscriber
+
+type RedisClient interface {
+ Command() *redis.CommandsInfoCmd
+ Close() error
+ Subscribe(channels ...string) *redis.PubSub
+ MSet(pairs ...interface{}) *redis.StatusCmd
+ Do(args ...interface{}) *redis.Cmd
+ MGet(keys ...string) *redis.SliceCmd
+ Del(keys ...string) *redis.IntCmd
+ Keys(pattern string) *redis.StringSliceCmd
+ SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
+ SAdd(key string, members ...interface{}) *redis.IntCmd
+ SRem(key string, members ...interface{}) *redis.IntCmd
+ SMembers(key string) *redis.StringSliceCmd
+ SIsMember(key string, member interface{}) *redis.BoolCmd
+ SCard(key string) *redis.IntCmd
+}
+
func checkResultAndError(result interface{}, err error) (bool, error) {
if err != nil {
if err == redis.Nil {
}
if result == "OK" {
return true, nil
- } else {
- return false, nil
}
+ return false, nil
}
func checkIntResultAndError(result interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
- if result.(int64) == 1 {
+ if result == 1 {
return true, nil
- } else {
- return false, nil
}
+ return false, nil
+}
+func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
+ return client.Subscribe(channels...)
+}
+
+func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
+ db := DB{
+ client: client,
+ subscribe: subscribe,
+ redisModules: true,
+ cbMap: make(map[string]ChannelNotificationCb, 0),
+ ch: intChannels{
+ addChannel: make(chan string),
+ removeChannel: make(chan string),
+ exit: make(chan bool),
+ },
+ }
+
+ return &db
}
func Create() *DB {
DB: 0, // use default DB
PoolSize: 20,
})
+ db := CreateDB(client, subscribeNotifications)
+ db.CheckCommands()
+ return db
+}
- db := DB{
- client: client,
- redisModules: true,
- cbMap: make(map[string]ChannelNotificationCb, 0),
- ch: intChannels{
- addChannel: make(chan string),
- removeChannel: make(chan string),
- exit: make(chan bool),
- },
- }
-
+func (db *DB) CheckCommands() {
commands, err := db.client.Command().Result()
if err == nil {
- redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
+ redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
+ "msetmpub", "delmpub"}
for _, v := range redisModuleCommands {
_, ok := commands[v]
if !ok {
} else {
fmt.Println(err)
}
- return &db
}
func (db *DB) CloseDB() error {
eventSeparator string,
ch intChannels,
channels ...string) {
- sub := db.client.Subscribe(channels...)
+ sub := db.subscribe(db.client, channels...)
rxChannel := sub.Channel()
for {
select {
return db.client.MSet(pairs...).Err()
}
-func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
+func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
if !db.redisModules {
- return errors.New("Redis deployment doesn't support MSETPUB command")
+ return errors.New("Redis deployment doesn't support MSETMPUB command")
}
command := make([]interface{}, 0)
- command = append(command, "MSETPUB")
+ command = append(command, "MSETMPUB")
+ command = append(command, len(pairs)/2)
+ command = append(command, len(channelsAndEvents)/2)
for _, d := range pairs {
command = append(command, d)
}
- command = append(command, channel, message)
+ for _, d := range channelsAndEvents {
+ command = append(command, d)
+ }
_, err := db.client.Do(command...).Result()
return err
}
return db.client.MGet(keys...).Result()
}
-func (db *DB) DelPub(channel, message string, keys []string) error {
+func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
if !db.redisModules {
- return errors.New("Redis deployment not supporting command DELPUB")
+ return errors.New("Redis deployment not supporting command DELMPUB")
}
command := make([]interface{}, 0)
- command = append(command, "DELPUB")
+ command = append(command, "DELMPUB")
+ command = append(command, len(keys))
+ command = append(command, len(channelsAndEvents)/2)
for _, d := range keys {
command = append(command, d)
}
- command = append(command, channel, message)
+ for _, d := range channelsAndEvents {
+ command = append(command, d)
+ }
_, err := db.client.Do(command...).Result()
return err
+
}
func (db *DB) Del(keys []string) error {