2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
26 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
29 type iDatabase interface {
30 SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
31 UnsubscribeChannelDB(channels ...string)
32 MSet(pairs ...interface{}) error
33 MSetPub(ns, message string, pairs ...interface{}) error
34 MGet(keys []string) ([]interface{}, error)
36 Del(keys []string) error
37 DelPub(channel, message string, keys []string) error
38 Keys(key string) ([]string, error)
39 SetIE(key string, oldData, newData interface{}) (bool, error)
40 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
41 SetNX(key string, data interface{}) (bool, error)
42 SetNXPub(channel, message, key string, data interface{}) (bool, error)
43 DelIE(key string, data interface{}) (bool, error)
44 DelIEPub(channel, message, key string, data interface{}) (bool, error)
47 //SdlInstance provides an API to read, write and modify
48 //key-value pairs in a given namespace.
49 type SdlInstance struct {
56 //NewDatabase creates a connection to database that will be used
57 //as a backend for the key-value storage. The returned value shall
58 //be given as a parameter when calling NewKeyValStorage
59 func NewDatabase() *sdlgoredis.DB {
60 return sdlgoredis.Create()
63 //NewSdlInstance creates a new sdl instance using the given namespace.
64 //The database used as a backend is given as a parameter
65 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
68 nsPrefix: "{" + NameSpace + "},",
69 eventSeparator: "___",
74 //SubscribeChannel lets you to subscribe for a events on a given channels.
75 //SDL notifications are events that are published on a specific channels.
76 //Both the channel and events are defined by the entity that is publishing
79 //When subscribing for a channel, a callback function is given as a parameter.
80 //Whenever a notification is received from a channel, this callback is called
81 //with channel and notifications as parameter (several notifications could be
82 //packed to a single callback function call). A call to SubscribeChannel function
83 //returns immediatelly, callbacks will be called asyncronously.
85 //It is possible to subscribe to different channels using different callbacks. In
86 //this case simply use SubscribeChannel function separately for each channel.
88 //When receiving events in callback routine, it is a good practive to return from
89 //callback as quickly as possible. E.g. reading in callback context should be avoided
90 //and using of Go signals is recommended. Also it should be noted that in case of several
91 //events received from different channels, callbacks are called in series one by one.
92 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
93 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
97 //UnsubscribeChannel removes subscription from one or several channels.
98 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
99 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
103 //Close connection to backend database.
104 func (s *SdlInstance) Close() error {
108 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
109 if len(channelsAndEvents)%2 != 0 {
110 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
112 for i, v := range channelsAndEvents {
114 if strings.Contains(v, s.eventSeparator) {
115 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
121 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
123 for _, v := range channels {
124 retVal = append(retVal, s.nsPrefix+v)
129 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
130 retVal := make([]interface{}, 0)
132 for _, v := range pairs {
133 reflectType := reflect.TypeOf(v)
134 switch reflectType.Kind() {
137 x := reflect.ValueOf(v)
139 return []interface{}{}, errors.New("Key/value pairs doesn't match")
141 for i2 := 0; i2 < x.Len(); i2++ {
143 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
145 retVal = append(retVal, x.Index(i2).Interface())
149 if reflectType.Elem().Kind() == reflect.Uint8 {
150 retVal = append(retVal, v)
153 return []interface{}{}, errors.New("Key/value pairs doesn't match")
158 x := reflect.ValueOf(v)
160 return []interface{}{}, errors.New("Key/value pairs doesn't match")
162 for i2 := 0; i2 < x.Len(); i2++ {
164 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
166 retVal = append(retVal, x.Index(i2).Interface())
170 if reflectType.Elem().Kind() == reflect.Uint8 {
171 retVal = append(retVal, v)
174 return []interface{}{}, errors.New("Key/value pairs doesn't match")
179 retVal = append(retVal, s.nsPrefix+v.(string))
182 retVal = append(retVal, v)
187 if len(retVal)%2 != 0 {
188 return []interface{}{}, errors.New("Key/value pairs doesn't match")
193 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
194 channelEventMap := make(map[string]string)
195 for i, v := range channelsAndEvents {
199 _, exists := channelEventMap[v]
201 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
203 channelEventMap[v] = channelsAndEvents[i+1]
206 retVal := make([]string, 0)
207 for k, v := range channelEventMap {
208 retVal = append(retVal, s.nsPrefix+k)
209 retVal = append(retVal, v)
214 //SetAndPublish function writes data to shared data layer storage and send an event to
215 //a channel. Writing is done atomically, i.e. all succeeds or fails.
216 //Data to be written is given as key-value pairs. Several key-value
217 //pairs can be written with one call.
218 //The key is expected to be string whereas value can be anything, string,
219 //number, slice array or map
221 //Channels and events are given as pairs is channelsAndEvents parameter.
222 //Although it is possible to give sevral channel-event pairs, current implementation
223 //supports sending events to one channel only due to missing support in DB backend.
224 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
225 if len(pairs)%2 != 0 {
226 return errors.New("Invalid pairs parameter")
229 keyAndData, err := s.setNamespaceToKeys(pairs...)
233 if len(channelsAndEvents) == 0 {
234 return s.MSet(keyAndData...)
236 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
239 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
240 return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
243 //Set function writes data to shared data layer storage. Writing is done
244 //atomically, i.e. all succeeds or fails.
245 //Data to be written is given as key-value pairs. Several key-value
246 //pairs can be written with one call.
247 //The key is expected to be string whereas value can be anything, string,
248 //number, slice array or map
249 func (s *SdlInstance) Set(pairs ...interface{}) error {
254 keyAndData, err := s.setNamespaceToKeys(pairs...)
258 return s.MSet(keyAndData...)
261 //Get function atomically reads one or more keys from SDL. The returned map has the
262 //requested keys as index and data as value. If the requested key is not found
263 //from SDL, it's value is nil
264 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
265 m := make(map[string]interface{})
270 var keysWithNs []string
271 for _, v := range keys {
272 keysWithNs = append(keysWithNs, s.nsPrefix+v)
274 val, err := s.MGet(keysWithNs)
278 for i, v := range val {
284 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
285 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
286 //is published to a given channel.
287 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
288 if len(channelsAndEvents) == 0 {
289 return s.SetIE(s.nsPrefix+key, oldData, newData)
291 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
294 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
295 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
298 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
299 //If replace was done successfully, true will be returned.
300 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
301 return s.SetIE(s.nsPrefix+key, oldData, newData)
304 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
305 //then it's value is not changed. Checking the key existence and potential set operation
306 //is done atomically. If the set operation was done successfully, an event is published to a
308 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
309 if len(channelsAndEvents) == 0 {
310 return s.SetNX(s.nsPrefix+key, data)
312 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
315 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
316 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
319 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
320 //then it's value is not changed. Checking the key existence and potential set operation
321 //is done atomically.
322 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
323 return s.SetNX(s.nsPrefix+key, data)
326 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
327 //An event is published into a given channel if remove operation is successfull.
328 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
333 var keysWithNs []string
334 for _, v := range keys {
335 keysWithNs = append(keysWithNs, s.nsPrefix+v)
337 if len(channelsAndEvents) == 0 {
338 return s.Del(keysWithNs)
340 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
343 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
344 return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
347 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
348 func (s *SdlInstance) Remove(keys []string) error {
353 var keysWithNs []string
354 for _, v := range keys {
355 keysWithNs = append(keysWithNs, s.nsPrefix+v)
357 err := s.Del(keysWithNs)
361 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
362 //a given event is published to channel. If existing data matches given data,
363 //key and data are removed from SDL. If remove was done successfully, true is returned.
364 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
365 if len(channelsAndEvents) == 0 {
366 return s.DelIE(s.nsPrefix+key, data)
368 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
371 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
372 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
375 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
376 //key and data are removed from SDL. If remove was done successfully, true is returned.
377 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
378 status, err := s.DelIE(s.nsPrefix+key, data)
385 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
386 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
387 func (s *SdlInstance) GetAll() ([]string, error) {
388 keys, err := s.Keys(s.nsPrefix + "*")
393 for _, v := range keys {
394 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
399 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
400 //it is not guaranteed that all keys are removed.
401 func (s *SdlInstance) RemoveAll() error {
402 keys, err := s.Keys(s.nsPrefix + "*")
406 if (keys != nil) && (len(keys) != 0) {
412 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
413 //will publish an event to given channel. This operation is not atomic, thus it is
414 //not guaranteed that all keys are removed.
415 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
416 keys, err := s.Keys(s.nsPrefix + "*")
420 if (keys != nil) && (len(keys) != 0) {
421 if len(channelsAndEvents) == 0 {
424 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
427 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
428 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)