Handle Subscribe() and Unsubscribe() error 46/9146/1
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Fri, 30 Sep 2022 10:17:19 +0000 (13:17 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Fri, 30 Sep 2022 10:17:19 +0000 (13:17 +0300)
Add handling of Redis channel Subscribe() and Unsubscribe() function
calls error status. Error status is returned back to the caller of SDL
APIs SubscribeChannel() and UnsubscribeChannel().

Issue-Id: RIC-942

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I2442e3ef78cdddabc2b983f59b0b05f5932c45c0

internal/sdlgoredis/sdlgoredis.go
internal/sdlgoredis/sdlgoredis_test.go
sdl.go
sdl_test.go
syncstorage.go

index 20ed0bf..8cdae46 100644 (file)
@@ -324,26 +324,25 @@ func (db *DB) CloseDB() error {
        return db.client.Close()
 }
 
-func (db *DB) UnsubscribeChannelDB(channels ...string) {
+func (db *DB) UnsubscribeChannelDB(channels ...string) error {
        for _, v := range channels {
                db.sCbMap.Remove(v)
                db.ch.removeChannel <- v
+               errStr := <-db.ch.removeChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Unsubscribe of channel %s failed: %s", v, errStr)
+               }
                if db.sCbMap.Count() == 0 {
                        db.ch.exit <- true
                }
        }
+       return nil
 }
 
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
        if db.sCbMap.Count() == 0 {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-               }
-
-               go func(sCbMap *sharedCbMap,
-                       ch intChannels,
-                       channels ...string) {
-                       sub := db.subscribe(db.ctx, db.client, channels...)
+               go func(sCbMap *sharedCbMap, ch intChannels) {
+                       sub := db.subscribe(db.ctx, db.client, "")
                        rxChannel := sub.Channel()
                        lCbMap := sCbMap.GetMapCopy()
                        for {
@@ -356,10 +355,18 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string)
                                        }
                                case channel := <-ch.addChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Subscribe(db.ctx, channel)
+                                       if err := sub.Subscribe(db.ctx, channel); err != nil {
+                                               ch.addChannel <- err.Error()
+                                       } else {
+                                               ch.addChannel <- ""
+                                       }
                                case channel := <-ch.removeChannel:
                                        lCbMap = sCbMap.GetMapCopy()
-                                       sub.Unsubscribe(db.ctx, channel)
+                                       if err := sub.Unsubscribe(db.ctx, channel); err != nil {
+                                               ch.removeChannel <- err.Error()
+                                       } else {
+                                               ch.removeChannel <- ""
+                                       }
                                case exit := <-ch.exit:
                                        if exit {
                                                if err := sub.Close(); err != nil {
@@ -369,14 +376,17 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string)
                                        }
                                }
                        }
-               }(db.sCbMap, db.ch, channels...)
-
-       } else {
-               for _, v := range channels {
-                       db.sCbMap.Add(v, cb)
-                       db.ch.addChannel <- v
+               }(db.sCbMap, db.ch)
+       }
+       for _, v := range channels {
+               db.sCbMap.Add(v, cb)
+               db.ch.addChannel <- v
+               errStr := <-db.ch.addChannel
+               if errStr != "" {
+                       return fmt.Errorf("SDL Subscribe of channel %s failed: %s", v, errStr)
                }
        }
+       return nil
 }
 
 func (db *DB) MSet(pairs ...interface{}) error {
index 106ba6c..85c18bb 100644 (file)
@@ -906,16 +906,19 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) {
                Payload: "event",
        }
        ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
        ps.On("Unsubscribe").Return(nil)
        ps.On("Close").Return(nil)
        count := 0
        receivedChannel := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
        }, "{prefix},channel")
+       assert.Nil(t, err)
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix},channel")
+       err = db.UnsubscribeChannelDB("{prefix},channel")
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
        assert.Equal(t, 1, count)
        assert.Equal(t, "channel", receivedChannel)
@@ -923,6 +926,37 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) {
        ps.AssertExpectations(t)
 }
 
