1 # Copyright (c) 2020 Samsung Electronics
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 # platform project (RICP).
20 """Examples on how to use Shared Data Layer (SDL) notification feature.
22 Execution of these examples requires:
23 * The following Redis extension commands have been installed to the runtime environment:
31 Redis v4.0 or greater is required. Older versions do not support extension modules.
32 Implementation of above commands is produced by RIC DBaaS:
33 https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34 In official RIC deployments these commands are installed by `dbaas` service to Redis
36 In a development environment you may want to install the commands manually to the pod/container, which is
38 * The following environment variables need to be set in the pod/container where the application
39 utilizing SDL is going to be run.
40 DBAAS_SERVICE_HOST = [redis server address]
41 DBAAS_SERVICE_PORT = [redis server port]
42 DBAAS_MASTER_NAME = [master Redis sentinel name]. Needs to be set only if sentinel is in use.
43 DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needs to be set only if sentinel
50 from ricsdl.syncstorage import SyncStorage
51 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
53 # There are two available methods for applications to handle notifications:
54 # - EVENT_LISTENER (true):
55 # - User calls sdl.start_event_listener() which will create an SDL managed
56 # event loop for handling messages.
57 # - EVENT_LISTENER (false):
58 # - User needs to call sdl.handle_events() which will return the message
60 # Note: In both cases, the given callback function will be executed.
63 # Constants used in the examples below.
65 MY_CHANNEL = "my_channel"
66 MY_LOCK = threading.Lock()
69 def _try_func_return(func):
71 Generic wrapper function to call SDL API function and handle exceptions if they are raised.
75 except RejectedByBackend as exp:
76 print(f'SDL function {func.__name__} failed: {str(exp)}')
77 # Permanent failure, just forward the exception
79 except (NotConnected, BackendError) as exp:
80 print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
81 # Here we could have a retry logic
84 def _try_func_callback_return(func):
85 """Generic wrapper function for testing SDL APIs with callback functions.
87 threading.Lock is unlocked in the callback function and threading.Lock is
88 only used to demonstrate that the callback function was called.
92 ret = _try_func_return(func)
93 while MY_LOCK.locked():
98 # Creates SDL instance. The call creates connection to the SDL database backend.
99 mysdl = _try_func_return(SyncStorage)
101 # Stores the last received channel and message
105 # Allows program to stop receive thread at the end of execution
109 def cb(channel: str, message: str):
110 """An example of function that will be called when an event is received.
112 This function sets last_cb_channel and last_cb_message as channel and
113 message respectively. This also unlocks the global lock variable.
116 channel: Channel where the message was received
117 message: Received message
119 global last_cb_channel, last_cb_message, MY_LOCK
120 last_cb_channel = channel
121 last_cb_message = message
127 """An example of a listener thread that continuously calls sdl.handle_events()."""
130 while not stop_thread:
131 message = mysdl.handle_events()
133 # You could process message here
138 # As mentioned above, there are two available methods for applications to
139 # handle notifications
141 _try_func_return(mysdl.start_event_listener)
143 thread = threading.Thread(target=listen_thread)
146 # Subscribe to MY_CHANNEL. We expect that any time a message is received on the
147 # channel, the cb function will be called.
148 _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
150 # Sets a value 'my_value' for a key 'my_key' under the given namespace. Note that the value
151 # type must be bytes and that multiple key values can be set in one set function call.
152 _try_func_callback_return(
153 lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
154 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
156 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
158 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
159 MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
160 assert was_set is True
161 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
162 # Try again. This time value 'my_value2' won't be set, because the key already has the value
163 # 'my_value2'. Callback function will not be called here.
164 was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
165 'my_key', b'my_value', b'my_value2'))
166 assert was_set is False
168 # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
169 # Note that value types must be bytes.
170 was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
171 MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
172 assert was_set is True
173 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
174 # Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
175 was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
176 MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
177 assert was_set is False
179 # Removes a key 'my_key' under given namespace.
180 _try_func_callback_return(
181 lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
182 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
183 assert my_ret_dict == {}
184 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
186 # Removes a key 'my_key2' under given namespace only if the old value is 'my_value'.
187 was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
188 MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
189 assert was_removed is True
190 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
191 # Try again to remove the no longer existing key 'my_key2'. Callback function will not be called here.
192 was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
193 MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
194 assert was_removed is False
196 # Removes all the keys under given namespace. A key is stored first so there is something to remove.
197 _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
198 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
199 assert my_ret_dict != {}
201 _try_func_callback_return(
202 lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
203 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
204 assert my_ret_dict == {}
205 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"