Add support for notifications
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "errors"
22         "fmt"
23         "reflect"
24         "strings"
25
26         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
27 )
28
29 type iDatabase interface {
30         SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
31         UnsubscribeChannelDB(channels ...string)
32         MSet(pairs ...interface{}) error
33         MSetPub(ns, message string, pairs ...interface{}) error
34         MGet(keys []string) ([]interface{}, error)
35         CloseDB() error
36         Del(keys []string) error
37         DelPub(channel, message string, keys []string) error
38         Keys(key string) ([]string, error)
39         SetIE(key string, oldData, newData interface{}) (bool, error)
40         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
41         SetNX(key string, data interface{}) (bool, error)
42         SetNXPub(channel, message, key string, data interface{}) (bool, error)
43         DelIE(key string, data interface{}) (bool, error)
44         DelIEPub(channel, message, key string, data interface{}) (bool, error)
45 }
46
47 //SdlInstance provides an API to read, write and modify
48 //key-value pairs in a given namespace.
49 type SdlInstance struct {
50         nameSpace      string
51         nsPrefix       string
52         eventSeparator string
53         iDatabase
54 }
55
56 //NewDatabase creates a connection to database that will be used
57 //as a backend for the key-value storage. The returned value shall
58 //be given as a parameter when calling NewKeyValStorage
59 func NewDatabase() *sdlgoredis.DB {
60         return sdlgoredis.Create()
61 }
62
63 //NewSdlInstance creates a new sdl instance using the given namespace.
64 //The database used as a backend is given as a parameter
65 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
66         return &SdlInstance{
67                 nameSpace:      NameSpace,
68                 nsPrefix:       "{" + NameSpace + "},",
69                 eventSeparator: "___",
70                 iDatabase:      db,
71         }
72 }
73
74 //SubscribeChannel lets you to subscribe for a events on a given channels.
75 //SDL notifications are events that are published on a specific channels.
76 //Both the channel and events are defined by the entity that is publishing
77 //the events.
78 //
79 //When subscribing for a channel, a callback function is given as a parameter.
80 //Whenever a notification is received from a channel, this callback is called
81 //with channel and notifications as parameter (several notifications could be
82 //packed to a single callback function call). A call to SubscribeChannel function
83 //returns immediatelly, callbacks will be called asyncronously.
84 //
85 //It is possible to subscribe to different channels using different callbacks. In
86 //this case simply use SubscribeChannel function separately for each channel.
87 //
88 //When receiving events in callback routine, it is a good practive to return from
89 //callback as quickly as possible. E.g. reading in callback context should be avoided
90 //and using of Go signals is recommended. Also it should be noted that in case of several
91 //events received from different channels, callbacks are called in series one by one.
92 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
93         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
94         return nil
95 }
96
97 //UnsubscribeChannel removes subscription from one or several channels.
98 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
99         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
100         return nil
101 }
102
103 //Close connection to backend database.
104 func (s *SdlInstance) Close() error {
105         return s.CloseDB()
106 }
107
108 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
109         if len(channelsAndEvents)%2 != 0 {
110                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
111         }
112         for i, v := range channelsAndEvents {
113                 if i%2 != 0 {
114                         if strings.Contains(v, s.eventSeparator) {
115                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
116                         }
117                 }
118         }
119         return nil
120 }
121 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
122         var retVal []string
123         for _, v := range channels {
124                 retVal = append(retVal, s.nsPrefix+v)
125         }
126         return retVal
127 }
128
129 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
130         retVal := make([]interface{}, 0)
131         shouldBeKey := true
132         for _, v := range pairs {
133                 reflectType := reflect.TypeOf(v)
134                 switch reflectType.Kind() {
135                 case reflect.Slice:
136                         if shouldBeKey {
137                                 x := reflect.ValueOf(v)
138                                 if x.Len()%2 != 0 {
139                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
140                                 }
141                                 for i2 := 0; i2 < x.Len(); i2++ {
142                                         if i2%2 == 0 {
143                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
144                                         } else {
145                                                 retVal = append(retVal, x.Index(i2).Interface())
146                                         }
147                                 }
148                         } else {
149                                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
150                         }
151                 case reflect.Array:
152                         if shouldBeKey {
153                                 x := reflect.ValueOf(v)
154                                 if x.Len()%2 != 0 {
155                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
156                                 }
157                                 for i2 := 0; i2 < x.Len(); i2++ {
158                                         if i2%2 == 0 {
159                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
160                                         } else {
161                                                 retVal = append(retVal, x.Index(i2).Interface())
162                                         }
163                                 }
164                         } else {
165                                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
166                         }
167                 default:
168                         if shouldBeKey {
169                                 retVal = append(retVal, s.nsPrefix+v.(string))
170                                 shouldBeKey = false
171                         } else {
172                                 retVal = append(retVal, v)
173                                 shouldBeKey = true
174                         }
175                 }
176         }
177         if len(retVal)%2 != 0 {
178                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
179         }
180         return retVal, nil
181 }
182
183 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
184         channelEventMap := make(map[string]string)
185         for i, v := range channelsAndEvents {
186                 if i%2 != 0 {
187                         continue
188                 }
189                 _, exists := channelEventMap[v]
190                 if exists {
191                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
192                 } else {
193                         channelEventMap[v] = channelsAndEvents[i+1]
194                 }
195         }
196         retVal := make([]string, 0)
197         for k, v := range channelEventMap {
198                 retVal = append(retVal, s.nsPrefix+k)
199                 retVal = append(retVal, v)
200         }
201         return retVal
202 }
203
204 //SetAndPublish function writes data to shared data layer storage and send an event to
205 //a channel. Writing is done atomically, i.e. all succeeds or fails.
206 //Data to be written is given as key-value pairs. Several key-value
207 //pairs can be written with one call.
208 //The key is expected to be string whereas value can be anything, string,
209 //number, slice array or map
210 //
211 //Channels and events are given as pairs is channelsAndEvents parameter.
212 //Although it is possible to give sevral channel-event pairs, current implementation
213 //supports sending events to one channel only due to missing support in DB backend.
214 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
215         if len(pairs)%2 != 0 {
216                 return errors.New("Invalid pairs parameter")
217         }
218
219         keyAndData, err := s.setNamespaceToKeys(pairs...)
220         if err != nil {
221                 return err
222         }
223         if len(channelsAndEvents) == 0 {
224                 return s.MSet(keyAndData...)
225         }
226         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
227                 return err
228         }
229         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
230         return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
231 }
232
233 //Set function writes data to shared data layer storage. Writing is done
234 //atomically, i.e. all succeeds or fails.
235 //Data to be written is given as key-value pairs. Several key-value
236 //pairs can be written with one call.
237 //The key is expected to be string whereas value can be anything, string,
238 //number, slice array or map
239 func (s *SdlInstance) Set(pairs ...interface{}) error {
240         if len(pairs) == 0 {
241                 return nil
242         }
243
244         keyAndData, err := s.setNamespaceToKeys(pairs...)
245         if err != nil {
246                 return err
247         }
248         return s.MSet(keyAndData...)
249 }
250
251 //Get function atomically reads one or more keys from SDL. The returned map has the
252 //requested keys as index and data as value. If the requested key is not found
253 //from SDL, it's value is nil
254 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
255         m := make(map[string]interface{})
256         if len(keys) == 0 {
257                 return m, nil
258         }
259
260         var keysWithNs []string
261         for _, v := range keys {
262                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
263         }
264         val, err := s.MGet(keysWithNs)
265         if err != nil {
266                 return m, err
267         }
268         for i, v := range val {
269                 m[keys[i]] = v
270         }
271         return m, err
272 }
273
274 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
275 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
276 //is published to a given channel.
277 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
278         if len(channelsAndEvents) == 0 {
279                 return s.SetIE(s.nsPrefix+key, oldData, newData)
280         }
281         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
282                 return false, err
283         }
284         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
285         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
286 }
287
288 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
289 //If replace was done successfully, true will be returned.
290 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
291         return s.SetIE(s.nsPrefix+key, oldData, newData)
292 }
293
294 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
295 //then it's value is not changed. Checking the key existence and potential set operation
296 //is done atomically. If the set operation was done successfully, an event is published to a
297 //given channel.
298 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
299         if len(channelsAndEvents) == 0 {
300                 return s.SetNX(s.nsPrefix+key, data)
301         }
302         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
303                 return false, err
304         }
305         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
306         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
307 }
308
309 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
310 //then it's value is not changed. Checking the key existence and potential set operation
311 //is done atomically.
312 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
313         return s.SetNX(s.nsPrefix+key, data)
314 }
315
316 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
317 //An event is published into a given channel if remove operation is successfull.
318 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
319         if len(keys) == 0 {
320                 return nil
321         }
322
323         var keysWithNs []string
324         for _, v := range keys {
325                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
326         }
327         if len(channelsAndEvents) == 0 {
328                 return s.Del(keysWithNs)
329         }
330         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
331                 return err
332         }
333         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
334         return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
335 }
336
337 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 func (s *SdlInstance) Remove(keys []string) error {
339         if len(keys) == 0 {
340                 return nil
341         }
342
343         var keysWithNs []string
344         for _, v := range keys {
345                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
346         }
347         err := s.Del(keysWithNs)
348         return err
349 }
350
351 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
352 //a given event is published to channel. If existing data matches given data,
353 //key and data are removed from SDL. If remove was done successfully, true is returned.
354 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
355         if len(channelsAndEvents) == 0 {
356                 return s.DelIE(s.nsPrefix+key, data)
357         }
358         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
359                 return false, err
360         }
361         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
362         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
363 }
364
365 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
366 //key and data are removed from SDL. If remove was done successfully, true is returned.
367 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
368         status, err := s.DelIE(s.nsPrefix+key, data)
369         if err != nil {
370                 return false, err
371         }
372         return status, nil
373 }
374
375 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
376 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
377 func (s *SdlInstance) GetAll() ([]string, error) {
378         keys, err := s.Keys(s.nsPrefix + "*")
379         var retVal []string
380         if err != nil {
381                 return retVal, err
382         }
383         for _, v := range keys {
384                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
385         }
386         return retVal, err
387 }
388
389 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
390 //it is not guaranteed that all keys are removed.
391 func (s *SdlInstance) RemoveAll() error {
392         keys, err := s.Keys(s.nsPrefix + "*")
393         if err != nil {
394                 return err
395         }
396         if (keys != nil) && (len(keys) != 0) {
397                 err = s.Del(keys)
398         }
399         return err
400 }
401
402 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
403         keys, err := s.Keys(s.nsPrefix + "*")
404         if err != nil {
405                 return err
406         }
407         if (keys != nil) && (len(keys) != 0) {
408                 if len(channelsAndEvents) == 0 {
409                         return s.Del(keys)
410                 }
411                 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
412                         return err
413                 }
414                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
415                 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
416         }
417         return err
418
419 }