Add support for notifications 49/149/5 v0.1.0
authorMarco Tallskog <marco.tallskog@nokia.com>
Tue, 14 May 2019 12:09:19 +0000 (15:09 +0300)
committerMarco Tallskog <marco.tallskog@nokia.com>
Thu, 16 May 2019 11:53:53 +0000 (14:53 +0300)
Add new API function to subscribe/unsubscribe  notifications
from a channel. With SubscribeChannel function it is possible to
subscribe one or many channels and attach a callback function taht will
be called when the notification (event) is received from a channel. It
is possible to have separate callback functions to different channels
but then subscription must be done separately. With UnsubscribeChannel
one can unsubscribe one or many subsribed channels.

Each function, that modifies database (sets or removes) will have
another API function that will cause an event to be sent to given
channel. E.g. the following two functions:

Set(pairs ...interface{})
SetAndPublish(channelsAndEvents []string, pairs ...interface{})

The first function just sets a value to key, whereas the second function
does the same thing and also sends an event to channel. The channels and
events are given as pairs in channelsAndEvents slice e.g.
[]string{"channel", "event"}
Although it is possible to give several channel event pairs, only
sending an event to one channel is supported at the moment due to
missing support in Redis DB.

Change-Id: I4fc96cd4c8a59410700f670ad5588fe71509fd03
Signed-off-by: Marco Tallskog <marco.tallskog@nokia.com>
bench_test.go
cmd/sdltester/sdltester.go
example_test.go
go.mod
internal/sdlgoredis/sdlgoredis.go
sdl.go
sdl_test.go

