2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
30 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
39 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
40 //pairs. Key-values belong to a namespace and SyncStorage provides APIs
41 //where namespace can be given in every API call. This means that with
42 //SyncStorage you can easily set key-values under different namespace compared to
43 //SdlInstance where namespace can be defined only at SdlInstance instance creation
45 type SyncStorage struct {
51 //NewSyncStorage creates a new sdl instance.
52 //The backend database is created internally via NewDatabase().
53 func NewSyncStorage() *SyncStorage {
54 return newSyncStorage(NewDatabase())
57 func newSyncStorage(db *Database) *SyncStorage {
63 //getDbBackend selects the DB instance that provides DB services for the namespace
64 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
65 instanceCount := uint32(len(s.db.instances))
66 instanceID := getHash(ns) % instanceCount
67 return s.db.instances[instanceID]
70 //getHash Returns hash value calculated from the string
71 func getHash(s string) uint32 {
72 tbl := crc32.MakeTable(crc32.IEEE)
73 return crc32.Checksum([]byte(s), tbl)
76 //SubscribeChannel lets you subscribe to events on the given channels.
77 //SDL notifications are events that are published on specific channels.
78 //Both the channel and events are defined by the entity that is publishing
79 //the events under given namespace.
81 //When subscribing for a channel, a callback function is given as a parameter.
82 //Whenever a notification is received from a channel, this callback is called
83 //with channel and notifications as parameter (several notifications could be
84 //packed to a single callback function call). A call to SubscribeChannel function
85 //returns immediately; callbacks will be called asynchronously.
87 //It is possible to subscribe to different channels using different callbacks. In
88 //this case simply use SubscribeChannel function separately for each channel.
90 //When receiving events in callback routine, it is a good practice to return from
91 //callback as quickly as possible. E.g. reading in callback context should be avoided
92 //and using of Go signals is recommended. Also it should be noted that in case of several
93 //events received from different channels, callbacks are called in series one by one.
94 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
95 nsPrefix := getNsPrefix(ns)
96 return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
99 //UnsubscribeChannel removes subscription from one or several channels under given
101 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
102 nsPrefix := getNsPrefix(ns)
103 return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
106 //Close connection to backend database.
107 func (s *SyncStorage) Close() error {
109 for _, db := range s.db.instances {
110 if err := db.CloseDB(); err != nil {
117 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
118 if len(channelsAndEvents)%2 != 0 {
119 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
121 for i, v := range channelsAndEvents {
123 if strings.Contains(v, sdlgoredis.EventSeparator) {
124 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
131 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
133 for _, v := range channels {
134 retVal = append(retVal, nsPrefix+v)
139 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
140 retVal := make([]interface{}, 0)
142 for _, v := range pairs {
143 reflectType := reflect.TypeOf(v)
144 switch reflectType.Kind() {
146 x := reflect.ValueOf(v).MapRange()
148 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
149 retVal = append(retVal, x.Value().Interface())
153 x := reflect.ValueOf(v)
155 return []interface{}{}, errors.New("Key/value pairs doesn't match")
157 for i2 := 0; i2 < x.Len(); i2++ {
159 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
161 retVal = append(retVal, x.Index(i2).Interface())
165 if reflectType.Elem().Kind() == reflect.Uint8 {
166 retVal = append(retVal, v)
169 return []interface{}{}, errors.New("Key/value pairs doesn't match")
174 x := reflect.ValueOf(v)
176 return []interface{}{}, errors.New("Key/value pairs doesn't match")
178 for i2 := 0; i2 < x.Len(); i2++ {
180 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
182 retVal = append(retVal, x.Index(i2).Interface())
186 if reflectType.Elem().Kind() == reflect.Uint8 {
187 retVal = append(retVal, v)
190 return []interface{}{}, errors.New("Key/value pairs doesn't match")
195 retVal = append(retVal, nsPrefix+v.(string))
198 retVal = append(retVal, v)
203 if len(retVal)%2 != 0 {
204 return []interface{}{}, errors.New("Key/value pairs doesn't match")
209 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
210 channelEventMap := make(map[string]string)
211 for i, v := range channelsAndEvents {
215 _, exists := channelEventMap[v]
217 channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
219 channelEventMap[v] = channelsAndEvents[i+1]
222 retVal := make([]string, 0)
223 for k, v := range channelEventMap {
224 retVal = append(retVal, nsPrefix+k)
225 retVal = append(retVal, v)
230 //SetAndPublish function writes data to shared data layer storage and sends an event to
231 //a channel. Writing is done atomically, i.e. all succeeds or fails.
232 //Data is written under the namespace what is given as a parameter for this function.
233 //Data to be written is given as key-value pairs. Several key-value
234 //pairs can be written with one call.
235 //The key is expected to be string whereas value can be anything, string,
236 //number, slice array or map
238 //If data was set successfully, an event is sent to a channel.
239 //Channels and events are given as pairs in the channelsAndEvents parameter.
240 //It is possible to send several events to several channels by giving several
241 //channel-event pairs.
242 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
243 //will send event1 and event3 to channel1 and event2 to channel2.
244 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
245 nsPrefix := getNsPrefix(ns)
246 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
250 if len(channelsAndEvents) == 0 {
251 return s.getDbBackend(ns).MSet(keyAndData...)
253 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
256 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
257 return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
260 //Set function writes data to shared data layer storage. Writing is done
261 //atomically, i.e. all succeeds or fails.
262 //Data is written under the namespace what is given as a parameter for this function.
263 //Data to be written is given as key-value pairs. Several key-value
264 //pairs can be written with one call.
265 //The key is expected to be string whereas value can be anything, string,
266 //number, slice array or map
267 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
272 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
276 return s.getDbBackend(ns).MSet(keyAndData...)
279 //Get function atomically reads one or more keys from SDL. The returned map has the
280 //requested keys as index and data as value. If the requested key is not found
281 //from SDL, its value is nil
282 //Read operation is targeted to the namespace what is given as a parameter for this
284 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
285 m := make(map[string]interface{})
290 var keysWithNs []string
291 for _, v := range keys {
292 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
294 val, err := s.getDbBackend(ns).MGet(keysWithNs)
298 for i, v := range val {
304 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
305 //If replace was done successfully, true will be returned. Also, if publishing was successful, an event
306 //is published to a given channel.
307 //Data is written under the namespace what is given as a parameter for this function.
308 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
309 nsPrefix := getNsPrefix(ns)
310 if len(channelsAndEvents) == 0 {
311 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
313 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
316 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
317 return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
320 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
321 //If replace was done successfully, true will be returned.
322 //Data is written under the namespace what is given as a parameter for this function.
323 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
324 return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
327 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
328 //then its value is not changed. Checking the key existence and potential set operation
329 //is done atomically. If the set operation was done successfully, an event is published to a
331 //Data is written under the namespace what is given as a parameter for this function.
332 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
333 nsPrefix := getNsPrefix(ns)
334 if len(channelsAndEvents) == 0 {
335 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
337 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
340 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
341 return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
344 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
345 //then its value is not changed. Checking the key existence and potential set operation
346 //is done atomically.
347 //Data is written under the namespace what is given as a parameter for this function.
348 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
349 return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
352 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
353 //Trying to remove a nonexisting key is not considered as an error.
354 //An event is published into a given channel if remove operation is successful and
355 //at least one key is removed (if several keys given). If the given key(s) don't exist
356 //when trying to remove, no event is published.
357 //Data is removed under the namespace what is given as a parameter for this function.
358 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
363 var keysWithNs []string
364 nsPrefix := getNsPrefix(ns)
365 for _, v := range keys {
366 keysWithNs = append(keysWithNs, nsPrefix+v)
368 if len(channelsAndEvents) == 0 {
369 return s.getDbBackend(ns).Del(keysWithNs)
371 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
374 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
375 return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
378 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
379 //Data is removed under the namespace what is given as a parameter for this function.
380 func (s *SyncStorage) Remove(ns string, keys []string) error {
385 var keysWithNs []string
386 for _, v := range keys {
387 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
389 err := s.getDbBackend(ns).Del(keysWithNs)
393 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
394 //a given event is published to channel. If existing data matches given data,
395 //key and data are removed from SDL. If remove was done successfully, true is returned.
396 //Data is removed under the namespace what is given as a parameter for this function.
397 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
398 nsPrefix := getNsPrefix(ns)
399 if len(channelsAndEvents) == 0 {
400 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
402 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
405 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
406 return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
409 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
410 //key and data are removed from SDL. If remove was done successfully, true is returned.
411 //Data is removed under the namespace what is given as a parameter for this function.
412 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
413 status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
420 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
421 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
422 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
423 nsPrefix := getNsPrefix(ns)
424 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
429 for _, v := range keys {
430 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
435 // ListKeys returns all keys in the given namespace matching key search pattern.
437 // Supported search glob-style patterns:
438 // h?llo matches hello, hallo and hxllo
439 // h*llo matches hllo and heeeello
440 // h[ae]llo matches hello and hallo, but not hillo
441 // h[^e]llo matches hallo, hbllo, ... but not hello
442 // h[a-b]llo matches hallo and hbllo
444 // The \ escapes character in key search pattern and those will be treated as a normal
446 // h\[?llo\* matches h[ello* and h[allo*
448 // No prior knowledge about the keys in the given namespace exists,
449 // thus operation is not guaranteed to be atomic or isolated.
450 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
451 nsPrefix := getNsPrefix(ns)
452 nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
457 for _, key := range nsKeys {
458 keys = append(keys, strings.Split(key, nsPrefix)[1])
463 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
464 //it is not guaranteed that all keys are removed.
465 func (s *SyncStorage) RemoveAll(ns string) error {
466 keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
470 if (keys != nil) && (len(keys) != 0) {
471 err = s.getDbBackend(ns).Del(keys)
476 //RemoveAllAndPublish removes all keys under the namespace and if successful, it
477 //will publish an event to given channel. This operation is not atomic, thus it is
478 //not guaranteed that all keys are removed.
479 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
480 nsPrefix := getNsPrefix(ns)
481 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
485 if (keys != nil) && (len(keys) != 0) {
486 if len(channelsAndEvents) == 0 {
487 return s.getDbBackend(ns).Del(keys)
489 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
492 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
493 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
498 //AddMember adds new members to a group under given namespace.
500 //SDL groups are unordered collections of members where each member is
501 //unique. It is possible to add the same member several times without the
502 //need to check if it already exists.
503 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
504 return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
507 //RemoveMember removes members from a group under given namespace.
508 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
509 return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
512 //RemoveGroup removes the whole group along with its members under given namespace.
513 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
514 return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
517 //GetMembers returns all the members from a group under given namespace.
518 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
519 retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
521 return []string{}, err
526 //IsMember returns true if given member is found from a group under given namespace.
527 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
528 retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
535 //GroupSize returns the number of members in a group under given namespace.
536 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
537 retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
544 func (s *SyncStorage) randomToken() (string, error) {
546 defer s.mutex.Unlock()
549 s.tmp = make([]byte, 16)
552 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
556 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
559 //LockResource function is used for locking a resource under given namespace.
560 //The resource lock in practice is a key with random value that is set to expire
561 //after a time period. The value written to key is a random value, thus only the
562 //instance that created the lock can release it. Resource locks are per namespace.
563 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
564 value, err := s.randomToken()
569 var retryTimer *time.Timer
570 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
571 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
575 return &SyncStorageLock{s: s, key: resource, value: value}, nil
577 if retryTimer == nil {
578 retryTimer = time.NewTimer(opt.getRetryWait())
579 defer retryTimer.Stop()
581 retryTimer.Reset(opt.getRetryWait())
588 return nil, errors.New("Lock not obtained")
591 //ReleaseResource removes the lock from a resource under given namespace. If lock
592 //is already expired or some other instance is keeping the lock (lock taken after
593 //expiration), an error is returned.
594 func (l *SyncStorageLock) ReleaseResource(ns string) error {
595 ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
601 return errors.New("Lock not held")
606 //RefreshResource function can be used to set a new expiration time for the
607 //resource lock (if the lock still exists) under given namespace. The old
608 //remaining expiration time is overwritten with the given new expiration time.
609 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
610 err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
614 //CheckResource returns the expiration time left for a resource under given
615 //namespace. If the resource doesn't exist, -2 is returned.
616 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
617 result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
621 if result == time.Duration(-1) {
622 return 0, errors.New("invalid resource given, no expiration time attached")
627 //SyncStorageLock struct identifies the resource lock instance. Releasing and adjusting the
628 //expirations are done using the methods defined for this struct.
629 type SyncStorageLock struct {
635 func getNsPrefix(ns string) string {
636 return "{" + ns + "}" + sdlgoredis.NsSeparator
639 type iDatabase interface {
640 SubscribeChannelDB(cb func(string, ...string), channels ...string) error
641 UnsubscribeChannelDB(channels ...string) error
642 MSet(pairs ...interface{}) error
643 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
644 MGet(keys []string) ([]interface{}, error)
646 Del(keys []string) error
647 DelMPub(channelsAndEvents []string, keys []string) error
648 Keys(key string) ([]string, error)
649 SetIE(key string, oldData, newData interface{}) (bool, error)
650 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
651 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
652 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
653 DelIE(key string, data interface{}) (bool, error)
654 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
655 SAdd(key string, data ...interface{}) error
656 SRem(key string, data ...interface{}) error
657 SMembers(key string) ([]string, error)
658 SIsMember(key string, data interface{}) (bool, error)
659 SCard(key string) (int64, error)
660 PTTL(key string) (time.Duration, error)
661 PExpireIE(key string, data interface{}, expiration time.Duration) error