2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
37 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
38 //pairs. key-values are belonging to a namespace and SyncStorage provides APIs
39 //where namespace can be given in every API call. This means that with
40 //SyncStorage you can easily set key-values under different namespace compared to
41 //SdlInstance where namespace can be defined only at SdlInstance instance creation
43 type SyncStorage struct {
50 //NewSyncStorage creates a new sdl instance.
51 //The database used as a backend is given as a parameter
52 func NewSyncStorage() *SyncStorage {
53 return newSyncStorage(NewDatabase())
56 func newSyncStorage(db *Database) *SyncStorage {
58 eventSeparator: "___",
59 iDatabase: db.instance,
63 //SubscribeChannel lets you to subscribe for a events on a given channels.
64 //SDL notifications are events that are published on a specific channels.
65 //Both the channel and events are defined by the entity that is publishing
66 //the events under given namespace.
68 //When subscribing for a channel, a callback function is given as a parameter.
69 //Whenever a notification is received from a channel, this callback is called
70 //with channel and notifications as parameter (several notifications could be
71 //packed to a single callback function call). A call to SubscribeChannel function
72 //returns immediatelly, callbacks will be called asyncronously.
74 //It is possible to subscribe to different channels using different callbacks. In
75 //this case simply use SubscribeChannel function separately for each channel.
77 //When receiving events in callback routine, it is a good practive to return from
78 //callback as quickly as possible. E.g. reading in callback context should be avoided
79 //and using of Go signals is recommended. Also it should be noted that in case of several
80 //events received from different channels, callbacks are called in series one by one.
81 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
82 nsPrefix := getNsPrefix(ns)
83 s.SubscribeChannelDB(cb, nsPrefix, s.eventSeparator, s.setNamespaceToChannels(nsPrefix, channels...)...)
87 //UnsubscribeChannel removes subscription from one or several channels under given
89 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
90 nsPrefix := getNsPrefix(ns)
91 s.UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
95 //Close connection to backend database.
96 func (s *SyncStorage) Close() error {
100 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
101 if len(channelsAndEvents)%2 != 0 {
102 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
104 for i, v := range channelsAndEvents {
106 if strings.Contains(v, s.eventSeparator) {
107 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
114 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
116 for _, v := range channels {
117 retVal = append(retVal, nsPrefix+v)
122 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
123 retVal := make([]interface{}, 0)
125 for _, v := range pairs {
126 reflectType := reflect.TypeOf(v)
127 switch reflectType.Kind() {
129 x := reflect.ValueOf(v).MapRange()
131 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
132 retVal = append(retVal, x.Value().Interface())
136 x := reflect.ValueOf(v)
138 return []interface{}{}, errors.New("Key/value pairs doesn't match")
140 for i2 := 0; i2 < x.Len(); i2++ {
142 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
144 retVal = append(retVal, x.Index(i2).Interface())
148 if reflectType.Elem().Kind() == reflect.Uint8 {
149 retVal = append(retVal, v)
152 return []interface{}{}, errors.New("Key/value pairs doesn't match")
157 x := reflect.ValueOf(v)
159 return []interface{}{}, errors.New("Key/value pairs doesn't match")
161 for i2 := 0; i2 < x.Len(); i2++ {
163 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
165 retVal = append(retVal, x.Index(i2).Interface())
169 if reflectType.Elem().Kind() == reflect.Uint8 {
170 retVal = append(retVal, v)
173 return []interface{}{}, errors.New("Key/value pairs doesn't match")
178 retVal = append(retVal, nsPrefix+v.(string))
181 retVal = append(retVal, v)
186 if len(retVal)%2 != 0 {
187 return []interface{}{}, errors.New("Key/value pairs doesn't match")
192 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
193 channelEventMap := make(map[string]string)
194 for i, v := range channelsAndEvents {
198 _, exists := channelEventMap[v]
200 channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
202 channelEventMap[v] = channelsAndEvents[i+1]
205 retVal := make([]string, 0)
206 for k, v := range channelEventMap {
207 retVal = append(retVal, nsPrefix+k)
208 retVal = append(retVal, v)
213 //SetAndPublish function writes data to shared data layer storage and sends an event to
214 //a channel. Writing is done atomically, i.e. all succeeds or fails.
215 //Data is written under the namespace what is given as a parameter for this function.
216 //Data to be written is given as key-value pairs. Several key-value
217 //pairs can be written with one call.
218 //The key is expected to be string whereas value can be anything, string,
219 //number, slice array or map
221 //If data was set successfully, an event is sent to a channel.
222 //Channels and events are given as pairs is channelsAndEvents parameter.
223 //It is possible to send several events to several channels by giving several
224 //channel-event pairs.
225 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
226 //will send event1 and event3 to channel1 and event2 to channel2.
227 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
228 nsPrefix := getNsPrefix(ns)
229 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
233 if len(channelsAndEvents) == 0 {
234 return s.MSet(keyAndData...)
236 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
239 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
240 return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
243 //Set function writes data to shared data layer storage. Writing is done
244 //atomically, i.e. all succeeds or fails.
245 //Data is written under the namespace what is given as a parameter for this function.
246 //Data to be written is given as key-value pairs. Several key-value
247 //pairs can be written with one call.
248 //The key is expected to be string whereas value can be anything, string,
249 //number, slice array or map
250 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
255 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
259 return s.MSet(keyAndData...)
262 //Get function atomically reads one or more keys from SDL. The returned map has the
263 //requested keys as index and data as value. If the requested key is not found
264 //from SDL, it's value is nil
265 //Read operation is targeted to the namespace what is given as a parameter for this
267 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
268 m := make(map[string]interface{})
273 var keysWithNs []string
274 for _, v := range keys {
275 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
277 val, err := s.MGet(keysWithNs)
281 for i, v := range val {
287 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
288 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
289 //is published to a given channel.
290 //Data is written under the namespace what is given as a parameter for this function.
291 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
292 nsPrefix := getNsPrefix(ns)
293 if len(channelsAndEvents) == 0 {
294 return s.SetIE(nsPrefix+key, oldData, newData)
296 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
299 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
300 return s.SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
303 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
304 //If replace was done successfully, true will be returned.
305 //Data is written under the namespace what is given as a parameter for this function.
306 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
307 return s.SetIE(getNsPrefix(ns)+key, oldData, newData)
310 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
311 //then it's value is not changed. Checking the key existence and potential set operation
312 //is done atomically. If the set operation was done successfully, an event is published to a
314 //Data is written under the namespace what is given as a parameter for this function.
315 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
316 nsPrefix := getNsPrefix(ns)
317 if len(channelsAndEvents) == 0 {
318 return s.SetNX(nsPrefix+key, data, 0)
320 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
323 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
324 return s.SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
327 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
328 //then it's value is not changed. Checking the key existence and potential set operation
329 //is done atomically.
330 //Data is written under the namespace what is given as a parameter for this function.
331 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
332 return s.SetNX(getNsPrefix(ns)+key, data, 0)
335 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
336 //Trying to remove a nonexisting key is not considered as an error.
337 //An event is published into a given channel if remove operation is successfull and
338 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
339 //when trying to remove, no event is published.
340 //Data is removed under the namespace what is given as a parameter for this function.
341 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
346 var keysWithNs []string
347 nsPrefix := getNsPrefix(ns)
348 for _, v := range keys {
349 keysWithNs = append(keysWithNs, nsPrefix+v)
351 if len(channelsAndEvents) == 0 {
352 return s.Del(keysWithNs)
354 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
357 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
358 return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
361 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
362 //Data is removed under the namespace what is given as a parameter for this function.
363 func (s *SyncStorage) Remove(ns string, keys []string) error {
368 var keysWithNs []string
369 for _, v := range keys {
370 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
372 err := s.Del(keysWithNs)
376 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
377 //a given event is published to channel. If existing data matches given data,
378 //key and data are removed from SDL. If remove was done successfully, true is returned.
379 //Data is removed under the namespace what is given as a parameter for this function.
380 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
381 nsPrefix := getNsPrefix(ns)
382 if len(channelsAndEvents) == 0 {
383 return s.DelIE(nsPrefix+key, data)
385 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
388 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
389 return s.DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
392 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
393 //key and data are removed from SDL. If remove was done successfully, true is returned.
394 //Data is removed under the namespace what is given as a parameter for this function.
395 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
396 status, err := s.DelIE(getNsPrefix(ns)+key, data)
403 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
404 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
405 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
406 nsPrefix := getNsPrefix(ns)
407 keys, err := s.Keys(nsPrefix + "*")
412 for _, v := range keys {
413 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
418 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
419 //it is not guaranteed that all keys are removed.
420 func (s *SyncStorage) RemoveAll(ns string) error {
421 keys, err := s.Keys(getNsPrefix(ns) + "*")
425 if (keys != nil) && (len(keys) != 0) {
431 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
432 //will publish an event to given channel. This operation is not atomic, thus it is
433 //not guaranteed that all keys are removed.
434 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
435 nsPrefix := getNsPrefix(ns)
436 keys, err := s.Keys(nsPrefix + "*")
440 if (keys != nil) && (len(keys) != 0) {
441 if len(channelsAndEvents) == 0 {
444 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
447 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
448 err = s.DelMPub(channelsAndEventsPrepared, keys)
453 //AddMember adds a new members to a group under given namespace.
455 //SDL groups are unordered collections of members where each member is
456 //unique. It is possible to add the same member several times without the
457 //need to check if it already exists.
458 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
459 return s.SAdd(getNsPrefix(ns)+group, member...)
462 //RemoveMember removes members from a group under given namespace.
463 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
464 return s.SRem(getNsPrefix(ns)+group, member...)
467 //RemoveGroup removes the whole group along with it's members under given namespace.
468 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
469 return s.Del([]string{getNsPrefix(ns) + group})
472 //GetMembers returns all the members from a group under given namespace.
473 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
474 retVal, err := s.SMembers(getNsPrefix(ns) + group)
476 return []string{}, err
481 //IsMember returns true if given member is found from a group under given namespace.
482 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
483 retVal, err := s.SIsMember(getNsPrefix(ns)+group, member)
490 //GroupSize returns the number of members in a group under given namespace.
491 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
492 retVal, err := s.SCard(getNsPrefix(ns) + group)
499 func (s *SyncStorage) randomToken() (string, error) {
501 defer s.mutex.Unlock()
504 s.tmp = make([]byte, 16)
507 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
511 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
514 //LockResource function is used for locking a resource under given namespace.
515 //The resource lock in practice is a key with random value that is set to expire
516 //after a time period. The value written to key is a random value, thus only the
517 //instance created a lock, can release it. Resource locks are per namespace.
518 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
519 value, err := s.randomToken()
524 var retryTimer *time.Timer
525 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
526 ok, err := s.SetNX(getNsPrefix(ns)+resource, value, expiration)
530 return &SyncStorageLock{s: s, key: resource, value: value}, nil
532 if retryTimer == nil {
533 retryTimer = time.NewTimer(opt.getRetryWait())
534 defer retryTimer.Stop()
536 retryTimer.Reset(opt.getRetryWait())
543 return nil, errors.New("Lock not obtained")
546 //ReleaseResource removes the lock from a resource under given namespace. If lock
547 //is already expired or some other instance is keeping the lock (lock taken after
548 //expiration), an error is returned.
549 func (l *SyncStorageLock) ReleaseResource(ns string) error {
550 ok, err := l.s.DelIE(getNsPrefix(ns)+l.key, l.value)
556 return errors.New("Lock not held")
561 //RefreshResource function can be used to set a new expiration time for the
562 //resource lock (if the lock still exists) under given namespace. The old
563 //remaining expiration time is overwritten with the given new expiration time.
564 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
565 err := l.s.PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
569 //CheckResource returns the expiration time left for a resource under given
570 //namespace. If the resource doesn't exist, -2 is returned.
571 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
572 result, err := s.PTTL(getNsPrefix(ns) + resource)
576 if result == time.Duration(-1) {
577 return 0, errors.New("invalid resource given, no expiration time attached")
582 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
583 //expirations are done using the methods defined for this struct.
584 type SyncStorageLock struct {
590 func getNsPrefix(ns string) string {
591 return "{" + ns + "},"
594 type iDatabase interface {
595 SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
596 UnsubscribeChannelDB(channels ...string)
597 MSet(pairs ...interface{}) error
598 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
599 MGet(keys []string) ([]interface{}, error)
601 Del(keys []string) error
602 DelMPub(channelsAndEvents []string, keys []string) error
603 Keys(key string) ([]string, error)
604 SetIE(key string, oldData, newData interface{}) (bool, error)
605 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
606 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
607 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
608 DelIE(key string, data interface{}) (bool, error)
609 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
610 SAdd(key string, data ...interface{}) error
611 SRem(key string, data ...interface{}) error
612 SMembers(key string) ([]string, error)
613 SIsMember(key string, data interface{}) (bool, error)
614 SCard(key string) (int64, error)
615 PTTL(key string) (time.Duration, error)
616 PExpireIE(key string, data interface{}, expiration time.Duration) error