Merge "New ListKeys API in syncstorage"
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "hash/crc32"
31         "io"
32         "reflect"
33         "strings"
34         "sync"
35         "time"
36 )
37
38 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
39 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
40 //where namespace can be given in every API call. This means that with
41 //SyncStorage you can easily set key-values under different namespace compared to
42 //SdlInstance where namespace can be defined only at SdlInstance instance creation
43 //time.
44 type SyncStorage struct {
45         eventSeparator string
46         mutex          sync.Mutex
47         tmp            []byte
48         db             *Database
49 }
50
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54         return newSyncStorage(NewDatabase())
55 }
56
57 func newSyncStorage(db *Database) *SyncStorage {
58         return &SyncStorage{
59                 eventSeparator: "___",
60                 db:             db,
61         }
62 }
63
64 //selectDbInstance Selects DB instance what provides DB services for the namespace
65 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
66         instanceCount := uint32(len(s.db.instances))
67         instanceID := getHash(ns) % instanceCount
68         return s.db.instances[instanceID]
69 }
70
71 //getHash Returns hash value calculated from the string
72 func getHash(s string) uint32 {
73         tbl := crc32.MakeTable(crc32.IEEE)
74         return crc32.Checksum([]byte(s), tbl)
75 }
76
77 //SubscribeChannel lets you to subscribe for a events on a given channels.
78 //SDL notifications are events that are published on a specific channels.
79 //Both the channel and events are defined by the entity that is publishing
80 //the events under given namespace.
81 //
82 //When subscribing for a channel, a callback function is given as a parameter.
83 //Whenever a notification is received from a channel, this callback is called
84 //with channel and notifications as parameter (several notifications could be
85 //packed to a single callback function call). A call to SubscribeChannel function
86 //returns immediatelly, callbacks will be called asyncronously.
87 //
88 //It is possible to subscribe to different channels using different callbacks. In
89 //this case simply use SubscribeChannel function separately for each channel.
90 //
91 //When receiving events in callback routine, it is a good practive to return from
92 //callback as quickly as possible. E.g. reading in callback context should be avoided
93 //and using of Go signals is recommended. Also it should be noted that in case of several
94 //events received from different channels, callbacks are called in series one by one.
95 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
96         nsPrefix := getNsPrefix(ns)
97         s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
98         return nil
99 }
100
101 //UnsubscribeChannel removes subscription from one or several channels under given
102 //namespace.
103 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
104         nsPrefix := getNsPrefix(ns)
105         s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
106         return nil
107 }
108
109 //Close connection to backend database.
110 func (s *SyncStorage) Close() error {
111         var ret error
112         for _, db := range s.db.instances {
113                 if err := db.CloseDB(); err != nil {
114                         ret = err
115                 }
116         }
117         return ret
118 }
119
120 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
121         if len(channelsAndEvents)%2 != 0 {
122                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
123         }
124         for i, v := range channelsAndEvents {
125                 if i%2 != 0 {
126                         if strings.Contains(v, s.eventSeparator) {
127                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
128                         }
129                 }
130         }
131         return nil
132 }
133
134 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
135         var retVal []string
136         for _, v := range channels {
137                 retVal = append(retVal, nsPrefix+v)
138         }
139         return retVal
140 }
141
142 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
143         retVal := make([]interface{}, 0)
144         shouldBeKey := true
145         for _, v := range pairs {
146                 reflectType := reflect.TypeOf(v)
147                 switch reflectType.Kind() {
148                 case reflect.Map:
149                         x := reflect.ValueOf(v).MapRange()
150                         for x.Next() {
151                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
152                                 retVal = append(retVal, x.Value().Interface())
153                         }
154                 case reflect.Slice:
155                         if shouldBeKey {
156                                 x := reflect.ValueOf(v)
157                                 if x.Len()%2 != 0 {
158                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
159                                 }
160                                 for i2 := 0; i2 < x.Len(); i2++ {
161                                         if i2%2 == 0 {
162                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
163                                         } else {
164                                                 retVal = append(retVal, x.Index(i2).Interface())
165                                         }
166                                 }
167                         } else {
168                                 if reflectType.Elem().Kind() == reflect.Uint8 {
169                                         retVal = append(retVal, v)
170                                         shouldBeKey = true
171                                 } else {
172                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
173                                 }
174                         }
175                 case reflect.Array:
176                         if shouldBeKey {
177                                 x := reflect.ValueOf(v)
178                                 if x.Len()%2 != 0 {
179                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
180                                 }
181                                 for i2 := 0; i2 < x.Len(); i2++ {
182                                         if i2%2 == 0 {
183                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
184                                         } else {
185                                                 retVal = append(retVal, x.Index(i2).Interface())
186                                         }
187                                 }
188                         } else {
189                                 if reflectType.Elem().Kind() == reflect.Uint8 {
190                                         retVal = append(retVal, v)
191                                         shouldBeKey = true
192                                 } else {
193                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
194                                 }
195                         }
196                 default:
197                         if shouldBeKey {
198                                 retVal = append(retVal, nsPrefix+v.(string))
199                                 shouldBeKey = false
200                         } else {
201                                 retVal = append(retVal, v)
202                                 shouldBeKey = true
203                         }
204                 }
205         }
206         if len(retVal)%2 != 0 {
207                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
208         }
209         return retVal, nil
210 }
211
212 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
213         channelEventMap := make(map[string]string)
214         for i, v := range channelsAndEvents {
215                 if i%2 != 0 {
216                         continue
217                 }
218                 _, exists := channelEventMap[v]
219                 if exists {
220                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
221                 } else {
222                         channelEventMap[v] = channelsAndEvents[i+1]
223                 }
224         }
225         retVal := make([]string, 0)
226         for k, v := range channelEventMap {
227                 retVal = append(retVal, nsPrefix+k)
228                 retVal = append(retVal, v)
229         }
230         return retVal
231 }
232
233 //SetAndPublish function writes data to shared data layer storage and sends an event to
234 //a channel. Writing is done atomically, i.e. all succeeds or fails.
235 //Data is written under the namespace what is given as a parameter for this function.
236 //Data to be written is given as key-value pairs. Several key-value
237 //pairs can be written with one call.
238 //The key is expected to be string whereas value can be anything, string,
239 //number, slice array or map
240 //
241 //If data was set successfully, an event is sent to a channel.
242 //Channels and events are given as pairs is channelsAndEvents parameter.
243 //It is possible to send several events to several channels by giving several
244 //channel-event pairs.
245 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
246 //will send event1 and event3 to channel1 and event2 to channel2.
247 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
248         nsPrefix := getNsPrefix(ns)
249         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
250         if err != nil {
251                 return err
252         }
253         if len(channelsAndEvents) == 0 {
254                 return s.getDbBackend(ns).MSet(keyAndData...)
255         }
256         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
257                 return err
258         }
259         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
260         return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
261 }
262
263 //Set function writes data to shared data layer storage. Writing is done
264 //atomically, i.e. all succeeds or fails.
265 //Data is written under the namespace what is given as a parameter for this function.
266 //Data to be written is given as key-value pairs. Several key-value
267 //pairs can be written with one call.
268 //The key is expected to be string whereas value can be anything, string,
269 //number, slice array or map
270 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
271         if len(pairs) == 0 {
272                 return nil
273         }
274
275         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
276         if err != nil {
277                 return err
278         }
279         return s.getDbBackend(ns).MSet(keyAndData...)
280 }
281
282 //Get function atomically reads one or more keys from SDL. The returned map has the
283 //requested keys as index and data as value. If the requested key is not found
284 //from SDL, it's value is nil
285 //Read operation is targeted to the namespace what is given as a parameter for this
286 //function.
287 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
288         m := make(map[string]interface{})
289         if len(keys) == 0 {
290                 return m, nil
291         }
292
293         var keysWithNs []string
294         for _, v := range keys {
295                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
296         }
297         val, err := s.getDbBackend(ns).MGet(keysWithNs)
298         if err != nil {
299                 return m, err
300         }
301         for i, v := range val {
302                 m[keys[i]] = v
303         }
304         return m, err
305 }
306
307 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
309 //is published to a given channel.
310 //Data is written under the namespace what is given as a parameter for this function.
311 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
312         nsPrefix := getNsPrefix(ns)
313         if len(channelsAndEvents) == 0 {
314                 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
315         }
316         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
317                 return false, err
318         }
319         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
320         return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
321 }
322
323 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
324 //If replace was done successfully, true will be returned.
325 //Data is written under the namespace what is given as a parameter for this function.
326 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
327         return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
328 }
329
330 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically. If the set operation was done successfully, an event is published to a
333 //given channel.
334 //Data is written under the namespace what is given as a parameter for this function.
335 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
336         nsPrefix := getNsPrefix(ns)
337         if len(channelsAndEvents) == 0 {
338                 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
339         }
340         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
341                 return false, err
342         }
343         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
344         return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
345 }
346
347 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
348 //then it's value is not changed. Checking the key existence and potential set operation
349 //is done atomically.
350 //Data is written under the namespace what is given as a parameter for this function.
351 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
352         return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
353 }
354
355 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
356 //Trying to remove a nonexisting key is not considered as an error.
357 //An event is published into a given channel if remove operation is successfull and
358 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
359 //when trying to remove, no event is published.
360 //Data is removed under the namespace what is given as a parameter for this function.
361 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
362         if len(keys) == 0 {
363                 return nil
364         }
365
366         var keysWithNs []string
367         nsPrefix := getNsPrefix(ns)
368         for _, v := range keys {
369                 keysWithNs = append(keysWithNs, nsPrefix+v)
370         }
371         if len(channelsAndEvents) == 0 {
372                 return s.getDbBackend(ns).Del(keysWithNs)
373         }
374         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
375                 return err
376         }
377         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
378         return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
379 }
380
381 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
382 //Data is removed under the namespace what is given as a parameter for this function.
383 func (s *SyncStorage) Remove(ns string, keys []string) error {
384         if len(keys) == 0 {
385                 return nil
386         }
387
388         var keysWithNs []string
389         for _, v := range keys {
390                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
391         }
392         err := s.getDbBackend(ns).Del(keysWithNs)
393         return err
394 }
395
396 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
397 //a given event is published to channel. If existing data matches given data,
398 //key and data are removed from SDL. If remove was done successfully, true is returned.
399 //Data is removed under the namespace what is given as a parameter for this function.
400 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
401         nsPrefix := getNsPrefix(ns)
402         if len(channelsAndEvents) == 0 {
403                 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
404         }
405         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
406                 return false, err
407         }
408         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
409         return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
410 }
411
412 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
413 //key and data are removed from SDL. If remove was done successfully, true is returned.
414 //Data is removed under the namespace what is given as a parameter for this function.
415 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
416         status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
417         if err != nil {
418                 return false, err
419         }
420         return status, nil
421 }
422
423 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
424 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
425 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
426         nsPrefix := getNsPrefix(ns)
427         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
428         var retVal []string
429         if err != nil {
430                 return retVal, err
431         }
432         for _, v := range keys {
433                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
434         }
435         return retVal, err
436 }
437
438 // ListKeys returns all keys in the given namespace matching key search pattern.
439 //
440 //  Supported search glob-style patterns:
441 //    h?llo matches hello, hallo and hxllo
442 //    h*llo matches hllo and heeeello
443 //    h[ae]llo matches hello and hallo, but not hillo
444 //    h[^e]llo matches hallo, hbllo, ... but not hello
445 //    h[a-b]llo matches hallo and hbllo
446 //
447 //  The \ escapes character in key search pattern and those will be treated as a normal
448 //  character:
449 //    h\[?llo\* matches h[ello* and h[allo*
450 //
451 // No prior knowledge about the keys in the given namespace exists,
452 // thus operation is not guaranteed to be atomic or isolated.
453 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
454         nsPrefix := getNsPrefix(ns)
455         nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
456         var keys []string
457         if err != nil {
458                 return keys, err
459         }
460         for _, key := range nsKeys {
461                 keys = append(keys, strings.Split(key, nsPrefix)[1])
462         }
463         return keys, err
464 }
465
466 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
467 //it is not guaranteed that all keys are removed.
468 func (s *SyncStorage) RemoveAll(ns string) error {
469         keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
470         if err != nil {
471                 return err
472         }
473         if (keys != nil) && (len(keys) != 0) {
474                 err = s.getDbBackend(ns).Del(keys)
475         }
476         return err
477 }
478
479 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
480 //will publish an event to given channel. This operation is not atomic, thus it is
481 //not guaranteed that all keys are removed.
482 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
483         nsPrefix := getNsPrefix(ns)
484         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
485         if err != nil {
486                 return err
487         }
488         if (keys != nil) && (len(keys) != 0) {
489                 if len(channelsAndEvents) == 0 {
490                         return s.getDbBackend(ns).Del(keys)
491                 }
492                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
493                         return err
494                 }
495                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
496                 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
497         }
498         return err
499 }
500
501 //AddMember adds a new members to a group under given namespace.
502 //
503 //SDL groups are unordered collections of members where each member is
504 //unique. It is possible to add the same member several times without the
505 //need to check if it already exists.
506 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
507         return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
508 }
509
510 //RemoveMember removes members from a group under given namespace.
511 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
512         return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
513 }
514
515 //RemoveGroup removes the whole group along with it's members under given namespace.
516 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
517         return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
518 }
519
520 //GetMembers returns all the members from a group under given namespace.
521 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
522         retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
523         if err != nil {
524                 return []string{}, err
525         }
526         return retVal, err
527 }
528
529 //IsMember returns true if given member is found from a group under given namespace.
530 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
531         retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
532         if err != nil {
533                 return false, err
534         }
535         return retVal, err
536 }
537
538 //GroupSize returns the number of members in a group under given namespace.
539 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
540         retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
541         if err != nil {
542                 return 0, err
543         }
544         return retVal, err
545 }
546
547 func (s *SyncStorage) randomToken() (string, error) {
548         s.mutex.Lock()
549         defer s.mutex.Unlock()
550
551         if len(s.tmp) == 0 {
552                 s.tmp = make([]byte, 16)
553         }
554
555         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
556                 return "", err
557         }
558
559         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
560 }
561
562 //LockResource function is used for locking a resource under given namespace.
563 //The resource lock in practice is a key with random value that is set to expire
564 //after a time period. The value written to key is a random value, thus only the
565 //instance created a lock, can release it. Resource locks are per namespace.
566 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
567         value, err := s.randomToken()
568         if err != nil {
569                 return nil, err
570         }
571
572         var retryTimer *time.Timer
573         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
574                 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
575                 if err != nil {
576                         return nil, err
577                 } else if ok {
578                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
579                 }
580                 if retryTimer == nil {
581                         retryTimer = time.NewTimer(opt.getRetryWait())
582                         defer retryTimer.Stop()
583                 } else {
584                         retryTimer.Reset(opt.getRetryWait())
585                 }
586
587                 select {
588                 case <-retryTimer.C:
589                 }
590         }
591         return nil, errors.New("Lock not obtained")
592 }
593
594 //ReleaseResource removes the lock from a resource under given namespace. If lock
595 //is already expired or some other instance is keeping the lock (lock taken after
596 //expiration), an error is returned.
597 func (l *SyncStorageLock) ReleaseResource(ns string) error {
598         ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
599
600         if err != nil {
601                 return err
602         }
603         if !ok {
604                 return errors.New("Lock not held")
605         }
606         return nil
607 }
608
609 //RefreshResource function can be used to set a new expiration time for the
610 //resource lock (if the lock still exists) under given namespace. The old
611 //remaining expiration time is overwritten with the given new expiration time.
612 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
613         err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
614         return err
615 }
616
617 //CheckResource returns the expiration time left for a resource under given
618 //namespace. If the resource doesn't exist, -2 is returned.
619 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
620         result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
621         if err != nil {
622                 return 0, err
623         }
624         if result == time.Duration(-1) {
625                 return 0, errors.New("invalid resource given, no expiration time attached")
626         }
627         return result, nil
628 }
629
630 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
631 //expirations are done using the methods defined for this struct.
632 type SyncStorageLock struct {
633         s     *SyncStorage
634         key   string
635         value string
636 }
637
638 func getNsPrefix(ns string) string {
639         return "{" + ns + "},"
640 }
641
642 type iDatabase interface {
643         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
644         UnsubscribeChannelDB(channels ...string)
645         MSet(pairs ...interface{}) error
646         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
647         MGet(keys []string) ([]interface{}, error)
648         CloseDB() error
649         Del(keys []string) error
650         DelMPub(channelsAndEvents []string, keys []string) error
651         Keys(key string) ([]string, error)
652         SetIE(key string, oldData, newData interface{}) (bool, error)
653         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
654         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
655         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
656         DelIE(key string, data interface{}) (bool, error)
657         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
658         SAdd(key string, data ...interface{}) error
659         SRem(key string, data ...interface{}) error
660         SMembers(key string) ([]string, error)
661         SIsMember(key string, data interface{}) (bool, error)
662         SCard(key string) (int64, error)
663         PTTL(key string) (time.Duration, error)
664         PExpireIE(key string, data interface{}, expiration time.Duration) error
665 }