2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
30 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
39 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
40 //pairs. Key-values belong to a namespace, and SyncStorage provides APIs
41 //where the namespace can be given in every API call. This means that with
42 //SyncStorage you can easily set key-values under different namespaces, compared to
43 //SdlInstance, where the namespace can be defined only at SdlInstance instance creation
45 type SyncStorage struct {
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54 return newSyncStorage(NewDatabase())
// newSyncStorage takes the Database that is to back the returned SyncStorage.
// NOTE(review): the function body is not visible here; presumably db is stored in the
// new SyncStorage (other methods read s.db.instances) — confirm.
57 func newSyncStorage(db *Database) *SyncStorage {
63 //selectDbInstance Selects DB instance what provides DB services for the namespace
64 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
65 instanceCount := uint32(len(s.db.instances))
66 instanceID := getHash(ns) % instanceCount
67 return s.db.instances[instanceID]
// getHash returns the CRC-32 checksum (IEEE polynomial) of the string.
func getHash(s string) uint32 {
	return crc32.ChecksumIEEE([]byte(s))
}
76 //SubscribeChannel lets you to subscribe for a events on a given channels.
77 //SDL notifications are events that are published on a specific channels.
78 //Both the channel and events are defined by the entity that is publishing
79 //the events under given namespace.
81 //When subscribing for a channel, a callback function is given as a parameter.
82 //Whenever a notification is received from a channel, this callback is called
83 //with channel and notifications as parameter (several notifications could be
84 //packed to a single callback function call). A call to SubscribeChannel function
85 //returns immediatelly, callbacks will be called asyncronously.
87 //It is possible to subscribe to different channels using different callbacks. In
88 //this case simply use SubscribeChannel function separately for each channel.
90 //When receiving events in callback routine, it is a good practive to return from
91 //callback as quickly as possible. E.g. reading in callback context should be avoided
92 //and using of Go signals is recommended. Also it should be noted that in case of several
93 //events received from different channels, callbacks are called in series one by one.
94 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
95 nsPrefix := getNsPrefix(ns)
96 s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
100 //UnsubscribeChannel removes subscription from one or several channels under given
102 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
103 nsPrefix := getNsPrefix(ns)
104 s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
108 //Close closes the connection to the backend database by calling CloseDB on every
// instance in s.db.instances.
// NOTE(review): how a CloseDB error is propagated is not visible in these lines — confirm
// which error is returned when several instances fail.
109 func (s *SyncStorage) Close() error {
111 for _, db := range s.db.instances {
112 if err := db.CloseDB(); err != nil {
// checkChannelsAndEvents validates the channelsAndEvents argument of the command named cmd
// (cmd is only used as a prefix of the error text).
// An error is returned when the slice length is odd, or when a checked element contains
// sdlgoredis.EventSeparator.
// NOTE(review): the condition selecting which elements are checked is not visible here;
// the error text suggests only the events (not the channels) are checked — confirm.
119 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
// Channels and events come as pairs, so the element count must be even.
120 if len(channelsAndEvents)%2 != 0 {
121 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
123 for i, v := range channelsAndEvents {
125 if strings.Contains(v, sdlgoredis.EventSeparator) {
126 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
133 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
135 for _, v := range channels {
136 retVal = append(retVal, nsPrefix+v)
// setNamespaceToKeys flattens pairs into one key/value list in which the keys carry nsPrefix.
// Each element of pairs is dispatched on its reflect.Kind; the visible branches handle
// map-like values (every map key is prefixed, every map value kept), indexable values
// (elements are appended alternately as prefixed key and plain value), values whose element
// kind is uint8 (appended unchanged) and plain values.
// The error "Key/value pairs doesn't match" is returned from several branches and when the
// flattened result has an odd number of elements.
// Keys are converted with an unchecked .(string) type assertion, so a non-string key panics.
// NOTE(review): reflect.TypeOf returns nil for a nil interface value, so a nil element in
// pairs would make reflectType.Kind() panic — confirm whether callers can pass nil.
// NOTE(review): the case labels and several conditions of this function are not visible
// here; the per-branch comments below describe only the visible statements.
141 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
142 retVal := make([]interface{}, 0)
144 for _, v := range pairs {
145 reflectType := reflect.TypeOf(v)
146 switch reflectType.Kind() {
// Map-like value: iterate its entries, prefixing each key and keeping each value as is.
148 x := reflect.ValueOf(v).MapRange()
150 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
151 retVal = append(retVal, x.Value().Interface())
// Indexable value: elements are appended as prefixed key / plain value.
155 x := reflect.ValueOf(v)
157 return []interface{}{}, errors.New("Key/value pairs doesn't match")
159 for i2 := 0; i2 < x.Len(); i2++ {
161 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
163 retVal = append(retVal, x.Index(i2).Interface())
// A value with uint8 elements is treated as one piece of data and appended unchanged.
167 if reflectType.Elem().Kind() == reflect.Uint8 {
168 retVal = append(retVal, v)
171 return []interface{}{}, errors.New("Key/value pairs doesn't match")
// Same handling as the previous indexable branch, for another kind (label not visible).
176 x := reflect.ValueOf(v)
178 return []interface{}{}, errors.New("Key/value pairs doesn't match")
180 for i2 := 0; i2 < x.Len(); i2++ {
182 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
184 retVal = append(retVal, x.Index(i2).Interface())
188 if reflectType.Elem().Kind() == reflect.Uint8 {
189 retVal = append(retVal, v)
192 return []interface{}{}, errors.New("Key/value pairs doesn't match")
// Plain value: appended either as a prefixed string key or unchanged as data.
197 retVal = append(retVal, nsPrefix+v.(string))
200 retVal = append(retVal, v)
// The flattened list must consist of complete key/value pairs.
205 if len(retVal)%2 != 0 {
206 return []interface{}{}, errors.New("Key/value pairs doesn't match")
// prepareChannelsAndEvents groups the events of channelsAndEvents per channel and returns a
// flat list of (nsPrefix+channel, events) pairs. Events that share a channel are
// joined into one string with sdlgoredis.EventSeparator.
// The order of the channels in the result follows Go map iteration and is therefore
// not guaranteed to match the input order.
// NOTE(review): the statements that skip non-channel elements are not visible here — confirm.
211 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
212 channelEventMap := make(map[string]string)
213 for i, v := range channelsAndEvents {
// v is the channel and channelsAndEvents[i+1] is the event paired with it.
217 _, exists := channelEventMap[v]
// Channel seen before: append the event after a separator.
219 channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
// First event for this channel.
221 channelEventMap[v] = channelsAndEvents[i+1]
224 retVal := make([]string, 0)
225 for k, v := range channelEventMap {
226 retVal = append(retVal, nsPrefix+k)
227 retVal = append(retVal, v)
232 //SetAndPublish function writes data to shared data layer storage and sends an event to
233 //a channel. Writing is done atomically, i.e. all succeeds or fails.
234 //Data is written under the namespace what is given as a parameter for this function.
235 //Data to be written is given as key-value pairs. Several key-value
236 //pairs can be written with one call.
237 //The key is expected to be string whereas value can be anything, string,
238 //number, slice array or map
240 //If data was set successfully, an event is sent to a channel.
241 //Channels and events are given as pairs is channelsAndEvents parameter.
242 //It is possible to send several events to several channels by giving several
243 //channel-event pairs.
244 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
245 //will send event1 and event3 to channel1 and event2 to channel2.
246 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
247 nsPrefix := getNsPrefix(ns)
248 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
252 if len(channelsAndEvents) == 0 {
253 return s.getDbBackend(ns).MSet(keyAndData...)
255 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
258 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
259 return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
262 //Set function writes data to shared data layer storage. Writing is done
263 //atomically, i.e. all succeeds or fails.
264 //Data is written under the namespace that is given as a parameter for this function.
265 //Data to be written is given as key-value pairs. Several key-value
266 //pairs can be written with one call.
267 //The key is expected to be a string whereas the value can be anything: string,
268 //number, slice, array or map
269 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
// Prefix every key with the namespace before handing the pairs to the backend.
274 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
278 return s.getDbBackend(ns).MSet(keyAndData...)
281 //Get function atomically reads one or more keys from SDL. The returned map has the
282 //requested keys as index and data as value. If the requested key is not found
283 //from SDL, its value is nil.
284 //Read operation is targeted to the namespace that is given as a parameter for this
286 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
287 m := make(map[string]interface{})
// The backend is queried with namespace-prefixed keys.
292 var keysWithNs []string
293 for _, v := range keys {
294 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
296 val, err := s.getDbBackend(ns).MGet(keysWithNs)
// NOTE(review): the loop body is not visible here; presumably val[i] is stored under the
// un-prefixed keys[i] — confirm.
300 for i, v := range val {
306 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
307 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
308 //is published to a given channel.
309 //Data is written under the namespace what is given as a parameter for this function.
310 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
311 nsPrefix := getNsPrefix(ns)
312 if len(channelsAndEvents) == 0 {
313 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
315 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
318 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
319 return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
322 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
323 //If replace was done successfully, true will be returned.
324 //Data is written under the namespace what is given as a parameter for this function.
325 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
326 return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
329 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
330 //then it's value is not changed. Checking the key existence and potential set operation
331 //is done atomically. If the set operation was done successfully, an event is published to a
333 //Data is written under the namespace what is given as a parameter for this function.
334 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
335 nsPrefix := getNsPrefix(ns)
336 if len(channelsAndEvents) == 0 {
337 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
339 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
342 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
343 return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
346 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
347 //then it's value is not changed. Checking the key existence and potential set operation
348 //is done atomically.
349 //Data is written under the namespace what is given as a parameter for this function.
350 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
351 return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
354 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
355 //Trying to remove a nonexistent key is not considered an error.
356 //An event is published into a given channel if the remove operation is successful and
357 //at least one key is removed (if several keys are given). If the given key(s) don't exist
358 //when trying to remove, no event is published.
359 //Data is removed under the namespace that is given as a parameter for this function.
360 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
// The backend is called with namespace-prefixed keys.
365 var keysWithNs []string
366 nsPrefix := getNsPrefix(ns)
367 for _, v := range keys {
368 keysWithNs = append(keysWithNs, nsPrefix+v)
// Without channels and events this is a plain delete.
370 if len(channelsAndEvents) == 0 {
371 return s.getDbBackend(ns).Del(keysWithNs)
373 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
376 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
377 return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
380 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
381 //Data is removed under the namespace that is given as a parameter for this function.
382 func (s *SyncStorage) Remove(ns string, keys []string) error {
// The backend is called with namespace-prefixed keys.
387 var keysWithNs []string
388 for _, v := range keys {
389 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
391 err := s.getDbBackend(ns).Del(keysWithNs)
395 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
396 //a given event is published to channel. If existing data matches given data,
397 //key and data are removed from SDL. If remove was done successfully, true is returned.
398 //Data is removed under the namespace what is given as a parameter for this function.
399 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
400 nsPrefix := getNsPrefix(ns)
401 if len(channelsAndEvents) == 0 {
402 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
404 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
407 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
408 return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
411 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
412 //key and data are removed from SDL. If remove was done successfully, true is returned.
413 //Data is removed under the namespace that is given as a parameter for this function.
414 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
// The backend performs the compare-and-delete on the namespace-prefixed key.
415 status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
422 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
423 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
424 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
425 nsPrefix := getNsPrefix(ns)
426 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
431 for _, v := range keys {
432 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
437 // ListKeys returns all keys in the given namespace matching key search pattern.
439 // Supported search glob-style patterns:
440 // h?llo matches hello, hallo and hxllo
441 // h*llo matches hllo and heeeello
442 // h[ae]llo matches hello and hallo, but not hillo
443 // h[^e]llo matches hallo, hbllo, ... but not hello
444 // h[a-b]llo matches hallo and hbllo
446 // The \ escapes character in key search pattern and those will be treated as a normal
448 // h\[?llo\* matches h[ello* and h[allo*
450 // No prior knowledge about the keys in the given namespace exists,
451 // thus operation is not guaranteed to be atomic or isolated.
452 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
453 nsPrefix := getNsPrefix(ns)
454 nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
459 for _, key := range nsKeys {
460 keys = append(keys, strings.Split(key, nsPrefix)[1])
465 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
466 //it is not guaranteed that all keys are removed.
467 func (s *SyncStorage) RemoveAll(ns string) error {
468 keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
472 if (keys != nil) && (len(keys) != 0) {
473 err = s.getDbBackend(ns).Del(keys)
478 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
479 //will publish an event to given channel. This operation is not atomic, thus it is
480 //not guaranteed that all keys are removed.
481 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
482 nsPrefix := getNsPrefix(ns)
483 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
487 if (keys != nil) && (len(keys) != 0) {
488 if len(channelsAndEvents) == 0 {
489 return s.getDbBackend(ns).Del(keys)
491 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
494 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
495 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
500 //AddMember adds a new members to a group under given namespace.
502 //SDL groups are unordered collections of members where each member is
503 //unique. It is possible to add the same member several times without the
504 //need to check if it already exists.
505 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
506 return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
509 //RemoveMember removes members from a group under given namespace.
510 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
511 return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
514 //RemoveGroup removes the whole group along with it's members under given namespace.
515 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
516 return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
519 //GetMembers returns all the members from a group under given namespace.
520 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
521 retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
523 return []string{}, err
528 //IsMember returns true if the given member is found in a group under the given namespace.
529 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
// The membership test is done by the backend on the namespace-prefixed group key.
530 retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
537 //GroupSize returns the number of members in a group under the given namespace.
538 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
// The count is read by the backend from the namespace-prefixed group key.
539 retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
// randomToken returns a random token as a string: 16 bytes read from rand.Reader into
// the s.tmp buffer and encoded with unpadded URL-safe base64.
// NOTE(review): the matching s.mutex.Lock() call is not visible in these lines; presumably
// the mutex guards the shared s.tmp buffer — confirm.
546 func (s *SyncStorage) randomToken() (string, error) {
548 defer s.mutex.Unlock()
// NOTE(review): the condition guarding this allocation is not visible; it looks like the
// buffer is allocated once and then reused — confirm.
551 s.tmp = make([]byte, 16)
// io.ReadFull fills the whole buffer or reports an error.
554 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
558 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
561 //LockResource function is used for locking a resource under the given namespace.
562 //The resource lock in practice is a key with a random value that is set to expire
563 //after a time period. The value written to the key is a random value, thus only the
564 //instance that created a lock can release it. Resource locks are per namespace.
// On success a SyncStorageLock is returned; if the lock cannot be taken within the allowed
// attempts, the error "Lock not obtained" is returned.
565 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
// The random token becomes the value of the lock key.
566 value, err := s.randomToken()
571 var retryTimer *time.Timer
// One initial attempt plus opt.getRetryCount() retries.
572 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
// SetNX sets the namespace-prefixed key with the given expiration only if it does not exist yet.
573 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
// The lock keeps the resource name without the namespace prefix.
577 return &SyncStorageLock{s: s, key: resource, value: value}, nil
// The retry timer is created on first use and stopped when the function returns.
579 if retryTimer == nil {
580 retryTimer = time.NewTimer(opt.getRetryWait())
581 defer retryTimer.Stop()
// NOTE(review): presumably the else-branch reusing the existing timer — the surrounding
// lines, including the wait on the timer, are not visible here.
583 retryTimer.Reset(opt.getRetryWait())
590 return nil, errors.New("Lock not obtained")
593 //ReleaseResource removes the lock from a resource under the given namespace. If the lock
594 //has already expired or some other instance is keeping the lock (lock taken after
595 //expiration), an error is returned.
596 func (l *SyncStorageLock) ReleaseResource(ns string) error {
// The key is deleted only if its stored value equals this lock's own value.
597 ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
603 return errors.New("Lock not held")
608 //RefreshResource function can be used to set a new expiration time for the
609 //resource lock (if the lock still exists) under given namespace. The old
610 //remaining expiration time is overwritten with the given new expiration time.
611 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
612 err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
616 //CheckResource returns the expiration time left for a resource under the given
617 //namespace. If the resource doesn't exist, -2 is returned.
// If the backend reports -1, an error is returned instead because the key has no
// expiration time attached.
618 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
619 result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
623 if result == time.Duration(-1) {
624 return 0, errors.New("invalid resource given, no expiration time attached")
629 //SyncStorageLock struct identifies the resource lock instance. Releasing the lock and
630 //adjusting its expiration are done using the methods defined for this struct.
631 type SyncStorageLock struct {
637 func getNsPrefix(ns string) string {
638 return "{" + ns + "}" + sdlgoredis.NsSeparator
// iDatabase lists the backend database operations that SyncStorage and SyncStorageLock
// call on the instance returned by getDbBackend.
641 type iDatabase interface {
// Channel subscription handling.
642 SubscribeChannelDB(cb func(string, ...string), channels ...string)
643 UnsubscribeChannelDB(channels ...string)
// Key-value operations; the *Pub variants additionally take channels and events.
644 MSet(pairs ...interface{}) error
645 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
646 MGet(keys []string) ([]interface{}, error)
648 Del(keys []string) error
649 DelMPub(channelsAndEvents []string, keys []string) error
650 Keys(key string) ([]string, error)
651 SetIE(key string, oldData, newData interface{}) (bool, error)
652 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
653 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
654 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
655 DelIE(key string, data interface{}) (bool, error)
656 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
// Group (set) operations.
657 SAdd(key string, data ...interface{}) error
658 SRem(key string, data ...interface{}) error
659 SMembers(key string) ([]string, error)
660 SIsMember(key string, data interface{}) (bool, error)
661 SCard(key string) (int64, error)
// Expiration handling used by the resource locks.
662 PTTL(key string) (time.Duration, error)
663 PExpireIE(key string, data interface{}, expiration time.Duration) error