Allow byte array/slice be given as value
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "errors"
22         "fmt"
23         "reflect"
24         "strings"
25
26         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
27 )
28
29 type iDatabase interface {
30         SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
31         UnsubscribeChannelDB(channels ...string)
32         MSet(pairs ...interface{}) error
33         MSetPub(ns, message string, pairs ...interface{}) error
34         MGet(keys []string) ([]interface{}, error)
35         CloseDB() error
36         Del(keys []string) error
37         DelPub(channel, message string, keys []string) error
38         Keys(key string) ([]string, error)
39         SetIE(key string, oldData, newData interface{}) (bool, error)
40         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
41         SetNX(key string, data interface{}) (bool, error)
42         SetNXPub(channel, message, key string, data interface{}) (bool, error)
43         DelIE(key string, data interface{}) (bool, error)
44         DelIEPub(channel, message, key string, data interface{}) (bool, error)
45 }
46
47 //SdlInstance provides an API to read, write and modify
48 //key-value pairs in a given namespace.
49 type SdlInstance struct {
50         nameSpace      string
51         nsPrefix       string
52         eventSeparator string
53         iDatabase
54 }
55
56 //NewDatabase creates a connection to database that will be used
57 //as a backend for the key-value storage. The returned value shall
58 //be given as a parameter when calling NewKeyValStorage
59 func NewDatabase() *sdlgoredis.DB {
60         return sdlgoredis.Create()
61 }
62
63 //NewSdlInstance creates a new sdl instance using the given namespace.
64 //The database used as a backend is given as a parameter
65 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
66         return &SdlInstance{
67                 nameSpace:      NameSpace,
68                 nsPrefix:       "{" + NameSpace + "},",
69                 eventSeparator: "___",
70                 iDatabase:      db,
71         }
72 }
73
74 //SubscribeChannel lets you to subscribe for a events on a given channels.
75 //SDL notifications are events that are published on a specific channels.
76 //Both the channel and events are defined by the entity that is publishing
77 //the events.
78 //
79 //When subscribing for a channel, a callback function is given as a parameter.
80 //Whenever a notification is received from a channel, this callback is called
81 //with channel and notifications as parameter (several notifications could be
82 //packed to a single callback function call). A call to SubscribeChannel function
83 //returns immediatelly, callbacks will be called asyncronously.
84 //
85 //It is possible to subscribe to different channels using different callbacks. In
86 //this case simply use SubscribeChannel function separately for each channel.
87 //
88 //When receiving events in callback routine, it is a good practive to return from
89 //callback as quickly as possible. E.g. reading in callback context should be avoided
90 //and using of Go signals is recommended. Also it should be noted that in case of several
91 //events received from different channels, callbacks are called in series one by one.
92 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
93         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
94         return nil
95 }
96
97 //UnsubscribeChannel removes subscription from one or several channels.
98 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
99         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
100         return nil
101 }
102
103 //Close connection to backend database.
104 func (s *SdlInstance) Close() error {
105         return s.CloseDB()
106 }
107
108 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
109         if len(channelsAndEvents)%2 != 0 {
110                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
111         }
112         for i, v := range channelsAndEvents {
113                 if i%2 != 0 {
114                         if strings.Contains(v, s.eventSeparator) {
115                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
116                         }
117                 }
118         }
119         return nil
120 }
121 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
122         var retVal []string
123         for _, v := range channels {
124                 retVal = append(retVal, s.nsPrefix+v)
125         }
126         return retVal
127 }
128
129 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
130         retVal := make([]interface{}, 0)
131         shouldBeKey := true
132         for _, v := range pairs {
133                 reflectType := reflect.TypeOf(v)
134                 switch reflectType.Kind() {
135                 case reflect.Slice:
136                         if shouldBeKey {
137                                 x := reflect.ValueOf(v)
138                                 if x.Len()%2 != 0 {
139                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
140                                 }
141                                 for i2 := 0; i2 < x.Len(); i2++ {
142                                         if i2%2 == 0 {
143                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
144                                         } else {
145                                                 retVal = append(retVal, x.Index(i2).Interface())
146                                         }
147                                 }
148                         } else {
149                                 if reflectType.Elem().Kind() == reflect.Uint8 {
150                                         retVal = append(retVal, v)
151                                         shouldBeKey = true
152                                 } else {
153                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
154                                 }
155                         }
156                 case reflect.Array:
157                         if shouldBeKey {
158                                 x := reflect.ValueOf(v)
159                                 if x.Len()%2 != 0 {
160                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
161                                 }
162                                 for i2 := 0; i2 < x.Len(); i2++ {
163                                         if i2%2 == 0 {
164                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
165                                         } else {
166                                                 retVal = append(retVal, x.Index(i2).Interface())
167                                         }
168                                 }
169                         } else {
170                                 if reflectType.Elem().Kind() == reflect.Uint8 {
171                                         retVal = append(retVal, v)
172                                         shouldBeKey = true
173                                 } else {
174                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
175                                 }
176                         }
177                 default:
178                         if shouldBeKey {
179                                 retVal = append(retVal, s.nsPrefix+v.(string))
180                                 shouldBeKey = false
181                         } else {
182                                 retVal = append(retVal, v)
183                                 shouldBeKey = true
184                         }
185                 }
186         }
187         if len(retVal)%2 != 0 {
188                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
189         }
190         return retVal, nil
191 }
192
193 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
194         channelEventMap := make(map[string]string)
195         for i, v := range channelsAndEvents {
196                 if i%2 != 0 {
197                         continue
198                 }
199                 _, exists := channelEventMap[v]
200                 if exists {
201                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
202                 } else {
203                         channelEventMap[v] = channelsAndEvents[i+1]
204                 }
205         }
206         retVal := make([]string, 0)
207         for k, v := range channelEventMap {
208                 retVal = append(retVal, s.nsPrefix+k)
209                 retVal = append(retVal, v)
210         }
211         return retVal
212 }
213
214 //SetAndPublish function writes data to shared data layer storage and send an event to
215 //a channel. Writing is done atomically, i.e. all succeeds or fails.
216 //Data to be written is given as key-value pairs. Several key-value
217 //pairs can be written with one call.
218 //The key is expected to be string whereas value can be anything, string,
219 //number, slice array or map
220 //
221 //Channels and events are given as pairs is channelsAndEvents parameter.
222 //Although it is possible to give sevral channel-event pairs, current implementation
223 //supports sending events to one channel only due to missing support in DB backend.
224 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
225         if len(pairs)%2 != 0 {
226                 return errors.New("Invalid pairs parameter")
227         }
228
229         keyAndData, err := s.setNamespaceToKeys(pairs...)
230         if err != nil {
231                 return err
232         }
233         if len(channelsAndEvents) == 0 {
234                 return s.MSet(keyAndData...)
235         }
236         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
237                 return err
238         }
239         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
240         return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
241 }
242
243 //Set function writes data to shared data layer storage. Writing is done
244 //atomically, i.e. all succeeds or fails.
245 //Data to be written is given as key-value pairs. Several key-value
246 //pairs can be written with one call.
247 //The key is expected to be string whereas value can be anything, string,
248 //number, slice array or map
249 func (s *SdlInstance) Set(pairs ...interface{}) error {
250         if len(pairs) == 0 {
251                 return nil
252         }
253
254         keyAndData, err := s.setNamespaceToKeys(pairs...)
255         if err != nil {
256                 return err
257         }
258         return s.MSet(keyAndData...)
259 }
260
261 //Get function atomically reads one or more keys from SDL. The returned map has the
262 //requested keys as index and data as value. If the requested key is not found
263 //from SDL, it's value is nil
264 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
265         m := make(map[string]interface{})
266         if len(keys) == 0 {
267                 return m, nil
268         }
269
270         var keysWithNs []string
271         for _, v := range keys {
272                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
273         }
274         val, err := s.MGet(keysWithNs)
275         if err != nil {
276                 return m, err
277         }
278         for i, v := range val {
279                 m[keys[i]] = v
280         }
281         return m, err
282 }
283
284 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
285 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
286 //is published to a given channel.
287 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
288         if len(channelsAndEvents) == 0 {
289                 return s.SetIE(s.nsPrefix+key, oldData, newData)
290         }
291         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
292                 return false, err
293         }
294         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
295         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
296 }
297
298 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
299 //If replace was done successfully, true will be returned.
300 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
301         return s.SetIE(s.nsPrefix+key, oldData, newData)
302 }
303
304 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
305 //then it's value is not changed. Checking the key existence and potential set operation
306 //is done atomically. If the set operation was done successfully, an event is published to a
307 //given channel.
308 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
309         if len(channelsAndEvents) == 0 {
310                 return s.SetNX(s.nsPrefix+key, data)
311         }
312         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
313                 return false, err
314         }
315         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
316         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
317 }
318
319 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
320 //then it's value is not changed. Checking the key existence and potential set operation
321 //is done atomically.
322 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
323         return s.SetNX(s.nsPrefix+key, data)
324 }
325
326 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
327 //An event is published into a given channel if remove operation is successfull.
328 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
329         if len(keys) == 0 {
330                 return nil
331         }
332
333         var keysWithNs []string
334         for _, v := range keys {
335                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
336         }
337         if len(channelsAndEvents) == 0 {
338                 return s.Del(keysWithNs)
339         }
340         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
341                 return err
342         }
343         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
344         return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
345 }
346
347 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
348 func (s *SdlInstance) Remove(keys []string) error {
349         if len(keys) == 0 {
350                 return nil
351         }
352
353         var keysWithNs []string
354         for _, v := range keys {
355                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
356         }
357         err := s.Del(keysWithNs)
358         return err
359 }
360
361 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
362 //a given event is published to channel. If existing data matches given data,
363 //key and data are removed from SDL. If remove was done successfully, true is returned.
364 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
365         if len(channelsAndEvents) == 0 {
366                 return s.DelIE(s.nsPrefix+key, data)
367         }
368         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
369                 return false, err
370         }
371         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
372         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
373 }
374
375 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
376 //key and data are removed from SDL. If remove was done successfully, true is returned.
377 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
378         status, err := s.DelIE(s.nsPrefix+key, data)
379         if err != nil {
380                 return false, err
381         }
382         return status, nil
383 }
384
385 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
386 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
387 func (s *SdlInstance) GetAll() ([]string, error) {
388         keys, err := s.Keys(s.nsPrefix + "*")
389         var retVal []string
390         if err != nil {
391                 return retVal, err
392         }
393         for _, v := range keys {
394                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
395         }
396         return retVal, err
397 }
398
399 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
400 //it is not guaranteed that all keys are removed.
401 func (s *SdlInstance) RemoveAll() error {
402         keys, err := s.Keys(s.nsPrefix + "*")
403         if err != nil {
404                 return err
405         }
406         if (keys != nil) && (len(keys) != 0) {
407                 err = s.Del(keys)
408         }
409         return err
410 }
411
412 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
413 //will publish an event to given channel. This operation is not atomic, thus it is
414 //not guaranteed that all keys are removed.
415 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
416         keys, err := s.Keys(s.nsPrefix + "*")
417         if err != nil {
418                 return err
419         }
420         if (keys != nil) && (len(keys) != 0) {
421                 if len(channelsAndEvents) == 0 {
422                         return s.Del(keys)
423                 }
424                 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
425                         return err
426                 }
427                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
428                 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
429         }
430         return err
431
432 }