2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
38 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
39 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
40 //where namespace can be given in every API call. This means that with
41 //SyncStorage you can easily set key-values under different namespace compared to
42 //SdlInstance where namespace can be defined only at SdlInstance instance creation
44 type SyncStorage struct {
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54 return newSyncStorage(NewDatabase())
57 func newSyncStorage(db *Database) *SyncStorage {
59 eventSeparator: "___",
64 //selectDbInstance Selects DB instance what provides DB services for the namespace
65 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
66 instanceCount := uint32(len(s.db.instances))
67 instanceID := getHash(ns) % instanceCount
68 return s.db.instances[instanceID]
71 //getHash Returns hash value calculated from the string
72 func getHash(s string) uint32 {
73 tbl := crc32.MakeTable(crc32.IEEE)
74 return crc32.Checksum([]byte(s), tbl)
77 //SubscribeChannel lets you to subscribe for a events on a given channels.
78 //SDL notifications are events that are published on a specific channels.
79 //Both the channel and events are defined by the entity that is publishing
80 //the events under given namespace.
82 //When subscribing for a channel, a callback function is given as a parameter.
83 //Whenever a notification is received from a channel, this callback is called
84 //with channel and notifications as parameter (several notifications could be
85 //packed to a single callback function call). A call to SubscribeChannel function
86 //returns immediatelly, callbacks will be called asyncronously.
88 //It is possible to subscribe to different channels using different callbacks. In
89 //this case simply use SubscribeChannel function separately for each channel.
91 //When receiving events in callback routine, it is a good practive to return from
92 //callback as quickly as possible. E.g. reading in callback context should be avoided
93 //and using of Go signals is recommended. Also it should be noted that in case of several
94 //events received from different channels, callbacks are called in series one by one.
95 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
96 nsPrefix := getNsPrefix(ns)
97 s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
101 //UnsubscribeChannel removes subscription from one or several channels under given
103 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
104 nsPrefix := getNsPrefix(ns)
105 s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
109 //Close connection to backend database.
110 func (s *SyncStorage) Close() error {
112 for _, db := range s.db.instances {
113 if err := db.CloseDB(); err != nil {
120 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
121 if len(channelsAndEvents)%2 != 0 {
122 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
124 for i, v := range channelsAndEvents {
126 if strings.Contains(v, s.eventSeparator) {
127 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
134 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
136 for _, v := range channels {
137 retVal = append(retVal, nsPrefix+v)
142 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
143 retVal := make([]interface{}, 0)
145 for _, v := range pairs {
146 reflectType := reflect.TypeOf(v)
147 switch reflectType.Kind() {
149 x := reflect.ValueOf(v).MapRange()
151 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
152 retVal = append(retVal, x.Value().Interface())
156 x := reflect.ValueOf(v)
158 return []interface{}{}, errors.New("Key/value pairs doesn't match")
160 for i2 := 0; i2 < x.Len(); i2++ {
162 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
164 retVal = append(retVal, x.Index(i2).Interface())
168 if reflectType.Elem().Kind() == reflect.Uint8 {
169 retVal = append(retVal, v)
172 return []interface{}{}, errors.New("Key/value pairs doesn't match")
177 x := reflect.ValueOf(v)
179 return []interface{}{}, errors.New("Key/value pairs doesn't match")
181 for i2 := 0; i2 < x.Len(); i2++ {
183 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
185 retVal = append(retVal, x.Index(i2).Interface())
189 if reflectType.Elem().Kind() == reflect.Uint8 {
190 retVal = append(retVal, v)
193 return []interface{}{}, errors.New("Key/value pairs doesn't match")
198 retVal = append(retVal, nsPrefix+v.(string))
201 retVal = append(retVal, v)
206 if len(retVal)%2 != 0 {
207 return []interface{}{}, errors.New("Key/value pairs doesn't match")
212 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
213 channelEventMap := make(map[string]string)
214 for i, v := range channelsAndEvents {
218 _, exists := channelEventMap[v]
220 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
222 channelEventMap[v] = channelsAndEvents[i+1]
225 retVal := make([]string, 0)
226 for k, v := range channelEventMap {
227 retVal = append(retVal, nsPrefix+k)
228 retVal = append(retVal, v)
233 //SetAndPublish function writes data to shared data layer storage and sends an event to
234 //a channel. Writing is done atomically, i.e. all succeeds or fails.
235 //Data is written under the namespace what is given as a parameter for this function.
236 //Data to be written is given as key-value pairs. Several key-value
237 //pairs can be written with one call.
238 //The key is expected to be string whereas value can be anything, string,
239 //number, slice array or map
241 //If data was set successfully, an event is sent to a channel.
242 //Channels and events are given as pairs is channelsAndEvents parameter.
243 //It is possible to send several events to several channels by giving several
244 //channel-event pairs.
245 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
246 //will send event1 and event3 to channel1 and event2 to channel2.
247 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
248 nsPrefix := getNsPrefix(ns)
249 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
253 if len(channelsAndEvents) == 0 {
254 return s.getDbBackend(ns).MSet(keyAndData...)
256 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
259 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
260 return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
263 //Set function writes data to shared data layer storage. Writing is done
264 //atomically, i.e. all succeeds or fails.
265 //Data is written under the namespace what is given as a parameter for this function.
266 //Data to be written is given as key-value pairs. Several key-value
267 //pairs can be written with one call.
268 //The key is expected to be string whereas value can be anything, string,
269 //number, slice array or map
270 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
275 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
279 return s.getDbBackend(ns).MSet(keyAndData...)
282 //Get function atomically reads one or more keys from SDL. The returned map has the
283 //requested keys as index and data as value. If the requested key is not found
284 //from SDL, it's value is nil
285 //Read operation is targeted to the namespace what is given as a parameter for this
287 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
288 m := make(map[string]interface{})
293 var keysWithNs []string
294 for _, v := range keys {
295 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
297 val, err := s.getDbBackend(ns).MGet(keysWithNs)
301 for i, v := range val {
307 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
309 //is published to a given channel.
310 //Data is written under the namespace what is given as a parameter for this function.
311 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
312 nsPrefix := getNsPrefix(ns)
313 if len(channelsAndEvents) == 0 {
314 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
316 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
319 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
320 return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
323 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
324 //If replace was done successfully, true will be returned.
325 //Data is written under the namespace what is given as a parameter for this function.
326 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
327 return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
330 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically. If the set operation was done successfully, an event is published to a
334 //Data is written under the namespace what is given as a parameter for this function.
335 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
336 nsPrefix := getNsPrefix(ns)
337 if len(channelsAndEvents) == 0 {
338 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
340 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
343 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
344 return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
347 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
348 //then it's value is not changed. Checking the key existence and potential set operation
349 //is done atomically.
350 //Data is written under the namespace what is given as a parameter for this function.
351 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
352 return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
355 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
356 //Trying to remove a nonexisting key is not considered as an error.
357 //An event is published into a given channel if remove operation is successfull and
358 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
359 //when trying to remove, no event is published.
360 //Data is removed under the namespace what is given as a parameter for this function.
361 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
366 var keysWithNs []string
367 nsPrefix := getNsPrefix(ns)
368 for _, v := range keys {
369 keysWithNs = append(keysWithNs, nsPrefix+v)
371 if len(channelsAndEvents) == 0 {
372 return s.getDbBackend(ns).Del(keysWithNs)
374 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
377 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
378 return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
381 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
382 //Data is removed under the namespace what is given as a parameter for this function.
383 func (s *SyncStorage) Remove(ns string, keys []string) error {
388 var keysWithNs []string
389 for _, v := range keys {
390 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
392 err := s.getDbBackend(ns).Del(keysWithNs)
396 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
397 //a given event is published to channel. If existing data matches given data,
398 //key and data are removed from SDL. If remove was done successfully, true is returned.
399 //Data is removed under the namespace what is given as a parameter for this function.
400 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
401 nsPrefix := getNsPrefix(ns)
402 if len(channelsAndEvents) == 0 {
403 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
405 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
408 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
409 return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
412 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
413 //key and data are removed from SDL. If remove was done successfully, true is returned.
414 //Data is removed under the namespace what is given as a parameter for this function.
415 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
416 status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
423 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
424 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
425 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
426 nsPrefix := getNsPrefix(ns)
427 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
432 for _, v := range keys {
433 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
438 // ListKeys returns all keys in the given namespace matching key search pattern.
440 // Supported search glob-style patterns:
441 // h?llo matches hello, hallo and hxllo
442 // h*llo matches hllo and heeeello
443 // h[ae]llo matches hello and hallo, but not hillo
444 // h[^e]llo matches hallo, hbllo, ... but not hello
445 // h[a-b]llo matches hallo and hbllo
447 // The \ escapes character in key search pattern and those will be treated as a normal
449 // h\[?llo\* matches h[ello* and h[allo*
451 // No prior knowledge about the keys in the given namespace exists,
452 // thus operation is not guaranteed to be atomic or isolated.
453 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
454 nsPrefix := getNsPrefix(ns)
455 nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
460 for _, key := range nsKeys {
461 keys = append(keys, strings.Split(key, nsPrefix)[1])
466 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
467 //it is not guaranteed that all keys are removed.
468 func (s *SyncStorage) RemoveAll(ns string) error {
469 keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
473 if (keys != nil) && (len(keys) != 0) {
474 err = s.getDbBackend(ns).Del(keys)
479 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
480 //will publish an event to given channel. This operation is not atomic, thus it is
481 //not guaranteed that all keys are removed.
482 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
483 nsPrefix := getNsPrefix(ns)
484 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
488 if (keys != nil) && (len(keys) != 0) {
489 if len(channelsAndEvents) == 0 {
490 return s.getDbBackend(ns).Del(keys)
492 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
495 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
496 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
501 //AddMember adds a new members to a group under given namespace.
503 //SDL groups are unordered collections of members where each member is
504 //unique. It is possible to add the same member several times without the
505 //need to check if it already exists.
506 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
507 return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
510 //RemoveMember removes members from a group under given namespace.
511 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
512 return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
515 //RemoveGroup removes the whole group along with it's members under given namespace.
516 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
517 return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
520 //GetMembers returns all the members from a group under given namespace.
521 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
522 retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
524 return []string{}, err
529 //IsMember returns true if given member is found from a group under given namespace.
530 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
531 retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
538 //GroupSize returns the number of members in a group under given namespace.
539 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
540 retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
547 func (s *SyncStorage) randomToken() (string, error) {
549 defer s.mutex.Unlock()
552 s.tmp = make([]byte, 16)
555 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
559 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
562 //LockResource function is used for locking a resource under given namespace.
563 //The resource lock in practice is a key with random value that is set to expire
564 //after a time period. The value written to key is a random value, thus only the
565 //instance created a lock, can release it. Resource locks are per namespace.
566 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
567 value, err := s.randomToken()
572 var retryTimer *time.Timer
573 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
574 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
578 return &SyncStorageLock{s: s, key: resource, value: value}, nil
580 if retryTimer == nil {
581 retryTimer = time.NewTimer(opt.getRetryWait())
582 defer retryTimer.Stop()
584 retryTimer.Reset(opt.getRetryWait())
591 return nil, errors.New("Lock not obtained")
594 //ReleaseResource removes the lock from a resource under given namespace. If lock
595 //is already expired or some other instance is keeping the lock (lock taken after
596 //expiration), an error is returned.
597 func (l *SyncStorageLock) ReleaseResource(ns string) error {
598 ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
604 return errors.New("Lock not held")
609 //RefreshResource function can be used to set a new expiration time for the
610 //resource lock (if the lock still exists) under given namespace. The old
611 //remaining expiration time is overwritten with the given new expiration time.
612 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
613 err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
617 //CheckResource returns the expiration time left for a resource under given
618 //namespace. If the resource doesn't exist, -2 is returned.
619 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
620 result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
624 if result == time.Duration(-1) {
625 return 0, errors.New("invalid resource given, no expiration time attached")
630 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
631 //expirations are done using the methods defined for this struct.
632 type SyncStorageLock struct {
638 func getNsPrefix(ns string) string {
639 return "{" + ns + "},"
642 type iDatabase interface {
643 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
644 UnsubscribeChannelDB(channels ...string)
645 MSet(pairs ...interface{}) error
646 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
647 MGet(keys []string) ([]interface{}, error)
649 Del(keys []string) error
650 DelMPub(channelsAndEvents []string, keys []string) error
651 Keys(key string) ([]string, error)
652 SetIE(key string, oldData, newData interface{}) (bool, error)
653 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
654 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
655 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
656 DelIE(key string, data interface{}) (bool, error)
657 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
658 SAdd(key string, data ...interface{}) error
659 SRem(key string, data ...interface{}) error
660 SMembers(key string) ([]string, error)
661 SIsMember(key string, data interface{}) (bool, error)
662 SCard(key string) (int64, error)
663 PTTL(key string) (time.Duration, error)
664 PExpireIE(key string, data interface{}, expiration time.Duration) error