Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "hash/crc32"
31         "io"
32         "reflect"
33         "strings"
34         "sync"
35         "time"
36 )
37
38 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
39 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
40 //where namespace can be given in every API call. This means that with
41 //SyncStorage you can easily set key-values under different namespace compared to
42 //SdlInstance where namespace can be defined only at SdlInstance instance creation
43 //time.
44 type SyncStorage struct {
45         eventSeparator string
46         mutex          sync.Mutex
47         tmp            []byte
48         db             *Database
49 }
50
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54         return newSyncStorage(NewDatabase())
55 }
56
57 func newSyncStorage(db *Database) *SyncStorage {
58         return &SyncStorage{
59                 eventSeparator: "___",
60                 db:             db,
61         }
62 }
63
64 //selectDbInstance Selects DB instance what provides DB services for the namespace
65 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
66         instanceCount := uint32(len(s.db.instances))
67         instanceID := getHash(ns) % instanceCount
68         return s.db.instances[instanceID]
69 }
70
71 //getHash Returns hash value calculated from the string
72 func getHash(s string) uint32 {
73         tbl := crc32.MakeTable(crc32.IEEE)
74         return crc32.Checksum([]byte(s), tbl)
75 }
76
77 //SubscribeChannel lets you to subscribe for a events on a given channels.
78 //SDL notifications are events that are published on a specific channels.
79 //Both the channel and events are defined by the entity that is publishing
80 //the events under given namespace.
81 //
82 //When subscribing for a channel, a callback function is given as a parameter.
83 //Whenever a notification is received from a channel, this callback is called
84 //with channel and notifications as parameter (several notifications could be
85 //packed to a single callback function call). A call to SubscribeChannel function
86 //returns immediatelly, callbacks will be called asyncronously.
87 //
88 //It is possible to subscribe to different channels using different callbacks. In
89 //this case simply use SubscribeChannel function separately for each channel.
90 //
91 //When receiving events in callback routine, it is a good practive to return from
92 //callback as quickly as possible. E.g. reading in callback context should be avoided
93 //and using of Go signals is recommended. Also it should be noted that in case of several
94 //events received from different channels, callbacks are called in series one by one.
95 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
96         nsPrefix := getNsPrefix(ns)
97         s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
98         return nil
99 }
100
101 //UnsubscribeChannel removes subscription from one or several channels under given
102 //namespace.
103 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
104         nsPrefix := getNsPrefix(ns)
105         s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
106         return nil
107 }
108
109 //Close connection to backend database.
110 func (s *SyncStorage) Close() error {
111         var ret error
112         for _, db := range s.db.instances {
113                 if err := db.CloseDB(); err != nil {
114                         ret = err
115                 }
116         }
117         return ret
118 }
119
120 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
121         if len(channelsAndEvents)%2 != 0 {
122                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
123         }
124         for i, v := range channelsAndEvents {
125                 if i%2 != 0 {
126                         if strings.Contains(v, s.eventSeparator) {
127                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
128                         }
129                 }
130         }
131         return nil
132 }
133
134 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
135         var retVal []string
136         for _, v := range channels {
137                 retVal = append(retVal, nsPrefix+v)
138         }
139         return retVal
140 }
141
142 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
143         retVal := make([]interface{}, 0)
144         shouldBeKey := true
145         for _, v := range pairs {
146                 reflectType := reflect.TypeOf(v)
147                 switch reflectType.Kind() {
148                 case reflect.Map:
149                         x := reflect.ValueOf(v).MapRange()
150                         for x.Next() {
151                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
152                                 retVal = append(retVal, x.Value().Interface())
153                         }
154                 case reflect.Slice:
155                         if shouldBeKey {
156                                 x := reflect.ValueOf(v)
157                                 if x.Len()%2 != 0 {
158                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
159                                 }
160                                 for i2 := 0; i2 < x.Len(); i2++ {
161                                         if i2%2 == 0 {
162                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
163                                         } else {
164                                                 retVal = append(retVal, x.Index(i2).Interface())
165                                         }
166                                 }
167                         } else {
168                                 if reflectType.Elem().Kind() == reflect.Uint8 {
169                                         retVal = append(retVal, v)
170                                         shouldBeKey = true
171                                 } else {
172                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
173                                 }
174                         }
175                 case reflect.Array:
176                         if shouldBeKey {
177                                 x := reflect.ValueOf(v)
178                                 if x.Len()%2 != 0 {
179                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
180                                 }
181                                 for i2 := 0; i2 < x.Len(); i2++ {
182                                         if i2%2 == 0 {
183                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
184                                         } else {
185                                                 retVal = append(retVal, x.Index(i2).Interface())
186                                         }
187                                 }
188                         } else {
189                                 if reflectType.Elem().Kind() == reflect.Uint8 {
190                                         retVal = append(retVal, v)
191                                         shouldBeKey = true
192                                 } else {
193                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
194                                 }
195                         }
196                 default:
197                         if shouldBeKey {
198                                 retVal = append(retVal, nsPrefix+v.(string))
199                                 shouldBeKey = false
200                         } else {
201                                 retVal = append(retVal, v)
202                                 shouldBeKey = true
203                         }
204                 }
205         }
206         if len(retVal)%2 != 0 {
207                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
208         }
209         return retVal, nil
210 }
211
212 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
213         channelEventMap := make(map[string]string)
214         for i, v := range channelsAndEvents {
215                 if i%2 != 0 {
216                         continue
217                 }
218                 _, exists := channelEventMap[v]
219                 if exists {
220                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
221                 } else {
222                         channelEventMap[v] = channelsAndEvents[i+1]
223                 }
224         }
225         retVal := make([]string, 0)
226         for k, v := range channelEventMap {
227                 retVal = append(retVal, nsPrefix+k)
228                 retVal = append(retVal, v)
229         }
230         return retVal
231 }
232
233 //SetAndPublish function writes data to shared data layer storage and sends an event to
234 //a channel. Writing is done atomically, i.e. all succeeds or fails.
235 //Data is written under the namespace what is given as a parameter for this function.
236 //Data to be written is given as key-value pairs. Several key-value
237 //pairs can be written with one call.
238 //The key is expected to be string whereas value can be anything, string,
239 //number, slice array or map
240 //
241 //If data was set successfully, an event is sent to a channel.
242 //Channels and events are given as pairs is channelsAndEvents parameter.
243 //It is possible to send several events to several channels by giving several
244 //channel-event pairs.
245 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
246 //will send event1 and event3 to channel1 and event2 to channel2.
247 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
248         nsPrefix := getNsPrefix(ns)
249         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
250         if err != nil {
251                 return err
252         }
253         if len(channelsAndEvents) == 0 {
254                 return s.getDbBackend(ns).MSet(keyAndData...)
255         }
256         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
257                 return err
258         }
259         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
260         return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
261 }
262
263 //Set function writes data to shared data layer storage. Writing is done
264 //atomically, i.e. all succeeds or fails.
265 //Data is written under the namespace what is given as a parameter for this function.
266 //Data to be written is given as key-value pairs. Several key-value
267 //pairs can be written with one call.
268 //The key is expected to be string whereas value can be anything, string,
269 //number, slice array or map
270 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
271         if len(pairs) == 0 {
272                 return nil
273         }
274
275         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
276         if err != nil {
277                 return err
278         }
279         return s.getDbBackend(ns).MSet(keyAndData...)
280 }
281
282 //Get function atomically reads one or more keys from SDL. The returned map has the
283 //requested keys as index and data as value. If the requested key is not found
284 //from SDL, it's value is nil
285 //Read operation is targeted to the namespace what is given as a parameter for this
286 //function.
287 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
288         m := make(map[string]interface{})
289         if len(keys) == 0 {
290                 return m, nil
291         }
292
293         var keysWithNs []string
294         for _, v := range keys {
295                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
296         }
297         val, err := s.getDbBackend(ns).MGet(keysWithNs)
298         if err != nil {
299                 return m, err
300         }
301         for i, v := range val {
302                 m[keys[i]] = v
303         }
304         return m, err
305 }
306
307 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
309 //is published to a given channel.
310 //Data is written under the namespace what is given as a parameter for this function.
311 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
312         nsPrefix := getNsPrefix(ns)
313         if len(channelsAndEvents) == 0 {
314                 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
315         }
316         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
317                 return false, err
318         }
319         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
320         return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
321 }
322
323 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
324 //If replace was done successfully, true will be returned.
325 //Data is written under the namespace what is given as a parameter for this function.
326 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
327         return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
328 }
329
330 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically. If the set operation was done successfully, an event is published to a
333 //given channel.
334 //Data is written under the namespace what is given as a parameter for this function.
335 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
336         nsPrefix := getNsPrefix(ns)
337         if len(channelsAndEvents) == 0 {
338                 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
339         }
340         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
341                 return false, err
342         }
343         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
344         return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
345 }
346
347 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
348 //then it's value is not changed. Checking the key existence and potential set operation
349 //is done atomically.
350 //Data is written under the namespace what is given as a parameter for this function.
351 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
352         return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
353 }
354
355 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
356 //Trying to remove a nonexisting key is not considered as an error.
357 //An event is published into a given channel if remove operation is successfull and
358 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
359 //when trying to remove, no event is published.
360 //Data is removed under the namespace what is given as a parameter for this function.
361 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
362         if len(keys) == 0 {
363                 return nil
364         }
365
366         var keysWithNs []string
367         nsPrefix := getNsPrefix(ns)
368         for _, v := range keys {
369                 keysWithNs = append(keysWithNs, nsPrefix+v)
370         }
371         if len(channelsAndEvents) == 0 {
372                 return s.getDbBackend(ns).Del(keysWithNs)
373         }
374         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
375                 return err
376         }
377         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
378         return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
379 }
380
381 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
382 //Data is removed under the namespace what is given as a parameter for this function.
383 func (s *SyncStorage) Remove(ns string, keys []string) error {
384         if len(keys) == 0 {
385                 return nil
386         }
387
388         var keysWithNs []string
389         for _, v := range keys {
390                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
391         }
392         err := s.getDbBackend(ns).Del(keysWithNs)
393         return err
394 }
395
396 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
397 //a given event is published to channel. If existing data matches given data,
398 //key and data are removed from SDL. If remove was done successfully, true is returned.
399 //Data is removed under the namespace what is given as a parameter for this function.
400 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
401         nsPrefix := getNsPrefix(ns)
402         if len(channelsAndEvents) == 0 {
403                 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
404         }
405         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
406                 return false, err
407         }
408         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
409         return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
410 }
411
412 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
413 //key and data are removed from SDL. If remove was done successfully, true is returned.
414 //Data is removed under the namespace what is given as a parameter for this function.
415 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
416         status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
417         if err != nil {
418                 return false, err
419         }
420         return status, nil
421 }
422
423 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
424 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
425 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
426         nsPrefix := getNsPrefix(ns)
427         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
428         var retVal []string
429         if err != nil {
430                 return retVal, err
431         }
432         for _, v := range keys {
433                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
434         }
435         return retVal, err
436 }
437
438 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
439 //it is not guaranteed that all keys are removed.
440 func (s *SyncStorage) RemoveAll(ns string) error {
441         keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
442         if err != nil {
443                 return err
444         }
445         if (keys != nil) && (len(keys) != 0) {
446                 err = s.getDbBackend(ns).Del(keys)
447         }
448         return err
449 }
450
451 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
452 //will publish an event to given channel. This operation is not atomic, thus it is
453 //not guaranteed that all keys are removed.
454 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
455         nsPrefix := getNsPrefix(ns)
456         keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
457         if err != nil {
458                 return err
459         }
460         if (keys != nil) && (len(keys) != 0) {
461                 if len(channelsAndEvents) == 0 {
462                         return s.getDbBackend(ns).Del(keys)
463                 }
464                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
465                         return err
466                 }
467                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
468                 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
469         }
470         return err
471 }
472
473 //AddMember adds a new members to a group under given namespace.
474 //
475 //SDL groups are unordered collections of members where each member is
476 //unique. It is possible to add the same member several times without the
477 //need to check if it already exists.
478 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
479         return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
480 }
481
482 //RemoveMember removes members from a group under given namespace.
483 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
484         return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
485 }
486
487 //RemoveGroup removes the whole group along with it's members under given namespace.
488 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
489         return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
490 }
491
492 //GetMembers returns all the members from a group under given namespace.
493 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
494         retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
495         if err != nil {
496                 return []string{}, err
497         }
498         return retVal, err
499 }
500
501 //IsMember returns true if given member is found from a group under given namespace.
502 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
503         retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
504         if err != nil {
505                 return false, err
506         }
507         return retVal, err
508 }
509
510 //GroupSize returns the number of members in a group under given namespace.
511 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
512         retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
513         if err != nil {
514                 return 0, err
515         }
516         return retVal, err
517 }
518
519 func (s *SyncStorage) randomToken() (string, error) {
520         s.mutex.Lock()
521         defer s.mutex.Unlock()
522
523         if len(s.tmp) == 0 {
524                 s.tmp = make([]byte, 16)
525         }
526
527         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
528                 return "", err
529         }
530
531         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
532 }
533
534 //LockResource function is used for locking a resource under given namespace.
535 //The resource lock in practice is a key with random value that is set to expire
536 //after a time period. The value written to key is a random value, thus only the
537 //instance created a lock, can release it. Resource locks are per namespace.
538 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
539         value, err := s.randomToken()
540         if err != nil {
541                 return nil, err
542         }
543
544         var retryTimer *time.Timer
545         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
546                 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
547                 if err != nil {
548                         return nil, err
549                 } else if ok {
550                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
551                 }
552                 if retryTimer == nil {
553                         retryTimer = time.NewTimer(opt.getRetryWait())
554                         defer retryTimer.Stop()
555                 } else {
556                         retryTimer.Reset(opt.getRetryWait())
557                 }
558
559                 select {
560                 case <-retryTimer.C:
561                 }
562         }
563         return nil, errors.New("Lock not obtained")
564 }
565
566 //ReleaseResource removes the lock from a resource under given namespace. If lock
567 //is already expired or some other instance is keeping the lock (lock taken after
568 //expiration), an error is returned.
569 func (l *SyncStorageLock) ReleaseResource(ns string) error {
570         ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
571
572         if err != nil {
573                 return err
574         }
575         if !ok {
576                 return errors.New("Lock not held")
577         }
578         return nil
579 }
580
581 //RefreshResource function can be used to set a new expiration time for the
582 //resource lock (if the lock still exists) under given namespace. The old
583 //remaining expiration time is overwritten with the given new expiration time.
584 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
585         err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
586         return err
587 }
588
589 //CheckResource returns the expiration time left for a resource under given
590 //namespace. If the resource doesn't exist, -2 is returned.
591 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
592         result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
593         if err != nil {
594                 return 0, err
595         }
596         if result == time.Duration(-1) {
597                 return 0, errors.New("invalid resource given, no expiration time attached")
598         }
599         return result, nil
600 }
601
602 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
603 //expirations are done using the methods defined for this struct.
604 type SyncStorageLock struct {
605         s     *SyncStorage
606         key   string
607         value string
608 }
609
610 func getNsPrefix(ns string) string {
611         return "{" + ns + "},"
612 }
613
614 type iDatabase interface {
615         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
616         UnsubscribeChannelDB(channels ...string)
617         MSet(pairs ...interface{}) error
618         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
619         MGet(keys []string) ([]interface{}, error)
620         CloseDB() error
621         Del(keys []string) error
622         DelMPub(channelsAndEvents []string, keys []string) error
623         Keys(key string) ([]string, error)
624         SetIE(key string, oldData, newData interface{}) (bool, error)
625         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
626         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
627         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
628         DelIE(key string, data interface{}) (bool, error)
629         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
630         SAdd(key string, data ...interface{}) error
631         SRem(key string, data ...interface{}) error
632         SMembers(key string) ([]string, error)
633         SIsMember(key string, data interface{}) (bool, error)
634         SCard(key string) (int64, error)
635         PTTL(key string) (time.Duration, error)
636         PExpireIE(key string, data interface{}, expiration time.Duration) error
637 }