Implement SDL multi-namespace API 61/6061/2 v0.6.0
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 12:28:57 +0000 (15:28 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 13:14:47 +0000 (16:14 +0300)
Add 'SyncStorage' with its write and read APIs to support multi-namespace API
what has been implemented already in Python and C++ SDL APIs. In Multi-
namespace API, at SyncStorage' creation time, connection to database backend is
created but namespace is not defined. Namespace is given later as a parameter
to every SDL read and write API calls what makes it easy to write and read from
different namespaces in single SDL instance of type 'SyncStorage'.

This multi-namespace implementation is a prerequisite to have before sentinel
based DB capacity scaling feature RIC-699 can be implemented properly.

Issue-ID: RIC-699

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I018dde1f016630d6b6e44e66a2f276a04e38505a

bench_test.go
cmd/sdltester/sdltester.go
doc.go
docs/release-notes.rst
example_test.go
sdl.go
sdl_private_fn_test.go
sdl_test.go
syncstorage.go [new file with mode: 0644]

index 328e543..0e4e9dc 100644 (file)
@@ -31,63 +31,100 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
 )
 
-type singleBenchmark struct {
-       key       string
+type nsKeysBenchmark struct {
+       ns        string
+       nsCount   int
+       keyName   string
+       keyCount  int
        keySize   int
        valueSize int
 }
 
-type multiBenchmark struct {
-       keyBase   string
+type keysBenchmark struct {
+       keyName   string
        keyCount  int
        keySize   int
        valueSize int
 }
 
-type setBenchmark struct {
+type groupBenchmark struct {
        key         string
        member      string
        memberCount int
 }
 
-func (bm singleBenchmark) String(oper string) string {
-       return fmt.Sprintf("op = %s key=%d value=%d", oper, bm.keySize, bm.valueSize)
+func (bm nsKeysBenchmark) String(oper string) string {
+       return fmt.Sprintf("op=%s ns-cnt=%d key-cnt=%d key-sz=%d value-sz=%d",
+               oper, bm.nsCount, bm.keyCount, bm.keySize, bm.valueSize)
+}
+
+func (bm keysBenchmark) String(oper string) string {
+       return fmt.Sprintf("op=%s key-cnt=%d key-sz=%d value-sz=%d",
+               oper, bm.keyCount, bm.keySize, bm.valueSize)
 }
 
-func (bm multiBenchmark) String(oper string) string {
-       return fmt.Sprintf("op = %s keycnt=%d key=%d value=%d", oper, bm.keyCount, bm.keySize, bm.valueSize)
+func (bm groupBenchmark) String(oper string) string {
+       return fmt.Sprintf("op=%s, mbr-cnt=%d", oper, bm.memberCount)
 }
 
-func (bm setBenchmark) String(oper string) string {
-       return fmt.Sprintf("op = %s, memberCount=%d", oper, bm.memberCount)
+func getNsKeyBenchmarkInput() []nsKeysBenchmark {
+       return []nsKeysBenchmark{
+               {"ns-a", 1, "a", 100, 10, 64},
+               {"ns-b", 10, "b", 100, 10, 64},
+               {"ns-c", 100, "c", 100, 10, 64},
+               {"ns-d", 1000, "d", 100, 10, 64},
+               {"ns-e", 10000, "e", 100, 10, 64},
+       }
 }
-func BenchmarkSet(b *testing.B) {
-       benchmarks := []singleBenchmark{
-               {"a", 10, 64},
-               {"b", 10, 1024},
-               {"c", 10, 64 * 1024},
-               {"d", 10, 1024 * 1024},
-               {"e", 10, 10 * 1024 * 1024},
-
-               {"f", 100, 64},
-               {"g", 100, 1024},
-               {"h", 100, 64 * 1024},
-               {"i", 100, 1024 * 1024},
-               {"j", 100, 10 * 1024 * 1024},
+
+func getKeyBenchmarkInput() []keysBenchmark {
+       return []keysBenchmark{
+               {"a", 1, 10, 64},
+               {"b", 1, 10, 1024},
+               {"c", 1, 10, 64 * 1024},
+               {"d", 1, 10, 1024 * 1024},
+               {"e", 1, 10, 10 * 1024 * 1024},
+
+               {"f", 1, 100, 64},
+               {"g", 1, 100, 1024},
+               {"h", 1, 100, 64 * 1024},
+               {"i", 1, 100, 1024 * 1024},
+               {"j", 1, 100, 10 * 1024 * 1024},
+
+               {"k", 2, 10, 64},
+               {"l", 10, 10, 64},
+               {"m", 100, 10, 64},
+               {"n", 1000, 10, 64},
+               {"r", 5000, 10, 64},
+
+               {"s", 2, 100, 64},
+               {"t", 10, 100, 64},
+               {"u", 100, 100, 64},
+               {"v", 1000, 100, 64},
+               {"x", 5000, 100, 64},
        }
+}
+
+func BenchmarkMultiNamespaceKeysWrite(b *testing.B) {
+       benchmarks := getNsKeyBenchmarkInput()
 
        for _, bm := range benchmarks {
-               b.Run(bm.String("set"), func(b *testing.B) {
-                       key := strings.Repeat(bm.key, bm.keySize)
+               b.Run(bm.String("ns-keys-set"), func(b *testing.B) {
+                       sdl := sdlgo.NewSyncStorage()
                        value := strings.Repeat("1", bm.valueSize)
-                       sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
-
+                       keyVals := make([]string, 0)
+                       for i := 0; i < bm.keyCount; i++ {
+                               key := strings.Repeat(bm.keyName+strconv.Itoa(i), bm.keySize)
+                               keyVals = append(keyVals, key, value)
+                       }
                        b.ResetTimer()
                        b.RunParallel(func(pb *testing.PB) {
                                for pb.Next() {
-                                       err := sdl.Set(key, value)
-                                       if err != nil {
-                                               b.Fatal(err)
+                                       for n := 0; n < bm.nsCount; n++ {
+                                               err := sdl.Set(bm.ns+strconv.Itoa(n), keyVals)
+                                               if err != nil {
+                                                       b.Fatal(err)
+                                               }
                                        }
                                }
                        })
@@ -95,35 +132,25 @@ func BenchmarkSet(b *testing.B) {
        }
 }
 
-func BenchmarkGet(b *testing.B) {
-       benchmarks := []singleBenchmark{
-               {"a", 10, 64},
-               {"b", 10, 1024},
-               {"c", 10, 64 * 1024},
-               {"d", 10, 1024 * 1024},
-               {"e", 10, 10 * 1024 * 1024},
-
-               {"f", 100, 64},
-               {"g", 100, 1024},
-               {"h", 100, 64 * 1024},
-               {"i", 100, 1024 * 1024},
-               {"j", 100, 10 * 1024 * 1024},
-       }
+func BenchmarkMultiNamespaceKeysRead(b *testing.B) {
+       benchmarks := getNsKeyBenchmarkInput()
 
        for _, bm := range benchmarks {
-               b.Run(bm.String("Get"), func(b *testing.B) {
-                       key := strings.Repeat(bm.key, bm.keySize)
-                       value := strings.Repeat("1", bm.valueSize)
-                       sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
-                       if err := sdl.Set(key, value); err != nil {
-                               b.Fatal(err)
+               b.Run(bm.String("keys-get"), func(b *testing.B) {
+                       sdl := sdlgo.NewSyncStorage()
+                       keys := make([]string, 0)
+                       for i := 0; i < bm.keyCount; i++ {
+                               key := strings.Repeat(bm.keyName+strconv.Itoa(i), bm.keySize)
+                               keys = append(keys, key)
                        }
                        b.ResetTimer()
                        b.RunParallel(func(pb *testing.PB) {
                                for pb.Next() {
-                                       _, err := sdl.Get([]string{key})
-                                       if err != nil {
-                                               b.Fatal(err)
+                                       for n := 0; n < bm.nsCount; n++ {
+                                               _, err := sdl.Get(bm.ns+strconv.Itoa(n), keys)
+                                               if err != nil {
+                                                       b.Fatal(err)
+                                               }
                                        }
                                }
                        })
@@ -131,28 +158,16 @@ func BenchmarkGet(b *testing.B) {
        }
 }
 
-func BenchmarkMultiSet(b *testing.B) {
-       benchmarks := []multiBenchmark{
-               {"a", 2, 10, 64},
-               {"b", 10, 10, 64},
-               {"c", 100, 10, 64},
-               {"d", 1000, 10, 64},
-               {"e", 5000, 10, 64},
-
-               {"f", 2, 100, 64},
-               {"g", 10, 100, 64},
-               {"h", 100, 100, 64},
-               {"i", 1000, 100, 64},
-               {"j", 5000, 100, 64},
-       }
+func BenchmarkKeysWrite(b *testing.B) {
+       benchmarks := getKeyBenchmarkInput()
 
        for _, bm := range benchmarks {
-               b.Run(bm.String("mset"), func(b *testing.B) {
+               b.Run(bm.String("keys-set"), func(b *testing.B) {
                        sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
                        value := strings.Repeat("1", bm.valueSize)
                        keyVals := make([]string, 0)
                        for i := 0; i < bm.keyCount; i++ {
-                               key := strings.Repeat(bm.keyBase+strconv.Itoa(i), bm.keySize)
+                               key := strings.Repeat(bm.keyName+strconv.Itoa(i), bm.keySize)
                                keyVals = append(keyVals, key, value)
                        }
                        b.ResetTimer()
@@ -168,33 +183,21 @@ func BenchmarkMultiSet(b *testing.B) {
        }
 }
 
-func BenchmarkMultiGet(b *testing.B) {
-       benchmarks := []multiBenchmark{
-               {"a", 2, 10, 64},
-               {"b", 10, 10, 64},
-               {"c", 100, 10, 64},
-               {"d", 1000, 10, 64},
-               {"e", 5000, 10, 64},
-
-               {"f", 2, 100, 64},
-               {"g", 10, 100, 64},
-               {"h", 100, 100, 64},
-               {"i", 1000, 100, 64},
-               {"j", 5000, 100, 64},
-       }
+func BenchmarkKeysRead(b *testing.B) {
+       benchmarks := getKeyBenchmarkInput()
 
        for _, bm := range benchmarks {
-               b.Run(bm.String("gset"), func(b *testing.B) {
+               b.Run(bm.String("keys-get"), func(b *testing.B) {
                        sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
-                       keyVals := make([]string, 0)
+                       keys := make([]string, 0)
                        for i := 0; i < bm.keyCount; i++ {
-                               key := strings.Repeat(bm.keyBase+strconv.Itoa(i), bm.keySize)
-                               keyVals = append(keyVals, key)
+                               key := strings.Repeat(bm.keyName+strconv.Itoa(i), bm.keySize)
+                               keys = append(keys, key)
                        }
                        b.ResetTimer()
                        b.RunParallel(func(pb *testing.PB) {
                                for pb.Next() {
-                                       _, err := sdl.Get(keyVals)
+                                       _, err := sdl.Get(keys)
                                        if err != nil {
                                                b.Fatal(err)
                                        }
@@ -204,8 +207,8 @@ func BenchmarkMultiGet(b *testing.B) {
        }
 }
 
-func BenchmarkSetAddMember(b *testing.B) {
-       benchmarks := []setBenchmark{
+func BenchmarkGroupMemberAdd(b *testing.B) {
+       benchmarks := []groupBenchmark{
                {"a", "x", 1},
                {"b", "x", 100},
                {"c", "x", 10000},
@@ -213,7 +216,7 @@ func BenchmarkSetAddMember(b *testing.B) {
        }
 
        for _, bm := range benchmarks {
-               b.Run(bm.String("AddMember"), func(b *testing.B) {
+               b.Run(bm.String("group-add-member"), func(b *testing.B) {
                        sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
                        members := make([]string, 0)
                        for i := 0; i < bm.memberCount; i++ {
index 2cf0364..f2eb379 100644 (file)
@@ -65,7 +65,7 @@ import (
  */
 
 func main() {
-       sdl := sdlgo.NewSdlInstance("tag1", sdlgo.NewDatabase())
+       sdl := sdlgo.NewSyncStorage()
 
        if len(os.Args) > 1 {
                switch command := os.Args[1]; command {
@@ -125,11 +125,11 @@ func printHelp() {
        fmt.Println("             is measured")
 }
 
-func write(sdl *sdlgo.SdlInstance) {
+func write(sdl *sdlgo.SyncStorage) {
        data := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44}
        start := time.Now()
-       err := sdl.Set("key1", data)
+       err := sdl.Set("tag1", "key1", data)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Write: %s\n", elapsed)
@@ -138,10 +138,10 @@ func write(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func read(sdl *sdlgo.SdlInstance) {
+func read(sdl *sdlgo.SyncStorage) {
        k := []string{"key1"}
        start := time.Now()
-       data, err := sdl.Get(k)
+       data, err := sdl.Get("tag1", k)
        elapsed := time.Since(start)
        if err == nil {
                value, ok := data["key1"]
@@ -156,10 +156,10 @@ func read(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func remove(sdl *sdlgo.SdlInstance) {
+func remove(sdl *sdlgo.SyncStorage) {
        k := []string{"key1"}
        start := time.Now()
-       err := sdl.Remove(k)
+       err := sdl.Remove("tag1", k)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Remove: %s\n", elapsed)
@@ -168,9 +168,9 @@ func remove(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func noexist(sdl *sdlgo.SdlInstance) {
+func noexist(sdl *sdlgo.SyncStorage) {
        start := time.Now()
-       _, err := sdl.Get([]string{"no1", "no2"})
+       _, err := sdl.Get("tag1", []string{"no1", "no2"})
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Noexist: %s\n", elapsed)
@@ -179,9 +179,9 @@ func noexist(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func getall(sdl *sdlgo.SdlInstance) {
+func getall(sdl *sdlgo.SyncStorage) {
        start := time.Now()
-       keys, err := sdl.GetAll()
+       keys, err := sdl.GetAll("tag1")
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Getall: %s\n", elapsed)
@@ -193,9 +193,9 @@ func getall(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func removeall(sdl *sdlgo.SdlInstance) {
+func removeall(sdl *sdlgo.SyncStorage) {
        start := time.Now()
-       err := sdl.RemoveAll()
+       err := sdl.RemoveAll("tag1")
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Removeall: %s\n", elapsed)
@@ -204,9 +204,9 @@ func removeall(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func emptymap(sdl *sdlgo.SdlInstance) {
+func emptymap(sdl *sdlgo.SyncStorage) {
        start := time.Now()
-       err := sdl.Set("", "")
+       err := sdl.Set("tag1", "", "")
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Emptymap: %s\n", elapsed)
@@ -215,11 +215,11 @@ func emptymap(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func multiple(sdl *sdlgo.SdlInstance) {
+func multiple(sdl *sdlgo.SyncStorage) {
        data := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44}
        start := time.Now()
-       err := sdl.Set("key1m", data)
+       err := sdl.Set("tag1", "key1m", data)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Multiple: %s ", elapsed)
@@ -227,7 +227,7 @@ func multiple(sdl *sdlgo.SdlInstance) {
                fmt.Println(err)
        }
        start = time.Now()
-       err = sdl.Set("key2m", data)
+       err = sdl.Set("tag1", "key2m", data)
        elapsed = time.Since(start)
        if err == nil {
                fmt.Printf(" %s \n", elapsed)
@@ -236,10 +236,10 @@ func multiple(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func emptydata(sdl *sdlgo.SdlInstance) {
+func emptydata(sdl *sdlgo.SyncStorage) {
        data := []byte{}
        start := time.Now()
-       err := sdl.Set("key1", data)
+       err := sdl.Set("tag1", "key1", data)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Emptydata: %s\n", elapsed)
@@ -248,13 +248,13 @@ func emptydata(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func writeif(sdl *sdlgo.SdlInstance) {
+func writeif(sdl *sdlgo.SyncStorage) {
        oldVec := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44}
        newVec := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x66}
        start := time.Now()
-       _, err := sdl.SetIf("key1", oldVec, newVec)
+       _, err := sdl.SetIf("tag1", "key1", oldVec, newVec)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Writeif: %s\n", elapsed)
@@ -263,11 +263,11 @@ func writeif(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func writeifnot(sdl *sdlgo.SdlInstance) {
+func writeifnot(sdl *sdlgo.SyncStorage) {
        vec := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x88}
        start := time.Now()
-       _, err := sdl.SetIfNotExists("key1", vec)
+       _, err := sdl.SetIfNotExists("tag1", "key1", vec)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Writeifnot: %s\n", elapsed)
@@ -276,11 +276,11 @@ func writeifnot(sdl *sdlgo.SdlInstance) {
        }
 }
 
-func removeif(sdl *sdlgo.SdlInstance) {
+func removeif(sdl *sdlgo.SyncStorage) {
        vec := []byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0x00,
                0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x88}
        start := time.Now()
-       _, err := sdl.RemoveIf("key1", vec)
+       _, err := sdl.RemoveIf("tag1", "key1", vec)
        elapsed := time.Since(start)
        if err == nil {
                fmt.Printf("Removeif: %s\n", elapsed)
diff --git a/doc.go b/doc.go
index 2dbff84..e054760 100644 (file)
--- a/doc.go
+++ b/doc.go
@@ -37,27 +37,36 @@ multiple goroutines.
 
 Namespace
 
-A shared data layer connection is instantiated to a namespace and data is
-always read and modified within the namespace. Namespace provides data isolation across clients.
-Namespace instantiation happens when the sdlgo instance is created.
-E.g. the following code sets the SDL instance to use "example" as a namespace
-  sdl := sdlgo.NewSdlInstance("example", sdlgo.NewDatabase())
-Applications can use several namespaces at the same time by creating several sdlgo instances.
+Namespace concept in a shared data layer connection is to isolate data write and read operations
+to happen within particular namespace.
+
+SDL instance
+
+There are two ways to create SDL instance, the first preferable option is to create so called
+SDL multi-namespace instance with `sdlgo.NewSyncStorage` call. The second option is to create
+SDL instance with `sdlgo.NewSdlInstance` call. Latter SDL instance creation method is deprecated
+and it should not be used anymore in any new application implementations, it is left to SDL API
+to guarantee backward compatibility for the old application implementations.
+The difference between multi-namespace `SyncStorage` SDL instance and the old one is that in
+`SyncStorage` case namespace is not defined at instance creation time, but it is defined when
+SDL read and write APIs are called. This means that with SDL `SyncStorage` instance it is much
+easier to write and read data from different namespaces in a single application client compared
+to the old SDL API where you needed to create own SDL instance for each namespace going to be
+used later to write and read data.
 
 Database connection
 
-In addition to creating the SDL instance, application is responsible for creating the backend
-database connection with NewDatabase() method. When the connection is created, sdlgo shall open a
-tcp connection to backend database immediatelly. The resulting connection may be used with several
-SDL instances if application needs to use several namespaces at a time. E.g.
-  connection := sdlgo.NewDatabase()
-  ns1 := sdlgo.NewSdlInstance("ns1", connection)
-  ns2 := sdlgo.NewSdlInstance("ns2", connection)
+When `SyncStorage` instance is created, it also creates database backend connection, this means
+that sdlgo shall open a tcp connection to backend database. Below is example how to create SDL
+`SyncStorage` instance and what also connects to database backend under the hood:
+  sdl := sdlgo.NewSyncStorage()
+
 For the backend database connection a circuit breaker design is used. If the connection fails,
-an error is returned immediatelly to application. Restoration of the connection happens
+an error is returned immediately to application. Restoration of the connection happens
 automatically by SDL and application should retry the operation again after a while.
 
-Database service is discovered by using environment variables DBAAS_SERVICE_HOST and
+Database service is discovered by using DBAAS* environment variables. For simple standalone
+DBAAS case there are needed two environment variables: DBAAS_SERVICE_HOST and
 DBAAS_SERVICE_PORT. If not set, localhost and port 6379 are used by default.
 
 Keys and data
@@ -70,11 +79,11 @@ Clients are responsible for managing the keys within a namespace.
 Some examples on how to set the data using different kind of input parameters:
 
 Basic usage, keys and values are given as own parameters (with mixed data types)
-  err := s.Set("key1", "value1", "key2", 2)
+  err := s.Set("example-namaspace", "key1", "value1", "key2", 2)
 
 Keys and values inside a slice (again with mixed types, thus empty interface used as a type)
   exampleSlice := []interface{"key1", "value1", "key2", 2}
-  err := s.Set(exampleSlice)
+  err := s.Set("example-namaspace", exampleSlice)
 
 Data stored to a byte array
   data := make([]byte), 3
@@ -108,7 +117,10 @@ channels are per namespace. It is possible to publish several kinds of events th
 
 In order to publish changes to SDL data, the publisher need to call an API function that supports
 publishing. E.g.
-   err := sdl.SetAndPublish([]string{"channel1", "event1", "channel2", "event2"}, "key", "value")
+   err := sdl.SetAndPublish("example-namespace", []string{
+      "channel1", "event1", "channel2", "event2"}, "key", "value",
+   )
+
 This example will publish event1 to channel1 and event2 in channel2 after writing the data.
 
 When subscribing the channels, the application needs to first create an SDL instance for the desired
@@ -118,7 +130,7 @@ received for the given channel, the given callback function shall be called with
 It is possible to make several subscriptions for different channels using different callback
 functions if different kind of handling is required.
 
-  sdl := sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
+  sdl := sdlgo.NewSyncStorage()
 
   cb1 := func(channel string, event ...string) {
     fmt.Printf("cb1: Received %s from channel %s\n", event, channel)
@@ -128,10 +140,10 @@ functions if different kind of handling is required.
     fmt.Printf("cb2: Received %s from channel %s\n", event, channel)
   }
 
-  sdl.SubscribeChannel(cb1, "channel1", "channel2")
-  sdl.SubscribeChannel(cb2, "channel3")
+  sdl.SubscribeChannel("example-namespace", cb1, "channel1", "channel2")
+  sdl.SubscribeChannel("example-namespace", cb2, "channel3")
 
-This example subscribes three channels from "namespace" and assigns cb1 for channel1 and channel2
+This example subscribes three channels from "example-namespace" and assigns cb1 for channel1 and channel2
 whereas channel3 is assigned to cb2.
 
 The callbacks are called from a context of a goroutine that is listening for the events. When
index 1af8a8f..d0e8f02 100644 (file)
@@ -30,6 +30,10 @@ This document provides the release notes of the sdlgo.
 Version history
 ---------------
 
+[0.6.0] - 2021-05-11
+
+* Add SDL multi-namespace API 'SyncStorage'.
+
 [0.5.5] - 2021-03-09
 
 * Take DBAAS multi-channel publishing Redis modules into use.
index 4743e6e..d3be2f7 100644 (file)
@@ -28,14 +28,14 @@ import (
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
 )
 
-var sdl *sdlgo.SdlInstance
+var sdl *sdlgo.SyncStorage
 
 func init() {
-       sdl = sdlgo.NewSdlInstance("namespace", sdlgo.NewDatabase())
+       sdl = sdlgo.NewSyncStorage()
 }
 
 func ExampleSdlInstance_Set() {
-       err := sdl.Set("stringdata", "data", "intdata", 42)
+       err := sdl.Set("namespace", "stringdata", "data", "intdata", 42)
        if err != nil {
                panic(err)
        } else {
@@ -45,7 +45,7 @@ func ExampleSdlInstance_Set() {
 }
 
 func ExampleSdlInstance_Get() {
-       retMap, err := sdl.Get([]string{"stringdata", "intdata"})
+       retMap, err := sdl.Get("namespace", []string{"stringdata", "intdata"})
        if err != nil {
                panic(err)
        } else {
diff --git a/sdl.go b/sdl.go
index b2f47bd..afc4781 100644 (file)
--- a/sdl.go
+++ b/sdl.go
 package sdlgo
 
 import (
-       "crypto/rand"
-       "encoding/base64"
-       "errors"
-       "fmt"
-       "io"
-       "reflect"
-       "strings"
-       "sync"
        "time"
 
        "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
@@ -39,12 +31,9 @@ import (
 //SdlInstance provides an API to read, write and modify
 //key-value pairs in a given namespace.
 type SdlInstance struct {
-       nameSpace      string
-       nsPrefix       string
-       eventSeparator string
-       mutex          sync.Mutex
-       tmp            []byte
-       iDatabase
+       nameSpace string
+       nsPrefix  string
+       storage   *SyncStorage
 }
 
 //Database struct is a holder for the internal database instance. Applications
@@ -68,10 +57,9 @@ func NewDatabase() *Database {
 //The database used as a backend is given as a parameter
 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
        return &SdlInstance{
-               nameSpace:      NameSpace,
-               nsPrefix:       "{" + NameSpace + "},",
-               eventSeparator: "___",
-               iDatabase:      db.instance,
+               nameSpace: NameSpace,
+               nsPrefix:  "{" + NameSpace + "},",
+               storage:   newSyncStorage(db),
        }
 }
 
@@ -94,131 +82,18 @@ func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
 //and using of Go signals is recommended. Also it should be noted that in case of several
 //events received from different channels, callbacks are called in series one by one.
 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
-       s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
+       s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
        return nil
 }
 
 //UnsubscribeChannel removes subscription from one or several channels.
 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
-       s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
-       return nil
+       return s.storage.UnsubscribeChannel(s.nameSpace, channels...)
 }
 
 //Close connection to backend database.
 func (s *SdlInstance) Close() error {
-       return s.CloseDB()
-}
-
-func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
-       if len(channelsAndEvents)%2 != 0 {
-               return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
-       }
-       for i, v := range channelsAndEvents {
-               if i%2 != 0 {
-                       if strings.Contains(v, s.eventSeparator) {
-                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
-                       }
-               }
-       }
-       return nil
-}
-func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
-       var retVal []string
-       for _, v := range channels {
-               retVal = append(retVal, s.nsPrefix+v)
-       }
-       return retVal
-}
-
-func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
-       retVal := make([]interface{}, 0)
-       shouldBeKey := true
-       for _, v := range pairs {
-               reflectType := reflect.TypeOf(v)
-               switch reflectType.Kind() {
-               case reflect.Map:
-                       x := reflect.ValueOf(v).MapRange()
-                       for x.Next() {
-                               retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
-                               retVal = append(retVal, x.Value().Interface())
-                       }
-               case reflect.Slice:
-                       if shouldBeKey {
-                               x := reflect.ValueOf(v)
-                               if x.Len()%2 != 0 {
-                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
-                               }
-                               for i2 := 0; i2 < x.Len(); i2++ {
-                                       if i2%2 == 0 {
-                                               retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
-                                       } else {
-                                               retVal = append(retVal, x.Index(i2).Interface())
-                                       }
-                               }
-                       } else {
-                               if reflectType.Elem().Kind() == reflect.Uint8 {
-                                       retVal = append(retVal, v)
-                                       shouldBeKey = true
-                               } else {
-                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
-                               }
-                       }
-               case reflect.Array:
-                       if shouldBeKey {
-                               x := reflect.ValueOf(v)
-                               if x.Len()%2 != 0 {
-                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
-                               }
-                               for i2 := 0; i2 < x.Len(); i2++ {
-                                       if i2%2 == 0 {
-                                               retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
-                                       } else {
-                                               retVal = append(retVal, x.Index(i2).Interface())
-                                       }
-                               }
-                       } else {
-                               if reflectType.Elem().Kind() == reflect.Uint8 {
-                                       retVal = append(retVal, v)
-                                       shouldBeKey = true
-                               } else {
-                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
-                               }
-                       }
-               default:
-                       if shouldBeKey {
-                               retVal = append(retVal, s.nsPrefix+v.(string))
-                               shouldBeKey = false
-                       } else {
-                               retVal = append(retVal, v)
-                               shouldBeKey = true
-                       }
-               }
-       }
-       if len(retVal)%2 != 0 {
-               return []interface{}{}, errors.New("Key/value pairs doesn't match")
-       }
-       return retVal, nil
-}
-
-func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
-       channelEventMap := make(map[string]string)
-       for i, v := range channelsAndEvents {
-               if i%2 != 0 {
-                       continue
-               }
-               _, exists := channelEventMap[v]
-               if exists {
-                       channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
-               } else {
-                       channelEventMap[v] = channelsAndEvents[i+1]
-               }
-       }
-       retVal := make([]string, 0)
-       for k, v := range channelEventMap {
-               retVal = append(retVal, s.nsPrefix+k)
-               retVal = append(retVal, v)
-       }
-       return retVal
+       return s.storage.Close()
 }
 
 //SetAndPublish function writes data to shared data layer storage and sends an event to
@@ -235,18 +110,7 @@ func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []str
 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
 //will send event1 and event3 to channel1 and event2 to channel2.
 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
-       keyAndData, err := s.setNamespaceToKeys(pairs...)
-       if err != nil {
-               return err
-       }
-       if len(channelsAndEvents) == 0 {
-               return s.MSet(keyAndData...)
-       }
-       if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
-               return err
-       }
-       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
+       return s.storage.SetAndPublish(s.nameSpace, channelsAndEvents, pairs...)
 }
 
 //Set function writes data to shared data layer storage. Writing is done
@@ -256,58 +120,27 @@ func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interfa
 //The key is expected to be string whereas value can be anything, string,
 //number, slice array or map
 func (s *SdlInstance) Set(pairs ...interface{}) error {
-       if len(pairs) == 0 {
-               return nil
-       }
-
-       keyAndData, err := s.setNamespaceToKeys(pairs...)
-       if err != nil {
-               return err
-       }
-       return s.MSet(keyAndData...)
+       return s.storage.Set(s.nameSpace, pairs...)
 }
 
 //Get function atomically reads one or more keys from SDL. The returned map has the
 //requested keys as index and data as value. If the requested key is not found
 //from SDL, it's value is nil
 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
-       m := make(map[string]interface{})
-       if len(keys) == 0 {
-               return m, nil
-       }
-
-       var keysWithNs []string
-       for _, v := range keys {
-               keysWithNs = append(keysWithNs, s.nsPrefix+v)
-       }
-       val, err := s.MGet(keysWithNs)
-       if err != nil {
-               return m, err
-       }
-       for i, v := range val {
-               m[keys[i]] = v
-       }
-       return m, err
+       return s.storage.Get(s.nameSpace, keys)
 }
 
 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
 //is published to a given channel.
 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
-       if len(channelsAndEvents) == 0 {
-               return s.SetIE(s.nsPrefix+key, oldData, newData)
-       }
-       if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
-               return false, err
-       }
-       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.SetIEPub(channelsAndEventsPrepared, s.nsPrefix+key, oldData, newData)
+       return s.storage.SetIfAndPublish(s.nameSpace, channelsAndEvents, key, oldData, newData)
 }
 
 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
 //If replace was done successfully, true will be returned.
 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
-       return s.SetIE(s.nsPrefix+key, oldData, newData)
+       return s.storage.SetIf(s.nameSpace, key, oldData, newData)
 }
 
 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
@@ -315,21 +148,14 @@ func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, err
 //is done atomically. If the set operation was done successfully, an event is published to a
 //given channel.
 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
-       if len(channelsAndEvents) == 0 {
-               return s.SetNX(s.nsPrefix+key, data, 0)
-       }
-       if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
-               return false, err
-       }
-       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.SetNXPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
+       return s.storage.SetIfNotExistsAndPublish(s.nameSpace, channelsAndEvents, key, data)
 }
 
 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
 //then it's value is not changed. Checking the key existence and potential set operation
 //is done atomically.
 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
-       return s.SetNX(s.nsPrefix+key, data, 0)
+       return s.storage.SetIfNotExists(s.nameSpace, key, data)
 }
 
 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
@@ -338,108 +164,44 @@ func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error)
 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
 //when trying to remove, no event is published.
 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
-       if len(keys) == 0 {
-               return nil
-       }
-
-       var keysWithNs []string
-       for _, v := range keys {
-               keysWithNs = append(keysWithNs, s.nsPrefix+v)
-       }
-       if len(channelsAndEvents) == 0 {
-               return s.Del(keysWithNs)
-       }
-       if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
-               return err
-       }
-       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
+       return s.storage.RemoveAndPublish(s.nameSpace, channelsAndEvents, keys)
 }
 
 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
 func (s *SdlInstance) Remove(keys []string) error {
-       if len(keys) == 0 {
-               return nil
-       }
-
-       var keysWithNs []string
-       for _, v := range keys {
-               keysWithNs = append(keysWithNs, s.nsPrefix+v)
-       }
-       err := s.Del(keysWithNs)
-       return err
+       return s.storage.Remove(s.nameSpace, keys)
 }
 
 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
 //a given event is published to channel. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
-       if len(channelsAndEvents) == 0 {
-               return s.DelIE(s.nsPrefix+key, data)
-       }
-       if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
-               return false, err
-       }
-       channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-       return s.DelIEPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
+       return s.storage.RemoveIfAndPublish(s.nameSpace, channelsAndEvents, key, data)
 }
 
 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
 //key and data are removed from SDL. If remove was done successfully, true is returned.
 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
-       status, err := s.DelIE(s.nsPrefix+key, data)
-       if err != nil {
-               return false, err
-       }
-       return status, nil
+       return s.storage.RemoveIf(s.nameSpace, key, data)
 }
 
 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
 func (s *SdlInstance) GetAll() ([]string, error) {
-       keys, err := s.Keys(s.nsPrefix + "*")
-       var retVal []string
-       if err != nil {
-               return retVal, err
-       }
-       for _, v := range keys {
-               retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
-       }
-       return retVal, err
+       return s.storage.GetAll(s.nameSpace)
 }
 
 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
 //it is not guaranteed that all keys are removed.
 func (s *SdlInstance) RemoveAll() error {
-       keys, err := s.Keys(s.nsPrefix + "*")
-       if err != nil {
-               return err
-       }
-       if (keys != nil) && (len(keys) != 0) {
-               err = s.Del(keys)
-       }
-       return err
+       return s.storage.RemoveAll(s.nameSpace)
 }
 
 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
 //will publish an event to given channel. This operation is not atomic, thus it is
 //not guaranteed that all keys are removed.
 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
-       keys, err := s.Keys(s.nsPrefix + "*")
-       if err != nil {
-               return err
-       }
-       if (keys != nil) && (len(keys) != 0) {
-               if len(channelsAndEvents) == 0 {
-                       return s.Del(keys)
-               }
-               if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
-                       return err
-               }
-               channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
-               err = s.DelMPub(channelsAndEventsPrepared, keys)
-       }
-       return err
+       return s.storage.RemoveAllAndPublish(s.nameSpace, channelsAndEvents)
 }
 
 //AddMember adds a new members to a group.
@@ -448,59 +210,32 @@ func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
 //unique. It is possible to add the same member several times without the
 //need to check if it already exists.
 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
-       return s.SAdd(s.nsPrefix+group, member...)
+       return s.storage.AddMember(s.nameSpace, group, member...)
 }
 
 //RemoveMember removes members from a group.
 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
-       return s.SRem(s.nsPrefix+group, member...)
+       return s.storage.RemoveMember(s.nameSpace, group, member...)
 }
 
 //RemoveGroup removes the whole group along with it's members.
 func (s *SdlInstance) RemoveGroup(group string) error {
-       return s.Del([]string{s.nsPrefix + group})
+       return s.storage.RemoveGroup(s.nameSpace, group)
 }
 
 //GetMembers returns all the members from a group.
 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
-       retVal, err := s.SMembers(s.nsPrefix + group)
-       if err != nil {
-               return []string{}, err
-       }
-       return retVal, err
+       return s.storage.GetMembers(s.nameSpace, group)
 }
 
 //IsMember returns true if given member is found from a group.
 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
-       retVal, err := s.SIsMember(s.nsPrefix+group, member)
-       if err != nil {
-               return false, err
-       }
-       return retVal, err
+       return s.storage.IsMember(s.nameSpace, group, member)
 }
 
 //GroupSize returns the number of members in a group.
 func (s *SdlInstance) GroupSize(group string) (int64, error) {
-       retVal, err := s.SCard(s.nsPrefix + group)
-       if err != nil {
-               return 0, err
-       }
-       return retVal, err
-}
-
-func (s *SdlInstance) randomToken() (string, error) {
-       s.mutex.Lock()
-       defer s.mutex.Unlock()
-
-       if len(s.tmp) == 0 {
-               s.tmp = make([]byte, 16)
-       }
-
-       if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
-               return "", err
-       }
-
-       return base64.RawURLEncoding.EncodeToString(s.tmp), nil
+       return s.storage.GroupSize(s.nameSpace, group)
 }
 
 //LockResource function is used for locking a resource. The resource lock in
@@ -508,67 +243,34 @@ func (s *SdlInstance) randomToken() (string, error) {
 //period. The value written to key is a random value, thus only the instance
 //created a lock, can release it. Resource locks are per namespace.
 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
-       value, err := s.randomToken()
-       if err != nil {
-               return nil, err
+       l, err := s.storage.LockResource(s.nameSpace, resource, expiration, opt)
+       if l != nil {
+               return &Lock{
+                       s:           s,
+                       storageLock: l,
+               }, err
        }
-
-       var retryTimer *time.Timer
-       for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
-               ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
-               if err != nil {
-                       return nil, err
-               } else if ok {
-                       return &Lock{s: s, key: resource, value: value}, nil
-               }
-               if retryTimer == nil {
-                       retryTimer = time.NewTimer(opt.getRetryWait())
-                       defer retryTimer.Stop()
-               } else {
-                       retryTimer.Reset(opt.getRetryWait())
-               }
-
-               select {
-               case <-retryTimer.C:
-               }
-       }
-       return nil, errors.New("Lock not obtained")
+       return nil, err
 }
 
 //ReleaseResource removes the lock from a resource. If lock is already
 //expired or some other instance is keeping the lock (lock taken after expiration),
 //an error is returned.
 func (l *Lock) ReleaseResource() error {
-       ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
-
-       if err != nil {
-               return err
-       }
-       if !ok {
-               return errors.New("Lock not held")
-       }
-       return nil
+       return l.storageLock.ReleaseResource(l.s.nameSpace)
 }
 
 //RefreshResource function can be used to set a new expiration time for the
 //resource lock (if the lock still exists). The old remaining expiration
 //time is overwritten with the given new expiration time.
 func (l *Lock) RefreshResource(expiration time.Duration) error {
-       err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
-       return err
+       return l.storageLock.RefreshResource(l.s.nameSpace, expiration)
 }
 
 //CheckResource returns the expiration time left for a resource.
 //If the resource doesn't exist, -2 is returned.
 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
-       result, err := s.PTTL(s.nsPrefix + resource)
-       if err != nil {
-               return 0, err
-       }
-       if result == time.Duration(-1) {
-               return 0, errors.New("invalid resource given, no expiration time attached")
-       }
-       return result, nil
+       return s.storage.CheckResource(s.nameSpace, resource)
 }
 
 //Options struct defines the behaviour for getting the resource lock.
@@ -599,32 +301,6 @@ func (o *Options) getRetryWait() time.Duration {
 //Lock struct identifies the resource lock instance. Releasing and adjusting the
 //expirations are done using the methods defined for this struct.
 type Lock struct {
-       s     *SdlInstance
-       key   string
-       value string
-}
-
-type iDatabase interface {
-       SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
-       UnsubscribeChannelDB(channels ...string)
-       MSet(pairs ...interface{}) error
-       MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
-       MGet(keys []string) ([]interface{}, error)
-       CloseDB() error
-       Del(keys []string) error
-       DelMPub(channelsAndEvents []string, keys []string) error
-       Keys(key string) ([]string, error)
-       SetIE(key string, oldData, newData interface{}) (bool, error)
-       SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
-       SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
-       SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
-       DelIE(key string, data interface{}) (bool, error)
-       DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
-       SAdd(key string, data ...interface{}) error
-       SRem(key string, data ...interface{}) error
-       SMembers(key string) ([]string, error)
-       SIsMember(key string, data interface{}) (bool, error)
-       SCard(key string) (int64, error)
-       PTTL(key string) (time.Duration, error)
-       PExpireIE(key string, data interface{}, expiration time.Duration) error
+       s           *SdlInstance
+       storageLock *SyncStorageLock
 }
index 541ee04..318d84f 100644 (file)
@@ -26,9 +26,8 @@ package sdlgo
 //underlying redis implementation with mock
 func NewSdlInstanceForTest(NameSpace string, instance iDatabase) *SdlInstance {
        return &SdlInstance{
-               nameSpace:      NameSpace,
-               nsPrefix:       "{" + NameSpace + "},",
-               eventSeparator: "___",
-               iDatabase:      instance,
+               nameSpace: NameSpace,
+               nsPrefix:  "{" + NameSpace + "},",
+               storage:   newSyncStorage(&Database{instance: instance}),
        }
 }
index acacfd6..8dc9996 100644 (file)
@@ -169,6 +169,24 @@ func verifySliceInOrder(a, b []string) bool {
 
 }
 
+func TestClose(t *testing.T) {
+       m, i := setup()
+
+       m.On("CloseDB").Return(nil)
+       err := i.Close()
+       assert.Nil(t, err)
+       m.AssertExpectations(t)
+}
+
+func TestCloseReturnError(t *testing.T) {
+       m, i := setup()
+
+       m.On("CloseDB").Return(errors.New("Some error"))
+       err := i.Close()
+       assert.NotNil(t, err)
+       m.AssertExpectations(t)
+}
+
 func TestSubscribeChannel(t *testing.T) {
        m, i := setup()
 
@@ -591,8 +609,8 @@ func TestRemoveAndPublishIncorrectChannel(t *testing.T) {
 
        m.AssertNotCalled(t, "DelMPub", notExpectedChannelAndEvent, notExpectedKeys)
        m.AssertNotCalled(t, "Del", notExpectedKeys)
-       err := i.RemoveAndPublish([]string{"channel", "event", "channel2"}, []string{})
-       assert.Nil(t, err)
+       err := i.RemoveAndPublish([]string{"channel", "event", "channel2"}, []string{"key1", "key2"})
+       assert.NotNil(t, err)
        m.AssertExpectations(t)
 
 }
diff --git a/syncstorage.go b/syncstorage.go
new file mode 100644 (file)
index 0000000..874a41e
--- /dev/null
@@ -0,0 +1,617 @@
+/*
+   Copyright (c) 2021 AT&T Intellectual Property.
+   Copyright (c) 2018-2021 Nokia.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+/*
+ * This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ * platform project (RICP).
+ */
+
+package sdlgo
+
+import (
+       "crypto/rand"
+       "encoding/base64"
+       "errors"
+       "fmt"
+       "io"
+       "reflect"
+       "strings"
+       "sync"
+       "time"
+)
+
+//SyncStorage provides multi-namespace APIs to read, write and modify key-value
+//pairs. key-values are belonging to a namespace and SyncStorage provides APIs
+//where namespace can be given in every API call. This means that with
+//SyncStorage you can easily set key-values under different namespace compared to
+//SdlInstance where namespace can be defined only at SdlInstance instance creation
+//time.
+type SyncStorage struct {
+       eventSeparator string
+       mutex          sync.Mutex
+       tmp            []byte
+       iDatabase
+}
+
+//NewSyncStorage creates a new sdl instance.
+//The database used as a backend is given as a parameter
+func NewSyncStorage() *SyncStorage {
+       return newSyncStorage(NewDatabase())
+}
+
+func newSyncStorage(db *Database) *SyncStorage {
+       return &SyncStorage{
+               eventSeparator: "___",
+               iDatabase:      db.instance,
+       }
+}
+
+//SubscribeChannel lets you to subscribe for a events on a given channels.
+//SDL notifications are events that are published on a specific channels.
+//Both the channel and events are defined by the entity that is publishing
+//the events under given namespace.
+//
+//When subscribing for a channel, a callback function is given as a parameter.
+//Whenever a notification is received from a channel, this callback is called
+//with channel and notifications as parameter (several notifications could be
+//packed to a single callback function call). A call to SubscribeChannel function
+//returns immediatelly, callbacks will be called asyncronously.
+//
+//It is possible to subscribe to different channels using different callbacks. In
+//this case simply use SubscribeChannel function separately for each channel.
+//
+//When receiving events in callback routine, it is a good practive to return from
+//callback as quickly as possible. E.g. reading in callback context should be avoided
+//and using of Go signals is recommended. Also it should be noted that in case of several
+//events received from different channels, callbacks are called in series one by one.
+func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
+       nsPrefix := getNsPrefix(ns)
+       s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
+       return nil
+}
+
+//UnsubscribeChannel removes subscription from one or several channels under given
+//namespace.
+func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
+       nsPrefix := getNsPrefix(ns)
+       s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
+       return nil
+}
+
+//Close connection to backend database.
+func (s *SyncStorage) Close() error {
+       return s.CloseDB()
+}
+
+func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
+       if len(channelsAndEvents)%2 != 0 {
+               return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
+       }
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       if strings.Contains(v, s.eventSeparator) {
+                               return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
+                       }
+               }
+       }
+       return nil
+}
+
+func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
+       var retVal []string
+       for _, v := range channels {
+               retVal = append(retVal, nsPrefix+v)
+       }
+       return retVal
+}
+
+func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
+       retVal := make([]interface{}, 0)
+       shouldBeKey := true
+       for _, v := range pairs {
+               reflectType := reflect.TypeOf(v)
+               switch reflectType.Kind() {
+               case reflect.Map:
+                       x := reflect.ValueOf(v).MapRange()
+                       for x.Next() {
+                               retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
+                               retVal = append(retVal, x.Value().Interface())
+                       }
+               case reflect.Slice:
+                       if shouldBeKey {
+                               x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
+                               for i2 := 0; i2 < x.Len(); i2++ {
+                                       if i2%2 == 0 {
+                                               retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
+                                       } else {
+                                               retVal = append(retVal, x.Index(i2).Interface())
+                                       }
+                               }
+                       } else {
+                               if reflectType.Elem().Kind() == reflect.Uint8 {
+                                       retVal = append(retVal, v)
+                                       shouldBeKey = true
+                               } else {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
+                       }
+               case reflect.Array:
+                       if shouldBeKey {
+                               x := reflect.ValueOf(v)
+                               if x.Len()%2 != 0 {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
+                               for i2 := 0; i2 < x.Len(); i2++ {
+                                       if i2%2 == 0 {
+                                               retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
+                                       } else {
+                                               retVal = append(retVal, x.Index(i2).Interface())
+                                       }
+                               }
+                       } else {
+                               if reflectType.Elem().Kind() == reflect.Uint8 {
+                                       retVal = append(retVal, v)
+                                       shouldBeKey = true
+                               } else {
+                                       return []interface{}{}, errors.New("Key/value pairs doesn't match")
+                               }
+                       }
+               default:
+                       if shouldBeKey {
+                               retVal = append(retVal, nsPrefix+v.(string))
+                               shouldBeKey = false
+                       } else {
+                               retVal = append(retVal, v)
+                               shouldBeKey = true
+                       }
+               }
+       }
+       if len(retVal)%2 != 0 {
+               return []interface{}{}, errors.New("Key/value pairs doesn't match")
+       }
+       return retVal, nil
+}
+
+func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
+       channelEventMap := make(map[string]string)
+       for i, v := range channelsAndEvents {
+               if i%2 != 0 {
+                       continue
+               }
+               _, exists := channelEventMap[v]
+               if exists {
+                       channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
+               } else {
+                       channelEventMap[v] = channelsAndEvents[i+1]
+               }
+       }
+       retVal := make([]string, 0)
+       for k, v := range channelEventMap {
+               retVal = append(retVal, nsPrefix+k)
+               retVal = append(retVal, v)
+       }
+       return retVal
+}
+
+//SetAndPublish function writes data to shared data layer storage and sends an event to
+//a channel. Writing is done atomically, i.e. all succeeds or fails.
+//Data is written under the namespace what is given as a parameter for this function.
+//Data to be written is given as key-value pairs. Several key-value
+//pairs can be written with one call.
+//The key is expected to be string whereas value can be anything, string,
+//number, slice array or map
+//
+//If data was set successfully, an event is sent to a channel.
+//Channels and events are given as pairs is channelsAndEvents parameter.
+//It is possible to send several events to several channels by giving several
+//channel-event pairs.
+//  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
+//will send event1 and event3 to channel1 and event2 to channel2.
+func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
+       nsPrefix := getNsPrefix(ns)
+       keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
+       if err != nil {
+               return err
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.MSet(keyAndData...)
+       }
+       if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+       return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
+}
+
+//Set function writes data to shared data layer storage. Writing is done
+//atomically, i.e. all succeeds or fails.
+//Data is written under the namespace what is given as a parameter for this function.
+//Data to be written is given as key-value pairs. Several key-value
+//pairs can be written with one call.
+//The key is expected to be string whereas value can be anything, string,
+//number, slice array or map
+func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
+       if len(pairs) == 0 {
+               return nil
+       }
+
+       keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
+       if err != nil {
+               return err
+       }
+       return s.MSet(keyAndData...)
+}
+
+//Get function atomically reads one or more keys from SDL. The returned map has the
+//requested keys as index and data as value. If the requested key is not found
+//from SDL, it's value is nil
+//Read operation is targeted to the namespace what is given as a parameter for this
+//function.
+func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
+       m := make(map[string]interface{})
+       if len(keys) == 0 {
+               return m, nil
+       }
+
+       var keysWithNs []string
+       for _, v := range keys {
+               keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
+       }
+       val, err := s.MGet(keysWithNs)
+       if err != nil {
+               return m, err
+       }
+       for i, v := range val {
+               m[keys[i]] = v
+       }
+       return m, err
+}
+
+//SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
+//If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
+//is published to a given channel.
+//Data is written under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
+       nsPrefix := getNsPrefix(ns)
+       if len(channelsAndEvents) == 0 {
+               return s.SetIE(nsPrefix+key, oldData, newData)
+       }
+       if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+       return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
+}
+
+//SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
+//If replace was done successfully, true will be returned.
+//Data is written under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
+       return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
+}
+
+//SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
+//then it's value is not changed. Checking the key existence and potential set operation
+//is done atomically. If the set operation was done successfully, an event is published to a
+//given channel.
+//Data is written under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       nsPrefix := getNsPrefix(ns)
+       if len(channelsAndEvents) == 0 {
+               return s.SetNX(nsPrefix+key, data, 0)
+       }
+       if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+       return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
+}
+
+//SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
+//then it's value is not changed. Checking the key existence and potential set operation
+//is done atomically.
+//Data is written under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
+       return s.SetNX(getNsPrefix(ns)+key, data, 0)
+}
+
+//RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
+//Trying to remove a nonexisting key is not considered as an error.
+//An event is published into a given channel if remove operation is successfull and
+//at least one key is removed (if several keys given). If the given key(s) doesn't exist
+//when trying to remove, no event is published.
+//Data is removed under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
+       if len(keys) == 0 {
+               return nil
+       }
+
+       var keysWithNs []string
+       nsPrefix := getNsPrefix(ns)
+       for _, v := range keys {
+               keysWithNs = append(keysWithNs, nsPrefix+v)
+       }
+       if len(channelsAndEvents) == 0 {
+               return s.Del(keysWithNs)
+       }
+       if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
+               return err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+       return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
+}
+
+//Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
+//Data is removed under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) Remove(ns string, keys []string) error {
+       if len(keys) == 0 {
+               return nil
+       }
+
+       var keysWithNs []string
+       for _, v := range keys {
+               keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
+       }
+       err := s.Del(keysWithNs)
+       return err
+}
+
+//RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
+//a given event is published to channel. If existing data matches given data,
+//key and data are removed from SDL. If remove was done successfully, true is returned.
+//Data is removed under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
+       nsPrefix := getNsPrefix(ns)
+       if len(channelsAndEvents) == 0 {
+               return s.DelIE(nsPrefix+key, data)
+       }
+       if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
+               return false, err
+       }
+       channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+       return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
+}
+
+//RemoveIf removes data from SDL conditionally. If existing data matches given data,
+//key and data are removed from SDL. If remove was done successfully, true is returned.
+//Data is removed under the namespace what is given as a parameter for this function.
+func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
+       status, err := s.DelIE(getNsPrefix(ns)+key, data)
+       if err != nil {
+               return false, err
+       }
+       return status, nil
+}
+
+//GetAll returns all keys under the namespace. No prior knowledge about the keys in the
+//given namespace exists, thus operation is not guaranteed to be atomic or isolated.
+func (s *SyncStorage) GetAll(ns string) ([]string, error) {
+       nsPrefix := getNsPrefix(ns)
+       keys, err := s.Keys(nsPrefix + "*")
+       var retVal []string
+       if err != nil {
+               return retVal, err
+       }
+       for _, v := range keys {
+               retVal = append(retVal, strings.Split(v, nsPrefix)[1])
+       }
+       return retVal, err
+}
+
+//RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
+//it is not guaranteed that all keys are removed.
+func (s *SyncStorage) RemoveAll(ns string) error {
+       keys, err := s.Keys(getNsPrefix(ns) + "*")
+       if err != nil {
+               return err
+       }
+       if (keys != nil) && (len(keys) != 0) {
+               err = s.Del(keys)
+       }
+       return err
+}
+
+//RemoveAllAndPublish removes all keys under the namespace and if successfull, it
+//will publish an event to given channel. This operation is not atomic, thus it is
+//not guaranteed that all keys are removed.
+func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
+       nsPrefix := getNsPrefix(ns)
+       keys, err := s.Keys(nsPrefix + "*")
+       if err != nil {
+               return err
+       }
+       if (keys != nil) && (len(keys) != 0) {
+               if len(channelsAndEvents) == 0 {
+                       return s.Del(keys)
+               }
+               if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
+                       return err
+               }
+               channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
+               err = s.DelMPub(channelsAndEventsPrepared, keys)
+       }
+       return err
+}
+
+//AddMember adds a new members to a group under given namespace.
+//
+//SDL groups are unordered collections of members where each member is
+//unique. It is possible to add the same member several times without the
+//need to check if it already exists.
+func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
+       return s.SAdd(getNsPrefix(ns)+group, member...)
+}
+
+//RemoveMember removes members from a group under given namespace.
+func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
+       return s.SRem(getNsPrefix(ns)+group, member...)
+}
+
+//RemoveGroup removes the whole group along with it's members under given namespace.
+func (s *SyncStorage) RemoveGroup(ns string, group string) error {
+       return s.Del([]string{getNsPrefix(ns) + group})
+}
+
+//GetMembers returns all the members from a group under given namespace.
+func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
+       retVal, err := s.SMembers(getNsPrefix(ns) + group)
+       if err != nil {
+               return []string{}, err
+       }
+       return retVal, err
+}
+
+//IsMember returns true if given member is found from a group under given namespace.
+func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
+       retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
+       if err != nil {
+               return false, err
+       }
+       return retVal, err
+}
+
+//GroupSize returns the number of members in a group under given namespace.
+func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
+       retVal, err := s.SCard(getNsPrefix(ns) + group)
+       if err != nil {
+               return 0, err
+       }
+       return retVal, err
+}
+
+func (s *SyncStorage) randomToken() (string, error) {
+       s.mutex.Lock()
+       defer s.mutex.Unlock()
+
+       if len(s.tmp) == 0 {
+               s.tmp = make([]byte, 16)
+       }
+
+       if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
+               return "", err
+       }
+
+       return base64.RawURLEncoding.EncodeToString(s.tmp), nil
+}
+
+//LockResource function is used for locking a resource under given namespace.
+//The resource lock in practice is a key with random value that is set to expire
+//after a time period. The value written to key is a random value, thus only the
+//instance created a lock, can release it. Resource locks are per namespace.
+func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
+       value, err := s.randomToken()
+       if err != nil {
+               return nil, err
+       }
+
+       var retryTimer *time.Timer
+       for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
+               ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
+               if err != nil {
+                       return nil, err
+               } else if ok {
+                       return &SyncStorageLock{s: s, key: resource, value: value}, nil
+               }
+               if retryTimer == nil {
+                       retryTimer = time.NewTimer(opt.getRetryWait())
+                       defer retryTimer.Stop()
+               } else {
+                       retryTimer.Reset(opt.getRetryWait())
+               }
+
+               select {
+               case <-retryTimer.C:
+               }
+       }
+       return nil, errors.New("Lock not obtained")
+}
+
+//ReleaseResource removes the lock from a resource under given namespace. If lock
+//is already expired or some other instance is keeping the lock (lock taken after
+//expiration), an error is returned.
+func (l *SyncStorageLock) ReleaseResource(ns string) error {
+       ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
+
+       if err != nil {
+               return err
+       }
+       if !ok {
+               return errors.New("Lock not held")
+       }
+       return nil
+}
+
+//RefreshResource function can be used to set a new expiration time for the
+//resource lock (if the lock still exists) under given namespace. The old
+//remaining expiration time is overwritten with the given new expiration time.
+func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
+       err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
+       return err
+}
+
+//CheckResource returns the expiration time left for a resource under given
+//namespace. If the resource doesn't exist, -2 is returned.
+func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
+       result, err := s.PTTL(getNsPrefix(ns) + resource)
+       if err != nil {
+               return 0, err
+       }
+       if result == time.Duration(-1) {
+               return 0, errors.New("invalid resource given, no expiration time attached")
+       }
+       return result, nil
+}
+
+//SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
+//expirations are done using the methods defined for this struct.
+type SyncStorageLock struct {
+       s     *SyncStorage
+       key   string
+       value string
+}
+
+func getNsPrefix(ns string) string {
+       return "{" + ns + "},"
+}
+
+type iDatabase interface {
+       SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
+       UnsubscribeChannelDB(channels ...string)
+       MSet(pairs ...interface{}) error
+       MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
+       MGet(keys []string) ([]interface{}, error)
+       CloseDB() error
+       Del(keys []string) error
+       DelMPub(channelsAndEvents []string, keys []string) error
+       Keys(key string) ([]string, error)
+       SetIE(key string, oldData, newData interface{}) (bool, error)
+       SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
+       SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
+       SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
+       DelIE(key string, data interface{}) (bool, error)
+       DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
+       SAdd(key string, data ...interface{}) error
+       SRem(key string, data ...interface{}) error
+       SMembers(key string) ([]string, error)
+       SIsMember(key string, data interface{}) (bool, error)
+       SCard(key string) (int64, error)
+       PTTL(key string) (time.Duration, error)
+       PExpireIE(key string, data interface{}, expiration time.Duration) error
+}