Support multiple event publishing 57/457/4 v0.2.1
authorMarco Tallskog <marco.tallskog@nokia.com>
Wed, 3 Jul 2019 07:25:20 +0000 (10:25 +0300)
committerMarco Tallskog <marco.tallskog@nokia.com>
Wed, 3 Jul 2019 09:28:58 +0000 (12:28 +0300)
Support for publishing multiple events at once.
Originally it was possible to give multiple channels and
events as parameter to SetAndPublish() and RemoveAndPublish() API
functions. However, only the first event was published as
used Redis didn't support multiple event publishing. This
commit takes new Redis commands into use enabling the
support for multiple event publishing.

Add unit tests for internal sdlgoredis package. To be able
to stub external redisgo module, the Create() function is
splitted so that in unit tests it is possible to provide
a stub interface instead of the real one when building
the redisgo instance. Also Subscribe() function is wrapped,
and wrapper function is provided during setup as calling
the Subscribe() for the first time in code execution,
a new interface is provided that needs to be able to mock
in unit tests.

Improve documentation. Add doc.go which provides a better
documentation how to use SDL.

Change-Id: Id072d9397fa92ea37bcc78df7db54edadf4d9e7c
Signed-off-by: Marco Tallskog <marco.tallskog@nokia.com>
doc.go [new file with mode: 0644]
go.mod
go.sum
internal/sdlgoredis/sdlgoredis.go
internal/sdlgoredis/sdlgoredis_test.go [new file with mode: 0644]
sdl.go
sdl_test.go

diff --git a/doc.go b/doc.go
new file mode 100644 (file)
index 0000000..038ee11
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,135 @@
+/*
+   Copyright (c) 2019 AT&T Intellectual Property.
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+/*
+Package sdlgo provides a lightweight, high-speed interface for accessing shared data storage.
+
+Shared Data Layer (SDL) is a concept where applications can use and share data using a common
+storage. The storage must be optimised for very high tranactional throughput and very low
+latency. Sdlgo is a library which provides applications an API to read and write data
+to a common storage using key-value paradigm. In addition to this, sdlgo provides an
+event mechanism that can be used to notify listeners that data was changed.
+
+This SDL version assumes that the DBAAS service provided by O-RAN community is
+working as a storage backend.
+
+Namespace
+
+A shared data layer connection is instantiated to a namespace and data is
+always read and modified within the namespace. Namespace provides data isolation across clients.
+Namespace instantiation happens when the sdlgo instance is created.
+E.g. the following code sets the SDL instance to use "example" as a namespace
+  sdl := sdlgo.NewSdlInstance("example", sdlgo.NewDatabase())
+Applications can use several namespaces at the same time by creating several sdlgo instances.
+
+Database connection
+
+In addition to creating the SDL instance, application is responsible for creating the backend
+database connection with NewDatabase() method. When the connection is created, sdlgo shall open a
+tcp connection to backend database immediatelly. The resulting connection may be used with several
+SDL instances if application needs to use several namespaces at a time. E.g.
+  connection := sdlgo.NewDatabase()
+  ns1 := sdlgo.NewSdlInstance("ns1", connection)
+  ns2 := sdlgo.NewSdlInstance("ns2", connection)
+For the backend database connection a circuit breaker design is used. If the connection fails,
+an error is returned immediatelly to application. Restoration of the connection happens
+automatically by SDL and application should retry the operation again after a while.
+
+Database service is discovered by using environment variables DBAAS_SERVICE_HOST and
+DBAAS_SERVICE_PORT. If not set, localhost and port 6379 are used by default.
+
+Keys and data
+
+Clients save key-value pairs. Keys are allways strings. The types of the values must be of a basic
+type, i.e. string, integer or byte array or slice. This means that the internal structures, like
+protobufs or JSON objects, must be serialised to a byte array or slice before passing it to SDL.
+Clients are responsible for managing the keys within a namespace.
+
+Some examples on how to set the data using different kind of input parameters:
+
+Basic usage, keys and values are given as own parameters (with mixed data types)
+  err := s.Set("key1", "value1", "key2", 2)
+
+Keys and values inside a slice (again with mixed types, thus empty interface used as a type)
+  exampleSlice := []interface{"key1", "value1", "key2", 2}
+  err := s.Set(exampleSlice)
+
+Data stored to a byte array
+  data := make([]byte), 3
+  data[0] = 1
+  data[1] = 2
+  data[2] = 3
+  s.Set("key", data)
+
+Keys and values stored into a map (byte array "data" used from previous example)
+  mapData := map[string]interface{
+    "key1" : "data",
+    "key2" : 2,
+    "key3" : data,
+  }
+
+When data is read from SDL storage, a map is returned where the requested key works as map key.
+If the key was not found, the value for the given key is nil. It is possible to request several
+key with one Get() call.
+
+Groups
+
+SDL groups are unordered collections of members where each member is unique. Using the SDL API
+it is possible to add/remove members from a group, remove the whole group or do queries like the
+size of a group and if member belongs to a group. Like key-value storage, groups are per namespace.
+
+Events
+
+Events are a publish-subscribe pattern to indicate interested parties that there has been a change
+in data. Delivery of the events are not guaranteed. In SDL, events are happening via channels and
+channels are per namespace. It is possible to publish several kinds of events through one channel.
+
+In order to publish changes to SDL data, the publisher need to call an API function that supports
+publishing. E.g.
+   err := sdl.SetAndPublish([]string{"channel1", "event1", "channel2", "event2"}, "key", "value")
+This example will publish event1 to channel1 and event2 in channel2 after writing the data.
+
+When subscribing the channels, the application needs to first create an SDL instance for the desired
+namespace. The subscription happens using the SubscribeChannel() API function. The parameters for
+the function takes a callback function and one or many channels to be subscribed. When an event is
+received for the given channel, the given callback function shall be called with one or many events.
+It is possible to make several subscriptions for different channels using different callback
+functions if different kind of handling is required.
+
+  sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
+
+  cb1 := func(channel string, event ...string) {
+    fmt.Printf("cb1: Received %s from channel %s\n", event, channel)
+  }
+
+  cb2 := func(channel string, event ...string) {
+    fmt.Printf("cb2: Received %s from channel %s\n", event, channel)
+  }
+
+  sdl.SubscribeChannel(cb1, "channel1", "channel2")
+  sdl.SubscribeChannel(cb2, "channel3")
+
+This example subscribes three channels from "namespace" and assigns cb1 for channel1 and channel2
+whereas channel3 is assigned to cb2.
+
+The callbacks are called from a context of a goroutine that is listening for the events. When
+application receives events, the preferred way to do the required processing of an event (e.g. read
+from SDL) is to do it in another context, e.g. by triggering applications own goroutine using Go
+channels. By doing like this, blocking the receive routine and possibly loosing events, can be
+avoided.
+*/
+package sdlgo
diff --git a/go.mod b/go.mod
index f1f39d4..e63d0dd 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,9 @@ module gerrit.o-ran-sc.org/r/ric-plt/sdlgo
 go 1.12
 
 require (
-       github.com/go-redis/redis v6.15.2+incompatible
-       github.com/onsi/ginkgo v1.8.0 // indirect
-       github.com/onsi/gomega v1.5.0 // indirect
+       github.com/go-redis/redis v6.15.3+incompatible
        github.com/stretchr/testify v1.3.0
+       golang.org/x/sys v0.0.0-20190204203706-41f3e6584952 // indirect
 )
 
 replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis => ./internal/sdlgoredis
