Add support for CI jobs
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 package sdlgo
19
20 import (
21         "crypto/rand"
22         "encoding/base64"
23         "errors"
24         "fmt"
25         "io"
26         "reflect"
27         "strings"
28         "sync"
29         "time"
30
31         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
32 )
33
34 //SdlInstance provides an API to read, write and modify
35 //key-value pairs in a given namespace.
36 type SdlInstance struct {
37         nameSpace      string
38         nsPrefix       string
39         eventSeparator string
40         mutex          sync.Mutex
41         tmp            []byte
42         iDatabase
43 }
44
45 //Database struct is a holder for the internal database instance. Applications
46 //can use this exported data type to locally store a reference to database
47 //instance returned from NewDabase() function.
48 type Database struct {
49         instance iDatabase
50 }
51
52 //NewDatabase creates a connection to database that will be used
53 //as a backend for the key-value storage. The returned value
54 //can be reused between multiple SDL instances in which case each instance
55 //is using the same connection.
56 func NewDatabase() *Database {
57         return &Database{
58                 instance: sdlgoredis.Create(),
59         }
60 }
61
62 //NewSdlInstance creates a new sdl instance using the given namespace.
63 //The database used as a backend is given as a parameter
64 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
65         return &SdlInstance{
66                 nameSpace:      NameSpace,
67                 nsPrefix:       "{" + NameSpace + "},",
68                 eventSeparator: "___",
69                 iDatabase:      db.instance,
70         }
71 }
72
73 //SubscribeChannel lets you to subscribe for a events on a given channels.
74 //SDL notifications are events that are published on a specific channels.
75 //Both the channel and events are defined by the entity that is publishing
76 //the events.
77 //
78 //When subscribing for a channel, a callback function is given as a parameter.
79 //Whenever a notification is received from a channel, this callback is called
80 //with channel and notifications as parameter (several notifications could be
81 //packed to a single callback function call). A call to SubscribeChannel function
82 //returns immediatelly, callbacks will be called asyncronously.
83 //
84 //It is possible to subscribe to different channels using different callbacks. In
85 //this case simply use SubscribeChannel function separately for each channel.
86 //
87 //When receiving events in callback routine, it is a good practive to return from
88 //callback as quickly as possible. E.g. reading in callback context should be avoided
89 //and using of Go signals is recommended. Also it should be noted that in case of several
90 //events received from different channels, callbacks are called in series one by one.
91 //
92 //This function is NOT SAFE FOR CONCURRENT USE by multiple goroutines.
93 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
94         s.SubscribeChannelDB(cb, s.nsPrefix, s.eventSeparator, s.setNamespaceToChannels(channels...)...)
95         return nil
96 }
97
98 //UnsubscribeChannel removes subscription from one or several channels.
99 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
100         s.UnsubscribeChannelDB(s.setNamespaceToChannels(channels...)...)
101         return nil
102 }
103
104 //Close connection to backend database.
105 func (s *SdlInstance) Close() error {
106         return s.CloseDB()
107 }
108
109 func (s *SdlInstance) checkChannelsAndEvents(cmd string, channelsAndEvents []string) error {
110         if len(channelsAndEvents)%2 != 0 {
111                 return fmt.Errorf("%s: Channels and events must be given as pairs", cmd)
112         }
113         for i, v := range channelsAndEvents {
114                 if i%2 != 0 {
115                         if strings.Contains(v, s.eventSeparator) {
116                                 return fmt.Errorf("%s: event %s contains illegal substring (\"%s\")", cmd, v, s.eventSeparator)
117                         }
118                 }
119         }
120         return nil
121 }
122 func (s *SdlInstance) setNamespaceToChannels(channels ...string) []string {
123         var retVal []string
124         for _, v := range channels {
125                 retVal = append(retVal, s.nsPrefix+v)
126         }
127         return retVal
128 }
129
130 func (s *SdlInstance) setNamespaceToKeys(pairs ...interface{}) ([]interface{}, error) {
131         retVal := make([]interface{}, 0)
132         shouldBeKey := true
133         for _, v := range pairs {
134                 reflectType := reflect.TypeOf(v)
135                 switch reflectType.Kind() {
136                 case reflect.Map:
137                         x := reflect.ValueOf(v).MapRange()
138                         for x.Next() {
139                                 retVal = append(retVal, s.nsPrefix+x.Key().Interface().(string))
140                                 retVal = append(retVal, x.Value().Interface())
141                         }
142                 case reflect.Slice:
143                         if shouldBeKey {
144                                 x := reflect.ValueOf(v)
145                                 if x.Len()%2 != 0 {
146                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
147                                 }
148                                 for i2 := 0; i2 < x.Len(); i2++ {
149                                         if i2%2 == 0 {
150                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
151                                         } else {
152                                                 retVal = append(retVal, x.Index(i2).Interface())
153                                         }
154                                 }
155                         } else {
156                                 if reflectType.Elem().Kind() == reflect.Uint8 {
157                                         retVal = append(retVal, v)
158                                         shouldBeKey = true
159                                 } else {
160                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
161                                 }
162                         }
163                 case reflect.Array:
164                         if shouldBeKey {
165                                 x := reflect.ValueOf(v)
166                                 if x.Len()%2 != 0 {
167                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
168                                 }
169                                 for i2 := 0; i2 < x.Len(); i2++ {
170                                         if i2%2 == 0 {
171                                                 retVal = append(retVal, s.nsPrefix+x.Index(i2).Interface().(string))
172                                         } else {
173                                                 retVal = append(retVal, x.Index(i2).Interface())
174                                         }
175                                 }
176                         } else {
177                                 if reflectType.Elem().Kind() == reflect.Uint8 {
178                                         retVal = append(retVal, v)
179                                         shouldBeKey = true
180                                 } else {
181                                         return []interface{}{}, errors.New("Key/value pairs doesn't match")
182                                 }
183                         }
184                 default:
185                         if shouldBeKey {
186                                 retVal = append(retVal, s.nsPrefix+v.(string))
187                                 shouldBeKey = false
188                         } else {
189                                 retVal = append(retVal, v)
190                                 shouldBeKey = true
191                         }
192                 }
193         }
194         if len(retVal)%2 != 0 {
195                 return []interface{}{}, errors.New("Key/value pairs doesn't match")
196         }
197         return retVal, nil
198 }
199
200 func (s *SdlInstance) prepareChannelsAndEvents(channelsAndEvents []string) []string {
201         channelEventMap := make(map[string]string)
202         for i, v := range channelsAndEvents {
203                 if i%2 != 0 {
204                         continue
205                 }
206                 _, exists := channelEventMap[v]
207                 if exists {
208                         channelEventMap[v] = channelEventMap[v] + s.eventSeparator + channelsAndEvents[i+1]
209                 } else {
210                         channelEventMap[v] = channelsAndEvents[i+1]
211                 }
212         }
213         retVal := make([]string, 0)
214         for k, v := range channelEventMap {
215                 retVal = append(retVal, s.nsPrefix+k)
216                 retVal = append(retVal, v)
217         }
218         return retVal
219 }
220
221 //SetAndPublish function writes data to shared data layer storage and sends an event to
222 //a channel. Writing is done atomically, i.e. all succeeds or fails.
223 //Data to be written is given as key-value pairs. Several key-value
224 //pairs can be written with one call.
225 //The key is expected to be string whereas value can be anything, string,
226 //number, slice array or map
227 //
228 //If data was set successfully, an event is sent to a channel.
229 //Channels and events are given as pairs is channelsAndEvents parameter.
230 //It is possible to send several events to several channels by giving several
231 //channel-event pairs.
232 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
233 //will send event1 and event3 to channel1 and event2 to channel2.
234 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
235         keyAndData, err := s.setNamespaceToKeys(pairs...)
236         if err != nil {
237                 return err
238         }
239         if len(channelsAndEvents) == 0 {
240                 return s.MSet(keyAndData...)
241         }
242         if err := s.checkChannelsAndEvents("SetAndPublish", channelsAndEvents); err != nil {
243                 return err
244         }
245         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
246         return s.MSetMPub(channelsAndEventsPrepared, keyAndData...)
247 }
248
249 //Set function writes data to shared data layer storage. Writing is done
250 //atomically, i.e. all succeeds or fails.
251 //Data to be written is given as key-value pairs. Several key-value
252 //pairs can be written with one call.
253 //The key is expected to be string whereas value can be anything, string,
254 //number, slice array or map
255 func (s *SdlInstance) Set(pairs ...interface{}) error {
256         if len(pairs) == 0 {
257                 return nil
258         }
259
260         keyAndData, err := s.setNamespaceToKeys(pairs...)
261         if err != nil {
262                 return err
263         }
264         return s.MSet(keyAndData...)
265 }
266
267 //Get function atomically reads one or more keys from SDL. The returned map has the
268 //requested keys as index and data as value. If the requested key is not found
269 //from SDL, it's value is nil
270 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
271         m := make(map[string]interface{})
272         if len(keys) == 0 {
273                 return m, nil
274         }
275
276         var keysWithNs []string
277         for _, v := range keys {
278                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
279         }
280         val, err := s.MGet(keysWithNs)
281         if err != nil {
282                 return m, err
283         }
284         for i, v := range val {
285                 m[keys[i]] = v
286         }
287         return m, err
288 }
289
290 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
291 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
292 //is published to a given channel.
293 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
294         if len(channelsAndEvents) == 0 {
295                 return s.SetIE(s.nsPrefix+key, oldData, newData)
296         }
297         if err := s.checkChannelsAndEvents("SetIfAndPublish", channelsAndEvents); err != nil {
298                 return false, err
299         }
300         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
301         return s.SetIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, oldData, newData)
302 }
303
304 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
305 //If replace was done successfully, true will be returned.
306 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
307         return s.SetIE(s.nsPrefix+key, oldData, newData)
308 }
309
310 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
311 //then it's value is not changed. Checking the key existence and potential set operation
312 //is done atomically. If the set operation was done successfully, an event is published to a
313 //given channel.
314 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
315         if len(channelsAndEvents) == 0 {
316                 return s.SetNX(s.nsPrefix+key, data, 0)
317         }
318         if err := s.checkChannelsAndEvents("SetIfNotExistsAndPublish", channelsAndEvents); err != nil {
319                 return false, err
320         }
321         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
322         return s.SetNXPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
323 }
324
325 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
326 //then it's value is not changed. Checking the key existence and potential set operation
327 //is done atomically.
328 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
329         return s.SetNX(s.nsPrefix+key, data, 0)
330 }
331
332 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
333 //Trying to remove a nonexisting key is not considered as an error.
334 //An event is published into a given channel if remove operation is successfull and
335 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
336 //when trying to remove, no event is published.
337 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
338         if len(keys) == 0 {
339                 return nil
340         }
341
342         var keysWithNs []string
343         for _, v := range keys {
344                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
345         }
346         if len(channelsAndEvents) == 0 {
347                 return s.Del(keysWithNs)
348         }
349         if err := s.checkChannelsAndEvents("RemoveAndPublish", channelsAndEvents); err != nil {
350                 return err
351         }
352         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
353         return s.DelMPub(channelsAndEventsPrepared, keysWithNs)
354 }
355
356 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
357 func (s *SdlInstance) Remove(keys []string) error {
358         if len(keys) == 0 {
359                 return nil
360         }
361
362         var keysWithNs []string
363         for _, v := range keys {
364                 keysWithNs = append(keysWithNs, s.nsPrefix+v)
365         }
366         err := s.Del(keysWithNs)
367         return err
368 }
369
370 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
371 //a given event is published to channel. If existing data matches given data,
372 //key and data are removed from SDL. If remove was done successfully, true is returned.
373 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
374         if len(channelsAndEvents) == 0 {
375                 return s.DelIE(s.nsPrefix+key, data)
376         }
377         if err := s.checkChannelsAndEvents("RemoveIfAndPublish", channelsAndEvents); err != nil {
378                 return false, err
379         }
380         channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
381         return s.DelIEPub(channelsAndEventsPrepared[0], channelsAndEventsPrepared[1], s.nsPrefix+key, data)
382 }
383
384 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
385 //key and data are removed from SDL. If remove was done successfully, true is returned.
386 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
387         status, err := s.DelIE(s.nsPrefix+key, data)
388         if err != nil {
389                 return false, err
390         }
391         return status, nil
392 }
393
394 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
395 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
396 func (s *SdlInstance) GetAll() ([]string, error) {
397         keys, err := s.Keys(s.nsPrefix + "*")
398         var retVal []string
399         if err != nil {
400                 return retVal, err
401         }
402         for _, v := range keys {
403                 retVal = append(retVal, strings.Split(v, s.nsPrefix)[1])
404         }
405         return retVal, err
406 }
407
408 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
409 //it is not guaranteed that all keys are removed.
410 func (s *SdlInstance) RemoveAll() error {
411         keys, err := s.Keys(s.nsPrefix + "*")
412         if err != nil {
413                 return err
414         }
415         if (keys != nil) && (len(keys) != 0) {
416                 err = s.Del(keys)
417         }
418         return err
419 }
420
421 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
422 //will publish an event to given channel. This operation is not atomic, thus it is
423 //not guaranteed that all keys are removed.
424 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
425         keys, err := s.Keys(s.nsPrefix + "*")
426         if err != nil {
427                 return err
428         }
429         if (keys != nil) && (len(keys) != 0) {
430                 if len(channelsAndEvents) == 0 {
431                         return s.Del(keys)
432                 }
433                 if err := s.checkChannelsAndEvents("RemoveAllAndPublish", channelsAndEvents); err != nil {
434                         return err
435                 }
436                 channelsAndEventsPrepared := s.prepareChannelsAndEvents(channelsAndEvents)
437                 err = s.DelMPub(channelsAndEventsPrepared, keys)
438         }
439         return err
440 }
441
442 //AddMember adds a new members to a group.
443 //
444 //SDL groups are unordered collections of members where each member is
445 //unique. It is possible to add the same member several times without the
446 //need to check if it already exists.
447 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
448         return s.SAdd(s.nsPrefix+group, member...)
449 }
450
451 //RemoveMember removes members from a group.
452 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
453         return s.SRem(s.nsPrefix+group, member...)
454 }
455
456 //RemoveGroup removes the whole group along with it's members.
457 func (s *SdlInstance) RemoveGroup(group string) error {
458         return s.Del([]string{s.nsPrefix + group})
459 }
460
461 //GetMembers returns all the members from a group.
462 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
463         retVal, err := s.SMembers(s.nsPrefix + group)
464         if err != nil {
465                 return []string{}, err
466         }
467         return retVal, err
468 }
469
470 //IsMember returns true if given member is found from a group.
471 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
472         retVal, err := s.SIsMember(s.nsPrefix+group, member)
473         if err != nil {
474                 return false, err
475         }
476         return retVal, err
477 }
478
479 //GroupSize returns the number of members in a group.
480 func (s *SdlInstance) GroupSize(group string) (int64, error) {
481         retVal, err := s.SCard(s.nsPrefix + group)
482         if err != nil {
483                 return 0, err
484         }
485         return retVal, err
486 }
487
488 func (s *SdlInstance) randomToken() (string, error) {
489         s.mutex.Lock()
490         defer s.mutex.Unlock()
491
492         if len(s.tmp) == 0 {
493                 s.tmp = make([]byte, 16)
494         }
495
496         if _, err := io.ReadFull(rand.Reader, s.tmp); err != nil {
497                 return "", err
498         }
499
500         return base64.RawURLEncoding.EncodeToString(s.tmp), nil
501 }
502
503 //LockResource function is used for locking a resource. The resource lock in
504 //practice is a key with random value that is set to expire after a time
505 //period. The value written to key is a random value, thus only the instance
506 //created a lock, can release it. Resource locks are per namespace.
507 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
508         value, err := s.randomToken()
509         if err != nil {
510                 return nil, err
511         }
512
513         var retryTimer *time.Timer
514         for i, attempts := 0, opt.getRetryCount()+1; i < attempts; i++ {
515                 ok, err := s.SetNX(s.nsPrefix+resource, value, expiration)
516                 if err != nil {
517                         return nil, err
518                 } else if ok {
519                         return &Lock{s: s, key: resource, value: value}, nil
520                 }
521                 if retryTimer == nil {
522                         retryTimer = time.NewTimer(opt.getRetryWait())
523                         defer retryTimer.Stop()
524                 } else {
525                         retryTimer.Reset(opt.getRetryWait())
526                 }
527
528                 select {
529                 case <-retryTimer.C:
530                 }
531         }
532         return nil, errors.New("Lock not obtained")
533 }
534
535 //ReleaseResource removes the lock from a resource. If lock is already
536 //expired or some other instance is keeping the lock (lock taken after expiration),
537 //an error is returned.
538 func (l *Lock) ReleaseResource() error {
539         ok, err := l.s.DelIE(l.s.nsPrefix+l.key, l.value)
540
541         if err != nil {
542                 return err
543         }
544         if !ok {
545                 return errors.New("Lock not held")
546         }
547         return nil
548 }
549
550 //RefreshResource function can be used to set a new expiration time for the
551 //resource lock (if the lock still exists). The old remaining expiration
552 //time is overwritten with the given new expiration time.
553 func (l *Lock) RefreshResource(expiration time.Duration) error {
554         err := l.s.PExpireIE(l.s.nsPrefix+l.key, l.value, expiration)
555         return err
556 }
557
558 //CheckResource returns the expiration time left for a resource.
559 //If the resource doesn't exist, -2 is returned.
560 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
561         result, err := s.PTTL(s.nsPrefix + resource)
562         if err != nil {
563                 return 0, err
564         }
565         if result == time.Duration(-1) {
566                 return 0, errors.New("invalid resource given, no expiration time attached")
567         }
568         return result, nil
569 }
570
571 //Options struct defines the behaviour for getting the resource lock.
572 type Options struct {
573         //The number of time the lock will be tried.
574         //Default: 0 = no retry
575         RetryCount int
576
577         //Wait between the retries.
578         //Default: 100ms
579         RetryWait time.Duration
580 }
581
582 func (o *Options) getRetryCount() int {
583         if o != nil && o.RetryCount > 0 {
584                 return o.RetryCount
585         }
586         return 0
587 }
588
589 func (o *Options) getRetryWait() time.Duration {
590         if o != nil && o.RetryWait > 0 {
591                 return o.RetryWait
592         }
593         return 100 * time.Millisecond
594 }
595
596 //Lock struct identifies the resource lock instance. Releasing and adjusting the
597 //expirations are done using the methods defined for this struct.
598 type Lock struct {
599         s     *SdlInstance
600         key   string
601         value string
602 }
603
604 type iDatabase interface {
605         SubscribeChannelDB(cb func(string, ...string), channelPrefix, eventSeparator string, channels ...string)
606         UnsubscribeChannelDB(channels ...string)
607         MSet(pairs ...interface{}) error
608         MSetMPub(channelsAndEvents []string, pairs ...interface{}) error
609         MGet(keys []string) ([]interface{}, error)
610         CloseDB() error
611         Del(keys []string) error
612         DelMPub(channelsAndEvents []string, keys []string) error
613         Keys(key string) ([]string, error)
614         SetIE(key string, oldData, newData interface{}) (bool, error)
615         SetIEPub(channel, message, key string, oldData, newData interface{}) (bool, error)
616         SetNX(key string, data interface{}, expiration time.Duration) (bool, error)
617         SetNXPub(channel, message, key string, data interface{}) (bool, error)
618         DelIE(key string, data interface{}) (bool, error)
619         DelIEPub(channel, message, key string, data interface{}) (bool, error)
620         SAdd(key string, data ...interface{}) error
621         SRem(key string, data ...interface{}) error
622         SMembers(key string) ([]string, error)
623         SIsMember(key string, data interface{}) (bool, error)
624         SCard(key string) (int64, error)
625         PTTL(key string) (time.Duration, error)
626         PExpireIE(key string, data interface{}, expiration time.Duration) error
627 }