+func TestSubscribeChannelDBFailure(t *testing.T) {
+       mockedErr := errors.New("Some DB Backend Subscribe Error")
+       ps, r, db := setupHaEnv(true)
+       ch := make(chan *redis.Message)
+       ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(mockedErr)
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
+       }, "{prefix},channel")
+       assert.NotNil(t, err)
+       assert.Contains(t, err.Error(), mockedErr.Error())
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
+func TestUnsubscribeChannelDBFailure(t *testing.T) {
+       mockedErr := errors.New("Some DB Backend Unsubscribe Error")
+       ps, r, db := setupHaEnv(true)
+       ch := make(chan *redis.Message)
+       ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
+       ps.On("Unsubscribe").Return(mockedErr)
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
+       }, "{prefix},channel")
+       assert.Nil(t, err)
+       err = db.UnsubscribeChannelDB("{prefix},channel")
+       assert.NotNil(t, err)
+       assert.Contains(t, err.Error(), mockedErr.Error())
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
 func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        ps, r, db := setupHaEnv(true)
        ch := make(chan *redis.Message)
@@ -943,21 +977,24 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        ps.On("Close").Return(nil)
        count := 0
        receivedChannel1 := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel1 = channel
        }, "{prefix},channel1")
+       assert.Nil(t, err)
        ch <- &msg1
        receivedChannel2 := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err = db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel2 = channel
        }, "{prefix},channel2")
-
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
-       db.UnsubscribeChannelDB("{prefix},channel1")
+       err = db.UnsubscribeChannelDB("{prefix},channel1")
+       assert.Nil(t, err)
        ch <- &msg2
-       db.UnsubscribeChannelDB("{prefix},channel2")
+       err = db.UnsubscribeChannelDB("{prefix},channel2")
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
        assert.Equal(t, 2, count)
        assert.Equal(t, "channel1", receivedChannel1)
@@ -986,21 +1023,24 @@ func TestSubscribeChannelDBTwoSubscribesWithUnequalPrefixAndUnsubscribes(t *test
        ps.On("Close").Return(nil)
        count := 0
        receivedChannel1 := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel1 = channel
        }, "{prefix1},channel")
+       assert.Nil(t, err)
        ch <- &msg1
        receivedChannel2 := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err = db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel2 = channel
        }, "{prefix2},channel")
-
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
-       db.UnsubscribeChannelDB("{prefix1},channel")
+       err = db.UnsubscribeChannelDB("{prefix1},channel")
+       assert.Nil(t, err)
        ch <- &msg2
-       db.UnsubscribeChannelDB("{prefix2},channel")
+       err = db.UnsubscribeChannelDB("{prefix2},channel")
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
        assert.Equal(t, 2, count)
        assert.Equal(t, "channel", receivedChannel1)
@@ -1018,25 +1058,30 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
                Payload: "event",
        }
        ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
        ps.On("Unsubscribe").Return(nil)
        ps.On("Close").Return(nil)
        count := 0
        receivedChannel := ""
 
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
        }, "{prefix},channel")
+       assert.Nil(t, err)
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix},channel")
+       err = db.UnsubscribeChannelDB("{prefix},channel")
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
 
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err = db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
        }, "{prefix}", "---", "{prefix},channel")
+       assert.Nil(t, err)
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix},channel")
+       err = db.UnsubscribeChannelDB("{prefix},channel")
+       assert.Nil(t, err)
 
        time.Sleep(1 * time.Second)
        assert.Equal(t, 2, count)
@@ -1054,16 +1099,19 @@ func TestSubscribeChannelDBSubscribeReceivedEventIgnoredIfChannelNameIsUnknown(t
                Payload: "event",
        }
        ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
        ps.On("Unsubscribe").Return(nil)
        ps.On("Close").Return(nil)
        count := 0
        receivedChannel := ""
-       db.SubscribeChannelDB(func(channel string, payload ...string) {
+       err := db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
        }, "{prefix},channel")
