Merge "RIC:1060: Change in PTL"
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
31         "hash/crc32"
32         "io"
33         "reflect"
34         "strings"
35         "sync"
36         "time"
37 )
38
39 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
40 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
41 //where namespace can be given in every API call. This means that with
42 //SyncStorage you can easily set key-values under different namespace compared to
43 //SdlInstance where namespace can be defined only at SdlInstance instance creation
44 //time.
45 type SyncStorage struct {
46         mutex sync.Mutex
47         tmp   []byte
48         db    *Database
49 }
50
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54         return newSyncStorage(NewDatabase())
55 }
56
57 func newSyncStorage(db *Database) *SyncStorage {
58         return &SyncStorage{
59                 db: db,
60         }
61 }
62
63 //selectDbInstance Selects DB instance what provides DB services for the namespace
64 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
65         instanceCount := uint32(len(s.db.instances))
66         instanceID := getHash(ns) % instanceCount
67         return s.db.instances[instanceID]
68 }
69
70 //getHash Returns hash value calculated from the string
71 func getHash(s string) uint32 {
72         tbl := crc32.MakeTable(crc32.IEEE)
73         return crc32.Checksum([]byte(s), tbl)
74 }
75
76 //SubscribeChannel lets you to subscribe for a events on a given channels.
77 //SDL notifications are events that are published on a specific channels.
78 //Both the channel and events are defined by the entity that is publishing
79 //the events under given namespace.
80 //
81 //When subscribing for a channel, a callback function is given as a parameter.
82 //Whenever a notification is received from a channel, this callback is called
83 //with channel and notifications as parameter (several notifications could be
84 //packed to a single callback function call). A call to SubscribeChannel function
85 //returns immediatelly, callbacks will be called asyncronously.
86 //
87 //It is possible to subscribe to different channels using different callbacks. In
88 //this case simply use SubscribeChannel function separately for each channel.
89 //
90 //When receiving events in callback routine, it is a good practive to return from
91 //callback as quickly as possible. E.g. reading in callback context should be avoided
92 //and using of Go signals is recommended. Also it should be noted that in case of several
93 //events received from different channels, callbacks are called in series one by one.
94 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
95         nsPrefix := getNsPrefix(ns)
96         return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
97 }
98
99 //UnsubscribeChannel removes subscription from one or several channels under given
100 //namespace.
101 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
102         nsPrefix := getNsPrefix(ns)
103         return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
104 }
105
106 //Close connection to backend database.
107 func (s *SyncStorage) Close() error {
108         var ret error
109         for _, db := range s.db.instances {
110                 if err := db.CloseDB(); err != nil {
111                         ret = err
112                 }
113         }
114         return ret
115 }
116
117 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
118         if len(channelsAndEvents)%2 != 0 {
119                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
120         }
121         for i, v := range channelsAndEvents {
122                 if i%2 != 0 {
123                         if strings.Contains(v, sdlgoredis.EventSeparator) {
124                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
125                         }
126                 }
127         }
128         return nil
129 }
130
131 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
132         var retVal []string
133         for _, v := range channels {
134                 retVal = append(retVal, nsPrefix+v)
135         }
136         return retVal
137 }
138
139 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
140         retVal := make([]interface{}, 0)
141         shouldBeKey := true
142         for _, v := range pairs {
143                 reflectType := reflect.TypeOf(v)
144                 switch reflectType.Kind() {
145                 case reflect.Map:
146                         x := reflect.ValueOf(v).MapRange()
147                         for x.Next() {
148                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
149                                 retVal = append(retVal, x.Value().Interface())
150                         }
151                 case reflect.Slice:
152                         if shouldBeKey {
153                                 x := reflect.ValueOf(v)
154                                 if x.Len()%2 != 0 {
155                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
156                                 }
157                                 for i2 := 0; i2 < x.Len(); i2++ {
158                                         if i2%2 == 0 {
159                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
160                                         } else {
161                                                 retVal = append(retVal, x.Index(i2).Interface())
162                                         }
163                                 }
164                         } else {
165                                 if reflectType.Elem().Kind() == reflect.Uint8 {
166                                         retVal = append(retVal, v)
167                                         shouldBeKey = true
168                                 } else {
169                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
170                                 }
171                         }
172                 case reflect.Array:
173                         if shouldBeKey {
174                                 x := reflect.ValueOf(v)
175                                 if x.Len()%2 != 0 {
176                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
177                                 }
178                                 for i2 := 0; i2 < x.Len(); i2++ {
179                                         if i2%2 == 0 {
180                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
181                                         } else {
182                                                 retVal = append(retVal, x.Index(i2).Interface())
183                                         }
184                                 }
185                         } else {
186                                 if reflectType.Elem().Kind() == reflect.Uint8 {
187                                         retVal = append(retVal, v)
188                                         shouldBeKey = true
189                                 } else {
190                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
191                                 }
192                         }
193                 default:
194                         if shouldBeKey {
195                                 retVal = append(retVal, nsPrefix+v.(string))
196                                 shouldBeKey = false
197                         } else {
198                                 retVal = append(retVal, v)
199                                 shouldBeKey = true
200                         }
201                 }
202         }
203         if len(retVal)%2 != 0 {
204                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
205         }
206         return retVal, nil
207 }
208
209 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
210         channelEventMap := make(map[string]string)
211         for i, v := range channelsAndEvents {
212                 if i%2 != 0 {
213                         continue
214                 }
215                 _, exists := channelEventMap[v]
216                 if exists {
217                         channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
218                 } else {
219                         channelEventMap[v] = channelsAndEvents[i+1]
220                 }
221         }
222         retVal := make([]string, 0)
223         for k, v := range channelEventMap {
224                 retVal = append(retVal, nsPrefix+k)
225                 retVal = append(retVal, v)
226         }
227         return retVal
228 }
229
230 //SetAndPublish function writes data to shared data layer storage and sends an event to
231 //a channel. Writing is done atomically, i.e. all succeeds or fails.
232 //Data is written under the namespace what is given as a parameter for this function.
233 //Data to be written is given as key-value pairs. Several key-value
234 //pairs can be written with one call.
235 //The key is expected to be string whereas value can be anything, string,
236 //number, slice array or map
237 //
238 //If data was set successfully, an event is sent to a channel.
239 //Channels and events are given as pairs is channelsAndEvents parameter.
240 //It is possible to send several events to several channels by giving several
241 //channel-event pairs.
242 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
243 //will send event1 and event3 to channel1 and event2 to channel2.
244 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
245         nsPrefix := getNsPrefix(ns)
246         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
247         if err != nil {
248                 return err
249         }
250         if len(channelsAndEvents) == 0 {
251                 return s.getDbBackend(ns).MSet(keyAndData...)
252         }
253         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
254                 return err
255         }
256         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
257         return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
258 }
259
260 //Set function writes data to shared data layer storage. Writing is done
261 //atomically, i.e. all succeeds or fails.
262 //Data is written under the namespace what is given as a parameter for this function.
263 //Data to be written is given as key-value pairs. Several key-value
264 //pairs can be written with one call.
265 //The key is expected to be string whereas value can be anything, string,
266 //number, slice array or map
267 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
268         if len(pairs) == 0 {
269                 return nil
270         }
271
272         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
273         if err != nil {
274                 return err
275         }
276         return s.getDbBackend(ns).MSet(keyAndData...)
277 }
278
279 //Get function atomically reads one or more keys from SDL. The returned map has the
280 //requested keys as index and data as value. If the requested key is not found
281 //from SDL, it's value is nil
282 //Read operation is targeted to the namespace what is given as a parameter for this
283 //function.
284 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
285         m := make(map[string]interface{})
286         if len(keys) == 0 {
287                 return m, nil
288         }
289
290         var keysWithNs []string
291         for _, v := range keys {
292                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
293         }
294         val, err := s.getDbBackend(ns).MGet(keysWithNs)
295         if err != nil {
296                 return m, err
297         }
298         for i, v := range val {
299                 m[keys[i]] = v
300         }
301         return m, err
302 }
303
304 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
305 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
306 //is published to a given channel.
307 //Data is written under the namespace what is given as a parameter for this function.
308 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
309         nsPrefix := getNsPrefix(ns)
310         if len(channelsAndEvents) == 0 {
311                 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
312         }
313         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
314                 return false, err
315         }
316         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
317         return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
318 }
319
320 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
321 //If replace was done successfully, true will be returned.
322 //Data is written under the namespace what is given as a parameter for this function.
323 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
324         return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
325 }
326
327 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
328 //then it's value is not changed. Checking the key existence and potential set operation
329 //is done atomically. If the set operation was done successfully, an event is published to a
330 //given channel.
331 //Data is written under the namespace what is given as a parameter for this function.
332 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
333         nsPrefix := getNsPrefix(ns)
334         if len(channelsAndEvents) == 0 {
335                 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
336         }
337         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
338                 return false, err
339         }
340         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
341         return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
342 }
343
344 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
345 //then it's value is not changed. Checking the key existence and potential set operation
346 //is done atomically.
347 //Data is written under the namespace what is given as a parameter for this function.
348 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
349         return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
350 }
351
352 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
353 //Trying to remove a nonexisting key is not considered as an error.
354 //An event is published into a given channel if remove operation is successfull and
355 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
356 //when trying to remove, no event is published.
357 //Data is removed under the namespace what is given as a parameter for this function.
358 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
359         if len(keys) == 0 {
360                 return nil
361         }
362
363         var keysWithNs []string
364         nsPrefix := getNsPrefix(ns)
365         for _, v := range keys {
366                 keysWithNs = append(keysWithNs, nsPrefix+v)
367         }
368         if len(channelsAndEvents) == 0 {
369                 return s.getDbBackend(ns).Del(keysWithNs)
370         }
371         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
372                 return err
373         }
374         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
375         return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
376 }
377
378 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
379 //Data is removed under the namespace what is given as a parameter for this function.
380 func (s *SyncStorage) Remove(ns string, keys []string) error {
381         if len(keys) == 0 {
382                 return nil
383         }
384
385         var keysWithNs []string
386         for _, v := range keys {
387                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
388         }
389         err := s.getDbBackend(ns).Del(keysWithNs)
390         return err
391 }
392
393 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
394 //a given event is published to channel. If existing data matches given data,
395 //key and data are removed from SDL. If remove was done successfully, true is returned.
396 //Data is removed under the namespace what is given as a parameter for this function.
397 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
398         nsPrefix := getNsPrefix(ns)
399         if len(channelsAndEvents) == 0 {
400                 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
401         }
402         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
403                 return false, err
404         }
405         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
406         return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
407 }
408
409 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
410 //key and data are removed from SDL. If remove was done successfully, true is returned.
411 //Data is removed under the namespace what is given as a parameter for this function.
412 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
413         status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
414         if err != nil {
415                 return false, err
416         }
417         return status, nil
418 }
419
420 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
421 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
422 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
423         nsPrefix := getNsPrefix(ns)
424         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
425         var retVal []string
426         if err != nil {
427                 return retVal, err
428         }
429         for _, v := range keys {
430                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
431         }
432         return retVal, err
433 }
434
435 // ListKeys returns all keys in the given namespace matching key search pattern.
436 //
437 //  Supported search glob-style patterns:
438 //    h?llo matches hello, hallo and hxllo
439 //    h*llo matches hllo and heeeello
440 //    h[ae]llo matches hello and hallo, but not hillo
441 //    h[^e]llo matches hallo, hbllo, ... but not hello
442 //    h[a-b]llo matches hallo and hbllo
443 //
444 //  The \ escapes character in key search pattern and those will be treated as a normal
445 //  character:
446 //    h\[?llo\* matches h[ello* and h[allo*
447 //
448 // No prior knowledge about the keys in the given namespace exists,
449 // thus operation is not guaranteed to be atomic or isolated.
450 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
451         nsPrefix := getNsPrefix(ns)
452         nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
453         var keys []string
454         if err != nil {
455                 return keys, err
456         }
457         for _, key := range nsKeys {
458                 keys = append(keys, strings.Split(key, nsPrefix)[1])
459         }
460         return keys, err
461 }
462
463 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
464 //it is not guaranteed that all keys are removed.
465 func (s *SyncStorage) RemoveAll(ns string) error {
466         keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
467         if err != nil {
468                 return err
469         }
470         if (keys != nil) && (len(keys) != 0) {
471                 err = s.getDbBackend(ns).Del(keys)
472         }
473         return err
474 }
475
476 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
477 //will publish an event to given channel. This operation is not atomic, thus it is
478 //not guaranteed that all keys are removed.
479 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
480         nsPrefix := getNsPrefix(ns)
481         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
482         if err != nil {
483                 return err
484         }
485         if (keys != nil) && (len(keys) != 0) {
486                 if len(channelsAndEvents) == 0 {
487                         return s.getDbBackend(ns).Del(keys)
488                 }
489                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
490                         return err
491                 }
492                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
493                 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
494         }
495         return err
496 }
497
498 //AddMember adds a new members to a group under given namespace.
499 //
500 //SDL groups are unordered collections of members where each member is
501 //unique. It is possible to add the same member several times without the
502 //need to check if it already exists.
503 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
504         return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
505 }
506
507 //RemoveMember removes members from a group under given namespace.
508 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
509         return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
510 }
511
512 //RemoveGroup removes the whole group along with it's members under given namespace.
513 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
514         return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
515 }
516
517 //GetMembers returns all the members from a group under given namespace.
518 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
519         retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
520         if err != nil {
521                 return []string{}, err
522         }
523         return retVal, err
524 }
525
526 //IsMember returns true if given member is found from a group under given namespace.
527 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
528         retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
529         if err != nil {
530                 return false, err
531         }
532         return retVal, err
533 }
534
535 //GroupSize returns the number of members in a group under given namespace.
536 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
537         retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
538         if err != nil {
539                 return 0, err
540         }
541         return retVal, err
542 }
543
544 func (s *SyncStorage) randomToken() (string, error) {
545         s.mutex.Lock()
546         defer s.mutex.Unlock()
547
548         if len(s.tmp) == 0 {
549                 s.tmp = make([]byte, 16)
550         }
551
552         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
553                 return "", err
554         }
555
556         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
557 }
558
559 //LockResource function is used for locking a resource under given namespace.
560 //The resource lock in practice is a key with random value that is set to expire
561 //after a time period. The value written to key is a random value, thus only the
562 //instance created a lock, can release it. Resource locks are per namespace.
563 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
564         value, err := s.randomToken()
565         if err != nil {
566                 return nil, err
567         }
568
569         var retryTimer *time.Timer
570         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
571                 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
572                 if err != nil {
573                         return nil, err
574                 } else if ok {
575                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
576                 }
577                 if retryTimer == nil {
578                         retryTimer = time.NewTimer(opt.getRetryWait())
579                         defer retryTimer.Stop()
580                 } else {
581                         retryTimer.Reset(opt.getRetryWait())
582                 }
583
584                 select {
585                 case <-retryTimer.C:
586                 }
587         }
588         return nil, errors.New("Lock not obtained")
589 }
590
591 //ReleaseResource removes the lock from a resource under given namespace. If lock
592 //is already expired or some other instance is keeping the lock (lock taken after
593 //expiration), an error is returned.
594 func (l *SyncStorageLock) ReleaseResource(ns string) error {
595         ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
596
597         if err != nil {
598                 return err
599         }
600         if !ok {
601                 return errors.New("Lock not held")
602         }
603         return nil
604 }
605
606 //RefreshResource function can be used to set a new expiration time for the
607 //resource lock (if the lock still exists) under given namespace. The old
608 //remaining expiration time is overwritten with the given new expiration time.
609 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
610         err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
611         return err
612 }
613
614 //CheckResource returns the expiration time left for a resource under given
615 //namespace. If the resource doesn't exist, -2 is returned.
616 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
617         result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
618         if err != nil {
619                 return 0, err
620         }
621         if result == time.Duration(-1) {
622                 return 0, errors.New("invalid resource given, no expiration time attached")
623         }
624         return result, nil
625 }
626
627 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
628 //expirations are done using the methods defined for this struct.
629 type SyncStorageLock struct {
630         s     *SyncStorage
631         key   string
632         value string
633 }
634
635 func getNsPrefix(ns string) string {
636         return "{" + ns + "}" + sdlgoredis.NsSeparator
637 }
638
639 type iDatabase interface {
640         SubscribeChannelDB(cb func(string, ...string), channels ...string) error
641         UnsubscribeChannelDB(channels ...string) error
642         MSet(pairs ...interface{}) error
643         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
644         MGet(keys []string) ([]interface{}, error)
645         CloseDB() error
646         Del(keys []string) error
647         DelMPub(channelsAndEvents []string, keys []string) error
648         Keys(key string) ([]string, error)
649         SetIE(key string, oldData, newData interface{}) (bool, error)
650         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
651         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
652         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
653         DelIE(key string, data interface{}) (bool, error)
654         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
655         SAdd(key string, data ...interface{}) error
656         SRem(key string, data ...interface{}) error
657         SMembers(key string) ([]string, error)
658         SIsMember(key string, data interface{}) (bool, error)
659         SCard(key string) (int64, error)
660         PTTL(key string) (time.Duration, error)
661         PExpireIE(key string, data interface{}, expiration time.Duration) error
662 }