X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;fp=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=ab56b1273d205b7a9ece39e5f0e21c5e048e738e;hb=ae7460ab662366115f6decc834a109bfa8985cc6;hp=b032d8f566c70e9699355c7f872175731578386b;hpb=135580f96b31e8dd0ccd5695df877a7942b912ae;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index b032d8f..ab56b12 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/go-redis/redis" ) @@ -35,12 +36,39 @@ type intChannels struct { } type DB struct { - client *redis.Client + client RedisClient + subscribe SubscribeFn redisModules bool cbMap map[string]ChannelNotificationCb ch intChannels } +type Subscriber interface { + Channel() <-chan *redis.Message + Subscribe(channels ...string) error + Unsubscribe(channels ...string) error + Close() error +} + +type SubscribeFn func(client RedisClient, channels ...string) Subscriber + +type RedisClient interface { + Command() *redis.CommandsInfoCmd + Close() error + Subscribe(channels ...string) *redis.PubSub + MSet(pairs ...interface{}) *redis.StatusCmd + Do(args ...interface{}) *redis.Cmd + MGet(keys ...string) *redis.SliceCmd + Del(keys ...string) *redis.IntCmd + Keys(pattern string) *redis.StringSliceCmd + SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd + SAdd(key string, members ...interface{}) *redis.IntCmd + SRem(key string, members ...interface{}) *redis.IntCmd + SMembers(key string) *redis.StringSliceCmd + SIsMember(key string, member interface{}) *redis.BoolCmd + SCard(key string) *redis.IntCmd +} + func checkResultAndError(result interface{}, err error) (bool, error) { if err != nil { if err == redis.Nil { @@ -50,21 +78,38 @@ func checkResultAndError(result interface{}, err error) (bool, error) { } if result == "OK" { return true, nil - } else { - return false, nil } + return false, nil } func checkIntResultAndError(result interface{}, err error) (bool, error) { if err != nil { return false, err } - if result.(int64) == 1 { + if result == 1 { return true, nil - } else { - return false, nil } + return false, nil +} +func subscribeNotifications(client RedisClient, channels ...string) Subscriber { + return client.Subscribe(channels...) +} + +func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { + db := DB{ + client: client, + subscribe: subscribe, + redisModules: true, + cbMap: make(map[string]ChannelNotificationCb, 0), + ch: intChannels{ + addChannel: make(chan string), + removeChannel: make(chan string), + exit: make(chan bool), + }, + } + + return &db } func Create() *DB { @@ -83,21 +128,16 @@ func Create() *DB { DB: 0, // use default DB PoolSize: 20, }) + db := CreateDB(client, subscribeNotifications) + db.CheckCommands() + return db +} - db := DB{ - client: client, - redisModules: true, - cbMap: make(map[string]ChannelNotificationCb, 0), - ch: intChannels{ - addChannel: make(chan string), - removeChannel: make(chan string), - exit: make(chan bool), - }, - } - +func (db *DB) CheckCommands() { commands, err := db.client.Command().Result() if err == nil { - redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"} + redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub", + "msetmpub", "delmpub"} for _, v := range redisModuleCommands { _, ok := commands[v] if !ok { @@ -107,7 +147,6 @@ func Create() *DB { } else { fmt.Println(err) } - return &db } func (db *DB) CloseDB() error { @@ -135,7 +174,7 @@ func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventS eventSeparator string, ch intChannels, channels ...string) { - sub := db.client.Subscribe(channels...) + sub := db.subscribe(db.client, channels...) rxChannel := sub.Channel() for { select { @@ -171,16 +210,20 @@ func (db *DB) MSet(pairs ...interface{}) error { return db.client.MSet(pairs...).Err() } -func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error { +func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error { if !db.redisModules { - return errors.New("Redis deployment doesn't support MSETPUB command") + return errors.New("Redis deployment doesn't support MSETMPUB command") } command := make([]interface{}, 0) - command = append(command, "MSETPUB") + command = append(command, "MSETMPUB") + command = append(command, len(pairs)/2) + command = append(command, len(channelsAndEvents)/2) for _, d := range pairs { command = append(command, d) } - command = append(command, channel, message) + for _, d := range channelsAndEvents { + command = append(command, d) + } _, err := db.client.Do(command...).Result() return err } @@ -189,18 +232,23 @@ func (db *DB) MGet(keys []string) ([]interface{}, error) { return db.client.MGet(keys...).Result() } -func (db *DB) DelPub(channel, message string, keys []string) error { +func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error { if !db.redisModules { - return errors.New("Redis deployment not supporting command DELPUB") + return errors.New("Redis deployment not supporting command DELMPUB") } command := make([]interface{}, 0) - command = append(command, "DELPUB") + command = append(command, "DELMPUB") + command = append(command, len(keys)) + command = append(command, len(channelsAndEvents)/2) for _, d := range keys { command = append(command, d) } - command = append(command, channel, message) + for _, d := range channelsAndEvents { + command = append(command, d) + } _, err := db.client.Do(command...).Result() return err + } func (db *DB) Del(keys []string) error {