+       assert.Nil(t, err)
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix},channel")
+       err = db.UnsubscribeChannelDB("{prefix},channel")
+       assert.Nil(t, err)
        time.Sleep(1 * time.Second)
        assert.Equal(t, 0, count)
        assert.Equal(t, "", receivedChannel)
diff --git a/sdl.go b/sdl.go
index dbee7f1..02dae3d 100644 (file)
--- a/sdl.go
+++ b/sdl.go
@@ -93,8 +93,7 @@ func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
 //Deprecated: Will be removed in a future release, please use the SubscribeChannel
 //receiver function of the SyncStorage type.
 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
-       s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
-       return nil
+       return s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
 }
 
 //UnsubscribeChannel removes subscription from one or several channels.
index df83ff1..725552a 100644 (file)
@@ -37,12 +37,14 @@ type mockDB struct {
        mock.Mock
 }
 
-func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
-       m.Called(cb, channels)
+func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) error {
+       a := m.Called(cb, channels)
+       return a.Error(0)
 }
 
-func (m *mockDB) UnsubscribeChannelDB(channels ...string) {
-       m.Called(channels)
+func (m *mockDB) UnsubscribeChannelDB(channels ...string) error {
+       a := m.Called(channels)
+       return a.Error(0)
 }
 
 func (m *mockDB) MSet(pairs ...interface{}) error {
@@ -193,8 +195,23 @@ func TestSubscribeChannel(t *testing.T) {
        expectedCB := func(channel string, events ...string) {}
        expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
 
-       m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return()
-       i.SubscribeChannel(expectedCB, "channel1", "channel2")
+       m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(nil)
+       err := i.SubscribeChannel(expectedCB, "channel1", "channel2")
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestSubscribeChannelError(t *testing.T) {
+       mockedErr := errors.New("Some DB Backend Subscribe Error")
+       m, i := setup()
+
+       expectedCB := func(channel string, events ...string) {}
+       expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+       m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return(mockedErr)
+       err := i.SubscribeChannel(expectedCB, "channel1", "channel2")
+       assert.NotNil(t, err)
+       assert.Contains(t, err.Error(), mockedErr.Error())
        m.AssertExpectations(t)
 }
 
@@ -203,10 +220,25 @@ func TestUnsubscribeChannel(t *testing.T) {
 
        expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
 
-       m.On("UnsubscribeChannelDB", expectedChannels).Return()
-       i.UnsubscribeChannel("channel1", "channel2")
+       m.On("UnsubscribeChannelDB", expectedChannels).Return(nil)
+       err := i.UnsubscribeChannel("channel1", "channel2")
+       assert.Nil(t, err)
        m.AssertExpectations(t)
 }
+
+func TestUnsubscribeChannelError(t *testing.T) {
+       mockedErr := errors.New("Some DB Backend Unsubscribe Error")
+       m, i := setup()
+
+       expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
+
+       m.On("UnsubscribeChannelDB", expectedChannels).Return(mockedErr)
+       err := i.UnsubscribeChannel("channel1", "channel2")
+       assert.NotNil(t, err)
+       assert.Contains(t, err.Error(), mockedErr.Error())
+       m.AssertExpectations(t)
+}
+
 func TestGetOneKey(t *testing.T) {
        m, i := setup()
 
index 4571dd9..349bcc3 100644 (file)
@@ -93,16 +93,14 @@ func getHash(s string) uint32 {
 //events received from different channels, callbacks are called in series one by one.
 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
-       return nil
+       return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
 }
 
 //UnsubscribeChannel removes subscription from one or several channels under given
 //namespace.
 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
-       return nil
+       return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
 }
 
 //Close connection to backend database.
@@ -639,8 +637,8 @@ func getNsPrefix(ns string) string {
 }
 
 type iDatabase interface {
-       SubscribeChannelDB(cb func(string, ...string), channels ...string)
-       UnsubscribeChannelDB(channels ...string)
+       SubscribeChannelDB(cb func(string, ...string), channels ...string) error
+       UnsubscribeChannelDB(channels ...string) error
        MSet(pairs ...interface{}) error
        MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
        MGet(keys []string) ([]interface{}, error)