Fix multi-namespace SDL event subscribe 92/7692/2 v0.9.6
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Mon, 7 Feb 2022 11:16:51 +0000 (13:16 +0200)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Mon, 7 Feb 2022 15:50:23 +0000 (17:50 +0200)
SDL event channel internal format was wrongly showed to application
when the same application instance was subscribing to receive SDL
events from multiple namespaces. In that case subscribes were all
registered to the first namespace in SDL and hence when an event in a
channel belonging to another namespace was triggered, namespace internal
format '{other-namespace},' was showed in front of channel name. With
this commit change namespace lookup from channel name to search from
left to right the first comma ',' character. String after comma is
a channel name what should be showed to an application and all before
the comma ',' character can be silently parsed away in SDL.

Issue-Id: RIC-885

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I90b94862670e1e529381112e385b65250e7cb275

docs/release-notes.rst
internal/sdlgoredis/sdlgoredis.go
internal/sdlgoredis/sdlgoredis_test.go
sdl_test.go
syncstorage.go

index 463d876..d792912 100644 (file)
@@ -30,6 +30,10 @@ This document provides the release notes of the sdlgo.
 Version history
 ---------------
 
+[0.9.6] - 2022-02-07
+
+* Fix multi-namespace SDL event subscribe
+
 [0.9.5] - 2022-01-20
 
 * Pump Redis client version to v8.11.4 and fix Redis APIs to have a Golang
index 278be2a..9162f39 100644 (file)
@@ -38,6 +38,9 @@ import (
        "time"
 )
 
+const EventSeparator = "___"
+const NsSeparator = ","
+
 type ChannelNotificationCb func(channel string, payload ...string)
 type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
 
@@ -300,15 +303,13 @@ func (db *DB) UnsubscribeChannelDB(channels ...string) {
        }
 }
 
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
        if db.sCbMap.Count() == 0 {
                for _, v := range channels {
                        db.sCbMap.Add(v, cb)
                }
 
                go func(sCbMap *sharedCbMap,
-                       channelPrefix,
-                       eventSeparator string,
                        ch intChannels,
                        channels ...string) {
                        sub := db.subscribe(db.ctx, db.client, channels...)
@@ -319,7 +320,8 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even
                                case msg := <-rxChannel:
                                        cb, ok := lCbMap[msg.Channel]
                                        if ok {
-                                               cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+                                               nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
+                                               cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
                                        }
                                case channel := <-ch.addChannel:
                                        lCbMap = sCbMap.GetMapCopy()
@@ -336,7 +338,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even
                                        }
                                }
                        }
-               }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
+               }(db.sCbMap, db.ch, channels...)
 
        } else {
                for _, v := range channels {
index d8b8328..c95b8af 100644 (file)
@@ -900,7 +900,7 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) {
        ps, r, db := setupHaEnv(true)
        ch := make(chan *redis.Message)
        msg := redis.Message{
-               Channel: "{prefix}channel",
+               Channel: "{prefix},channel",
                Pattern: "pattern",
                Payload: "event",
        }
@@ -912,9 +912,9 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) {
        db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
-       }, "{prefix}", "---", "{prefix}channel")
+       }, "{prefix},channel")
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix}channel")
+       db.UnsubscribeChannelDB("{prefix},channel")
        time.Sleep(1 * time.Second)
        assert.Equal(t, 1, count)
        assert.Equal(t, "channel", receivedChannel)
@@ -926,12 +926,12 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        ps, r, db := setupHaEnv(true)
        ch := make(chan *redis.Message)
        msg1 := redis.Message{
-               Channel: "{prefix}channel1",
+               Channel: "{prefix},channel1",
                Pattern: "pattern",
                Payload: "event",
        }
        msg2 := redis.Message{
-               Channel: "{prefix}channel2",
+               Channel: "{prefix},channel2",
                Pattern: "pattern",
                Payload: "event",
        }
@@ -945,18 +945,18 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel1 = channel
-       }, "{prefix}", "---", "{prefix}channel1")
+       }, "{prefix},channel1")
        ch <- &msg1
        receivedChannel2 := ""
        db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel2 = channel
-       }, "{prefix}", "---", "{prefix}channel2")
+       }, "{prefix},channel2")
 
        time.Sleep(1 * time.Second)
-       db.UnsubscribeChannelDB("{prefix}channel1")
+       db.UnsubscribeChannelDB("{prefix},channel1")
        ch <- &msg2
-       db.UnsubscribeChannelDB("{prefix}channel2")
+       db.UnsubscribeChannelDB("{prefix},channel2")
        time.Sleep(1 * time.Second)
        assert.Equal(t, 2, count)
        assert.Equal(t, "channel1", receivedChannel1)
@@ -965,11 +965,54 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) {
        ps.AssertExpectations(t)
 }
 
+func TestSubscribeChannelDBTwoSubscribesWithUnequalPrefixAndUnsubscribes(t *testing.T) {
+       ps, r, db := setupHaEnv(true)
+       ch := make(chan *redis.Message)
+       msg1 := redis.Message{
+               Channel: "{prefix1},channel",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       msg2 := redis.Message{
+               Channel: "{prefix2},channel",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       ps.On("Channel").Return(ch)
+       ps.On("Subscribe").Return(nil)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Close").Return(nil)
+       count := 0
+       receivedChannel1 := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string) {
+               count++
+               receivedChannel1 = channel
+       }, "{prefix1},channel")
+       ch <- &msg1
+       receivedChannel2 := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string) {
+               count++
+               receivedChannel2 = channel
+       }, "{prefix2},channel")
+
+       time.Sleep(1 * time.Second)
+       db.UnsubscribeChannelDB("{prefix1},channel")
+       ch <- &msg2
+       db.UnsubscribeChannelDB("{prefix2},channel")
+       time.Sleep(1 * time.Second)
+       assert.Equal(t, 2, count)
+       assert.Equal(t, "channel", receivedChannel1)
+       assert.Equal(t, "channel", receivedChannel2)
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
 func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
        ps, r, db := setupHaEnv(true)
        ch := make(chan *redis.Message)
        msg := redis.Message{
-               Channel: "{prefix}channel",
+               Channel: "{prefix},channel",
                Pattern: "pattern",
                Payload: "event",
        }
