Adding health check for the reddis db.
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/cli"
30         "fmt"
31         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
32         "hash/crc32"
33         "io"
34         "reflect"
35         "strings"
36         "sync"
37         "time"
38 )
39
40 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
41 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
42 //where namespace can be given in every API call. This means that with
43 //SyncStorage you can easily set key-values under different namespace compared to
44 //SdlInstance where namespace can be defined only at SdlInstance instance creation
45 //time.
46 type SyncStorage struct {
47         mutex sync.Mutex
48         tmp   []byte
49         db    *Database
50 }
51
52 //NewSyncStorage creates a new sdl instance.
53 //The database used as a backend is given as a parameter
54 func NewSyncStorage() *SyncStorage {
55         return newSyncStorage(NewDatabase())
56 }
57 //Function to run the health check
58 func runHealthCheck(dbCreateCb DbCreateCb) ([]sdlgoredis.DbState, error) {
59         var anyErr error
60         var states []sdlgoredis.DbState
61         for _, dbInst := range dbCreateCb().Instances {
62                 state, err := dbInst.State()
63                 if err != nil {
64                         anyErr = err
65                 }
66                 states = append(states, *state)
67         }
68         return states, anyErr
69 }
70 //NewSyncStorage creates a new sdl instance.
71 //The database used as a backend is given as a parameter
72 func newSyncStorage(db *Database) *SyncStorage {
73         return &SyncStorage{
74                 db: db,
75         }
76 }
77
78 //selectDbInstance Selects DB instance what provides DB services for the namespace
79 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
80         instanceCount := uint32(len(s.db.instances))
81         instanceID := getHash(ns) % instanceCount
82         return s.db.instances[instanceID]
83 }
84
85 //getHash Returns hash value calculated from the string
86 func getHash(s string) uint32 {
87         tbl := crc32.MakeTable(crc32.IEEE)
88         return crc32.Checksum([]byte(s), tbl)
89 }
90
91 //SubscribeChannel lets you to subscribe for a events on a given channels.
92 //SDL notifications are events that are published on a specific channels.
93 //Both the channel and events are defined by the entity that is publishing
94 //the events under given namespace.
95 //
96 //When subscribing for a channel, a callback function is given as a parameter.
97 //Whenever a notification is received from a channel, this callback is called
98 //with channel and notifications as parameter (several notifications could be
99 //packed to a single callback function call). A call to SubscribeChannel function
100 //returns immediatelly, callbacks will be called asyncronously.
101 //
102 //It is possible to subscribe to different channels using different callbacks. In
103 //this case simply use SubscribeChannel function separately for each channel.
104 //
105 //When receiving events in callback routine, it is a good practive to return from
106 //callback as quickly as possible. E.g. reading in callback context should be avoided
107 //and using of Go signals is recommended. Also it should be noted that in case of several
108 //events received from different channels, callbacks are called in series one by one.
109 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
110         nsPrefix := getNsPrefix(ns)
111         return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
112 }
113
114 //UnsubscribeChannel removes subscription from one or several channels under given
115 //namespace.
116 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
117         nsPrefix := getNsPrefix(ns)
118         return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
119 }
120
121 //Close connection to backend database.
122 func (s *SyncStorage) Close() error {
123         var ret error
124         for _, db := range s.db.instances {
125                 if err := db.CloseDB(); err != nil {
126                         ret = err
127                 }
128         }
129         return ret
130 }
131
132 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
133         if len(channelsAndEvents)%2 != 0 {
134                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
135         }
136         for i, v := range channelsAndEvents {
137                 if i%2 != 0 {
138                         if strings.Contains(v, sdlgoredis.EventSeparator) {
139                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
140                         }
141                 }
142         }
143         return nil
144 }
145
146 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
147         var retVal []string
148         for _, v := range channels {
149                 retVal = append(retVal, nsPrefix+v)
150         }
151         return retVal
152 }
153
154 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
155         retVal := make([]interface{}, 0)
156         shouldBeKey := true
157         for _, v := range pairs {
158                 reflectType := reflect.TypeOf(v)
159                 switch reflectType.Kind() {
160                 case reflect.Map:
161                         x := reflect.ValueOf(v).MapRange()
162                         for x.Next() {
163                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
164                                 retVal = append(retVal, x.Value().Interface())
165                         }
166                 case reflect.Slice:
167                         if shouldBeKey {
168                                 x := reflect.ValueOf(v)
169                                 if x.Len()%2 != 0 {
170                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
171                                 }
172                                 for i2 := 0; i2 < x.Len(); i2++ {
173                                         if i2%2 == 0 {
174                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
175                                         } else {
176                                                 retVal = append(retVal, x.Index(i2).Interface())
177                                         }
178                                 }
179                         } else {
180                                 if reflectType.Elem().Kind() == reflect.Uint8 {
181                                         retVal = append(retVal, v)
182                                         shouldBeKey = true
183                                 } else {
184                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
185                                 }
186                         }
187                 case reflect.Array:
188                         if shouldBeKey {
189                                 x := reflect.ValueOf(v)
190                                 if x.Len()%2 != 0 {
191                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
192                                 }
193                                 for i2 := 0; i2 < x.Len(); i2++ {
194                                         if i2%2 == 0 {
195                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
196                                         } else {
197                                                 retVal = append(retVal, x.Index(i2).Interface())
198                                         }
199                                 }
200                         } else {
201                                 if reflectType.Elem().Kind() == reflect.Uint8 {
202                                         retVal = append(retVal, v)
203                                         shouldBeKey = true
204                                 } else {
205                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
206                                 }
207                         }
208                 default:
209                         if shouldBeKey {
210                                 retVal = append(retVal, nsPrefix+v.(string))
211                                 shouldBeKey = false
212                         } else {
213                                 retVal = append(retVal, v)
214                                 shouldBeKey = true
215                         }
216                 }
217         }
218         if len(retVal)%2 != 0 {
219                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
220         }
221         return retVal, nil
222 }
223
224 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
225         channelEventMap := make(map[string]string)
226         for i, v := range channelsAndEvents {
227                 if i%2 != 0 {
228                         continue
229                 }
230                 _, exists := channelEventMap[v]
231                 if exists {
232                         channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
233                 } else {
234                         channelEventMap[v] = channelsAndEvents[i+1]
235                 }
236         }
237         retVal := make([]string, 0)
238         for k, v := range channelEventMap {
239                 retVal = append(retVal, nsPrefix+k)
240                 retVal = append(retVal, v)
241         }
242         return retVal
243 }
244
245 //SetAndPublish function writes data to shared data layer storage and sends an event to
246 //a channel. Writing is done atomically, i.e. all succeeds or fails.
247 //Data is written under the namespace what is given as a parameter for this function.
248 //Data to be written is given as key-value pairs. Several key-value
249 //pairs can be written with one call.
250 //The key is expected to be string whereas value can be anything, string,
251 //number, slice array or map
252 //
253 //If data was set successfully, an event is sent to a channel.
254 //Channels and events are given as pairs is channelsAndEvents parameter.
255 //It is possible to send several events to several channels by giving several
256 //channel-event pairs.
257 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
258 //will send event1 and event3 to channel1 and event2 to channel2.
259 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
260         nsPrefix := getNsPrefix(ns)
261         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
262         if err != nil {
263                 return err
264         }
265         if len(channelsAndEvents) == 0 {
266                 return s.getDbBackend(ns).MSet(keyAndData...)
267         }
268         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
269                 return err
270         }
271         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
272         return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
273 }
274
275 //Set function writes data to shared data layer storage. Writing is done
276 //atomically, i.e. all succeeds or fails.
277 //Data is written under the namespace what is given as a parameter for this function.
278 //Data to be written is given as key-value pairs. Several key-value
279 //pairs can be written with one call.
280 //The key is expected to be string whereas value can be anything, string,
281 //number, slice array or map
282 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
283         if len(pairs) == 0 {
284                 return nil
285         }
286
287         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
288         if err != nil {
289                 return err
290         }
291         return s.getDbBackend(ns).MSet(keyAndData...)
292 }
293
294 //Get function atomically reads one or more keys from SDL. The returned map has the
295 //requested keys as index and data as value. If the requested key is not found
296 //from SDL, it's value is nil
297 //Read operation is targeted to the namespace what is given as a parameter for this
298 //function.
299 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
300         m := make(map[string]interface{})
301         if len(keys) == 0 {
302                 return m, nil
303         }
304
305         var keysWithNs []string
306         for _, v := range keys {
307                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
308         }
309         val, err := s.getDbBackend(ns).MGet(keysWithNs)
310         if err != nil {
311                 return m, err
312         }
313         for i, v := range val {
314                 m[keys[i]] = v
315         }
316         return m, err
317 }
318
319 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
320 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
321 //is published to a given channel.
322 //Data is written under the namespace what is given as a parameter for this function.
323 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
324         nsPrefix := getNsPrefix(ns)
325         if len(channelsAndEvents) == 0 {
326                 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
327         }
328         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
329                 return false, err
330         }
331         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
332         return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
333 }
334
335 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
336 //If replace was done successfully, true will be returned.
337 //Data is written under the namespace what is given as a parameter for this function.
338 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
339         return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
340 }
341
342 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
343 //then it's value is not changed. Checking the key existence and potential set operation
344 //is done atomically. If the set operation was done successfully, an event is published to a
345 //given channel.
346 //Data is written under the namespace what is given as a parameter for this function.
347 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
348         nsPrefix := getNsPrefix(ns)
349         if len(channelsAndEvents) == 0 {
350                 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
351         }
352         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
353                 return false, err
354         }
355         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
356         return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
357 }
358
359 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
360 //then it's value is not changed. Checking the key existence and potential set operation
361 //is done atomically.
362 //Data is written under the namespace what is given as a parameter for this function.
363 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
364         return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
365 }
366
367 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
368 //Trying to remove a nonexisting key is not considered as an error.
369 //An event is published into a given channel if remove operation is successfull and
370 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
371 //when trying to remove, no event is published.
372 //Data is removed under the namespace what is given as a parameter for this function.
373 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
374         if len(keys) == 0 {
375                 return nil
376         }
377
378         var keysWithNs []string
379         nsPrefix := getNsPrefix(ns)
380         for _, v := range keys {
381                 keysWithNs = append(keysWithNs, nsPrefix+v)
382         }
383         if len(channelsAndEvents) == 0 {
384                 return s.getDbBackend(ns).Del(keysWithNs)
385         }
386         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
387                 return err
388         }
389         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
390         return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
391 }
392
393 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
394 //Data is removed under the namespace what is given as a parameter for this function.
395 func (s *SyncStorage) Remove(ns string, keys []string) error {
396         if len(keys) == 0 {
397                 return nil
398         }
399
400         var keysWithNs []string
401         for _, v := range keys {
402                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
403         }
404         err := s.getDbBackend(ns).Del(keysWithNs)
405         return err
406 }
407
408 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
409 //a given event is published to channel. If existing data matches given data,
410 //key and data are removed from SDL. If remove was done successfully, true is returned.
411 //Data is removed under the namespace what is given as a parameter for this function.
412 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
413         nsPrefix := getNsPrefix(ns)
414         if len(channelsAndEvents) == 0 {
415                 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
416         }
417         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
418                 return false, err
419         }
420         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
421         return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
422 }
423
424 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
425 //key and data are removed from SDL. If remove was done successfully, true is returned.
426 //Data is removed under the namespace what is given as a parameter for this function.
427 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
428         status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
429         if err != nil {
430                 return false, err
431         }
432         return status, nil
433 }
434
435 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
436 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
437 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
438         nsPrefix := getNsPrefix(ns)
439         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
440         var retVal []string
441         if err != nil {
442                 return retVal, err
443         }
444         for _, v := range keys {
445                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
446         }
447         return retVal, err
448 }
449
450 // ListKeys returns all keys in the given namespace matching key search pattern.
451 //
452 //  Supported search glob-style patterns:
453 //    h?llo matches hello, hallo and hxllo
454 //    h*llo matches hllo and heeeello
455 //    h[ae]llo matches hello and hallo, but not hillo
456 //    h[^e]llo matches hallo, hbllo, ... but not hello
457 //    h[a-b]llo matches hallo and hbllo
458 //
459 //  The \ escapes character in key search pattern and those will be treated as a normal
460 //  character:
461 //    h\[?llo\* matches h[ello* and h[allo*
462 //
463 // No prior knowledge about the keys in the given namespace exists,
464 // thus operation is not guaranteed to be atomic or isolated.
465 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
466         nsPrefix := getNsPrefix(ns)
467         nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
468         var keys []string
469         if err != nil {
470                 return keys, err
471         }
472         for _, key := range nsKeys {
473                 keys = append(keys, strings.Split(key, nsPrefix)[1])
474         }
475         return keys, err
476 }
477
478 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
479 //it is not guaranteed that all keys are removed.
480 func (s *SyncStorage) RemoveAll(ns string) error {
481         keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
482         if err != nil {
483                 return err
484         }
485         if (keys != nil) && (len(keys) != 0) {
486                 err = s.getDbBackend(ns).Del(keys)
487         }
488         return err
489 }
490
491 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
492 //will publish an event to given channel. This operation is not atomic, thus it is
493 //not guaranteed that all keys are removed.
494 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
495         nsPrefix := getNsPrefix(ns)
496         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
497         if err != nil {
498                 return err
499         }
500         if (keys != nil) && (len(keys) != 0) {
501                 if len(channelsAndEvents) == 0 {
502                         return s.getDbBackend(ns).Del(keys)
503                 }
504                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
505                         return err
506                 }
507                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
508                 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
509         }
510         return err
511 }
512
513 //AddMember adds a new members to a group under given namespace.
514 //
515 //SDL groups are unordered collections of members where each member is
516 //unique. It is possible to add the same member several times without the
517 //need to check if it already exists.
518 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
519         return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
520 }
521
522 //RemoveMember removes members from a group under given namespace.
523 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
524         return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
525 }
526
527 //RemoveGroup removes the whole group along with it's members under given namespace.
528 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
529         return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
530 }
531
532 //GetMembers returns all the members from a group under given namespace.
533 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
534         retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
535         if err != nil {
536                 return []string{}, err
537         }
538         return retVal, err
539 }
540
541 //IsMember returns true if given member is found from a group under given namespace.
542 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
543         retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
544         if err != nil {
545                 return false, err
546         }
547         return retVal, err
548 }
549
550 //GroupSize returns the number of members in a group under given namespace.
551 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
552         retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
553         if err != nil {
554                 return 0, err
555         }
556         return retVal, err
557 }
558
559 func (s *SyncStorage) randomToken() (string, error) {
560         s.mutex.Lock()
561         defer s.mutex.Unlock()
562
563         if len(s.tmp) == 0 {
564                 s.tmp = make([]byte, 16)
565         }
566
567         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
568                 return "", err
569         }
570
571         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
572 }
573
574 //LockResource function is used for locking a resource under given namespace.
575 //The resource lock in practice is a key with random value that is set to expire
576 //after a time period. The value written to key is a random value, thus only the
577 //instance created a lock, can release it. Resource locks are per namespace.
578 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
579         value, err := s.randomToken()
580         if err != nil {
581                 return nil, err
582         }
583
584         var retryTimer *time.Timer
585         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
586                 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
587                 if err != nil {
588                         return nil, err
589                 } else if ok {
590                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
591                 }
592                 if retryTimer == nil {
593                         retryTimer = time.NewTimer(opt.getRetryWait())
594                         defer retryTimer.Stop()
595                 } else {
596                         retryTimer.Reset(opt.getRetryWait())
597                 }
598
599                 select {
600                 case <-retryTimer.C:
601                 }
602         }
603         return nil, errors.New("Lock not obtained")
604 }
605
606 //ReleaseResource removes the lock from a resource under given namespace. If lock
607 //is already expired or some other instance is keeping the lock (lock taken after
608 //expiration), an error is returned.
609 func (l *SyncStorageLock) ReleaseResource(ns string) error {
610         ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
611
612         if err != nil {
613                 return err
614         }
615         if !ok {
616                 return errors.New("Lock not held")
617         }
618         return nil
619 }
620
621 //RefreshResource function can be used to set a new expiration time for the
622 //resource lock (if the lock still exists) under given namespace. The old
623 //remaining expiration time is overwritten with the given new expiration time.
624 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
625         err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
626         return err
627 }
628
629 //CheckResource returns the expiration time left for a resource under given
630 //namespace. If the resource doesn't exist, -2 is returned.
631 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
632         result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
633         if err != nil {
634                 return 0, err
635         }
636         if result == time.Duration(-1) {
637                 return 0, errors.New("invalid resource given, no expiration time attached")
638         }
639         return result, nil
640 }
641
642 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
643 //expirations are done using the methods defined for this struct.
644 type SyncStorageLock struct {
645         s     *SyncStorage
646         key   string
647         value string
648 }
649
650 func getNsPrefix(ns string) string {
651         return "{" + ns + "}" + sdlgoredis.NsSeparator
652 }
653
654 type iDatabase interface {
655         SubscribeChannelDB(cb func(string, ...string), channels ...string) error
656         UnsubscribeChannelDB(channels ...string) error
657         MSet(pairs ...interface{}) error
658         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
659         MGet(keys []string) ([]interface{}, error)
660         CloseDB() error
661         Del(keys []string) error
662         DelMPub(channelsAndEvents []string, keys []string) error
663         Keys(key string) ([]string, error)
664         SetIE(key string, oldData, newData interface{}) (bool, error)
665         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
666         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
667         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
668         DelIE(key string, data interface{}) (bool, error)
669         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
670         SAdd(key string, data ...interface{}) error
671         SRem(key string, data ...interface{}) error
672         SMembers(key string) ([]string, error)
673         SIsMember(key string, data interface{}) (bool, error)
674         SCard(key string) (int64, error)
675         PTTL(key string) (time.Duration, error)
676         PExpireIE(key string, data interface{}, expiration time.Duration) error
677 }