Fix multi-namespace SDL event subscribe
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
31         "hash/crc32"
32         "io"
33         "reflect"
34         "strings"
35         "sync"
36         "time"
37 )
38
39 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
40 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
41 //where namespace can be given in every API call. This means that with
42 //SyncStorage you can easily set key-values under different namespace compared to
43 //SdlInstance where namespace can be defined only at SdlInstance instance creation
44 //time.
45 type SyncStorage struct {
46         mutex sync.Mutex
47         tmp   []byte
48         db    *Database
49 }
50
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54         return newSyncStorage(NewDatabase())
55 }
56
57 func newSyncStorage(db *Database) *SyncStorage {
58         return &SyncStorage{
59                 db: db,
60         }
61 }
62
63 //selectDbInstance Selects DB instance what provides DB services for the namespace
64 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
65         instanceCount := uint32(len(s.db.instances))
66         instanceID := getHash(ns) % instanceCount
67         return s.db.instances[instanceID]
68 }
69
70 //getHash Returns hash value calculated from the string
71 func getHash(s string) uint32 {
72         tbl := crc32.MakeTable(crc32.IEEE)
73         return crc32.Checksum([]byte(s), tbl)
74 }
75
76 //SubscribeChannel lets you to subscribe for a events on a given channels.
77 //SDL notifications are events that are published on a specific channels.
78 //Both the channel and events are defined by the entity that is publishing
79 //the events under given namespace.
80 //
81 //When subscribing for a channel, a callback function is given as a parameter.
82 //Whenever a notification is received from a channel, this callback is called
83 //with channel and notifications as parameter (several notifications could be
84 //packed to a single callback function call). A call to SubscribeChannel function
85 //returns immediatelly, callbacks will be called asyncronously.
86 //
87 //It is possible to subscribe to different channels using different callbacks. In
88 //this case simply use SubscribeChannel function separately for each channel.
89 //
90 //When receiving events in callback routine, it is a good practive to return from
91 //callback as quickly as possible. E.g. reading in callback context should be avoided
92 //and using of Go signals is recommended. Also it should be noted that in case of several
93 //events received from different channels, callbacks are called in series one by one.
94 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
95         nsPrefix := getNsPrefix(ns)
96         s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
97         return nil
98 }
99
100 //UnsubscribeChannel removes subscription from one or several channels under given
101 //namespace.
102 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
103         nsPrefix := getNsPrefix(ns)
104         s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
105         return nil
106 }
107
108 //Close connection to backend database.
109 func (s *SyncStorage) Close() error {
110         var ret error
111         for _, db := range s.db.instances {
112                 if err := db.CloseDB(); err != nil {
113                         ret = err
114                 }
115         }
116         return ret
117 }
118
119 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
120         if len(channelsAndEvents)%2 != 0 {
121                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
122         }
123         for i, v := range channelsAndEvents {
124                 if i%2 != 0 {
125                         if strings.Contains(v, sdlgoredis.EventSeparator) {
126                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
127                         }
128                 }
129         }
130         return nil
131 }
132
133 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
134         var retVal []string
135         for _, v := range channels {
136                 retVal = append(retVal, nsPrefix+v)
137         }
138         return retVal
139 }
140
141 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
142         retVal := make([]interface{}, 0)
143         shouldBeKey := true
144         for _, v := range pairs {
145                 reflectType := reflect.TypeOf(v)
146                 switch reflectType.Kind() {
147                 case reflect.Map:
148                         x := reflect.ValueOf(v).MapRange()
149                         for x.Next() {
150                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
151                                 retVal = append(retVal, x.Value().Interface())
152                         }
153                 case reflect.Slice:
154                         if shouldBeKey {
155                                 x := reflect.ValueOf(v)
156                                 if x.Len()%2 != 0 {
157                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
158                                 }
159                                 for i2 := 0; i2 < x.Len(); i2++ {
160                                         if i2%2 == 0 {
161                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
162                                         } else {
163                                                 retVal = append(retVal, x.Index(i2).Interface())
164                                         }
165                                 }
166                         } else {
167                                 if reflectType.Elem().Kind() == reflect.Uint8 {
168                                         retVal = append(retVal, v)
169                                         shouldBeKey = true
170                                 } else {
171                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
172                                 }
173                         }
174                 case reflect.Array:
175                         if shouldBeKey {
176                                 x := reflect.ValueOf(v)
177                                 if x.Len()%2 != 0 {
178                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
179                                 }
180                                 for i2 := 0; i2 < x.Len(); i2++ {
181                                         if i2%2 == 0 {
182                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
183                                         } else {
184                                                 retVal = append(retVal, x.Index(i2).Interface())
185                                         }
186                                 }
187                         } else {
188                                 if reflectType.Elem().Kind() == reflect.Uint8 {
189                                         retVal = append(retVal, v)
190                                         shouldBeKey = true
191                                 } else {
192                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
193                                 }
194                         }
195                 default:
196                         if shouldBeKey {
197                                 retVal = append(retVal, nsPrefix+v.(string))
198                                 shouldBeKey = false
199                         } else {
200                                 retVal = append(retVal, v)
201                                 shouldBeKey = true
202                         }
203                 }
204         }
205         if len(retVal)%2 != 0 {
206                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
207         }
208         return retVal, nil
209 }
210
211 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
212         channelEventMap := make(map[string]string)
213         for i, v := range channelsAndEvents {
214                 if i%2 != 0 {
215                         continue
216                 }
217                 _, exists := channelEventMap[v]
218                 if exists {
219                         channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
220                 } else {
221                         channelEventMap[v] = channelsAndEvents[i+1]
222                 }
223         }
224         retVal := make([]string, 0)
225         for k, v := range channelEventMap {
226                 retVal = append(retVal, nsPrefix+k)
227                 retVal = append(retVal, v)
228         }
229         return retVal
230 }
231
232 //SetAndPublish function writes data to shared data layer storage and sends an event to
233 //a channel. Writing is done atomically, i.e. all succeeds or fails.
234 //Data is written under the namespace what is given as a parameter for this function.
235 //Data to be written is given as key-value pairs. Several key-value
236 //pairs can be written with one call.
237 //The key is expected to be string whereas value can be anything, string,
238 //number, slice array or map
239 //
240 //If data was set successfully, an event is sent to a channel.
241 //Channels and events are given as pairs is channelsAndEvents parameter.
242 //It is possible to send several events to several channels by giving several
243 //channel-event pairs.
244 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
245 //will send event1 and event3 to channel1 and event2 to channel2.
246 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
247         nsPrefix := getNsPrefix(ns)
248         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
249         if err != nil {
250                 return err
251         }
252         if len(channelsAndEvents) == 0 {
253                 return s.getDbBackend(ns).MSet(keyAndData...)
254         }
255         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
256                 return err
257         }
258         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
259         return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
260 }
261
262 //Set function writes data to shared data layer storage. Writing is done
263 //atomically, i.e. all succeeds or fails.
264 //Data is written under the namespace what is given as a parameter for this function.
265 //Data to be written is given as key-value pairs. Several key-value
266 //pairs can be written with one call.
267 //The key is expected to be string whereas value can be anything, string,
268 //number, slice array or map
269 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
270         if len(pairs) == 0 {
271                 return nil
272         }
273
274         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
275         if err != nil {
276                 return err
277         }
278         return s.getDbBackend(ns).MSet(keyAndData...)
279 }
280
281 //Get function atomically reads one or more keys from SDL. The returned map has the
282 //requested keys as index and data as value. If the requested key is not found
283 //from SDL, it's value is nil
284 //Read operation is targeted to the namespace what is given as a parameter for this
285 //function.
286 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
287         m := make(map[string]interface{})
288         if len(keys) == 0 {
289                 return m, nil
290         }
291
292         var keysWithNs []string
293         for _, v := range keys {
294                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
295         }
296         val, err := s.getDbBackend(ns).MGet(keysWithNs)
297         if err != nil {
298                 return m, err
299         }
300         for i, v := range val {
301                 m[keys[i]] = v
302         }
303         return m, err
304 }
305
306 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
307 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
308 //is published to a given channel.
309 //Data is written under the namespace what is given as a parameter for this function.
310 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
311         nsPrefix := getNsPrefix(ns)
312         if len(channelsAndEvents) == 0 {
313                 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
314         }
315         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
316                 return false, err
317         }
318         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
319         return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
320 }
321
322 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
323 //If replace was done successfully, true will be returned.
324 //Data is written under the namespace what is given as a parameter for this function.
325 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
326         return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
327 }
328
329 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
330 //then it's value is not changed. Checking the key existence and potential set operation
331 //is done atomically. If the set operation was done successfully, an event is published to a
332 //given channel.
333 //Data is written under the namespace what is given as a parameter for this function.
334 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
335         nsPrefix := getNsPrefix(ns)
336         if len(channelsAndEvents) == 0 {
337                 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
338         }
339         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
340                 return false, err
341         }
342         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
343         return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
344 }
345
346 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
347 //then it's value is not changed. Checking the key existence and potential set operation
348 //is done atomically.
349 //Data is written under the namespace what is given as a parameter for this function.
350 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
351         return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
352 }
353
354 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
355 //Trying to remove a nonexisting key is not considered as an error.
356 //An event is published into a given channel if remove operation is successfull and
357 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
358 //when trying to remove, no event is published.
359 //Data is removed under the namespace what is given as a parameter for this function.
360 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
361         if len(keys) == 0 {
362                 return nil
363         }
364
365         var keysWithNs []string
366         nsPrefix := getNsPrefix(ns)
367         for _, v := range keys {
368                 keysWithNs = append(keysWithNs, nsPrefix+v)
369         }
370         if len(channelsAndEvents) == 0 {
371                 return s.getDbBackend(ns).Del(keysWithNs)
372         }
373         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
374                 return err
375         }
376         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
377         return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
378 }
379
380 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
381 //Data is removed under the namespace what is given as a parameter for this function.
382 func (s *SyncStorage) Remove(ns string, keys []string) error {
383         if len(keys) == 0 {
384                 return nil
385         }
386
387         var keysWithNs []string
388         for _, v := range keys {
389                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
390         }
391         err := s.getDbBackend(ns).Del(keysWithNs)
392         return err
393 }
394
395 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
396 //a given event is published to channel. If existing data matches given data,
397 //key and data are removed from SDL. If remove was done successfully, true is returned.
398 //Data is removed under the namespace what is given as a parameter for this function.
399 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
400         nsPrefix := getNsPrefix(ns)
401         if len(channelsAndEvents) == 0 {
402                 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
403         }
404         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
405                 return false, err
406         }
407         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
408         return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
409 }
410
411 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
412 //key and data are removed from SDL. If remove was done successfully, true is returned.
413 //Data is removed under the namespace what is given as a parameter for this function.
414 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
415         status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
416         if err != nil {
417                 return false, err
418         }
419         return status, nil
420 }
421
422 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
423 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
424 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
425         nsPrefix := getNsPrefix(ns)
426         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
427         var retVal []string
428         if err != nil {
429                 return retVal, err
430         }
431         for _, v := range keys {
432                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
433         }
434         return retVal, err
435 }
436
437 // ListKeys returns all keys in the given namespace matching key search pattern.
438 //
439 //  Supported search glob-style patterns:
440 //    h?llo matches hello, hallo and hxllo
441 //    h*llo matches hllo and heeeello
442 //    h[ae]llo matches hello and hallo, but not hillo
443 //    h[^e]llo matches hallo, hbllo, ... but not hello
444 //    h[a-b]llo matches hallo and hbllo
445 //
446 //  The \ escapes character in key search pattern and those will be treated as a normal
447 //  character:
448 //    h\[?llo\* matches h[ello* and h[allo*
449 //
450 // No prior knowledge about the keys in the given namespace exists,
451 // thus operation is not guaranteed to be atomic or isolated.
452 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
453         nsPrefix := getNsPrefix(ns)
454         nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
455         var keys []string
456         if err != nil {
457                 return keys, err
458         }
459         for _, key := range nsKeys {
460                 keys = append(keys, strings.Split(key, nsPrefix)[1])
461         }
462         return keys, err
463 }
464
465 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
466 //it is not guaranteed that all keys are removed.
467 func (s *SyncStorage) RemoveAll(ns string) error {
468         keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
469         if err != nil {
470                 return err
471         }
472         if (keys != nil) && (len(keys) != 0) {
473                 err = s.getDbBackend(ns).Del(keys)
474         }
475         return err
476 }
477
478 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
479 //will publish an event to given channel. This operation is not atomic, thus it is
480 //not guaranteed that all keys are removed.
481 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
482         nsPrefix := getNsPrefix(ns)
483         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
484         if err != nil {
485                 return err
486         }
487         if (keys != nil) && (len(keys) != 0) {
488                 if len(channelsAndEvents) == 0 {
489                         return s.getDbBackend(ns).Del(keys)
490                 }
491                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
492                         return err
493                 }
494                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
495                 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
496         }
497         return err
498 }
499
500 //AddMember adds a new members to a group under given namespace.
501 //
502 //SDL groups are unordered collections of members where each member is
503 //unique. It is possible to add the same member several times without the
504 //need to check if it already exists.
505 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
506         return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
507 }
508
509 //RemoveMember removes members from a group under given namespace.
510 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
511         return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
512 }
513
514 //RemoveGroup removes the whole group along with it's members under given namespace.
515 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
516         return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
517 }
518
519 //GetMembers returns all the members from a group under given namespace.
520 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
521         retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
522         if err != nil {
523                 return []string{}, err
524         }
525         return retVal, err
526 }
527
528 //IsMember returns true if given member is found from a group under given namespace.
529 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
530         retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
531         if err != nil {
532                 return false, err
533         }
534         return retVal, err
535 }
536
537 //GroupSize returns the number of members in a group under given namespace.
538 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
539         retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
540         if err != nil {
541                 return 0, err
542         }
543         return retVal, err
544 }
545
546 func (s *SyncStorage) randomToken() (string, error) {
547         s.mutex.Lock()
548         defer s.mutex.Unlock()
549
550         if len(s.tmp) == 0 {
551                 s.tmp = make([]byte, 16)
552         }
553
554         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
555                 return "", err
556         }
557
558         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
559 }
560
561 //LockResource function is used for locking a resource under given namespace.
562 //The resource lock in practice is a key with random value that is set to expire
563 //after a time period. The value written to key is a random value, thus only the
564 //instance created a lock, can release it. Resource locks are per namespace.
565 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
566         value, err := s.randomToken()
567         if err != nil {
568                 return nil, err
569         }
570
571         var retryTimer *time.Timer
572         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
573                 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
574                 if err != nil {
575                         return nil, err
576                 } else if ok {
577                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
578                 }
579                 if retryTimer == nil {
580                         retryTimer = time.NewTimer(opt.getRetryWait())
581                         defer retryTimer.Stop()
582                 } else {
583                         retryTimer.Reset(opt.getRetryWait())
584                 }
585
586                 select {
587                 case <-retryTimer.C:
588                 }
589         }
590         return nil, errors.New("Lock not obtained")
591 }
592
593 //ReleaseResource removes the lock from a resource under given namespace. If lock
594 //is already expired or some other instance is keeping the lock (lock taken after
595 //expiration), an error is returned.
596 func (l *SyncStorageLock) ReleaseResource(ns string) error {
597         ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
598
599         if err != nil {
600                 return err
601         }
602         if !ok {
603                 return errors.New("Lock not held")
604         }
605         return nil
606 }
607
608 //RefreshResource function can be used to set a new expiration time for the
609 //resource lock (if the lock still exists) under given namespace. The old
610 //remaining expiration time is overwritten with the given new expiration time.
611 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
612         err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
613         return err
614 }
615
616 //CheckResource returns the expiration time left for a resource under given
617 //namespace. If the resource doesn't exist, -2 is returned.
618 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
619         result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
620         if err != nil {
621                 return 0, err
622         }
623         if result == time.Duration(-1) {
624                 return 0, errors.New("invalid resource given, no expiration time attached")
625         }
626         return result, nil
627 }
628
629 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
630 //expirations are done using the methods defined for this struct.
631 type SyncStorageLock struct {
632         s     *SyncStorage
633         key   string
634         value string
635 }
636
637 func getNsPrefix(ns string) string {
638         return "{" + ns + "}" + sdlgoredis.NsSeparator
639 }
640
641 type iDatabase interface {
642         SubscribeChannelDB(cb func(string, ...string), channels ...string)
643         UnsubscribeChannelDB(channels ...string)
644         MSet(pairs ...interface{}) error
645         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
646         MGet(keys []string) ([]interface{}, error)
647         CloseDB() error
648         Del(keys []string) error
649         DelMPub(channelsAndEvents []string, keys []string) error
650         Keys(key string) ([]string, error)
651         SetIE(key string, oldData, newData interface{}) (bool, error)
652         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
653         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
654         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
655         DelIE(key string, data interface{}) (bool, error)
656         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
657         SAdd(key string, data ...interface{}) error
658         SRem(key string, data ...interface{}) error
659         SMembers(key string) ([]string, error)
660         SIsMember(key string, data interface{}) (bool, error)
661         SCard(key string) (int64, error)
662         PTTL(key string) (time.Duration, error)
663         PExpireIE(key string, data interface{}, expiration time.Duration) error
664 }