Implement SDL multi-namespace API
[ric-plt/sdlgo.git] / syncstorage.go
1 /*
2    Copyright (c) 2021 AT&T Intellectual Property.
3    Copyright (c) 2018-2021 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "crypto/rand"
27         "encoding/base64"
28         "errors"
29         "fmt"
30         "io"
31         "reflect"
32         "strings"
33         "sync"
34         "time"
35 )
36
37 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
38 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
39 //where namespace can be given in every API call. This means that with
40 //SyncStorage you can easily set key-values under different namespace compared to
41 //SdlInstance where namespace can be defined only at SdlInstance instance creation
42 //time.
43 type SyncStorage struct {
44         eventSeparator string
45         mutex          sync.Mutex
46         tmp            []byte
47         iDatabase
48 }
49
50 //NewSyncStorage creates a new sdl instance.
51 //The database used as a backend is given as a parameter
52 func NewSyncStorage() *SyncStorage {
53         return newSyncStorage(NewDatabase())
54 }
55
56 func newSyncStorage(db *Database) *SyncStorage {
57         return &SyncStorage{
58                 eventSeparator: "___",
59                 iDatabase:      db.instance,
60         }
61 }
62
63 //SubscribeChannel lets you to subscribe for a events on a given channels.
64 //SDL notifications are events that are published on a specific channels.
65 //Both the channel and events are defined by the entity that is publishing
66 //the events under given namespace.
67 //
68 //When subscribing for a channel, a callback function is given as a parameter.
69 //Whenever a notification is received from a channel, this callback is called
70 //with channel and notifications as parameter (several notifications could be
71 //packed to a single callback function call). A call to SubscribeChannel function
72 //returns immediatelly, callbacks will be called asyncronously.
73 //
74 //It is possible to subscribe to different channels using different callbacks. In
75 //this case simply use SubscribeChannel function separately for each channel.
76 //
77 //When receiving events in callback routine, it is a good practive to return from
78 //callback as quickly as possible. E.g. reading in callback context should be avoided
79 //and using of Go signals is recommended. Also it should be noted that in case of several
80 //events received from different channels, callbacks are called in series one by one.
81 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
82         nsPrefix := getNsPrefix(ns)
83         s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
84         return nil
85 }
86
87 //UnsubscribeChannel removes subscription from one or several channels under given
88 //namespace.
89 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
90         nsPrefix := getNsPrefix(ns)
91         s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
92         return nil
93 }
94
95 //Close connection to backend database.
96 func (s *SyncStorage) Close() error {
97         return s.CloseDB()
98 }
99
100 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
101         if len(channelsAndEvents)%2 != 0 {
102                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
103         }
104         for i, v := range channelsAndEvents {
105                 if i%2 != 0 {
106                         if strings.Contains(v, s.eventSeparator) {
107                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
108                         }
109                 }
110         }
111         return nil
112 }
113
114 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
115         var retVal []string
116         for _, v := range channels {
117                 retVal = append(retVal, nsPrefix+v)
118         }
119         return retVal
120 }
121
122 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
123         retVal := make([]interface{}, 0)
124         shouldBeKey := true
125         for _, v := range pairs {
126                 reflectType := reflect.TypeOf(v)
127                 switch reflectType.Kind() {
128                 case reflect.Map:
129                         x := reflect.ValueOf(v).MapRange()
130                         for x.Next() {
131                                 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
132                                 retVal = append(retVal, x.Value().Interface())
133                         }
134                 case reflect.Slice:
135                         if shouldBeKey {
136                                 x := reflect.ValueOf(v)
137                                 if x.Len()%2 != 0 {
138                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
139                                 }
140                                 for i2 := 0; i2 < x.Len(); i2++ {
141                                         if i2%2 == 0 {
142                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
143                                         } else {
144                                                 retVal = append(retVal, x.Index(i2).Interface())
145                                         }
146                                 }
147                         } else {
148                                 if reflectType.Elem().Kind() == reflect.Uint8 {
149                                         retVal = append(retVal, v)
150                                         shouldBeKey = true
151                                 } else {
152                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
153                                 }
154                         }
155                 case reflect.Array:
156                         if shouldBeKey {
157                                 x := reflect.ValueOf(v)
158                                 if x.Len()%2 != 0 {
159                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
160                                 }
161                                 for i2 := 0; i2 < x.Len(); i2++ {
162                                         if i2%2 == 0 {
163                                                 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
164                                         } else {
165                                                 retVal = append(retVal, x.Index(i2).Interface())
166                                         }
167                                 }
168                         } else {
169                                 if reflectType.Elem().Kind() == reflect.Uint8 {
170                                         retVal = append(retVal, v)
171                                         shouldBeKey = true
172                                 } else {
173                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
174                                 }
175                         }
176                 default:
177                         if shouldBeKey {
178                                 retVal = append(retVal, nsPrefix+v.(string))
179                                 shouldBeKey = false
180                         } else {
181                                 retVal = append(retVal, v)
182                                 shouldBeKey = true
183                         }
184                 }
185         }
186         if len(retVal)%2 != 0 {
187                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
188         }
189         return retVal, nil
190 }
191
192 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
193         channelEventMap := make(map[string]string)
194         for i, v := range channelsAndEvents {
195                 if i%2 != 0 {
196                         continue
197                 }
198                 _, exists := channelEventMap[v]
199                 if exists {
200                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
201                 } else {
202                         channelEventMap[v] = channelsAndEvents[i+1]
203                 }
204         }
205         retVal := make([]string, 0)
206         for k, v := range channelEventMap {
207                 retVal = append(retVal, nsPrefix+k)
208                 retVal = append(retVal, v)
209         }
210         return retVal
211 }
212
213 //SetAndPublish function writes data to shared data layer storage and sends an event to
214 //a channel. Writing is done atomically, i.e. all succeeds or fails.
215 //Data is written under the namespace what is given as a parameter for this function.
216 //Data to be written is given as key-value pairs. Several key-value
217 //pairs can be written with one call.
218 //The key is expected to be string whereas value can be anything, string,
219 //number, slice array or map
220 //
221 //If data was set successfully, an event is sent to a channel.
222 //Channels and events are given as pairs is channelsAndEvents parameter.
223 //It is possible to send several events to several channels by giving several
224 //channel-event pairs.
225 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
226 //will send event1 and event3 to channel1 and event2 to channel2.
227 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
228         nsPrefix := getNsPrefix(ns)
229         keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
230         if err != nil {
231                 return err
232         }
233         if len(channelsAndEvents) == 0 {
234                 return s.MSet(keyAndData...)
235         }
236         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
237                 return err
238         }
239         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
240         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
241 }
242
243 //Set function writes data to shared data layer storage. Writing is done
244 //atomically, i.e. all succeeds or fails.
245 //Data is written under the namespace what is given as a parameter for this function.
246 //Data to be written is given as key-value pairs. Several key-value
247 //pairs can be written with one call.
248 //The key is expected to be string whereas value can be anything, string,
249 //number, slice array or map
250 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
251         if len(pairs) == 0 {
252                 return nil
253         }
254
255         keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
256         if err != nil {
257                 return err
258         }
259         return s.MSet(keyAndData...)
260 }
261
262 //Get function atomically reads one or more keys from SDL. The returned map has the
263 //requested keys as index and data as value. If the requested key is not found
264 //from SDL, it's value is nil
265 //Read operation is targeted to the namespace what is given as a parameter for this
266 //function.
267 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
268         m := make(map[string]interface{})
269         if len(keys) == 0 {
270                 return m, nil
271         }
272
273         var keysWithNs []string
274         for _, v := range keys {
275                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
276         }
277         val, err := s.MGet(keysWithNs)
278         if err != nil {
279                 return m, err
280         }
281         for i, v := range val {
282                 m[keys[i]] = v
283         }
284         return m, err
285 }
286
287 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
288 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
289 //is published to a given channel.
290 //Data is written under the namespace what is given as a parameter for this function.
291 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
292         nsPrefix := getNsPrefix(ns)
293         if len(channelsAndEvents) == 0 {
294                 return s.SetIE(nsPrefix+key, oldData, newData)
295         }
296         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
297                 return false, err
298         }
299         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
300         return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
301 }
302
303 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
304 //If replace was done successfully, true will be returned.
305 //Data is written under the namespace what is given as a parameter for this function.
306 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
307         return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
308 }
309
310 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
311 //then it's value is not changed. Checking the key existence and potential set operation
312 //is done atomically. If the set operation was done successfully, an event is published to a
313 //given channel.
314 //Data is written under the namespace what is given as a parameter for this function.
315 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
316         nsPrefix := getNsPrefix(ns)
317         if len(channelsAndEvents) == 0 {
318                 return s.SetNX(nsPrefix+key, data, 0)
319         }
320         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
321                 return false, err
322         }
323         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
324         return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
325 }
326
327 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
328 //then it's value is not changed. Checking the key existence and potential set operation
329 //is done atomically.
330 //Data is written under the namespace what is given as a parameter for this function.
331 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
332         return s.SetNX(getNsPrefix(ns)+key, data, 0)
333 }
334
335 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
336 //Trying to remove a nonexisting key is not considered as an error.
337 //An event is published into a given channel if remove operation is successfull and
338 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
339 //when trying to remove, no event is published.
340 //Data is removed under the namespace what is given as a parameter for this function.
341 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
342         if len(keys) == 0 {
343                 return nil
344         }
345
346         var keysWithNs []string
347         nsPrefix := getNsPrefix(ns)
348         for _, v := range keys {
349                 keysWithNs = append(keysWithNs, nsPrefix+v)
350         }
351         if len(channelsAndEvents) == 0 {
352                 return s.Del(keysWithNs)
353         }
354         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
355                 return err
356         }
357         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
358         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
359 }
360
361 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
362 //Data is removed under the namespace what is given as a parameter for this function.
363 func (s *SyncStorage) Remove(ns string, keys []string) error {
364         if len(keys) == 0 {
365                 return nil
366         }
367
368         var keysWithNs []string
369         for _, v := range keys {
370                 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
371         }
372         err := s.Del(keysWithNs)
373         return err
374 }
375
376 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
377 //a given event is published to channel. If existing data matches given data,
378 //key and data are removed from SDL. If remove was done successfully, true is returned.
379 //Data is removed under the namespace what is given as a parameter for this function.
380 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
381         nsPrefix := getNsPrefix(ns)
382         if len(channelsAndEvents) == 0 {
383                 return s.DelIE(nsPrefix+key, data)
384         }
385         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
386                 return false, err
387         }
388         channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
389         return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
390 }
391
392 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
393 //key and data are removed from SDL. If remove was done successfully, true is returned.
394 //Data is removed under the namespace what is given as a parameter for this function.
395 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
396         status, err := s.DelIE(getNsPrefix(ns)+key, data)
397         if err != nil {
398                 return false, err
399         }
400         return status, nil
401 }
402
403 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
404 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
405 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
406         nsPrefix := getNsPrefix(ns)
407         keys, err := s.Keys(nsPrefix + "*")
408         var retVal []string
409         if err != nil {
410                 return retVal, err
411         }
412         for _, v := range keys {
413                 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
414         }
415         return retVal, err
416 }
417
418 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
419 //it is not guaranteed that all keys are removed.
420 func (s *SyncStorage) RemoveAll(ns string) error {
421         keys, err := s.Keys(getNsPrefix(ns) + "*")
422         if err != nil {
423                 return err
424         }
425         if (keys != nil) && (len(keys) != 0) {
426                 err = s.Del(keys)
427         }
428         return err
429 }
430
431 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
432 //will publish an event to given channel. This operation is not atomic, thus it is
433 //not guaranteed that all keys are removed.
434 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
435         nsPrefix := getNsPrefix(ns)
436         keys, err := s.Keys(nsPrefix + "*")
437         if err != nil {
438                 return err
439         }
440         if (keys != nil) && (len(keys) != 0) {
441                 if len(channelsAndEvents) == 0 {
442                         return s.Del(keys)
443                 }
444                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
445                         return err
446                 }
447                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
448                 err = s.DelMPub(channelsAndEventsPrepared, keys)
449         }
450         return err
451 }
452
453 //AddMember adds a new members to a group under given namespace.
454 //
455 //SDL groups are unordered collections of members where each member is
456 //unique. It is possible to add the same member several times without the
457 //need to check if it already exists.
458 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
459         return s.SAdd(getNsPrefix(ns)+group, member...)
460 }
461
462 //RemoveMember removes members from a group under given namespace.
463 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
464         return s.SRem(getNsPrefix(ns)+group, member...)
465 }
466
467 //RemoveGroup removes the whole group along with it's members under given namespace.
468 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
469         return s.Del([]string{getNsPrefix(ns) + group})
470 }
471
472 //GetMembers returns all the members from a group under given namespace.
473 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
474         retVal, err := s.SMembers(getNsPrefix(ns) + group)
475         if err != nil {
476                 return []string{}, err
477         }
478         return retVal, err
479 }
480
481 //IsMember returns true if given member is found from a group under given namespace.
482 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
483         retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
484         if err != nil {
485                 return false, err
486         }
487         return retVal, err
488 }
489
490 //GroupSize returns the number of members in a group under given namespace.
491 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
492         retVal, err := s.SCard(getNsPrefix(ns) + group)
493         if err != nil {
494                 return 0, err
495         }
496         return retVal, err
497 }
498
499 func (s *SyncStorage) randomToken() (string, error) {
500         s.mutex.Lock()
501         defer s.mutex.Unlock()
502
503         if len(s.tmp) == 0 {
504                 s.tmp = make([]byte, 16)
505         }
506
507         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
508                 return "", err
509         }
510
511         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
512 }
513
514 //LockResource function is used for locking a resource under given namespace.
515 //The resource lock in practice is a key with random value that is set to expire
516 //after a time period. The value written to key is a random value, thus only the
517 //instance created a lock, can release it. Resource locks are per namespace.
518 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
519         value, err := s.randomToken()
520         if err != nil {
521                 return nil, err
522         }
523
524         var retryTimer *time.Timer
525         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
526                 ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
527                 if err != nil {
528                         return nil, err
529                 } else if ok {
530                         return &SyncStorageLock{s: s, key: resource, value: value}, nil
531                 }
532                 if retryTimer == nil {
533                         retryTimer = time.NewTimer(opt.getRetryWait())
534                         defer retryTimer.Stop()
535                 } else {
536                         retryTimer.Reset(opt.getRetryWait())
537                 }
538
539                 select {
540                 case <-retryTimer.C:
541                 }
542         }
543         return nil, errors.New("Lock not obtained")
544 }
545
546 //ReleaseResource removes the lock from a resource under given namespace. If lock
547 //is already expired or some other instance is keeping the lock (lock taken after
548 //expiration), an error is returned.
549 func (l *SyncStorageLock) ReleaseResource(ns string) error {
550         ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
551
552         if err != nil {
553                 return err
554         }
555         if !ok {
556                 return errors.New("Lock not held")
557         }
558         return nil
559 }
560
561 //RefreshResource function can be used to set a new expiration time for the
562 //resource lock (if the lock still exists) under given namespace. The old
563 //remaining expiration time is overwritten with the given new expiration time.
564 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
565         err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
566         return err
567 }
568
569 //CheckResource returns the expiration time left for a resource under given
570 //namespace. If the resource doesn't exist, -2 is returned.
571 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
572         result, err := s.PTTL(getNsPrefix(ns) + resource)
573         if err != nil {
574                 return 0, err
575         }
576         if result == time.Duration(-1) {
577                 return 0, errors.New("invalid resource given, no expiration time attached")
578         }
579         return result, nil
580 }
581
582 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
583 //expirations are done using the methods defined for this struct.
584 type SyncStorageLock struct {
585         s     *SyncStorage
586         key   string
587         value string
588 }
589
590 func getNsPrefix(ns string) string {
591         return "{" + ns + "},"
592 }
593
594 type iDatabase interface {
595         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
596         UnsubscribeChannelDB(channels ...string)
597         MSet(pairs ...interface{}) error
598         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
599         MGet(keys []string) ([]interface{}, error)
600         CloseDB() error
601         Del(keys []string) error
602         DelMPub(channelsAndEvents []string, keys []string) error
603         Keys(key string) ([]string, error)
604         SetIE(key string, oldData, newData interface{}) (bool, error)
605         SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
606         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
607         SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
608         DelIE(key string, data interface{}) (bool, error)
609         DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
610         SAdd(key string, data ...interface{}) error
611         SRem(key string, data ...interface{}) error
612         SMembers(key string) ([]string, error)
613         SIsMember(key string, data interface{}) (bool, error)
614         SCard(key string) (int64, error)
615         PTTL(key string) (time.Duration, error)
616         PExpireIE(key string, data interface{}, expiration time.Duration) error
617 }