Implement sentinel based DB capacity scaling
[ric-plt/sdlgo.git] / sdl.go
1 /*
2    Copyright (c) 2019 AT&T Intellectual Property.
3    Copyright (c) 2018-2019 Nokia.
4
5    Licensed under the Apache License, Version 2.0 (the "License");
6    you may not use this file except in compliance with the License.
7    You may obtain a copy of the License at
8
9        http://www.apache.org/licenses/LICENSE-2.0
10
11    Unless required by applicable law or agreed to in writing, software
12    distributed under the License is distributed on an "AS IS" BASIS,
13    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14    See the License for the specific language governing permissions and
15    limitations under the License.
16 */
17
18 /*
19  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
20  * platform project (RICP).
21  */
22
23 package sdlgo
24
25 import (
26         "gerrit.o-ran-sc.org/r/ric-plt/sdlgo/internal/sdlgoredis"
27         "time"
28 )
29
30 //SdlInstance provides an API to read, write and modify
31 //key-value pairs in a given namespace.
32 type SdlInstance struct {
33         nameSpace string
34         nsPrefix  string
35         storage   *SyncStorage
36 }
37
38 //Database struct is a holder for the internal database instance. Applications
39 //can use this exported data type to locally store a reference to database
40 //instance returned from NewDabase() function.
41 type Database struct {
42         instances []iDatabase
43 }
44
45 //NewDatabase creates a connection to database that will be used
46 //as a backend for the key-value storage. The returned value
47 //can be reused between multiple SDL instances in which case each instance
48 //is using the same connection.
49 func NewDatabase() *Database {
50         db := &Database{}
51         for _, v := range sdlgoredis.Create() {
52                 db.instances = append(db.instances, v)
53         }
54         return db
55 }
56
57 //NewSdlInstance creates a new sdl instance using the given namespace.
58 //The database used as a backend is given as a parameter
59 func NewSdlInstance(NameSpace string, db *Database) *SdlInstance {
60         return &SdlInstance{
61                 nameSpace: NameSpace,
62                 nsPrefix:  "{" + NameSpace + "},",
63                 storage:   newSyncStorage(db),
64         }
65 }
66
67 //SubscribeChannel lets you to subscribe for a events on a given channels.
68 //SDL notifications are events that are published on a specific channels.
69 //Both the channel and events are defined by the entity that is publishing
70 //the events.
71 //
72 //When subscribing for a channel, a callback function is given as a parameter.
73 //Whenever a notification is received from a channel, this callback is called
74 //with channel and notifications as parameter (several notifications could be
75 //packed to a single callback function call). A call to SubscribeChannel function
76 //returns immediatelly, callbacks will be called asyncronously.
77 //
78 //It is possible to subscribe to different channels using different callbacks. In
79 //this case simply use SubscribeChannel function separately for each channel.
80 //
81 //When receiving events in callback routine, it is a good practive to return from
82 //callback as quickly as possible. E.g. reading in callback context should be avoided
83 //and using of Go signals is recommended. Also it should be noted that in case of several
84 //events received from different channels, callbacks are called in series one by one.
85 func (s *SdlInstance) SubscribeChannel(cb func(string, ...string), channels ...string) error {
86         s.storage.SubscribeChannel(s.nameSpace, cb, channels...)
87         return nil
88 }
89
90 //UnsubscribeChannel removes subscription from one or several channels.
91 func (s *SdlInstance) UnsubscribeChannel(channels ...string) error {
92         return s.storage.UnsubscribeChannel(s.nameSpace, channels...)
93 }
94
95 //Close connection to backend database.
96 func (s *SdlInstance) Close() error {
97         return s.storage.Close()
98 }
99
100 //SetAndPublish function writes data to shared data layer storage and sends an event to
101 //a channel. Writing is done atomically, i.e. all succeeds or fails.
102 //Data to be written is given as key-value pairs. Several key-value
103 //pairs can be written with one call.
104 //The key is expected to be string whereas value can be anything, string,
105 //number, slice array or map
106 //
107 //If data was set successfully, an event is sent to a channel.
108 //Channels and events are given as pairs is channelsAndEvents parameter.
109 //It is possible to send several events to several channels by giving several
110 //channel-event pairs.
111 //  E.g. []{"channel1", "event1", "channel2", "event2", "channel1", "event3"}
112 //will send event1 and event3 to channel1 and event2 to channel2.
113 func (s *SdlInstance) SetAndPublish(channelsAndEvents []string, pairs ...interface{}) error {
114         return s.storage.SetAndPublish(s.nameSpace, channelsAndEvents, pairs...)
115 }
116
117 //Set function writes data to shared data layer storage. Writing is done
118 //atomically, i.e. all succeeds or fails.
119 //Data to be written is given as key-value pairs. Several key-value
120 //pairs can be written with one call.
121 //The key is expected to be string whereas value can be anything, string,
122 //number, slice array or map
123 func (s *SdlInstance) Set(pairs ...interface{}) error {
124         return s.storage.Set(s.nameSpace, pairs...)
125 }
126
127 //Get function atomically reads one or more keys from SDL. The returned map has the
128 //requested keys as index and data as value. If the requested key is not found
129 //from SDL, it's value is nil
130 func (s *SdlInstance) Get(keys []string) (map[string]interface{}, error) {
131         return s.storage.Get(s.nameSpace, keys)
132 }
133
134 //SetIfAndPublish atomically replaces existing data with newData in SDL if data matches the oldData.
135 //If replace was done successfully, true will be returned. Also, if publishing was successfull, an event
136 //is published to a given channel.
137 func (s *SdlInstance) SetIfAndPublish(channelsAndEvents []string, key string, oldData, newData interface{}) (bool, error) {
138         return s.storage.SetIfAndPublish(s.nameSpace, channelsAndEvents, key, oldData, newData)
139 }
140
141 //SetIf atomically replaces existing data with newData in SDL if data matches the oldData.
142 //If replace was done successfully, true will be returned.
143 func (s *SdlInstance) SetIf(key string, oldData, newData interface{}) (bool, error) {
144         return s.storage.SetIf(s.nameSpace, key, oldData, newData)
145 }
146
147 //SetIfNotExistsAndPublish conditionally sets the value of a key. If key already exists in SDL,
148 //then it's value is not changed. Checking the key existence and potential set operation
149 //is done atomically. If the set operation was done successfully, an event is published to a
150 //given channel.
151 func (s *SdlInstance) SetIfNotExistsAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
152         return s.storage.SetIfNotExistsAndPublish(s.nameSpace, channelsAndEvents, key, data)
153 }
154
155 //SetIfNotExists conditionally sets the value of a key. If key already exists in SDL,
156 //then it's value is not changed. Checking the key existence and potential set operation
157 //is done atomically.
158 func (s *SdlInstance) SetIfNotExists(key string, data interface{}) (bool, error) {
159         return s.storage.SetIfNotExists(s.nameSpace, key, data)
160 }
161
162 //RemoveAndPublish removes data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
163 //Trying to remove a nonexisting key is not considered as an error.
164 //An event is published into a given channel if remove operation is successfull and
165 //at least one key is removed (if several keys given). If the given key(s) doesn't exist
166 //when trying to remove, no event is published.
167 func (s *SdlInstance) RemoveAndPublish(channelsAndEvents []string, keys []string) error {
168         return s.storage.RemoveAndPublish(s.nameSpace, channelsAndEvents, keys)
169 }
170
171 //Remove data from SDL. Operation is done atomically, i.e. either all succeeds or fails.
172 func (s *SdlInstance) Remove(keys []string) error {
173         return s.storage.Remove(s.nameSpace, keys)
174 }
175
176 //RemoveIfAndPublish removes data from SDL conditionally and if remove was done successfully,
177 //a given event is published to channel. If existing data matches given data,
178 //key and data are removed from SDL. If remove was done successfully, true is returned.
179 func (s *SdlInstance) RemoveIfAndPublish(channelsAndEvents []string, key string, data interface{}) (bool, error) {
180         return s.storage.RemoveIfAndPublish(s.nameSpace, channelsAndEvents, key, data)
181 }
182
183 //RemoveIf removes data from SDL conditionally. If existing data matches given data,
184 //key and data are removed from SDL. If remove was done successfully, true is returned.
185 func (s *SdlInstance) RemoveIf(key string, data interface{}) (bool, error) {
186         return s.storage.RemoveIf(s.nameSpace, key, data)
187 }
188
189 //GetAll returns all keys under the namespace. No prior knowledge about the keys in the
190 //given namespace exists, thus operation is not guaranteed to be atomic or isolated.
191 func (s *SdlInstance) GetAll() ([]string, error) {
192         return s.storage.GetAll(s.nameSpace)
193 }
194
195 //RemoveAll removes all keys under the namespace. Remove operation is not atomic, thus
196 //it is not guaranteed that all keys are removed.
197 func (s *SdlInstance) RemoveAll() error {
198         return s.storage.RemoveAll(s.nameSpace)
199 }
200
201 //RemoveAllAndPublish removes all keys under the namespace and if successfull, it
202 //will publish an event to given channel. This operation is not atomic, thus it is
203 //not guaranteed that all keys are removed.
204 func (s *SdlInstance) RemoveAllAndPublish(channelsAndEvents []string) error {
205         return s.storage.RemoveAllAndPublish(s.nameSpace, channelsAndEvents)
206 }
207
208 //AddMember adds a new members to a group.
209 //
210 //SDL groups are unordered collections of members where each member is
211 //unique. It is possible to add the same member several times without the
212 //need to check if it already exists.
213 func (s *SdlInstance) AddMember(group string, member ...interface{}) error {
214         return s.storage.AddMember(s.nameSpace, group, member...)
215 }
216
217 //RemoveMember removes members from a group.
218 func (s *SdlInstance) RemoveMember(group string, member ...interface{}) error {
219         return s.storage.RemoveMember(s.nameSpace, group, member...)
220 }
221
222 //RemoveGroup removes the whole group along with it's members.
223 func (s *SdlInstance) RemoveGroup(group string) error {
224         return s.storage.RemoveGroup(s.nameSpace, group)
225 }
226
227 //GetMembers returns all the members from a group.
228 func (s *SdlInstance) GetMembers(group string) ([]string, error) {
229         return s.storage.GetMembers(s.nameSpace, group)
230 }
231
232 //IsMember returns true if given member is found from a group.
233 func (s *SdlInstance) IsMember(group string, member interface{}) (bool, error) {
234         return s.storage.IsMember(s.nameSpace, group, member)
235 }
236
237 //GroupSize returns the number of members in a group.
238 func (s *SdlInstance) GroupSize(group string) (int64, error) {
239         return s.storage.GroupSize(s.nameSpace, group)
240 }
241
242 //LockResource function is used for locking a resource. The resource lock in
243 //practice is a key with random value that is set to expire after a time
244 //period. The value written to key is a random value, thus only the instance
245 //created a lock, can release it. Resource locks are per namespace.
246 func (s *SdlInstance) LockResource(resource string, expiration time.Duration, opt *Options) (*Lock, error) {
247         l, err := s.storage.LockResource(s.nameSpace, resource, expiration, opt)
248         if l != nil {
249                 return &Lock{
250                         s:           s,
251                         storageLock: l,
252                 }, err
253         }
254         return nil, err
255 }
256
257 //ReleaseResource removes the lock from a resource. If lock is already
258 //expired or some other instance is keeping the lock (lock taken after expiration),
259 //an error is returned.
260 func (l *Lock) ReleaseResource() error {
261         return l.storageLock.ReleaseResource(l.s.nameSpace)
262 }
263
264 //RefreshResource function can be used to set a new expiration time for the
265 //resource lock (if the lock still exists). The old remaining expiration
266 //time is overwritten with the given new expiration time.
267 func (l *Lock) RefreshResource(expiration time.Duration) error {
268         return l.storageLock.RefreshResource(l.s.nameSpace, expiration)
269 }
270
271 //CheckResource returns the expiration time left for a resource.
272 //If the resource doesn't exist, -2 is returned.
273 func (s *SdlInstance) CheckResource(resource string) (time.Duration, error) {
274         return s.storage.CheckResource(s.nameSpace, resource)
275 }
276
277 //Options struct defines the behaviour for getting the resource lock.
278 type Options struct {
279         //The number of time the lock will be tried.
280         //Default: 0 = no retry
281         RetryCount int
282
283         //Wait between the retries.
284         //Default: 100ms
285         RetryWait time.Duration
286 }
287
288 func (o *Options) getRetryCount() int {
289         if o != nil && o.RetryCount > 0 {
290                 return o.RetryCount
291         }
292         return 0
293 }
294
295 func (o *Options) getRetryWait() time.Duration {
296         if o != nil && o.RetryWait > 0 {
297                 return o.RetryWait
298         }
299         return 100 * time.Millisecond
300 }
301
302 //Lock struct identifies the resource lock instance. Releasing and adjusting the
303 //expirations are done using the methods defined for this struct.
304 type Lock struct {
305         s           *SdlInstance
306         storageLock *SyncStorageLock
307 }