X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=syncstorage.go;h=f2f708e42387b87cc87d4c8610eb0978198bd78b;hb=refs%2Fchanges%2F62%2F6062%2F1;hp=874a41ec05b074f27b4e359ee438765296588e66;hpb=725e2204f7256de05686d380b4e30a72e9dea8b4;p=ric-plt%2Fsdlgo.git diff --git a/syncstorage.go b/syncstorage.go index 874a41e..f2f708e 100644 --- a/syncstorage.go +++ b/syncstorage.go @@ -27,6 +27,7 @@ import ( "encoding/base64" "errors" "fmt" + "hash/crc32" "io" "reflect" "strings" @@ -44,7 +45,7 @@ type SyncStorage struct { eventSeparator string mutex sync.Mutex tmp []byte - iDatabase + db *Database } //NewSyncStorage creates a new sdl instance. @@ -56,10 +57,23 @@ func NewSyncStorage() *SyncStorage { func newSyncStorage(db *Database) *SyncStorage { return &SyncStorage{ eventSeparator: "___", - iDatabase: db.instance, + db: db, } } +//selectDbInstance Selects DB instance what provides DB services for the namespace +func (s *SyncStorage) getDbBackend(ns string) iDatabase { + instanceCount := uint32(len(s.db.instances)) + instanceID := getHash(ns) % instanceCount + return s.db.instances[instanceID] +} + +//getHash Returns hash value calculated from the string +func getHash(s string) uint32 { + tbl := crc32.MakeTable(crc32.IEEE) + return crc32.Checksum([]byte(s), tbl) +} + //SubscribeChannel lets you to subscribe for a events on a given channels. //SDL notifications are events that are published on a specific channels. //Both the channel and events are defined by the entity that is publishing @@ -80,7 +94,7 @@ func newSyncStorage(db *Database) *SyncStorage { //events received from different channels, callbacks are called in series one by one. func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error { nsPrefix := getNsPrefix(ns) - s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } @@ -88,13 +102,19 @@ func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), ch //namespace. func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error { nsPrefix := getNsPrefix(ns) - s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) + s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...) return nil } //Close connection to backend database. func (s *SyncStorage) Close() error { - return s.CloseDB() + var ret error + for _, db := range s.db.instances { + if err := db.CloseDB(); err != nil { + ret = err + } + } + return ret } func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error { @@ -231,13 +251,13 @@ func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs return err } if len(channelsAndEvents) == 0 { - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.MSetMPub(channelsAndEventsPrepared, keyAndData...) + return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...) } //Set function writes data to shared data layer storage. Writing is done @@ -256,7 +276,7 @@ func (s *SyncStorage) Set(ns string, pairs ...interface{}) error { if err != nil { return err } - return s.MSet(keyAndData...) + return s.getDbBackend(ns).MSet(keyAndData...) } //Get function atomically reads one or more keys from SDL. The returned map has the @@ -274,7 +294,7 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - val, err := s.MGet(keysWithNs) + val, err := s.getDbBackend(ns).MGet(keysWithNs) if err != nil { return m, err } @@ -291,20 +311,20 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetIE(nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData) } if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) + return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData) } //SetIf atomically replaces existing data with newData in SDL if data matches the oldData. //If replace was done successfully, true will be returned. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) { - return s.SetIE(getNsPrefix(ns)+key, oldData, newData) + return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData) } //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL, @@ -315,13 +335,13 @@ func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.SetNX(nsPrefix+key, data, 0) + return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0) } if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data) } //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL, @@ -329,7 +349,7 @@ func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []st //is done atomically. //Data is written under the namespace what is given as a parameter for this function. func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) { - return s.SetNX(getNsPrefix(ns)+key, data, 0) + return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0) } //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -349,13 +369,13 @@ func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, ke keysWithNs = append(keysWithNs, nsPrefix+v) } if len(channelsAndEvents) == 0 { - return s.Del(keysWithNs) + return s.getDbBackend(ns).Del(keysWithNs) } if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelMPub(channelsAndEventsPrepared, keysWithNs) + return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs) } //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -369,7 +389,7 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { for _, v := range keys { keysWithNs = append(keysWithNs, getNsPrefix(ns)+v) } - err := s.Del(keysWithNs) + err := s.getDbBackend(ns).Del(keysWithNs) return err } @@ -380,20 +400,20 @@ func (s *SyncStorage) Remove(ns string, keys []string) error { func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) { nsPrefix := getNsPrefix(ns) if len(channelsAndEvents) == 0 { - return s.DelIE(nsPrefix+key, data) + return s.getDbBackend(ns).DelIE(nsPrefix+key, data) } if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { return false, err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) + return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data) } //RemoveIf removes data from SDL conditionally. If existing data matches given data, //key and data are removed from SDL. If remove was done successfully, true is returned. //Data is removed under the namespace what is given as a parameter for this function. func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) { - status, err := s.DelIE(getNsPrefix(ns)+key, data) + status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data) if err != nil { return false, err } @@ -404,7 +424,7 @@ func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, e //given namespace exists, thus operation is not guaranteed to be atomic or isolated. func (s *SyncStorage) GetAll(ns string) ([]string, error) { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") var retVal []string if err != nil { return retVal, err @@ -418,12 +438,12 @@ func (s *SyncStorage) GetAll(ns string) ([]string, error) { //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus //it is not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAll(ns string) error { - keys, err := s.Keys(getNsPrefix(ns) + "*") + keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { - err = s.Del(keys) + err = s.getDbBackend(ns).Del(keys) } return err } @@ -433,19 +453,19 @@ func (s *SyncStorage) RemoveAll(ns string) error { //not guaranteed that all keys are removed. func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error { nsPrefix := getNsPrefix(ns) - keys, err := s.Keys(nsPrefix + "*") + keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*") if err != nil { return err } if (keys != nil) && (len(keys) != 0) { if len(channelsAndEvents) == 0 { - return s.Del(keys) + return s.getDbBackend(ns).Del(keys) } if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents) - err = s.DelMPub(channelsAndEventsPrepared, keys) + err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys) } return err } @@ -456,22 +476,22 @@ func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) //unique. It is possible to add the same member several times without the //need to check if it already exists. func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error { - return s.SAdd(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...) } //RemoveMember removes members from a group under given namespace. func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error { - return s.SRem(getNsPrefix(ns)+group, member...) + return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...) } //RemoveGroup removes the whole group along with it's members under given namespace. func (s *SyncStorage) RemoveGroup(ns string, group string) error { - return s.Del([]string{getNsPrefix(ns) + group}) + return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group}) } //GetMembers returns all the members from a group under given namespace. func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { - retVal, err := s.SMembers(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group) if err != nil { return []string{}, err } @@ -480,7 +500,7 @@ func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) { //IsMember returns true if given member is found from a group under given namespace. func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) { - retVal, err := s.SIsMember(getNsPrefix(ns)+group, member) + retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member) if err != nil { return false, err } @@ -489,7 +509,7 @@ func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (boo //GroupSize returns the number of members in a group under given namespace. func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) { - retVal, err := s.SCard(getNsPrefix(ns) + group) + retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group) if err != nil { return 0, err } @@ -523,7 +543,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D var retryTimer *time.Timer for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ { - ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration) + ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration) if err != nil { return nil, err } else if ok { @@ -547,7 +567,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D //is already expired or some other instance is keeping the lock (lock taken after //expiration), an error is returned. func (l *SyncStorageLock) ReleaseResource(ns string) error { - ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value) + ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value) if err != nil { return err @@ -562,14 +582,14 @@ func (l *SyncStorageLock) ReleaseResource(ns string) error { //resource lock (if the lock still exists) under given namespace. The old //remaining expiration time is overwritten with the given new expiration time. func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error { - err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) + err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration) return err } //CheckResource returns the expiration time left for a resource under given //namespace. If the resource doesn't exist, -2 is returned. func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) { - result, err := s.PTTL(getNsPrefix(ns) + resource) + result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource) if err != nil { return 0, err }