2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
26 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
29 type iDatabase interface {
30 SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
31 UnsubscribeChannelDB(channels ...string)
32 MSet(pairs ...interface{}) error
33 MSetPub(ns, message string, pairs ...interface{}) error
34 MGet(keys []string) ([]interface{}, error)
36 Del(keys []string) error
37 DelPub(channel, message string, keys []string) error
38 Keys(key string) ([]string, error)
39 SetIE(key string, oldData, newData interface{}) (bool, error)
40 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
41 SetNX(key string, data interface{}) (bool, error)
42 SetNXPub(channel, message, key string, data interface{}) (bool, error)
43 DelIE(key string, data interface{}) (bool, error)
44 DelIEPub(channel, message, key string, data interface{}) (bool, error)
47 //SdlInstance provides an API to read, write and modify
48 //key-value pairs in a given namespace.
49 type SdlInstance struct {
56 //NewDatabase creates a connection to database that will be used
57 //as a backend for the key-value storage. The returned value shall
58 //be given as a parameter when calling NewKeyValStorage
59 func NewDatabase() *sdlgoredis.DB {
60 return sdlgoredis.Create()
63 //NewSdlInstance creates a new sdl instance using the given namespace.
64 //The database used as a backend is given as a parameter
65 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
68 nsPrefix: "{" + NameSpace + "},",
69 eventSeparator: "___",
74 //SubscribeChannel lets you to subscribe for a events on a given channels.
75 //SDL notifications are events that are published on a specific channels.
76 //Both the channel and events are defined by the entity that is publishing
79 //When subscribing for a channel, a callback function is given as a parameter.
80 //Whenever a notification is received from a channel, this callback is called
81 //with channel and notifications as parameter (several notifications could be
82 //packed to a single callback function call). A call to SubscribeChannel function
83 //returns immediatelly, callbacks will be called asyncronously.
85 //It is possible to subscribe to different channels using different callbacks. In
86 //this case simply use SubscribeChannel function separately for each channel.
88 //When receiving events in callback routine, it is a good practive to return from
89 //callback as quickly as possible. E.g. reading in callback context should be avoided
90 //and using of Go signals is recommended. Also it should be noted that in case of several
91 //events received from different channels, callbacks are called in series one by one.
92 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
93 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
97 //UnsubscribeChannel removes subscription from one or several channels.
98 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
99 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
103 //Close connection to backend database.
104 func (s *SdlInstance) Close() error {
108 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
109 if len(channelsAndEvents)%2 != 0 {
110 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
112 for i, v := range channelsAndEvents {
114 if strings.Contains(v, s.eventSeparator) {
115 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
121 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
123 for _, v := range channels {
124 retVal = append(retVal, s.nsPrefix+v)
129 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
130 retVal := make([]interface{}, 0)
132 for _, v := range pairs {
133 reflectType := reflect.TypeOf(v)
134 switch reflectType.Kind() {
137 x := reflect.ValueOf(v)
139 return []interface{}{}, errors.New("Key/value pairs doesn't match")
141 for i2 := 0; i2 < x.Len(); i2++ {
143 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
145 retVal = append(retVal, x.Index(i2).Interface())
149 return []interface{}{}, errors.New("Key/value pairs doesn't match")
153 x := reflect.ValueOf(v)
155 return []interface{}{}, errors.New("Key/value pairs doesn't match")
157 for i2 := 0; i2 < x.Len(); i2++ {
159 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
161 retVal = append(retVal, x.Index(i2).Interface())
165 return []interface{}{}, errors.New("Key/value pairs doesn't match")
169 retVal = append(retVal, s.nsPrefix+v.(string))
172 retVal = append(retVal, v)
177 if len(retVal)%2 != 0 {
178 return []interface{}{}, errors.New("Key/value pairs doesn't match")
183 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
184 channelEventMap := make(map[string]string)
185 for i, v := range channelsAndEvents {
189 _, exists := channelEventMap[v]
191 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
193 channelEventMap[v] = channelsAndEvents[i+1]
196 retVal := make([]string, 0)
197 for k, v := range channelEventMap {
198 retVal = append(retVal, s.nsPrefix+k)
199 retVal = append(retVal, v)
204 //SetAndPublish function writes data to shared data layer storage and send an event to
205 //a channel. Writing is done atomically, i.e. all succeeds or fails.
206 //Data to be written is given as key-value pairs. Several key-value
207 //pairs can be written with one call.
208 //The key is expected to be string whereas value can be anything, string,
209 //number, slice array or map
211 //Channels and events are given as pairs is channelsAndEvents parameter.
212 //Although it is possible to give sevral channel-event pairs, current implementation
213 //supports sending events to one channel only due to missing support in DB backend.
214 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
215 if len(pairs)%2 != 0 {
216 return errors.New("Invalid pairs parameter")
219 keyAndData, err := s.setNamespaceToKeys(pairs...)
223 if len(channelsAndEvents) == 0 {
224 return s.MSet(keyAndData...)
226 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
229 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
230 return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
233 //Set function writes data to shared data layer storage. Writing is done
234 //atomically, i.e. all succeeds or fails.
235 //Data to be written is given as key-value pairs. Several key-value
236 //pairs can be written with one call.
237 //The key is expected to be string whereas value can be anything, string,
238 //number, slice array or map
239 func (s *SdlInstance) Set(pairs ...interface{}) error {
244 keyAndData, err := s.setNamespaceToKeys(pairs...)
248 return s.MSet(keyAndData...)
251 //Get function atomically reads one or more keys from SDL. The returned map has the
252 //requested keys as index and data as value. If the requested key is not found
253 //from SDL, it's value is nil
254 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
255 m := make(map[string]interface{})
260 var keysWithNs []string
261 for _, v := range keys {
262 keysWithNs = append(keysWithNs, s.nsPrefix+v)
264 val, err := s.MGet(keysWithNs)
268 for i, v := range val {
274 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
275 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
276 //is published to a given channel.
277 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
278 if len(channelsAndEvents) == 0 {
279 return s.SetIE(s.nsPrefix+key, oldData, newData)
281 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
284 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
285 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
288 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
289 //If replace was done successfully, true will be returned.
290 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
291 return s.SetIE(s.nsPrefix+key, oldData, newData)
294 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
295 //then it's value is not changed. Checking the key existence and potential set operation
296 //is done atomically. If the set operation was done successfully, an event is published to a
298 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
299 if len(channelsAndEvents) == 0 {
300 return s.SetNX(s.nsPrefix+key, data)
302 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
305 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
306 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
309 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
310 //then it's value is not changed. Checking the key existence and potential set operation
311 //is done atomically.
312 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
313 return s.SetNX(s.nsPrefix+key, data)
316 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
317 //An event is published into a given channel if remove operation is successfull.
318 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
323 var keysWithNs []string
324 for _, v := range keys {
325 keysWithNs = append(keysWithNs, s.nsPrefix+v)
327 if len(channelsAndEvents) == 0 {
328 return s.Del(keysWithNs)
330 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
333 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
334 return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
337 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 func (s *SdlInstance) Remove(keys []string) error {
343 var keysWithNs []string
344 for _, v := range keys {
345 keysWithNs = append(keysWithNs, s.nsPrefix+v)
347 err := s.Del(keysWithNs)
351 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
352 //a given event is published to channel. If existing data matches given data,
353 //key and data are removed from SDL. If remove was done successfully, true is returned.
354 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
355 if len(channelsAndEvents) == 0 {
356 return s.DelIE(s.nsPrefix+key, data)
358 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
361 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
362 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
365 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
366 //key and data are removed from SDL. If remove was done successfully, true is returned.
367 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
368 status, err := s.DelIE(s.nsPrefix+key, data)
375 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
376 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
377 func (s *SdlInstance) GetAll() ([]string, error) {
378 keys, err := s.Keys(s.nsPrefix + "*")
383 for _, v := range keys {
384 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
389 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
390 //it is not guaranteed that all keys are removed.
391 func (s *SdlInstance) RemoveAll() error {
392 keys, err := s.Keys(s.nsPrefix + "*")
396 if (keys != nil) && (len(keys) != 0) {
402 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
403 keys, err := s.Keys(s.nsPrefix + "*")
407 if (keys != nil) && (len(keys) != 0) {
408 if len(channelsAndEvents) == 0 {
411 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
414 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
415 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)