2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
31 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
34 //SdlInstance provides an API to read, write and modify
35 //key-value pairs in a given namespace.
36 type SdlInstance struct {
45 //Database struct is a holder for the internal database instance. Applications
46 //can use this exported data type to locally store a reference to database
47 //instance returned from NewDabase() function.
48 type Database struct {
52 //NewDatabase creates a connection to database that will be used
53 //as a backend for the key-value storage. The returned value
54 //can be reused between multiple SDL instances in which case each instance
55 //is using the same connection.
56 func NewDatabase() *Database {
58 instance: sdlgoredis.Create(),
62 //NewSdlInstance creates a new sdl instance using the given namespace.
63 //The database used as a backend is given as a parameter
64 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
67 nsPrefix: "{" + NameSpace + "},",
68 eventSeparator: "___",
69 iDatabase: db.instance,
73 //SubscribeChannel lets you to subscribe for a events on a given channels.
74 //SDL notifications are events that are published on a specific channels.
75 //Both the channel and events are defined by the entity that is publishing
78 //When subscribing for a channel, a callback function is given as a parameter.
79 //Whenever a notification is received from a channel, this callback is called
80 //with channel and notifications as parameter (several notifications could be
81 //packed to a single callback function call). A call to SubscribeChannel function
82 //returns immediatelly, callbacks will be called asyncronously.
84 //It is possible to subscribe to different channels using different callbacks. In
85 //this case simply use SubscribeChannel function separately for each channel.
87 //When receiving events in callback routine, it is a good practive to return from
88 //callback as quickly as possible. E.g. reading in callback context should be avoided
89 //and using of Go signals is recommended. Also it should be noted that in case of several
90 //events received from different channels, callbacks are called in series one by one.
92 //This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines.
93 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
94 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
98 //UnsubscribeChannel removes subscription from one or several channels.
99 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
100 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
104 //Close connection to backend database.
105 func (s *SdlInstance) Close() error {
109 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
110 if len(channelsAndEvents)%2 != 0 {
111 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
113 for i, v := range channelsAndEvents {
115 if strings.Contains(v, s.eventSeparator) {
116 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
122 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
124 for _, v := range channels {
125 retVal = append(retVal, s.nsPrefix+v)
130 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
131 retVal := make([]interface{}, 0)
133 for _, v := range pairs {
134 reflectType := reflect.TypeOf(v)
135 switch reflectType.Kind() {
137 x := reflect.ValueOf(v).MapRange()
139 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
140 retVal = append(retVal, x.Value().Interface())
144 x := reflect.ValueOf(v)
146 return []interface{}{}, errors.New("Key/value pairs doesn't match")
148 for i2 := 0; i2 < x.Len(); i2++ {
150 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
152 retVal = append(retVal, x.Index(i2).Interface())
156 if reflectType.Elem().Kind() == reflect.Uint8 {
157 retVal = append(retVal, v)
160 return []interface{}{}, errors.New("Key/value pairs doesn't match")
165 x := reflect.ValueOf(v)
167 return []interface{}{}, errors.New("Key/value pairs doesn't match")
169 for i2 := 0; i2 < x.Len(); i2++ {
171 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
173 retVal = append(retVal, x.Index(i2).Interface())
177 if reflectType.Elem().Kind() == reflect.Uint8 {
178 retVal = append(retVal, v)
181 return []interface{}{}, errors.New("Key/value pairs doesn't match")
186 retVal = append(retVal, s.nsPrefix+v.(string))
189 retVal = append(retVal, v)
194 if len(retVal)%2 != 0 {
195 return []interface{}{}, errors.New("Key/value pairs doesn't match")
200 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
201 channelEventMap := make(map[string]string)
202 for i, v := range channelsAndEvents {
206 _, exists := channelEventMap[v]
208 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
210 channelEventMap[v] = channelsAndEvents[i+1]
213 retVal := make([]string, 0)
214 for k, v := range channelEventMap {
215 retVal = append(retVal, s.nsPrefix+k)
216 retVal = append(retVal, v)
221 //SetAndPublish function writes data to shared data layer storage and sends an event to
222 //a channel. Writing is done atomically, i.e. all succeeds or fails.
223 //Data to be written is given as key-value pairs. Several key-value
224 //pairs can be written with one call.
225 //The key is expected to be string whereas value can be anything, string,
226 //number, slice array or map
228 //If data was set successfully, an event is sent to a channel.
229 //Channels and events are given as pairs is channelsAndEvents parameter.
230 //It is possible to send several events to several channels by giving several
231 //channel-event pairs.
232 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
233 //will send event1 and event3 to channel1 and event2 to channel2.
234 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
235 keyAndData, err := s.setNamespaceToKeys(pairs...)
239 if len(channelsAndEvents) == 0 {
240 return s.MSet(keyAndData...)
242 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
245 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
246 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
249 //Set function writes data to shared data layer storage. Writing is done
250 //atomically, i.e. all succeeds or fails.
251 //Data to be written is given as key-value pairs. Several key-value
252 //pairs can be written with one call.
253 //The key is expected to be string whereas value can be anything, string,
254 //number, slice array or map
255 func (s *SdlInstance) Set(pairs ...interface{}) error {
260 keyAndData, err := s.setNamespaceToKeys(pairs...)
264 return s.MSet(keyAndData...)
267 //Get function atomically reads one or more keys from SDL. The returned map has the
268 //requested keys as index and data as value. If the requested key is not found
269 //from SDL, it's value is nil
270 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
271 m := make(map[string]interface{})
276 var keysWithNs []string
277 for _, v := range keys {
278 keysWithNs = append(keysWithNs, s.nsPrefix+v)
280 val, err := s.MGet(keysWithNs)
284 for i, v := range val {
290 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
291 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
292 //is published to a given channel.
293 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
294 if len(channelsAndEvents) == 0 {
295 return s.SetIE(s.nsPrefix+key, oldData, newData)
297 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
300 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
301 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
304 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
305 //If replace was done successfully, true will be returned.
306 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
307 return s.SetIE(s.nsPrefix+key, oldData, newData)
310 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
311 //then it's value is not changed. Checking the key existence and potential set operation
312 //is done atomically. If the set operation was done successfully, an event is published to a
314 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
315 if len(channelsAndEvents) == 0 {
316 return s.SetNX(s.nsPrefix+key, data, 0)
318 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
321 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
322 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
325 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
326 //then it's value is not changed. Checking the key existence and potential set operation
327 //is done atomically.
328 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
329 return s.SetNX(s.nsPrefix+key, data, 0)
332 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
333 //Trying to remove a nonexisting key is not considered as an error.
334 //An event is published into a given channel if remove operation is successfull and
335 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
336 //when trying to remove, no event is published.
337 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
342 var keysWithNs []string
343 for _, v := range keys {
344 keysWithNs = append(keysWithNs, s.nsPrefix+v)
346 if len(channelsAndEvents) == 0 {
347 return s.Del(keysWithNs)
349 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
352 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
353 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
356 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
357 func (s *SdlInstance) Remove(keys []string) error {
362 var keysWithNs []string
363 for _, v := range keys {
364 keysWithNs = append(keysWithNs, s.nsPrefix+v)
366 err := s.Del(keysWithNs)
370 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
371 //a given event is published to channel. If existing data matches given data,
372 //key and data are removed from SDL. If remove was done successfully, true is returned.
373 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
374 if len(channelsAndEvents) == 0 {
375 return s.DelIE(s.nsPrefix+key, data)
377 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
380 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
381 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
384 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
385 //key and data are removed from SDL. If remove was done successfully, true is returned.
386 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
387 status, err := s.DelIE(s.nsPrefix+key, data)
394 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
395 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
396 func (s *SdlInstance) GetAll() ([]string, error) {
397 keys, err := s.Keys(s.nsPrefix + "*")
402 for _, v := range keys {
403 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
408 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
409 //it is not guaranteed that all keys are removed.
410 func (s *SdlInstance) RemoveAll() error {
411 keys, err := s.Keys(s.nsPrefix + "*")
415 if (keys != nil) && (len(keys) != 0) {
421 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
422 //will publish an event to given channel. This operation is not atomic, thus it is
423 //not guaranteed that all keys are removed.
424 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
425 keys, err := s.Keys(s.nsPrefix + "*")
429 if (keys != nil) && (len(keys) != 0) {
430 if len(channelsAndEvents) == 0 {
433 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
436 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
437 err = s.DelMPub(channelsAndEventsPrepared, keys)
442 //AddMember adds a new members to a group.
444 //SDL groups are unordered collections of members where each member is
445 //unique. It is possible to add the same member several times without the
446 //need to check if it already exists.
447 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
448 return s.SAdd(s.nsPrefix+group, member...)
451 //RemoveMember removes members from a group.
452 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
453 return s.SRem(s.nsPrefix+group, member...)
456 //RemoveGroup removes the whole group along with it's members.
457 func (s *SdlInstance) RemoveGroup(group string) error {
458 return s.Del([]string{s.nsPrefix + group})
461 //GetMembers returns all the members from a group.
462 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
463 retVal, err := s.SMembers(s.nsPrefix + group)
465 return []string{}, err
470 //IsMember returns true if given member is found from a group.
471 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
472 retVal, err := s.SIsMember(s.nsPrefix+group, member)
479 //GroupSize returns the number of members in a group.
480 func (s *SdlInstance) GroupSize(group string) (int64, error) {
481 retVal, err := s.SCard(s.nsPrefix + group)
488 func (s *SdlInstance) randomToken() (string, error) {
490 defer s.mutex.Unlock()
493 s.tmp = make([]byte, 16)
496 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
500 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
503 //LockResource function is used for locking a resource. The resource lock in
504 //practice is a key with random value that is set to expire after a time
505 //period. The value written to key is a random value, thus only the instance
506 //created a lock, can release it. Resource locks are per namespace.
507 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
508 value, err := s.randomToken()
513 var retryTimer *time.Timer
514 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
515 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
519 return &Lock{s: s, key: resource, value: value}, nil
521 if retryTimer == nil {
522 retryTimer = time.NewTimer(opt.getRetryWait())
523 defer retryTimer.Stop()
525 retryTimer.Reset(opt.getRetryWait())
532 return nil, errors.New("Lock not obtained")
535 //ReleaseResource removes the lock from a resource. If lock is already
536 //expired or some other instance is keeping the lock (lock taken after expiration),
537 //an error is returned.
538 func (l *Lock) ReleaseResource() error {
539 ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
545 return errors.New("Lock not held")
550 //RefreshResource function can be used to set a new expiration time for the
551 //resource lock (if the lock still exists). The old remaining expiration
552 //time is overwritten with the given new expiration time.
553 func (l *Lock) RefreshResource(expiration time.Duration) error {
554 err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
558 //CheckResource returns the expiration time left for a resource.
559 //If the resource doesn't exist, -2 is returned.
560 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
561 result, err := s.PTTL(s.nsPrefix + resource)
565 if result == time.Duration(-1) {
566 return 0, errors.New("invalid resource given, no expiration time attached")
571 //Options struct defines the behaviour for getting the resource lock.
572 type Options struct {
573 //The number of time the lock will be tried.
574 //Default: 0 = no retry
577 //Wait between the retries.
579 RetryWait time.Duration
582 func (o *Options) getRetryCount() int {
583 if o != nil && o.RetryCount > 0 {
589 func (o *Options) getRetryWait() time.Duration {
590 if o != nil && o.RetryWait > 0 {
593 return 100 * time.Millisecond
596 //Lock struct identifies the resource lock instance. Releasing and adjusting the
597 //expirations are done using the methods defined for this struct.
604 type iDatabase interface {
605 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
606 UnsubscribeChannelDB(channels ...string)
607 MSet(pairs ...interface{}) error
608 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
609 MGet(keys []string) ([]interface{}, error)
611 Del(keys []string) error
612 DelMPub(channelsAndEvents []string, keys []string) error
613 Keys(key string) ([]string, error)
614 SetIE(key string, oldData, newData interface{}) (bool, error)
615 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
616 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
617 SetNXPub(channel, message, key string, data interface{}) (bool, error)
618 DelIE(key string, data interface{}) (bool, error)
619 DelIEPub(channel, message, key string, data interface{}) (bool, error)
620 SAdd(key string, data ...interface{}) error
621 SRem(key string, data ...interface{}) error
622 SMembers(key string) ([]string, error)
623 SIsMember(key string, data interface{}) (bool, error)
624 SCard(key string) (int64, error)
625 PTTL(key string) (time.Duration, error)
626 PExpireIE(key string, data interface{}, expiration time.Duration) error