Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / syncstorage.go
index 874a41e..f2f708e 100644 (file)
@@ -27,6 +27,7 @@ import (
        "encoding/base64"
        "errors"
        "fmt"
+       "hash/crc32"
        "io"
        "reflect"
        "strings"
@@ -44,7 +45,7 @@ type SyncStorage struct {
        eventSeparator string
        mutex          sync.Mutex
        tmp            []byte
-       iDatabase
+       db             *Database
 }
 
 //NewSyncStorage creates a new sdl instance.
@@ -56,10 +57,23 @@ func NewSyncStorage() *SyncStorage {
 func newSyncStorage(db *Database) *SyncStorage {
        return &SyncStorage{
                eventSeparator: "___",
-               iDatabase:      db.instance,
+               db:             db,
        }
 }
 
+//selectDbInstance Selects DB instance what provides DB services for the namespace
+func (s *SyncStorage) getDbBackend(ns string) iDatabase {
+       instanceCount := uint32(len(s.db.instances))
+       instanceID := getHash(ns) % instanceCount
+       return s.db.instances[instanceID]
+}
+
+//getHash Returns hash value calculated from the string
+func getHash(s string) uint32 {
+       tbl := crc32.MakeTable(crc32.IEEE)
+       return crc32.Checksum([]byte(s), tbl)
+}
+
 //SubscribeChannel lets you to subscribe for a events on a given channels.
 //SDL notifications are events that are published on a specific channels.
 //Both the channel and events are defined by the entity that is publishing
@@ -80,7 +94,7 @@ func newSyncStorage(db *Database) *SyncStorage {
 //events received from different channels, callbacks are called in series one by one.
 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+       s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
        return nil
 }
 
@@ -88,13 +102,19 @@ func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), ch
 //namespace.
 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
        nsPrefix := getNsPrefix(ns)
-       s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
+       s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
        return nil
 }
 
 //Close connection to backend database.
 func (s *SyncStorage) Close() error {
-       return s.CloseDB()
+       var ret error
+       for _, db := range s.db.instances {
+               if err := db.CloseDB(); err != nil {
+                       ret = err
+               }
+       }
+       return ret
 }
 
 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
@@ -231,13 +251,13 @@ func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs
                return err
        }
        if len(channelsAndEvents) == 0 {
-               return s.MSet(keyAndData...)
+               return s.getDbBackend(ns).MSet(keyAndData...)
        }
        if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
+       return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
 }
 
 //Set function writes data to shared data layer storage. Writing is done
@@ -256,7 +276,7 @@ func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
        if err != nil {
                return err
        }
-       return s.MSet(keyAndData...)
+       return s.getDbBackend(ns).MSet(keyAndData...)
 }
 
 //Get function atomically reads one or more keys from SDL. The returned map has the
@@ -274,7 +294,7 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err
        for _, v := range keys {
                keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
        }
-       val, err := s.MGet(keysWithNs)
+       val, err := s.getDbBackend(ns).MGet(keysWithNs)
        if err != nil {
                return m, err
        }
@@ -291,20 +311,20 @@ func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, err
 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.SetIE(nsPrefix+key, oldData, newData)
+               return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
        }
        if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
+       return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
 }
 
 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned.
 //Data is written under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
-       return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
+       return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
 }
 
 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
@@ -315,13 +335,13 @@ func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{})
 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.SetNX(nsPrefix+key, data, 0)
+               return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
        }
        if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
+       return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
 }
 
 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
@@ -329,7 +349,7 @@ func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []st
 //is done atomically.
 //Data is written under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
-       return s.SetNX(getNsPrefix(ns)+key, data, 0)
+       return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
 }
 
 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -349,13 +369,13 @@ func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, ke
                keysWithNs = append(keysWithNs, nsPrefix+v)
        }
        if len(channelsAndEvents) == 0 {
-               return s.Del(keysWithNs)
+               return s.getDbBackend(ns).Del(keysWithNs)
        }
        if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
+       return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
 }
 
 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -369,7 +389,7 @@ func (s *SyncStorage) Remove(ns string, keys []string) error {
        for _, v := range keys {
                keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
        }
-       err := s.Del(keysWithNs)
+       err := s.getDbBackend(ns).Del(keysWithNs)
        return err
 }
 