diff --git a/go.sum b/go.sum
index a6e2f77..4660804 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -2,8 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
-github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
-github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
+github.com/go-redis/redis v6.15.3+incompatible h1:NZ0O90AhLSvSrvLZ/S9h7D4kl1mW2PrKyxL7MyBKO2g=
+github.com/go-redis/redis v6.15.3+incompatible/go.mod h1:W2YCLaZryXHirdd9QqwkiVUxCQsrx8SbLq9Uqk7JS7A=
 github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@@ -25,6 +25,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6Zh
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190204203706-41f3e6584952 h1:FDfvYgoVsA7TTZSbgiqjAbfPbK47CNHdWl3h/PJtii0=
+golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
index b032d8f..ab56b12 100644 (file)
@@ -22,6 +22,7 @@ import (
        "fmt"
        "os"
        "strings"
+       "time"
 
        "github.com/go-redis/redis"
 )
@@ -35,12 +36,39 @@ type intChannels struct {
 }
 
 type DB struct {
-       client       *redis.Client
+       client       RedisClient
+       subscribe    SubscribeFn
        redisModules bool
        cbMap        map[string]ChannelNotificationCb
        ch           intChannels
 }
 
+type Subscriber interface {
+       Channel() <-chan *redis.Message
+       Subscribe(channels ...string) error
+       Unsubscribe(channels ...string) error
+       Close() error
+}
+
+type SubscribeFn func(client RedisClient, channels ...string) Subscriber
+
+type RedisClient interface {
+       Command() *redis.CommandsInfoCmd
+       Close() error
+       Subscribe(channels ...string) *redis.PubSub
+       MSet(pairs ...interface{}) *redis.StatusCmd
+       Do(args ...interface{}) *redis.Cmd
+       MGet(keys ...string) *redis.SliceCmd
+       Del(keys ...string) *redis.IntCmd
+       Keys(pattern string) *redis.StringSliceCmd
+       SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
+       SAdd(key string, members ...interface{}) *redis.IntCmd
+       SRem(key string, members ...interface{}) *redis.IntCmd
+       SMembers(key string) *redis.StringSliceCmd
+       SIsMember(key string, member interface{}) *redis.BoolCmd
+       SCard(key string) *redis.IntCmd
+}
+
 func checkResultAndError(result interface{}, err error) (bool, error) {
        if err != nil {
                if err == redis.Nil {
@@ -50,21 +78,38 @@ func checkResultAndError(result interface{}, err error) (bool, error) {
        }
        if result == "OK" {
                return true, nil
-       } else {
-               return false, nil
        }
+       return false, nil
 }
 
 func checkIntResultAndError(result interface{}, err error) (bool, error) {
        if err != nil {
                return false, err
        }
-       if result.(int64) == 1 {
+       if result == 1 {
                return true, nil
-       } else {
-               return false, nil
        }
+       return false, nil
+}
 
+func subscribeNotifications(client RedisClient, channels ...string) Subscriber {
+       return client.Subscribe(channels...)
+}
+
+func CreateDB(client RedisClient, subscribe SubscribeFn) *DB {
+       db := DB{
+               client:       client,
+               subscribe:    subscribe,
+               redisModules: true,
+               cbMap:        make(map[string]ChannelNotificationCb, 0),
+               ch: intChannels{
+                       addChannel:    make(chan string),
+                       removeChannel: make(chan string),
+                       exit:          make(chan bool),
+               },
+       }
+
+       return &db
 }
 
 func Create() *DB {
@@ -83,21 +128,16 @@ func Create() *DB {
                DB:       0,  // use default DB
                PoolSize: 20,
        })
+       db := CreateDB(client, subscribeNotifications)
+       db.CheckCommands()
+       return db
+}
 
-       db := DB{
-               client:       client,
-               redisModules: true,
-               cbMap:        make(map[string]ChannelNotificationCb, 0),
-               ch: intChannels{
-                       addChannel:    make(chan string),
-                       removeChannel: make(chan string),
-                       exit:          make(chan bool),
-               },
-       }
-
+func (db *DB) CheckCommands() {
        commands, err := db.client.Command().Result()
        if err == nil {
-               redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
+               redisModuleCommands := []string{"setie", "delie", "setiepub", "setnxpub",
+                       "msetmpub", "delmpub"}
                for _, v := range redisModuleCommands {
                        _, ok := commands[v]
                        if !ok {
@@ -107,7 +147,6 @@ func Create() *DB {
        } else {
                fmt.Println(err)
        }
-       return &db
 }
 
 func (db *DB) CloseDB() error {
@@ -135,7 +174,7 @@ func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventS
                        eventSeparator string,
                        ch intChannels,
                        channels ...string) {
-                       sub := db.client.Subscribe(channels...)
+                       sub := db.subscribe(db.client, channels...)
                        rxChannel := sub.Channel()
                        for {
                                select {
@@ -171,16 +210,20 @@ func (db *DB) MSet(pairs ...interface{}) error {
        return db.client.MSet(pairs...).Err()
 }
 
-func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
+func (db *DB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
        if !db.redisModules {
-               return errors.New("Redis deployment doesn't support MSETPUB command")
+               return errors.New("Redis deployment doesn't support MSETMPUB command")
        }
        command := make([]interface{}, 0)
-       command = append(command, "MSETPUB")
+       command = append(command, "MSETMPUB")
+       command = append(command, len(pairs)/2)
+       command = append(command, len(channelsAndEvents)/2)
        for _, d := range pairs {
                command = append(command, d)
        }
-       command = append(command, channel, message)
+       for _, d := range channelsAndEvents {
+               command = append(command, d)
+       }
        _, err := db.client.Do(command...).Result()
        return err
 }
@@ -189,18 +232,23 @@ func (db *DB) MGet(keys []string) ([]interface{}, error) {
        return db.client.MGet(keys...).Result()
 }
 
-func (db *DB) DelPub(channel, message string, keys []string) error {
+func (db *DB) DelMPub(channelsAndEvents []string, keys []string) error {
        if !db.redisModules {
-               return errors.New("Redis deployment not supporting command DELPUB")
+               return errors.New("Redis deployment not supporting command DELMPUB")
        }
        command := make([]interface{}, 0)
-       command = append(command, "DELPUB")
+       command = append(command, "DELMPUB")
+       command = append(command, len(keys))
+       command = append(command, len(channelsAndEvents)/2)
        for _, d := range keys {
                command = append(command, d)
        }
-       command = append(command, channel, message)
+       for _, d := range channelsAndEvents {
+               command = append(command, d)
+       }
        _, err := db.client.Do(command...).Result()
        return err
+
 }
 
 func (db *DB) Del(keys []string) error {
diff --git a/internal/sdlgoredis/sdlgoredis_test.go b/internal/sdlgoredis/sdlgoredis_test.go
new file mode 100644 (file)
index 0000000..81c211a
--- /dev/null
@@ -0,0 +1,702 @@
+
+/*
+   Copyright (c) 2019 AT&T Intellectual Property.
+   Copyright (c) 2018-2019 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sdlgoredis_test
+
+import (
+       "testing"
+       "errors"
+       "time"
+
+       "github.com/go-redis/redis"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+)
+
+type clientMock struct {
+       mock.Mock
+}
+
+type pubSubMock struct {
+       mock.Mock
+}
+
+func (m *pubSubMock) Channel() <-chan *redis.Message {
+       return m.Called().Get(0).(chan *redis.Message)
+}
+
+func (m *pubSubMock) Subscribe(channels ...string) error {
+       return m.Called().Error(0)
+}
+
+func (m *pubSubMock) Unsubscribe(channels ...string) error {
+       return m.Called().Error(0)
+}
+
+func (m *pubSubMock) Close() error {
+       return m.Called().Error(0)
+}
+
+func (m *clientMock) Command() *redis.CommandsInfoCmd {
+       return m.Called().Get(0).(*redis.CommandsInfoCmd)
+}
+
+func (m *clientMock) Close() error {
+       return m.Called().Error(0)
+}
+
+func (m *clientMock) Subscribe(channels ...string) *redis.PubSub {
+       return m.Called(channels).Get(0).(*redis.PubSub)
+}
+
+func (m *clientMock) MSet(pairs ...interface{}) *redis.StatusCmd {
+       return m.Called(pairs).Get(0).(*redis.StatusCmd)
+}
+
+func (m *clientMock) Do(args ...interface{}) *redis.Cmd {
+       return m.Called(args).Get(0).(*redis.Cmd)
+}
+
+func (m *clientMock) MGet(keys ...string) *redis.SliceCmd {
+       return m.Called(keys).Get(0).(*redis.SliceCmd)
+}
+
+func (m *clientMock) Del(keys ...string) *redis.IntCmd {
+       return m.Called(keys).Get(0).(*redis.IntCmd)
+}
+
+func (m *clientMock) Keys(pattern string) *redis.StringSliceCmd {
+       return m.Called(pattern).Get(0).(*redis.StringSliceCmd)
+}
+
+func (m *clientMock) SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd {
+       return m.Called(key, value, expiration).Get(0).(*redis.BoolCmd)
+}
+
+func (m *clientMock) SAdd(key string, members ...interface{}) *redis.IntCmd {
+       return m.Called(key, members).Get(0).(*redis.IntCmd)
+}
+
+func (m *clientMock) SRem(key string, members ...interface{}) *redis.IntCmd {
+       return m.Called(key, members).Get(0).(*redis.IntCmd)
+}
+
+func (m *clientMock) SMembers(key string) *redis.StringSliceCmd {
+       return m.Called(key).Get(0).(*redis.StringSliceCmd)
+}
+
+func (m *clientMock) SIsMember(key string, member interface{}) *redis.BoolCmd {
+       return m.Called(key, member).Get(0).(*redis.BoolCmd)
+}
+
+func (m *clientMock) SCard(key string) *redis.IntCmd {
+       return m.Called(key).Get(0).(*redis.IntCmd)
+}
+
+func setSubscribeNotifications() (*pubSubMock, sdlgoredis.SubscribeFn) {
+       mock := new(pubSubMock)
+       return mock, func(client sdlgoredis.RedisClient, channels ...string) sdlgoredis.Subscriber {
+               return mock
+       }
+}
+
+func setup(commandsExists bool) (*pubSubMock, *clientMock, *sdlgoredis.DB) {
+       mock := new(clientMock)
+       pubSubMock, subscribeNotifications := setSubscribeNotifications()
+       db := sdlgoredis.CreateDB(mock, subscribeNotifications)
+
+       dummyCommandInfo := redis.CommandInfo{
+               Name: "dummy",
+       }
+       cmdResult := make(map[string]*redis.CommandInfo, 0)
+
+       if commandsExists {
+               cmdResult = map[string]*redis.CommandInfo{
+                       "setie": &dummyCommandInfo,
+                       "delie": &dummyCommandInfo,
+                       "setiepub": &dummyCommandInfo,
+                       "setnxpub": &dummyCommandInfo,
+                       "msetmpub": &dummyCommandInfo,
+                       "delmpub": &dummyCommandInfo,
+               }
+       } else {
+               cmdResult = map[string]*redis.CommandInfo{
+                       "dummy": &dummyCommandInfo,
+               }
+       }
+
+       mock.On("Command").Return(redis.NewCommandsInfoCmdResult(cmdResult, nil))
+       db.CheckCommands()
+       return pubSubMock, mock, db
+}
+
+func TestMSetSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeysAndValues := []interface{}{"key1", "value1", "key2", 2}
+       r.On("MSet",expectedKeysAndValues).Return(redis.NewStatusResult("OK", nil))
+       err := db.MSet("key1", "value1", "key2", 2)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestMSetFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeysAndValues := []interface{}{"key1", "value1", "key2", 2}
+       r.On("MSet",expectedKeysAndValues).Return(redis.NewStatusResult("OK", errors.New("Some error")))
+       err := db.MSet("key1", "value1", "key2", 2)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestMSetMPubSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"MSETMPUB", 2, 2, "key1", "val1", "key2", "val2",
+                                                                        "chan1", "event1", "chan2", "event2"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("", nil))
+       assert.Nil(t, db.MSetMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                         "key1", "val1", "key2", "val2"))
+       r.AssertExpectations(t)
+}
+
+func TestMsetMPubFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"MSETMPUB", 2, 2, "key1", "val1", "key2", "val2",
+                                                                        "chan1", "event1", "chan2", "event2"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("", errors.New("Some error")))
+       assert.NotNil(t, db.MSetMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                            "key1", "val1", "key2", "val2"))
+       r.AssertExpectations(t)
+}
+
+func TestMSetMPubCommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"MSETMPUB", 2, 2, "key1", "val1", "key2", "val2",
+                                                                        "chan1", "event1", "chan2", "event2"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       assert.NotNil(t, db.MSetMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                                "key1", "val1", "key2", "val2"))
+       r.AssertExpectations(t)
+
+}
+
+func TestMGetSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeys := []string{"key1", "key2", "key3"}
+       expectedResult := []interface{}{"val1", 2, nil}
+       r.On("MGet", expectedKeys).Return(redis.NewSliceResult(expectedResult, nil))
+       result, err := db.MGet([]string{"key1", "key2", "key3"})
+       assert.Equal(t, result, expectedResult)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestMGetFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeys := []string{"key1", "key2", "key3"}
+       expectedResult := []interface{}{nil}
+       r.On("MGet", expectedKeys).Return(redis.NewSliceResult(expectedResult,
+                                                                                                                  errors.New("Some error")))
+       result, err := db.MGet([]string{"key1", "key2", "key3"})
+       assert.Equal(t, result, expectedResult)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelMPubSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELMPUB", 2, 2, "key1", "key2", "chan1", "event1",
+                                                                        "chan2", "event2"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("", nil))
+       assert.Nil(t, db.DelMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                        []string{"key1", "key2"}))
+       r.AssertExpectations(t)
+}
+
+func TestDelMPubFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELMPUB", 2, 2, "key1", "key2", "chan1", "event1",
+                                                                        "chan2", "event2"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("", errors.New("Some error")))
+       assert.NotNil(t, db.DelMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                               []string{"key1", "key2"}))
+       r.AssertExpectations(t)
+}
+
+func TestDelMPubCommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"DELMPUB", 2, 2, "key1", "key2", "chan1", "event1",
+                                                                        "chan2", "event2"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       assert.NotNil(t, db.DelMPub([]string{"chan1", "event1", "chan2", "event2"},
+                                                               []string{"key1", "key2"}))
+       r.AssertExpectations(t)
+}
+
+func TestDelSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeys := []string{"key1", "key2"}
+       r.On("Del", expectedKeys).Return(redis.NewIntResult(2, nil))
+       assert.Nil(t, db.Del([]string{"key1", "key2"}))
+       r.AssertExpectations(t)
+}
+
+func TestDelFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKeys := []string{"key1", "key2"}
+       r.On("Del", expectedKeys).Return(redis.NewIntResult(2, errors.New("Some error")))
+       assert.NotNil(t, db.Del([]string{"key1", "key2"}))
+       r.AssertExpectations(t)
+}
+
+func TestKeysSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedPattern := "pattern*"
+       expectedResult := []string{"pattern1", "pattern2"}
+       r.On("Keys", expectedPattern).Return(redis.NewStringSliceResult(expectedResult, nil))
+       result, err := db.Keys("pattern*")
+       assert.Equal(t, result, expectedResult)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestKeysFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedPattern := "pattern*"
+       expectedResult := []string{}
+       r.On("Keys", expectedPattern).Return(redis.NewStringSliceResult(expectedResult,
+                                                                                                                                   errors.New("Some error")))
+       _, err := db.Keys("pattern*")
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIE", "key", "newdata", "olddata"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("OK", nil))
+       result, err := db.SetIE("key", "olddata", "newdata")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEKeyDoesntExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIE", "key", "newdata", "olddata"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, nil))
+       result, err := db.SetIE("key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIE", "key", "newdata", "olddata"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, errors.New("Some error")))
+       result, err := db.SetIE("key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIECommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"SETIE", "key", "newdata", "olddata"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       result, err := db.SetIE("key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEPubKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("OK", nil))
+       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEPubKeyDoesntExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, nil))
+       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEPubFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, errors.New("Some error")))
+       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetIEPubCommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"SETIEPUB", "key", "newdata", "olddata", "channel", "message"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       result, err := db.SetIEPub("channel", "message", "key", "olddata", "newdata")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXPubKeyDoesntExist(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult("OK", nil))
+       result, err := db.SetNXPub("channel", "message", "key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXPubKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, nil))
+       result, err := db.SetNXPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXPubFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(nil, errors.New("Some error")))
+       result, err := db.SetNXPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXPubCommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"SETNXPUB", "key", "data", "channel", "message"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       result, err := db.SetNXPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SetNX", expectedKey, expectedData, time.Duration(0)).Return(redis.NewBoolResult(true, nil))
+       result, err := db.SetNX("key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXUnsuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SetNX", expectedKey, expectedData, time.Duration(0)).Return(redis.NewBoolResult(false, nil))
+       result, err := db.SetNX("key", "data")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSetNXFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SetNX", expectedKey, expectedData, time.Duration(0)).
+               Return(redis.NewBoolResult(false,errors.New("Some error")))
+       result, err := db.SetNX("key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEPubKeyDoesntExist(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, nil))
+       result, err := db.DelIEPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEPubKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(1, nil))
+       result, err := db.DelIEPub("channel", "message", "key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEPubFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, errors.New("Some error")))
+       result, err := db.DelIEPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEPubCommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"DELIEPUB", "key", "data", "channel", "message"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       result, err := db.DelIEPub("channel", "message", "key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEKeyDoesntExist(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIE", "key", "data"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, nil))
+       result, err := db.DelIE("key", "data")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEKeyExists(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIE", "key", "data"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(1, nil))
+       result, err := db.DelIE("key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIEFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedMessage := []interface{}{"DELIE", "key", "data"}
+       r.On("Do", expectedMessage).Return(redis.NewCmdResult(0, errors.New("Some error")))
+       result, err := db.DelIE("key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestDelIECommandMissing(t *testing.T) {
+       _, r, db := setup(false)
+       expectedMessage := []interface{}{"DELIE", "key", "data"}
+       r.AssertNotCalled(t, "Do", expectedMessage)
+       result, err := db.DelIE("key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSAddSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := []interface{}{"data", 2}
+       r.On("SAdd", expectedKey, expectedData).Return(redis.NewIntResult(2, nil))
+       assert.Nil(t, db.SAdd("key", "data", 2))
+       r.AssertExpectations(t)
+}
+
+func TestSAddFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := []interface{}{"data", 2}
+       r.On("SAdd", expectedKey, expectedData).Return(redis.NewIntResult(2, errors.New("Some error")))
+       assert.NotNil(t, db.SAdd("key", "data", 2))
+       r.AssertExpectations(t)
+}
+
+func TestSRemSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := []interface{}{"data", 2}
+       r.On("SRem", expectedKey, expectedData).Return(redis.NewIntResult(2, nil))
+       assert.Nil(t, db.SRem("key", "data", 2))
+       r.AssertExpectations(t)
+}
+
+func TestSRemFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := []interface{}{"data", 2}
+       r.On("SRem", expectedKey, expectedData).Return(redis.NewIntResult(2, errors.New("Some error")))
+       assert.NotNil(t, db.SRem("key", "data", 2))
+       r.AssertExpectations(t)
+}
+
+func TestSMembersSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedResult := []string{"member1", "member2"}
+       r.On("SMembers", expectedKey).Return(redis.NewStringSliceResult(expectedResult, nil))
+       result, err := db.SMembers("key")
+       assert.Equal(t, result, expectedResult)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSMembersFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedResult := []string{"member1", "member2"}
+       r.On("SMembers", expectedKey).Return(redis.NewStringSliceResult(expectedResult,
+                                                                                                                                       errors.New("Some error")))
+       result, err := db.SMembers("key")
+       assert.Equal(t, result, expectedResult)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSIsMemberIsMember(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SIsMember", expectedKey, expectedData).Return(redis.NewBoolResult(true, nil))
+       result, err := db.SIsMember("key", "data")
+       assert.True(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSIsMemberIsNotMember(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SIsMember", expectedKey, expectedData).Return(redis.NewBoolResult(false, nil))
+       result, err := db.SIsMember("key", "data")
+       assert.False(t, result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSIsMemberFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       expectedData := "data"
+       r.On("SIsMember", expectedKey, expectedData).
+               Return(redis.NewBoolResult(false, errors.New("Some error")))
+       result, err := db.SIsMember("key", "data")
+       assert.False(t, result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSCardSuccessfully(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       r.On("SCard", expectedKey).Return(redis.NewIntResult(1, nil))
+       result, err := db.SCard("key")
+       assert.Equal(t, int64(1), result)
+       assert.Nil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSCardFailure(t *testing.T) {
+       _, r, db := setup(true)
+       expectedKey := "key"
+       r.On("SCard", expectedKey).Return(redis.NewIntResult(1, errors.New("Some error")))
+       result, err := db.SCard("key")
+       assert.Equal(t, int64(1), result)
+       assert.NotNil(t, err)
+       r.AssertExpectations(t)
+}
+
+func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) {
+       ps, r, db := setup(true)
+       ch := make(chan *redis.Message)
+       msg := redis.Message{
+               Channel: "{prefix}channel",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       ps.On("Channel").Return(ch)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Close").Return(nil)
+       count := 0
+       receivedChannel := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string){
+               count++
+               receivedChannel = channel
+               },"{prefix}", "---", "{prefix}channel")
+       ch <- &msg
+       db.UnsubscribeChannelDB("{prefix}channel")
+       time.Sleep(1 * time.Second)
+       assert.Equal(t, 1, count)
+       assert.Equal(t, "channel", receivedChannel)
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
+func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
+       ps, r, db := setup(true)
+       ch := make(chan *redis.Message)
+       msg1 := redis.Message{
+               Channel: "{prefix}channel1",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       msg2 := redis.Message{
+               Channel: "{prefix}channel2",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Close").Return(nil)
+       count := 0
+       receivedChannel1 := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string){
+               count++
+               receivedChannel1 = channel
+               },"{prefix}", "---", "{prefix}channel1")
+       ch <- &msg1
+       receivedChannel2 := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string){
+               count++
+               receivedChannel2 = channel
+               },"{prefix}", "---", "{prefix}channel2")
+
+       db.UnsubscribeChannelDB("{prefix}channel1")
+       ch <- &msg2
+       db.UnsubscribeChannelDB("{prefix}channel2")
+       time.Sleep(1 * time.Second)
+       assert.Equal(t, 2, count)
+       assert.Equal(t, "channel1", receivedChannel1)
+       assert.Equal(t, "channel2", receivedChannel2)
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
\ No newline at end of file
diff --git a/sdl.go b/sdl.go
index e25a85d..78dd15d 100644 (file)
--- a/sdl.go
+++ b/sdl.go
@@ -114,6 +114,12 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, e
        for _, v := range pairs {
                reflectType := reflect.TypeOf(v)
                switch reflectType.Kind() {
+               case reflect.Map:
+                       x := reflect.ValueOf(v).MapRange()
+                       for x.Next() {
+                               retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
+                               retVal = append(retVal, x.Value().Interface())
+                       }
                case reflect.Slice:
                        if shouldBeKey {
                                x := reflect.ValueOf(v)
@@ -193,21 +199,20 @@ func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []str
        return retVal
 }
 
-//SetAndPublish function writes data to shared data layer storage and send an event to
+//SetAndPublish function writes data to shared data layer storage and sends an event to
 //a channel. Writing is done atomically, i.e. all succeeds or fails.
 //Data to be written is given as key-value pairs. Several key-value
 //pairs can be written with one call.
 //The key is expected to be string whereas value can be anything, string,
 //number, slice array or map
 //
+//If data was set successfully, an event is sent to a channel.
 //Channels and events are given as pairs is channelsAndEvents parameter.
-//Although it is possible to give sevral channel-event pairs, current implementation
-//supports sending events to one channel only due to missing support in DB backend.
+//It is possible to send several events to several channels by giving several
+//channel-event pairs.
+//  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
+//will send event1 and event3 to channel1 and event2 to channel2.
 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
-       if len(pairs)%2 != 0 {
-               return errors.New("Invalid pairs parameter")
-       }
-
        keyAndData, err := s.setNamespaceToKeys(pairs...)
        if err != nil {
                return err
@@ -219,7 +224,7 @@ func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interfa
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
+       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
 }
 
 //Set function writes data to shared data layer storage. Writing is done
@@ -306,7 +311,10 @@ func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error)
 }
 
 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
-//An event is published into a given channel if remove operation is successfull.
+//Trying to remove a nonexisting key is not considered as an error.
+//An event is published into a given channel if remove operation is successfull and
+//at least one key is removed (if several keys given). If the given key(s) doesn't exist
+//when trying to remove, no event is published.
 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
        if len(keys) == 0 {
                return nil
@@ -323,7 +331,7 @@ func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
+       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
 }
 
 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -407,7 +415,7 @@ func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
                        return err
                }
                channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-               err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
+               err = s.DelMPub(channelsAndEventsPrepared, keys)
        }
        return err
 }
@@ -462,11 +470,11 @@ type iDatabase interface {
        SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
        UnsubscribeChannelDB(channels ...string)
        MSet(pairs ...interface{}) error
-       MSetPub(ns, message string, pairs ...interface{}) error
+       MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
        MGet(keys []string) ([]interface{}, error)
        CloseDB() error
        Del(keys []string) error
-       DelPub(channel, message string, keys []string) error
+       DelMPub(channelsAndEvents []string, keys []string) error
        Keys(key string) ([]string, error)
        SetIE(key string, oldData, newData interface{}) (bool, error)
        SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
index 3d06bb4..652ec55 100644 (file)
@@ -20,6 +20,7 @@ package sdlgo_test
 import (
        "errors"
        "testing"
+       "reflect"
 
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
@@ -44,8 +45,8 @@ func (m *mockDB) MSet(pairs ...interface{}) error {
        return a.Error(0)
 }
 
-func (m *mockDB) MSetPub(ns, message string, pairs ...interface{}) error {
-       a := m.Called(ns, message, pairs)
+func (m *mockDB) MSetMPub(channelsAndEvents []string, pairs ...interface{}) error {
+       a := m.Called(channelsAndEvents, pairs)
        return a.Error(0)
 }
 
@@ -64,8 +65,8 @@ func (m *mockDB) Del(keys []string) error {
        return a.Error(0)
 }
 
-func (m *mockDB) DelPub(channel, message string, keys []string) error {
-       a := m.Called(channel, message, keys)
+func (m *mockDB) DelMPub(channelsAndEvents []string, keys []string) error {
+       a := m.Called(channelsAndEvents, keys)
        return a.Error(0)
 }
 
@@ -134,6 +135,25 @@ func setup() (*mockDB, *sdlgo.SdlInstance) {
        return m, i
 }
 
+func verifySliceInOrder(a, b []string) bool {
+               for i, v := range a {
+                       found := false
+                       if i%2 == 0 {
+                               for j, x := range b {
+                                       if j%2 == 0 && x == v && a[i+1] == b[j+1] {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               if !found {
+                                       return false
+                               }
+                       }
+               }
+               return true
+
+}
+
 func TestSubscribeChannel(t *testing.T) {
        m, i := setup()
 
@@ -272,6 +292,39 @@ func TestWriteByteArrayAsValue(t *testing.T) {
        m.AssertExpectations(t)
 }
 
+func TestWriteMapAsInput(t *testing.T){
+       m, i := setup()
+
+       setExpected := []interface{}{"{namespace},key1", "string123",
+                                                               "{namespace},key22", 12,
+                                                               "{namespace},key333", []byte{1,2,3,4,5}}
+       inputMap := map[string]interface{}{
+               "key1": "string123",
+               "key22": 12,
+               "key333": []byte{1,2,3,4,5},
+       }
+
+       m.On("MSet", mock.MatchedBy(func(input []interface{}) bool {
+               for _, v := range input {
+                       found := false
+                       for _, x := range setExpected {
+                               found = reflect.DeepEqual(x, v)
+                               if found == true {
+                                       break
+                               }
+                       }
+                       if found == false {
+                               return false
+                       }
+               }
+               return true
+               })).Return(nil)
+
+       err := i.Set(inputMap)
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
 func TestWriteMixed(t *testing.T) {
        m, i := setup()
 
@@ -372,24 +425,41 @@ func TestWriteEmptyList(t *testing.T) {
 func TestWriteAndPublishOneKeyOneChannel(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedMessage := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
 
-       m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+       m.On("MSetMPub", expectedChannelAndEvent, expectedKeyVal).Return(nil)
        m.AssertNotCalled(t, "MSet", expectedKeyVal)
        err := i.SetAndPublish([]string{"channel", "event"}, "key1", "data1")
        assert.Nil(t, err)
        m.AssertExpectations(t)
 }
+
+func TestWriteAndPublishSeveralChannelsAndEvents(t *testing.T) {
+       m , i := setup()
+
+       expectedChannelsAndEvents := []string{"{namespace},channel1", "event1___event2", 
+                                                                                 "{namespace},channel2", "event3___event4"}
+       expectedKeyVal := []interface{}{"{namespace},key", "data"}
+
+       verifyChannelAndEvent := func(input []string) bool {
+               return verifySliceInOrder(input, expectedChannelsAndEvents)
+       }
+       m.On("MSetMPub", mock.MatchedBy(verifyChannelAndEvent), expectedKeyVal).Return(nil)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel1", "event1", "channel2", "event3", "channel1", "event2", "channel2", "event4"},
+                                                                       "key", "data")
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
 func TestWriteAndPublishOneKeyOneChannelTwoEvents(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedMessage := "event1___event2"
+       expectedChannelAndEvents := []string{"{namespace},channel", "event1___event2"}
        expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
 
-       m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+       m.On("MSetMPub", expectedChannelAndEvents, expectedKeyVal).Return(nil)
        m.AssertNotCalled(t, "MSet", expectedKeyVal)
        err := i.SetAndPublish([]string{"channel", "event1", "channel", "event2"}, "key1", "data1")
        assert.Nil(t, err)
@@ -399,10 +469,9 @@ func TestWriteAndPublishOneKeyOneChannelTwoEvents(t *testing.T) {
 func TestWriteAndPublishIncorrectChannelAndEvent(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedMessage := "event1___event2"
+       expectedChannelAndEvent := []string{}
        expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
-       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSetMPub", expectedChannelAndEvent, expectedKeyVal)
        m.AssertNotCalled(t, "MSet", expectedKeyVal)
        err := i.SetAndPublish([]string{"channel", "event1", "channel"}, "key1", "data1")
        assert.NotNil(t, err)
@@ -412,10 +481,9 @@ func TestWriteAndPublishIncorrectChannelAndEvent(t *testing.T) {
 func TestWriteAndPublishNotAllowedCharactersInEvents(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedMessage := "event1___event2"
+       expectedChannelAndEvent := []string{}
        expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
-       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSetMPub", expectedChannelAndEvent, expectedKeyVal)
        m.AssertNotCalled(t, "MSet", expectedKeyVal)
        err := i.SetAndPublish([]string{"channel", "event1___event2"}, "key1", "data1")
        assert.NotNil(t, err)
@@ -425,11 +493,10 @@ func TestWriteAndPublishNotAllowedCharactersInEvents(t *testing.T) {
 func TestWriteAndPublishNoData(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedMessage := "event"
+       expectedChannelAndEvent := []string{}
        expectedKeyVal := []interface{}{"key"}
 
-       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSetMPub", expectedChannelAndEvent, expectedKeyVal)
        m.AssertNotCalled(t, "MSet", expectedKeyVal)
        err := i.SetAndPublish([]string{"channel", "event"}, []interface{}{"key"})
        assert.NotNil(t, err)
@@ -442,7 +509,7 @@ func TestWriteAndPublishNoChannelEvent(t *testing.T) {
        expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
 
        m.On("MSet", expectedKeyVal).Return(nil)
-       m.AssertNotCalled(t, "MSetPub", "", "", expectedKeyVal)
+       m.AssertNotCalled(t, "MSetMPub", "", "", expectedKeyVal)
        err := i.SetAndPublish([]string{}, "key1", "data1")
        assert.Nil(t, err)
        m.AssertExpectations(t)
@@ -452,23 +519,39 @@ func TestWriteAndPublishNoChannelEvent(t *testing.T) {
 func TestRemoveAndPublishSuccessfully(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
 
-       m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(nil)
+       m.On("DelMPub", expectedChannelAndEvent, expectedKeys).Return(nil)
        err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
        assert.Nil(t, err)
        m.AssertExpectations(t)
 }
+
+func TestRemoveAndPublishSeveralChannelsAndEventsSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       expectedChannelAndEvent := []string{"{namespace},channel1", "event1___event2",
+                                                                               "{namespace},channel2", "event3___event4"}
+       expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+       verifyChannelAndEvent := func(input []string) bool {
+               return verifySliceInOrder(input, expectedChannelAndEvent)
+       }
+       m.On("DelMPub", mock.MatchedBy(verifyChannelAndEvent), expectedKeys).Return(nil)
+       err := i.RemoveAndPublish([]string{"channel1", "event1", "channel2", "event3",
+                                                                       "channel1", "event2", "channel2", "event4"},
+                                                                       []string{"key1", "key2"})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
 func TestRemoveAndPublishFail(t *testing.T) {
        m, i := setup()
 
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
 
-       m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(errors.New("Some error"))
+       m.On("DelMPub", expectedChannelAndEvent, expectedKeys).Return(errors.New("Some error"))
        err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
        assert.NotNil(t, err)
        m.AssertExpectations(t)
@@ -488,11 +571,10 @@ func TestRemoveAndPublishNoChannels(t *testing.T) {
 func TestRemoveAndPublishIncorrectChannel(t *testing.T) {
        m, i := setup()
 
-       notExpectedChannel := "{namespace},channel"
-       notExpectedEvent := "event"
+       notExpectedChannelAndEvent := []string{}
        notExpectedKeys := []string{"{namespace},key"}
 
-       m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+       m.AssertNotCalled(t, "DelMPub", notExpectedChannelAndEvent, notExpectedKeys)
        m.AssertNotCalled(t, "Del", notExpectedKeys)
        err := i.RemoveAndPublish([]string{"channel", "event", "channel2"}, []string{})
        assert.Nil(t, err)
@@ -502,11 +584,10 @@ func TestRemoveAndPublishIncorrectChannel(t *testing.T) {
 func TestRemoveAndPublishNoKeys(t *testing.T) {
        m, i := setup()
 
-       notExpectedChannel := "{namespace},channel"
-       notExpectedEvent := "event"
+       notExpectedChannelAndEvent := []string{}
        notExpectedKeys := []string{"{namespace},key"}
 
-       m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+       m.AssertNotCalled(t, "DelMPub", notExpectedChannelAndEvent, notExpectedKeys)
        m.AssertNotCalled(t, "Del", notExpectedKeys)
        err := i.RemoveAndPublish([]string{"channel", "event"}, []string{})
        assert.Nil(t, err)
@@ -980,10 +1061,9 @@ func TestRemoveAllAndPublishSuccessfully(t *testing.T) {
        mKeysExpected := string("{namespace},*")
        mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
        mDelExpected := mKeysReturn
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
-       m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(nil)
+       m.On("DelMPub", expectedChannelAndEvent, mDelExpected).Return(nil)
        err := i.RemoveAllAndPublish([]string{"channel", "event"})
        assert.Nil(t, err)
        m.AssertExpectations(t)
@@ -995,10 +1075,9 @@ func TestRemoveAllAndPublishKeysReturnError(t *testing.T) {
        mKeysExpected := string("{namespace},*")
        mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
        mDelExpected := mKeysReturn
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event" }
        m.On("Keys", mKeysExpected).Return(mKeysReturn, errors.New("Some error"))
-       m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+       m.AssertNotCalled(t, "DelMPub", expectedChannelAndEvent, mDelExpected)
        err := i.RemoveAllAndPublish([]string{"channel", "event"})
        assert.NotNil(t, err)
        m.AssertExpectations(t)
@@ -1010,10 +1089,9 @@ func TestRemoveAllAndPublishKeysDelReturnsError(t *testing.T) {
        mKeysExpected := string("{namespace},*")
        mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
        mDelExpected := mKeysReturn
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
-       m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(errors.New("Some error"))
+       m.On("DelMPub", expectedChannelAndEvent, mDelExpected).Return(errors.New("Some error"))
        err := i.RemoveAllAndPublish([]string{"channel", "event"})
        assert.NotNil(t, err)
        m.AssertExpectations(t)
@@ -1025,10 +1103,9 @@ func TestRemoveAllAndPublishKeysEventsWithIllegalCharacters(t *testing.T) {
        mKeysExpected := string("{namespace},*")
        mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
        mDelExpected := mKeysReturn
-       expectedChannel := "{namespace},channel"
-       expectedEvent := "event"
+       expectedChannelAndEvent := []string{"{namespace},channel", "event"}
        m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
-       m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+       m.AssertNotCalled(t, "DelMPub", expectedChannelAndEvent, mDelExpected)
        err := i.RemoveAllAndPublish([]string{"channel", "event___anotherEvent"})
        assert.NotNil(t, err)
        m.AssertExpectations(t)
@@ -1043,7 +1120,7 @@ func TestRemoveAllAndPublishNoChannels(t *testing.T) {
        mDelExpected := mKeysReturn
        m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
        m.On("Del", mDelExpected).Return(nil)
-       m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+       m.AssertNotCalled(t, "DelMPub", "", "", mDelExpected)
        err := i.RemoveAllAndPublish([]string{})
        assert.Nil(t, err)
        m.AssertExpectations(t)
@@ -1056,7 +1133,7 @@ func TestRemoveAllAndPublishIncorrectChannel(t *testing.T) {
        mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
        mDelExpected := mKeysReturn
        m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
-       m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+       m.AssertNotCalled(t, "DelMPub", "", "", mDelExpected)
        err := i.RemoveAllAndPublish([]string{"channel", "event", "channel2"})
        assert.NotNil(t, err)
        m.AssertExpectations(t)