Add support for CI jobs
[ric-plt/sdlgo.git] / sdl.go
diff --git a/sdl.go b/sdl.go
index e25a85d..3126542 100644 (file)
--- a/sdl.go
+++ b/sdl.go
 package sdlgo
 
 import (
+       "crypto/rand"
+       "encoding/base64"
        "errors"
        "fmt"
+       "io"
        "reflect"
        "strings"
+       "sync"
+       "time"
 
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
 )
@@ -32,24 +37,36 @@ type SdlInstance struct {
        nameSpace      string
        nsPrefix       string
        eventSeparator string
+       mutex          sync.Mutex
+       tmp            []byte
        iDatabase
 }
 
+//Database struct is a holder for the internal database instance. Applications
+//can use this exported data type to locally store a reference to database
+//instance returned from NewDabase() function.
+type Database struct {
+       instance iDatabase
+}
+
 //NewDatabase creates a connection to database that will be used
-//as a backend for the key-value storage. The returned value shall
-//be given as a parameter when calling NewKeyValStorage
-func NewDatabase() *sdlgoredis.DB {
-       return sdlgoredis.Create()
+//as a backend for the key-value storage. The returned value
+//can be reused between multiple SDL instances in which case each instance
+//is using the same connection.
+func NewDatabase() *Database {
+       return &Database{
+               instance: sdlgoredis.Create(),
+       }
 }
 
 //NewSdlInstance creates a new sdl instance using the given namespace.
 //The database used as a backend is given as a parameter
-func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
+func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
        return &SdlInstance{
                nameSpace:      NameSpace,
                nsPrefix:       "{" + NameSpace + "},",
                eventSeparator: "___",
-               iDatabase:      db,
+               iDatabase:      db.instance,
        }
 }
 
@@ -71,6 +88,8 @@ func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
 //callback as quickly as possible. E.g. reading in callback context should be avoided
 //and using of Go signals is recommended. Also it should be noted that in case of several
 //events received from different channels, callbacks are called in series one by one.
+//
+//This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines.
 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
        s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
        return nil
@@ -114,6 +133,12 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, e
        for _, v := range pairs {
                reflectType := reflect.TypeOf(v)
                switch reflectType.Kind() {
+               case reflect.Map:
+                       x := reflect.ValueOf(v).MapRange()
+                       for x.Next() {
+                               retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
+                               retVal = append(retVal, x.Value().Interface())
+                       }
                case reflect.Slice:
                        if shouldBeKey {
                                x := reflect.ValueOf(v)
@@ -193,21 +218,20 @@ func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []str
        return retVal
 }
 
-//SetAndPublish function writes data to shared data layer storage and send an event to
+//SetAndPublish function writes data to shared data layer storage and sends an event to
 //a channel. Writing is done atomically, i.e. all succeeds or fails.
 //Data to be written is given as key-value pairs. Several key-value
 //pairs can be written with one call.
 //The key is expected to be string whereas value can be anything, string,
 //number, slice array or map
 //
+//If data was set successfully, an event is sent to a channel.
 //Channels and events are given as pairs is channelsAndEvents parameter.
-//Although it is possible to give sevral channel-event pairs, current implementation
-//supports sending events to one channel only due to missing support in DB backend.
+//It is possible to send several events to several channels by giving several
+//channel-event pairs.
+//  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
+//will send event1 and event3 to channel1 and event2 to channel2.
 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
-       if len(pairs)%2 != 0 {
-               return errors.New("Invalid pairs parameter")
-       }
-
        keyAndData, err := s.setNamespaceToKeys(pairs...)
        if err != nil {
                return err
@@ -219,7 +243,7 @@ func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interfa
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
+       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
 }
 
 //Set function writes data to shared data layer storage. Writing is done
@@ -289,7 +313,7 @@ func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, err
 //given channel.
 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
        if len(channelsAndEvents) == 0 {
-               return s.SetNX(s.nsPrefix+key, data)
+               return s.SetNX(s.nsPrefix+key, data, 0)
        }
        if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
                return false, err
@@ -302,11 +326,14 @@ func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key s
 //then it's value is not changed. Checking the key existence and potential set operation
 //is done atomically.
 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
-       return s.SetNX(s.nsPrefix+key, data)
+       return s.SetNX(s.nsPrefix+key, data, 0)
 }
 
 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
-//An event is published into a given channel if remove operation is successfull.
+//Trying to remove a nonexisting key is not considered as an error.
+//An event is published into a given channel if remove operation is successfull and
+//at least one key is removed (if several keys given). If the given key(s) doesn't exist
+//when trying to remove, no event is published.
 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
        if len(keys) == 0 {
                return nil
@@ -323,7 +350,7 @@ func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string
                return err
        }
        channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
+       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
 }
 
 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -403,11 +430,11 @@ func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
                if len(channelsAndEvents) == 0 {
                        return s.Del(keys)
                }
