Add support for SDL groups
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "errors"
22         "fmt"
23         "reflect"
24         "strings"
25
26         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
27 )
28
29 //SdlInstance provides an API to read, write and modify
30 //key-value pairs in a given namespace.
31 type SdlInstance struct {
32         nameSpace      string
33         nsPrefix       string
34         eventSeparator string
35         iDatabase
36 }
37
38 //NewDatabase creates a connection to database that will be used
39 //as a backend for the key-value storage. The returned value shall
40 //be given as a parameter when calling NewKeyValStorage
41 func NewDatabase() *sdlgoredis.DB {
42         return sdlgoredis.Create()
43 }
44
45 //NewSdlInstance creates a new sdl instance using the given namespace.
46 //The database used as a backend is given as a parameter
47 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
48         return &SdlInstance{
49                 nameSpace:      NameSpace,
50                 nsPrefix:       "{" + NameSpace + "},",
51                 eventSeparator: "___",
52                 iDatabase:      db,
53         }
54 }
55
56 //SubscribeChannel lets you to subscribe for a events on a given channels.
57 //SDL notifications are events that are published on a specific channels.
58 //Both the channel and events are defined by the entity that is publishing
59 //the events.
60 //
61 //When subscribing for a channel, a callback function is given as a parameter.
62 //Whenever a notification is received from a channel, this callback is called
63 //with channel and notifications as parameter (several notifications could be
64 //packed to a single callback function call). A call to SubscribeChannel function
65 //returns immediatelly, callbacks will be called asyncronously.
66 //
67 //It is possible to subscribe to different channels using different callbacks. In
68 //this case simply use SubscribeChannel function separately for each channel.
69 //
70 //When receiving events in callback routine, it is a good practive to return from
71 //callback as quickly as possible. E.g. reading in callback context should be avoided
72 //and using of Go signals is recommended. Also it should be noted that in case of several
73 //events received from different channels, callbacks are called in series one by one.
74 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
75         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
76         return nil
77 }
78
79 //UnsubscribeChannel removes subscription from one or several channels.
80 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
81         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
82         return nil
83 }
84
85 //Close connection to backend database.
86 func (s *SdlInstance) Close() error {
87         return s.CloseDB()
88 }
89
90 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
91         if len(channelsAndEvents)%2 != 0 {
92                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
93         }
94         for i, v := range channelsAndEvents {
95                 if i%2 != 0 {
96                         if strings.Contains(v, s.eventSeparator) {
97                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
98                         }
99                 }
100         }
101         return nil
102 }
103 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
104         var retVal []string
105         for _, v := range channels {
106                 retVal = append(retVal, s.nsPrefix+v)
107         }
108         return retVal
109 }
110
111 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
112         retVal := make([]interface{}, 0)
113         shouldBeKey := true
114         for _, v := range pairs {
115                 reflectType := reflect.TypeOf(v)
116                 switch reflectType.Kind() {
117                 case reflect.Slice:
118                         if shouldBeKey {
119                                 x := reflect.ValueOf(v)
120                                 if x.Len()%2 != 0 {
121                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
122                                 }
123                                 for i2 := 0; i2 < x.Len(); i2++ {
124                                         if i2%2 == 0 {
125                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
126                                         } else {
127                                                 retVal = append(retVal, x.Index(i2).Interface())
128                                         }
129                                 }
130                         } else {
131                                 if reflectType.Elem().Kind() == reflect.Uint8 {
132                                         retVal = append(retVal, v)
133                                         shouldBeKey = true
134                                 } else {
135                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
136                                 }
137                         }
138                 case reflect.Array:
139                         if shouldBeKey {
140                                 x := reflect.ValueOf(v)
141                                 if x.Len()%2 != 0 {
142                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
143                                 }
144                                 for i2 := 0; i2 < x.Len(); i2++ {
145                                         if i2%2 == 0 {
146                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
147                                         } else {
148                                                 retVal = append(retVal, x.Index(i2).Interface())
149                                         }
150                                 }
151                         } else {
152                                 if reflectType.Elem().Kind() == reflect.Uint8 {
153                                         retVal = append(retVal, v)
154                                         shouldBeKey = true
155                                 } else {
156                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
157                                 }
158                         }
159                 default:
160                         if shouldBeKey {
161                                 retVal = append(retVal, s.nsPrefix+v.(string))
162                                 shouldBeKey = false
163                         } else {
164                                 retVal = append(retVal, v)
165                                 shouldBeKey = true
166                         }
167                 }
168         }
169         if len(retVal)%2 != 0 {
170                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
171         }
172         return retVal, nil
173 }
174
175 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
176         channelEventMap := make(map[string]string)
177         for i, v := range channelsAndEvents {
178                 if i%2 != 0 {
179                         continue
180                 }
181                 _, exists := channelEventMap[v]
182                 if exists {
183                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
184                 } else {
185                         channelEventMap[v] = channelsAndEvents[i+1]
186                 }
187         }
188         retVal := make([]string, 0)
189         for k, v := range channelEventMap {
190                 retVal = append(retVal, s.nsPrefix+k)
191                 retVal = append(retVal, v)
192         }
193         return retVal
194 }
195
196 //SetAndPublish function writes data to shared data layer storage and send an event to
197 //a channel. Writing is done atomically, i.e. all succeeds or fails.
198 //Data to be written is given as key-value pairs. Several key-value
199 //pairs can be written with one call.
200 //The key is expected to be string whereas value can be anything, string,
201 //number, slice array or map
202 //
203 //Channels and events are given as pairs is channelsAndEvents parameter.
204 //Although it is possible to give sevral channel-event pairs, current implementation
205 //supports sending events to one channel only due to missing support in DB backend.
206 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
207         if len(pairs)%2 != 0 {
208                 return errors.New("Invalid pairs parameter")
209         }
210
211         keyAndData, err := s.setNamespaceToKeys(pairs...)
212         if err != nil {
213                 return err
214         }
215         if len(channelsAndEvents) == 0 {
216                 return s.MSet(keyAndData...)
217         }
218         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
219                 return err
220         }
221         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
222         return s.MSetPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keyAndData...)
223 }
224
225 //Set function writes data to shared data layer storage. Writing is done
226 //atomically, i.e. all succeeds or fails.
227 //Data to be written is given as key-value pairs. Several key-value
228 //pairs can be written with one call.
229 //The key is expected to be string whereas value can be anything, string,
230 //number, slice array or map
231 func (s *SdlInstance) Set(pairs ...interface{}) error {
232         if len(pairs) == 0 {
233                 return nil
234         }
235
236         keyAndData, err := s.setNamespaceToKeys(pairs...)
237         if err != nil {
238                 return err
239         }
240         return s.MSet(keyAndData...)
241 }
242
243 //Get function atomically reads one or more keys from SDL. The returned map has the
244 //requested keys as index and data as value. If the requested key is not found
245 //from SDL, it's value is nil
246 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
247         m := make(map[string]interface{})
248         if len(keys) == 0 {
249                 return m, nil
250         }
251
252         var keysWithNs []string
253         for _, v := range keys {
254                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
255         }
256         val, err := s.MGet(keysWithNs)
257         if err != nil {
258                 return m, err
259         }
260         for i, v := range val {
261                 m[keys[i]] = v
262         }
263         return m, err
264 }
265
266 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
267 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
268 //is published to a given channel.
269 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
270         if len(channelsAndEvents) == 0 {
271                 return s.SetIE(s.nsPrefix+key, oldData, newData)
272         }
273         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
274                 return false, err
275         }
276         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
277         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
278 }
279
280 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
281 //If replace was done successfully, true will be returned.
282 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
283         return s.SetIE(s.nsPrefix+key, oldData, newData)
284 }
285
286 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
287 //then it's value is not changed. Checking the key existence and potential set operation
288 //is done atomically. If the set operation was done successfully, an event is published to a
289 //given channel.
290 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
291         if len(channelsAndEvents) == 0 {
292                 return s.SetNX(s.nsPrefix+key, data)
293         }
294         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
295                 return false, err
296         }
297         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
298         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
299 }
300
301 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
302 //then it's value is not changed. Checking the key existence and potential set operation
303 //is done atomically.
304 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
305         return s.SetNX(s.nsPrefix+key, data)
306 }
307
308 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
309 //An event is published into a given channel if remove operation is successfull.
310 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
311         if len(keys) == 0 {
312                 return nil
313         }
314
315         var keysWithNs []string
316         for _, v := range keys {
317                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
318         }
319         if len(channelsAndEvents) == 0 {
320                 return s.Del(keysWithNs)
321         }
322         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
323                 return err
324         }
325         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
326         return s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keysWithNs)
327 }
328
329 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
330 func (s *SdlInstance) Remove(keys []string) error {
331         if len(keys) == 0 {
332                 return nil
333         }
334
335         var keysWithNs []string
336         for _, v := range keys {
337                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
338         }
339         err := s.Del(keysWithNs)
340         return err
341 }
342
343 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
344 //a given event is published to channel. If existing data matches given data,
345 //key and data are removed from SDL. If remove was done successfully, true is returned.
346 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
347         if len(channelsAndEvents) == 0 {
348                 return s.DelIE(s.nsPrefix+key, data)
349         }
350         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
351                 return false, err
352         }
353         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
354         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
355 }
356
357 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
358 //key and data are removed from SDL. If remove was done successfully, true is returned.
359 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
360         status, err := s.DelIE(s.nsPrefix+key, data)
361         if err != nil {
362                 return false, err
363         }
364         return status, nil
365 }
366
367 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
368 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
369 func (s *SdlInstance) GetAll() ([]string, error) {
370         keys, err := s.Keys(s.nsPrefix + "*")
371         var retVal []string
372         if err != nil {
373                 return retVal, err
374         }
375         for _, v := range keys {
376                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
377         }
378         return retVal, err
379 }
380
381 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
382 //it is not guaranteed that all keys are removed.
383 func (s *SdlInstance) RemoveAll() error {
384         keys, err := s.Keys(s.nsPrefix + "*")
385         if err != nil {
386                 return err
387         }
388         if (keys != nil) && (len(keys) != 0) {
389                 err = s.Del(keys)
390         }
391         return err
392 }
393
394 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
395 //will publish an event to given channel. This operation is not atomic, thus it is
396 //not guaranteed that all keys are removed.
397 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
398         keys, err := s.Keys(s.nsPrefix + "*")
399         if err != nil {
400                 return err
401         }
402         if (keys != nil) && (len(keys) != 0) {
403                 if len(channelsAndEvents) == 0 {
404                         return s.Del(keys)
405                 }
406                 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
407                         return err
408                 }
409                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
410                 err = s.DelPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], keys)
411         }
412         return err
413 }
414
415 //AddMember adds a new members to a group.
416 //
417 //SDL groups are unordered collections of members where each member is
418 //unique. It is possible to add the same member several times without the
419 //need to check if it already exists.
420 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
421         return s.SAdd(s.nsPrefix+group, member...)
422 }
423
424 //RemoveMember removes members from a group.
425 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
426         return s.SRem(s.nsPrefix+group, member...)
427 }
428
429 //RemoveGroup removes the whole group along with it's members.
430 func (s *SdlInstance) RemoveGroup(group string) error {
431         return s.Del([]string{s.nsPrefix + group})
432 }
433
434 //GetMembers returns all the members from a group.
435 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
436         retVal, err := s.SMembers(s.nsPrefix + group)
437         if err != nil {
438                 return []string{}, err
439         }
440         return retVal, err
441 }
442
443 //IsMember returns true if given member is found from a group.
444 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
445         retVal, err := s.SIsMember(s.nsPrefix+group, member)
446         if err != nil {
447                 return false, err
448         }
449         return retVal, err
450 }
451
452 //GroupSize returns the number of members in a group.
453 func (s *SdlInstance) GroupSize(group string) (int64, error) {
454         retVal, err := s.SCard(s.nsPrefix + group)
455         if err != nil {
456                 return 0, err
457         }
458         return retVal, err
459 }
460
461 type iDatabase interface {
462         SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
463         UnsubscribeChannelDB(channels ...string)
464         MSet(pairs ...interface{}) error
465         MSetPub(ns, message string, pairs ...interface{}) error
466         MGet(keys []string) ([]interface{}, error)
467         CloseDB() error
468         Del(keys []string) error
469         DelPub(channel, message string, keys []string) error
470         Keys(key string) ([]string, error)
471         SetIE(key string, oldData, newData interface{}) (bool, error)
472         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
473         SetNX(key string, data interface{}) (bool, error)
474         SetNXPub(channel, message, key string, data interface{}) (bool, error)
475         DelIE(key string, data interface{}) (bool, error)
476         DelIEPub(channel, message, key string, data interface{}) (bool, error)
477         SAdd(key string, data ...interface{}) error
478         SRem(key string, data ...interface{}) error
479         SMembers(key string) ([]string, error)
480         SIsMember(key string, data interface{}) (bool, error)
481         SCard(key string) (int64, error)
482 }