X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=sdl.go;h=e25a85ddcb88aa97b4ce08dde7f901ba4e5b4618;hb=refs%2Fchanges%2F94%2F194%2F3;hp=295d21e1c5c990bd4815eee1171b417db8ef1799;hpb=9617339c09dfd2a0dca05afadb07ae3f7f06a9c6;p=ric-plt%2Fsdlgo.git diff --git a/sdl.go b/sdl.go index 295d21e..e25a85d 100644 --- a/sdl.go +++ b/sdl.go @@ -18,26 +18,20 @@ package sdlgo import ( + "errors" + "fmt" "reflect" "strings" - "gerrit.oran-osc.org/r/ric-plt/sdlgo/internal/sdlgoredis" + "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis" ) -type iDatabase interface { - MSet(pairs ...interface{}) error - MGet(keys []string) ([]interface{}, error) - CloseDB() error - Del(keys []string) error - Keys(key string) ([]string, error) - SetIE(key string, oldData, newData interface{}) (bool, error) - SetNX(key string, data interface{}) (bool, error) - DelIE(key string, data interface{}) (bool, error) -} - +//SdlInstance provides an API to read, write and modify +//key-value pairs in a given namespace. type SdlInstance struct { - nameSpace string - nsPrefix string + nameSpace string + nsPrefix string + eventSeparator string iDatabase } @@ -45,34 +39,87 @@ type SdlInstance struct { //as a backend for the key-value storage. The returned value shall //be given as a parameter when calling NewKeyValStorage func NewDatabase() *sdlgoredis.DB { - db := sdlgoredis.Create() - return db + return sdlgoredis.Create() } //NewSdlInstance creates a new sdl instance using the given namespace. //The database used as a backend is given as a parameter func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance { - s := SdlInstance{ - nameSpace: NameSpace, - nsPrefix: "{" + NameSpace + "},", - iDatabase: db, + return &SdlInstance{ + nameSpace: NameSpace, + nsPrefix: "{" + NameSpace + "},", + eventSeparator: "___", + iDatabase: db, } +} + +//SubscribeChannel lets you to subscribe for a events on a given channels. +//SDL notifications are events that are published on a specific channels. +//Both the channel and events are defined by the entity that is publishing +//the events. +// +//When subscribing for a channel, a callback function is given as a parameter. +//Whenever a notification is received from a channel, this callback is called +//with channel and notifications as parameter (several notifications could be +//packed to a single callback function call). A call to SubscribeChannel function +//returns immediatelly, callbacks will be called asyncronously. +// +//It is possible to subscribe to different channels using different callbacks. In +//this case simply use SubscribeChannel function separately for each channel. +// +//When receiving events in callback routine, it is a good practive to return from +//callback as quickly as possible. E.g. reading in callback context should be avoided +//and using of Go signals is recommended. Also it should be noted that in case of several +//events received from different channels, callbacks are called in series one by one. +func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error { + s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...) + return nil +} - return &s +//UnsubscribeChannel removes subscription from one or several channels. +func (s *SdlInstance) UnsubscribeChannel(channels ...string) error { + s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...) + return nil } +//Close connection to backend database. func (s *SdlInstance) Close() error { return s.CloseDB() } -func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} { - var retVal []interface{} - for i, v := range pairs { - if i%2 == 0 { - reflectType := reflect.TypeOf(v) - switch reflectType.Kind() { - case reflect.Slice: +func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error { + if len(channelsAndEvents)%2 != 0 { + return fmt.Errorf("%s: Channels and events must be given as pairs", cmd) + } + for i, v := range channelsAndEvents { + if i%2 != 0 { + if strings.Contains(v, s.eventSeparator) { + return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator) + } + } + } + return nil +} +func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string { + var retVal []string + for _, v := range channels { + retVal = append(retVal, s.nsPrefix+v) + } + return retVal +} + +func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) { + retVal := make([]interface{}, 0) + shouldBeKey := true + for _, v := range pairs { + reflectType := reflect.TypeOf(v) + switch reflectType.Kind() { + case reflect.Slice: + if shouldBeKey { x := reflect.ValueOf(v) + if x.Len()%2 != 0 { + return []interface{}{}, errors.New("Key/value pairs doesn't match") + } for i2 := 0; i2 < x.Len(); i2++ { if i2%2 == 0 { retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string)) @@ -80,8 +127,20 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} { retVal = append(retVal, x.Index(i2).Interface()) } } - case reflect.Array: + } else { + if reflectType.Elem().Kind() == reflect.Uint8 { + retVal = append(retVal, v) + shouldBeKey = true + } else { + return []interface{}{}, errors.New("Key/value pairs doesn't match") + } + } + case reflect.Array: + if shouldBeKey { x := reflect.ValueOf(v) + if x.Len()%2 != 0 { + return []interface{}{}, errors.New("Key/value pairs doesn't match") + } for i2 := 0; i2 < x.Len(); i2++ { if i2%2 == 0 { retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string)) @@ -89,16 +148,80 @@ func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) []interface{} { retVal = append(retVal, x.Index(i2).Interface()) } } - default: + } else { + if reflectType.Elem().Kind() == reflect.Uint8 { + retVal = append(retVal, v) + shouldBeKey = true + } else { + return []interface{}{}, errors.New("Key/value pairs doesn't match") + } + } + default: + if shouldBeKey { retVal = append(retVal, s.nsPrefix+v.(string)) + shouldBeKey = false + } else { + retVal = append(retVal, v) + shouldBeKey = true } + } + } + if len(retVal)%2 != 0 { + return []interface{}{}, errors.New("Key/value pairs doesn't match") + } + return retVal, nil +} + +func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string { + channelEventMap := make(map[string]string) + for i, v := range channelsAndEvents { + if i%2 != 0 { + continue + } + _, exists := channelEventMap[v] + if exists { + channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1] } else { - retVal = append(retVal, v) + channelEventMap[v] = channelsAndEvents[i+1] } } + retVal := make([]string, 0) + for k, v := range channelEventMap { + retVal = append(retVal, s.nsPrefix+k) + retVal = append(retVal, v) + } return retVal } +//SetAndPublish function writes data to shared data layer storage and send an event to +//a channel. Writing is done atomically, i.e. all succeeds or fails. +//Data to be written is given as key-value pairs. Several key-value +//pairs can be written with one call. +//The key is expected to be string whereas value can be anything, string, +//number, slice array or map +// +//Channels and events are given as pairs is channelsAndEvents parameter. +//Although it is possible to give sevral channel-event pairs, current implementation +//supports sending events to one channel only due to missing support in DB backend. +func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error { + if len(pairs)%2 != 0 { + return errors.New("Invalid pairs parameter") + } + + keyAndData, err := s.setNamespaceToKeys(pairs...) + if err != nil { + return err + } + if len(channelsAndEvents) == 0 { + return s.MSet(keyAndData...) + } + if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil { + return err + } + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...) +} + //Set function writes data to shared data layer storage. Writing is done //atomically, i.e. all succeeds or fails. //Data to be written is given as key-value pairs. Several key-value @@ -110,9 +233,11 @@ func (s *SdlInstance) Set(pairs ...interface{}) error { return nil } - keyAndData := s.setNamespaceToKeys(pairs...) - err := s.MSet(keyAndData...) - return err + keyAndData, err := s.setNamespaceToKeys(pairs...) + if err != nil { + return err + } + return s.MSet(keyAndData...) } //Get function atomically reads one or more keys from SDL. The returned map has the @@ -138,28 +263,70 @@ func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) { return m, err } +//SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData. +//If replace was done successfully, true will be returned. Also, if publishing was successfull, an event +//is published to a given channel. +func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) { + if len(channelsAndEvents) == 0 { + return s.SetIE(s.nsPrefix+key, oldData, newData) + } + if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil { + return false, err + } + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData) +} + //SetIf atomically replaces existing data with newData in SDL if data matches the oldData. //If replace was done successfully, true will be returned. func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) { - status, err := s.SetIE(s.nsPrefix+key, oldData, newData) - if err != nil { + return s.SetIE(s.nsPrefix+key, oldData, newData) +} + +//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL, +//then it's value is not changed. Checking the key existence and potential set operation +//is done atomically. If the set operation was done successfully, an event is published to a +//given channel. +func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) { + if len(channelsAndEvents) == 0 { + return s.SetNX(s.nsPrefix+key, data) + } + if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil { return false, err } - return status, nil + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data) } //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL, //then it's value is not changed. Checking the key existence and potential set operation //is done atomically. func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) { - status, err := s.SetNX(s.nsPrefix+key, data) - if err != nil { - return false, err + return s.SetNX(s.nsPrefix+key, data) +} + +//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails. +//An event is published into a given channel if remove operation is successfull. +func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error { + if len(keys) == 0 { + return nil } - return status, nil + + var keysWithNs []string + for _, v := range keys { + keysWithNs = append(keysWithNs, s.nsPrefix+v) + } + if len(channelsAndEvents) == 0 { + return s.Del(keysWithNs) + } + if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil { + return err + } + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs) } -//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails +//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails. func (s *SdlInstance) Remove(keys []string) error { if len(keys) == 0 { return nil @@ -173,6 +340,20 @@ func (s *SdlInstance) Remove(keys []string) error { return err } +//RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully, +//a given event is published to channel. If existing data matches given data, +//key and data are removed from SDL. If remove was done successfully, true is returned. +func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) { + if len(channelsAndEvents) == 0 { + return s.DelIE(s.nsPrefix+key, data) + } + if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { + return false, err + } + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data) +} + //RemoveIf removes data from SDL conditionally. If existing data matches given data, //key and data are removed from SDL. If remove was done successfully, true is returned. func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) { @@ -187,7 +368,7 @@ func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) { //given namespace exists, thus operation is not guaranteed to be atomic or isolated. func (s *SdlInstance) GetAll() ([]string, error) { keys, err := s.Keys(s.nsPrefix + "*") - var retVal []string = nil + var retVal []string if err != nil { return retVal, err } @@ -204,8 +385,98 @@ func (s *SdlInstance) RemoveAll() error { if err != nil { return err } - if keys != nil { + if (keys != nil) && (len(keys) != 0) { err = s.Del(keys) } return err } + +//RemoveAllAndPublish removes all keys under the namespace and if successfull, it +//will publish an event to given channel. This operation is not atomic, thus it is +//not guaranteed that all keys are removed. +func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error { + keys, err := s.Keys(s.nsPrefix + "*") + if err != nil { + return err + } + if (keys != nil) && (len(keys) != 0) { + if len(channelsAndEvents) == 0 { + return s.Del(keys) + } + if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil { + return err + } + channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents) + err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys) + } + return err +} + +//AddMember adds a new members to a group. +// +//SDL groups are unordered collections of members where each member is +//unique. It is possible to add the same member several times without the +//need to check if it already exists. +func (s *SdlInstance) AddMember(group string, member ...interface{}) error { + return s.SAdd(s.nsPrefix+group, member...) +} + +//RemoveMember removes members from a group. +func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error { + return s.SRem(s.nsPrefix+group, member...) +} + +//RemoveGroup removes the whole group along with it's members. +func (s *SdlInstance) RemoveGroup(group string) error { + return s.Del([]string{s.nsPrefix + group}) +} + +//GetMembers returns all the members from a group. +func (s *SdlInstance) GetMembers(group string) ([]string, error) { + retVal, err := s.SMembers(s.nsPrefix + group) + if err != nil { + return []string{}, err + } + return retVal, err +} + +//IsMember returns true if given member is found from a group. +func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) { + retVal, err := s.SIsMember(s.nsPrefix+group, member) + if err != nil { + return false, err + } + return retVal, err +} + +//GroupSize returns the number of members in a group. +func (s *SdlInstance) GroupSize(group string) (int64, error) { + retVal, err := s.SCard(s.nsPrefix + group) + if err != nil { + return 0, err + } + return retVal, err +} + +type iDatabase interface { + SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string) + UnsubscribeChannelDB(channels ...string) + MSet(pairs ...interface{}) error + MSetPub(ns, message string, pairs ...interface{}) error + MGet(keys []string) ([]interface{}, error) + CloseDB() error + Del(keys []string) error + DelPub(channel, message string, keys []string) error + Keys(key string) ([]string, error) + SetIE(key string, oldData, newData interface{}) (bool, error) + SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error) + SetNX(key string, data interface{}) (bool, error) + SetNXPub(channel, message, key string, data interface{}) (bool, error) + DelIE(key string, data interface{}) (bool, error) + DelIEPub(channel, message, key string, data interface{}) (bool, error) + SAdd(key string, data ...interface{}) error + SRem(key string, data ...interface{}) error + SMembers(key string) ([]string, error) + SIsMember(key string, data interface{}) (bool, error) + SCard(key string) (int64, error) +}