Take DBAAS multi-channel publishing Redis modules into use
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "io"
31         "reflect"
32         "strings"
33         "sync"
34         "time"
35
36         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
37 )
38
39 //SdlInstance provides an API to read, write and modify
40 //key-value pairs in a given namespace.
41 type SdlInstance struct {
42         nameSpace      string
43         nsPrefix       string
44         eventSeparator string
45         mutex          sync.Mutex
46         tmp            []byte
47         iDatabase
48 }
49
50 //Database struct is a holder for the internal database instance. Applications
51 //can use this exported data type to locally store a reference to database
52 //instance returned from NewDabase() function.
53 type Database struct {
54         instance iDatabase
55 }
56
57 //NewDatabase creates a connection to database that will be used
58 //as a backend for the key-value storage. The returned value
59 //can be reused between multiple SDL instances in which case each instance
60 //is using the same connection.
61 func NewDatabase() *Database {
62         return &Database{
63                 instance: sdlgoredis.Create(),
64         }
65 }
66
67 //NewSdlInstance creates a new sdl instance using the given namespace.
68 //The database used as a backend is given as a parameter
69 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
70         return &SdlInstance{
71                 nameSpace:      NameSpace,
72                 nsPrefix:       "{" + NameSpace + "},",
73                 eventSeparator: "___",
74                 iDatabase:      db.instance,
75         }
76 }
77
78 //SubscribeChannel lets you to subscribe for a events on a given channels.
79 //SDL notifications are events that are published on a specific channels.
80 //Both the channel and events are defined by the entity that is publishing
81 //the events.
82 //
83 //When subscribing for a channel, a callback function is given as a parameter.
84 //Whenever a notification is received from a channel, this callback is called
85 //with channel and notifications as parameter (several notifications could be
86 //packed to a single callback function call). A call to SubscribeChannel function
87 //returns immediatelly, callbacks will be called asyncronously.
88 //
89 //It is possible to subscribe to different channels using different callbacks. In
90 //this case simply use SubscribeChannel function separately for each channel.
91 //
92 //When receiving events in callback routine, it is a good practive to return from
93 //callback as quickly as possible. E.g. reading in callback context should be avoided
94 //and using of Go signals is recommended. Also it should be noted that in case of several
95 //events received from different channels, callbacks are called in series one by one.
96 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
97         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
98         return nil
99 }
100
101 //UnsubscribeChannel removes subscription from one or several channels.
102 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
103         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
104         return nil
105 }
106
107 //Close connection to backend database.
108 func (s *SdlInstance) Close() error {
109         return s.CloseDB()
110 }
111
112 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
113         if len(channelsAndEvents)%2 != 0 {
114                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
115         }
116         for i, v := range channelsAndEvents {
117                 if i%2 != 0 {
118                         if strings.Contains(v, s.eventSeparator) {
119                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
120                         }
121                 }
122         }
123         return nil
124 }
125 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
126         var retVal []string
127         for _, v := range channels {
128                 retVal = append(retVal, s.nsPrefix+v)
129         }
130         return retVal
131 }
132
133 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
134         retVal := make([]interface{}, 0)
135         shouldBeKey := true
136         for _, v := range pairs {
137                 reflectType := reflect.TypeOf(v)
138                 switch reflectType.Kind() {
139                 case reflect.Map:
140                         x := reflect.ValueOf(v).MapRange()
141                         for x.Next() {
142                                 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
143                                 retVal = append(retVal, x.Value().Interface())
144                         }
145                 case reflect.Slice:
146                         if shouldBeKey {
147                                 x := reflect.ValueOf(v)
148                                 if x.Len()%2 != 0 {
149                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
150                                 }
151                                 for i2 := 0; i2 < x.Len(); i2++ {
152                                         if i2%2 == 0 {
153                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
154                                         } else {
155                                                 retVal = append(retVal, x.Index(i2).Interface())
156                                         }
157                                 }
158                         } else {
159                                 if reflectType.Elem().Kind() == reflect.Uint8 {
160                                         retVal = append(retVal, v)
161                                         shouldBeKey = true
162                                 } else {
163                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
164                                 }
165                         }
166                 case reflect.Array:
167                         if shouldBeKey {
168                                 x := reflect.ValueOf(v)
169                                 if x.Len()%2 != 0 {
170                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
171                                 }
172                                 for i2 := 0; i2 < x.Len(); i2++ {
173                                         if i2%2 == 0 {
174                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
175                                         } else {
176                                                 retVal = append(retVal, x.Index(i2).Interface())
177                                         }
178                                 }
179                         } else {
180                                 if reflectType.Elem().Kind() == reflect.Uint8 {
181                                         retVal = append(retVal, v)
182                                         shouldBeKey = true
183                                 } else {
184                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
185                                 }
186                         }
187                 default:
188                         if shouldBeKey {
189                                 retVal = append(retVal, s.nsPrefix+v.(string))
190                                 shouldBeKey = false
191                         } else {
192                                 retVal = append(retVal, v)
193                                 shouldBeKey = true
194                         }
195                 }
196         }
197         if len(retVal)%2 != 0 {
198                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
199         }
200         return retVal, nil
201 }
202
203 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
204         channelEventMap := make(map[string]string)
205         for i, v := range channelsAndEvents {
206                 if i%2 != 0 {
207                         continue
208                 }
209                 _, exists := channelEventMap[v]
210                 if exists {
211                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
212                 } else {
213                         channelEventMap[v] = channelsAndEvents[i+1]
214                 }
215         }
216         retVal := make([]string, 0)
217         for k, v := range channelEventMap {
218                 retVal = append(retVal, s.nsPrefix+k)
219                 retVal = append(retVal, v)
220         }
221         return retVal
222 }
223
224 //SetAndPublish function writes data to shared data layer storage and sends an event to
225 //a channel. Writing is done atomically, i.e. all succeeds or fails.
226 //Data to be written is given as key-value pairs. Several key-value
227 //pairs can be written with one call.
228 //The key is expected to be string whereas value can be anything, string,
229 //number, slice array or map
230 //
231 //If data was set successfully, an event is sent to a channel.
232 //Channels and events are given as pairs is channelsAndEvents parameter.
233 //It is possible to send several events to several channels by giving several
234 //channel-event pairs.
235 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
236 //will send event1 and event3 to channel1 and event2 to channel2.
237 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
238         keyAndData, err := s.setNamespaceToKeys(pairs...)
239         if err != nil {
240                 return err
241         }
242         if len(channelsAndEvents) == 0 {
243                 return s.MSet(keyAndData...)
244         }
245         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
246                 return err
247         }
248         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
249         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
250 }
251
252 //Set function writes data to shared data layer storage. Writing is done
253 //atomically, i.e. all succeeds or fails.
254 //Data to be written is given as key-value pairs. Several key-value
255 //pairs can be written with one call.
256 //The key is expected to be string whereas value can be anything, string,
257 //number, slice array or map
258 func (s *SdlInstance) Set(pairs ...interface{}) error {
259         if len(pairs) == 0 {
260                 return nil
261         }
262
263         keyAndData, err := s.setNamespaceToKeys(pairs...)
264         if err != nil {
265                 return err
266         }
267         return s.MSet(keyAndData...)
268 }
269
270 //Get function atomically reads one or more keys from SDL. The returned map has the
271 //requested keys as index and data as value. If the requested key is not found
272 //from SDL, it's value is nil
273 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
274         m := make(map[string]interface{})
275         if len(keys) == 0 {
276                 return m, nil
277         }
278
279         var keysWithNs []string
280         for _, v := range keys {
281                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
282         }
283         val, err := s.MGet(keysWithNs)
284         if err != nil {
285                 return m, err
286         }
287         for i, v := range val {
288                 m[keys[i]] = v
289         }
290         return m, err
291 }
292
293 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
294 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
295 //is published to a given channel.
296 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
297         if len(channelsAndEvents) == 0 {
298                 return s.SetIE(s.nsPrefix+key, oldData, newData)
299         }
300         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
301                 return false, err
302         }
303         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
304         return s.SetIEPub(channelsAndEventsPrepared, s.nsPrefix+key, oldData, newData)
305 }
306
307 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned.
309 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
310         return s.SetIE(s.nsPrefix+key, oldData, newData)
311 }
312
313 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
314 //then it's value is not changed. Checking the key existence and potential set operation
315 //is done atomically. If the set operation was done successfully, an event is published to a
316 //given channel.
317 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
318         if len(channelsAndEvents) == 0 {
319                 return s.SetNX(s.nsPrefix+key, data, 0)
320         }
321         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
322                 return false, err
323         }
324         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
325         return s.SetNXPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
326 }
327
328 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
329 //then it's value is not changed. Checking the key existence and potential set operation
330 //is done atomically.
331 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
332         return s.SetNX(s.nsPrefix+key, data, 0)
333 }
334
335 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
336 //Trying to remove a nonexisting key is not considered as an error.
337 //An event is published into a given channel if remove operation is successfull and
338 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
339 //when trying to remove, no event is published.
340 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
341         if len(keys) == 0 {
342                 return nil
343         }
344
345         var keysWithNs []string
346         for _, v := range keys {
347                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
348         }
349         if len(channelsAndEvents) == 0 {
350                 return s.Del(keysWithNs)
351         }
352         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
353                 return err
354         }
355         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
356         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
357 }
358
359 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
360 func (s *SdlInstance) Remove(keys []string) error {
361         if len(keys) == 0 {
362                 return nil
363         }
364
365         var keysWithNs []string
366         for _, v := range keys {
367                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
368         }
369         err := s.Del(keysWithNs)
370         return err
371 }
372
373 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
374 //a given event is published to channel. If existing data matches given data,
375 //key and data are removed from SDL. If remove was done successfully, true is returned.
376 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
377         if len(channelsAndEvents) == 0 {
378                 return s.DelIE(s.nsPrefix+key, data)
379         }
380         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
381                 return false, err
382         }
383         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
384         return s.DelIEPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
385 }
386
387 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
388 //key and data are removed from SDL. If remove was done successfully, true is returned.
389 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
390         status, err := s.DelIE(s.nsPrefix+key, data)
391         if err != nil {
392                 return false, err
393         }
394         return status, nil
395 }
396
397 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
398 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
399 func (s *SdlInstance) GetAll() ([]string, error) {
400         keys, err := s.Keys(s.nsPrefix + "*")
401         var retVal []string
402         if err != nil {
403                 return retVal, err
404         }
405         for _, v := range keys {
406                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
407         }
408         return retVal, err
409 }
410
411 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
412 //it is not guaranteed that all keys are removed.
413 func (s *SdlInstance) RemoveAll() error {
414         keys, err := s.Keys(s.nsPrefix + "*")
415         if err != nil {
416                 return err
417         }
418         if (keys != nil) && (len(keys) != 0) {
419                 err = s.Del(keys)
420         }
421         return err
422 }
423
424 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
425 //will publish an event to given channel. This operation is not atomic, thus it is
426 //not guaranteed that all keys are removed.
427 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
428         keys, err := s.Keys(s.nsPrefix + "*")
429         if err != nil {
430                 return err
431         }
432         if (keys != nil) && (len(keys) != 0) {
433                 if len(channelsAndEvents) == 0 {
434                         return s.Del(keys)
435                 }
436                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
437                         return err
438                 }
439                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
440                 err = s.DelMPub(channelsAndEventsPrepared, keys)
441         }
442         return err
443 }
444
445 //AddMember adds a new members to a group.
446 //
447 //SDL groups are unordered collections of members where each member is
448 //unique. It is possible to add the same member several times without the
449 //need to check if it already exists.
450 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
451         return s.SAdd(s.nsPrefix+group, member...)
452 }
453
454 //RemoveMember removes members from a group.
455 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
456         return s.SRem(s.nsPrefix+group, member...)
457 }
458
459 //RemoveGroup removes the whole group along with it's members.
460 func (s *SdlInstance) RemoveGroup(group string) error {
461         return s.Del([]string{s.nsPrefix + group})
462 }
463
464 //GetMembers returns all the members from a group.
465 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
466         retVal, err := s.SMembers(s.nsPrefix + group)
467         if err != nil {
468                 return []string{}, err
469         }
470         return retVal, err
471 }
472
473 //IsMember returns true if given member is found from a group.
474 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
475         retVal, err := s.SIsMember(s.nsPrefix+group, member)
476         if err != nil {
477                 return false, err
478         }
479         return retVal, err
480 }
481
482 //GroupSize returns the number of members in a group.
483 func (s *SdlInstance) GroupSize(group string) (int64, error) {
484         retVal, err := s.SCard(s.nsPrefix + group)
485         if err != nil {
486                 return 0, err
487         }
488         return retVal, err
489 }
490
491 func (s *SdlInstance) randomToken() (string, error) {
492         s.mutex.Lock()
493         defer s.mutex.Unlock()
494
495         if len(s.tmp) == 0 {
496                 s.tmp = make([]byte, 16)
497         }
498
499         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
500                 return "", err
501         }
502
503         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
504 }
505
506 //LockResource function is used for locking a resource. The resource lock in
507 //practice is a key with random value that is set to expire after a time
508 //period. The value written to key is a random value, thus only the instance
509 //created a lock, can release it. Resource locks are per namespace.
510 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
511         value, err := s.randomToken()
512         if err != nil {
513                 return nil, err
514         }
515
516         var retryTimer *time.Timer
517         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
518                 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
519                 if err != nil {
520                         return nil, err
521                 } else if ok {
522                         return &Lock{s: s, key: resource, value: value}, nil
523                 }
524                 if retryTimer == nil {
525                         retryTimer = time.NewTimer(opt.getRetryWait())
526                         defer retryTimer.Stop()
527                 } else {
528                         retryTimer.Reset(opt.getRetryWait())
529                 }
530
531                 select {
532                 case <-retryTimer.C:
533                 }
534         }
535         return nil, errors.New("Lock not obtained")
536 }
537
538 //ReleaseResource removes the lock from a resource. If lock is already
539 //expired or some other instance is keeping the lock (lock taken after expiration),
540 //an error is returned.
541 func (l *Lock) ReleaseResource() error {
542         ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
543
544         if err != nil {
545                 return err
546         }
547         if !ok {
548                 return errors.New("Lock not held")
549         }
550         return nil
551 }
552
553 //RefreshResource function can be used to set a new expiration time for the
554 //resource lock (if the lock still exists). The old remaining expiration
555 //time is overwritten with the given new expiration time.
556 func (l *Lock) RefreshResource(expiration time.Duration) error {
557         err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
558         return err
559 }
560
561 //CheckResource returns the expiration time left for a resource.
562 //If the resource doesn't exist, -2 is returned.
563 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
564         result, err := s.PTTL(s.nsPrefix + resource)
565         if err != nil {
566                 return 0, err
567         }
568         if result == time.Duration(-1) {
569                 return 0, errors.New("invalid resource given, no expiration time attached")
570         }
571         return result, nil
572 }
573
574 //Options struct defines the behaviour for getting the resource lock.
575 type Options struct {
576         //The number of time the lock will be tried.
577         //Default: 0 = no retry
578         RetryCount int
579
580         //Wait between the retries.
581         //Default: 100ms
582         RetryWait time.Duration
583 }
584
585 func (o *Options) getRetryCount() int {
586         if o != nil && o.RetryCount > 0 {
587                 return o.RetryCount
588         }
589         return 0
590 }
591
592 func (o *Options) getRetryWait() time.Duration {
593         if o != nil && o.RetryWait > 0 {
594                 return o.RetryWait
595         }
596         return 100 * time.Millisecond
597 }
598
599 //Lock struct identifies the resource lock instance. Releasing and adjusting the
600 //expirations are done using the methods defined for this struct.
601 type Lock struct {
602         s     *SdlInstance
603         key   string
604         value string
605 }
606
607 type iDatabase interface {
608         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
609         UnsubscribeChannelDB(channels ...string)
610         MSet(pairs ...interface{}) error
611         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
612         MGet(keys []string) ([]interface{}, error)
613         CloseDB() error
614         Del(keys []string) error
615         DelMPub(channelsAndEvents []string, keys []string) error
616         Keys(key string) ([]string, error)
617         SetIE(key string, oldData, newData interface{}) (bool, error)
618         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
619         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
620         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
621         DelIE(key string, data interface{}) (bool, error)
622         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
623         SAdd(key string, data ...interface{}) error
624         SRem(key string, data ...interface{}) error
625         SMembers(key string) ([]string, error)
626         SIsMember(key string, data interface{}) (bool, error)
627         SCard(key string) (int64, error)
628         PTTL(key string) (time.Duration, error)
629         PExpireIE(key string, data interface{}, expiration time.Duration) error
630 }