Rephrase release notes introduction
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "io"
31         "reflect"
32         "strings"
33         "sync"
34         "time"
35
36         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
37 )
38
39 //SdlInstance provides an API to read, write and modify
40 //key-value pairs in a given namespace.
41 type SdlInstance struct {
42         nameSpace      string
43         nsPrefix       string
44         eventSeparator string
45         mutex          sync.Mutex
46         tmp            []byte
47         iDatabase
48 }
49
50 //Database struct is a holder for the internal database instance. Applications
51 //can use this exported data type to locally store a reference to database
52 //instance returned from NewDabase() function.
53 type Database struct {
54         instance iDatabase
55 }
56
57 //NewDatabase creates a connection to database that will be used
58 //as a backend for the key-value storage. The returned value
59 //can be reused between multiple SDL instances in which case each instance
60 //is using the same connection.
61 func NewDatabase() *Database {
62         return &Database{
63                 instance: sdlgoredis.Create(),
64         }
65 }
66
67 //NewSdlInstance creates a new sdl instance using the given namespace.
68 //The database used as a backend is given as a parameter
69 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
70         return &SdlInstance{
71                 nameSpace:      NameSpace,
72                 nsPrefix:       "{" + NameSpace + "},",
73                 eventSeparator: "___",
74                 iDatabase:      db.instance,
75         }
76 }
77
78 //SubscribeChannel lets you to subscribe for a events on a given channels.
79 //SDL notifications are events that are published on a specific channels.
80 //Both the channel and events are defined by the entity that is publishing
81 //the events.
82 //
83 //When subscribing for a channel, a callback function is given as a parameter.
84 //Whenever a notification is received from a channel, this callback is called
85 //with channel and notifications as parameter (several notifications could be
86 //packed to a single callback function call). A call to SubscribeChannel function
87 //returns immediatelly, callbacks will be called asyncronously.
88 //
89 //It is possible to subscribe to different channels using different callbacks. In
90 //this case simply use SubscribeChannel function separately for each channel.
91 //
92 //When receiving events in callback routine, it is a good practive to return from
93 //callback as quickly as possible. E.g. reading in callback context should be avoided
94 //and using of Go signals is recommended. Also it should be noted that in case of several
95 //events received from different channels, callbacks are called in series one by one.
96 //
97 //This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines.
98 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
99         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
100         return nil
101 }
102
103 //UnsubscribeChannel removes subscription from one or several channels.
104 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
105         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
106         return nil
107 }
108
109 //Close connection to backend database.
110 func (s *SdlInstance) Close() error {
111         return s.CloseDB()
112 }
113
114 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
115         if len(channelsAndEvents)%2 != 0 {
116                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
117         }
118         for i, v := range channelsAndEvents {
119                 if i%2 != 0 {
120                         if strings.Contains(v, s.eventSeparator) {
121                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
122                         }
123                 }
124         }
125         return nil
126 }
127 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
128         var retVal []string
129         for _, v := range channels {
130                 retVal = append(retVal, s.nsPrefix+v)
131         }
132         return retVal
133 }
134
135 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
136         retVal := make([]interface{}, 0)
137         shouldBeKey := true
138         for _, v := range pairs {
139                 reflectType := reflect.TypeOf(v)
140                 switch reflectType.Kind() {
141                 case reflect.Map:
142                         x := reflect.ValueOf(v).MapRange()
143                         for x.Next() {
144                                 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
145                                 retVal = append(retVal, x.Value().Interface())
146                         }
147                 case reflect.Slice:
148                         if shouldBeKey {
149                                 x := reflect.ValueOf(v)
150                                 if x.Len()%2 != 0 {
151                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
152                                 }
153                                 for i2 := 0; i2 < x.Len(); i2++ {
154                                         if i2%2 == 0 {
155                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
156                                         } else {
157                                                 retVal = append(retVal, x.Index(i2).Interface())
158                                         }
159                                 }
160                         } else {
161                                 if reflectType.Elem().Kind() == reflect.Uint8 {
162                                         retVal = append(retVal, v)
163                                         shouldBeKey = true
164                                 } else {
165                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
166                                 }
167                         }
168                 case reflect.Array:
169                         if shouldBeKey {
170                                 x := reflect.ValueOf(v)
171                                 if x.Len()%2 != 0 {
172                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
173                                 }
174                                 for i2 := 0; i2 < x.Len(); i2++ {
175                                         if i2%2 == 0 {
176                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
177                                         } else {
178                                                 retVal = append(retVal, x.Index(i2).Interface())
179                                         }
180                                 }
181                         } else {
182                                 if reflectType.Elem().Kind() == reflect.Uint8 {
183                                         retVal = append(retVal, v)
184                                         shouldBeKey = true
185                                 } else {
186                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
187                                 }
188                         }
189                 default:
190                         if shouldBeKey {
191                                 retVal = append(retVal, s.nsPrefix+v.(string))
192                                 shouldBeKey = false
193                         } else {
194                                 retVal = append(retVal, v)
195                                 shouldBeKey = true
196                         }
197                 }
198         }
199         if len(retVal)%2 != 0 {
200                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
201         }
202         return retVal, nil
203 }
204
205 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
206         channelEventMap := make(map[string]string)
207         for i, v := range channelsAndEvents {
208                 if i%2 != 0 {
209                         continue
210                 }
211                 _, exists := channelEventMap[v]
212                 if exists {
213                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
214                 } else {
215                         channelEventMap[v] = channelsAndEvents[i+1]
216                 }
217         }
218         retVal := make([]string, 0)
219         for k, v := range channelEventMap {
220                 retVal = append(retVal, s.nsPrefix+k)
221                 retVal = append(retVal, v)
222         }
223         return retVal
224 }
225
226 //SetAndPublish function writes data to shared data layer storage and sends an event to
227 //a channel. Writing is done atomically, i.e. all succeeds or fails.
228 //Data to be written is given as key-value pairs. Several key-value
229 //pairs can be written with one call.
230 //The key is expected to be string whereas value can be anything, string,
231 //number, slice array or map
232 //
233 //If data was set successfully, an event is sent to a channel.
234 //Channels and events are given as pairs is channelsAndEvents parameter.
235 //It is possible to send several events to several channels by giving several
236 //channel-event pairs.
237 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
238 //will send event1 and event3 to channel1 and event2 to channel2.
239 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
240         keyAndData, err := s.setNamespaceToKeys(pairs...)
241         if err != nil {
242                 return err
243         }
244         if len(channelsAndEvents) == 0 {
245                 return s.MSet(keyAndData...)
246         }
247         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
248                 return err
249         }
250         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
251         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
252 }
253
254 //Set function writes data to shared data layer storage. Writing is done
255 //atomically, i.e. all succeeds or fails.
256 //Data to be written is given as key-value pairs. Several key-value
257 //pairs can be written with one call.
258 //The key is expected to be string whereas value can be anything, string,
259 //number, slice array or map
260 func (s *SdlInstance) Set(pairs ...interface{}) error {
261         if len(pairs) == 0 {
262                 return nil
263         }
264
265         keyAndData, err := s.setNamespaceToKeys(pairs...)
266         if err != nil {
267                 return err
268         }
269         return s.MSet(keyAndData...)
270 }
271
272 //Get function atomically reads one or more keys from SDL. The returned map has the
273 //requested keys as index and data as value. If the requested key is not found
274 //from SDL, it's value is nil
275 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
276         m := make(map[string]interface{})
277         if len(keys) == 0 {
278                 return m, nil
279         }
280
281         var keysWithNs []string
282         for _, v := range keys {
283                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
284         }
285         val, err := s.MGet(keysWithNs)
286         if err != nil {
287                 return m, err
288         }
289         for i, v := range val {
290                 m[keys[i]] = v
291         }
292         return m, err
293 }
294
295 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
296 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
297 //is published to a given channel.
298 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
299         if len(channelsAndEvents) == 0 {
300                 return s.SetIE(s.nsPrefix+key, oldData, newData)
301         }
302         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
303                 return false, err
304         }
305         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
306         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
307 }
308
309 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
310 //If replace was done successfully, true will be returned.
311 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
312         return s.SetIE(s.nsPrefix+key, oldData, newData)
313 }
314
315 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
316 //then it's value is not changed. Checking the key existence and potential set operation
317 //is done atomically. If the set operation was done successfully, an event is published to a
318 //given channel.
319 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
320         if len(channelsAndEvents) == 0 {
321                 return s.SetNX(s.nsPrefix+key, data, 0)
322         }
323         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
324                 return false, err
325         }
326         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
327         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
328 }
329
330 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically.
333 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
334         return s.SetNX(s.nsPrefix+key, data, 0)
335 }
336
337 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 //Trying to remove a nonexisting key is not considered as an error.
339 //An event is published into a given channel if remove operation is successfull and
340 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
341 //when trying to remove, no event is published.
342 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
343         if len(keys) == 0 {
344                 return nil
345         }
346
347         var keysWithNs []string
348         for _, v := range keys {
349                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
350         }
351         if len(channelsAndEvents) == 0 {
352                 return s.Del(keysWithNs)
353         }
354         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
355                 return err
356         }
357         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
358         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
359 }
360
361 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
362 func (s *SdlInstance) Remove(keys []string) error {
363         if len(keys) == 0 {
364                 return nil
365         }
366
367         var keysWithNs []string
368         for _, v := range keys {
369                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
370         }
371         err := s.Del(keysWithNs)
372         return err
373 }
374
375 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
376 //a given event is published to channel. If existing data matches given data,
377 //key and data are removed from SDL. If remove was done successfully, true is returned.
378 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
379         if len(channelsAndEvents) == 0 {
380                 return s.DelIE(s.nsPrefix+key, data)
381         }
382         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
383                 return false, err
384         }
385         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
386         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
387 }
388
389 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
390 //key and data are removed from SDL. If remove was done successfully, true is returned.
391 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
392         status, err := s.DelIE(s.nsPrefix+key, data)
393         if err != nil {
394                 return false, err
395         }
396         return status, nil
397 }
398
399 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
400 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
401 func (s *SdlInstance) GetAll() ([]string, error) {
402         keys, err := s.Keys(s.nsPrefix + "*")
403         var retVal []string
404         if err != nil {
405                 return retVal, err
406         }
407         for _, v := range keys {
408                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
409         }
410         return retVal, err
411 }
412
413 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
414 //it is not guaranteed that all keys are removed.
415 func (s *SdlInstance) RemoveAll() error {
416         keys, err := s.Keys(s.nsPrefix + "*")
417         if err != nil {
418                 return err
419         }
420         if (keys != nil) && (len(keys) != 0) {
421                 err = s.Del(keys)
422         }
423         return err
424 }
425
426 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
427 //will publish an event to given channel. This operation is not atomic, thus it is
428 //not guaranteed that all keys are removed.
429 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
430         keys, err := s.Keys(s.nsPrefix + "*")
431         if err != nil {
432                 return err
433         }
434         if (keys != nil) && (len(keys) != 0) {
435                 if len(channelsAndEvents) == 0 {
436                         return s.Del(keys)
437                 }
438                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
439                         return err
440                 }
441                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
442                 err = s.DelMPub(channelsAndEventsPrepared, keys)
443         }
444         return err
445 }
446
447 //AddMember adds a new members to a group.
448 //
449 //SDL groups are unordered collections of members where each member is
450 //unique. It is possible to add the same member several times without the
451 //need to check if it already exists.
452 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
453         return s.SAdd(s.nsPrefix+group, member...)
454 }
455
456 //RemoveMember removes members from a group.
457 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
458         return s.SRem(s.nsPrefix+group, member...)
459 }
460
461 //RemoveGroup removes the whole group along with it's members.
462 func (s *SdlInstance) RemoveGroup(group string) error {
463         return s.Del([]string{s.nsPrefix + group})
464 }
465
466 //GetMembers returns all the members from a group.
467 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
468         retVal, err := s.SMembers(s.nsPrefix + group)
469         if err != nil {
470                 return []string{}, err
471         }
472         return retVal, err
473 }
474
475 //IsMember returns true if given member is found from a group.
476 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
477         retVal, err := s.SIsMember(s.nsPrefix+group, member)
478         if err != nil {
479                 return false, err
480         }
481         return retVal, err
482 }
483
484 //GroupSize returns the number of members in a group.
485 func (s *SdlInstance) GroupSize(group string) (int64, error) {
486         retVal, err := s.SCard(s.nsPrefix + group)
487         if err != nil {
488                 return 0, err
489         }
490         return retVal, err
491 }
492
493 func (s *SdlInstance) randomToken() (string, error) {
494         s.mutex.Lock()
495         defer s.mutex.Unlock()
496
497         if len(s.tmp) == 0 {
498                 s.tmp = make([]byte, 16)
499         }
500
501         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
502                 return "", err
503         }
504
505         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
506 }
507
508 //LockResource function is used for locking a resource. The resource lock in
509 //practice is a key with random value that is set to expire after a time
510 //period. The value written to key is a random value, thus only the instance
511 //created a lock, can release it. Resource locks are per namespace.
512 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
513         value, err := s.randomToken()
514         if err != nil {
515                 return nil, err
516         }
517
518         var retryTimer *time.Timer
519         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
520                 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
521                 if err != nil {
522                         return nil, err
523                 } else if ok {
524                         return &Lock{s: s, key: resource, value: value}, nil
525                 }
526                 if retryTimer == nil {
527                         retryTimer = time.NewTimer(opt.getRetryWait())
528                         defer retryTimer.Stop()
529                 } else {
530                         retryTimer.Reset(opt.getRetryWait())
531                 }
532
533                 select {
534                 case <-retryTimer.C:
535                 }
536         }
537         return nil, errors.New("Lock not obtained")
538 }
539
540 //ReleaseResource removes the lock from a resource. If lock is already
541 //expired or some other instance is keeping the lock (lock taken after expiration),
542 //an error is returned.
543 func (l *Lock) ReleaseResource() error {
544         ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
545
546         if err != nil {
547                 return err
548         }
549         if !ok {
550                 return errors.New("Lock not held")
551         }
552         return nil
553 }
554
555 //RefreshResource function can be used to set a new expiration time for the
556 //resource lock (if the lock still exists). The old remaining expiration
557 //time is overwritten with the given new expiration time.
558 func (l *Lock) RefreshResource(expiration time.Duration) error {
559         err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
560         return err
561 }
562
563 //CheckResource returns the expiration time left for a resource.
564 //If the resource doesn't exist, -2 is returned.
565 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
566         result, err := s.PTTL(s.nsPrefix + resource)
567         if err != nil {
568                 return 0, err
569         }
570         if result == time.Duration(-1) {
571                 return 0, errors.New("invalid resource given, no expiration time attached")
572         }
573         return result, nil
574 }
575
576 //Options struct defines the behaviour for getting the resource lock.
577 type Options struct {
578         //The number of time the lock will be tried.
579         //Default: 0 = no retry
580         RetryCount int
581
582         //Wait between the retries.
583         //Default: 100ms
584         RetryWait time.Duration
585 }
586
587 func (o *Options) getRetryCount() int {
588         if o != nil && o.RetryCount > 0 {
589                 return o.RetryCount
590         }
591         return 0
592 }
593
594 func (o *Options) getRetryWait() time.Duration {
595         if o != nil && o.RetryWait > 0 {
596                 return o.RetryWait
597         }
598         return 100 * time.Millisecond
599 }
600
601 //Lock struct identifies the resource lock instance. Releasing and adjusting the
602 //expirations are done using the methods defined for this struct.
603 type Lock struct {
604         s     *SdlInstance
605         key   string
606         value string
607 }
608
609 type iDatabase interface {
610         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
611         UnsubscribeChannelDB(channels ...string)
612         MSet(pairs ...interface{}) error
613         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
614         MGet(keys []string) ([]interface{}, error)
615         CloseDB() error
616         Del(keys []string) error
617         DelMPub(channelsAndEvents []string, keys []string) error
618         Keys(key string) ([]string, error)
619         SetIE(key string, oldData, newData interface{}) (bool, error)
620         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
621         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
622         SetNXPub(channel, message, key string, data interface{}) (bool, error)
623         DelIE(key string, data interface{}) (bool, error)
624         DelIEPub(channel, message, key string, data interface{}) (bool, error)
625         SAdd(key string, data ...interface{}) error
626         SRem(key string, data ...interface{}) error
627         SMembers(key string) ([]string, error)
628         SIsMember(key string, data interface{}) (bool, error)
629         SCard(key string) (int64, error)
630         PTTL(key string) (time.Duration, error)
631         PExpireIE(key string, data interface{}, expiration time.Duration) error
632 }