1 # Copyright (c) 2020 Samsung Electronics
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 # platform project (RICP).
20 """Examples on how to use Shared Data Layer (SDL) notification feature.
22 Execution of these examples requires:
23 * Following Redis extension commands have been installed to runtime environment:
31 Redis v4.0 or greater is required. Older versions do not support extension modules.
32 Implementation of above commands is produced by RIC DBaaS:
33 https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34 In official RIC deployments these commands are installed by `dbaas` service to Redis
36 In development environment you may want install commands manually to pod/container, which is
38 * Following environment variables are needed to set to the pod/container where the application
39 utilizing SDL is going to be run.
40 DBAAS_SERVICE_HOST = [DB service address]
41 DBAAS_SERVICE_PORT= [DB service port]
42 DBAAS_MASTER_NAME = [DB name]. Needed to set only if Redis sentinel is used to provide high
43 availability for Redis DB solution.
44 DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if Redis
46 DBASS_CLUSTER_ADDR_LIST = [list of DB service addresses]. Is set only if more than one
47 Redis sentinel groups are in use.
48 In official RIC deployments four first environment variables are defined in Helm configMaps
49 of the DBaaS and these configurations can be loaded automatically as environment variables
50 into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
51 The last environment variable is not for time being in use in official RIC deployments, only
52 in Nokia SEP deployments.
58 from ricsdl.syncstorage import SyncStorage
59 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
60 from typing import (Union, List)
62 # There are two available methods for applications to handle notifications:
63 # - EVENT_LISTENER (true):
64 # - User calls sdl.start_event_listener() which will create an SDL managed
65 # event loop for handling messages.
66 # - EVENT_LISTENER (false):
67 # - User need to call sdl.handle_events() which will return the message
69 # Note: In both cases, the given callback function will be executed.
72 # Constants used in the examples below.
74 MY_CHANNEL = "my_channel"
75 MY_LOCK = threading.Lock()
78 def _try_func_return(func):
80 Generic wrapper function to call SDL API function and handle exceptions if they are raised.
84 except RejectedByBackend as exp:
85 print(f'SDL function {func.__name__} failed: {str(exp)}')
86 # Permanent failure, just forward the exception
88 except (NotConnected, BackendError) as exp:
89 print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
90 # Here we could have a retry logic
93 def _try_func_callback_return(func):
94 """Generic wrapper function for testing SDL APIs with callback functions.
96 threading.Lock is unlocked in the callback function and threading.Lock is
97 only used to demonstrate that the callback function was called.
101 ret = _try_func_return(func)
102 while MY_LOCK.locked():
107 # Creates SDL instance. The call creates connection to the SDL database backend.
108 mysdl = _try_func_return(SyncStorage)
110 # Stores the last received channel and message
114 # Allows program to stop receive thread at the end of execution
118 def cb(channel: str, message: Union[str, List[str]]):
119 """An example of function that will be called when a single event or list of
122 This function sets last_cb_channel and last_cb_message as channel and
123 message respectively. This also unlocks the global lock variable.
126 channel: Channel where the message was received
127 message: Received message
129 global last_cb_channel, last_cb_message, MY_LOCK
130 last_cb_channel = channel
131 last_cb_message = message
137 """An example of a listener thread that continuously calls sdl.handle_events()."""
140 while not stop_thread:
141 message = mysdl.handle_events()
143 # You could process message here
147 # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
148 # channel, cb function will be called.
149 _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
151 # As mentioned above, there are two available methods for applications to
152 # handle notifications
154 _try_func_return(mysdl.start_event_listener)
156 thread = threading.Thread(target=listen_thread)
159 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
160 # type must be bytes and multiple key values can be set in one set function call.
161 _try_func_callback_return(
162 lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
163 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
165 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
166 # type must be bytes and multiple key values can be set in one set function call.
167 # Function publishes two events into one channel.
168 _try_func_callback_return(
169 lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
170 assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]
172 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
174 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
175 MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
176 assert was_set is True
177 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
178 # Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
179 # value. Callback function will not be called here.
180 was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
181 'my_key', b'my_value', b'my_value2'))
182 assert was_set is False
184 # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
185 # Note that value types must be bytes.
186 was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
187 MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
188 assert was_set is True
189 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
190 # Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
191 was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
192 MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
193 assert was_set is False
195 # Removes a key 'my_key' under given namespace.
196 _try_func_callback_return(
197 lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
198 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
199 assert my_ret_dict == {}
200 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
202 # Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
203 was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
204 MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
205 assert was_removed is True
206 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
207 # Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
208 was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
209 MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
210 assert was_removed is False
212 # Removes all the keys under given namespace.
213 _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
214 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
215 assert my_ret_dict != {}
217 _try_func_callback_return(
218 lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
219 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
220 assert my_ret_dict == {}
221 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"