Add resource locking
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "crypto/rand"
22         "encoding/base64"
23         "errors"
24         "fmt"
25         "io"
26         "reflect"
27         "strings"
28         "sync"
29         "time"
30
31         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
32 )
33
34 //SdlInstance provides an API to read, write and modify
35 //key-value pairs in a given namespace.
36 type SdlInstance struct {
37         nameSpace      string
38         nsPrefix       string
39         eventSeparator string
40         mutex          sync.Mutex
41         tmp            []byte
42         iDatabase
43 }
44
45 //NewDatabase creates a connection to database that will be used
46 //as a backend for the key-value storage. The returned value shall
47 //be given as a parameter when calling NewKeyValStorage
48 func NewDatabase() *sdlgoredis.DB {
49         return sdlgoredis.Create()
50 }
51
52 //NewSdlInstance creates a new sdl instance using the given namespace.
53 //The database used as a backend is given as a parameter
54 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
55         return &SdlInstance{
56                 nameSpace:      NameSpace,
57                 nsPrefix:       "{" + NameSpace + "},",
58                 eventSeparator: "___",
59                 iDatabase:      db,
60         }
61 }
62
63 //SubscribeChannel lets you to subscribe for a events on a given channels.
64 //SDL notifications are events that are published on a specific channels.
65 //Both the channel and events are defined by the entity that is publishing
66 //the events.
67 //
68 //When subscribing for a channel, a callback function is given as a parameter.
69 //Whenever a notification is received from a channel, this callback is called
70 //with channel and notifications as parameter (several notifications could be
71 //packed to a single callback function call). A call to SubscribeChannel function
72 //returns immediatelly, callbacks will be called asyncronously.
73 //
74 //It is possible to subscribe to different channels using different callbacks. In
75 //this case simply use SubscribeChannel function separately for each channel.
76 //
77 //When receiving events in callback routine, it is a good practive to return from
78 //callback as quickly as possible. E.g. reading in callback context should be avoided
79 //and using of Go signals is recommended. Also it should be noted that in case of several
80 //events received from different channels, callbacks are called in series one by one.
81 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
82         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
83         return nil
84 }
85
86 //UnsubscribeChannel removes subscription from one or several channels.
87 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
88         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
89         return nil
90 }
91
92 //Close connection to backend database.
93 func (s *SdlInstance) Close() error {
94         return s.CloseDB()
95 }
96
97 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
98         if len(channelsAndEvents)%2 != 0 {
99                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
100         }
101         for i, v := range channelsAndEvents {
102                 if i%2 != 0 {
103                         if strings.Contains(v, s.eventSeparator) {
104                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
105                         }
106                 }
107         }
108         return nil
109 }
110 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
111         var retVal []string
112         for _, v := range channels {
113                 retVal = append(retVal, s.nsPrefix+v)
114         }
115         return retVal
116 }
117
118 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
119         retVal := make([]interface{}, 0)
120         shouldBeKey := true
121         for _, v := range pairs {
122                 reflectType := reflect.TypeOf(v)
123                 switch reflectType.Kind() {
124                 case reflect.Map:
125                         x := reflect.ValueOf(v).MapRange()
126                         for x.Next() {
127                                 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
128                                 retVal = append(retVal, x.Value().Interface())
129                         }
130                 case reflect.Slice:
131                         if shouldBeKey {
132                                 x := reflect.ValueOf(v)
133                                 if x.Len()%2 != 0 {
134                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
135                                 }
136                                 for i2 := 0; i2 < x.Len(); i2++ {
137                                         if i2%2 == 0 {
138                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
139                                         } else {
140                                                 retVal = append(retVal, x.Index(i2).Interface())
141                                         }
142                                 }
143                         } else {
144                                 if reflectType.Elem().Kind() == reflect.Uint8 {
145                                         retVal = append(retVal, v)
146                                         shouldBeKey = true
147                                 } else {
148                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
149                                 }
150                         }
151                 case reflect.Array:
152                         if shouldBeKey {
153                                 x := reflect.ValueOf(v)
154                                 if x.Len()%2 != 0 {
155                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
156                                 }
157                                 for i2 := 0; i2 < x.Len(); i2++ {
158                                         if i2%2 == 0 {
159                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
160                                         } else {
161                                                 retVal = append(retVal, x.Index(i2).Interface())
162                                         }
163                                 }
164                         } else {
165                                 if reflectType.Elem().Kind() == reflect.Uint8 {
166                                         retVal = append(retVal, v)
167                                         shouldBeKey = true
168                                 } else {
169                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
170                                 }
171                         }
172                 default:
173                         if shouldBeKey {
174                                 retVal = append(retVal, s.nsPrefix+v.(string))
175                                 shouldBeKey = false
176                         } else {
177                                 retVal = append(retVal, v)
178                                 shouldBeKey = true
179                         }
180                 }
181         }
182         if len(retVal)%2 != 0 {
183                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
184         }
185         return retVal, nil
186 }
187
188 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
189         channelEventMap := make(map[string]string)
190         for i, v := range channelsAndEvents {
191                 if i%2 != 0 {
192                         continue
193                 }
194                 _, exists := channelEventMap[v]
195                 if exists {
196                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
197                 } else {
198                         channelEventMap[v] = channelsAndEvents[i+1]
199                 }
200         }
201         retVal := make([]string, 0)
202         for k, v := range channelEventMap {
203                 retVal = append(retVal, s.nsPrefix+k)
204                 retVal = append(retVal, v)
205         }
206         return retVal
207 }
208
209 //SetAndPublish function writes data to shared data layer storage and sends an event to
210 //a channel. Writing is done atomically, i.e. all succeeds or fails.
211 //Data to be written is given as key-value pairs. Several key-value
212 //pairs can be written with one call.
213 //The key is expected to be string whereas value can be anything, string,
214 //number, slice array or map
215 //
216 //If data was set successfully, an event is sent to a channel.
217 //Channels and events are given as pairs is channelsAndEvents parameter.
218 //It is possible to send several events to several channels by giving several
219 //channel-event pairs.
220 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
221 //will send event1 and event3 to channel1 and event2 to channel2.
222 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
223         keyAndData, err := s.setNamespaceToKeys(pairs...)
224         if err != nil {
225                 return err
226         }
227         if len(channelsAndEvents) == 0 {
228                 return s.MSet(keyAndData...)
229         }
230         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
231                 return err
232         }
233         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
234         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
235 }
236
237 //Set function writes data to shared data layer storage. Writing is done
238 //atomically, i.e. all succeeds or fails.
239 //Data to be written is given as key-value pairs. Several key-value
240 //pairs can be written with one call.
241 //The key is expected to be string whereas value can be anything, string,
242 //number, slice array or map
243 func (s *SdlInstance) Set(pairs ...interface{}) error {
244         if len(pairs) == 0 {
245                 return nil
246         }
247
248         keyAndData, err := s.setNamespaceToKeys(pairs...)
249         if err != nil {
250                 return err
251         }
252         return s.MSet(keyAndData...)
253 }
254
255 //Get function atomically reads one or more keys from SDL. The returned map has the
256 //requested keys as index and data as value. If the requested key is not found
257 //from SDL, it's value is nil
258 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
259         m := make(map[string]interface{})
260         if len(keys) == 0 {
261                 return m, nil
262         }
263
264         var keysWithNs []string
265         for _, v := range keys {
266                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
267         }
268         val, err := s.MGet(keysWithNs)
269         if err != nil {
270                 return m, err
271         }
272         for i, v := range val {
273                 m[keys[i]] = v
274         }
275         return m, err
276 }
277
278 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
279 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
280 //is published to a given channel.
281 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
282         if len(channelsAndEvents) == 0 {
283                 return s.SetIE(s.nsPrefix+key, oldData, newData)
284         }
285         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
286                 return false, err
287         }
288         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
289         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
290 }
291
292 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
293 //If replace was done successfully, true will be returned.
294 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
295         return s.SetIE(s.nsPrefix+key, oldData, newData)
296 }
297
298 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
299 //then it's value is not changed. Checking the key existence and potential set operation
300 //is done atomically. If the set operation was done successfully, an event is published to a
301 //given channel.
302 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
303         if len(channelsAndEvents) == 0 {
304                 return s.SetNX(s.nsPrefix+key, data, 0)
305         }
306         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
307                 return false, err
308         }
309         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
310         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
311 }
312
313 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
314 //then it's value is not changed. Checking the key existence and potential set operation
315 //is done atomically.
316 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
317         return s.SetNX(s.nsPrefix+key, data, 0)
318 }
319
320 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
321 //Trying to remove a nonexisting key is not considered as an error.
322 //An event is published into a given channel if remove operation is successfull and
323 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
324 //when trying to remove, no event is published.
325 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
326         if len(keys) == 0 {
327                 return nil
328         }
329
330         var keysWithNs []string
331         for _, v := range keys {
332                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
333         }
334         if len(channelsAndEvents) == 0 {
335                 return s.Del(keysWithNs)
336         }
337         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
338                 return err
339         }
340         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
341         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
342 }
343
344 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
345 func (s *SdlInstance) Remove(keys []string) error {
346         if len(keys) == 0 {
347                 return nil
348         }
349
350         var keysWithNs []string
351         for _, v := range keys {
352                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
353         }
354         err := s.Del(keysWithNs)
355         return err
356 }
357
358 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
359 //a given event is published to channel. If existing data matches given data,
360 //key and data are removed from SDL. If remove was done successfully, true is returned.
361 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
362         if len(channelsAndEvents) == 0 {
363                 return s.DelIE(s.nsPrefix+key, data)
364         }
365         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
366                 return false, err
367         }
368         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
369         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
370 }
371
372 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
373 //key and data are removed from SDL. If remove was done successfully, true is returned.
374 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
375         status, err := s.DelIE(s.nsPrefix+key, data)
376         if err != nil {
377                 return false, err
378         }
379         return status, nil
380 }
381
382 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
383 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
384 func (s *SdlInstance) GetAll() ([]string, error) {
385         keys, err := s.Keys(s.nsPrefix + "*")
386         var retVal []string
387         if err != nil {
388                 return retVal, err
389         }
390         for _, v := range keys {
391                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
392         }
393         return retVal, err
394 }
395
396 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
397 //it is not guaranteed that all keys are removed.
398 func (s *SdlInstance) RemoveAll() error {
399         keys, err := s.Keys(s.nsPrefix + "*")
400         if err != nil {
401                 return err
402         }
403         if (keys != nil) && (len(keys) != 0) {
404                 err = s.Del(keys)
405         }
406         return err
407 }
408
409 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
410 //will publish an event to given channel. This operation is not atomic, thus it is
411 //not guaranteed that all keys are removed.
412 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
413         keys, err := s.Keys(s.nsPrefix + "*")
414         if err != nil {
415                 return err
416         }
417         if (keys != nil) && (len(keys) != 0) {
418                 if len(channelsAndEvents) == 0 {
419                         return s.Del(keys)
420                 }
421                 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
422                         return err
423                 }
424                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
425                 err = s.DelMPub(channelsAndEventsPrepared, keys)
426         }
427         return err
428 }
429
430 //AddMember adds a new members to a group.
431 //
432 //SDL groups are unordered collections of members where each member is
433 //unique. It is possible to add the same member several times without the
434 //need to check if it already exists.
435 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
436         return s.SAdd(s.nsPrefix+group, member...)
437 }
438
439 //RemoveMember removes members from a group.
440 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
441         return s.SRem(s.nsPrefix+group, member...)
442 }
443
444 //RemoveGroup removes the whole group along with it's members.
445 func (s *SdlInstance) RemoveGroup(group string) error {
446         return s.Del([]string{s.nsPrefix + group})
447 }
448
449 //GetMembers returns all the members from a group.
450 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
451         retVal, err := s.SMembers(s.nsPrefix + group)
452         if err != nil {
453                 return []string{}, err
454         }
455         return retVal, err
456 }
457
458 //IsMember returns true if given member is found from a group.
459 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
460         retVal, err := s.SIsMember(s.nsPrefix+group, member)
461         if err != nil {
462                 return false, err
463         }
464         return retVal, err
465 }
466
467 //GroupSize returns the number of members in a group.
468 func (s *SdlInstance) GroupSize(group string) (int64, error) {
469         retVal, err := s.SCard(s.nsPrefix + group)
470         if err != nil {
471                 return 0, err
472         }
473         return retVal, err
474 }
475
476 func (s *SdlInstance) randomToken() (string, error) {
477         s.mutex.Lock()
478         defer s.mutex.Unlock()
479
480         if len(s.tmp) == 0 {
481                 s.tmp = make([]byte, 16)
482         }
483
484         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
485                 return "", err
486         }
487
488         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
489 }
490
491 //LockResource function is used for locking a resource. The resource lock in
492 //practice is a key with random value that is set to expire after a time
493 //period. The value written to key is a random value, thus only the instance
494 //created a lock, can release it. Resource locks are per namespace.
495 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
496         value, err := s.randomToken()
497         if err != nil {
498                 return nil, err
499         }
500
501         var retryTimer *time.Timer
502         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
503                 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
504                 if err != nil {
505                         return nil, err
506                 } else if ok {
507                         return &Lock{s: s, key: resource, value: value}, nil
508                 }
509                 if retryTimer == nil {
510                         retryTimer = time.NewTimer(opt.getRetryWait())
511                         defer retryTimer.Stop()
512                 } else {
513                         retryTimer.Reset(opt.getRetryWait())
514                 }
515
516                 select {
517                 case <-retryTimer.C:
518                 }
519         }
520         return nil, errors.New("Lock not obtained")
521 }
522
523 //ReleaseResource removes the lock from a resource. If lock is already
524 //expired or some other instance is keeping the lock (lock taken after expiration),
525 //an error is returned.
526 func (l *Lock) ReleaseResource() error {
527         ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
528
529         if err != nil {
530                 return err
531         }
532         if !ok {
533                 return errors.New("Lock not held")
534         }
535         return nil
536 }
537
538 //RefreshResource function can be used to set a new expiration time for the
539 //resource lock (if the lock still exists). The old remaining expiration
540 //time is overwritten with the given new expiration time.
541 func (l *Lock) RefreshResource(expiration time.Duration) error {
542         err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
543         return err
544 }
545
546 //CheckResource returns the expiration time left for a resource.
547 //If the resource doesn't exist, -2 is returned.
548 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
549         result, err := s.PTTL(s.nsPrefix + resource)
550         if err != nil {
551                 return 0, err
552         }
553         if result == time.Duration(-1) {
554                 return 0, errors.New("invalid resource given, no expiration time attached")
555         }
556         return result, nil
557 }
558
559 //Options struct defines the behaviour for getting the resource lock.
560 type Options struct {
561         //The number of time the lock will be tried.
562         //Default: 0 = no retry
563         RetryCount int
564
565         //Wait between the retries.
566         //Default: 100ms
567         RetryWait time.Duration
568 }
569
570 func (o *Options) getRetryCount() int {
571         if o != nil && o.RetryCount > 0 {
572                 return o.RetryCount
573         }
574         return 0
575 }
576
577 func (o *Options) getRetryWait() time.Duration {
578         if o != nil && o.RetryWait > 0 {
579                 return o.RetryWait
580         }
581         return 100 * time.Millisecond
582 }
583
584 //Lock struct identifies the resource lock instance. Releasing and adjusting the
585 //expirations are done using the methods defined for this struct.
586 type Lock struct {
587         s     *SdlInstance
588         key   string
589         value string
590 }
591
592 type iDatabase interface {
593         SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
594         UnsubscribeChannelDB(channels ...string)
595         MSet(pairs ...interface{}) error
596         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
597         MGet(keys []string) ([]interface{}, error)
598         CloseDB() error
599         Del(keys []string) error
600         DelMPub(channelsAndEvents []string, keys []string) error
601         Keys(key string) ([]string, error)
602         SetIE(key string, oldData, newData interface{}) (bool, error)
603         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
604         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
605         SetNXPub(channel, message, key string, data interface{}) (bool, error)
606         DelIE(key string, data interface{}) (bool, error)
607         DelIEPub(channel, message, key string, data interface{}) (bool, error)
608         SAdd(key string, data ...interface{}) error
609         SRem(key string, data ...interface{}) error
610         SMembers(key string) ([]string, error)
611         SIsMember(key string, data interface{}) (bool, error)
612         SCard(key string) (int64, error)
613         PTTL(key string) (time.Duration, error)
614         PExpireIE(key string, data interface{}, expiration time.Duration) error
615 }