2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
26 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
29 //SdlInstance provides an API to read, write, modify and remove key-value pairs,
30 //to manage groups of members and to subscribe to event channels in a given namespace.
31 type SdlInstance struct {
38 //NewDatabase creates a connection to the database that will be used
39 //as a backend for the key-value storage. The returned value shall
40 //be given as the db parameter when calling NewSdlInstance.
41 func NewDatabase() *sdlgoredis.DB {
42 return sdlgoredis.Create()
45 //NewSdlInstance creates a new SDL instance using the given namespace.
46 //The database used as a backend is given as the db parameter.
47 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
50 nsPrefix: "{" + NameSpace + "},", // prepended to the key, group and channel names handed to the backend
51 eventSeparator: "___", // joins several events of one channel; not allowed inside an event
56 //SubscribeChannel lets you subscribe to events on the given channels.
57 //SDL notifications are events that are published on specific channels.
58 //Both the channel and events are defined by the entity that is publishing
61 //When subscribing to a channel, a callback function is given as a parameter.
62 //Whenever a notification is received from a channel, this callback is called
63 //with the channel and notifications as parameters (several notifications could be
64 //packed into a single callback function call). A call to the SubscribeChannel function
65 //returns immediately; callbacks will be called asynchronously.
67 //It is possible to subscribe to different channels using different callbacks. In
68 //this case simply call the SubscribeChannel function separately for each channel.
70 //When receiving events in the callback routine, it is good practice to return from
71 //the callback as quickly as possible. E.g. reading in callback context should be avoided
72 //and using Go signals is recommended. Also it should be noted that in case of several
73 //events received from different channels, callbacks are called in series one by one.
74 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
75 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...) // channel names are namespace-prefixed before reaching the backend
79 //UnsubscribeChannel removes the subscription from one or several channels.
//Channel names are given without the namespace prefix; it is added before the backend call.
80 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
81 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
85 //Close closes the connection to the backend database.
86 func (s *SdlInstance) Close() error {
//checkChannelsAndEvents validates the channelsAndEvents argument of the
//...AndPublish functions. cmd is used as the prefix of the returned error
//messages. A list with an odd number of entries is rejected, and so is an
//entry that contains the event separator.
//NOTE(review): the condition that selects which entries are checked for the
//separator is not visible in this listing; the error text suggests events only - confirm.
90 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
91 if len(channelsAndEvents)%2 != 0 {
92 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
94 for i, v := range channelsAndEvents {
96 if strings.Contains(v, s.eventSeparator) {
97 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
//setNamespaceToChannels prepends the namespace prefix to each of the given
//channel names and collects the prefixed names, in the same order, into retVal.
103 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
105 for _, v := range channels {
106 retVal = append(retVal, s.nsPrefix+v)
//setNamespaceToKeys builds, from the caller's key-value arguments, the flat
//argument list that Set and SetAndPublish pass to the backend. Each argument
//is inspected with reflection: for some kinds the elements are flattened into
//the result one by one, an argument whose element kind is reflect.Uint8 is
//appended as a single value, and keys get the namespace prefix prepended.
//The error "Key/value pairs doesn't match" is returned when the input cannot
//be interpreted as complete pairs, including when the final list has an odd length.
//NOTE(review): the case labels and the conditions that tell keys from values
//are not visible in this listing - confirm which kinds take which branch.
//NOTE(review): keys are converted with a plain .(string) type assertion, which
//panics for a non-string value unless a check not visible here rules that out.
111 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
112 retVal := make([]interface{}, 0)
114 for _, v := range pairs {
115 reflectType := reflect.TypeOf(v)
116 switch reflectType.Kind() {
119 x := reflect.ValueOf(v)
121 return []interface{}{}, errors.New("Key/value pairs doesn't match")
123 for i2 := 0; i2 < x.Len(); i2++ {
125 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
127 retVal = append(retVal, x.Index(i2).Interface())
131 if reflectType.Elem().Kind() == reflect.Uint8 {
132 retVal = append(retVal, v)
135 return []interface{}{}, errors.New("Key/value pairs doesn't match")
140 x := reflect.ValueOf(v)
142 return []interface{}{}, errors.New("Key/value pairs doesn't match")
144 for i2 := 0; i2 < x.Len(); i2++ {
146 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
148 retVal = append(retVal, x.Index(i2).Interface())
152 if reflectType.Elem().Kind() == reflect.Uint8 {
153 retVal = append(retVal, v)
156 return []interface{}{}, errors.New("Key/value pairs doesn't match")
161 retVal = append(retVal, s.nsPrefix+v.(string))
164 retVal = append(retVal, v)
169 if len(retVal)%2 != 0 {
170 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//prepareChannelsAndEvents groups the given events by channel. Events that
//belong to the same channel are joined with the event separator, and the
//result is a flat list in which each namespace-prefixed channel name is
//followed by its joined events.
//The result is produced by ranging over a map, so the order of the channels
//in it is unspecified when more than one distinct channel is given.
175 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
176 channelEventMap := make(map[string]string)
177 for i, v := range channelsAndEvents {
181 _, exists := channelEventMap[v]
183 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
185 channelEventMap[v] = channelsAndEvents[i+1]
188 retVal := make([]string, 0)
189 for k, v := range channelEventMap {
190 retVal = append(retVal, s.nsPrefix+k)
191 retVal = append(retVal, v)
196 //SetAndPublish function writes data to shared data layer storage and sends an event to
197 //a channel. Writing is done atomically, i.e. all succeeds or fails.
198 //Data to be written is given as key-value pairs. Several key-value
199 //pairs can be written with one call.
200 //The key is expected to be a string whereas the value can be anything: string,
201 //number, slice, array or map.
203 //Channels and events are given as pairs in the channelsAndEvents parameter.
204 //Although it is possible to give several channel-event pairs, the current implementation
205 //supports sending events to one channel only due to missing support in the DB backend.
206 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
207 if len(pairs)%2 != 0 {
208 return errors.New("Invalid pairs parameter")
211 keyAndData, err := s.setNamespaceToKeys(pairs...)
215 if len(channelsAndEvents) == 0 { // nothing to publish: plain write
216 return s.MSet(keyAndData...)
218 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
221 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
222 return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...) // only the first prepared channel and its events are passed on
225 //Set function writes data to shared data layer storage. Writing is done
226 //atomically, i.e. all succeeds or fails.
227 //Data to be written is given as key-value pairs. Several key-value
228 //pairs can be written with one call.
229 //The key is expected to be a string whereas the value can be anything: string,
230 //number, slice, array or map.
231 func (s *SdlInstance) Set(pairs ...interface{}) error {
236 keyAndData, err := s.setNamespaceToKeys(pairs...) // keys get the namespace prefix here
240 return s.MSet(keyAndData...)
243 //Get function atomically reads one or more keys from SDL. The returned map has the
244 //requested keys as index and data as value. If a requested key is not found
245 //in SDL, its value is nil.
246 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
247 m := make(map[string]interface{})
252 var keysWithNs []string
253 for _, v := range keys {
254 keysWithNs = append(keysWithNs, s.nsPrefix+v) // the backend is queried with namespace-prefixed keys
256 val, err := s.MGet(keysWithNs)
260 for i, v := range val {
266 //SetIfAndPublish atomically replaces existing data with newData in SDL if the data matches oldData.
267 //If the replace was done successfully, true will be returned. Also, if publishing was successful, an event
268 //is published to a given channel.
269 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
270 if len(channelsAndEvents) == 0 { // nothing to publish: behaves like SetIf
271 return s.SetIE(s.nsPrefix+key, oldData, newData)
273 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
276 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
277 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData) // only the first prepared channel and its events are passed on
280 //SetIf atomically replaces existing data with newData in SDL if the data matches oldData.
281 //If the replace was done successfully, true will be returned.
282 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
283 return s.SetIE(s.nsPrefix+key, oldData, newData)
286 //SetIfNotExistsAndPublish conditionally sets the value of a key. If the key already exists in SDL,
287 //then its value is not changed. Checking the key existence and the potential set operation
288 //is done atomically. If the set operation was done successfully, an event is published to a
290 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
291 if len(channelsAndEvents) == 0 { // nothing to publish: behaves like SetIfNotExists
292 return s.SetNX(s.nsPrefix+key, data)
294 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
297 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
298 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data) // only the first prepared channel and its events are passed on
301 //SetIfNotExists conditionally sets the value of a key. If the key already exists in SDL,
302 //then its value is not changed. Checking the key existence and the potential set operation
303 //is done atomically.
304 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
305 return s.SetNX(s.nsPrefix+key, data)
308 //RemoveAndPublish removes data from SDL. The operation is done atomically, i.e. either all succeeds or fails.
309 //An event is published into a given channel if the remove operation is successful.
310 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
315 var keysWithNs []string
316 for _, v := range keys {
317 keysWithNs = append(keysWithNs, s.nsPrefix+v)
319 if len(channelsAndEvents) == 0 { // nothing to publish: plain delete
320 return s.Del(keysWithNs)
322 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
325 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
326 return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs) // only the first prepared channel and its events are passed on
329 //Remove removes the given keys from SDL. The operation is done atomically, i.e. either all succeeds or fails.
330 func (s *SdlInstance) Remove(keys []string) error {
335 var keysWithNs []string
336 for _, v := range keys {
337 keysWithNs = append(keysWithNs, s.nsPrefix+v)
339 err := s.Del(keysWithNs)
343 //RemoveIfAndPublish removes data from SDL conditionally and, if the remove was done successfully,
344 //a given event is published to the channel. If the existing data matches the given data,
345 //the key and data are removed from SDL. If the remove was done successfully, true is returned.
346 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
347 if len(channelsAndEvents) == 0 { // nothing to publish: behaves like RemoveIf
348 return s.DelIE(s.nsPrefix+key, data)
350 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
353 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
354 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data) // only the first prepared channel and its events are passed on
357 //RemoveIf removes data from SDL conditionally. If the existing data matches the given data,
358 //the key and data are removed from SDL. If the remove was done successfully, true is returned.
359 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
360 status, err := s.DelIE(s.nsPrefix+key, data)
367 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
368 //given namespace exists, thus the operation is not guaranteed to be atomic or isolated.
//The keys are looked up with the pattern nsPrefix + "*" and returned without the prefix.
//NOTE(review): strings.Split(v, s.nsPrefix)[1] panics for a key that does not contain
//the prefix and drops everything after a second occurrence of the prefix inside a key;
//strings.TrimPrefix may be what is intended - confirm.
369 func (s *SdlInstance) GetAll() ([]string, error) {
370 keys, err := s.Keys(s.nsPrefix + "*")
375 for _, v := range keys {
376 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
381 //RemoveAll removes all keys under the namespace. The remove operation is not atomic, thus
382 //it is not guaranteed that all keys are removed.
383 func (s *SdlInstance) RemoveAll() error {
384 keys, err := s.Keys(s.nsPrefix + "*") // keys come back already namespace-prefixed
388 if (keys != nil) && (len(keys) != 0) {
394 //RemoveAllAndPublish removes all keys under the namespace and, if successful, it
395 //will publish an event to the given channel. This operation is not atomic, thus it is
396 //not guaranteed that all keys are removed.
//NOTE(review): validation errors are reported with the command name
//"RemoveIfAndPublish" rather than "RemoveAllAndPublish" - looks like a copy-paste slip, confirm.
397 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
398 keys, err := s.Keys(s.nsPrefix + "*")
402 if (keys != nil) && (len(keys) != 0) {
403 if len(channelsAndEvents) == 0 {
406 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
409 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
410 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys) // only the first prepared channel and its events are passed on
415 //AddMember adds new members to a group.
417 //SDL groups are unordered collections of members where each member is
418 //unique. It is possible to add the same member several times without the
419 //need to check if it already exists.
420 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
421 return s.SAdd(s.nsPrefix+group, member...) // the group name is namespace-prefixed, the members are passed as given
424 //RemoveMember removes the given members from a group.
425 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
426 return s.SRem(s.nsPrefix+group, member...)
429 //RemoveGroup removes the whole group along with its members.
430 func (s *SdlInstance) RemoveGroup(group string) error {
431 return s.Del([]string{s.nsPrefix + group}) // a group is deleted like an ordinary key
434 //GetMembers returns all the members of a group.
435 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
436 retVal, err := s.SMembers(s.nsPrefix + group)
438 return []string{}, err // an empty, non-nil slice accompanies the returned err
443 //IsMember returns true if the given member is found in a group.
444 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
445 retVal, err := s.SIsMember(s.nsPrefix+group, member)
452 //GroupSize returns the number of members in a group.
453 func (s *SdlInstance) GroupSize(group string) (int64, error) {
454 retVal, err := s.SCard(s.nsPrefix + group)
//iDatabase lists the backend operations that SdlInstance calls. NewSdlInstance
//accepts any implementation of it as its db parameter. The keys, group names
//and channel names passed to these methods by SdlInstance already carry the
//namespace prefix.
461 type iDatabase interface {
462 SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
463 UnsubscribeChannelDB(channels ...string)
464 MSet(pairs ...interface{}) error
465 MSetPub(ns, message string, pairs ...interface{}) error
466 MGet(keys []string) ([]interface{}, error)
468 Del(keys []string) error
469 DelPub(channel, message string, keys []string) error
470 Keys(key string) ([]string, error)
471 SetIE(key string, oldData, newData interface{}) (bool, error)
472 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
473 SetNX(key string, data interface{}) (bool, error)
474 SetNXPub(channel, message, key string, data interface{}) (bool, error)
475 DelIE(key string, data interface{}) (bool, error)
476 DelIEPub(channel, message, key string, data interface{}) (bool, error)
477 SAdd(key string, data ...interface{}) error
478 SRem(key string, data ...interface{}) error
479 SMembers(key string) ([]string, error)
480 SIsMember(key string, data interface{}) (bool, error)
481 SCard(key string) (int64, error)