2 Copyright (c) 2019 AT&T Intellectual Property.
3 Copyright (c) 2018-2019 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
36 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
39 //SdlInstance provides an API to read, write and modify
40 //key-value pairs in a given namespace.
41 type SdlInstance struct {
50 //Database struct is a holder for the internal database instance. Applications
51 //can use this exported data type to locally store a reference to database
52 //instance returned from NewDabase() function.
53 type Database struct {
57 //NewDatabase creates a connection to database that will be used
58 //as a backend for the key-value storage. The returned value
59 //can be reused between multiple SDL instances in which case each instance
60 //is using the same connection.
61 func NewDatabase() *Database {
63 instance: sdlgoredis.Create(),
67 //NewSdlInstance creates a new sdl instance using the given namespace.
68 //The database used as a backend is given as a parameter
69 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
72 nsPrefix: "{" + NameSpace + "},",
73 eventSeparator: "___",
74 iDatabase: db.instance,
78 //SubscribeChannel lets you to subscribe for a events on a given channels.
79 //SDL notifications are events that are published on a specific channels.
80 //Both the channel and events are defined by the entity that is publishing
83 //When subscribing for a channel, a callback function is given as a parameter.
84 //Whenever a notification is received from a channel, this callback is called
85 //with channel and notifications as parameter (several notifications could be
86 //packed to a single callback function call). A call to SubscribeChannel function
87 //returns immediatelly, callbacks will be called asyncronously.
89 //It is possible to subscribe to different channels using different callbacks. In
90 //this case simply use SubscribeChannel function separately for each channel.
92 //When receiving events in callback routine, it is a good practive to return from
93 //callback as quickly as possible. E.g. reading in callback context should be avoided
94 //and using of Go signals is recommended. Also it should be noted that in case of several
95 //events received from different channels, callbacks are called in series one by one.
97 //This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines.
98 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
99 s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
103 //UnsubscribeChannel removes subscription from one or several channels.
104 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
105 s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
109 //Close connection to backend database.
110 func (s *SdlInstance) Close() error {
114 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
115 if len(channelsAndEvents)%2 != 0 {
116 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
118 for i, v := range channelsAndEvents {
120 if strings.Contains(v, s.eventSeparator) {
121 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
127 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
129 for _, v := range channels {
130 retVal = append(retVal, s.nsPrefix+v)
135 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
136 retVal := make([]interface{}, 0)
138 for _, v := range pairs {
139 reflectType := reflect.TypeOf(v)
140 switch reflectType.Kind() {
142 x := reflect.ValueOf(v).MapRange()
144 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
145 retVal = append(retVal, x.Value().Interface())
149 x := reflect.ValueOf(v)
151 return []interface{}{}, errors.New("Key/value pairs doesn't match")
153 for i2 := 0; i2 < x.Len(); i2++ {
155 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
157 retVal = append(retVal, x.Index(i2).Interface())
161 if reflectType.Elem().Kind() == reflect.Uint8 {
162 retVal = append(retVal, v)
165 return []interface{}{}, errors.New("Key/value pairs doesn't match")
170 x := reflect.ValueOf(v)
172 return []interface{}{}, errors.New("Key/value pairs doesn't match")
174 for i2 := 0; i2 < x.Len(); i2++ {
176 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
178 retVal = append(retVal, x.Index(i2).Interface())
182 if reflectType.Elem().Kind() == reflect.Uint8 {
183 retVal = append(retVal, v)
186 return []interface{}{}, errors.New("Key/value pairs doesn't match")
191 retVal = append(retVal, s.nsPrefix+v.(string))
194 retVal = append(retVal, v)
199 if len(retVal)%2 != 0 {
200 return []interface{}{}, errors.New("Key/value pairs doesn't match")
205 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
206 channelEventMap := make(map[string]string)
207 for i, v := range channelsAndEvents {
211 _, exists := channelEventMap[v]
213 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
215 channelEventMap[v] = channelsAndEvents[i+1]
218 retVal := make([]string, 0)
219 for k, v := range channelEventMap {
220 retVal = append(retVal, s.nsPrefix+k)
221 retVal = append(retVal, v)
226 //SetAndPublish function writes data to shared data layer storage and sends an event to
227 //a channel. Writing is done atomically, i.e. all succeeds or fails.
228 //Data to be written is given as key-value pairs. Several key-value
229 //pairs can be written with one call.
230 //The key is expected to be string whereas value can be anything, string,
231 //number, slice array or map
233 //If data was set successfully, an event is sent to a channel.
234 //Channels and events are given as pairs is channelsAndEvents parameter.
235 //It is possible to send several events to several channels by giving several
236 //channel-event pairs.
237 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
238 //will send event1 and event3 to channel1 and event2 to channel2.
239 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
240 keyAndData, err := s.setNamespaceToKeys(pairs...)
244 if len(channelsAndEvents) == 0 {
245 return s.MSet(keyAndData...)
247 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
250 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
251 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
254 //Set function writes data to shared data layer storage. Writing is done
255 //atomically, i.e. all succeeds or fails.
256 //Data to be written is given as key-value pairs. Several key-value
257 //pairs can be written with one call.
258 //The key is expected to be string whereas value can be anything, string,
259 //number, slice array or map
260 func (s *SdlInstance) Set(pairs ...interface{}) error {
265 keyAndData, err := s.setNamespaceToKeys(pairs...)
269 return s.MSet(keyAndData...)
272 //Get function atomically reads one or more keys from SDL. The returned map has the
273 //requested keys as index and data as value. If the requested key is not found
274 //from SDL, it's value is nil
275 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
276 m := make(map[string]interface{})
281 var keysWithNs []string
282 for _, v := range keys {
283 keysWithNs = append(keysWithNs, s.nsPrefix+v)
285 val, err := s.MGet(keysWithNs)
289 for i, v := range val {
295 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
296 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
297 //is published to a given channel.
298 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
299 if len(channelsAndEvents) == 0 {
300 return s.SetIE(s.nsPrefix+key, oldData, newData)
302 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
305 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
306 return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
309 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
310 //If replace was done successfully, true will be returned.
311 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
312 return s.SetIE(s.nsPrefix+key, oldData, newData)
315 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
316 //then it's value is not changed. Checking the key existence and potential set operation
317 //is done atomically. If the set operation was done successfully, an event is published to a
319 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
320 if len(channelsAndEvents) == 0 {
321 return s.SetNX(s.nsPrefix+key, data, 0)
323 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
326 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
327 return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
330 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically.
333 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
334 return s.SetNX(s.nsPrefix+key, data, 0)
337 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
338 //Trying to remove a nonexisting key is not considered as an error.
339 //An event is published into a given channel if remove operation is successfull and
340 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
341 //when trying to remove, no event is published.
342 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
347 var keysWithNs []string
348 for _, v := range keys {
349 keysWithNs = append(keysWithNs, s.nsPrefix+v)
351 if len(channelsAndEvents) == 0 {
352 return s.Del(keysWithNs)
354 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
357 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
358 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
361 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
362 func (s *SdlInstance) Remove(keys []string) error {
367 var keysWithNs []string
368 for _, v := range keys {
369 keysWithNs = append(keysWithNs, s.nsPrefix+v)
371 err := s.Del(keysWithNs)
375 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
376 //a given event is published to channel. If existing data matches given data,
377 //key and data are removed from SDL. If remove was done successfully, true is returned.
378 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
379 if len(channelsAndEvents) == 0 {
380 return s.DelIE(s.nsPrefix+key, data)
382 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
385 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
386 return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
389 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
390 //key and data are removed from SDL. If remove was done successfully, true is returned.
391 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
392 status, err := s.DelIE(s.nsPrefix+key, data)
399 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
400 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
401 func (s *SdlInstance) GetAll() ([]string, error) {
402 keys, err := s.Keys(s.nsPrefix + "*")
407 for _, v := range keys {
408 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
413 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
414 //it is not guaranteed that all keys are removed.
415 func (s *SdlInstance) RemoveAll() error {
416 keys, err := s.Keys(s.nsPrefix + "*")
420 if (keys != nil) && (len(keys) != 0) {
426 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
427 //will publish an event to given channel. This operation is not atomic, thus it is
428 //not guaranteed that all keys are removed.
429 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
430 keys, err := s.Keys(s.nsPrefix + "*")
434 if (keys != nil) && (len(keys) != 0) {
435 if len(channelsAndEvents) == 0 {
438 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
441 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
442 err = s.DelMPub(channelsAndEventsPrepared, keys)
447 //AddMember adds a new members to a group.
449 //SDL groups are unordered collections of members where each member is
450 //unique. It is possible to add the same member several times without the
451 //need to check if it already exists.
452 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
453 return s.SAdd(s.nsPrefix+group, member...)
456 //RemoveMember removes members from a group.
457 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
458 return s.SRem(s.nsPrefix+group, member...)
461 //RemoveGroup removes the whole group along with it's members.
462 func (s *SdlInstance) RemoveGroup(group string) error {
463 return s.Del([]string{s.nsPrefix + group})
466 //GetMembers returns all the members from a group.
467 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
468 retVal, err := s.SMembers(s.nsPrefix + group)
470 return []string{}, err
475 //IsMember returns true if given member is found from a group.
476 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
477 retVal, err := s.SIsMember(s.nsPrefix+group, member)
484 //GroupSize returns the number of members in a group.
485 func (s *SdlInstance) GroupSize(group string) (int64, error) {
486 retVal, err := s.SCard(s.nsPrefix + group)
493 func (s *SdlInstance) randomToken() (string, error) {
495 defer s.mutex.Unlock()
498 s.tmp = make([]byte, 16)
501 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
505 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
508 //LockResource function is used for locking a resource. The resource lock in
509 //practice is a key with random value that is set to expire after a time
510 //period. The value written to key is a random value, thus only the instance
511 //created a lock, can release it. Resource locks are per namespace.
512 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
513 value, err := s.randomToken()
518 var retryTimer *time.Timer
519 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
520 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
524 return &Lock{s: s, key: resource, value: value}, nil
526 if retryTimer == nil {
527 retryTimer = time.NewTimer(opt.getRetryWait())
528 defer retryTimer.Stop()
530 retryTimer.Reset(opt.getRetryWait())
537 return nil, errors.New("Lock not obtained")
540 //ReleaseResource removes the lock from a resource. If lock is already
541 //expired or some other instance is keeping the lock (lock taken after expiration),
542 //an error is returned.
543 func (l *Lock) ReleaseResource() error {
544 ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
550 return errors.New("Lock not held")
555 //RefreshResource function can be used to set a new expiration time for the
556 //resource lock (if the lock still exists). The old remaining expiration
557 //time is overwritten with the given new expiration time.
558 func (l *Lock) RefreshResource(expiration time.Duration) error {
559 err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
563 //CheckResource returns the expiration time left for a resource.
564 //If the resource doesn't exist, -2 is returned.
565 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
566 result, err := s.PTTL(s.nsPrefix + resource)
570 if result == time.Duration(-1) {
571 return 0, errors.New("invalid resource given, no expiration time attached")
576 //Options struct defines the behaviour for getting the resource lock.
577 type Options struct {
578 //The number of time the lock will be tried.
579 //Default: 0 = no retry
582 //Wait between the retries.
584 RetryWait time.Duration
587 func (o *Options) getRetryCount() int {
588 if o != nil && o.RetryCount > 0 {
594 func (o *Options) getRetryWait() time.Duration {
595 if o != nil && o.RetryWait > 0 {
598 return 100 * time.Millisecond
601 //Lock struct identifies the resource lock instance. Releasing and adjusting the
602 //expirations are done using the methods defined for this struct.
609 type iDatabase interface {
610 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
611 UnsubscribeChannelDB(channels ...string)
612 MSet(pairs ...interface{}) error
613 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
614 MGet(keys []string) ([]interface{}, error)
616 Del(keys []string) error
617 DelMPub(channelsAndEvents []string, keys []string) error
618 Keys(key string) ([]string, error)
619 SetIE(key string, oldData, newData interface{}) (bool, error)
620 SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
621 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
622 SetNXPub(channel, message, key string, data interface{}) (bool, error)
623 DelIE(key string, data interface{}) (bool, error)
624 DelIEPub(channel, message, key string, data interface{}) (bool, error)
625 SAdd(key string, data ...interface{}) error
626 SRem(key string, data ...interface{}) error
627 SMembers(key string) ([]string, error)
628 SIsMember(key string, data interface{}) (bool, error)
629 SCard(key string) (int64, error)
630 PTTL(key string) (time.Duration, error)
631 PExpireIE(key string, data interface{}, expiration time.Duration) error