2 Copyright (c) 2021 AT&T Intellectual Property.
3 Copyright (c) 2018-2021 Nokia.
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
19 * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20 * platform project (RICP).
29 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/cli"
31 "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
40 //SyncStorage provides multi-namespace APIs to read, write and modify key-value
41 //pairs. Key-values belong to a namespace, and SyncStorage provides APIs
42 //where the namespace is given in every API call. This means that with
43 //SyncStorage you can easily set key-values under different namespaces, unlike
44 //SdlInstance, where the namespace is fixed when the SdlInstance is created.
46 type SyncStorage struct {
52 //NewSyncStorage creates a new sdl instance.
53 //The database used as a backend is given as a parameter
54 func NewSyncStorage() *SyncStorage {
55 return newSyncStorage(NewDatabase())
57 //runHealthCheck creates the database through dbCreateCb and asks each of its
//instances for its state via State(). The dereferenced states are collected
//into a slice in iteration order.
58 func runHealthCheck(dbCreateCb DbCreateCb) ([]sdlgoredis.DbState, error) {
60 var states []sdlgoredis.DbState
61 for _, dbInst := range dbCreateCb().Instances {
62 state, err := dbInst.State()
//NOTE(review): state is dereferenced here; confirm State() never returns a
//nil state together with an error.
66 states = append(states, *state)
70 //newSyncStorage creates a new SyncStorage instance.
71 //The database used as a backend is given as the db parameter.
72 func newSyncStorage(db *Database) *SyncStorage {
78 //selectDbInstance Selects DB instance what provides DB services for the namespace
79 func (s *SyncStorage) getDbBackend(ns string) iDatabase {
80 instanceCount := uint32(len(s.db.instances))
81 instanceID := getHash(ns) % instanceCount
82 return s.db.instances[instanceID]
//getHash returns the CRC-32 checksum (IEEE polynomial) of the string.
func getHash(s string) uint32 {
	return crc32.ChecksumIEEE([]byte(s))
}
91 //SubscribeChannel lets you to subscribe for a events on a given channels.
92 //SDL notifications are events that are published on a specific channels.
93 //Both the channel and events are defined by the entity that is publishing
94 //the events under given namespace.
96 //When subscribing for a channel, a callback function is given as a parameter.
97 //Whenever a notification is received from a channel, this callback is called
98 //with channel and notifications as parameter (several notifications could be
99 //packed to a single callback function call). A call to SubscribeChannel function
100 //returns immediatelly, callbacks will be called asyncronously.
102 //It is possible to subscribe to different channels using different callbacks. In
103 //this case simply use SubscribeChannel function separately for each channel.
105 //When receiving events in callback routine, it is a good practive to return from
106 //callback as quickly as possible. E.g. reading in callback context should be avoided
107 //and using of Go signals is recommended. Also it should be noted that in case of several
108 //events received from different channels, callbacks are called in series one by one.
109 func (s *SyncStorage) SubscribeChannel(ns string, cb func(string, ...string), channels ...string) error {
110 nsPrefix := getNsPrefix(ns)
111 return s.getDbBackend(ns).SubscribeChannelDB(cb, s.setNamespaceToChannels(nsPrefix, channels...)...)
114 //UnsubscribeChannel removes subscription from one or several channels under given
116 func (s *SyncStorage) UnsubscribeChannel(ns string, channels ...string) error {
117 nsPrefix := getNsPrefix(ns)
118 return s.getDbBackend(ns).UnsubscribeChannelDB(s.setNamespaceToChannels(nsPrefix, channels...)...)
121 //Close closes the connection to the backend database by calling CloseDB on
//every instance in s.db.instances.
122 func (s *SyncStorage) Close() error {
124 for _, db := range s.db.instances {
125 if err := db.CloseDB(); err != nil {
//checkChannelsAndEvents validates the channelsAndEvents list passed to the
//API call named by cmd. It returns an error prefixed with cmd when the list
//has an odd length (channels and events must be given as pairs) or when an
//entry contains sdlgoredis.EventSeparator.
//NOTE(review): presumably only the event entries (odd indexes) are checked
//for the separator; confirm.
132 func (s *SyncStorage) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
133 if len(channelsAndEvents)%2 != 0 {
134 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
136 for i, v := range channelsAndEvents {
138 if strings.Contains(v, sdlgoredis.EventSeparator) {
139 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, sdlgoredis.EventSeparator)
//setNamespaceToChannels prepends nsPrefix to every channel name and collects
//the prefixed names, in the given order, into retVal.
146 func (s *SyncStorage) setNamespaceToChannels(nsPrefix string, channels ...string) []string {
148 for _, v := range channels {
149 retVal = append(retVal, nsPrefix+v)
//setNamespaceToKeys builds the flat key/value list to be written, with
//nsPrefix prepended to the keys. Every entry of pairs is inspected with
//reflection and dispatched on its Kind. On a key/value mismatch an empty
//slice and the error "Key/value pairs doesn't match" are returned.
//NOTE(review): the case labels are not visible here. The MapRange branch
//presumably handles reflect.Map and the two index-walking branches
//reflect.Slice and reflect.Array; confirm.
//NOTE(review): reflect.TypeOf returns nil for a nil entry, so calling Kind()
//on it would panic; the key type assertions to string are unchecked as well.
154 func (s *SyncStorage) setNamespaceToKeys(nsPrefix string, pairs ...interface{}) ([]interface{}, error) {
155 retVal := make([]interface{}, 0)
157 for _, v := range pairs {
158 reflectType := reflect.TypeOf(v)
159 switch reflectType.Kind() {
//Map-style input: every map key becomes a prefixed key followed by its value.
161 x := reflect.ValueOf(v).MapRange()
163 retVal = append(retVal, nsPrefix+x.Key().Interface().(string))
164 retVal = append(retVal, x.Value().Interface())
//Indexable input walked element by element: prefixed string keys and raw
//values are appended.
168 x := reflect.ValueOf(v)
170 return []interface{}{}, errors.New("Key/value pairs doesn't match")
172 for i2 := 0; i2 < x.Len(); i2++ {
174 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
176 retVal = append(retVal, x.Index(i2).Interface())
//Input with reflect.Uint8 elements (byte data) is appended unchanged as a value.
180 if reflectType.Elem().Kind() == reflect.Uint8 {
181 retVal = append(retVal, v)
184 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//Second indexable branch, handled the same way as the one above.
189 x := reflect.ValueOf(v)
191 return []interface{}{}, errors.New("Key/value pairs doesn't match")
193 for i2 := 0; i2 < x.Len(); i2++ {
195 retVal = append(retVal, nsPrefix+x.Index(i2).Interface().(string))
197 retVal = append(retVal, x.Index(i2).Interface())
201 if reflectType.Elem().Kind() == reflect.Uint8 {
202 retVal = append(retVal, v)
205 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//Remaining input: appended either as a prefixed string key or as a raw value.
210 retVal = append(retVal, nsPrefix+v.(string))
213 retVal = append(retVal, v)
//The final list must consist of complete key/value pairs.
218 if len(retVal)%2 != 0 {
219 return []interface{}{}, errors.New("Key/value pairs doesn't match")
//prepareChannelsAndEvents groups the events per channel. Events given for the
//same channel are joined with sdlgoredis.EventSeparator. The result is a flat
//list where each namespace-prefixed channel is followed by its joined events.
//The result is built by ranging over a map, so the order of the channels in
//it is not deterministic.
224 func (s *SyncStorage) prepareChannelsAndEvents(nsPrefix string, channelsAndEvents []string) []string {
225 channelEventMap := make(map[string]string)
226 for i, v := range channelsAndEvents {
230 _, exists := channelEventMap[v]
//Channel already seen: append the next event after a separator.
232 channelEventMap[v] = channelEventMap[v] + sdlgoredis.EventSeparator + channelsAndEvents[i+1]
//First event for this channel.
234 channelEventMap[v] = channelsAndEvents[i+1]
237 retVal := make([]string, 0)
238 for k, v := range channelEventMap {
239 retVal = append(retVal, nsPrefix+k)
240 retVal = append(retVal, v)
245 //SetAndPublish writes data to shared data layer storage and sends an event to
246 //a channel. Writing is done atomically, i.e. all succeeds or fails.
247 //Data is written under the namespace that is given as a parameter for this function.
248 //Data to be written is given as key-value pairs. Several key-value
249 //pairs can be written with one call.
250 //The key is expected to be a string whereas the value can be anything: string,
251 //number, slice, array or map.
253 //If data was set successfully, an event is sent to a channel.
254 //Channels and events are given as pairs in the channelsAndEvents parameter.
255 //It is possible to send several events to several channels by giving several
256 //channel-event pairs.
257 // E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
258 //will send event1 and event3 to channel1 and event2 to channel2.
259 func (s *SyncStorage) SetAndPublish(ns string, channelsAndEvents []string, pairs ...interface{}) error {
260 nsPrefix := getNsPrefix(ns)
261 keyAndData, err := s.setNamespaceToKeys(nsPrefix, pairs...)
//Without channels and events this is a plain multi-key write.
265 if len(channelsAndEvents) == 0 {
266 return s.getDbBackend(ns).MSet(keyAndData...)
268 if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
//Events are grouped per namespace-prefixed channel before writing and publishing.
271 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
272 return s.getDbBackend(ns).MSetMPub(channelsAndEventsPrepared, keyAndData...)
275 //Set writes data to shared data layer storage. Writing is done
276 //atomically, i.e. all succeeds or fails.
277 //Data is written under the namespace that is given as a parameter for this function.
278 //Data to be written is given as key-value pairs. Several key-value
279 //pairs can be written with one call.
280 //The key is expected to be a string whereas the value can be anything: string,
281 //number, slice, array or map.
282 func (s *SyncStorage) Set(ns string, pairs ...interface{}) error {
//Keys get the namespace prefix before they are written.
287 keyAndData, err := s.setNamespaceToKeys(getNsPrefix(ns), pairs...)
291 return s.getDbBackend(ns).MSet(keyAndData...)
294 //Get atomically reads one or more keys from SDL. The returned map has the
295 //requested keys as index and data as value. If the requested key is not found
296 //in SDL, its value is nil.
297 //The read operation targets the namespace given as a parameter.
299 func (s *SyncStorage) Get(ns string, keys []string) (map[string]interface{}, error) {
300 m := make(map[string]interface{})
//The backend is queried with namespace-prefixed keys.
305 var keysWithNs []string
306 for _, v := range keys {
307 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
309 val, err := s.getDbBackend(ns).MGet(keysWithNs)
//NOTE(review): presumably val[i] is stored in m under keys[i], i.e. without
//the namespace prefix; confirm.
313 for i, v := range val {
319 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
320 //If the replace was done successfully, true will be returned. Also, if publishing was successful, an event
321 //is published to a given channel.
322 //Data is written under the namespace that is given as a parameter for this function.
323 func (s *SyncStorage) SetIfAndPublish(ns string, channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
324 nsPrefix := getNsPrefix(ns)
//Without channels and events this is a plain conditional set.
325 if len(channelsAndEvents) == 0 {
326 return s.getDbBackend(ns).SetIE(nsPrefix+key, oldData, newData)
328 if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
331 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
332 return s.getDbBackend(ns).SetIEPub(channelsAndEventsPrepared, nsPrefix+key, oldData, newData)
335 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
336 //If replace was done successfully, true will be returned.
337 //Data is written under the namespace what is given as a parameter for this function.
338 func (s *SyncStorage) SetIf(ns string, key string, oldData, newData interface{}) (bool, error) {
339 return s.getDbBackend(ns).SetIE(getNsPrefix(ns)+key, oldData, newData)
342 //SetIfNotExistsAndPublish conditionally sets the value of a key. If the key already exists in SDL,
343 //then its value is not changed. Checking the key existence and the potential set operation
344 //are done atomically. If the set operation was done successfully, an event is published.
346 //Data is written under the namespace that is given as a parameter for this function.
347 func (s *SyncStorage) SetIfNotExistsAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
348 nsPrefix := getNsPrefix(ns)
//Without channels and events this is a plain set-if-not-exists with a zero
//expiration argument.
349 if len(channelsAndEvents) == 0 {
350 return s.getDbBackend(ns).SetNX(nsPrefix+key, data, 0)
352 if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
355 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
356 return s.getDbBackend(ns).SetNXPub(channelsAndEventsPrepared, nsPrefix+key, data)
359 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
360 //then it's value is not changed. Checking the key existence and potential set operation
361 //is done atomically.
362 //Data is written under the namespace what is given as a parameter for this function.
363 func (s *SyncStorage) SetIfNotExists(ns string, key string, data interface{}) (bool, error) {
364 return s.getDbBackend(ns).SetNX(getNsPrefix(ns)+key, data, 0)
367 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
368 //Trying to remove a nonexistent key is not considered an error.
369 //An event is published into a given channel if the remove operation is successful and
370 //at least one key is removed (if several keys are given). If the given key(s) don't exist
371 //when trying to remove, no event is published.
372 //Data is removed under the namespace that is given as a parameter for this function.
373 func (s *SyncStorage) RemoveAndPublish(ns string, channelsAndEvents []string, keys []string) error {
//The backend is called with namespace-prefixed keys.
378 var keysWithNs []string
379 nsPrefix := getNsPrefix(ns)
380 for _, v := range keys {
381 keysWithNs = append(keysWithNs, nsPrefix+v)
//Without channels and events this is a plain delete.
383 if len(channelsAndEvents) == 0 {
384 return s.getDbBackend(ns).Del(keysWithNs)
386 if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
389 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
390 return s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keysWithNs)
393 //Remove removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
394 //Data is removed under the namespace that is given as a parameter for this function.
395 func (s *SyncStorage) Remove(ns string, keys []string) error {
//The backend is called with namespace-prefixed keys.
400 var keysWithNs []string
401 for _, v := range keys {
402 keysWithNs = append(keysWithNs, getNsPrefix(ns)+v)
404 err := s.getDbBackend(ns).Del(keysWithNs)
408 //RemoveIfAndPublish removes data from SDL conditionally and, if the remove was done successfully,
409 //a given event is published to a channel. If the existing data matches the given data,
410 //key and data are removed from SDL. If the remove was done successfully, true is returned.
411 //Data is removed under the namespace that is given as a parameter for this function.
412 func (s *SyncStorage) RemoveIfAndPublish(ns string, channelsAndEvents []string, key string, data interface{}) (bool, error) {
413 nsPrefix := getNsPrefix(ns)
//Without channels and events this is a plain conditional delete.
414 if len(channelsAndEvents) == 0 {
415 return s.getDbBackend(ns).DelIE(nsPrefix+key, data)
417 if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
420 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
421 return s.getDbBackend(ns).DelIEPub(channelsAndEventsPrepared, nsPrefix+key, data)
424 //RemoveIf removes data from SDL conditionally. If the existing data matches the given data,
425 //key and data are removed from SDL. If the remove was done successfully, true is returned.
426 //Data is removed under the namespace that is given as a parameter for this function.
427 func (s *SyncStorage) RemoveIf(ns string, key string, data interface{}) (bool, error) {
//The conditional delete is delegated to DelIE with the namespace-prefixed key.
428 status, err := s.getDbBackend(ns).DelIE(getNsPrefix(ns)+key, data)
435 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
436 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
437 func (s *SyncStorage) GetAll(ns string) ([]string, error) {
438 nsPrefix := getNsPrefix(ns)
439 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
444 for _, v := range keys {
445 retVal = append(retVal, strings.Split(v, nsPrefix)[1])
450 // ListKeys returns all keys in the given namespace matching key search pattern.
452 // Supported search glob-style patterns:
453 // h?llo matches hello, hallo and hxllo
454 // h*llo matches hllo and heeeello
455 // h[ae]llo matches hello and hallo, but not hillo
456 // h[^e]llo matches hallo, hbllo, ... but not hello
457 // h[a-b]llo matches hallo and hbllo
459 // The \ escapes character in key search pattern and those will be treated as a normal
461 // h\[?llo\* matches h[ello* and h[allo*
463 // No prior knowledge about the keys in the given namespace exists,
464 // thus operation is not guaranteed to be atomic or isolated.
465 func (s *SyncStorage) ListKeys(ns string, pattern string) ([]string, error) {
466 nsPrefix := getNsPrefix(ns)
467 nsKeys, err := s.getDbBackend(ns).Keys(nsPrefix + pattern)
472 for _, key := range nsKeys {
473 keys = append(keys, strings.Split(key, nsPrefix)[1])
478 //RemoveAll removes all keys under the namespace. The remove operation is not atomic, thus
479 //it is not guaranteed that all keys are removed.
480 func (s *SyncStorage) RemoveAll(ns string) error {
//First list every key under the namespace prefix.
481 keys, err := s.getDbBackend(ns).Keys(getNsPrefix(ns) + "*")
//Del is only called when at least one key was found.
485 if (keys != nil) && (len(keys) != 0) {
486 err = s.getDbBackend(ns).Del(keys)
491 //RemoveAllAndPublish removes all keys under the namespace and, if successful, it
492 //will publish an event to the given channel. This operation is not atomic, thus it is
493 //not guaranteed that all keys are removed.
494 func (s *SyncStorage) RemoveAllAndPublish(ns string, channelsAndEvents []string) error {
495 nsPrefix := getNsPrefix(ns)
//First list every key under the namespace prefix.
496 keys, err := s.getDbBackend(ns).Keys(nsPrefix + "*")
//The backend is only called when at least one key was found.
500 if (keys != nil) && (len(keys) != 0) {
//Without channels and events this is a plain delete.
501 if len(channelsAndEvents) == 0 {
502 return s.getDbBackend(ns).Del(keys)
504 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
507 channelsAndEventsPrepared := s.prepareChannelsAndEvents(nsPrefix, channelsAndEvents)
508 err = s.getDbBackend(ns).DelMPub(channelsAndEventsPrepared, keys)
513 //AddMember adds a new members to a group under given namespace.
515 //SDL groups are unordered collections of members where each member is
516 //unique. It is possible to add the same member several times without the
517 //need to check if it already exists.
518 func (s *SyncStorage) AddMember(ns string, group string, member ...interface{}) error {
519 return s.getDbBackend(ns).SAdd(getNsPrefix(ns)+group, member...)
522 //RemoveMember removes members from a group under given namespace.
523 func (s *SyncStorage) RemoveMember(ns string, group string, member ...interface{}) error {
524 return s.getDbBackend(ns).SRem(getNsPrefix(ns)+group, member...)
527 //RemoveGroup removes the whole group along with it's members under given namespace.
528 func (s *SyncStorage) RemoveGroup(ns string, group string) error {
529 return s.getDbBackend(ns).Del([]string{getNsPrefix(ns) + group})
532 //GetMembers returns all the members of a group under the given namespace.
533 func (s *SyncStorage) GetMembers(ns string, group string) ([]string, error) {
//The group is looked up by its namespace-prefixed key.
534 retVal, err := s.getDbBackend(ns).SMembers(getNsPrefix(ns) + group)
//An empty, non-nil slice is returned together with the error.
536 return []string{}, err
541 //IsMember returns true if the given member is found in a group under the given namespace.
542 func (s *SyncStorage) IsMember(ns string, group string, member interface{}) (bool, error) {
//The group is looked up by its namespace-prefixed key.
543 retVal, err := s.getDbBackend(ns).SIsMember(getNsPrefix(ns)+group, member)
550 //GroupSize returns the number of members in a group under the given namespace.
551 func (s *SyncStorage) GroupSize(ns string, group string) (int64, error) {
//The group is looked up by its namespace-prefixed key.
552 retVal, err := s.getDbBackend(ns).SCard(getNsPrefix(ns) + group)
//randomToken fills the 16-byte buffer s.tmp with random bytes read from
//rand.Reader and returns them encoded with base64.RawURLEncoding.
//NOTE(review): s.mutex is released on return via defer; presumably it is
//locked just before, to guard the shared s.tmp buffer. Confirm.
559 func (s *SyncStorage) randomToken() (string, error) {
561 defer s.mutex.Unlock()
564 s.tmp = make([]byte, 16)
567 if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
571 return base64.RawURLEncoding.EncodeToString(s.tmp), nil
574 //LockResource is used for locking a resource under the given namespace.
575 //The resource lock in practice is a key with a random value that is set to expire
576 //after a time period. The value written to the key is a random value, thus only the
577 //instance that created a lock can release it. Resource locks are per namespace.
//If the lock is not obtained, the error "Lock not obtained" is returned.
578 func (s *SyncStorage) LockResource(ns string, resource string, expiration time.Duration, opt *Options) (*SyncStorageLock, error) {
579 value, err := s.randomToken()
584 var retryTimer *time.Timer
//One initial attempt plus opt.getRetryCount() retries.
585 for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
//The lock is a namespace-prefixed key set only if it does not exist yet.
586 ok, err := s.getDbBackend(ns).SetNX(getNsPrefix(ns)+resource, value, expiration)
//The lock keeps the unprefixed resource name and the random value.
590 return &SyncStorageLock{s: s, key: resource, value: value}, nil
//The retry timer is created on first use and reset for later retries.
592 if retryTimer == nil {
593 retryTimer = time.NewTimer(opt.getRetryWait())
594 defer retryTimer.Stop()
596 retryTimer.Reset(opt.getRetryWait())
603 return nil, errors.New("Lock not obtained")
606 //ReleaseResource removes the lock from a resource under the given namespace. If the lock
607 //has already expired or some other instance is keeping the lock (lock taken after
608 //expiration), an error is returned.
609 func (l *SyncStorageLock) ReleaseResource(ns string) error {
//The key is deleted only if it still holds this lock's value.
610 ok, err := l.s.getDbBackend(ns).DelIE(getNsPrefix(ns)+l.key, l.value)
616 return errors.New("Lock not held")
621 //RefreshResource can be used to set a new expiration time for the
622 //resource lock (if the lock still exists) under the given namespace. The old
623 //remaining expiration time is overwritten with the given new expiration time.
624 func (l *SyncStorageLock) RefreshResource(ns string, expiration time.Duration) error {
//PExpireIE is given this lock's value together with the new expiration.
625 err := l.s.getDbBackend(ns).PExpireIE(getNsPrefix(ns)+l.key, l.value, expiration)
629 //CheckResource returns the expiration time left for a resource under the given
630 //namespace. If the resource doesn't exist, -2 is returned.
//NOTE(review): the -2 value is presumably passed through from PTTL; confirm.
631 func (s *SyncStorage) CheckResource(ns string, resource string) (time.Duration, error) {
632 result, err := s.getDbBackend(ns).PTTL(getNsPrefix(ns) + resource)
//A PTTL result of -1 is reported as an error: the key has no expiration
//time attached, so it is not a valid resource lock.
636 if result == time.Duration(-1) {
637 return 0, errors.New("invalid resource given, no expiration time attached")
642 //SyncStorageLock identifies a resource lock instance. Releasing the lock and adjusting
643 //its expiration are done using the methods defined for this struct.
644 type SyncStorageLock struct {
650 func getNsPrefix(ns string) string {
651 return "{" + ns + "}" + sdlgoredis.NsSeparator
//iDatabase lists the backend database operations that SyncStorage relies on.
//getDbBackend returns the implementation that serves a given namespace.
654 type iDatabase interface {
//Channel subscription handling.
655 SubscribeChannelDB(cb func(string, ...string), channels ...string) error
656 UnsubscribeChannelDB(channels ...string) error
//Multi-key writes and reads, optionally publishing events.
657 MSet(pairs ...interface{}) error
658 MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
659 MGet(keys []string) ([]interface{}, error)
//Key removal and key listing.
661 Del(keys []string) error
662 DelMPub(channelsAndEvents []string, keys []string) error
663 Keys(key string) ([]string, error)
//Conditional writes and removals, optionally publishing events.
664 SetIE(key string, oldData, newData interface{}) (bool, error)
665 SetIEPub(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error)
666 SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
667 SetNXPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
668 DelIE(key string, data interface{}) (bool, error)
669 DelIEPub(channelsAndEvents []string, key string, data interface{}) (bool, error)
//Group operations.
670 SAdd(key string, data ...interface{}) error
671 SRem(key string, data ...interface{}) error
672 SMembers(key string) ([]string, error)
673 SIsMember(key string, data interface{}) (bool, error)
674 SCard(key string) (int64, error)
//Expiration handling used by the resource locks.
675 PTTL(key string) (time.Duration, error)
676 PExpireIE(key string, data interface{}, expiration time.Duration) error