2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
38 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
39 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
40 //where namespace can be given in every API call. This means that with
41 //SyncStorage you can easily set key-values under different namespace compared to
42 //SdlInstance where namespace can be defined only at SdlInstance instance creation
44 type SyncStorage struct {
51 //NewSyncStorage creates a new sdl instance.
52 //The database used as a backend is given as a parameter
53 func NewSyncStorage() *SyncStorage {
54 return newSyncStorage(NewDatabase())
57 func newSyncStorage(db *Database) *SyncStorage {
59 eventSeparator: "___",
64 //selectDbInstance Selects DB instance what provides DB services for the namespace
65 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
66 instanceCount := uint32(len(s.db.instances))
67 instanceID := getHash(ns) % instanceCount
68 return s.db.instances[instanceID]
71 //getHash Returns hash value calculated from the string
72 func getHash(s string) uint32 {
73 tbl := crc32.MakeTable(crc32.IEEE)
74 return crc32.Checksum([]byte(s), tbl)
77 //SubscribeChannel lets you to subscribe for a events on a given channels.
78 //SDL notifications are events that are published on a specific channels.
79 //Both the channel and events are defined by the entity that is publishing
80 //the events under given namespace.
82 //When subscribing for a channel, a callback function is given as a parameter.
83 //Whenever a notification is received from a channel, this callback is called
84 //with channel and notifications as parameter (several notifications could be
85 //packed to a single callback function call). A call to SubscribeChannel function
86 //returns immediatelly, callbacks will be called asyncronously.
88 //It is possible to subscribe to different channels using different callbacks. In
89 //this case simply use SubscribeChannel function separately for each channel.
91 //When receiving events in callback routine, it is a good practive to return from
92 //callback as quickly as possible. E.g. reading in callback context should be avoided
93 //and using of Go signals is recommended. Also it should be noted that in case of several
94 //events received from different channels, callbacks are called in series one by one.
95 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
96 nsPrefix := getNsPrefix(ns)
97 s.getDbBackend(ns).SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
101 //UnsubscribeChannel removes subscription from one or several channels under given
103 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
104 nsPrefix := getNsPrefix(ns)
105 s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
109 //Close connection to backend database.
110 func (s *SyncStorage) Close() error {
112 for _, db := range s.db.instances {
113 if err := db.CloseDB(); err != nil {
120 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
121 if len(channelsAndEvents)%2 != 0 {
122 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
124 for i, v := range channelsAndEvents {
126 if strings.Contains(v, s.eventSeparator) {
127 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
134 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
136 for _, v := range channels {
137 retVal = append(retVal, nsPrefix+v)
142 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
143 retVal := make([]interface{}, 0)
145 for _, v := range pairs {
146 reflectType := reflect.TypeOf(v)
147 switch reflectType.Kind() {
149 x := reflect.ValueOf(v).MapRange()
151 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
152 retVal = append(retVal, x.Value().Interface())
156 x := reflect.ValueOf(v)
158 return []interface{}{}, errors.New("Key/value pairs doesn't match")
160 for i2 := 0; i2 < x.Len(); i2++ {
162 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
164 retVal = append(retVal, x.Index(i2).Interface())
168 if reflectType.Elem().Kind() == reflect.Uint8 {
169 retVal = append(retVal, v)
172 return []interface{}{}, errors.New("Key/value pairs doesn't match")
177 x := reflect.ValueOf(v)
179 return []interface{}{}, errors.New("Key/value pairs doesn't match")
181 for i2 := 0; i2 < x.Len(); i2++ {
183 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
185 retVal = append(retVal, x.Index(i2).Interface())
189 if reflectType.Elem().Kind() == reflect.Uint8 {
190 retVal = append(retVal, v)
193 return []interface{}{}, errors.New("Key/value pairs doesn't match")
198 retVal = append(retVal, nsPrefix+v.(string))
201 retVal = append(retVal, v)
206 if len(retVal)%2 != 0 {
207 return []interface{}{}, errors.New("Key/value pairs doesn't match")
212 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
213 channelEventMap := make(map[string]string)
214 for i, v := range channelsAndEvents {
218 _, exists := channelEventMap[v]
220 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
222 channelEventMap[v] = channelsAndEvents[i+1]
225 retVal := make([]string, 0)
226 for k, v := range channelEventMap {
227 retVal = append(retVal, nsPrefix+k)
228 retVal = append(retVal, v)
233 //SetAndPublish function writes data to shared data layer storage and sends an event to
234 //a channel. Writing is done atomically, i.e. all succeeds or fails.
235 //Data is written under the namespace what is given as a parameter for this function.
236 //Data to be written is given as key-value pairs. Several key-value
237 //pairs can be written with one call.
238 //The key is expected to be string whereas value can be anything, string,
239 //number, slice array or map
241 //If data was set successfully, an event is sent to a channel.
242 //Channels and events are given as pairs is channelsAndEvents parameter.
243 //It is possible to send several events to several channels by giving several
244 //channel-event pairs.
245 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
246 //will send event1 and event3 to channel1 and event2 to channel2.
247 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
248 nsPrefix := getNsPrefix(ns)
249 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
253 if len(channelsAndEvents) == 0 {
254 return s.getDbBackend(ns).MSet(keyAndData...)
256 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
259 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
260 return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
263 //Set function writes data to shared data layer storage. Writing is done
264 //atomically, i.e. all succeeds or fails.
265 //Data is written under the namespace what is given as a parameter for this function.
266 //Data to be written is given as key-value pairs. Several key-value
267 //pairs can be written with one call.
268 //The key is expected to be string whereas value can be anything, string,
269 //number, slice array or map
270 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
275 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
279 return s.getDbBackend(ns).MSet(keyAndData...)
282 //Get function atomically reads one or more keys from SDL. The returned map has the
283 //requested keys as index and data as value. If the requested key is not found
284 //from SDL, it's value is nil
285 //Read operation is targeted to the namespace what is given as a parameter for this
287 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
288 m := make(map[string]interface{})
293 var keysWithNs []string
294 for _, v := range keys {
295 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
297 val, err := s.getDbBackend(ns).MGet(keysWithNs)
301 for i, v := range val {
307 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
308 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
309 //is published to a given channel.
310 //Data is written under the namespace what is given as a parameter for this function.
311 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
312 nsPrefix := getNsPrefix(ns)
313 if len(channelsAndEvents) == 0 {
314 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
316 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
319 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
320 return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
323 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
324 //If replace was done successfully, true will be returned.
325 //Data is written under the namespace what is given as a parameter for this function.
326 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
327 return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
330 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
331 //then it's value is not changed. Checking the key existence and potential set operation
332 //is done atomically. If the set operation was done successfully, an event is published to a
334 //Data is written under the namespace what is given as a parameter for this function.
335 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
336 nsPrefix := getNsPrefix(ns)
337 if len(channelsAndEvents) == 0 {
338 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
340 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
343 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
344 return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
347 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
348 //then it's value is not changed. Checking the key existence and potential set operation
349 //is done atomically.
350 //Data is written under the namespace what is given as a parameter for this function.
351 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
352 return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
355 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
356 //Trying to remove a nonexisting key is not considered as an error.
357 //An event is published into a given channel if remove operation is successfull and
358 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
359 //when trying to remove, no event is published.
360 //Data is removed under the namespace what is given as a parameter for this function.
361 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
366 var keysWithNs []string
367 nsPrefix := getNsPrefix(ns)
368 for _, v := range keys {
369 keysWithNs = append(keysWithNs, nsPrefix+v)
371 if len(channelsAndEvents) == 0 {
372 return s.getDbBackend(ns).Del(keysWithNs)
374 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
377 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
378 return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
381 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
382 //Data is removed under the namespace what is given as a parameter for this function.
383 func (s *SyncStorage) Remove(ns string, keys []string) error {
388 var keysWithNs []string
389 for _, v := range keys {
390 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
392 err := s.getDbBackend(ns).Del(keysWithNs)
396 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
397 //a given event is published to channel. If existing data matches given data,
398 //key and data are removed from SDL. If remove was done successfully, true is returned.
399 //Data is removed under the namespace what is given as a parameter for this function.
400 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
401 nsPrefix := getNsPrefix(ns)
402 if len(channelsAndEvents) == 0 {
403 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
405 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
408 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
409 return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
412 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
413 //key and data are removed from SDL. If remove was done successfully, true is returned.
414 //Data is removed under the namespace what is given as a parameter for this function.
415 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
416 status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
423 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
424 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
425 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
426 nsPrefix := getNsPrefix(ns)
427 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
432 for _, v := range keys {
433 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
438 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
439 //it is not guaranteed that all keys are removed.
440 func (s *SyncStorage) RemoveAll(ns string) error {
441 keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
445 if (keys != nil) && (len(keys) != 0) {
446 err = s.getDbBackend(ns).Del(keys)
451 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
452 //will publish an event to given channel. This operation is not atomic, thus it is
453 //not guaranteed that all keys are removed.
454 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
455 nsPrefix := getNsPrefix(ns)
456 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
460 if (keys != nil) && (len(keys) != 0) {
461 if len(channelsAndEvents) == 0 {
462 return s.getDbBackend(ns).Del(keys)
464 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
467 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
468 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
473 //AddMember adds a new members to a group under given namespace.
475 //SDL groups are unordered collections of members where each member is
476 //unique. It is possible to add the same member several times without the
477 //need to check if it already exists.
478 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
479 return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
482 //RemoveMember removes members from a group under given namespace.
483 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
484 return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
487 //RemoveGroup removes the whole group along with it's members under given namespace.
488 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
489 return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
492 //GetMembers returns all the members from a group under given namespace.
493 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
494 retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
496 return []string{}, err
501 //IsMember returns true if given member is found from a group under given namespace.
502 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
503 retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
510 //GroupSize returns the number of members in a group under given namespace.
511 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
512 retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
519 func (s *SyncStorage) randomToken() (string, error) {
521 defer s.mutex.Unlock()
524 s.tmp = make([]byte, 16)
527 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
531 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
534 //LockResource function is used for locking a resource under given namespace.
535 //The resource lock in practice is a key with random value that is set to expire
536 //after a time period. The value written to key is a random value, thus only the
537 //instance created a lock, can release it. Resource locks are per namespace.
538 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
539 value, err := s.randomToken()
544 var retryTimer *time.Timer
545 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
546 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
550 return &SyncStorageLock{s: s, key: resource, value: value}, nil
552 if retryTimer == nil {
553 retryTimer = time.NewTimer(opt.getRetryWait())
554 defer retryTimer.Stop()
556 retryTimer.Reset(opt.getRetryWait())
563 return nil, errors.New("Lock not obtained")
566 //ReleaseResource removes the lock from a resource under given namespace. If lock
567 //is already expired or some other instance is keeping the lock (lock taken after
568 //expiration), an error is returned.
569 func (l *SyncStorageLock) ReleaseResource(ns string) error {
570 ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
576 return errors.New("Lock not held")
581 //RefreshResource function can be used to set a new expiration time for the
582 //resource lock (if the lock still exists) under given namespace. The old
583 //remaining expiration time is overwritten with the given new expiration time.
584 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
585 err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
589 //CheckResource returns the expiration time left for a resource under given
590 //namespace. If the resource doesn't exist, -2 is returned.
591 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
592 result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
596 if result == time.Duration(-1) {
597 return 0, errors.New("invalid resource given, no expiration time attached")
602 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
603 //expirations are done using the methods defined for this struct.
604 type SyncStorageLock struct {
610 func getNsPrefix(ns string) string {
611 return "{" + ns + "},"
614 type iDatabase interface {
615 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
616 UnsubscribeChannelDB(channels ...string)
617 MSet(pairs ...interface{}) error
618 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
619 MGet(keys []string) ([]interface{}, error)
621 Del(keys []string) error
622 DelMPub(channelsAndEvents []string, keys []string) error
623 Keys(key string) ([]string, error)
624 SetIE(key string, oldData, newData interface{}) (bool, error)
625 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
626 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
627 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
628 DelIE(key string, data interface{}) (bool, error)
629 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
630 SAdd(key string, data ...interface{}) error
631 SRem(key string, data ...interface{}) error
632 SMembers(key string) ([]string, error)
633 SIsMember(key string, data interface{}) (bool, error)
634 SCard(key string) (int64, error)
635 PTTL(key string) (time.Duration, error)
636 PExpireIE(key string, data interface{}, expiration time.Duration) error