Support multiple event publishing
[ric-plt/sdlgo.git] / internal / sdlgoredis / sdlgoredis.go
index b032d8f..ab56b12 100644 (file)
@@ -22,6 +22,7 @@ import (
        "fmt"
        "os"
        "strings"
+       "time"
 
        "github.com/go-redis/redis"
 )
@@ -35,12 +36,39 @@ type intChannels struct {
 }
 
 type DB struct {
-       client       *redis.Client
+       client       RedisClient
+       subscribe    SubscribeFn
        redisModules bool
        cbMap        map[string]ChannelNotificationCb
        ch           intChannels
 }
 
+type Subscriber interface {
+       Channel() <-chan *redis.Message
+       Subscribe(channels ...string) error
+       Unsubscribe(channels ...string) error
+       Close() error
+}
+
+type SubscribeFn func(client RedisClient, channels ...string) Subscriber
+
+type RedisClient interface {
+       Command() *redis.CommandsInfoCmd
+       Close() error
+       Subscribe(channels ...string) *redis.PubSub
+       MSet(pairs ...interface{}) *redis.StatusCmd
+       Do(args ...interface{}) *redis.Cmd
+       MGet(keys ...string) *redis.SliceCmd
+       Del(keys ...string) *redis.IntCmd
+       Keys(pattern string) *redis.StringSliceCmd
+       SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
+       SAdd(key string, members ...interface{}) *redis.IntCmd
+       SRem(key string, members ...interface{}) *redis.IntCmd
+       SMembers(key string) *redis.StringSliceCmd
+       SIsMember(key string, member interface{}) *redis.BoolCmd
+       SCard(key string) *redis.IntCmd
+}
+
 func checkResultAndError(result interface{}, err error) (bool, error) {
        if err != nil {
                if err == redis.Nil {
@@ -50,21 +78,38 @@ func checkResultAndError(result interface{}, err error) (bool, error) {
        }
        if result == "OK" {
                return true, nil
-       } else {
-               return false, nil
        }
+       return false, nil
 }
 
 func checkIntResultAndError(result interface{}, err error) (bool, error) {
        if err != nil {
                return false, err
        }
-       if result.(int64) == 1 {
+       if result == 1 {
                return true, nil
-       } else {
-               return false, nil
        }
+       return false, nil
+}
 
+func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
+       return client.Subscribe(channels...)
+}
+
+func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
+       db := DB{
+               client:       client,
+               subscribe:    subscribe,
+               redisModules: true,
+               cbMap:        make(map[string]ChannelNotificationCb, 0),
+               ch: intChannels{
+                       addChannel:    make(chan string),
+                       removeChannel: make(chan string),
+                       exit:          make(chan bool),
+               },
+       }
+
+       return &db
 }
 
 func Create() *DB {
@@ -83,21 +128,16 @@ func Create() *DB {
                DB:       0,  // use default DB
                PoolSize: 20,
        })
+       db := CreateDB(client, subscribeNotifications)
+       db.CheckCommands()
+       return db
+}
 
-       db := DB{
-               client:       client,
-               redisModules: true,
-               cbMap:        make(map[string]ChannelNotificationCb, 0),
-               ch: intChannels{
-                       addChannel:    make(chan string),
-                       removeChannel: make(chan string),
-                       exit:          make(chan bool),
-               },
-       }
-
+func (db *DB) CheckCommands() {
        commands, err := db.client.Command().Result()
        if err == nil {
-               redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
+               redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
+                       "msetmpub", "delmpub"}
                for _, v := range redisModuleCommands {
                        _, ok := commands[v]
                        if !ok {
@@ -107,7 +147,6 @@ func Create() *DB {
        } else {
                fmt.Println(err)
        }
-       return &db
 }
 
 func (db *DB) CloseDB() error {
@@ -135,7 +174,7 @@ func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventS
                        eventSeparator string,
                        ch intChannels,
                        channels ...string) {
-                       sub := db.client.Subscribe(channels...)
+                       sub := db.subscribe(db.client, channels...)
                        rxChannel := sub.Channel()
                        for {
                                select {
@@ -171,16 +210,20 @@ func (db *DB) MSet(pairs ...interface{}) error {
        return db.client.MSet(pairs...).Err()
 }
 
-func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
+func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
        if !db.redisModules {
-               return errors.New("Redis deployment doesn't support MSETPUB command")
+               return errors.New("Redis deployment doesn't support MSETMPUB command")
        }
        command := make([]interface{}, 0)
-       command = append(command, "MSETPUB")
+       command = append(command, "MSETMPUB")
+       command = append(command, len(pairs)/2)
+       command = append(command, len(channelsAndEvents)/2)
        for _, d := range pairs {
                command = append(command, d)
        }
-       command = append(command, channel, message)
+       for _, d := range channelsAndEvents {
+               command = append(command, d)
+       }
        _, err := db.client.Do(command...).Result()
        return err
 }
@@ -189,18 +232,23 @@ func (db *DB) MGet(keys []string) ([]interface{}, error) {
        return db.client.MGet(keys...).Result()
 }
 
-func (db *DB) DelPub(channel, message string, keys []string) error {
+func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
        if !db.redisModules {
-               return errors.New("Redis deployment not supporting command DELPUB")
+               return errors.New("Redis deployment not supporting command DELMPUB")
        }
        command := make([]interface{}, 0)
-       command = append(command, "DELPUB")
+       command = append(command, "DELMPUB")
+       command = append(command, len(keys))
+       command = append(command, len(channelsAndEvents)/2)
        for _, d := range keys {
                command = append(command, d)
        }
-       command = append(command, channel, message)
+       for _, d := range channelsAndEvents {
+               command = append(command, d)
+       }
        _, err := db.client.Do(command...).Result()
        return err
+
 }
 
 func (db *DB) Del(keys []string) error {