X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=syncstorage.go;h=4571dd9eb6435f7eee7e937d2ab45768990c27d8;hb=6ffe956f918e3fac2e939847268b2ae94684a738;hp=874a41ec05b074f27b4e359ee438765296588e66;hpb=725e2204f7256de05686d380b4e30a72e9dea8b4;p=ric-plt%2Fsdlgo.git diff --git a/syncstorage.go b/syncstorage.go index 874a41e..4571dd9 100644 --- a/syncstorage.go +++ b/syncstorage.go @@ -27,6 +27,8 @@ import ( "encoding/base64" "errors" "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis" + "hash/crc32" "io" "reflect" "strings" @@ -41,10 +43,9 @@ import ( //SdlInstance where namespace can be defined only at SdlInstance instance creation //time. type SyncStorage struct { - eventSeparator string - mutex sync.Mutex - tmp []byte - iDatabase + mutex sync.Mutex + tmp []byte + db *Database } //NewSyncStorage creates a new sdl instance. @@ -55,11 +56,23 @@ func NewSyncStorage() *SyncStorage { func newSyncStorage(db *Database) *SyncStorage { return &SyncStorage{ - eventSeparator: "___", - iDatabase: db.instance, + db: db, } } +//selectDbInstance Selects DB instance what provides DB services for the namespace +func (s *SyncStorage) getDbBackend(ns string) iDatabase { + instanceCount := uint32(len(s.db.instances)) + instanceID := getHash(ns) % instanceCount + return s.db.instances[instanceID] +} + +//getHash Returns hash value calculated from the string +func getHash(s string) uint32 { + tbl := crc32.MakeTable(crc32.IEEE) + return crc32.Checksum([]byte(s), tbl) +} + //SubscribeChannel lets you to subscribe for a events on a given channels. //SDL notifications are events that are published on a specific channels. //Both the channel and events are defined by the entity that is publishing @@ -80,7 +93,7 @@ func newSyncStorage(db *Database) *SyncStorage { //events received from different channels, callbacks are called in series one by one. func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error { nsPrefix := getNsPrefix(ns) - s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } @@ -88,13 +101,19 @@ func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), ch //namespace. func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error { nsPrefix := getNsPrefix(ns) - s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } //Close connection to backend database. func (s *SyncStorage) Close() error { - return s.CloseDB() + var ret error + for _, db := range s.db.instances { + if err := db.CloseDB(); err != nil { + ret = err + } + } + return ret } func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error { @@ -103,8 +122,8 @@ func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []str } for i, v := range channelsAndEvents { if i%2 != 0 { - if strings.Contains(v, s.eventSeparator) { - return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator) + if strings.Contains(v, sdlgoredis.EventSeparator) { + return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator) } } } @@ -197,7 +216,7 @@ func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvent } _, exists := channelEventMap[v] if exists { - channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1] + channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1] } else { channelEventMap[v] = channelsAndEvents[i+1] } @@ -231,13 +250,13 @@ func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs return err } if len(channelsAndEvents) == 0 { - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.MSetMPub(channelsAndEventsPrepared, keyAndData...) + return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...) } //Set function writes data to shared data layer storage. Writing is done @@ -256,7 +275,7 @@ func (s *SyncStorage) Set(ns string, pairs ...interface{}) error { if err != nil { return err } - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } //Get function atomically reads one or more keys from SDL. The returned map has the @@ -274,7 +293,7 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - val, err := s.MGet(keysWithNs) + val, err := s.getDbBackend(ns).MGet(keysWithNs) if err != nil { return m, err } @@ -291,20 +310,20 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetIE(nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData) } if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) } //SetIf atomically replaces existing data with newData in SDL if data matches the oldData. //If replace was done successfully, true will be returned. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) { - return s.SetIE(getNsPrefix(ns)+key, oldData, newData) + return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData) } //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL, @@ -315,13 +334,13 @@ func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetNX(nsPrefix+key, data, 0) + return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0) } if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) } //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL, @@ -329,7 +348,7 @@ func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []st //is done atomically. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) { - return s.SetNX(getNsPrefix(ns)+key, data, 0) + return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0) } //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -349,13 +368,13 @@ func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, ke keysWithNs = append(keysWithNs, nsPrefix+v) } if len(channelsAndEvents) == 0 { - return s.Del(keysWithNs) + return s.getDbBackend(ns).Del(keysWithNs) } if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelMPub(channelsAndEventsPrepared, keysWithNs) + return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs) } //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -369,7 +388,7 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - err := s.Del(keysWithNs) + err := s.getDbBackend(ns).Del(keysWithNs) return err } @@ -380,20 +399,20 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.DelIE(nsPrefix+key, data) + return s.getDbBackend(ns).DelIE(nsPrefix+key, data) } if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) } //RemoveIf removes data from SDL conditionally. If existing data matches given data, //key and data are removed from SDL. If remove was done successfully, true is returned. //Data is removed under the namespace what is given as a parameter for this function. func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) { - status, err := s.DelIE(getNsPrefix(ns)+key, data) + status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data) if err != nil { return false, err } @@ -404,7 +423,7 @@ func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, e //given namespace exists, thus operation is not guaranteed to be atomic or isolated. func (s *SyncStorage) GetAll(ns string) ([]string, error) { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") var retVal []string if err != nil { return retVal, err @@ -415,15 +434,43 @@ func (s *SyncStorage) GetAll(ns string) ([]string, error) { return retVal, err } +// ListKeys returns all keys in the given namespace matching key search pattern. +// +// Supported search glob-style patterns: +// h?llo matches hello, hallo and hxllo +// h*llo matches hllo and heeeello +// h[ae]llo matches hello and hallo, but not hillo +// h[^e]llo matches hallo, hbllo, ... but not hello +// h[a-b]llo matches hallo and hbllo +// +// The \ escapes character in key search pattern and those will be treated as a normal +// character: +// h\[?llo\* matches h[ello* and h[allo* +// +// No prior knowledge about the keys in the given namespace exists, +// thus operation is not guaranteed to be atomic or isolated. +func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) { + nsPrefix := getNsPrefix(ns) + nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern) + var keys []string + if err != nil { + return keys, err + } + for _, key := range nsKeys { + keys = append(keys, strings.Split(key, nsPrefix)[1]) + } + return keys, err +} + //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus //it is not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAll(ns string) error { - keys, err := s.Keys(getNsPrefix(ns) + "*") + keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { - err = s.Del(keys) + err = s.getDbBackend(ns).Del(keys) } return err } @@ -433,19 +480,19 @@ func (s *SyncStorage) RemoveAll(ns string) error { //not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { if len(channelsAndEvents) == 0 { - return s.Del(keys) + return s.getDbBackend(ns).Del(keys) } if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - err = s.DelMPub(channelsAndEventsPrepared, keys) + err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys) } return err } @@ -456,22 +503,22 @@ func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) //unique. It is possible to add the same member several times without the //need to check if it already exists. func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error { - return s.SAdd(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...) } //RemoveMember removes members from a group under given namespace. func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error { - return s.SRem(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...) } //RemoveGroup removes the whole group along with it's members under given namespace. func (s *SyncStorage) RemoveGroup(ns string, group string) error { - return s.Del([]string{getNsPrefix(ns) + group}) + return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group}) } //GetMembers returns all the members from a group under given namespace. func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { - retVal, err := s.SMembers(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group) if err != nil { return []string{}, err } @@ -480,7 +527,7 @@ func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { //IsMember returns true if given member is found from a group under given namespace. func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) { - retVal, err := s.SIsMember(getNsPrefix(ns)+group, member) + retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member) if err != nil { return false, err } @@ -489,7 +536,7 @@ func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (boo //GroupSize returns the number of members in a group under given namespace. func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) { - retVal, err := s.SCard(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group) if err != nil { return 0, err } @@ -523,7 +570,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D var retryTimer *time.Timer for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ { - ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration) + ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration) if err != nil { return nil, err } else if ok { @@ -547,7 +594,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D //is already expired or some other instance is keeping the lock (lock taken after //expiration), an error is returned. func (l *SyncStorageLock) ReleaseResource(ns string) error { - ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value) + ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value) if err != nil { return err @@ -562,14 +609,14 @@ func (l *SyncStorageLock) ReleaseResource(ns string) error { //resource lock (if the lock still exists) under given namespace. The old //remaining expiration time is overwritten with the given new expiration time. func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error { - err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) + err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) return err } //CheckResource returns the expiration time left for a resource under given //namespace. If the resource doesn't exist, -2 is returned. func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) { - result, err := s.PTTL(getNsPrefix(ns) + resource) + result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource) if err != nil { return 0, err } @@ -588,11 +635,11 @@ type SyncStorageLock struct { } func getNsPrefix(ns string) string { - return "{" + ns + "}," + return "{" + ns + "}" + sdlgoredis.NsSeparator } type iDatabase interface { - SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) + SubscribeChannelDB(cb func(string, ...string), channels ...string) UnsubscribeChannelDB(channels ...string) MSet(pairs ...interface{}) error MSetMPub(channelsAndEvents []string, pairs ...interface{}) error