index 233f128..fa60686 100644 (file)
@@ -23,7 +23,7 @@ import (
        "strings"
        "testing"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
 )
 
 type singleBenchmark struct {
index 85e5569..00d4e34 100644 (file)
@@ -22,7 +22,7 @@ import (
        "os"
        "time"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
 )
 
 /*
index 5f78028..2e6f97a 100644 (file)
@@ -20,7 +20,7 @@ package sdlgo_test
 import (
        "fmt"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
 )
 
 var sdl *sdlgo.SdlInstance
@@ -40,7 +40,7 @@ func ExampleSdlInstance_Set() {
 }
 
 func ExampleSdlInstance_Get() {
-       retMap, err := sdl.Get([]string{"strigdata", "intdata"})
+       retMap, err := sdl.Get([]string{"stringdata", "intdata"})
        if err != nil {
                panic(err)
        } else {
diff --git a/go.mod b/go.mod
index d46db31..f1f39d4 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -1,4 +1,4 @@
-module gerrit.oran-osc.org/r/ric-plt/sdlgo
+module gerrit.o-ran-sc.org/r/ric-plt/sdlgo
 
 go 1.12
 
@@ -9,4 +9,4 @@ require (
        github.com/stretchr/testify v1.3.0
 )
 
-replace gerrit.oran-osc.org/r/ric-plt/sdlgo/internal/sdlgoredis => ./internal/sdlgoredis
+replace gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis => ./internal/sdlgoredis
index 10b1bfe..1085d2c 100644 (file)
@@ -21,13 +21,50 @@ import (
        "errors"
        "fmt"
        "os"
+       "strings"
 
        "github.com/go-redis/redis"
 )
 
+type ChannelNotificationCb func(channel string, payload ...string)
+
+type intChannels struct {
+       addChannel    chan string
+       removeChannel chan string
+       exit          chan bool
+}
+
 type DB struct {
        client       *redis.Client
        redisModules bool
+       cbMap        map[string]ChannelNotificationCb
+       ch           intChannels
+}
+
+func checkResultAndError(result interface{}, err error) (bool, error) {
+       if err != nil {
+               if err == redis.Nil {
+                       return false, nil
+               }
+               return false, err
+       }
+       if result == "OK" {
+               return true, nil
+       } else {
+               return false, nil
+       }
+}
+
+func checkIntResultAndError(result interface{}, err error) (bool, error) {
+       if err != nil {
+               return false, err
+       }
+       if result.(int64) == 1 {
+               return true, nil
+       } else {
+               return false, nil
+       }
+
 }
 
 func Create() *DB {
@@ -50,11 +87,17 @@ func Create() *DB {
        db := DB{
                client:       client,
                redisModules: true,
+               cbMap:        make(map[string]ChannelNotificationCb, 0),
+               ch: intChannels{
+                       addChannel:    make(chan string),
+                       removeChannel: make(chan string),
+                       exit:          make(chan bool),
+               },
        }
 
        commands, err := db.client.Command().Result()
        if err == nil {
-               redisModuleCommands := []string{"setie", "delie"}
+               redisModuleCommands := []string{"setie", "delie", "msetpub", "setiepub", "setnxpub", "delpub"}
                for _, v := range redisModuleCommands {
                        _, ok := commands[v]
                        if !ok {
@@ -71,13 +114,93 @@ func (db *DB) CloseDB() error {
        return db.client.Close()
 }
 
+func (db *DB) UnsubscribeChannelDB(channels ...string) {
+       for _, v := range channels {
+               db.ch.removeChannel <- v
+               delete(db.cbMap, v)
+               if len(db.cbMap) == 0 {
+                       db.ch.exit <- true
+               }
+       }
+}
+
+func (db *DB) SubscribeChannelDB(cb ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
+       if len(db.cbMap) == 0 {
+               for _, v := range channels {
+                       db.cbMap[v] = cb
+               }
+
+               go func(cbMap *map[string]ChannelNotificationCb,
+                       channelPrefix,
+                       eventSeparator string,
+                       ch intChannels,
+                       channels ...string) {
+                       sub := db.client.Subscribe(channels...)
+                       rxChannel := sub.Channel()
+                       for {
+                               select {
+                               case msg := <-rxChannel:
+                                       cb, ok := (*cbMap)[msg.Channel]
+                                       if ok {
+                                               cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+                                       }
+                               case channel := <-ch.addChannel:
+                                       sub.Subscribe(channel)
+                               case channel := <-ch.removeChannel:
+                                       sub.Unsubscribe(channel)
+                               case exit := <-ch.exit:
+                                       if exit {
+                                               if err := sub.Close(); err != nil {
+                                                       fmt.Println(err)
+                                               }
+                                               return
+                                       }
+                               }
+                       }
+               }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...)
+
+       } else {
+               for _, v := range channels {
+                       db.cbMap[v] = cb
+                       db.ch.addChannel <- v
+               }
+       }
+}
+
 func (db *DB) MSet(pairs ...interface{}) error {
        return db.client.MSet(pairs...).Err()
 }
 
+func (db *DB) MSetPub(channel, message string, pairs ...interface{}) error {
+       if !db.redisModules {
+               return errors.New("Redis deployment doesn't support MSETPUB command")
+       }
+       command := make([]interface{}, 0)
+       command = append(command, "MSETPUB")
+       for _, d := range pairs {
+               command = append(command, d)
+       }
+       command = append(command, channel, message)
+       _, err := db.client.Do(command...).Result()
+       return err
+}
+
 func (db *DB) MGet(keys []string) ([]interface{}, error) {
-       val, err := db.client.MGet(keys...).Result()
-       return val, err
+       return db.client.MGet(keys...).Result()
+}
+
+func (db *DB) DelPub(channel, message string, keys []string) error {
+       if !db.redisModules {
+               return errors.New("Redis deployment not supporting command DELPUB")
+       }
+       command := make([]interface{}, 0)
+       command = append(command, "DELPUB")
+       for _, d := range keys {
+               command = append(command, d)
+       }
+       command = append(command, channel, message)
+       _, err := db.client.Do(command...).Result()
+       return err
 }
 
 func (db *DB) Del(keys []string) error {
@@ -86,8 +209,7 @@ func (db *DB) Del(keys []string) error {
 }
 
 func (db *DB) Keys(pattern string) ([]string, error) {
-       val, err := db.client.Keys(pattern).Result()
-       return val, err
+       return db.client.Keys(pattern).Result()
 }
 
 func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
@@ -95,32 +217,36 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) {
                return false, errors.New("Redis deployment not supporting command")
        }
 
-       result, err := db.client.Do("SETIE", key, newData, oldData).Result()
-       if err != nil {
-               return false, err
-       }
-       if result == "OK" {
-               return true, nil
-       } else {
-               return false, nil
+       return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result())
+}
+
+func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
+       if !db.redisModules {
+               return false, errors.New("Redis deployment not supporting command SETIEPUB")
        }
+       return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result())
 }
 
+func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
+       if !db.redisModules {
+               return false, errors.New("Redis deployment not supporting command SETNXPUB")
+       }
+       return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result())
+}
 func (db *DB) SetNX(key string, data interface{}) (bool, error) {
-       result, err := db.client.SetNX(key, data, 0).Result()
-       return result, err
+       return db.client.SetNX(key, data, 0).Result()
 }
 
-func (db *DB) DelIE(key string, data interface{}) (bool, error) {
+func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
        if !db.redisModules {
                return false, errors.New("Redis deployment not supporting command")
        }
-       result, err := db.client.Do("DELIE", key, data).Result()
-       if err != nil {
-               return false, err
-       }
-       if result == "1" {
-               return true, nil
+       return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result())
+}
+
+func (db *DB) DelIE(key string, data interface{}) (bool, error) {
+       if !db.redisModules {
+               return false, errors.New("Redis deployment not supporting command")
        }
-       return false, nil
+       return checkIntResultAndError(db.client.Do("DELIE", key, data).Result())
 }
diff --git a/sdl.go b/sdl.go
index 295d21e..4d98904 100644 (file)
--- a/sdl.go
+++ b/sdl.go
 package sdlgo
 
 import (
+       "errors"
+       "fmt"
        "reflect"
        "strings"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
 )
 
 type iDatabase interface {
+       SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
+       UnsubscribeChannelDB(channels ...string)
        MSet(pairs ...interface{}) error
+       MSetPub(ns, message string, pairs ...interface{}) error
        MGet(keys []string) ([]interface{}, error)
        CloseDB() error
        Del(keys []string) error
+       DelPub(channel, message string, keys []string) error
        Keys(key string) ([]string, error)
        SetIE(key string, oldData, newData interface{}) (bool, error)
+       SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
        SetNX(key string, data interface{}) (bool, error)
+       SetNXPub(channel, message, key string, data interface{}) (bool, error)
        DelIE(key string, data interface{}) (bool, error)
+       DelIEPub(channel, message, key string, data interface{}) (bool, error)
 }
 
+//SdlInstance provides an API to read, write and modify
+//key-value pairs in a given namespace.
 type SdlInstance struct {
-       nameSpace string
-       nsPrefix  string
+       nameSpace      string
+       nsPrefix       string
+       eventSeparator string
        iDatabase
 }
 
@@ -45,34 +57,87 @@ type SdlInstance struct {
 //as a backend for the key-value storage. The returned value shall
 //be given as a parameter when calling NewKeyValStorage
 func NewDatabase() *sdlgoredis.DB {
-       db := sdlgoredis.Create()
-       return db
+       return sdlgoredis.Create()
 }
 
 //NewSdlInstance creates a new sdl instance using the given namespace.
 //The database used as a backend is given as a parameter
 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
-       s := SdlInstance{
-               nameSpace: NameSpace,
-               nsPrefix:  "{" + NameSpace + "},",
-               iDatabase: db,
+       return &SdlInstance{
+               nameSpace:      NameSpace,
+               nsPrefix:       "{" + NameSpace + "},",
+               eventSeparator: "___",
+               iDatabase:      db,
        }
+}
+
+//SubscribeChannel lets you to subscribe for a events on a given channels.
+//SDL notifications are events that are published on a specific channels.
+//Both the channel and events are defined by the entity that is publishing
+//the events.
+//
+//When subscribing for a channel, a callback function is given as a parameter.
+//Whenever a notification is received from a channel, this callback is called
+//with channel and notifications as parameter (several notifications could be
+//packed to a single callback function call). A call to SubscribeChannel function
+//returns immediatelly, callbacks will be called asyncronously.
+//
+//It is possible to subscribe to different channels using different callbacks. In
+//this case simply use SubscribeChannel function separately for each channel.
+//
+//When receiving events in callback routine, it is a good practive to return from
+//callback as quickly as possible. E.g. reading in callback context should be avoided
+//and using of Go signals is recommended. Also it should be noted that in case of several
+//events received from different channels, callbacks are called in series one by one.
+func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
+       s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
+       return nil
+}
 
-       return &s
+//UnsubscribeChannel removes subscription from one or several channels.
+func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
+       s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
+       return nil
 }
 
+//Close connection to backend database.
 func (s *SdlInstance) Close() error {
        return s.CloseDB()
 }
 
-func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
-       var retVal []interface{}
-       for i, v := range pairs {
-               if i%2 == 0 {
-                       reflectType := reflect.TypeOf(v)
-                       switch reflectType.Kind() {
-                       case reflect.Slice:
+func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
+       if len(channelsAndEvents)%2 != 0 {
+               return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
+       }
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       if strings.Contains(v, s.eventSeparator) {
+                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+                       }
+               }
+       }
+       return nil
+}
+func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
+       var retVal []string
+       for _, v := range channels {
+               retVal = append(retVal, s.nsPrefix+v)
+       }
+       return retVal
+}
+
+func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
+       retVal := make([]interface{}, 0)
+       shouldBeKey := true
+       for _, v := range pairs {
+               reflectType := reflect.TypeOf(v)
+               switch reflectType.Kind() {
+               case reflect.Slice:
+                       if shouldBeKey {
                                x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
                                for i2 := 0; i2 < x.Len(); i2++ {
                                        if i2%2 == 0 {
                                                retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
@@ -80,8 +145,15 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
                                                retVal = append(retVal, x.Index(i2).Interface())
                                        }
                                }
-                       case reflect.Array:
+                       } else {
+                               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                       }
+               case reflect.Array:
+                       if shouldBeKey {
                                x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
                                for i2 := 0; i2 < x.Len(); i2++ {
                                        if i2%2 == 0 {
                                                retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
@@ -89,16 +161,75 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
                                                retVal = append(retVal, x.Index(i2).Interface())
                                        }
                                }
-                       default:
+                       } else {
+                               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                       }
+               default:
+                       if shouldBeKey {
                                retVal = append(retVal, s.nsPrefix+v.(string))
+                               shouldBeKey = false
+                       } else {
+                               retVal = append(retVal, v)
+                               shouldBeKey = true
                        }
+               }
+       }
+       if len(retVal)%2 != 0 {
+               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+       }
+       return retVal, nil
+}
+
+func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
+       channelEventMap := make(map[string]string)
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       continue
+               }
+               _, exists := channelEventMap[v]
+               if exists {
+                       channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
                } else {
-                       retVal = append(retVal, v)
+                       channelEventMap[v] = channelsAndEvents[i+1]
                }
        }
+       retVal := make([]string, 0)
+       for k, v := range channelEventMap {
+               retVal = append(retVal, s.nsPrefix+k)
+               retVal = append(retVal, v)
+       }
        return retVal
 }
 
+//SetAndPublish function writes data to shared data layer storage and send an event to
+//a channel. Writing is done atomically, i.e. all succeeds or fails.
+//Data to be written is given as key-value pairs. Several key-value
+//pairs can be written with one call.
+//The key is expected to be string whereas value can be anything, string,
+//number, slice array or map
+//
+//Channels and events are given as pairs is channelsAndEvents parameter.
+//Although it is possible to give sevral channel-event pairs, current implementation
+//supports sending events to one channel only due to missing support in DB backend.
+func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
+       if len(pairs)%2 != 0 {
+               return errors.New("Invalid pairs parameter")
+       }
+
+       keyAndData, err := s.setNamespaceToKeys(pairs...)
+       if err != nil {
+               return err
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.MSet(keyAndData...)
+       }
+       if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
+}
+
 //Set function writes data to shared data layer storage. Writing is done
 //atomically, i.e. all succeeds or fails.
 //Data to be written is given as key-value pairs. Several key-value
@@ -110,9 +241,11 @@ func (s *SdlInstance) Set(pairs ...interface{}) error {
                return nil
        }
 
-       keyAndData := s.setNamespaceToKeys(pairs...)
-       err := s.MSet(keyAndData...)
-       return err
+       keyAndData, err := s.setNamespaceToKeys(pairs...)
+       if err != nil {
+               return err
+       }
+       return s.MSet(keyAndData...)
 }
 
 //Get function atomically reads one or more keys from SDL. The returned map has the
@@ -138,28 +271,70 @@ func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
        return m, err
 }
 
+//SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
+//If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
+//is published to a given channel.
+func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.SetIE(s.nsPrefix+key, oldData, newData)
+       }
+       if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
+}
+
 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned.
 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
-       status, err := s.SetIE(s.nsPrefix+key, oldData, newData)
-       if err != nil {
+       return s.SetIE(s.nsPrefix+key, oldData, newData)
+}
+
+//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
+//then it's value is not changed. Checking the key existence and potential set operation
+//is done atomically. If the set operation was done successfully, an event is published to a
+//given channel.
+func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.SetNX(s.nsPrefix+key, data)
+       }
+       if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
-       return status, nil
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
 }
 
 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
 //then it's value is not changed. Checking the key existence and potential set operation
 //is done atomically.
 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
-       status, err := s.SetNX(s.nsPrefix+key, data)
-       if err != nil {
-               return false, err
+       return s.SetNX(s.nsPrefix+key, data)
+}
+
+//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
+//An event is published into a given channel if remove operation is successfull.
+func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
+       if len(keys) == 0 {
+               return nil
        }
-       return status, nil
+
+       var keysWithNs []string
+       for _, v := range keys {
+               keysWithNs = append(keysWithNs, s.nsPrefix+v)
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.Del(keysWithNs)
+       }
+       if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
 }
 
-//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails
+//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
 func (s *SdlInstance) Remove(keys []string) error {
        if len(keys) == 0 {
                return nil
@@ -173,6 +348,20 @@ func (s *SdlInstance) Remove(keys []string) error {
        return err
 }
 
+//RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
+//a given event is published to channel. If existing data matches given data,
+//key and data are removed from SDL. If remove was done successfully, true is returned.
+func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.DelIE(s.nsPrefix+key, data)
+       }
+       if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
+}
+
 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
@@ -187,7 +376,7 @@ func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
 func (s *SdlInstance) GetAll() ([]string, error) {
        keys, err := s.Keys(s.nsPrefix + "*")
-       var retVal []string = nil
+       var retVal []string
        if err != nil {
                return retVal, err
        }
@@ -204,8 +393,27 @@ func (s *SdlInstance) RemoveAll() error {
        if err != nil {
                return err
        }
-       if keys != nil {
+       if (keys != nil) && (len(keys) != 0) {
                err = s.Del(keys)
        }
        return err
 }
+
+func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
+       keys, err := s.Keys(s.nsPrefix + "*")
+       if err != nil {
+               return err
+       }
+       if (keys != nil) && (len(keys) != 0) {
+               if len(channelsAndEvents) == 0 {
+                       return s.Del(keys)
+               }
+               if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+                       return err
+               }
+               channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+               err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
+       }
+       return err
+
+}
index dbd073f..dd65755 100644 (file)
@@ -21,7 +21,8 @@ import (
        "errors"
        "testing"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/mock"
 )
@@ -30,11 +31,24 @@ type mockDB struct {
        mock.Mock
 }
 
+func (m *mockDB) SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) {
+       m.Called(cb, channelPrefix, eventSeparator, channels)
+}
+
+func (m *mockDB) UnsubscribeChannelDB(channels ...string) {
+       m.Called(channels)
+}
+
 func (m *mockDB) MSet(pairs ...interface{}) error {
        a := m.Called(pairs)
        return a.Error(0)
 }
 
+func (m *mockDB) MSetPub(ns, message string, pairs ...interface{}) error {
+       a := m.Called(ns, message, pairs)
+       return a.Error(0)
+}
+
 func (m *mockDB) MGet(keys []string) ([]interface{}, error) {
        a := m.Called(keys)
        return a.Get(0).([]interface{}), a.Error(1)
@@ -50,6 +64,11 @@ func (m *mockDB) Del(keys []string) error {
        return a.Error(0)
 }
 
+func (m *mockDB) DelPub(channel, message string, keys []string) error {
+       a := m.Called(channel, message, keys)
+       return a.Error(0)
+}
+
 func (m *mockDB) Keys(pattern string) ([]string, error) {
        a := m.Called(pattern)
        return a.Get(0).([]string), a.Error(1)
@@ -60,22 +79,57 @@ func (m *mockDB) SetIE(key string, oldData, newData interface{}) (bool, error) {
        return a.Bool(0), a.Error(1)
 }
 
+func (m *mockDB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) {
+       a := m.Called(channel, message, key, oldData, newData)
+       return a.Bool(0), a.Error(1)
+}
+
 func (m *mockDB) SetNX(key string, data interface{}) (bool, error) {
        a := m.Called(key, data)
        return a.Bool(0), a.Error(1)
 }
 
+func (m *mockDB) SetNXPub(channel, message, key string, data interface{}) (bool, error) {
+       a := m.Called(channel, message, key, data)
+       return a.Bool(0), a.Error(1)
+}
+
 func (m *mockDB) DelIE(key string, data interface{}) (bool, error) {
        a := m.Called(key, data)
        return a.Bool(0), a.Error(1)
 }
 
+func (m *mockDB) DelIEPub(channel, message, key string, data interface{}) (bool, error) {
+       a := m.Called(channel, message, key, data)
+       return a.Bool(0), a.Error(1)
+}
+
 func setup() (*mockDB, *sdlgo.SdlInstance) {
        m := new(mockDB)
        i := sdlgo.NewSdlInstance("namespace", m)
        return m, i
 }
 
+func TestSubscribeChannel(t *testing.T) {
+       m, i := setup()
+
+       expectedCB := func(channel string, events ...string) {}
+       expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+       m.On("SubscribeChannelDB", mock.AnythingOfType("sdlgoredis.ChannelNotificationCb"), "{namespace},", "___", expectedChannels).Return()
+       i.SubscribeChannel(expectedCB, "channel1", "channel2")
+       m.AssertExpectations(t)
+}
+
+func TestUnsubscribeChannel(t *testing.T) {
+       m, i := setup()
+
+       expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+       m.On("UnsubscribeChannelDB", expectedChannels).Return()
+       i.UnsubscribeChannel("channel1", "channel2")
+       m.AssertExpectations(t)
+}
 func TestGetOneKey(t *testing.T) {
        m, i := setup()
 
@@ -161,6 +215,37 @@ func TestWriteOneKey(t *testing.T) {
        m.AssertExpectations(t)
 }
 
+func TestWriteMixed(t *testing.T) {
+       m, i := setup()
+
+       msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", "data2", "{namespace},key3", "data3"}
+
+       m.On("MSet", msetExpected).Return(nil)
+       err := i.Set("key1", "data1", []string{"key2", "data2"}, [2]string{"key3", "data3"})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestWriteIncorrectMixed(t *testing.T) {
+       m, i := setup()
+
+       msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", "data2", "{namespace},key3", "data3"}
+
+       m.AssertNotCalled(t, "MSet", msetExpected)
+       err := i.Set("key1", []string{"key2", "data2"}, [2]string{"key3", "data3"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+func TestWriteIncorrectPairs(t *testing.T) {
+       m, i := setup()
+
+       msetExpected := []interface{}{}
+
+       m.AssertNotCalled(t, "MSet", msetExpected)
+       err := i.Set("key")
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
 func TestWriteSeveralKeysSlice(t *testing.T) {
        m, i := setup()
 
@@ -173,6 +258,18 @@ func TestWriteSeveralKeysSlice(t *testing.T) {
 
 }
 
+func TestWriteSeveralKeysIncorrectSlice(t *testing.T) {
+       m, i := setup()
+
+       msetExpected := []interface{}{"{namespace},key1", "data1", "{namespace},key2", 22}
+
+       m.AssertNotCalled(t, "MSet", msetExpected)
+       err := i.Set([]interface{}{"key1", "data1", "key2"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+
+}
+
 func TestWriteSeveralKeysArray(t *testing.T) {
        m, i := setup()
 
@@ -184,6 +281,17 @@ func TestWriteSeveralKeysArray(t *testing.T) {
        m.AssertExpectations(t)
 }
 
+func TestWriteSeveralKeysIncorrectArray(t *testing.T) {
+       m, i := setup()
+
+       msetExpected := []interface{}{}
+
+       m.AssertNotCalled(t, "MSet", msetExpected)
+       err := i.Set([3]string{"key1", "data1", "key2"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
 func TestWriteFail(t *testing.T) {
        m, i := setup()
 
@@ -204,6 +312,159 @@ func TestWriteEmptyList(t *testing.T) {
        m.AssertNotCalled(t, "MSet", msetExpected)
 }
 
+func TestWriteAndPublishOneKeyOneChannel(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedMessage := "event"
+       expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+       m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel", "event"}, "key1", "data1")
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+func TestWriteAndPublishOneKeyOneChannelTwoEvents(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedMessage := "event1___event2"
+       expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+       m.On("MSetPub", expectedChannel, expectedMessage, expectedKeyVal).Return(nil)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel", "event1", "channel", "event2"}, "key1", "data1")
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishIncorrectChannelAndEvent(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedMessage := "event1___event2"
+       expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel", "event1", "channel"}, "key1", "data1")
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNotAllowedCharactersInEvents(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedMessage := "event1___event2"
+       expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel", "event1___event2"}, "key1", "data1")
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNoData(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedMessage := "event"
+       expectedKeyVal := []interface{}{"key"}
+
+       m.AssertNotCalled(t, "MSetPub", expectedChannel, expectedMessage, expectedKeyVal)
+       m.AssertNotCalled(t, "MSet", expectedKeyVal)
+       err := i.SetAndPublish([]string{"channel", "event"}, []interface{}{"key"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestWriteAndPublishNoChannelEvent(t *testing.T) {
+       m, i := setup()
+
+       expectedKeyVal := []interface{}{"{namespace},key1", "data1"}
+
+       m.On("MSet", expectedKeyVal).Return(nil)
+       m.AssertNotCalled(t, "MSetPub", "", "", expectedKeyVal)
+       err := i.SetAndPublish([]string{}, "key1", "data1")
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+
+}
+
+func TestRemoveAndPublishSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+       m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(nil)
+       err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+func TestRemoveAndPublishFail(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+       m.On("DelPub", expectedChannel, expectedEvent, expectedKeys).Return(errors.New("Some error"))
+       err := i.RemoveAndPublish([]string{"channel", "event"}, []string{"key1", "key2"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAndPublishNoChannels(t *testing.T) {
+       m, i := setup()
+
+       expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+       m.On("Del", expectedKeys).Return(nil)
+       err := i.RemoveAndPublish([]string{}, []string{"key1", "key2"})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAndPublishIncorrectChannel(t *testing.T) {
+       m, i := setup()
+
+       notExpectedChannel := "{namespace},channel"
+       notExpectedEvent := "event"
+       notExpectedKeys := []string{"{namespace},key"}
+
+       m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+       m.AssertNotCalled(t, "Del", notExpectedKeys)
+       err := i.RemoveAndPublish([]string{"channel", "event", "channel2"}, []string{})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+
+}
+func TestRemoveAndPublishNoKeys(t *testing.T) {
+       m, i := setup()
+
+       notExpectedChannel := "{namespace},channel"
+       notExpectedEvent := "event"
+       notExpectedKeys := []string{"{namespace},key"}
+
+       m.AssertNotCalled(t, "DelPub", notExpectedChannel, notExpectedEvent, notExpectedKeys)
+       m.AssertNotCalled(t, "Del", notExpectedKeys)
+       err := i.RemoveAndPublish([]string{"channel", "event"}, []string{})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+func TestRemoveAndPublishNoChannelsError(t *testing.T) {
+       m, i := setup()
+
+       expectedKeys := []string{"{namespace},key1", "{namespace},key2"}
+
+       m.On("Del", expectedKeys).Return(errors.New("Some error"))
+       err := i.RemoveAndPublish([]string{}, []string{"key1", "key2"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
 func TestRemoveSuccessfully(t *testing.T) {
        m, i := setup()
 
@@ -356,13 +617,160 @@ func TestSetIfFailure(t *testing.T) {
        mSetIEExpectedKey := string("{namespace},key1")
        mSetIEExpectedOldData := interface{}("olddata")
        mSetIEExpectedNewData := interface{}("newdata")
-       m.On("SetIE", mSetIEExpectedKey, mSetIEExpectedOldData, mSetIEExpectedNewData).Return(true, errors.New("Some error"))
+       m.On("SetIE", mSetIEExpectedKey, mSetIEExpectedOldData, mSetIEExpectedNewData).Return(false, errors.New("Some error"))
        status, err := i.SetIf("key1", "olddata", "newdata")
        assert.NotNil(t, err)
        assert.False(t, status)
        m.AssertExpectations(t)
 }
 
+func TestSetIfAndPublishSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedOldData := interface{}("olddata")
+       expectedNewData := interface{}("newdata")
+       m.On("SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData).Return(true, nil)
+       status, err := i.SetIfAndPublish([]string{"channel", "event"}, "key", "olddata", "newdata")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfAndPublishIncorrectChannelAndEvent(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedOldData := interface{}("olddata")
+       expectedNewData := interface{}("newdata")
+       m.AssertNotCalled(t, "SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData)
+       m.AssertNotCalled(t, "SetIE", expectedKey, expectedOldData, expectedNewData)
+       status, err := i.SetIfAndPublish([]string{"channel", "event1", "channel"}, "key", "olddata", "newdata")
+       assert.NotNil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+func TestSetIfAndPublishNOKStatus(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedOldData := interface{}("olddata")
+       expectedNewData := interface{}("newdata")
+       m.On("SetIEPub", expectedChannel, expectedEvent, expectedKey, expectedOldData, expectedNewData).Return(false, nil)
+       status, err := i.SetIfAndPublish([]string{"channel", "event"}, "key", "olddata", "newdata")
+       assert.Nil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfAndPublishNoChannels(t *testing.T) {
+       m, i := setup()
+
+       expectedKey := "{namespace},key"
+       expectedOldData := interface{}("olddata")
+       expectedNewData := interface{}("newdata")
+       m.On("SetIE", expectedKey, expectedOldData, expectedNewData).Return(true, nil)
+       status, err := i.SetIfAndPublish([]string{}, "key", "olddata", "newdata")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(true, nil)
+       status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishSeveralEvents(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event1___event2"
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(true, nil)
+       status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishNoChannels(t *testing.T) {
+       m, i := setup()
+
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.On("SetNX", expectedKey, expectedData).Return(true, nil)
+       status, err := i.SetIfNotExistsAndPublish([]string{}, "key", "data")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishFail(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(false, nil)
+       status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+       assert.Nil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishIncorrectChannels(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.AssertNotCalled(t, "SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData)
+       m.AssertNotCalled(t, "SetNX", expectedKey, expectedData)
+       status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event", "channel2"}, "key", "data")
+       assert.NotNil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestSetIfNotExistsAndPublishError(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedData := interface{}("data")
+
+       m.On("SetNXPub", expectedChannel, expectedEvent, expectedKey, expectedData).Return(false, errors.New("Some error"))
+       status, err := i.SetIfNotExistsAndPublish([]string{"channel", "event"}, "key", "data")
+       assert.NotNil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
 func TestSetIfNotExistsSuccessfullyOkStatus(t *testing.T) {
        m, i := setup()
 
@@ -392,12 +800,87 @@ func TestSetIfNotExistsFailure(t *testing.T) {
 
        mSetNXExpectedKey := string("{namespace},key1")
        mSetNXExpectedData := interface{}("data")
-       m.On("SetNX", mSetNXExpectedKey, mSetNXExpectedData).Return(true, errors.New("Some error"))
+       m.On("SetNX", mSetNXExpectedKey, mSetNXExpectedData).Return(false, errors.New("Some error"))
        status, err := i.SetIfNotExists("key1", "data")
        assert.NotNil(t, err)
        assert.False(t, status)
        m.AssertExpectations(t)
 }
+
+func TestRemoveIfAndPublishSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event1___event2"
+       expectedKey := "{namespace},key"
+       expectedValue := interface{}("data")
+
+       m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(true, nil)
+       status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishNok(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event1___event2"
+       expectedKey := "{namespace},key"
+       expectedValue := interface{}("data")
+
+       m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(false, nil)
+       status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+       assert.Nil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishError(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event1___event2"
+       expectedKey := "{namespace},key"
+       expectedValue := interface{}("data")
+
+       m.On("DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue).Return(false, errors.New("Some error"))
+       status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel", "event2"}, "key", "data")
+       assert.NotNil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishIncorrectChannel(t *testing.T) {
+       m, i := setup()
+
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       expectedKey := "{namespace},key"
+       expectedValue := interface{}("data")
+
+       m.AssertNotCalled(t, "DelIEPub", expectedChannel, expectedEvent, expectedKey, expectedValue)
+       m.AssertNotCalled(t, "DelIE", expectedKey, expectedValue)
+       status, err := i.RemoveIfAndPublish([]string{"channel", "event1", "channel"}, "key", "data")
+       assert.NotNil(t, err)
+       assert.False(t, status)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveIfAndPublishNoChannels(t *testing.T) {
+       m, i := setup()
+
+       expectedKey := "{namespace},key"
+       expectedValue := interface{}("data")
+
+       m.On("DelIE", expectedKey, expectedValue).Return(true, nil)
+       status, err := i.RemoveIfAndPublish([]string{}, "key", "data")
+       assert.Nil(t, err)
+       assert.True(t, status)
+       m.AssertExpectations(t)
+
+}
 func TestRemoveIfSuccessfullyOkStatus(t *testing.T) {
        m, i := setup()
 
@@ -433,3 +916,92 @@ func TestRemoveIfFailure(t *testing.T) {
        assert.False(t, status)
        m.AssertExpectations(t)
 }
+
+func TestRemoveAllAndPublishSuccessfully(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+       m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(nil)
+       err := i.RemoveAllAndPublish([]string{"channel", "event"})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysReturnError(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, errors.New("Some error"))
+       m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+       err := i.RemoveAllAndPublish([]string{"channel", "event"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysDelReturnsError(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+       m.On("DelPub", expectedChannel, expectedEvent, mDelExpected).Return(errors.New("Some error"))
+       err := i.RemoveAllAndPublish([]string{"channel", "event"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishKeysEventsWithIllegalCharacters(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       expectedChannel := "{namespace},channel"
+       expectedEvent := "event"
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+       m.AssertNotCalled(t, "DelPub", expectedChannel, expectedEvent, mDelExpected)
+       err := i.RemoveAllAndPublish([]string{"channel", "event___anotherEvent"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+
+}
+
+func TestRemoveAllAndPublishNoChannels(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+       m.On("Del", mDelExpected).Return(nil)
+       m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+       err := i.RemoveAllAndPublish([]string{})
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestRemoveAllAndPublishIncorrectChannel(t *testing.T) {
+       m, i := setup()
+
+       mKeysExpected := string("{namespace},*")
+       mKeysReturn := []string{"{namespace},key1", "{namespace},key2"}
+       mDelExpected := mKeysReturn
+       m.On("Keys", mKeysExpected).Return(mKeysReturn, nil)
+       m.AssertNotCalled(t, "DelPub", "", "", mDelExpected)
+       err := i.RemoveAllAndPublish([]string{"channel", "event", "channel2"})
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+
+}