X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sdl.go;h=312654226d66a0c122874a2cf50fc6998134b488;hb=refs%2Fchanges%2F16%2F1316%2F1;hp=78dd15d482bfd9bbe9241f95213e349cdb9f287d;hpb=ae7460ab662366115f6decc834a109bfa8985cc6;p=ric-plt%2Fsdlgo.git diff --git a/sdl.go b/sdl.go index 78dd15d..3126542 100644 --- a/sdl.go +++ b/sdl.go @@ -18,10 +18,15 @@ package sdlgo import ( + "crypto/rand" + "encoding/base64" "errors" "fmt" + "io" "reflect" "strings" + "sync" + "time" "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis" ) @@ -32,24 +37,36 @@ type SdlInstance struct { nameSpace string nsPrefix string eventSeparator string + mutex sync.Mutex + tmp []byte iDatabase } +//Database struct is a holder for the internal database instance. Applications +//can use this exported data type to locally store a reference to database +//instance returned from NewDabase() function. +type Database struct { + instance iDatabase +} + //NewDatabase creates a connection to database that will be used -//as a backend for the key-value storage. The returned value shall -//be given as a parameter when calling NewKeyValStorage -func NewDatabase() *sdlgoredis.DB { - return sdlgoredis.Create() +//as a backend for the key-value storage. The returned value +//can be reused between multiple SDL instances in which case each instance +//is using the same connection. +func NewDatabase() *Database { + return &Database{ + instance: sdlgoredis.Create(), + } } //NewSdlInstance creates a new sdl instance using the given namespace. //The database used as a backend is given as a parameter -func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance { +func NewSdlInstance(NameSpace string, db *Database) *SdlInstance { return &SdlInstance{ nameSpace: NameSpace, nsPrefix: "{" + NameSpace + "},", eventSeparator: "___", - iDatabase: db, + iDatabase: db.instance, } } @@ -71,6 +88,8 @@ func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance { //callback as quickly as possible. E.g. reading in callback context should be avoided //and using of Go signals is recommended. Also it should be noted that in case of several //events received from different channels, callbacks are called in series one by one. +// +//This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines. func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error { s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...) return nil @@ -294,7 +313,7 @@ func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, err //given channel. func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) { if len(channelsAndEvents) == 0 { - return s.SetNX(s.nsPrefix+key, data) + return s.SetNX(s.nsPrefix+key, data, 0) } if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err @@ -307,7 +326,7 @@ func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key s //then it's value is not changed. Checking the key existence and potential set operation //is done atomically. func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) { - return s.SetNX(s.nsPrefix+key, data) + return s.SetNX(s.nsPrefix+key, data, 0) } //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -411,7 +430,7 @@ func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error { if len(channelsAndEvents) == 0 { return s.Del(keys) } - if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { + if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) @@ -466,8 +485,124 @@ func (s *SdlInstance) GroupSize(group string) (int64, error) { return retVal, err } +func (s *SdlInstance) randomToken() (string, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if len(s.tmp) == 0 { + s.tmp = make([]byte, 16) + } + + if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil { + return "", err + } + + return base64.RawURLEncoding.EncodeToString(s.tmp), nil +} + +//LockResource function is used for locking a resource. The resource lock in +//practice is a key with random value that is set to expire after a time +//period. The value written to key is a random value, thus only the instance +//created a lock, can release it. Resource locks are per namespace. +func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) { + value, err := s.randomToken() + if err != nil { + return nil, err + } + + var retryTimer *time.Timer + for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ { + ok, err := s.SetNX(s.nsPrefix+resource, value, expiration) + if err != nil { + return nil, err + } else if ok { + return &Lock{s: s, key: resource, value: value}, nil + } + if retryTimer == nil { + retryTimer = time.NewTimer(opt.getRetryWait()) + defer retryTimer.Stop() + } else { + retryTimer.Reset(opt.getRetryWait()) + } + + select { + case <-retryTimer.C: + } + } + return nil, errors.New("Lock not obtained") +} + +//ReleaseResource removes the lock from a resource. If lock is already +//expired or some other instance is keeping the lock (lock taken after expiration), +//an error is returned. +func (l *Lock) ReleaseResource() error { + ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value) + + if err != nil { + return err + } + if !ok { + return errors.New("Lock not held") + } + return nil +} + +//RefreshResource function can be used to set a new expiration time for the +//resource lock (if the lock still exists). The old remaining expiration +//time is overwritten with the given new expiration time. +func (l *Lock) RefreshResource(expiration time.Duration) error { + err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration) + return err +} + +//CheckResource returns the expiration time left for a resource. +//If the resource doesn't exist, -2 is returned. +func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) { + result, err := s.PTTL(s.nsPrefix + resource) + if err != nil { + return 0, err + } + if result == time.Duration(-1) { + return 0, errors.New("invalid resource given, no expiration time attached") + } + return result, nil +} + +//Options struct defines the behaviour for getting the resource lock. +type Options struct { + //The number of time the lock will be tried. + //Default: 0 = no retry + RetryCount int + + //Wait between the retries. + //Default: 100ms + RetryWait time.Duration +} + +func (o *Options) getRetryCount() int { + if o != nil && o.RetryCount > 0 { + return o.RetryCount + } + return 0 +} + +func (o *Options) getRetryWait() time.Duration { + if o != nil && o.RetryWait > 0 { + return o.RetryWait + } + return 100 * time.Millisecond +} + +//Lock struct identifies the resource lock instance. Releasing and adjusting the +//expirations are done using the methods defined for this struct. +type Lock struct { + s *SdlInstance + key string + value string +} + type iDatabase interface { - SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) + SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string) UnsubscribeChannelDB(channels ...string) MSet(pairs ...interface{}) error MSetMPub(channelsAndEvents []string, pairs ...interface{}) error @@ -478,7 +613,7 @@ type iDatabase interface { Keys(key string) ([]string, error) SetIE(key string, oldData, newData interface{}) (bool, error) SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) - SetNX(key string, data interface{}) (bool, error) + SetNX(key string, data interface{}, expiration time.Duration) (bool, error) SetNXPub(channel, message, key string, data interface{}) (bool, error) DelIE(key string, data interface{}) (bool, error) DelIEPub(channel, message, key string, data interface{}) (bool, error) @@ -487,4 +622,6 @@ type iDatabase interface { SMembers(key string) ([]string, error) SIsMember(key string, data interface{}) (bool, error) SCard(key string) (int64, error) + PTTL(key string) (time.Duration, error) + PExpireIE(key string, data interface{}, expiration time.Duration) error }