Support multiple event publishing
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "errors"
22         "fmt"
23         "reflect"
24         "strings"
25
26         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
27 )
28
29 //SdlInstance provides an API to read, write and modify
30 //key-value pairs in a given namespace.
31 type SdlInstance struct {
32         nameSpace      string
33         nsPrefix       string
34         eventSeparator string
35         iDatabase
36 }
37
38 //NewDatabase creates a connection to database that will be used
39 //as a backend for the key-value storage. The returned value shall
40 //be given as a parameter when calling NewKeyValStorage
41 func NewDatabase() *sdlgoredis.DB {
42         return sdlgoredis.Create()
43 }
44
45 //NewSdlInstance creates a new sdl instance using the given namespace.
46 //The database used as a backend is given as a parameter
47 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
48         return &SdlInstance{
49                 nameSpace:      NameSpace,
50                 nsPrefix:       "{" + NameSpace + "},",
51                 eventSeparator: "___",
52                 iDatabase:      db,
53         }
54 }
55
56 //SubscribeChannel lets you to subscribe for a events on a given channels.
57 //SDL notifications are events that are published on a specific channels.
58 //Both the channel and events are defined by the entity that is publishing
59 //the events.
60 //
61 //When subscribing for a channel, a callback function is given as a parameter.
62 //Whenever a notification is received from a channel, this callback is called
63 //with channel and notifications as parameter (several notifications could be
64 //packed to a single callback function call). A call to SubscribeChannel function
65 //returns immediatelly, callbacks will be called asyncronously.
66 //
67 //It is possible to subscribe to different channels using different callbacks. In
68 //this case simply use SubscribeChannel function separately for each channel.
69 //
70 //When receiving events in callback routine, it is a good practive to return from
71 //callback as quickly as possible. E.g. reading in callback context should be avoided
72 //and using of Go signals is recommended. Also it should be noted that in case of several
73 //events received from different channels, callbacks are called in series one by one.
74 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
75         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
76         return nil
77 }
78
79 //UnsubscribeChannel removes subscription from one or several channels.
80 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
81         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
82         return nil
83 }
84
85 //Close connection to backend database.
86 func (s *SdlInstance) Close() error {
87         return s.CloseDB()
88 }
89
90 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
91         if len(channelsAndEvents)%2 != 0 {
92                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
93         }
94         for i, v := range channelsAndEvents {
95                 if i%2 != 0 {
96                         if strings.Contains(v, s.eventSeparator) {
97                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
98                         }
99                 }
100         }
101         return nil
102 }
103 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
104         var retVal []string
105         for _, v := range channels {
106                 retVal = append(retVal, s.nsPrefix+v)
107         }
108         return retVal
109 }
110
111 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
112         retVal := make([]interface{}, 0)
113         shouldBeKey := true
114         for _, v := range pairs {
115                 reflectType := reflect.TypeOf(v)
116                 switch reflectType.Kind() {
117                 case reflect.Map:
118                         x := reflect.ValueOf(v).MapRange()
119                         for x.Next() {
120                                 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
121                                 retVal = append(retVal, x.Value().Interface())
122                         }
123                 case reflect.Slice:
124                         if shouldBeKey {
125                                 x := reflect.ValueOf(v)
126                                 if x.Len()%2 != 0 {
127                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
128                                 }
129                                 for i2 := 0; i2 < x.Len(); i2++ {
130                                         if i2%2 == 0 {
131                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
132                                         } else {
133                                                 retVal = append(retVal, x.Index(i2).Interface())
134                                         }
135                                 }
136                         } else {
137                                 if reflectType.Elem().Kind() == reflect.Uint8 {
138                                         retVal = append(retVal, v)
139                                         shouldBeKey = true
140                                 } else {
141                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
142                                 }
143                         }
144                 case reflect.Array:
145                         if shouldBeKey {
146                                 x := reflect.ValueOf(v)
147                                 if x.Len()%2 != 0 {
148                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
149                                 }
150                                 for i2 := 0; i2 < x.Len(); i2++ {
151                                         if i2%2 == 0 {
152                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
153                                         } else {
154                                                 retVal = append(retVal, x.Index(i2).Interface())
155                                         }
156                                 }
157                         } else {
158                                 if reflectType.Elem().Kind() == reflect.Uint8 {
159                                         retVal = append(retVal, v)
160                                         shouldBeKey = true
161                                 } else {
162                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
163                                 }
164                         }
165                 default:
166                         if shouldBeKey {
167                                 retVal = append(retVal, s.nsPrefix+v.(string))
168                                 shouldBeKey = false
169                         } else {
170                                 retVal = append(retVal, v)
171                                 shouldBeKey = true
172                         }
173                 }
174         }
175         if len(retVal)%2 != 0 {
176                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
177         }
178         return retVal, nil
179 }
180
181 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
182         channelEventMap := make(map[string]string)
183         for i, v := range channelsAndEvents {
184                 if i%2 != 0 {
185                         continue
186                 }
187                 _, exists := channelEventMap[v]
188                 if exists {
189                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
190                 } else {
191                         channelEventMap[v] = channelsAndEvents[i+1]
192                 }
193         }
194         retVal := make([]string, 0)
195         for k, v := range channelEventMap {
196                 retVal = append(retVal, s.nsPrefix+k)
197                 retVal = append(retVal, v)
198         }
199         return retVal
200 }
201
202 //SetAndPublish function writes data to shared data layer storage and sends an event to
203 //a channel. Writing is done atomically, i.e. all succeeds or fails.
204 //Data to be written is given as key-value pairs. Several key-value
205 //pairs can be written with one call.
206 //The key is expected to be string whereas value can be anything, string,
207 //number, slice array or map
208 //
209 //If data was set successfully, an event is sent to a channel.
210 //Channels and events are given as pairs is channelsAndEvents parameter.
211 //It is possible to send several events to several channels by giving several
212 //channel-event pairs.
213 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
214 //will send event1 and event3 to channel1 and event2 to channel2.
215 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
216         keyAndData, err := s.setNamespaceToKeys(pairs...)
217         if err != nil {
218                 return err
219         }
220         if len(channelsAndEvents) == 0 {
221                 return s.MSet(keyAndData...)
222         }
223         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
224                 return err
225         }
226         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
227         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
228 }
229
230 //Set function writes data to shared data layer storage. Writing is done
231 //atomically, i.e. all succeeds or fails.
232 //Data to be written is given as key-value pairs. Several key-value
233 //pairs can be written with one call.
234 //The key is expected to be string whereas value can be anything, string,
235 //number, slice array or map
236 func (s *SdlInstance) Set(pairs ...interface{}) error {
237         if len(pairs) == 0 {
238                 return nil
239         }
240
241         keyAndData, err := s.setNamespaceToKeys(pairs...)
242         if err != nil {
243                 return err
244         }
245         return s.MSet(keyAndData...)
246 }
247
248 //Get function atomically reads one or more keys from SDL. The returned map has the
249 //requested keys as index and data as value. If the requested key is not found
250 //from SDL, it's value is nil
251 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
252         m := make(map[string]interface{})
253         if len(keys) == 0 {
254                 return m, nil
255         }
256
257         var keysWithNs []string
258         for _, v := range keys {
259                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
260         }
261         val, err := s.MGet(keysWithNs)
262         if err != nil {
263                 return m, err
264         }
265         for i, v := range val {
266                 m[keys[i]] = v
267         }
268         return m, err
269 }
270
271 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
272 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
273 //is published to a given channel.
274 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
275         if len(channelsAndEvents) == 0 {
276                 return s.SetIE(s.nsPrefix+key, oldData, newData)
277         }
278         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
279                 return false, err
280         }
281         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
282         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
283 }
284
285 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
286 //If replace was done successfully, true will be returned.
287 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
288         return s.SetIE(s.nsPrefix+key, oldData, newData)
289 }
290
291 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
292 //then it's value is not changed. Checking the key existence and potential set operation
293 //is done atomically. If the set operation was done successfully, an event is published to a
294 //given channel.
295 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
296         if len(channelsAndEvents) == 0 {
297                 return s.SetNX(s.nsPrefix+key, data)
298         }
299         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
300                 return false, err
301         }
302         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
303         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
304 }
305
306 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
307 //then it's value is not changed. Checking the key existence and potential set operation
308 //is done atomically.
309 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
310         return s.SetNX(s.nsPrefix+key, data)
311 }
312
313 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
314 //Trying to remove a nonexisting key is not considered as an error.
315 //An event is published into a given channel if remove operation is successfull and
316 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
317 //when trying to remove, no event is published.
318 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
319         if len(keys) == 0 {
320                 return nil
321         }
322
323         var keysWithNs []string
324         for _, v := range keys {
325                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
326         }
327         if len(channelsAndEvents) == 0 {
328                 return s.Del(keysWithNs)
329         }
330         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
331                 return err
332         }
333         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
334         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
335 }
336
337 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 func (s *SdlInstance) Remove(keys []string) error {
339         if len(keys) == 0 {
340                 return nil
341         }
342
343         var keysWithNs []string
344         for _, v := range keys {
345                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
346         }
347         err := s.Del(keysWithNs)
348         return err
349 }
350
351 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
352 //a given event is published to channel. If existing data matches given data,
353 //key and data are removed from SDL. If remove was done successfully, true is returned.
354 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
355         if len(channelsAndEvents) == 0 {
356                 return s.DelIE(s.nsPrefix+key, data)
357         }
358         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
359                 return false, err
360         }
361         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
362         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
363 }
364
365 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
366 //key and data are removed from SDL. If remove was done successfully, true is returned.
367 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
368         status, err := s.DelIE(s.nsPrefix+key, data)
369         if err != nil {
370                 return false, err
371         }
372         return status, nil
373 }
374
375 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
376 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
377 func (s *SdlInstance) GetAll() ([]string, error) {
378         keys, err := s.Keys(s.nsPrefix + "*")
379         var retVal []string
380         if err != nil {
381                 return retVal, err
382         }
383         for _, v := range keys {
384                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
385         }
386         return retVal, err
387 }
388
389 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
390 //it is not guaranteed that all keys are removed.
391 func (s *SdlInstance) RemoveAll() error {
392         keys, err := s.Keys(s.nsPrefix + "*")
393         if err != nil {
394                 return err
395         }
396         if (keys != nil) && (len(keys) != 0) {
397                 err = s.Del(keys)
398         }
399         return err
400 }
401
402 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
403 //will publish an event to given channel. This operation is not atomic, thus it is
404 //not guaranteed that all keys are removed.
405 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
406         keys, err := s.Keys(s.nsPrefix + "*")
407         if err != nil {
408                 return err
409         }
410         if (keys != nil) && (len(keys) != 0) {
411                 if len(channelsAndEvents) == 0 {
412                         return s.Del(keys)
413                 }
414                 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
415                         return err
416                 }
417                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
418                 err = s.DelMPub(channelsAndEventsPrepared, keys)
419         }
420         return err
421 }
422
423 //AddMember adds a new members to a group.
424 //
425 //SDL groups are unordered collections of members where each member is
426 //unique. It is possible to add the same member several times without the
427 //need to check if it already exists.
428 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
429         return s.SAdd(s.nsPrefix+group, member...)
430 }
431
432 //RemoveMember removes members from a group.
433 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
434         return s.SRem(s.nsPrefix+group, member...)
435 }
436
437 //RemoveGroup removes the whole group along with it's members.
438 func (s *SdlInstance) RemoveGroup(group string) error {
439         return s.Del([]string{s.nsPrefix + group})
440 }
441
442 //GetMembers returns all the members from a group.
443 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
444         retVal, err := s.SMembers(s.nsPrefix + group)
445         if err != nil {
446                 return []string{}, err
447         }
448         return retVal, err
449 }
450
451 //IsMember returns true if given member is found from a group.
452 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
453         retVal, err := s.SIsMember(s.nsPrefix+group, member)
454         if err != nil {
455                 return false, err
456         }
457         return retVal, err
458 }
459
460 //GroupSize returns the number of members in a group.
461 func (s *SdlInstance) GroupSize(group string) (int64, error) {
462         retVal, err := s.SCard(s.nsPrefix + group)
463         if err != nil {
464                 return 0, err
465         }
466         return retVal, err
467 }
468
469 type iDatabase interface {
470         SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
471         UnsubscribeChannelDB(channels ...string)
472         MSet(pairs ...interface{}) error
473         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
474         MGet(keys []string) ([]interface{}, error)
475         CloseDB() error
476         Del(keys []string) error
477         DelMPub(channelsAndEvents []string, keys []string) error
478         Keys(key string) ([]string, error)
479         SetIE(key string, oldData, newData interface{}) (bool, error)
480         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
481         SetNX(key string, data interface{}) (bool, error)
482         SetNXPub(channel, message, key string, data interface{}) (bool, error)
483         DelIE(key string, data interface{}) (bool, error)
484         DelIEPub(channel, message, key string, data interface{}) (bool, error)
485         SAdd(key string, data ...interface{}) error
486         SRem(key string, data ...interface{}) error
487         SMembers(key string) ([]string, error)
488         SIsMember(key string, data interface{}) (bool, error)
489         SCard(key string) (int64, error)
490 }