2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
31 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
34 //SdlInstance provides an API to read, write and modify
35 //key-value pairs in a given namespace.
36 type SdlInstance struct {
45 //NewDatabase creates a connection to database that will be used
46 //as a backend for the key-value storage. The returned value shall
47 //be given as a parameter when calling NewSdlInstance.
48 func NewDatabase() *sdlgoredis.DB {
49 return sdlgoredis.Create()
52 //NewSdlInstance creates a new sdl instance using the given namespace.
53 //The database used as a backend is given as a parameter
54 func NewSdlInstance(NameSpace string, db iDatabase) *SdlInstance {
57 nsPrefix: "{" + NameSpace + "},",
58 eventSeparator: "___",
63 //SubscribeChannel lets you subscribe to events on the given channels.
64 //SDL notifications are events that are published on specific channels.
65 //Both the channel and events are defined by the entity that is publishing
68 //When subscribing for a channel, a callback function is given as a parameter.
69 //Whenever a notification is received from a channel, this callback is called
70 //with channel and notifications as parameter (several notifications could be
71 //packed to a single callback function call). A call to SubscribeChannel function
72 //returns immediately; callbacks will be called asynchronously.
74 //It is possible to subscribe to different channels using different callbacks. In
75 //this case simply use SubscribeChannel function separately for each channel.
77 //When receiving events in a callback routine, it is good practice to return from
78 //callback as quickly as possible. E.g. reading in callback context should be avoided
79 //and using of Go signals is recommended. Also it should be noted that in case of several
80 //events received from different channels, callbacks are called in series one by one.
81 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
82 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
86 //UnsubscribeChannel removes subscription from one or several channels.
87 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
88 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
92 //Close connection to backend database.
93 func (s *SdlInstance) Close() error {
97 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
98 if len(channelsAndEvents)%2 != 0 {
99 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
101 for i, v := range channelsAndEvents {
103 if strings.Contains(v, s.eventSeparator) {
104 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
110 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
112 for _, v := range channels {
113 retVal = append(retVal, s.nsPrefix+v)
118 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
119 retVal := make([]interface{}, 0)
121 for _, v := range pairs {
122 reflectType := reflect.TypeOf(v)
123 switch reflectType.Kind() {
125 x := reflect.ValueOf(v).MapRange()
127 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
128 retVal = append(retVal, x.Value().Interface())
132 x := reflect.ValueOf(v)
134 return []interface{}{}, errors.New("Key/value pairs doesn't match")
136 for i2 := 0; i2 < x.Len(); i2++ {
138 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
140 retVal = append(retVal, x.Index(i2).Interface())
144 if reflectType.Elem().Kind() == reflect.Uint8 {
145 retVal = append(retVal, v)
148 return []interface{}{}, errors.New("Key/value pairs doesn't match")
153 x := reflect.ValueOf(v)
155 return []interface{}{}, errors.New("Key/value pairs doesn't match")
157 for i2 := 0; i2 < x.Len(); i2++ {
159 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
161 retVal = append(retVal, x.Index(i2).Interface())
165 if reflectType.Elem().Kind() == reflect.Uint8 {
166 retVal = append(retVal, v)
169 return []interface{}{}, errors.New("Key/value pairs doesn't match")
174 retVal = append(retVal, s.nsPrefix+v.(string))
177 retVal = append(retVal, v)
182 if len(retVal)%2 != 0 {
183 return []interface{}{}, errors.New("Key/value pairs doesn't match")
188 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
189 channelEventMap := make(map[string]string)
190 for i, v := range channelsAndEvents {
194 _, exists := channelEventMap[v]
196 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
198 channelEventMap[v] = channelsAndEvents[i+1]
201 retVal := make([]string, 0)
202 for k, v := range channelEventMap {
203 retVal = append(retVal, s.nsPrefix+k)
204 retVal = append(retVal, v)
209 //SetAndPublish function writes data to shared data layer storage and sends an event to
210 //a channel. Writing is done atomically, i.e. all succeeds or fails.
211 //Data to be written is given as key-value pairs. Several key-value
212 //pairs can be written with one call.
213 //The key is expected to be string whereas value can be anything, string,
214 //number, slice array or map
216 //If data was set successfully, an event is sent to a channel.
217 //Channels and events are given as pairs in the channelsAndEvents parameter.
218 //It is possible to send several events to several channels by giving several
219 //channel-event pairs.
220 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
221 //will send event1 and event3 to channel1 and event2 to channel2.
222 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
223 keyAndData, err := s.setNamespaceToKeys(pairs...)
227 if len(channelsAndEvents) == 0 {
228 return s.MSet(keyAndData...)
230 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
233 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
234 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
237 //Set function writes data to shared data layer storage. Writing is done
238 //atomically, i.e. all succeeds or fails.
239 //Data to be written is given as key-value pairs. Several key-value
240 //pairs can be written with one call.
241 //The key is expected to be string whereas value can be anything, string,
242 //number, slice array or map
243 func (s *SdlInstance) Set(pairs ...interface{}) error {
248 keyAndData, err := s.setNamespaceToKeys(pairs...)
252 return s.MSet(keyAndData...)
255 //Get function atomically reads one or more keys from SDL. The returned map has the
256 //requested keys as index and data as value. If the requested key is not found
257 //from SDL, its value is nil.
258 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
259 m := make(map[string]interface{})
264 var keysWithNs []string
265 for _, v := range keys {
266 keysWithNs = append(keysWithNs, s.nsPrefix+v)
268 val, err := s.MGet(keysWithNs)
272 for i, v := range val {
278 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
279 //If replace was done successfully, true will be returned. Also, if publishing was successful, an event
280 //is published to a given channel.
281 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
282 if len(channelsAndEvents) == 0 {
283 return s.SetIE(s.nsPrefix+key, oldData, newData)
285 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
288 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
289 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
292 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
293 //If replace was done successfully, true will be returned.
294 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
295 return s.SetIE(s.nsPrefix+key, oldData, newData)
298 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
299 //then its value is not changed. Checking the key existence and potential set operation
300 //is done atomically. If the set operation was done successfully, an event is published to a
302 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
303 if len(channelsAndEvents) == 0 {
304 return s.SetNX(s.nsPrefix+key, data, 0)
306 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
309 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
310 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
313 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
314 //then it's value is not changed. Checking the key existence and potential set operation
315 //is done atomically.
316 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
317 return s.SetNX(s.nsPrefix+key, data, 0)
320 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
321 //Trying to remove a nonexisting key is not considered as an error.
322 //An event is published into a given channel if remove operation is successful and
323 //at least one key is removed (if several keys given). If the given key(s) do not exist
324 //when trying to remove, no event is published.
325 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
330 var keysWithNs []string
331 for _, v := range keys {
332 keysWithNs = append(keysWithNs, s.nsPrefix+v)
334 if len(channelsAndEvents) == 0 {
335 return s.Del(keysWithNs)
337 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
340 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
341 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
344 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
345 func (s *SdlInstance) Remove(keys []string) error {
350 var keysWithNs []string
351 for _, v := range keys {
352 keysWithNs = append(keysWithNs, s.nsPrefix+v)
354 err := s.Del(keysWithNs)
358 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
359 //a given event is published to channel. If existing data matches given data,
360 //key and data are removed from SDL. If remove was done successfully, true is returned.
361 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
362 if len(channelsAndEvents) == 0 {
363 return s.DelIE(s.nsPrefix+key, data)
365 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
368 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
369 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
372 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
373 //key and data are removed from SDL. If remove was done successfully, true is returned.
374 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
375 status, err := s.DelIE(s.nsPrefix+key, data)
382 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
383 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
384 func (s *SdlInstance) GetAll() ([]string, error) {
385 keys, err := s.Keys(s.nsPrefix + "*")
390 for _, v := range keys {
391 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
396 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
397 //it is not guaranteed that all keys are removed.
398 func (s *SdlInstance) RemoveAll() error {
399 keys, err := s.Keys(s.nsPrefix + "*")
403 if (keys != nil) && (len(keys) != 0) {
409 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
410 //will publish an event to given channel. This operation is not atomic, thus it is
411 //not guaranteed that all keys are removed.
412 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
413 keys, err := s.Keys(s.nsPrefix + "*")
417 if (keys != nil) && (len(keys) != 0) {
418 if len(channelsAndEvents) == 0 {
421 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
424 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
425 err = s.DelMPub(channelsAndEventsPrepared, keys)
430 //AddMember adds a new members to a group.
432 //SDL groups are unordered collections of members where each member is
433 //unique. It is possible to add the same member several times without the
434 //need to check if it already exists.
435 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
436 return s.SAdd(s.nsPrefix+group, member...)
439 //RemoveMember removes members from a group.
440 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
441 return s.SRem(s.nsPrefix+group, member...)
444 //RemoveGroup removes the whole group along with it's members.
445 func (s *SdlInstance) RemoveGroup(group string) error {
446 return s.Del([]string{s.nsPrefix + group})
449 //GetMembers returns all the members from a group.
450 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
451 retVal, err := s.SMembers(s.nsPrefix + group)
453 return []string{}, err
458 //IsMember returns true if given member is found from a group.
459 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
460 retVal, err := s.SIsMember(s.nsPrefix+group, member)
467 //GroupSize returns the number of members in a group.
468 func (s *SdlInstance) GroupSize(group string) (int64, error) {
469 retVal, err := s.SCard(s.nsPrefix + group)
476 func (s *SdlInstance) randomToken() (string, error) {
478 defer s.mutex.Unlock()
481 s.tmp = make([]byte, 16)
484 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
488 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
491 //LockResource function is used for locking a resource. The resource lock in
492 //practice is a key with random value that is set to expire after a time
493 //period. The value written to key is a random value, thus only the instance
494 //that created the lock can release it. Resource locks are per namespace.
495 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
496 value, err := s.randomToken()
501 var retryTimer *time.Timer
502 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
503 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
507 return &Lock{s: s, key: resource, value: value}, nil
509 if retryTimer == nil {
510 retryTimer = time.NewTimer(opt.getRetryWait())
511 defer retryTimer.Stop()
513 retryTimer.Reset(opt.getRetryWait())
520 return nil, errors.New("Lock not obtained")
523 //ReleaseResource removes the lock from a resource. If lock is already
524 //expired or some other instance is keeping the lock (lock taken after expiration),
525 //an error is returned.
526 func (l *Lock) ReleaseResource() error {
527 ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
533 return errors.New("Lock not held")
538 //RefreshResource function can be used to set a new expiration time for the
539 //resource lock (if the lock still exists). The old remaining expiration
540 //time is overwritten with the given new expiration time.
541 func (l *Lock) RefreshResource(expiration time.Duration) error {
542 err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
546 //CheckResource returns the expiration time left for a resource.
547 //If the resource doesn't exist, -2 is returned.
548 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
549 result, err := s.PTTL(s.nsPrefix + resource)
553 if result == time.Duration(-1) {
554 return 0, errors.New("invalid resource given, no expiration time attached")
559 //Options struct defines the behaviour for getting the resource lock.
560 type Options struct {
561 //The number of times acquiring the lock is retried.
562 //Default: 0 = no retry
565 //Wait between the retries.
567 RetryWait time.Duration
570 func (o *Options) getRetryCount() int {
571 if o != nil && o.RetryCount > 0 {
577 func (o *Options) getRetryWait() time.Duration {
578 if o != nil && o.RetryWait > 0 {
581 return 100 * time.Millisecond
584 //Lock struct identifies the resource lock instance. Releasing and adjusting the
585 //expirations are done using the methods defined for this struct.
592 type iDatabase interface {
593 SubscribeChannelDB(cb sdlgoredis.ChannelNotificationCb, channelPrefix, eventSeparator string, channels ...string)
594 UnsubscribeChannelDB(channels ...string)
595 MSet(pairs ...interface{}) error
596 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
597 MGet(keys []string) ([]interface{}, error)
599 Del(keys []string) error
600 DelMPub(channelsAndEvents []string, keys []string) error
601 Keys(key string) ([]string, error)
602 SetIE(key string, oldData, newData interface{}) (bool, error)
603 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
604 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
605 SetNXPub(channel, message, key string, data interface{}) (bool, error)
606 DelIE(key string, data interface{}) (bool, error)
607 DelIEPub(channel, message, key string, data interface{}) (bool, error)
608 SAdd(key string, data ...interface{}) error
609 SRem(key string, data ...interface{}) error
610 SMembers(key string) ([]string, error)
611 SIsMember(key string, data interface{}) (bool, error)
612 SCard(key string) (int64, error)
613 PTTL(key string) (time.Duration, error)
614 PExpireIE(key string, data interface{}, expiration time.Duration) error