"time"
)
+const EventSeparator = "___"
+const NsSeparator = ","
+
type ChannelNotificationCb func(channel string, payload ...string)
type RedisClientCreator func(addr, port, clusterName string, isHa bool) RedisClient
}
}
-func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) {
+func (db *DB) SubscribeChannelDB(cb func(string, ...string), channels ...string) {
if db.sCbMap.Count() == 0 {
for _, v := range channels {
db.sCbMap.Add(v, cb)
}
go func(sCbMap *sharedCbMap,
- channelPrefix,
- eventSeparator string,
ch intChannels,
channels ...string) {
sub := db.subscribe(db.ctx, db.client, channels...)
case msg := <-rxChannel:
cb, ok := lCbMap[msg.Channel]
if ok {
- cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...)
+ nSChNames := strings.SplitAfterN(msg.Channel, NsSeparator, 2)
+ cb(nSChNames[1], strings.Split(msg.Payload, EventSeparator)...)
}
case channel := <-ch.addChannel:
lCbMap = sCbMap.GetMapCopy()
}
}
}
- }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...)
+ }(db.sCbMap, db.ch, channels...)
} else {
for _, v := range channels {
ps, r, db := setupHaEnv(true)
ch := make(chan *redis.Message)
msg := redis.Message{
- Channel: "{prefix}channel",
+ Channel: "{prefix},channel",
Pattern: "pattern",
Payload: "event",
}
db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
- }, "{prefix}", "---", "{prefix}channel")
+ }, "{prefix},channel")
ch <- &msg
- db.UnsubscribeChannelDB("{prefix}channel")
+ db.UnsubscribeChannelDB("{prefix},channel")
time.Sleep(1 * time.Second)
assert.Equal(t, 1, count)
assert.Equal(t, "channel", receivedChannel)
ps, r, db := setupHaEnv(true)
ch := make(chan *redis.Message)
msg1 := redis.Message{
- Channel: "{prefix}channel1",
+ Channel: "{prefix},channel1",
Pattern: "pattern",
Payload: "event",
}
msg2 := redis.Message{
- Channel: "{prefix}channel2",
+ Channel: "{prefix},channel2",
Pattern: "pattern",
Payload: "event",
}
db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel1 = channel
- }, "{prefix}", "---", "{prefix}channel1")
+ }, "{prefix},channel1")
ch <- &msg1
receivedChannel2 := ""
db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel2 = channel
- }, "{prefix}", "---", "{prefix}channel2")
+ }, "{prefix},channel2")
time.Sleep(1 * time.Second)
- db.UnsubscribeChannelDB("{prefix}channel1")
+ db.UnsubscribeChannelDB("{prefix},channel1")
ch <- &msg2
- db.UnsubscribeChannelDB("{prefix}channel2")
+ db.UnsubscribeChannelDB("{prefix},channel2")
time.Sleep(1 * time.Second)
assert.Equal(t, 2, count)
assert.Equal(t, "channel1", receivedChannel1)
ps.AssertExpectations(t)
}
+func TestSubscribeChannelDBTwoSubscribesWithUnequalPrefixAndUnsubscribes(t *testing.T) {
+ ps, r, db := setupHaEnv(true)
+ ch := make(chan *redis.Message)
+ msg1 := redis.Message{
+ Channel: "{prefix1},channel",
+ Pattern: "pattern",
+ Payload: "event",
+ }
+ msg2 := redis.Message{
+ Channel: "{prefix2},channel",
+ Pattern: "pattern",
+ Payload: "event",
+ }
+ ps.On("Channel").Return(ch)
+ ps.On("Subscribe").Return(nil)
+ ps.On("Unsubscribe").Return(nil)
+ ps.On("Unsubscribe").Return(nil)
+ ps.On("Close").Return(nil)
+ count := 0
+ receivedChannel1 := ""
+ db.SubscribeChannelDB(func(channel string, payload ...string) {
+ count++
+ receivedChannel1 = channel
+ }, "{prefix1},channel")
+ ch <- &msg1
+ receivedChannel2 := ""
+ db.SubscribeChannelDB(func(channel string, payload ...string) {
+ count++
+ receivedChannel2 = channel
+ }, "{prefix2},channel")
+
+ time.Sleep(1 * time.Second)
+ db.UnsubscribeChannelDB("{prefix1},channel")
+ ch <- &msg2
+ db.UnsubscribeChannelDB("{prefix2},channel")
+ time.Sleep(1 * time.Second)
+ assert.Equal(t, 2, count)
+ assert.Equal(t, "channel", receivedChannel1)
+ assert.Equal(t, "channel", receivedChannel2)
+ r.AssertExpectations(t)
+ ps.AssertExpectations(t)
+}
+
func TestSubscribeChannelReDBSubscribeAfterUnsubscribe(t *testing.T) {
ps, r, db := setupHaEnv(true)
ch := make(chan *redis.Message)
msg := redis.Message{
- Channel: "{prefix}channel",
+ Channel: "{prefix},channel",
Pattern: "pattern",
Payload: "event",
}
db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
- }, "{prefix}", "---", "{prefix}channel")
+ }, "{prefix},channel")
ch <- &msg
- db.UnsubscribeChannelDB("{prefix}channel")
+ db.UnsubscribeChannelDB("{prefix},channel")
time.Sleep(1 * time.Second)
db.SubscribeChannelDB(func(channel string, payload ...string) {
count++
receivedChannel = channel
- }, "{prefix}", "---", "{prefix}channel")
+ }, "{prefix}", "---", "{prefix},channel")
ch <- &msg
- db.UnsubscribeChannelDB("{prefix}channel")
+ db.UnsubscribeChannelDB("{prefix},channel")
time.Sleep(1 * time.Second)
assert.Equal(t, 2, count)
ps.AssertExpectations(t)
}
+func TestSubscribeChannelDBSubscribeReceivedEventIgnoredIfChannelNameIsUnknown(t *testing.T) {
+ ps, r, db := setupHaEnv(true)
+ ch := make(chan *redis.Message)
+ msg := redis.Message{
+ Channel: "missingNsPrefixchannel",
+ Pattern: "pattern",
+ Payload: "event",
+ }
+ ps.On("Channel").Return(ch)
+ ps.On("Unsubscribe").Return(nil)
+ ps.On("Close").Return(nil)
+ count := 0
+ receivedChannel := ""
+ db.SubscribeChannelDB(func(channel string, payload ...string) {
+ count++
+ receivedChannel = channel
+ }, "{prefix},channel")
+ ch <- &msg
+ db.UnsubscribeChannelDB("{prefix},channel")
+ time.Sleep(1 * time.Second)
+ assert.Equal(t, 0, count)
+ assert.Equal(t, "", receivedChannel)
+ r.AssertExpectations(t)
+ ps.AssertExpectations(t)
+}
+
func TestPTTLSuccessfully(t *testing.T) {
_, r, db := setupHaEnv(true)
expectedKey := "key"
"encoding/base64"
"errors"
"fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
"hash/crc32"
"io"
"reflect"
//SdlInstance where namespace can be defined only at SdlInstance instance creation
//time.
type SyncStorage struct {
- eventSeparator string
- mutex sync.Mutex
- tmp []byte
- db *Database
+ mutex sync.Mutex
+ tmp []byte
+ db *Database
}
//NewSyncStorage creates a new sdl instance.
func newSyncStorage(db *Database) *SyncStorage {
return &SyncStorage{
- eventSeparator: "___",
- db: db,
+ db: db,
}
}
//events received from different channels, callbacks are called in series one by one.
func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
nsPrefix := getNsPrefix(ns)
- s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+ s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
return nil
}
}
for i, v := range channelsAndEvents {
if i%2 != 0 {
- if strings.Contains(v, s.eventSeparator) {
- return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+ if strings.Contains(v, sdlgoredis.EventSeparator) {
+ return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
}
}
}
}
_, exists := channelEventMap[v]
if exists {
- channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
+ channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
} else {
channelEventMap[v] = channelsAndEvents[i+1]
}
}
func getNsPrefix(ns string) string {
- return "{" + ns + "},"
+ return "{" + ns + "}" + sdlgoredis.NsSeparator
}
type iDatabase interface {
- SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
+ SubscribeChannelDB(cb func(string, ...string), channels ...string)
UnsubscribeChannelDB(channels ...string)
MSet(pairs ...interface{}) error
MSetMPub(channelsAndEvents []string, pairs ...interface{}) error