Add support for notifications
[ric-plt/sdlgo.git] / sdl.go
diff --git a/sdl.go b/sdl.go
index 295d21e..4d98904 100644 (file)
--- a/sdl.go
+++ b/sdl.go
 package sdlgo
 
 import (
+       "errors"
+       "fmt"
        "reflect"
        "strings"
 
-       "gerrit.oran-osc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
+       "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
 )
 
 type iDatabase interface {
+       SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
+       UnsubscribeChannelDB(channels ...string)
        MSet(pairs ...interface{}) error
+       MSetPub(ns, message string, pairs ...interface{}) error
        MGet(keys []string) ([]interface{}, error)
        CloseDB() error
        Del(keys []string) error
+       DelPub(channel, message string, keys []string) error
        Keys(key string) ([]string, error)
        SetIE(key string, oldData, newData interface{}) (bool, error)
+       SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
        SetNX(key string, data interface{}) (bool, error)
+       SetNXPub(channel, message, key string, data interface{}) (bool, error)
        DelIE(key string, data interface{}) (bool, error)
+       DelIEPub(channel, message, key string, data interface{}) (bool, error)
 }
 
+//SdlInstance provides an API to read, write and modify
+//key-value pairs in a given namespace.
 type SdlInstance struct {
-       nameSpace string
-       nsPrefix  string
+       nameSpace      string
+       nsPrefix       string
+       eventSeparator string
        iDatabase
 }
 
@@ -45,34 +57,87 @@ type SdlInstance struct {
 //as a backend for the key-value storage. The returned value shall
 //be given as a parameter when calling NewKeyValStorage
 func NewDatabase() *sdlgoredis.DB {
-       db := sdlgoredis.Create()
-       return db
+       return sdlgoredis.Create()
 }
 
 //NewSdlInstance creates a new sdl instance using the given namespace.
 //The database used as a backend is given as a parameter
 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
-       s := SdlInstance{
-               nameSpace: NameSpace,
-               nsPrefix:  "{" + NameSpace + "},",
-               iDatabase: db,
+       return &SdlInstance{
+               nameSpace:      NameSpace,
+               nsPrefix:       "{" + NameSpace + "},",
+               eventSeparator: "___",
+               iDatabase:      db,
        }
+}
+
+//SubscribeChannel lets you to subscribe for a events on a given channels.
+//SDL notifications are events that are published on a specific channels.
+//Both the channel and events are defined by the entity that is publishing
+//the events.
+//
+//When subscribing for a channel, a callback function is given as a parameter.
+//Whenever a notification is received from a channel, this callback is called
+//with channel and notifications as parameter (several notifications could be
+//packed to a single callback function call). A call to SubscribeChannel function
+//returns immediatelly, callbacks will be called asyncronously.
+//
+//It is possible to subscribe to different channels using different callbacks. In
+//this case simply use SubscribeChannel function separately for each channel.
+//
+//When receiving events in callback routine, it is a good practive to return from
+//callback as quickly as possible. E.g. reading in callback context should be avoided
+//and using of Go signals is recommended. Also it should be noted that in case of several
+//events received from different channels, callbacks are called in series one by one.
+func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
+       s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
+       return nil
+}
 
-       return &s
+//UnsubscribeChannel removes subscription from one or several channels.
+func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
+       s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
+       return nil
 }
 
+//Close connection to backend database.
 func (s *SdlInstance) Close() error {
        return s.CloseDB()
 }
 
-func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
-       var retVal []interface{}
-       for i, v := range pairs {
-               if i%2 == 0 {
-                       reflectType := reflect.TypeOf(v)
-                       switch reflectType.Kind() {
-                       case reflect.Slice:
+func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
+       if len(channelsAndEvents)%2 != 0 {
+               return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
+       }
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       if strings.Contains(v, s.eventSeparator) {
+                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+                       }
+               }
+       }
+       return nil
+}
+func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
+       var retVal []string
+       for _, v := range channels {
+               retVal = append(retVal, s.nsPrefix+v)
+       }
+       return retVal
+}
+
+func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
+       retVal := make([]interface{}, 0)
+       shouldBeKey := true
+       for _, v := range pairs {
+               reflectType := reflect.TypeOf(v)
+               switch reflectType.Kind() {
+               case reflect.Slice:
+                       if shouldBeKey {
                                x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
                                for i2 := 0; i2 < x.Len(); i2++ {
                                        if i2%2 == 0 {
                                                retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
@@ -80,8 +145,15 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
                                                retVal = append(retVal, x.Index(i2).Interface())
                                        }
                                }
-                       case reflect.Array:
+                       } else {
+                               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                       }
+               case reflect.Array:
+                       if shouldBeKey {
                                x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
                                for i2 := 0; i2 < x.Len(); i2++ {
                                        if i2%2 == 0 {
                                                retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
@@ -89,16 +161,75 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} {
                                                retVal = append(retVal, x.Index(i2).Interface())
                                        }
                                }
-                       default:
+                       } else {
+                               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                       }
+               default:
+                       if shouldBeKey {
                                retVal = append(retVal, s.nsPrefix+v.(string))
+                               shouldBeKey = false
+                       } else {
+                               retVal = append(retVal, v)
+                               shouldBeKey = true
                        }
+               }
+       }
+       if len(retVal)%2 != 0 {
+               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+       }
+       return retVal, nil
+}
+
+func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
+       channelEventMap := make(map[string]string)
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       continue
+               }
+               _, exists := channelEventMap[v]
+               if exists {
+                       channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
                } else {
-                       retVal = append(retVal, v)
+                       channelEventMap[v] = channelsAndEvents[i+1]
                }
        }