@@ -982,17 +1025,17 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
        db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
-       }, "{prefix}", "---", "{prefix}channel")
+       }, "{prefix},channel")
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix}channel")
+       db.UnsubscribeChannelDB("{prefix},channel")
        time.Sleep(1 * time.Second)
 
        db.SubscribeChannelDB(func(channel string, payload ...string) {
                count++
                receivedChannel = channel
-       }, "{prefix}", "---", "{prefix}channel")
+       }, "{prefix}", "---", "{prefix},channel")
        ch <- &msg
-       db.UnsubscribeChannelDB("{prefix}channel")
+       db.UnsubscribeChannelDB("{prefix},channel")
 
        time.Sleep(1 * time.Second)
        assert.Equal(t, 2, count)
@@ -1001,6 +1044,32 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
        ps.AssertExpectations(t)
 }
 
+func TestSubscribeChannelDBSubscribeReceivedEventIgnoredIfChannelNameIsUnknown(t *testing.T) {
+       ps, r, db := setupHaEnv(true)
+       ch := make(chan *redis.Message)
+       msg := redis.Message{
+               Channel: "missingNsPrefixchannel",
+               Pattern: "pattern",
+               Payload: "event",
+       }
+       ps.On("Channel").Return(ch)
+       ps.On("Unsubscribe").Return(nil)
+       ps.On("Close").Return(nil)
+       count := 0
+       receivedChannel := ""
+       db.SubscribeChannelDB(func(channel string, payload ...string) {
+               count++
+               receivedChannel = channel
+       }, "{prefix},channel")
+       ch <- &msg
+       db.UnsubscribeChannelDB("{prefix},channel")
+       time.Sleep(1 * time.Second)
+       assert.Equal(t, 0, count)
+       assert.Equal(t, "", receivedChannel)
+       r.AssertExpectations(t)
+       ps.AssertExpectations(t)
+}
+
 func TestPTTLSuccessfully(t *testing.T) {
        _, r, db := setupHaEnv(true)
        expectedKey := "key"
index 8dc9996..df83ff1 100644 (file)
@@ -37,8 +37,8 @@ type mockDB struct {
        mock.Mock
 }
 
-func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
-       m.Called(cb, channelPrefix, eventSeparator, channels)
+func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
+       m.Called(cb, channels)
 }
 
 func (m *mockDB) UnsubscribeChannelDB(channels ...string) {
@@ -193,7 +193,7 @@ func TestSubscribeChannel(t *testing.T) {
        expectedCB := func(channel string, events ...string) {}
        expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"}
 
-       m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), "{namespace},", "___", expectedChannels).Return()
+       m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return()
        i.SubscribeChannel(expectedCB, "channel1", "channel2")
        m.AssertExpectations(t)
 }
index 850213a..4571dd9 100644 (file)
@@ -27,6 +27,7 @@ import (
        "encoding/base64"
        "errors"
        "fmt"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
        "hash/crc32"
        "io"
        "reflect"
@@ -42,10 +43,9 @@ import (
 //SdlInstance where namespace can be defined only at SdlInstance instance creation
 //time.
 type SyncStorage struct {
-       eventSeparator string
-       mutex          sync.Mutex
-       tmp            []byte
-       db             *Database
+       mutex sync.Mutex
+       tmp   []byte
+       db    *Database
 }
 
 //NewSyncStorage creates a new sdl instance.
@@ -56,8 +56,7 @@ func NewSyncStorage() *SyncStorage {
 
 func newSyncStorage(db *Database) *SyncStorage {
        return &SyncStorage{
-               eventSeparator: "___",
-               db:             db,
+               db: db,
        }
 }
 
@@ -94,7 +93,7 @@ func getHash(s string) uint32 {
 //events received from different channels, callbacks are called in series one by one.
 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+       s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
        return nil
 }
 
@@ -123,8 +122,8 @@ func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []str
        }
        for i, v := range channelsAndEvents {
                if i%2 != 0 {
-                       if strings.Contains(v, s.eventSeparator) {
-                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+                       if strings.Contains(v, sdlgoredis.EventSeparator) {
+                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
                        }
                }
        }
@@ -217,7 +216,7 @@ func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvent
                }
                _, exists := channelEventMap[v]
                if exists {
-                       channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
+                       channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
                } else {
                        channelEventMap[v] = channelsAndEvents[i+1]
                }
@@ -636,11 +635,11 @@ type SyncStorageLock struct {
 }
 
 func getNsPrefix(ns string) string {
-       return "{" + ns + "},"
+       return "{" + ns + "}" + sdlgoredis.NsSeparator
 }
 
 type iDatabase interface {
-       SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
+       SubscribeChannelDB(cb func(string, ...string), channels ...string)
        UnsubscribeChannelDB(channels ...string)
        MSet(pairs ...interface{}) error
        MSetMPub(channelsAndEvents []string, pairs ...interface{}) error