-               if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+               if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
                        return err
                }
                channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-               err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
+               err = s.DelMPub(channelsAndEventsPrepared, keys)
        }
        return err
 }
@@ -458,19 +485,135 @@ func (s *SdlInstance) GroupSize(group string) (int64, error) {
        return retVal, err
 }
 
+func (s *SdlInstance) randomToken() (string, error) {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+
+       if len(s.tmp) == 0 {
+               s.tmp = make([]byte, 16)
+       }
+
+       if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
+               return "", err
+       }
+
+       return base64.RawURLEncoding.EncodeToString(s.tmp), nil
+}
+
+//LockResource function is used for locking a resource. The resource lock in
+//practice is a key with random value that is set to expire after a time
+//period. The value written to key is a random value, thus only the instance
+//created a lock, can release it. Resource locks are per namespace.
+func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
+       value, err := s.randomToken()
+       if err != nil {
+               return nil, err
+       }
+
+       var retryTimer *time.Timer
+       for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
+               ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
+               if err != nil {
+                       return nil, err
+               } else if ok {
+                       return &Lock{s: s, key: resource, value: value}, nil
+               }
+               if retryTimer == nil {
+                       retryTimer = time.NewTimer(opt.getRetryWait())
+                       defer retryTimer.Stop()
+               } else {
+                       retryTimer.Reset(opt.getRetryWait())
+               }
+
+               select {
+               case <-retryTimer.C:
+               }
+       }
+       return nil, errors.New("Lock not obtained")
+}
+
+//ReleaseResource removes the lock from a resource. If lock is already
+//expired or some other instance is keeping the lock (lock taken after expiration),
+//an error is returned.
+func (l *Lock) ReleaseResource() error {
+       ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
+
+       if err != nil {
+               return err
+       }
+       if !ok {
+               return errors.New("Lock not held")
+       }
+       return nil
+}
+
+//RefreshResource function can be used to set a new expiration time for the
+//resource lock (if the lock still exists). The old remaining expiration
+//time is overwritten with the given new expiration time.
+func (l *Lock) RefreshResource(expiration time.Duration) error {
+       err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
+       return err
+}
+
+//CheckResource returns the expiration time left for a resource.
+//If the resource doesn't exist, -2 is returned.
+func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
+       result, err := s.PTTL(s.nsPrefix + resource)
+       if err != nil {
+               return 0, err
+       }
+       if result == time.Duration(-1) {
+               return 0, errors.New("invalid resource given, no expiration time attached")
+       }
+       return result, nil
+}
+
+//Options struct defines the behaviour for getting the resource lock.
+type Options struct {
+       //The number of time the lock will be tried.
+       //Default: 0 = no retry
+       RetryCount int
+
+       //Wait between the retries.
+       //Default: 100ms
+       RetryWait time.Duration
+}
+
+func (o *Options) getRetryCount() int {
+       if o != nil && o.RetryCount > 0 {
+               return o.RetryCount
+       }
+       return 0
+}
+
+func (o *Options) getRetryWait() time.Duration {
+       if o != nil && o.RetryWait > 0 {
+               return o.RetryWait
+       }
+       return 100 * time.Millisecond
+}
+
+//Lock struct identifies the resource lock instance. Releasing and adjusting the
+//expirations are done using the methods defined for this struct.
+type Lock struct {
+       s     *SdlInstance
+       key   string
+       value string
+}
+
 type iDatabase interface {
-       SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
+       SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
        UnsubscribeChannelDB(channels ...string)
        MSet(pairs ...interface{}) error
-       MSetPub(ns, message string, pairs ...interface{}) error
+       MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
        MGet(keys []string) ([]interface{}, error)
        CloseDB() error
        Del(keys []string) error
-       DelPub(channel, message string, keys []string) error
+       DelMPub(channelsAndEvents []string, keys []string) error
        Keys(key string) ([]string, error)
        SetIE(key string, oldData, newData interface{}) (bool, error)
        SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
-       SetNX(key string, data interface{}) (bool, error)
+       SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
        SetNXPub(channel, message, key string, data interface{}) (bool, error)
        DelIE(key string, data interface{}) (bool, error)
        DelIEPub(channel, message, key string, data interface{}) (bool, error)
@@ -479,4 +622,6 @@ type iDatabase interface {
        SMembers(key string) ([]string, error)
        SIsMember(key string, data interface{}) (bool, error)
        SCard(key string) (int64, error)
+       PTTL(key string) (time.Duration, error)
+       PExpireIE(key string, data interface{}, expiration time.Duration) error
 }