X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=syncstorage.go;h=850213aef36ca296fc66dcaa68b37f4826494d69;hb=977a55ca96d5dba1c7f9273671747eaf9cd6f894;hp=874a41ec05b074f27b4e359ee438765296588e66;hpb=725e2204f7256de05686d380b4e30a72e9dea8b4;p=ric-plt%2Fsdlgo.git diff --git a/syncstorage.go b/syncstorage.go index 874a41e..850213a 100644 --- a/syncstorage.go +++ b/syncstorage.go @@ -27,6 +27,7 @@ import ( "encoding/base64" "errors" "fmt" + "hash/crc32" "io" "reflect" "strings" @@ -44,7 +45,7 @@ type SyncStorage struct { eventSeparator string mutex sync.Mutex tmp []byte - iDatabase + db *Database } //NewSyncStorage creates a new sdl instance. @@ -56,10 +57,23 @@ func NewSyncStorage() *SyncStorage { func newSyncStorage(db *Database) *SyncStorage { return &SyncStorage{ eventSeparator: "___", - iDatabase: db.instance, + db: db, } } +//selectDbInstance Selects DB instance what provides DB services for the namespace +func (s *SyncStorage) getDbBackend(ns string) iDatabase { + instanceCount := uint32(len(s.db.instances)) + instanceID := getHash(ns) % instanceCount + return s.db.instances[instanceID] +} + +//getHash Returns hash value calculated from the string +func getHash(s string) uint32 { + tbl := crc32.MakeTable(crc32.IEEE) + return crc32.Checksum([]byte(s), tbl) +} + //SubscribeChannel lets you to subscribe for a events on a given channels. //SDL notifications are events that are published on a specific channels. //Both the channel and events are defined by the entity that is publishing @@ -80,7 +94,7 @@ func newSyncStorage(db *Database) *SyncStorage { //events received from different channels, callbacks are called in series one by one. func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error { nsPrefix := getNsPrefix(ns) - s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } @@ -88,13 +102,19 @@ func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), ch //namespace. func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error { nsPrefix := getNsPrefix(ns) - s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } //Close connection to backend database. func (s *SyncStorage) Close() error { - return s.CloseDB() + var ret error + for _, db := range s.db.instances { + if err := db.CloseDB(); err != nil { + ret = err + } + } + return ret } func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error { @@ -231,13 +251,13 @@ func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs return err } if len(channelsAndEvents) == 0 { - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.MSetMPub(channelsAndEventsPrepared, keyAndData...) + return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...) } //Set function writes data to shared data layer storage. Writing is done @@ -256,7 +276,7 @@ func (s *SyncStorage) Set(ns string, pairs ...interface{}) error { if err != nil { return err } - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } //Get function atomically reads one or more keys from SDL. The returned map has the @@ -274,7 +294,7 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - val, err := s.MGet(keysWithNs) + val, err := s.getDbBackend(ns).MGet(keysWithNs) if err != nil { return m, err } @@ -291,20 +311,20 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetIE(nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData) } if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) } //SetIf atomically replaces existing data with newData in SDL if data matches the oldData. //If replace was done successfully, true will be returned. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) { - return s.SetIE(getNsPrefix(ns)+key, oldData, newData) + return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData) } //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL, @@ -315,13 +335,13 @@ func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetNX(nsPrefix+key, data, 0) + return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0) } if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) } //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL, @@ -329,7 +349,7 @@ func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []st //is done atomically. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) { - return s.SetNX(getNsPrefix(ns)+key, data, 0) + return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0) } //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -349,13 +369,13 @@ func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, ke keysWithNs = append(keysWithNs, nsPrefix+v) } if len(channelsAndEvents) == 0 { - return s.Del(keysWithNs) + return s.getDbBackend(ns).Del(keysWithNs) } if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelMPub(channelsAndEventsPrepared, keysWithNs) + return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs) } //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -369,7 +389,7 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - err := s.Del(keysWithNs) + err := s.getDbBackend(ns).Del(keysWithNs) return err } @@ -380,20 +400,20 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.DelIE(nsPrefix+key, data) + return s.getDbBackend(ns).DelIE(nsPrefix+key, data) } if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) } //RemoveIf removes data from SDL conditionally. If existing data matches given data, //key and data are removed from SDL. If remove was done successfully, true is returned. //Data is removed under the namespace what is given as a parameter for this function. func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) { - status, err := s.DelIE(getNsPrefix(ns)+key, data) + status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data) if err != nil { return false, err } @@ -404,7 +424,7 @@ func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, e //given namespace exists, thus operation is not guaranteed to be atomic or isolated. func (s *SyncStorage) GetAll(ns string) ([]string, error) { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") var retVal []string if err != nil { return retVal, err @@ -415,15 +435,43 @@ func (s *SyncStorage) GetAll(ns string) ([]string, error) { return retVal, err } +// ListKeys returns all keys in the given namespace matching key search pattern. +// +// Supported search glob-style patterns: +// h?llo matches hello, hallo and hxllo +// h*llo matches hllo and heeeello +// h[ae]llo matches hello and hallo, but not hillo +// h[^e]llo matches hallo, hbllo, ... but not hello +// h[a-b]llo matches hallo and hbllo +// +// The \ escapes character in key search pattern and those will be treated as a normal +// character: +// h\[?llo\* matches h[ello* and h[allo* +// +// No prior knowledge about the keys in the given namespace exists, +// thus operation is not guaranteed to be atomic or isolated. +func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) { + nsPrefix := getNsPrefix(ns) + nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern) + var keys []string + if err != nil { + return keys, err + } + for _, key := range nsKeys { + keys = append(keys, strings.Split(key, nsPrefix)[1]) + } + return keys, err +} + //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus //it is not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAll(ns string) error { - keys, err := s.Keys(getNsPrefix(ns) + "*") + keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { - err = s.Del(keys) + err = s.getDbBackend(ns).Del(keys) } return err } @@ -433,19 +481,19 @@ func (s *SyncStorage) RemoveAll(ns string) error { //not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { if len(channelsAndEvents) == 0 { - return s.Del(keys) + return s.getDbBackend(ns).Del(keys) } if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - err = s.DelMPub(channelsAndEventsPrepared, keys) + err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys) } return err } @@ -456,22 +504,22 @@ func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) //unique. It is possible to add the same member several times without the //need to check if it already exists. func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error { - return s.SAdd(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...) } //RemoveMember removes members from a group under given namespace. func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error { - return s.SRem(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...) } //RemoveGroup removes the whole group along with it's members under given namespace. func (s *SyncStorage) RemoveGroup(ns string, group string) error { - return s.Del([]string{getNsPrefix(ns) + group}) + return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group}) } //GetMembers returns all the members from a group under given namespace. func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { - retVal, err := s.SMembers(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group) if err != nil { return []string{}, err } @@ -480,7 +528,7 @@ func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { //IsMember returns true if given member is found from a group under given namespace. func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) { - retVal, err := s.SIsMember(getNsPrefix(ns)+group, member) + retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member) if err != nil { return false, err } @@ -489,7 +537,7 @@ func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (boo //GroupSize returns the number of members in a group under given namespace. func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) { - retVal, err := s.SCard(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group) if err != nil { return 0, err } @@ -523,7 +571,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D var retryTimer *time.Timer for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ { - ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration) + ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration) if err != nil { return nil, err } else if ok { @@ -547,7 +595,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D //is already expired or some other instance is keeping the lock (lock taken after //expiration), an error is returned. func (l *SyncStorageLock) ReleaseResource(ns string) error { - ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value) + ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value) if err != nil { return err @@ -562,14 +610,14 @@ func (l *SyncStorageLock) ReleaseResource(ns string) error { //resource lock (if the lock still exists) under given namespace. The old //remaining expiration time is overwritten with the given new expiration time. func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error { - err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) + err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) return err } //CheckResource returns the expiration time left for a resource under given //namespace. If the resource doesn't exist, -2 is returned. func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) { - result, err := s.PTTL(getNsPrefix(ns) + resource) + result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource) if err != nil { return 0, err }