+       retVal := make([]string, 0)
+       for k, v := range channelEventMap {
+               retVal = append(retVal, s.nsPrefix+k)
+               retVal = append(retVal, v)
+       }
        return retVal
 }
 
+//SetAndPublish function writes data to shared data layer storage and send an event to
+//a channel. Writing is done atomically, i.e. all succeeds or fails.
+//Data to be written is given as key-value pairs. Several key-value
+//pairs can be written with one call.
+//The key is expected to be string whereas value can be anything, string,
+//number, slice array or map
+//
+//Channels and events are given as pairs is channelsAndEvents parameter.
+//Although it is possible to give sevral channel-event pairs, current implementation
+//supports sending events to one channel only due to missing support in DB backend.
+func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
+       if len(pairs)%2 != 0 {
+               return errors.New("Invalid pairs parameter")
+       }
+
+       keyAndData, err := s.setNamespaceToKeys(pairs...)
+       if err != nil {
+               return err
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.MSet(keyAndData...)
+       }
+       if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
+}
+
 //Set function writes data to shared data layer storage. Writing is done
 //atomically, i.e. all succeeds or fails.
 //Data to be written is given as key-value pairs. Several key-value
@@ -110,9 +241,11 @@ func (s *SdlInstance) Set(pairs ...interface{}) error {
                return nil
        }
 
-       keyAndData := s.setNamespaceToKeys(pairs...)
-       err := s.MSet(keyAndData...)
-       return err
+       keyAndData, err := s.setNamespaceToKeys(pairs...)
+       if err != nil {
+               return err
+       }
+       return s.MSet(keyAndData...)
 }
 
 //Get function atomically reads one or more keys from SDL. The returned map has the
@@ -138,28 +271,70 @@ func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
        return m, err
 }
 
+//SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
+//If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
+//is published to a given channel.
+func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.SetIE(s.nsPrefix+key, oldData, newData)
+       }
+       if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
+}
+
 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned.
 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
-       status, err := s.SetIE(s.nsPrefix+key, oldData, newData)
-       if err != nil {
+       return s.SetIE(s.nsPrefix+key, oldData, newData)
+}
+
+//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
+//then it's value is not changed. Checking the key existence and potential set operation
+//is done atomically. If the set operation was done successfully, an event is published to a
+//given channel.
+func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.SetNX(s.nsPrefix+key, data)
+       }
+       if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
                return false, err
        }
-       return status, nil
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
 }
 
 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
 //then it's value is not changed. Checking the key existence and potential set operation
 //is done atomically.
 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
-       status, err := s.SetNX(s.nsPrefix+key, data)
-       if err != nil {
-               return false, err
+       return s.SetNX(s.nsPrefix+key, data)
+}
+
+//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
+//An event is published into a given channel if remove operation is successfull.
+func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
+       if len(keys) == 0 {
+               return nil
        }
-       return status, nil
+
+       var keysWithNs []string
+       for _, v := range keys {
+               keysWithNs = append(keysWithNs, s.nsPrefix+v)
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.Del(keysWithNs)
+       }
+       if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
 }
 
-//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails
+//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
 func (s *SdlInstance) Remove(keys []string) error {
        if len(keys) == 0 {
                return nil
@@ -173,6 +348,20 @@ func (s *SdlInstance) Remove(keys []string) error {
        return err
 }
 
+//RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
+//a given event is published to channel. If existing data matches given data,
+//key and data are removed from SDL. If remove was done successfully, true is returned.
+func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       if len(channelsAndEvents) == 0 {
+               return s.DelIE(s.nsPrefix+key, data)
+       }
+       if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+       return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
+}
+
 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
@@ -187,7 +376,7 @@ func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
 func (s *SdlInstance) GetAll() ([]string, error) {
        keys, err := s.Keys(s.nsPrefix + "*")
-       var retVal []string = nil
+       var retVal []string
        if err != nil {
                return retVal, err
        }
@@ -204,8 +393,27 @@ func (s *SdlInstance) RemoveAll() error {
        if err != nil {
                return err
        }
-       if keys != nil {
+       if (keys != nil) && (len(keys) != 0) {
                err = s.Del(keys)
        }
        return err
 }
+
+func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
+       keys, err := s.Keys(s.nsPrefix + "*")
+       if err != nil {
+               return err
+       }
+       if (keys != nil) && (len(keys) != 0) {
+               if len(channelsAndEvents) == 0 {
+                       return s.Del(keys)
+               }
+               if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+                       return err
+               }
+               channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
+               err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
+       }
+       return err
+
+}