X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=internal%2Fsdlgoredis%2Fsdlgoredis.go;h=327946e214c78190686d5d444917b3da39d939be;hb=f3f2907dc4862ed4f86288e915e8994bdf9889d0;hp=8e2a2ef2e12e265dfce2b6f17a3adab5cfacc9b1;hpb=f759492b4f02f1e9d66115a6b83deec519cb5df4;p=ric-plt%2Fsdlgo.git diff --git a/internal/sdlgoredis/sdlgoredis.go b/internal/sdlgoredis/sdlgoredis.go index 8e2a2ef..327946e 100644 --- a/internal/sdlgoredis/sdlgoredis.go +++ b/internal/sdlgoredis/sdlgoredis.go @@ -15,17 +15,22 @@ limitations under the License. */ +/* + * This source code is part of the near-RT RIC (RAN Intelligent Controller) + * platform project (RICP). + */ + package sdlgoredis import ( "errors" "fmt" + "github.com/go-redis/redis" "os" "strconv" "strings" + "sync" "time" - - "github.com/go-redis/redis" ) type ChannelNotificationCb func(channel string, payload ...string) @@ -36,11 +41,16 @@ type intChannels struct { exit chan bool } +type sharedCbMap struct { + m sync.Mutex + cbMap map[string]ChannelNotificationCb +} + type DB struct { client RedisClient subscribe SubscribeFn redisModules bool - cbMap map[string]ChannelNotificationCb + sCbMap *sharedCbMap ch intChannels } @@ -92,8 +102,14 @@ func checkIntResultAndError(result interface{}, err error) (bool, error) { if err != nil { return false, err } - if result.(int64) == int64(1) { - return true, nil + if n, ok := result.(int64); ok { + if n == 1 { + return true, nil + } + } else if n, ok := result.(int); ok { + if n == 1 { + return true, nil + } } return false, nil } @@ -107,7 +123,7 @@ func CreateDB(client RedisClient, subscribe SubscribeFn) *DB { client: client, subscribe: subscribe, redisModules: true, - cbMap: make(map[string]ChannelNotificationCb, 0), + sCbMap: &sharedCbMap{cbMap: make(map[string]ChannelNotificationCb, 0)}, ch: intChannels{ addChannel: make(chan string), removeChannel: make(chan string), @@ -175,37 +191,40 @@ func (db *DB) CloseDB() error { func (db *DB) UnsubscribeChannelDB(channels ...string) { for _, v := range channels { + db.sCbMap.Remove(v) db.ch.removeChannel <- v - delete(db.cbMap, v) - if len(db.cbMap) == 0 { + if db.sCbMap.Count() == 0 { db.ch.exit <- true } } } func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) { - if len(db.cbMap) == 0 { + if db.sCbMap.Count() == 0 { for _, v := range channels { - db.cbMap[v] = cb + db.sCbMap.Add(v, cb) } - go func(cbMap *map[string]ChannelNotificationCb, + go func(sCbMap *sharedCbMap, channelPrefix, eventSeparator string, ch intChannels, channels ...string) { sub := db.subscribe(db.client, channels...) rxChannel := sub.Channel() + lCbMap := sCbMap.GetMapCopy() for { select { case msg := <-rxChannel: - cb, ok := (*cbMap)[msg.Channel] + cb, ok := lCbMap[msg.Channel] if ok { cb(strings.TrimPrefix(msg.Channel, channelPrefix), strings.Split(msg.Payload, eventSeparator)...) } case channel := <-ch.addChannel: + lCbMap = sCbMap.GetMapCopy() sub.Subscribe(channel) case channel := <-ch.removeChannel: + lCbMap = sCbMap.GetMapCopy() sub.Unsubscribe(channel) case exit := <-ch.exit: if exit { @@ -216,11 +235,11 @@ func (db *DB) SubscribeChannelDB(cb func(string, ...string), channelPrefix, even } } } - }(&db.cbMap, channelPrefix, eventSeparator, db.ch, channels...) + }(db.sCbMap, channelPrefix, eventSeparator, db.ch, channels...) } else { for _, v := range channels { - db.cbMap[v] = cb + db.sCbMap.Add(v, cb) db.ch.addChannel <- v } } @@ -288,28 +307,53 @@ func (db *DB) SetIE(key string, oldData, newData interface{}) (bool, error) { return checkResultAndError(db.client.Do("SETIE", key, newData, oldData).Result()) } -func (db *DB) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) { +func (db *DB) SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command SETIEPUB") + return false, errors.New("Redis deployment not supporting command SETIEMPUB") + } + capacity := 4 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "SETIEMPUB") + command = append(command, key) + command = append(command, newData) + command = append(command, oldData) + for _, ce := range channelsAndEvents { + command = append(command, ce) } - return checkResultAndError(db.client.Do("SETIEPUB", key, newData, oldData, channel, message).Result()) + return checkResultAndError(db.client.Do(command...).Result()) } -func (db *DB) SetNXPub(channel, message, key string, data interface{}) (bool, error) { +func (db *DB) SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command SETNXPUB") + return false, errors.New("Redis deployment not supporting command SETNXMPUB") + } + capacity := 3 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "SETNXMPUB") + command = append(command, key) + command = append(command, data) + for _, ce := range channelsAndEvents { + command = append(command, ce) } - return checkResultAndError(db.client.Do("SETNXPUB", key, data, channel, message).Result()) + return checkResultAndError(db.client.Do(command...).Result()) } func (db *DB) SetNX(key string, data interface{}, expiration time.Duration) (bool, error) { return db.client.SetNX(key, data, expiration).Result() } -func (db *DB) DelIEPub(channel, message, key string, data interface{}) (bool, error) { +func (db *DB) DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error) { if !db.redisModules { - return false, errors.New("Redis deployment not supporting command") + return false, errors.New("Redis deployment not supporting command DELIEMPUB") + } + capacity := 3 + len(channelsAndEvents) + command := make([]interface{}, 0, capacity) + command = append(command, "DELIEMPUB") + command = append(command, key) + command = append(command, data) + for _, ce := range channelsAndEvents { + command = append(command, ce) } - return checkIntResultAndError(db.client.Do("DELIEPUB", key, data, channel, message).Result()) + return checkIntResultAndError(db.client.Do(command...).Result()) } func (db *DB) DelIE(key string, data interface{}) (bool, error) { @@ -362,3 +406,31 @@ func (db *DB) PExpireIE(key string, data interface{}, expiration time.Duration) } return errors.New("Lock not held") } + +func (sCbMap *sharedCbMap) Add(channel string, cb ChannelNotificationCb) { + sCbMap.m.Lock() + defer sCbMap.m.Unlock() + sCbMap.cbMap[channel] = cb +} + +func (sCbMap *sharedCbMap) Remove(channel string) { + sCbMap.m.Lock() + defer sCbMap.m.Unlock() + delete(sCbMap.cbMap, channel) +} + +func (sCbMap *sharedCbMap) Count() int { + sCbMap.m.Lock() + defer sCbMap.m.Unlock() + return len(sCbMap.cbMap) +} + +func (sCbMap *sharedCbMap) GetMapCopy() map[string]ChannelNotificationCb { + sCbMap.m.Lock() + defer sCbMap.m.Unlock() + mapCopy := make(map[string]ChannelNotificationCb, 0) + for i, v := range sCbMap.cbMap { + mapCopy[i] = v + } + return mapCopy +}