2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
26 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
29 //SdlInstance provides an API to read, write and modify
30 //key-value pairs in a given namespace.
//Its methods prepend the instance's nsPrefix to key, group and channel
//names before passing them to the backend database.
31 type SdlInstance struct {
38 //NewDatabase creates a connection to database that will be used
39 //as a backend for the key-value storage. The returned value shall
40 //be given as the db parameter when calling NewSdlInstance.
//NOTE(review): the constructor name above assumes NewSdlInstance is the
//intended consumer of the returned value — confirm.
//The connection is whatever sdlgoredis.Create() returns.
41 func NewDatabase() *sdlgoredis.DB {
42 return sdlgoredis.Create()
45 //NewSdlInstance creates a new sdl instance using the given namespace.
46 //The database used as a backend is given as a parameter.
//The namespace prefix is built as "{" + NameSpace + "}," and the event
//separator is set to "___".
47 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
50 nsPrefix: "{" + NameSpace + "},",
51 eventSeparator: "___",
56 //SubscribeChannel lets you subscribe to events on the given channels.
57 //SDL notifications are events that are published on specific channels.
58 //Both the channel and events are defined by the entity that is publishing
61 //When subscribing for a channel, a callback function is given as a parameter.
62 //Whenever a notification is received from a channel, this callback is called
63 //with channel and notifications as parameter (several notifications could be
64 //packed to a single callback function call). A call to SubscribeChannel function
65 //returns immediately, callbacks will be called asynchronously.
67 //It is possible to subscribe to different channels using different callbacks. In
68 //this case simply use SubscribeChannel function separately for each channel.
70 //When receiving events in callback routine, it is a good practice to return from
71 //callback as quickly as possible. E.g. reading in callback context should be avoided
72 //and using of Go signals is recommended. Also it should be noted that in case of several
73 //events received from different channels, callbacks are called in series one by one.
74 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
//The channel names are prefixed with nsPrefix before subscribing; nsPrefix and
//eventSeparator are also handed to the backend.
//NOTE(review): SubscribeChannelDB is declared without a return value in iDatabase,
//so this call cannot report a backend failure through the error result.
75 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
79 //UnsubscribeChannel removes the subscription from one or several channels.
//The channel names are prefixed with nsPrefix before being passed to the backend.
80 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
//NOTE(review): UnsubscribeChannelDB is declared without a return value in
//iDatabase, so this call cannot report a backend failure.
81 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
85 //Close closes the connection to the backend database.
86 func (s *SdlInstance) Close() error {
//checkChannelsAndEvents validates a channel/event list on behalf of the API
//call named by cmd; cmd is only used as the prefix of the error messages.
//An error is returned when the list length is odd or when a checked entry
//contains eventSeparator.
90 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
//Channels and events come as pairs, so the length must be even.
91 if len(channelsAndEvents)%2 != 0 {
92 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
94 for i, v := range channelsAndEvents {
//eventSeparator is used to join several events for one channel, so it must
//not occur inside an event itself.
96 if strings.Contains(v, s.eventSeparator) {
97 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
//setNamespaceToChannels returns the given channel names, each prefixed with nsPrefix.
103 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
105 for _, v := range channels {
106 retVal = append(retVal, s.nsPrefix+v)
//setNamespaceToKeys flattens the given arguments into one key/value list and
//adds nsPrefix to the keys. Each argument is handled according to its reflect kind.
//On a pairing mismatch it returns an empty list and an error; the final check
//rejects a flattened list of odd length.
111 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
112 retVal := make([]interface{}, 0)
114 for _, v := range pairs {
//NOTE(review): reflect.TypeOf returns nil for a nil interface value, and calling
//Kind on that result would panic — confirm callers never pass a bare nil here.
115 reflectType := reflect.TypeOf(v)
116 switch reflectType.Kind() {
//Map argument: each entry contributes its prefixed key followed by its value.
//The key is type-asserted to string, so a non-string map key panics.
118 x := reflect.ValueOf(v).MapRange()
120 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
121 retVal = append(retVal, x.Value().Interface())
//Indexable argument: elements are read by position; some are appended with
//nsPrefix (via a string assertion) and the others unchanged — presumably
//alternating key/value positions, TODO confirm.
125 x := reflect.ValueOf(v)
127 return []interface{}{}, errors.New("Key/value pairs doesn't match")
129 for i2 := 0; i2 < x.Len(); i2++ {
131 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
133 retVal = append(retVal, x.Index(i2).Interface())
//An argument whose element kind is Uint8 (byte data) is appended whole, as one value.
137 if reflectType.Elem().Kind() == reflect.Uint8 {
138 retVal = append(retVal, v)
141 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//The statements below repeat the indexable handling above for a further reflect kind.
146 x := reflect.ValueOf(v)
148 return []interface{}{}, errors.New("Key/value pairs doesn't match")
150 for i2 := 0; i2 < x.Len(); i2++ {
152 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
154 retVal = append(retVal, x.Index(i2).Interface())
158 if reflectType.Elem().Kind() == reflect.Uint8 {
159 retVal = append(retVal, v)
162 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//A string argument used as a key gets the namespace prefix.
167 retVal = append(retVal, s.nsPrefix+v.(string))
//Any other argument is appended unchanged.
170 retVal = append(retVal, v)
//Keys and values must come in pairs, so an odd number of items is an error.
175 if len(retVal)%2 != 0 {
176 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//prepareChannelsAndEvents groups the events of a channel/event list per channel.
//Events that belong to the same channel are joined with eventSeparator. The
//result is a flat list of nsPrefix+channel followed by that channel's joined events.
//The order of the channels in the result follows Go map iteration, which is
//not defined, so it may differ between calls.
181 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
182 channelEventMap := make(map[string]string)
183 for i, v := range channelsAndEvents {
187 _, exists := channelEventMap[v]
//Channel seen before: append the new event after the separator.
189 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
//First event for this channel.
191 channelEventMap[v] = channelsAndEvents[i+1]
194 retVal := make([]string, 0)
195 for k, v := range channelEventMap {
196 retVal = append(retVal, s.nsPrefix+k)
197 retVal = append(retVal, v)
202 //SetAndPublish function writes data to shared data layer storage and sends an event to
203 //a channel. Writing is done atomically, i.e. all succeeds or fails.
204 //Data to be written is given as key-value pairs. Several key-value
205 //pairs can be written with one call.
206 //The key is expected to be a string whereas the value can be anything: string,
207 //number, slice, array or map.
209 //If data was set successfully, an event is sent to a channel.
210 //Channels and events are given as pairs in the channelsAndEvents parameter.
211 //It is possible to send several events to several channels by giving several
212 //channel-event pairs.
213 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
214 //will send event1 and event3 to channel1 and event2 to channel2.
215 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
216 keyAndData, err := s.setNamespaceToKeys(pairs...)
//Without channels and events this is a plain write with no publishing.
220 if len(channelsAndEvents) == 0 {
221 return s.MSet(keyAndData...)
223 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
226 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
227 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
230 //Set function writes data to shared data layer storage. Writing is done
231 //atomically, i.e. all succeeds or fails.
232 //Data to be written is given as key-value pairs. Several key-value
233 //pairs can be written with one call.
234 //The key is expected to be a string whereas the value can be anything: string,
235 //number, slice, array or map.
236 func (s *SdlInstance) Set(pairs ...interface{}) error {
//Add the namespace prefix to the keys before writing.
241 keyAndData, err := s.setNamespaceToKeys(pairs...)
245 return s.MSet(keyAndData...)
248 //Get function atomically reads one or more keys from SDL. The returned map has the
249 //requested keys as index and data as value. If the requested key is not found
250 //from SDL, its value is nil.
251 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
252 m := make(map[string]interface{})
//Add the namespace prefix to every requested key before the backend read.
257 var keysWithNs []string
258 for _, v := range keys {
259 keysWithNs = append(keysWithNs, s.nsPrefix+v)
261 val, err := s.MGet(keysWithNs)
265 for i, v := range val {
271 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
272 //If replace was done successfully, true will be returned. Also, if publishing was successful, an event
273 //is published to a given channel.
274 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
//Without channels and events this is a plain conditional write.
275 if len(channelsAndEvents) == 0 {
276 return s.SetIE(s.nsPrefix+key, oldData, newData)
278 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
281 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
//NOTE(review): only the first channel and its events from the prepared list are
//passed on. If several distinct channels are given, the others are not passed to
//SetIEPub, and which one comes first depends on map iteration order — confirm
//whether multi-channel publishing is meant to be supported here.
282 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
285 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
286 //If replace was done successfully, true will be returned.
//The key is prefixed with nsPrefix before the backend call.
287 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
288 return s.SetIE(s.nsPrefix+key, oldData, newData)
291 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
292 //then its value is not changed. Checking the key existence and potential set operation
293 //is done atomically. If the set operation was done successfully, an event is published to a
295 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
//Without channels and events this is a plain set-if-not-exists.
296 if len(channelsAndEvents) == 0 {
297 return s.SetNX(s.nsPrefix+key, data)
299 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
302 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
//NOTE(review): only the first channel and its events from the prepared list are
//passed on; with several distinct channels the others are not passed to SetNXPub,
//and which one comes first depends on map iteration order — confirm intent.
303 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
306 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
307 //then its value is not changed. Checking the key existence and potential set operation
308 //is done atomically.
//The key is prefixed with nsPrefix before the backend call.
309 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
310 return s.SetNX(s.nsPrefix+key, data)
313 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
314 //Trying to remove a nonexisting key is not considered as an error.
315 //An event is published into a given channel if remove operation is successful and
316 //at least one key is removed (if several keys given). If the given key(s) don't exist
317 //when trying to remove, no event is published.
318 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
//Add the namespace prefix to every key to be removed.
323 var keysWithNs []string
324 for _, v := range keys {
325 keysWithNs = append(keysWithNs, s.nsPrefix+v)
//Without channels and events this is a plain delete with no publishing.
327 if len(channelsAndEvents) == 0 {
328 return s.Del(keysWithNs)
330 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
333 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
334 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
337 //Remove removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 func (s *SdlInstance) Remove(keys []string) error {
//Add the namespace prefix to every key to be removed.
343 var keysWithNs []string
344 for _, v := range keys {
345 keysWithNs = append(keysWithNs, s.nsPrefix+v)
347 err := s.Del(keysWithNs)
351 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
352 //a given event is published to channel. If existing data matches given data,
353 //key and data are removed from SDL. If remove was done successfully, true is returned.
354 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
//Without channels and events this is a plain conditional delete.
355 if len(channelsAndEvents) == 0 {
356 return s.DelIE(s.nsPrefix+key, data)
358 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
361 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
//NOTE(review): only the first channel and its events from the prepared list are
//passed on; with several distinct channels the others are not passed to DelIEPub,
//and which one comes first depends on map iteration order — confirm intent.
362 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
365 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
366 //key and data are removed from SDL. If remove was done successfully, true is returned.
//The key is prefixed with nsPrefix before the backend call.
367 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
368 status, err := s.DelIE(s.nsPrefix+key, data)
375 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
376 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
//The keys are looked up with the pattern nsPrefix+"*" and returned without the prefix.
377 func (s *SdlInstance) GetAll() ([]string, error) {
378 keys, err := s.Keys(s.nsPrefix + "*")
383 for _, v := range keys {
//NOTE(review): Split(...)[1] keeps only the text between the first and second
//occurrence of nsPrefix, so a key that itself contains nsPrefix is truncated, and
//a returned key without the prefix would make the index panic. strings.TrimPrefix
//would avoid both — confirm and consider replacing.
384 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
389 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
390 //it is not guaranteed that all keys are removed.
//The keys are looked up with the pattern nsPrefix+"*".
391 func (s *SdlInstance) RemoveAll() error {
392 keys, err := s.Keys(s.nsPrefix + "*")
//Nothing further is done when no key was found. The nil test is redundant,
//since len of a nil slice is 0.
396 if (keys != nil) && (len(keys) != 0) {
402 //RemoveAllAndPublish removes all keys under the namespace and if successful, it
403 //will publish an event to given channel. This operation is not atomic, thus it is
404 //not guaranteed that all keys are removed.
405 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
406 keys, err := s.Keys(s.nsPrefix + "*")
//Nothing further is done when no key was found. The nil test is redundant,
//since len of a nil slice is 0.
410 if (keys != nil) && (len(keys) != 0) {
411 if len(channelsAndEvents) == 0 {
//NOTE(review): the command name passed here is "RemoveIfAndPublish", so validation
//errors raised from RemoveAllAndPublish name the wrong API call — looks like a
//copy-paste slip; confirm and change to "RemoveAllAndPublish".
414 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
417 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
//The keys come from the Keys call above and are passed on as they are, without
//adding nsPrefix again.
418 err = s.DelMPub(channelsAndEventsPrepared, keys)
423 //AddMember adds new members to a group.
425 //SDL groups are unordered collections of members where each member is
426 //unique. It is possible to add the same member several times without the
427 //need to check if it already exists.
//The group name is prefixed with nsPrefix before the backend call.
428 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
429 return s.SAdd(s.nsPrefix+group, member...)
432 //RemoveMember removes members from a group.
//The group name is prefixed with nsPrefix before the backend call.
433 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
434 return s.SRem(s.nsPrefix+group, member...)
437 //RemoveGroup removes the whole group along with its members.
//The group is deleted as a single key, nsPrefix+group.
438 func (s *SdlInstance) RemoveGroup(group string) error {
439 return s.Del([]string{s.nsPrefix + group})
442 //GetMembers returns all the members from a group.
//The group name is prefixed with nsPrefix before the backend call. An empty,
//non-nil slice is returned together with the error on the error path.
443 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
444 retVal, err := s.SMembers(s.nsPrefix + group)
446 return []string{}, err
451 //IsMember returns true if given member is found from a group.
//The group name is prefixed with nsPrefix before the backend call.
452 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
453 retVal, err := s.SIsMember(s.nsPrefix+group, member)
460 //GroupSize returns the number of members in a group.
//The group name is prefixed with nsPrefix before the backend call.
461 func (s *SdlInstance) GroupSize(group string) (int64, error) {
462 retVal, err := s.SCard(s.nsPrefix + group)
//iDatabase lists the backend database operations that SdlInstance relies on.
//NewSdlInstance accepts any implementation of it.
469 type iDatabase interface {
//Channel subscription; neither call returns an error.
470 SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
471 UnsubscribeChannelDB(channels ...string)
//Key/value operations; the *Pub variants additionally take channels and events.
472 MSet(pairs ...interface{}) error
473 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
474 MGet(keys []string) ([]interface{}, error)
476 Del(keys []string) error
477 DelMPub(channelsAndEvents []string, keys []string) error
478 Keys(key string) ([]string, error)
//Conditional operations; the bool result reports whether the change was made
//(see the SetIf, SetIfNotExists and RemoveIf wrappers).
479 SetIE(key string, oldData, newData interface{}) (bool, error)
480 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
481 SetNX(key string, data interface{}) (bool, error)
482 SetNXPub(channel, message, key string, data interface{}) (bool, error)
483 DelIE(key string, data interface{}) (bool, error)
484 DelIEPub(channel, message, key string, data interface{}) (bool, error)
//Group (set) operations used by AddMember, RemoveMember, GetMembers, IsMember and GroupSize.
485 SAdd(key string, data ...interface{}) error
486 SRem(key string, data ...interface{}) error
487 SMembers(key string) ([]string, error)
488 SIsMember(key string, data interface{}) (bool, error)
489 SCard(key string) (int64, error)