From 40508c8e2b5154083afffffdfbc9ffcf2c7f868c Mon Sep 17 00:00:00 2001 From: Timo Tietavainen Date: Mon, 7 Feb 2022 13:16:51 +0200 Subject: [PATCH] Fix multi-namespace SDL event subscribe SDL event channel internal format was wrongly showed to application when the same application instance was subscribing to receive SDL events from multiple namespaces. In that case subscribes were all registered to the first namespace in SDL and hence when an event in a channel belonging to another namespace was triggered, namespace internal format '{other-namespace},' was showed in front of channel name. With this commit change namespace lookup from channel name to search from left to right the first comma ',' character. String after comma is a channel name what should be showed to an application and all before the comma ',' character can be silently parsed away in SDL. Issue-Id: RIC-885 Signed-off-by: Timo Tietavainen Change-Id: I90b94862670e1e529381112e385b65250e7cb275 --- docs/release-notes.rst | 4 ++ internal/sdlgoredis/sdlgoredis.go | 12 +++-- internal/sdlgoredis/sdlgoredis_test.go | 97 +++++++++++++++++++++++++++++----- sdl_test.go | 6 +-- syncstorage.go | 23 ++++---- 5 files changed, 108 insertions(+), 34 deletions(-) diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 463d876..d792912 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -30,6 +30,10 @@ This document provides the release notes of the sdlgo. Version history --------------- +[0.9.6] - 2022-02-07 + +* Fix multi-namespace SDL event subscribe + [0.9.5] - 2022-01-20 * Pump Redis client version to v8.11.4 and fix Redis APIs to have a Golang diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 278be2a..9162f39 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -38,6 +38,9 @@ import ( "time" ) +const EventSeparator = "___" +const NsSeparator = "," + type ChannelNotificationCb func(channel string, payload ...string) type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient @@ -300,15 +303,13 @@ func (db *DB) UnsubscribeChannelDB(channels ...string) { } } -func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) { +func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) { if db.sCbMap.Count() == 0 { for _, v := range channels { db.sCbMap.Add(v, cb) } go func(sCbMap *sharedCbMap, - channelPrefix, - eventSeparator string, ch intChannels, channels ...string) { sub := db.subscribe(db.ctx, db.client, channels...) @@ -319,7 +320,8 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even case msg := <-rxChannel: cb, ok := lCbMap[msg.Channel] if ok { - cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...) + nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2) + cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...) } case channel := <-ch.addChannel: lCbMap = sCbMap.GetMapCopy() @@ -336,7 +338,7 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even } } } - }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...) + }(db.sCbMap, db.ch, channels...) } else { for _, v := range channels { diff --git a/internal/sdlgoredis/sdlgoredis_test.go b/internal/sdlgoredis/sdlgoredis_test.go index d8b8328..c95b8af 100644 --- a/internal/sdlgoredis/sdlgoredis_test.go +++ b/internal/sdlgoredis/sdlgoredis_test.go @@ -900,7 +900,7 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) { ps, r, db := setupHaEnv(true) ch := make(chan *redis.Message) msg := redis.Message{ - Channel: "{prefix}channel", + Channel: "{prefix},channel", Pattern: "pattern", Payload: "event", } @@ -912,9 +912,9 @@ func TestSubscribeChannelDBSubscribeRXUnsubscribe(t *testing.T) { db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel - }, "{prefix}", "---", "{prefix}channel") + }, "{prefix},channel") ch <- &msg - db.UnsubscribeChannelDB("{prefix}channel") + db.UnsubscribeChannelDB("{prefix},channel") time.Sleep(1 * time.Second) assert.Equal(t, 1, count) assert.Equal(t, "channel", receivedChannel) @@ -926,12 +926,12 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) { ps, r, db := setupHaEnv(true) ch := make(chan *redis.Message) msg1 := redis.Message{ - Channel: "{prefix}channel1", + Channel: "{prefix},channel1", Pattern: "pattern", Payload: "event", } msg2 := redis.Message{ - Channel: "{prefix}channel2", + Channel: "{prefix},channel2", Pattern: "pattern", Payload: "event", } @@ -945,18 +945,18 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) { db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel1 = channel - }, "{prefix}", "---", "{prefix}channel1") + }, "{prefix},channel1") ch <- &msg1 receivedChannel2 := "" db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel2 = channel - }, "{prefix}", "---", "{prefix}channel2") + }, "{prefix},channel2") time.Sleep(1 * time.Second) - db.UnsubscribeChannelDB("{prefix}channel1") + db.UnsubscribeChannelDB("{prefix},channel1") ch <- &msg2 - db.UnsubscribeChannelDB("{prefix}channel2") + db.UnsubscribeChannelDB("{prefix},channel2") time.Sleep(1 * time.Second) assert.Equal(t, 2, count) assert.Equal(t, "channel1", receivedChannel1) @@ -965,11 +965,54 @@ func TestSubscribeChannelDBSubscribeTwoUnsubscribeOne(t *testing.T) { ps.AssertExpectations(t) } +func TestSubscribeChannelDBTwoSubscribesWithUnequalPrefixAndUnsubscribes(t *testing.T) { + ps, r, db := setupHaEnv(true) + ch := make(chan *redis.Message) + msg1 := redis.Message{ + Channel: "{prefix1},channel", + Pattern: "pattern", + Payload: "event", + } + msg2 := redis.Message{ + Channel: "{prefix2},channel", + Pattern: "pattern", + Payload: "event", + } + ps.On("Channel").Return(ch) + ps.On("Subscribe").Return(nil) + ps.On("Unsubscribe").Return(nil) + ps.On("Unsubscribe").Return(nil) + ps.On("Close").Return(nil) + count := 0 + receivedChannel1 := "" + db.SubscribeChannelDB(func(channel string, payload ...string) { + count++ + receivedChannel1 = channel + }, "{prefix1},channel") + ch <- &msg1 + receivedChannel2 := "" + db.SubscribeChannelDB(func(channel string, payload ...string) { + count++ + receivedChannel2 = channel + }, "{prefix2},channel") + + time.Sleep(1 * time.Second) + db.UnsubscribeChannelDB("{prefix1},channel") + ch <- &msg2 + db.UnsubscribeChannelDB("{prefix2},channel") + time.Sleep(1 * time.Second) + assert.Equal(t, 2, count) + assert.Equal(t, "channel", receivedChannel1) + assert.Equal(t, "channel", receivedChannel2) + r.AssertExpectations(t) + ps.AssertExpectations(t) +} + func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) { ps, r, db := setupHaEnv(true) ch := make(chan *redis.Message) msg := redis.Message{ - Channel: "{prefix}channel", + Channel: "{prefix},channel", Pattern: "pattern", Payload: "event", } @@ -982,17 +1025,17 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) { db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel - }, "{prefix}", "---", "{prefix}channel") + }, "{prefix},channel") ch <- &msg - db.UnsubscribeChannelDB("{prefix}channel") + db.UnsubscribeChannelDB("{prefix},channel") time.Sleep(1 * time.Second) db.SubscribeChannelDB(func(channel string, payload ...string) { count++ receivedChannel = channel - }, "{prefix}", "---", "{prefix}channel") + }, "{prefix}", "---", "{prefix},channel") ch <- &msg - db.UnsubscribeChannelDB("{prefix}channel") + db.UnsubscribeChannelDB("{prefix},channel") time.Sleep(1 * time.Second) assert.Equal(t, 2, count) @@ -1001,6 +1044,32 @@ func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) { ps.AssertExpectations(t) } +func TestSubscribeChannelDBSubscribeReceivedEventIgnoredIfChannelNameIsUnknown(t *testing.T) { + ps, r, db := setupHaEnv(true) + ch := make(chan *redis.Message) + msg := redis.Message{ + Channel: "missingNsPrefixchannel", + Pattern: "pattern", + Payload: "event", + } + ps.On("Channel").Return(ch) + ps.On("Unsubscribe").Return(nil) + ps.On("Close").Return(nil) + count := 0 + receivedChannel := "" + db.SubscribeChannelDB(func(channel string, payload ...string) { + count++ + receivedChannel = channel + }, "{prefix},channel") + ch <- &msg + db.UnsubscribeChannelDB("{prefix},channel") + time.Sleep(1 * time.Second) + assert.Equal(t, 0, count) + assert.Equal(t, "", receivedChannel) + r.AssertExpectations(t) + ps.AssertExpectations(t) +} + func TestPTTLSuccessfully(t *testing.T) { _, r, db := setupHaEnv(true) expectedKey := "key" diff --git a/sdl_test.go b/sdl_test.go index 8dc9996..df83ff1 100644 --- a/sdl_test.go +++ b/sdl_test.go @@ -37,8 +37,8 @@ type mockDB struct { mock.Mock } -func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) { - m.Called(cb, channelPrefix, eventSeparator, channels) +func (m *mockDB) SubscribeChannelDB(cb func(string, ...string), channels ...string) { + m.Called(cb, channels) } func (m *mockDB) UnsubscribeChannelDB(channels ...string) { @@ -193,7 +193,7 @@ func TestSubscribeChannel(t *testing.T) { expectedCB := func(channel string, events ...string) {} expectedChannels := []string{"{namespace},channel1", "{namespace},channel2"} - m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), "{namespace},", "___", expectedChannels).Return() + m.On("SubscribeChannelDB", mock.AnythingOfType("func(string, ...string)"), expectedChannels).Return() i.SubscribeChannel(expectedCB, "channel1", "channel2") m.AssertExpectations(t) } diff --git a/syncstorage.go b/syncstorage.go index 850213a..4571dd9 100644 --- a/syncstorage.go +++ b/syncstorage.go @@ -27,6 +27,7 @@ import ( "encoding/base64" "errors" "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis" "hash/crc32" "io" "reflect" @@ -42,10 +43,9 @@ import ( //SdlInstance where namespace can be defined only at SdlInstance instance creation //time. type SyncStorage struct { - eventSeparator string - mutex sync.Mutex - tmp []byte - db *Database + mutex sync.Mutex + tmp []byte + db *Database } //NewSyncStorage creates a new sdl instance. @@ -56,8 +56,7 @@ func NewSyncStorage() *SyncStorage { func newSyncStorage(db *Database) *SyncStorage { return &SyncStorage{ - eventSeparator: "___", - db: db, + db: db, } } @@ -94,7 +93,7 @@ func getHash(s string) uint32 { //events received from different channels, callbacks are called in series one by one. func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error { nsPrefix := getNsPrefix(ns) - s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } @@ -123,8 +122,8 @@ func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []str } for i, v := range channelsAndEvents { if i%2 != 0 { - if strings.Contains(v, s.eventSeparator) { - return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator) + if strings.Contains(v, sdlgoredis.EventSeparator) { + return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator) } } } @@ -217,7 +216,7 @@ func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvent } _, exists := channelEventMap[v] if exists { - channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1] + channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1] } else { channelEventMap[v] = channelsAndEvents[i+1] } @@ -636,11 +635,11 @@ type SyncStorageLock struct { } func getNsPrefix(ns string) string { - return "{" + ns + "}," + return "{" + ns + "}" + sdlgoredis.NsSeparator } type iDatabase interface { - SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) + SubscribeChannelDB(cb func(string, ...string), channels ...string) UnsubscribeChannelDB(channels ...string) MSet(pairs ...interface{}) error MSetMPub(channelsAndEvents []string, pairs ...interface{}) error -- 2.16.6