X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sdl.go;h=426c2c46e468c2edb755f544dc2145868a73a591;hb=33fdc5897e5a97acd2b866c3f01af8151f40fda3;hp=9eee8c956ae88c63bb436511826c843a965fc3ef;hpb=a66fcfb6cc2918cb06945ebb2f84f0c9f440ec09;p=ric-plt%2Fsdlgo.git diff --git a/sdl.go b/sdl.go index 9eee8c9..426c2c4 100644 --- a/sdl.go +++ b/sdl.go @@ -18,38 +18,27 @@ package sdlgo import ( + "crypto/rand" + "encoding/base64" "errors" "fmt" + "io" "reflect" "strings" + "sync" + "time" "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis" ) -type iDatabase interface { - SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) - UnsubscribeChannelDB(channels ...string) - MSet(pairs ...interface{}) error - MSetPub(ns, message string, pairs ...interface{}) error - MGet(keys []string) ([]interface{}, error) - CloseDB() error - Del(keys []string) error - DelPub(channel, message string, keys []string) error - Keys(key string) ([]string, error) - SetIE(key string, oldData, newData interface{}) (bool, error) - SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) - SetNX(key string, data interface{}) (bool, error) - SetNXPub(channel, message, key string, data interface{}) (bool, error) - DelIE(key string, data interface{}) (bool, error) - DelIEPub(channel, message, key string, data interface{}) (bool, error) -} - //SdlInstance provides an API to read, write and modify //key-value pairs in a given namespace. type SdlInstance struct { nameSpace string nsPrefix string eventSeparator string + mutex sync.Mutex + tmp []byte iDatabase } @@ -132,6 +121,12 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, e for _, v := range pairs { reflectType := reflect.TypeOf(v) switch reflectType.Kind() { + case reflect.Map: + x := reflect.ValueOf(v).MapRange() + for x.Next() { + retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string)) + retVal = append(retVal, x.Value().Interface()) + } case reflect.Slice: if shouldBeKey { x := reflect.ValueOf(v) @@ -211,21 +206,20 @@ func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []str return retVal } -//SetAndPublish function writes data to shared data layer storage and send an event to +//SetAndPublish function writes data to shared data layer storage and sends an event to //a channel. Writing is done atomically, i.e. all succeeds or fails. //Data to be written is given as key-value pairs. Several key-value //pairs can be written with one call. //The key is expected to be string whereas value can be anything, string, //number, slice array or map // +//If data was set successfully, an event is sent to a channel. //Channels and events are given as pairs is channelsAndEvents parameter. -//Although it is possible to give sevral channel-event pairs, current implementation -//supports sending events to one channel only due to missing support in DB backend. +//It is possible to send several events to several channels by giving several +//channel-event pairs. +// E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"} +//will send event1 and event3 to channel1 and event2 to channel2. func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error { - if len(pairs)%2 != 0 { - return errors.New("Invalid pairs parameter") - } - keyAndData, err := s.setNamespaceToKeys(pairs...) if err != nil { return err @@ -237,7 +231,7 @@ func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interfa return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) - return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...) + return s.MSetMPub(channelsAndEventsPrepared, keyAndData...) } //Set function writes data to shared data layer storage. Writing is done @@ -307,7 +301,7 @@ func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, err //given channel. func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) { if len(channelsAndEvents) == 0 { - return s.SetNX(s.nsPrefix+key, data) + return s.SetNX(s.nsPrefix+key, data, 0) } if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err @@ -320,11 +314,14 @@ func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key s //then it's value is not changed. Checking the key existence and potential set operation //is done atomically. func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) { - return s.SetNX(s.nsPrefix+key, data) + return s.SetNX(s.nsPrefix+key, data, 0) } //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. -//An event is published into a given channel if remove operation is successfull. +//Trying to remove a nonexisting key is not considered as an error. +//An event is published into a given channel if remove operation is successfull and +//at least one key is removed (if several keys given). If the given key(s) doesn't exist +//when trying to remove, no event is published. func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error { if len(keys) == 0 { return nil @@ -341,7 +338,7 @@ func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) - return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs) + return s.DelMPub(channelsAndEventsPrepared, keysWithNs) } //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails. @@ -425,8 +422,194 @@ func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error { return err } channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) - err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys) + err = s.DelMPub(channelsAndEventsPrepared, keys) + } + return err +} + +//AddMember adds a new members to a group. +// +//SDL groups are unordered collections of members where each member is +//unique. It is possible to add the same member several times without the +//need to check if it already exists. +func (s *SdlInstance) AddMember(group string, member ...interface{}) error { + return s.SAdd(s.nsPrefix+group, member...) +} + +//RemoveMember removes members from a group. +func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error { + return s.SRem(s.nsPrefix+group, member...) +} + +//RemoveGroup removes the whole group along with it's members. +func (s *SdlInstance) RemoveGroup(group string) error { + return s.Del([]string{s.nsPrefix + group}) +} + +//GetMembers returns all the members from a group. +func (s *SdlInstance) GetMembers(group string) ([]string, error) { + retVal, err := s.SMembers(s.nsPrefix + group) + if err != nil { + return []string{}, err + } + return retVal, err +} + +//IsMember returns true if given member is found from a group. +func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) { + retVal, err := s.SIsMember(s.nsPrefix+group, member) + if err != nil { + return false, err + } + return retVal, err +} + +//GroupSize returns the number of members in a group. +func (s *SdlInstance) GroupSize(group string) (int64, error) { + retVal, err := s.SCard(s.nsPrefix + group) + if err != nil { + return 0, err + } + return retVal, err +} + +func (s *SdlInstance) randomToken() (string, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if len(s.tmp) == 0 { + s.tmp = make([]byte, 16) + } + + if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil { + return "", err + } + + return base64.RawURLEncoding.EncodeToString(s.tmp), nil +} + +//LockResource function is used for locking a resource. The resource lock in +//practice is a key with random value that is set to expire after a time +//period. The value written to key is a random value, thus only the instance +//created a lock, can release it. Resource locks are per namespace. +func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) { + value, err := s.randomToken() + if err != nil { + return nil, err } + + var retryTimer *time.Timer + for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ { + ok, err := s.SetNX(s.nsPrefix+resource, value, expiration) + if err != nil { + return nil, err + } else if ok { + return &Lock{s: s, key: resource, value: value}, nil + } + if retryTimer == nil { + retryTimer = time.NewTimer(opt.getRetryWait()) + defer retryTimer.Stop() + } else { + retryTimer.Reset(opt.getRetryWait()) + } + + select { + case <-retryTimer.C: + } + } + return nil, errors.New("Lock not obtained") +} + +//ReleaseResource removes the lock from a resource. If lock is already +//expired or some other instance is keeping the lock (lock taken after expiration), +//an error is returned. +func (l *Lock) ReleaseResource() error { + ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value) + + if err != nil { + return err + } + if !ok { + return errors.New("Lock not held") + } + return nil +} + +//RefreshResource function can be used to set a new expiration time for the +//resource lock (if the lock still exists). The old remaining expiration +//time is overwritten with the given new expiration time. +func (l *Lock) RefreshResource(expiration time.Duration) error { + err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration) return err +} + +//CheckResource returns the expiration time left for a resource. +//If the resource doesn't exist, -2 is returned. +func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) { + result, err := s.PTTL(s.nsPrefix + resource) + if err != nil { + return 0, err + } + if result == time.Duration(-1) { + return 0, errors.New("invalid resource given, no expiration time attached") + } + return result, nil +} + +//Options struct defines the behaviour for getting the resource lock. +type Options struct { + //The number of time the lock will be tried. + //Default: 0 = no retry + RetryCount int + + //Wait between the retries. + //Default: 100ms + RetryWait time.Duration +} + +func (o *Options) getRetryCount() int { + if o != nil && o.RetryCount > 0 { + return o.RetryCount + } + return 0 +} +func (o *Options) getRetryWait() time.Duration { + if o != nil && o.RetryWait > 0 { + return o.RetryWait + } + return 100 * time.Millisecond +} + +//Lock struct identifies the resource lock instance. Releasing and adjusting the +//expirations are done using the methods defined for this struct. +type Lock struct { + s *SdlInstance + key string + value string +} + +type iDatabase interface { + SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) + UnsubscribeChannelDB(channels ...string) + MSet(pairs ...interface{}) error + MSetMPub(channelsAndEvents []string, pairs ...interface{}) error + MGet(keys []string) ([]interface{}, error) + CloseDB() error + Del(keys []string) error + DelMPub(channelsAndEvents []string, keys []string) error + Keys(key string) ([]string, error) + SetIE(key string, oldData, newData interface{}) (bool, error) + SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) + SetNX(key string, data interface{}, expiration time.Duration) (bool, error) + SetNXPub(channel, message, key string, data interface{}) (bool, error) + DelIE(key string, data interface{}) (bool, error) + DelIEPub(channel, message, key string, data interface{}) (bool, error) + SAdd(key string, data ...interface{}) error + SRem(key string, data ...interface{}) error + SMembers(key string) ([]string, error) + SIsMember(key string, data interface{}) (bool, error) + SCard(key string) (int64, error) + PTTL(key string) (time.Duration, error) + PExpireIE(key string, data interface{}, expiration time.Duration) error }