From d14c9d8bc5ed846c4b9f05af739eb9fba1978737 Mon Sep 17 00:00:00 2001 From: Timo Tietavainen Date: Fri, 30 Sep 2022 13:17:19 +0300 Subject: [PATCH] Handle Subscribe() and Unsubscribe() error Add handling of Redis channel Subscribe() and Unsubscribe() function calls error status. Error status is returned back to the caller of SDL APIs SubscribeChannel() and UnsubscribeChannel(). Issue-Id: RIC-942 Signed-off-by: Timo Tietavainen Change-Id: I2442e3ef78cdddabc2b983f59b0b05f5932c45c0 --- internal/sdlgoredis/sdlgoredis.go | 46 +++++++++++-------- internal/sdlgoredis/sdlgoredis_test.go | 84 ++++++++++++++++++++++++++-------- sdl.go | 3 +- sdl_test.go | 48 +++++++++++++++---- syncstorage.go | 10 ++-- 5 files changed, 139 insertions(+), 52 deletions(-) diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 20ed0bf..8cdae46 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -324,26 +324,25 @@ func (db *DB) CloseDB() error { return db.client.Close() } -func (db *DB) UnsubscribeChannelDB(channels ...string) { +func (db *DB) UnsubscribeChannelDB(channels ...string) error { for _, v := range channels { db.sCbMap.Remove(v) db.ch.removeChannel <- v + errStr := <-db.ch.removeChannel + if errStr != "" { + return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr) + } if db.sCbMap.Count() == 0 { db.ch.exit <- true } } + return nil } -func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) { +func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error { if db.sCbMap.Count() == 0 { - for _, v := range channels { - db.sCbMap.Add(v, cb) - } - - go func(sCbMap *sharedCbMap, - ch intChannels, - channels ...string) { - sub := db.subscribe(db.ctx, db.client, channels...) + go func(sCbMap *sharedCbMap, ch intChannels) { + sub := db.subscribe(db.ctx, db.client, "") rxChannel := sub.Channel() lCbMap := sCbMap.GetMapCopy() for { @@ -356,10 +355,18 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() - sub.Subscribe(db.ctx, channel) + if err := sub.Subscribe(db.ctx, channel); err != nil { + ch.addChannel <- err.Error() + } else { + ch.addChannel <- "" + } case channel := <-ch.removeChannel: lCbMap = sCbMap.GetMapCopy() - sub.Unsubscribe(db.ctx, channel) + if err := sub.Unsubscribe(db.ctx, channel); err != nil { + ch.removeChannel <- err.Error() + } else { + ch.removeChannel <- "" + } case exit := <-ch.exit: if exit { if err := sub.Close(); err != nil { @@ -369,14 +376,17 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) } } } - }(db.sCbMap, db.ch, channels...) - - } else { - for _, v := range channels { - db.sCbMap.Add(v, cb) - db.ch.addChannel <- v + }(db.sCbMap, db.ch) + } + for _, v := range channels { + db.sCbMap.Add(v, cb) + db.ch.addChannel <- v + errStr := <-db.ch.addChannel + if errStr != "" { + return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr) } } + return nil } func (db *DB) MSet(pairs ...interface{}) error { diff --git a/internal/sdlgoredis/sdlgoredis_test.go b/internal/sdlgoredis/sdlgoredis_test.go index 106ba6c..85c18bb 100644 --- a/internal/sdlgoredis/sdlgoredis_test.go +++ b/internal/sdlgoredis/sdlgoredis_test.go @@ -906,16 +906,19 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) { Payload: "event", } ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(nil) ps.On("Unsubscribe").Return(nil) ps.On("Close").Return(nil) count := 0 receivedChannel := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err := db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel }, "{prefix},channel") + assert.Nil(t, err) ch <- &msg - db.UnsubscribeChannelDB("{prefix},channel") + err = db.UnsubscribeChannelDB("{prefix},channel") + assert.Nil(t, err) time.Sleep(1 * time.Second) assert.Equal(t, 1, count) assert.Equal(t, "channel", receivedChannel) @@ -923,6 +926,37 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) { ps.AssertExpectations(t) } +func TestSubscribeChannelDBFailure(t *testing.T) { + mockedErr := errors.New("Some DB Backend Subscribe Error") + ps, r, db := setupHaEnv(true) + ch := make(chan *redis.Message) + ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(mockedErr) + err := db.SubscribeChannelDB(func(channel string, payload ...string) { + }, "{prefix},channel") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), mockedErr.Error()) + r.AssertExpectations(t) + ps.AssertExpectations(t) +} + +func TestUnsubscribeChannelDBFailure(t *testing.T) { + mockedErr := errors.New("Some DB Backend Unsubscribe Error") + ps, r, db := setupHaEnv(true) + ch := make(chan *redis.Message) + ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(nil) + ps.On("Unsubscribe").Return(mockedErr) + err := db.SubscribeChannelDB(func(channel string, payload ...string) { + }, "{prefix},channel") + assert.Nil(t, err) + err = db.UnsubscribeChannelDB("{prefix},channel") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), mockedErr.Error()) + r.AssertExpectations(t) + ps.AssertExpectations(t) +} + func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) { ps, r, db := setupHaEnv(true) ch := make(chan *redis.Message) @@ -943,21 +977,24 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) { ps.On("Close").Return(nil) count := 0 receivedChannel1 := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err := db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel1 = channel }, "{prefix},channel1") + assert.Nil(t, err) ch <- &msg1 receivedChannel2 := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err = db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel2 = channel }, "{prefix},channel2") - + assert.Nil(t, err) time.Sleep(1 * time.Second) - db.UnsubscribeChannelDB("{prefix},channel1") + err = db.UnsubscribeChannelDB("{prefix},channel1") + assert.Nil(t, err) ch <- &msg2 - db.UnsubscribeChannelDB("{prefix},channel2") + err = db.UnsubscribeChannelDB("{prefix},channel2") + assert.Nil(t, err) time.Sleep(1 * time.Second) assert.Equal(t, 2, count) assert.Equal(t, "channel1", receivedChannel1) @@ -986,21 +1023,24 @@ func TestSubscribeChannelDBTwoSubscribesWithUnequalPrefixAndUnsubscribes(t *test ps.On("Close").Return(nil) count := 0 receivedChannel1 := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err := db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel1 = channel }, "{prefix1},channel") + assert.Nil(t, err) ch <- &msg1 receivedChannel2 := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err = db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel2 = channel }, "{prefix2},channel") - + assert.Nil(t, err) time.Sleep(1 * time.Second) - db.UnsubscribeChannelDB("{prefix1},channel") + err = db.UnsubscribeChannelDB("{prefix1},channel") + assert.Nil(t, err) ch <- &msg2 - db.UnsubscribeChannelDB("{prefix2},channel") + err = db.UnsubscribeChannelDB("{prefix2},channel") + assert.Nil(t, err) time.Sleep(1 * time.Second) assert.Equal(t, 2, count) assert.Equal(t, "channel", receivedChannel1) @@ -1018,25 +1058,30 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) { Payload: "event", } ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(nil) ps.On("Unsubscribe").Return(nil) ps.On("Close").Return(nil) count := 0 receivedChannel := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err := db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel }, "{prefix},channel") + assert.Nil(t, err) ch <- &msg - db.UnsubscribeChannelDB("{prefix},channel") + err = db.UnsubscribeChannelDB("{prefix},channel") + assert.Nil(t, err) time.Sleep(1 * time.Second) - db.SubscribeChannelDB(func(channel string, payload ...string) { + err = db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel }, "{prefix}", "---", "{prefix},channel") + assert.Nil(t, err) ch <- &msg - db.UnsubscribeChannelDB("{prefix},channel") + err = db.UnsubscribeChannelDB("{prefix},channel") + assert.Nil(t, err) time.Sleep(1 * time.Second) assert.Equal(t, 2, count) @@ -1054,16 +1099,19 @@ func TestSubscribeChannelDBSubscribeReceivedEventIgnoredIfChannelNameIsUnknown(t Payload: "event", } ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(nil) ps.On("Unsubscribe").Return(nil) ps.On("Close").Return(nil) count := 0 receivedChannel := "" - db.SubscribeChannelDB(func(channel string, payload ...string) { + err := db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel }, "{prefix},channel") + assert.Nil(t, err) ch <- &msg - db.UnsubscribeChannelDB("{prefix},channel") + err = db.UnsubscribeChannelDB("{prefix},channel") + assert.Nil(t, err) time.Sleep(1 * time.Second) assert.Equal(t, 0, count) assert.Equal(t, "", receivedChannel) diff --git a/sdl.go b/sdl.go index dbee7f1..02dae3d 100644 --- a/sdl.go +++ b/sdl.go @@ -93,8 +93,7 @@ func NewSdlInstance(NameSpace string, db *Database) *SdlInstance { //Deprecated: Will be removed in a future release, please use the SubscribeChannel //receiver function of the SyncStorage type. func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error { - s.storage.SubscribeChannel(s.nameSpace, cb, channels...) - return nil + return s.storage.SubscribeChannel(s.nameSpace, cb, channels...) } //UnsubscribeChannel removes subscription from one or several channels. diff --git a/sdl_test.go b/sdl_test.go index df83ff1..725552a 100644 --- a/sdl_test.go +++ b/sdl_test.go @@ -37,12 +37,14 @@ type mockDB struct { mock.Mock } -func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) { - m.Called(cb, channels) +func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error { + a := m.Called(cb, channels) + return a.Error(0) } -func (m *mockDB) UnsubscribeChannelDB(channels ...string) { - m.Called(channels) +func (m *mockDB) UnsubscribeChannelDB(channels ...string) error { + a := m.Called(channels) + return a.Error(0) } func (m *mockDB) MSet(pairs ...interface{}) error { @@ -193,8 +195,23 @@ func TestSubscribeChannel(t *testing.T) { expectedCB := func(channel string, events ...string) {} expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"} - m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return() - i.SubscribeChannel(expectedCB, "channel1", "channel2") + m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(nil) + err := i.SubscribeChannel(expectedCB, "channel1", "channel2") + assert.Nil(t, err) + m.AssertExpectations(t) +} + +func TestSubscribeChannelError(t *testing.T) { + mockedErr := errors.New("Some DB Backend Subscribe Error") + m, i := setup() + + expectedCB := func(channel string, events ...string) {} + expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"} + + m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(mockedErr) + err := i.SubscribeChannel(expectedCB, "channel1", "channel2") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), mockedErr.Error()) m.AssertExpectations(t) } @@ -203,10 +220,25 @@ func TestUnsubscribeChannel(t *testing.T) { expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"} - m.On("UnsubscribeChannelDB", expectedChannels).Return() - i.UnsubscribeChannel("channel1", "channel2") + m.On("UnsubscribeChannelDB", expectedChannels).Return(nil) + err := i.UnsubscribeChannel("channel1", "channel2") + assert.Nil(t, err) m.AssertExpectations(t) } + +func TestUnsubscribeChannelError(t *testing.T) { + mockedErr := errors.New("Some DB Backend Unsubscribe Error") + m, i := setup() + + expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"} + + m.On("UnsubscribeChannelDB", expectedChannels).Return(mockedErr) + err := i.UnsubscribeChannel("channel1", "channel2") + assert.NotNil(t, err) + assert.Contains(t, err.Error(), mockedErr.Error()) + m.AssertExpectations(t) +} + func TestGetOneKey(t *testing.T) { m, i := setup() diff --git a/syncstorage.go b/syncstorage.go index 4571dd9..349bcc3 100644 --- a/syncstorage.go +++ b/syncstorage.go @@ -93,16 +93,14 @@ func getHash(s string) uint32 { //events received from different channels, callbacks are called in series one by one. func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error { nsPrefix := getNsPrefix(ns) - s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...) - return nil + return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...) } //UnsubscribeChannel removes subscription from one or several channels under given //namespace. func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error { nsPrefix := getNsPrefix(ns) - s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) - return nil + return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) } //Close connection to backend database. @@ -639,8 +637,8 @@ func getNsPrefix(ns string) string { } type iDatabase interface { - SubscribeChannelDB(cb func(string, ...string), channels ...string) - UnsubscribeChannelDB(channels ...string) + SubscribeChannelDB(cb func(string, ...string), channels ...string) error + UnsubscribeChannelDB(channels ...string) error MSet(pairs ...interface{}) error MSetMPub(channelsAndEvents []string, pairs ...interface{}) error MGet(keys []string) ([]interface{}, error) -- 2.16.6