@@ -380,20 +400,20 @@ func (s *SyncStorage) Remove(ns string, keys []string) error {
 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
        nsPrefix := getNsPrefix(ns)
        if len(channelsAndEvents) == 0 {
-               return s.DelIE(nsPrefix+key, data)
+               return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
        }
        if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-       return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
+       return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
 }
 
 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 //Data is removed under the namespace what is given as a parameter for this function.
 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
-       status, err := s.DelIE(getNsPrefix(ns)+key, data)
+       status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
        if err != nil {
                return false, err
        }
@@ -404,7 +424,7 @@ func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, e
 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
        nsPrefix := getNsPrefix(ns)
-       keys, err := s.Keys(nsPrefix + "*")
+       keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
        var retVal []string
        if err != nil {
                return retVal, err
@@ -418,12 +438,12 @@ func (s *SyncStorage) GetAll(ns string) ([]string, error) {
 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
 //it is not guaranteed that all keys are removed.
 func (s *SyncStorage) RemoveAll(ns string) error {
-       keys, err := s.Keys(getNsPrefix(ns) + "*")
+       keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
        if err != nil {
                return err
        }
        if (keys != nil) && (len(keys) != 0) {
-               err = s.Del(keys)
+               err = s.getDbBackend(ns).Del(keys)
        }
        return err
 }
@@ -433,19 +453,19 @@ func (s *SyncStorage) RemoveAll(ns string) error {
 //not guaranteed that all keys are removed.
 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
        nsPrefix := getNsPrefix(ns)
-       keys, err := s.Keys(nsPrefix + "*")
+       keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
        if err != nil {
                return err
        }
        if (keys != nil) && (len(keys) != 0) {
                if len(channelsAndEvents) == 0 {
-                       return s.Del(keys)
+                       return s.getDbBackend(ns).Del(keys)
                }
                if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
                        return err
                }
                channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
-               err = s.DelMPub(channelsAndEventsPrepared, keys)
+               err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
        }
        return err
 }
@@ -456,22 +476,22 @@ func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string)
 //unique. It is possible to add the same member several times without the
 //need to check if it already exists.
 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
-       return s.SAdd(getNsPrefix(ns)+group, member...)
+       return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
 }
 
 //RemoveMember removes members from a group under given namespace.
 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
-       return s.SRem(getNsPrefix(ns)+group, member...)
+       return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
 }
 
 //RemoveGroup removes the whole group along with it's members under given namespace.
 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
-       return s.Del([]string{getNsPrefix(ns) + group})
+       return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
 }
 
 //GetMembers returns all the members from a group under given namespace.
 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
-       retVal, err := s.SMembers(getNsPrefix(ns) + group)
+       retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
        if err != nil {
                return []string{}, err
        }
@@ -480,7 +500,7 @@ func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
 
 //IsMember returns true if given member is found from a group under given namespace.
 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
-       retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
+       retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
        if err != nil {
                return false, err
        }
@@ -489,7 +509,7 @@ func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (boo
 
 //GroupSize returns the number of members in a group under given namespace.
 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
-       retVal, err := s.SCard(getNsPrefix(ns) + group)
+       retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
        if err != nil {
                return 0, err
        }
@@ -523,7 +543,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D
 
        var retryTimer *time.Timer
        for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
-               ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
+               ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
                if err != nil {
                        return nil, err
                } else if ok {
@@ -547,7 +567,7 @@ func (s *SyncStorage) LockResource(ns string, resource string, expiration time.D
 //is already expired or some other instance is keeping the lock (lock taken after
 //expiration), an error is returned.
 func (l *SyncStorageLock) ReleaseResource(ns string) error {
-       ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
+       ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
 
        if err != nil {
                return err
@@ -562,14 +582,14 @@ func (l *SyncStorageLock) ReleaseResource(ns string) error {
 //resource lock (if the lock still exists) under given namespace. The old
 //remaining expiration time is overwritten with the given new expiration time.
 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
-       err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
+       err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
        return err
 }
 
 //CheckResource returns the expiration time left for a resource under given
 //namespace. If the resource doesn't exist, -2 is returned.
 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
-       result, err := s.PTTL(getNsPrefix(ns) + resource)
+       result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
        if err != nil {
                return 0, err
        }