return db.client.Close()
}
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
for _, v := range channels {
db.sCbMap.Remove(v)
db.ch.removeChannel <- v
+ errStr := <-db.ch.removeChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+ }
if db.sCbMap.Count() == 0 {
db.ch.exit <- true
}
}
+ return nil
}
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
if db.sCbMap.Count() == 0 {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- }
-
- go func(sCbMap *sharedCbMap,
- ch intChannels,
- channels ...string) {
- sub := db.subscribe(db.ctx, db.client, channels...)
+ go func(sCbMap *sharedCbMap, ch intChannels) {
+ sub := db.subscribe(db.ctx, db.client, "")
rxChannel := sub.Channel()
lCbMap := sCbMap.GetMapCopy()
for {
}
case channel := <-ch.addChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Subscribe(db.ctx, channel)
+ if err := sub.Subscribe(db.ctx, channel); err != nil {
+ ch.addChannel <- err.Error()
+ } else {
+ ch.addChannel <- ""
+ }
case channel := <-ch.removeChannel:
lCbMap = sCbMap.GetMapCopy()
- sub.Unsubscribe(db.ctx, channel)
+ if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+ ch.removeChannel <- err.Error()
+ } else {
+ ch.removeChannel <- ""
+ }
case exit := <-ch.exit:
if exit {
if err := sub.Close(); err != nil {
}
}
}
- }(db.sCbMap, db.ch, channels...)
-
- } else {
- for _, v := range channels {
- db.sCbMap.Add(v, cb)
- db.ch.addChannel <- v
+ }(db.sCbMap, db.ch)
+ }
+ for _, v := range channels {
+ db.sCbMap.Add(v, cb)
+ db.ch.addChannel <- v
+ errStr := <-db.ch.addChannel
+ if errStr != "" {
+ return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
}
}
+ return nil
}
func (db *DB) MSet(pairs ...interface{}) error {
Payload: "event",
}
ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(nil)
ps.On("Unsubscribe").Return(nil)
ps.On("Close").Return(nil)
count := 0
receivedChannel := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
}, "{prefix},channel")
+ assert.Nil(t, err)
ch <- &msg
- db.UnsubscribeChannelDB("{prefix},channel")
+ err = db.UnsubscribeChannelDB("{prefix},channel")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
assert.Equal(t, 1, count)
assert.Equal(t, "channel", receivedChannel)
ps.AssertExpectations(t)
}
+func TestSubscribeChannelDBFailure(t *testing.T) {
+ mockedErr := errors.New("Some DB Backend Subscribe Error")
+ ps, r, db := setupHaEnv(true)
+ ch := make(chan *redis.Message)
+ ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(mockedErr)
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
+ }, "{prefix},channel")
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), mockedErr.Error())
+ r.AssertExpectations(t)
+ ps.AssertExpectations(t)
+}
+
+func TestUnsubscribeChannelDBFailure(t *testing.T) {
+ mockedErr := errors.New("Some DB Backend Unsubscribe Error")
+ ps, r, db := setupHaEnv(true)
+ ch := make(chan *redis.Message)
+ ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(nil)
+ ps.On("Unsubscribe").Return(mockedErr)
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
+ }, "{prefix},channel")
+ assert.Nil(t, err)
+ err = db.UnsubscribeChannelDB("{prefix},channel")
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), mockedErr.Error())
+ r.AssertExpectations(t)
+ ps.AssertExpectations(t)
+}
+
func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
ps, r, db := setupHaEnv(true)
ch := make(chan *redis.Message)
ps.On("Close").Return(nil)
count := 0
receivedChannel1 := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel1 = channel
}, "{prefix},channel1")
+ assert.Nil(t, err)
ch <- &msg1
receivedChannel2 := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err = db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel2 = channel
}, "{prefix},channel2")
-
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
- db.UnsubscribeChannelDB("{prefix},channel1")
+ err = db.UnsubscribeChannelDB("{prefix},channel1")
+ assert.Nil(t, err)
ch <- &msg2
- db.UnsubscribeChannelDB("{prefix},channel2")
+ err = db.UnsubscribeChannelDB("{prefix},channel2")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
assert.Equal(t, 2, count)
assert.Equal(t, "channel1", receivedChannel1)
ps.On("Close").Return(nil)
count := 0
receivedChannel1 := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel1 = channel
}, "{prefix1},channel")
+ assert.Nil(t, err)
ch <- &msg1
receivedChannel2 := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err = db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel2 = channel
}, "{prefix2},channel")
-
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
- db.UnsubscribeChannelDB("{prefix1},channel")
+ err = db.UnsubscribeChannelDB("{prefix1},channel")
+ assert.Nil(t, err)
ch <- &msg2
- db.UnsubscribeChannelDB("{prefix2},channel")
+ err = db.UnsubscribeChannelDB("{prefix2},channel")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
assert.Equal(t, 2, count)
assert.Equal(t, "channel", receivedChannel1)
Payload: "event",
}
ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(nil)
ps.On("Unsubscribe").Return(nil)
ps.On("Close").Return(nil)
count := 0
receivedChannel := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
}, "{prefix},channel")
+ assert.Nil(t, err)
ch <- &msg
- db.UnsubscribeChannelDB("{prefix},channel")
+ err = db.UnsubscribeChannelDB("{prefix},channel")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err = db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
}, "{prefix}", "---", "{prefix},channel")
+ assert.Nil(t, err)
ch <- &msg
- db.UnsubscribeChannelDB("{prefix},channel")
+ err = db.UnsubscribeChannelDB("{prefix},channel")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
assert.Equal(t, 2, count)
Payload: "event",
}
ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(nil)
ps.On("Unsubscribe").Return(nil)
ps.On("Close").Return(nil)
count := 0
receivedChannel := ""
- db.SubscribeChannelDB(func(channel string, payload ...string) {
+ err := db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
}, "{prefix},channel")
+ assert.Nil(t, err)
ch <- &msg
- db.UnsubscribeChannelDB("{prefix},channel")
+ err = db.UnsubscribeChannelDB("{prefix},channel")
+ assert.Nil(t, err)
time.Sleep(1 * time.Second)
assert.Equal(t, 0, count)
assert.Equal(t, "", receivedChannel)
//Deprecated: Will be removed in a future release, please use the SubscribeChannel
//receiver function of the SyncStorage type.
func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
- s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
- return nil
+ return s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
}
//UnsubscribeChannel removes subscription from one or several channels.
mock.Mock
}
-func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
- m.Called(cb, channels)
+func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
+ a := m.Called(cb, channels)
+ return a.Error(0)
}
-func (m *mockDB) UnsubscribeChannelDB(channels ...string) {
- m.Called(channels)
+func (m *mockDB) UnsubscribeChannelDB(channels ...string) error {
+ a := m.Called(channels)
+ return a.Error(0)
}
func (m *mockDB) MSet(pairs ...interface{}) error {
expectedCB := func(channel string, events ...string) {}
expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
- m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return()
- i.SubscribeChannel(expectedCB, "channel1", "channel2")
+ m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(nil)
+ err := i.SubscribeChannel(expectedCB, "channel1", "channel2")
+ assert.Nil(t, err)
+ m.AssertExpectations(t)
+}
+
+func TestSubscribeChannelError(t *testing.T) {
+ mockedErr := errors.New("Some DB Backend Subscribe Error")
+ m, i := setup()
+
+ expectedCB := func(channel string, events ...string) {}
+ expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+ m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(mockedErr)
+ err := i.SubscribeChannel(expectedCB, "channel1", "channel2")
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), mockedErr.Error())
m.AssertExpectations(t)
}
expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
- m.On("UnsubscribeChannelDB", expectedChannels).Return()
- i.UnsubscribeChannel("channel1", "channel2")
+ m.On("UnsubscribeChannelDB", expectedChannels).Return(nil)
+ err := i.UnsubscribeChannel("channel1", "channel2")
+ assert.Nil(t, err)
m.AssertExpectations(t)
}
+
+func TestUnsubscribeChannelError(t *testing.T) {
+ mockedErr := errors.New("Some DB Backend Unsubscribe Error")
+ m, i := setup()
+
+ expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+ m.On("UnsubscribeChannelDB", expectedChannels).Return(mockedErr)
+ err := i.UnsubscribeChannel("channel1", "channel2")
+ assert.NotNil(t, err)
+ assert.Contains(t, err.Error(), mockedErr.Error())
+ m.AssertExpectations(t)
+}
+
func TestGetOneKey(t *testing.T) {
m, i := setup()
//events received from different channels, callbacks are called in series one by one.
func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
nsPrefix := getNsPrefix(ns)
- s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
- return nil
+ return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
}
//UnsubscribeChannel removes subscription from one or several channels under given
//namespace.
func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
nsPrefix := getNsPrefix(ns)
- s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
- return nil
+ return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
}
//Close connection to backend database.
}
type iDatabase interface {
- SubscribeChannelDB(cb func(string, ...string), channels ...string)
- UnsubscribeChannelDB(channels ...string)
+ SubscribeChannelDB(cb func(string, ...string), channels ...string) error
+ UnsubscribeChannelDB(channels ...string) error
MSet(pairs ...interface{}) error
MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
MGet(keys []string) ([]interface{}, error)