"errors"
"fmt"
"os"
+ "strings"
"github.com/go-redis/redis"
)
+type ChannelNotificationCb func(channel string, payload ...string)
+
+type intChannels struct {
+ addChannel chan string
+ removeChannel chan string
+ exit chan bool
+}
+
type DB struct {
client *redis.Client
redisModules bool
+ cbMap map[string]ChannelNotificationCb
+ ch intChannels
+}
+
+func checkResultAndError(result interface{}, err error) (bool, error) {
+ if err != nil {
+ if err == redis.Nil {
+ return false, nil
+ }
+ return false, err
+ }
+ if result == "OK" {
+ return true, nil
+ } else {
+ return false, nil
+ }
+}
+
+func checkIntResultAndError(result interface{}, err error) (bool, error) {
+ if err != nil {
+ return false, err
+ }
+ if result.(int64) == 1 {
+ return true, nil
+ } else {
+ return false, nil
+ }
+
}
func Create() *DB {
db := DB{
client: client,
redisModules: true,
+ cbMap: make(map[string]ChannelNotificationCb, 0),
+ ch: intChannels{
+ addChannel: make(chan string),
+ removeChannel: make(chan string),
+ exit: make(chan bool),
+ },
}
commands, err := db.client.Command().Result()
if err == nil {
- redisModuleCommands := []string{"setie", "delie"}
+ redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
for _, v := range redisModuleCommands {
_, ok := commands[v]
if !ok {
return db.client.Close()
}
+func (db *DB) UnsubscribeChannelDB(channels ...string) {
+ for _, v := range channels {
+ db.ch.removeChannel <- v
+ delete(db.cbMap, v)
+ if len(db.cbMap) == 0 {
+ db.ch.exit <- true
+ }
+ }
+}
+
+func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
+ if len(db.cbMap) == 0 {
+ for _, v := range channels {
+ db.cbMap[v] = cb
+ }
+
+ go func(cbMap *map[string]ChannelNotificationCb,
+ channelPrefix,
+ eventSeparator string,
+ ch intChannels,
+ channels ...string) {
+ sub := db.client.Subscribe(channels...)
+ rxChannel := sub.Channel()
+ for {
+ select {
+ case msg := <-rxChannel:
+ cb, ok := (*cbMap)[msg.Channel]
+ if ok {
+ cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+ }
+ case channel := <-ch.addChannel:
+ sub.Subscribe(channel)
+ case channel := <-ch.removeChannel:
+ sub.Unsubscribe(channel)
+ case exit := <-ch.exit:
+ if exit {
+ if err := sub.Close(); err != nil {
+ fmt.Println(err)
+ }
+ return
+ }
+ }
+ }
+ }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
+
+ } else {
+ for _, v := range channels {
+ db.cbMap[v] = cb
+ db.ch.addChannel <- v
+ }
+ }
+}
+
func (db *DB) MSet(pairs ...interface{}) error {
return db.client.MSet(pairs...).Err()
}
+func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
+ if !db.redisModules {
+ return errors.New("Redis deployment doesn't support MSETPUB command")
+ }
+ command := make([]interface{}, 0)
+ command = append(command, "MSETPUB")
+ for _, d := range pairs {
+ command = append(command, d)
+ }
+ command = append(command, channel, message)
+ _, err := db.client.Do(command...).Result()
+ return err
+}
+
func (db *DB) MGet(keys []string) ([]interface{}, error) {
- val, err := db.client.MGet(keys...).Result()
- return val, err
+ return db.client.MGet(keys...).Result()
+}
+
+func (db *DB) DelPub(channel, message string, keys []string) error {
+ if !db.redisModules {
+ return errors.New("Redis deployment not supporting command DELPUB")
+ }
+ command := make([]interface{}, 0)
+ command = append(command, "DELPUB")
+ for _, d := range keys {
+ command = append(command, d)
+ }
+ command = append(command, channel, message)
+ _, err := db.client.Do(command...).Result()
+ return err
}
func (db *DB) Del(keys []string) error {
}
func (db *DB) Keys(pattern string) ([]string, error) {
- val, err := db.client.Keys(pattern).Result()
- return val, err
+ return db.client.Keys(pattern).Result()
}
func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
return false, errors.New("Redis deployment not supporting command")
}
- result, err := db.client.Do("SETIE", key, newData, oldData).Result()
- if err != nil {
- return false, err
- }
- if result == "OK" {
- return true, nil
- } else {
- return false, nil
+ return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
+}
+
+func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
+ if !db.redisModules {
+ return false, errors.New("Redis deployment not supporting command SETIEPUB")
}
+ return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
}
+func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
+ if !db.redisModules {
+ return false, errors.New("Redis deployment not supporting command SETNXPUB")
+ }
+ return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
+}
func (db *DB) SetNX(key string, data interface{}) (bool, error) {
- result, err := db.client.SetNX(key, data, 0).Result()
- return result, err
+ return db.client.SetNX(key, data, 0).Result()
}
-func (db *DB) DelIE(key string, data interface{}) (bool, error) {
+func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
if !db.redisModules {
return false, errors.New("Redis deployment not supporting command")
}
- result, err := db.client.Do("DELIE", key, data).Result()
- if err != nil {
- return false, err
- }
- if result == "1" {
- return true, nil
+ return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
+}
+
+func (db *DB) DelIE(key string, data interface{}) (bool, error) {
+ if !db.redisModules {
+ return false, errors.New("Redis deployment not supporting command")
}
- return false, nil
+ return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
}
package sdlgo
import (
+ "errors"
+ "fmt"
"reflect"
"strings"
- "gerrit.oran-osc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
+ "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
)
type iDatabase interface {
+ SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
+ UnsubscribeChannelDB(channels ...string)
MSet(pairs ...interface{}) error
+ MSetPub(ns, message string, pairs ...interface{}) error
MGet(keys []string) ([]interface{}, error)
CloseDB() error
Del(keys []string) error
+ DelPub(channel, message string, keys []string) error
Keys(key string) ([]string, error)
SetIE(key string, oldData, newData interface{}) (bool, error)
+ SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
SetNX(key string, data interface{}) (bool, error)
+ SetNXPub(channel, message, key string, data interface{}) (bool, error)
DelIE(key string, data interface{}) (bool, error)
+ DelIEPub(channel, message, key string, data interface{}) (bool, error)
}
+//SdlInstance provides an API to read, write and modify
+//key-value pairs in a given namespace.
type SdlInstance struct {
- nameSpace string
- nsPrefix string
+ nameSpace string
+ nsPrefix string
+ eventSeparator string
iDatabase
}
//as a backend for the key-value storage. The returned value shall
//be given as a parameter when calling NewKeyValStorage
func NewDatabase() *sdlgoredis.DB {
- db := sdlgoredis.Create()
- return db
+ return sdlgoredis.Create()
}
//NewSdlInstance creates a new sdl instance using the given namespace.
//The database used as a backend is given as a parameter
func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
- s := SdlInstance{
- nameSpace: NameSpace,
- nsPrefix: "{" + NameSpace + "},",
- iDatabase: db,
+ return &SdlInstance{
+ nameSpace: NameSpace,
+ nsPrefix: "{" + NameSpace + "},",
+ eventSeparator: "___",
+ iDatabase: db,
}
+}
+
+//SubscribeChannel lets you to subscribe for a events on a given channels.
+//SDL notifications are events that are published on a specific channels.
+//Both the channel and events are defined by the entity that is publishing
+//the events.
+//
+//When subscribing for a channel, a callback function is given as a parameter.
+//Whenever a notification is received from a channel, this callback is called
+//with channel and notifications as parameter (several notifications could be
+//packed to a single callback function call). A call to SubscribeChannel function
+//returns immediatelly, callbacks will be called asyncronously.
+//
+//It is possible to subscribe to different channels using different callbacks. In
+//this case simply use SubscribeChannel function separately for each channel.
+//
+//When receiving events in callback routine, it is a good practive to return from
+//callback as quickly as possible. E.g. reading in callback context should be avoided
+//and using of Go signals is recommended. Also it should be noted that in case of several
+//events received from different channels, callbacks are called in series one by one.
+func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
+ s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
+ return nil
+}
- return &s
+//UnsubscribeChannel removes subscription from one or several channels.
+func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
+ s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
+ return nil
}
+//Close connection to backend database.
func (s *SdlInstance) Close() error {
return s.CloseDB()
}
-func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
- var retVal []interface{}
- for i, v := range pairs {
- if i%2 == 0 {
- reflectType := reflect.TypeOf(v)
- switch reflectType.Kind() {
- case reflect.Slice:
+func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
+ if len(channelsAndEvents)%2 != 0 {
+ return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
+ }
+ for i, v := range channelsAndEvents {
+ if i%2 != 0 {
+ if strings.Contains(v, s.eventSeparator) {
+ return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+ }
+ }
+ }
+ return nil
+}
+func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
+ var retVal []string
+ for _, v := range channels {
+ retVal = append(retVal, s.nsPrefix+v)
+ }
+ return retVal
+}
+
+func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
+ retVal := make([]interface{}, 0)
+ shouldBeKey := true
+ for _, v := range pairs {
+ reflectType := reflect.TypeOf(v)
+ switch reflectType.Kind() {
+ case reflect.Slice:
+ if shouldBeKey {
x := reflect.ValueOf(v)
+ if x.Len()%2 != 0 {
+ return []interface{}{}, errors.New("Key/value pairs doesn't match")
+ }
for i2 := 0; i2 < x.Len(); i2++ {
if i2%2 == 0 {
retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
retVal = append(retVal, x.Index(i2).Interface())
}
}
- case reflect.Array:
+ } else {
+ return []interface{}{}, errors.New("Key/value pairs doesn't match")
+ }
+ case reflect.Array:
+ if shouldBeKey {
x := reflect.ValueOf(v)
+ if x.Len()%2 != 0 {
+ return []interface{}{}, errors.New("Key/value pairs doesn't match")
+ }
for i2 := 0; i2 < x.Len(); i2++ {
if i2%2 == 0 {
retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
retVal = append(retVal, x.Index(i2).Interface())
}
}
- default:
+ } else {
+ return []interface{}{}, errors.New("Key/value pairs doesn't match")
+ }
+ default:
+ if shouldBeKey {
retVal = append(retVal, s.nsPrefix+v.(string))
+ shouldBeKey = false
+ } else {
+ retVal = append(retVal, v)
+ shouldBeKey = true
}
+ }
+ }
+ if len(retVal)%2 != 0 {
+ return []interface{}{}, errors.New("Key/value pairs doesn't match")
+ }
+ return retVal, nil
+}
+
+func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
+ channelEventMap := make(map[string]string)
+ for i, v := range channelsAndEvents {
+ if i%2 != 0 {
+ continue
+ }
+ _, exists := channelEventMap[v]
+ if exists {
+ channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
} else {
- retVal = append(retVal, v)
+ channelEventMap[v] = channelsAndEvents[i+1]
}
}
+ retVal := make([]string, 0)
+ for k, v := range channelEventMap {
+ retVal = append(retVal, s.nsPrefix+k)
+ retVal = append(retVal, v)
+ }
return retVal
}
+//SetAndPublish function writes data to shared data layer storage and send an event to
+//a channel. Writing is done atomically, i.e. all succeeds or fails.
+//Data to be written is given as key-value pairs. Several key-value
+//pairs can be written with one call.
+//The key is expected to be string whereas value can be anything, string,
+//number, slice array or map
+//
+//Channels and events are given as pairs is channelsAndEvents parameter.
+//Although it is possible to give sevral channel-event pairs, current implementation
+//supports sending events to one channel only due to missing support in DB backend.
+func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
+ if len(pairs)%2 != 0 {
+ return errors.New("Invalid pairs parameter")
+ }
+
+ keyAndData, err := s.setNamespaceToKeys(pairs...)
+ if err != nil {
+ return err
+ }
+ if len(channelsAndEvents) == 0 {
+ return s.MSet(keyAndData...)
+ }
+ if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
+ return err
+ }
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
+}
+
//Set function writes data to shared data layer storage. Writing is done
//atomically, i.e. all succeeds or fails.
//Data to be written is given as key-value pairs. Several key-value
return nil
}
- keyAndData := s.setNamespaceToKeys(pairs...)
- err := s.MSet(keyAndData...)
- return err
+ keyAndData, err := s.setNamespaceToKeys(pairs...)
+ if err != nil {
+ return err
+ }
+ return s.MSet(keyAndData...)
}
//Get function atomically reads one or more keys from SDL. The returned map has the
return m, err
}
+//SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
+//If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
+//is published to a given channel.
+func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
+ if len(channelsAndEvents) == 0 {
+ return s.SetIE(s.nsPrefix+key, oldData, newData)
+ }
+ if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
+ return false, err
+ }
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
+}
+
//SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
//If replace was done successfully, true will be returned.
func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
- status, err := s.SetIE(s.nsPrefix+key, oldData, newData)
- if err != nil {
+ return s.SetIE(s.nsPrefix+key, oldData, newData)
+}
+
+//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
+//then it's value is not changed. Checking the key existence and potential set operation
+//is done atomically. If the set operation was done successfully, an event is published to a
+//given channel.
+func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+ if len(channelsAndEvents) == 0 {
+ return s.SetNX(s.nsPrefix+key, data)
+ }
+ if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
return false, err
}
- return status, nil
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
}
//SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
//then it's value is not changed. Checking the key existence and potential set operation
//is done atomically.
func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
- status, err := s.SetNX(s.nsPrefix+key, data)
- if err != nil {
- return false, err
+ return s.SetNX(s.nsPrefix+key, data)
+}
+
+//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
+//An event is published into a given channel if remove operation is successfull.
+func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
+ if len(keys) == 0 {
+ return nil
}
- return status, nil
+
+ var keysWithNs []string
+ for _, v := range keys {
+ keysWithNs = append(keysWithNs, s.nsPrefix+v)
+ }
+ if len(channelsAndEvents) == 0 {
+ return s.Del(keysWithNs)
+ }
+ if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
+ return err
+ }
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
}
-//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails
+//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
func (s *SdlInstance) Remove(keys []string) error {
if len(keys) == 0 {
return nil
return err
}
+//RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
+//a given event is published to channel. If existing data matches given data,
+//key and data are removed from SDL. If remove was done successfully, true is returned.
+func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+ if len(channelsAndEvents) == 0 {
+ return s.DelIE(s.nsPrefix+key, data)
+ }
+ if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+ return false, err
+ }
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
+}
+
//RemoveIf removes data from SDL conditionally. If existing data matches given data,
//key and data are removed from SDL. If remove was done successfully, true is returned.
func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
//given namespace exists, thus operation is not guaranteed to be atomic or isolated.
func (s *SdlInstance) GetAll() ([]string, error) {
keys, err := s.Keys(s.nsPrefix + "*")
- var retVal []string = nil
+ var retVal []string
if err != nil {
return retVal, err
}
if err != nil {
return err
}
- if keys != nil {
+ if (keys != nil) && (len(keys) != 0) {
err = s.Del(keys)
}
return err
}
+
+func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
+ keys, err := s.Keys(s.nsPrefix + "*")
+ if err != nil {
+ return err
+ }
+ if (keys != nil) && (len(keys) != 0) {
+ if len(channelsAndEvents) == 0 {
+ return s.Del(keys)
+ }
+ if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+ return err
+ }
+ channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+ err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
+ }
+ return err
+
+}
"errors"
"testing"
- "gerrit.oran-osc.org/r/ric-plt/sdlgo"
+ "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+ "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
mock.Mock
}
+func (m *mockDB) SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
+ m.Called(cb, channelPrefix, eventSeparator, channels)
+}
+
+func (m *mockDB) UnsubscribeChannelDB(channels ...string) {
+ m.Called(channels)
+}
+
func (m *mockDB) MSet(pairs ...interface{}) error {
a := m.Called(pairs)
return a.Error(0)
}
+func (m *mockDB) MSetPub(ns, message string, pairs ...interface{}) error {
+ a := m.Called(ns, message, pairs)
+ return a.Error(0)
+}
+
func (m *mockDB) MGet(keys []string) ([]interface{}, error) {
a := m.Called(keys)
return a.Get(0).([]interface{}), a.Error(1)
return a.Error(0)
}
+func (m *mockDB) DelPub(channel, message string, keys []string) error {
+ a := m.Called(channel, message, keys)
+ return a.Error(0)
+}
+
func (m *mockDB) Keys(pattern string) ([]string, error) {
a := m.Called(pattern)
return a.Get(0).([]string), a.Error(1)
return a.Bool(0), a.Error(1)
}
+func (m *mockDB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
+ a := m.Called(channel, message, key, oldData, newData)
+ return a.Bool(0), a.Error(1)
+}
+
func (m *mockDB) SetNX(key string, data interface{}) (bool, error) {
a := m.Called(key, data)
return a.Bool(0), a.Error(1)
}
+func (m *mockDB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
+ a := m.Called(channel, message, key, data)
+ return a.Bool(0), a.Error(1)
+}
+
func (m *mockDB) DelIE(key string, data interface{}) (bool, error) {
a := m.Called(key, data)
return a.Bool(0), a.Error(1)
}
+func (m *mockDB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
+ a := m.Called(channel, message, key, data)
+ return a.Bool(0), a.Error(1)
+}
+
func setup() (*mockDB, *sdlgo.SdlInstance) {
m := new(mockDB)
i := sdlgo.NewSdlInstance("namespace", m)
return m, i
}
+func TestSubscribeChannel(t *testing.T) {
+ m, i := setup()
+
+ expectedCB := func(channel string, events ...string) {}
+ expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+ m.On("SubscribeChannelDB", mock.AnythingOfType("sdlgoredis.ChannelNotificationCb"), "{namespace},", "___", expectedChannels).Return()
+ i.SubscribeChannel(expectedCB, "channel1", "channel2")
+ m.AssertExpectations(t)
+}
+
+func TestUnsubscribeChannel(t *testing.T) {
+ m, i := setup()
+
+ expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+ m.On("UnsubscribeChannelDB", expectedChannels).Return()
+ i.UnsubscribeChannel("channel1", "channel2")
+ m.AssertExpectations(t)
+}
func TestGetOneKey(t *testing.T) {
m, i := setup()
m.AssertExpectations(t)
}
+func TestWriteMixed(t *testing.T) {
+ m, i := setup()
+
+ msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", "data2", "{namespace},key3", "data3"}
+
+ m.On("MSet", msetExpected).Return(nil)
+ err := i.Set("key1", "data1", []string{"key2", "data2"}, [2]string{"key3", "data3"})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestWriteIncorrectMixed(t *testing.T) {
+ m, i := setup()
+
+ msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", "data2", "{namespace},key3", "data3"}
+
+ m.AssertNotCalled(t, "MSet", msetExpected)
+ err := i.Set("key1", []string{"key2", "data2"}, [2]string{"key3", "data3"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+func TestWriteIncorrectPairs(t *testing.T) {
+ m, i := setup()
+
+ msetExpected := []interface{}{}
+
+ m.AssertNotCalled(t, "MSet", msetExpected)
+ err := i.Set("key")
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
func TestWriteSeveralKeysSlice(t *testing.T) {
m, i := setup()
}
+func TestWriteSeveralKeysIncorrectSlice(t *testing.T) {
+ m, i := setup()
+
+ msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", 22}
+
+ m.AssertNotCalled(t, "MSet", msetExpected)
+ err := i.Set([]interface{}{"key1", "data1", "key2"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+
+}
+
func TestWriteSeveralKeysArray(t *testing.T) {
m, i := setup()
m.AssertExpectations(t)
}
+func TestWriteSeveralKeysIncorrectArray(t *testing.T) {
+ m, i := setup()
+
+ msetExpected := []interface{}{}
+
+ m.AssertNotCalled(t, "MSet", msetExpected)
+ err := i.Set([3]string{"key1", "data1", "key2"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
func TestWriteFail(t *testing.T) {
m, i := setup()
m.AssertNotCalled(t, "MSet", msetExpected)
}
+func TestWriteAndPublishOneKeyOneChannel(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedMessage := "event"
+ expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+ m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+ m.AssertNotCalled(t, "MSet", expectedKeyVal)
+ err := i.SetAndPublish([]string{"channel", "event"}, "key1", "data1")
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+func TestWriteAndPublishOneKeyOneChannelTwoEvents(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedMessage := "event1___event2"
+ expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+ m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+ m.AssertNotCalled(t, "MSet", expectedKeyVal)
+ err := i.SetAndPublish([]string{"channel", "event1", "channel", "event2"}, "key1", "data1")
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishIncorrectChannelAndEvent(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedMessage := "event1___event2"
+ expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+ m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+ m.AssertNotCalled(t, "MSet", expectedKeyVal)
+ err := i.SetAndPublish([]string{"channel", "event1", "channel"}, "key1", "data1")
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNotAllowedCharactersInEvents(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedMessage := "event1___event2"
+ expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+ m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+ m.AssertNotCalled(t, "MSet", expectedKeyVal)
+ err := i.SetAndPublish([]string{"channel", "event1___event2"}, "key1", "data1")
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNoData(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedMessage := "event"
+ expectedKeyVal := []interface{}{"key"}
+
+ m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+ m.AssertNotCalled(t, "MSet", expectedKeyVal)
+ err := i.SetAndPublish([]string{"channel", "event"}, []interface{}{"key"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNoChannelEvent(t *testing.T) {
+ m, i := setup()
+
+ expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+ m.On("MSet", expectedKeyVal).Return(nil)
+ m.AssertNotCalled(t, "MSetPub", "", "", expectedKeyVal)
+ err := i.SetAndPublish([]string{}, "key1", "data1")
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+
+}
+
+func TestRemoveAndPublishSuccessfully(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+ m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(nil)
+ err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+func TestRemoveAndPublishFail(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+ m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(errors.New("Some error"))
+ err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAndPublishNoChannels(t *testing.T) {
+ m, i := setup()
+
+ expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+ m.On("Del", expectedKeys).Return(nil)
+ err := i.RemoveAndPublish([]string{}, []string{"key1", "key2"})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAndPublishIncorrectChannel(t *testing.T) {
+ m, i := setup()
+
+ notExpectedChannel := "{namespace},channel"
+ notExpectedEvent := "event"
+ notExpectedKeys := []string{"{namespace},key"}
+
+ m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+ m.AssertNotCalled(t, "Del", notExpectedKeys)
+ err := i.RemoveAndPublish([]string{"channel", "event", "channel2"}, []string{})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+
+}
+func TestRemoveAndPublishNoKeys(t *testing.T) {
+ m, i := setup()
+
+ notExpectedChannel := "{namespace},channel"
+ notExpectedEvent := "event"
+ notExpectedKeys := []string{"{namespace},key"}
+
+ m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+ m.AssertNotCalled(t, "Del", notExpectedKeys)
+ err := i.RemoveAndPublish([]string{"channel", "event"}, []string{})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+func TestRemoveAndPublishNoChannelsError(t *testing.T) {
+ m, i := setup()
+
+ expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+ m.On("Del", expectedKeys).Return(errors.New("Some error"))
+ err := i.RemoveAndPublish([]string{}, []string{"key1", "key2"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
func TestRemoveSuccessfully(t *testing.T) {
m, i := setup()
mSetIEExpectedKey := string("{namespace},key1")
mSetIEExpectedOldData := interface{}("olddata")
mSetIEExpectedNewData := interface{}("newdata")
- m.On("SetIE", mSetIEExpectedKey, mSetIEExpectedOldData, mSetIEExpectedNewData).Return(true, errors.New("Some error"))
+ m.On("SetIE", mSetIEExpectedKey, mSetIEExpectedOldData, mSetIEExpectedNewData).Return(false, errors.New("Some error"))
status, err := i.SetIf("key1", "olddata", "newdata")
assert.NotNil(t, err)
assert.False(t, status)
m.AssertExpectations(t)
}
+func TestSetIfAndPublishSuccessfully(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedOldData := interface{}("olddata")
+ expectedNewData := interface{}("newdata")
+ m.On("SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData).Return(true, nil)
+ status, err := i.SetIfAndPublish([]string{"channel", "event"}, "key", "olddata", "newdata")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfAndPublishIncorrectChannelAndEvent(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedOldData := interface{}("olddata")
+ expectedNewData := interface{}("newdata")
+ m.AssertNotCalled(t, "SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData)
+ m.AssertNotCalled(t, "SetIE", expectedKey, expectedOldData, expectedNewData)
+ status, err := i.SetIfAndPublish([]string{"channel", "event1", "channel"}, "key", "olddata", "newdata")
+ assert.NotNil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+func TestSetIfAndPublishNOKStatus(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedOldData := interface{}("olddata")
+ expectedNewData := interface{}("newdata")
+ m.On("SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData).Return(false, nil)
+ status, err := i.SetIfAndPublish([]string{"channel", "event"}, "key", "olddata", "newdata")
+ assert.Nil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfAndPublishNoChannels(t *testing.T) {
+ m, i := setup()
+
+ expectedKey := "{namespace},key"
+ expectedOldData := interface{}("olddata")
+ expectedNewData := interface{}("newdata")
+ m.On("SetIE", expectedKey, expectedOldData, expectedNewData).Return(true, nil)
+ status, err := i.SetIfAndPublish([]string{}, "key", "olddata", "newdata")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishSuccessfully(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(true, nil)
+ status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishSeveralEvents(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event1___event2"
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(true, nil)
+ status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishNoChannels(t *testing.T) {
+ m, i := setup()
+
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.On("SetNX", expectedKey, expectedData).Return(true, nil)
+ status, err := i.SetIfNotExistsAndPublish([]string{}, "key", "data")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishFail(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(false, nil)
+ status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+ assert.Nil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishIncorrectChannels(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.AssertNotCalled(t, "SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData)
+ m.AssertNotCalled(t, "SetNX", expectedKey, expectedData)
+ status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event", "channel2"}, "key", "data")
+ assert.NotNil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishError(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedData := interface{}("data")
+
+ m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(false, errors.New("Some error"))
+ status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+ assert.NotNil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
func TestSetIfNotExistsSuccessfullyOkStatus(t *testing.T) {
m, i := setup()
mSetNXExpectedKey := string("{namespace},key1")
mSetNXExpectedData := interface{}("data")
- m.On("SetNX", mSetNXExpectedKey, mSetNXExpectedData).Return(true, errors.New("Some error"))
+ m.On("SetNX", mSetNXExpectedKey, mSetNXExpectedData).Return(false, errors.New("Some error"))
status, err := i.SetIfNotExists("key1", "data")
assert.NotNil(t, err)
assert.False(t, status)
m.AssertExpectations(t)
}
+
+func TestRemoveIfAndPublishSuccessfully(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event1___event2"
+ expectedKey := "{namespace},key"
+ expectedValue := interface{}("data")
+
+ m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(true, nil)
+ status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishNok(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event1___event2"
+ expectedKey := "{namespace},key"
+ expectedValue := interface{}("data")
+
+ m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(false, nil)
+ status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+ assert.Nil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishError(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event1___event2"
+ expectedKey := "{namespace},key"
+ expectedValue := interface{}("data")
+
+ m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(false, errors.New("Some error"))
+ status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+ assert.NotNil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishIncorrectChannel(t *testing.T) {
+ m, i := setup()
+
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ expectedKey := "{namespace},key"
+ expectedValue := interface{}("data")
+
+ m.AssertNotCalled(t, "DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue)
+ m.AssertNotCalled(t, "DelIE", expectedKey, expectedValue)
+ status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel"}, "key", "data")
+ assert.NotNil(t, err)
+ assert.False(t, status)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishNoChannels(t *testing.T) {
+ m, i := setup()
+
+ expectedKey := "{namespace},key"
+ expectedValue := interface{}("data")
+
+ m.On("DelIE", expectedKey, expectedValue).Return(true, nil)
+ status, err := i.RemoveIfAndPublish([]string{}, "key", "data")
+ assert.Nil(t, err)
+ assert.True(t, status)
+ m.AssertExpectations(t)
+
+}
func TestRemoveIfSuccessfullyOkStatus(t *testing.T) {
m, i := setup()
assert.False(t, status)
m.AssertExpectations(t)
}
+
+func TestRemoveAllAndPublishSuccessfully(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+ m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(nil)
+ err := i.RemoveAllAndPublish([]string{"channel", "event"})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysReturnError(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, errors.New("Some error"))
+ m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+ err := i.RemoveAllAndPublish([]string{"channel", "event"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysDelReturnsError(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+ m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(errors.New("Some error"))
+ err := i.RemoveAllAndPublish([]string{"channel", "event"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysEventsWithIllegalCharacters(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ expectedChannel := "{namespace},channel"
+ expectedEvent := "event"
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+ m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+ err := i.RemoveAllAndPublish([]string{"channel", "event___anotherEvent"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+
+}
+
+func TestRemoveAllAndPublishNoChannels(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+ m.On("Del", mDelExpected).Return(nil)
+ m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+ err := i.RemoveAllAndPublish([]string{})
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishIncorrectChannel(t *testing.T) {
+ m, i := setup()
+
+ mKeysExpected := string("{namespace},*")
+ mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+ mDelExpected := mKeysReturn
+ m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+ m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+ err := i.RemoveAllAndPublish([]string{"channel", "event", "channel2"})
+ assert.NotNil(t, err)
+ m.AssertExpectations(t)
+
+}