2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
36 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
39 //SdlInstance provides an API to read, write and modify
40 //key-value pairs in a given namespace.
41 type SdlInstance struct {
50 //Database struct is a holder for the internal database instance. Applications
51 //can use this exported data type to locally store a reference to database
52 //instance returned from NewDabase() function.
53 type Database struct {
57 //NewDatabase creates a connection to database that will be used
58 //as a backend for the key-value storage. The returned value
59 //can be reused between multiple SDL instances in which case each instance
60 //is using the same connection.
61 func NewDatabase() *Database {
63 instance: sdlgoredis.Create(),
67 //NewSdlInstance creates a new sdl instance using the given namespace.
68 //The database used as a backend is given as a parameter
69 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
72 nsPrefix: "{" + NameSpace + "},",
73 eventSeparator: "___",
74 iDatabase: db.instance,
78 //SubscribeChannel lets you to subscribe for a events on a given channels.
79 //SDL notifications are events that are published on a specific channels.
80 //Both the channel and events are defined by the entity that is publishing
83 //When subscribing for a channel, a callback function is given as a parameter.
84 //Whenever a notification is received from a channel, this callback is called
85 //with channel and notifications as parameter (several notifications could be
86 //packed to a single callback function call). A call to SubscribeChannel function
87 //returns immediatelly, callbacks will be called asyncronously.
89 //It is possible to subscribe to different channels using different callbacks. In
90 //this case simply use SubscribeChannel function separately for each channel.
92 //When receiving events in callback routine, it is a good practive to return from
93 //callback as quickly as possible. E.g. reading in callback context should be avoided
94 //and using of Go signals is recommended. Also it should be noted that in case of several
95 //events received from different channels, callbacks are called in series one by one.
96 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
97 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
101 //UnsubscribeChannel removes subscription from one or several channels.
102 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
103 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
107 //Close connection to backend database.
108 func (s *SdlInstance) Close() error {
112 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
113 if len(channelsAndEvents)%2 != 0 {
114 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
116 for i, v := range channelsAndEvents {
118 if strings.Contains(v, s.eventSeparator) {
119 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
125 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
127 for _, v := range channels {
128 retVal = append(retVal, s.nsPrefix+v)
133 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
134 retVal := make([]interface{}, 0)
136 for _, v := range pairs {
137 reflectType := reflect.TypeOf(v)
138 switch reflectType.Kind() {
140 x := reflect.ValueOf(v).MapRange()
142 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
143 retVal = append(retVal, x.Value().Interface())
147 x := reflect.ValueOf(v)
149 return []interface{}{}, errors.New("Key/value pairs doesn't match")
151 for i2 := 0; i2 < x.Len(); i2++ {
153 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
155 retVal = append(retVal, x.Index(i2).Interface())
159 if reflectType.Elem().Kind() == reflect.Uint8 {
160 retVal = append(retVal, v)
163 return []interface{}{}, errors.New("Key/value pairs doesn't match")
168 x := reflect.ValueOf(v)
170 return []interface{}{}, errors.New("Key/value pairs doesn't match")
172 for i2 := 0; i2 < x.Len(); i2++ {
174 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
176 retVal = append(retVal, x.Index(i2).Interface())
180 if reflectType.Elem().Kind() == reflect.Uint8 {
181 retVal = append(retVal, v)
184 return []interface{}{}, errors.New("Key/value pairs doesn't match")
189 retVal = append(retVal, s.nsPrefix+v.(string))
192 retVal = append(retVal, v)
197 if len(retVal)%2 != 0 {
198 return []interface{}{}, errors.New("Key/value pairs doesn't match")
203 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
204 channelEventMap := make(map[string]string)
205 for i, v := range channelsAndEvents {
209 _, exists := channelEventMap[v]
211 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
213 channelEventMap[v] = channelsAndEvents[i+1]
216 retVal := make([]string, 0)
217 for k, v := range channelEventMap {
218 retVal = append(retVal, s.nsPrefix+k)
219 retVal = append(retVal, v)
224 //SetAndPublish function writes data to shared data layer storage and sends an event to
225 //a channel. Writing is done atomically, i.e. all succeeds or fails.
226 //Data to be written is given as key-value pairs. Several key-value
227 //pairs can be written with one call.
228 //The key is expected to be string whereas value can be anything, string,
229 //number, slice array or map
231 //If data was set successfully, an event is sent to a channel.
232 //Channels and events are given as pairs is channelsAndEvents parameter.
233 //It is possible to send several events to several channels by giving several
234 //channel-event pairs.
235 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
236 //will send event1 and event3 to channel1 and event2 to channel2.
237 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
238 keyAndData, err := s.setNamespaceToKeys(pairs...)
242 if len(channelsAndEvents) == 0 {
243 return s.MSet(keyAndData...)
245 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
248 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
249 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
252 //Set function writes data to shared data layer storage. Writing is done
253 //atomically, i.e. all succeeds or fails.
254 //Data to be written is given as key-value pairs. Several key-value
255 //pairs can be written with one call.
256 //The key is expected to be string whereas value can be anything, string,
257 //number, slice array or map
258 func (s *SdlInstance) Set(pairs ...interface{}) error {
263 keyAndData, err := s.setNamespaceToKeys(pairs...)
267 return s.MSet(keyAndData...)
270 //Get function atomically reads one or more keys from SDL. The returned map has the
271 //requested keys as index and data as value. If the requested key is not found
272 //from SDL, it's value is nil
273 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
274 m := make(map[string]interface{})
279 var keysWithNs []string
280 for _, v := range keys {
281 keysWithNs = append(keysWithNs, s.nsPrefix+v)
283 val, err := s.MGet(keysWithNs)
287 for i, v := range val {
293 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
294 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
295 //is published to a given channel.
296 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
297 if len(channelsAndEvents) == 0 {
298 return s.SetIE(s.nsPrefix+key, oldData, newData)
300 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
303 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
304 return s.SetIEPub(channelsAndEventsPrepared, s.nsPrefix+key, oldData, newData)
307 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned.
309 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
310 return s.SetIE(s.nsPrefix+key, oldData, newData)
313 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
314 //then it's value is not changed. Checking the key existence and potential set operation
315 //is done atomically. If the set operation was done successfully, an event is published to a
317 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
318 if len(channelsAndEvents) == 0 {
319 return s.SetNX(s.nsPrefix+key, data, 0)
321 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
324 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
325 return s.SetNXPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
328 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
329 //then it's value is not changed. Checking the key existence and potential set operation
330 //is done atomically.
331 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
332 return s.SetNX(s.nsPrefix+key, data, 0)
335 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
336 //Trying to remove a nonexisting key is not considered as an error.
337 //An event is published into a given channel if remove operation is successfull and
338 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
339 //when trying to remove, no event is published.
340 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
345 var keysWithNs []string
346 for _, v := range keys {
347 keysWithNs = append(keysWithNs, s.nsPrefix+v)
349 if len(channelsAndEvents) == 0 {
350 return s.Del(keysWithNs)
352 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
355 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
356 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
359 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
360 func (s *SdlInstance) Remove(keys []string) error {
365 var keysWithNs []string
366 for _, v := range keys {
367 keysWithNs = append(keysWithNs, s.nsPrefix+v)
369 err := s.Del(keysWithNs)
373 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
374 //a given event is published to channel. If existing data matches given data,
375 //key and data are removed from SDL. If remove was done successfully, true is returned.
376 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
377 if len(channelsAndEvents) == 0 {
378 return s.DelIE(s.nsPrefix+key, data)
380 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
383 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
384 return s.DelIEPub(channelsAndEventsPrepared, s.nsPrefix+key, data)
387 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
388 //key and data are removed from SDL. If remove was done successfully, true is returned.
389 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
390 status, err := s.DelIE(s.nsPrefix+key, data)
397 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
398 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
399 func (s *SdlInstance) GetAll() ([]string, error) {
400 keys, err := s.Keys(s.nsPrefix + "*")
405 for _, v := range keys {
406 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
411 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
412 //it is not guaranteed that all keys are removed.
413 func (s *SdlInstance) RemoveAll() error {
414 keys, err := s.Keys(s.nsPrefix + "*")
418 if (keys != nil) && (len(keys) != 0) {
424 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
425 //will publish an event to given channel. This operation is not atomic, thus it is
426 //not guaranteed that all keys are removed.
427 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
428 keys, err := s.Keys(s.nsPrefix + "*")
432 if (keys != nil) && (len(keys) != 0) {
433 if len(channelsAndEvents) == 0 {
436 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
439 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
440 err = s.DelMPub(channelsAndEventsPrepared, keys)
445 //AddMember adds a new members to a group.
447 //SDL groups are unordered collections of members where each member is
448 //unique. It is possible to add the same member several times without the
449 //need to check if it already exists.
450 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
451 return s.SAdd(s.nsPrefix+group, member...)
454 //RemoveMember removes members from a group.
455 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
456 return s.SRem(s.nsPrefix+group, member...)
459 //RemoveGroup removes the whole group along with it's members.
460 func (s *SdlInstance) RemoveGroup(group string) error {
461 return s.Del([]string{s.nsPrefix + group})
464 //GetMembers returns all the members from a group.
465 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
466 retVal, err := s.SMembers(s.nsPrefix + group)
468 return []string{}, err
473 //IsMember returns true if given member is found from a group.
474 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
475 retVal, err := s.SIsMember(s.nsPrefix+group, member)
482 //GroupSize returns the number of members in a group.
483 func (s *SdlInstance) GroupSize(group string) (int64, error) {
484 retVal, err := s.SCard(s.nsPrefix + group)
491 func (s *SdlInstance) randomToken() (string, error) {
493 defer s.mutex.Unlock()
496 s.tmp = make([]byte, 16)
499 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
503 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
506 //LockResource function is used for locking a resource. The resource lock in
507 //practice is a key with random value that is set to expire after a time
508 //period. The value written to key is a random value, thus only the instance
509 //created a lock, can release it. Resource locks are per namespace.
510 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
511 value, err := s.randomToken()
516 var retryTimer *time.Timer
517 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
518 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
522 return &Lock{s: s, key: resource, value: value}, nil
524 if retryTimer == nil {
525 retryTimer = time.NewTimer(opt.getRetryWait())
526 defer retryTimer.Stop()
528 retryTimer.Reset(opt.getRetryWait())
535 return nil, errors.New("Lock not obtained")
538 //ReleaseResource removes the lock from a resource. If lock is already
539 //expired or some other instance is keeping the lock (lock taken after expiration),
540 //an error is returned.
541 func (l *Lock) ReleaseResource() error {
542 ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
548 return errors.New("Lock not held")
553 //RefreshResource function can be used to set a new expiration time for the
554 //resource lock (if the lock still exists). The old remaining expiration
555 //time is overwritten with the given new expiration time.
556 func (l *Lock) RefreshResource(expiration time.Duration) error {
557 err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
561 //CheckResource returns the expiration time left for a resource.
562 //If the resource doesn't exist, -2 is returned.
563 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
564 result, err := s.PTTL(s.nsPrefix + resource)
568 if result == time.Duration(-1) {
569 return 0, errors.New("invalid resource given, no expiration time attached")
574 //Options struct defines the behaviour for getting the resource lock.
575 type Options struct {
576 //The number of time the lock will be tried.
577 //Default: 0 = no retry
580 //Wait between the retries.
582 RetryWait time.Duration
585 func (o *Options) getRetryCount() int {
586 if o != nil && o.RetryCount > 0 {
592 func (o *Options) getRetryWait() time.Duration {
593 if o != nil && o.RetryWait > 0 {
596 return 100 * time.Millisecond
599 //Lock struct identifies the resource lock instance. Releasing and adjusting the
600 //expirations are done using the methods defined for this struct.
607 type iDatabase interface {
608 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
609 UnsubscribeChannelDB(channels ...string)
610 MSet(pairs ...interface{}) error
611 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
612 MGet(keys []string) ([]interface{}, error)
614 Del(keys []string) error
615 DelMPub(channelsAndEvents []string, keys []string) error
616 Keys(key string) ([]string, error)
617 SetIE(key string, oldData, newData interface{}) (bool, error)
618 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
619 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
620 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
621 DelIE(key string, data interface{}) (bool, error)
622 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
623 SAdd(key string, data ...interface{}) error
624 SRem(key string, data ...interface{}) error
625 SMembers(key string) ([]string, error)
626 SIsMember(key string, data interface{}) (bool, error)
627 SCard(key string) (int64, error)
628 PTTL(key string) (time.Duration, error)
629 PExpireIE(key string, data interface{}, expiration time.Duration) error