3041d80cbcf3a09f2955b4a7ad2bb41310fb5e79
[ric-plt/sdlpy.git] / ricsdl-package / examples / notify.py
1 # Copyright (c) 2020 Samsung Electronics
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 #
16 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 # platform project (RICP).
18 #
19
20 """Examples on how to use Shared Data Layer (SDL) notification feature.
21
22 Execution of  these examples requires:
23  * Following Redis extension commands have been installed to runtime environment:
24    - MSETPUB
25    - SETIE
26    - SETIEMPUB
27    - SETNXMPUB
28    - DELMPUB
29    - DELIE
30    - DELIEMPUB
31    Redis v4.0 or greater is required. Older versions do not support extension modules.
32    Implementation of above commands is produced by RIC DBaaS:
33    https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34    In official RIC deployments these commands are installed by `dbaas` service to Redis
35    container(s).
36    In development environment you may want install commands manually to pod/container, which is
37    running Redis.
38  * Following environment variables are needed to set to the pod/container where the application
39    utilizing SDL is going to be run.
40      DBAAS_SERVICE_HOST = [redis server address]
41      DBAAS_SERVICE_PORT= [redis server port]
42      DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
43      DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
44      is in use.
45 """
46
47 import threading
48 import time
49
50 from ricsdl.syncstorage import SyncStorage
51 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
52
53 # There are two available methods for applications to handle notifications:
54 #   - EVENT_LISTENER (true):
55 #     - User calls sdl.start_event_listener() which will create an SDL managed
56 #       event loop for handling messages.
57 #   - EVENT_LISTENER (false):
58 #     - User need to call sdl.handle_messages() which will return the message
59 #
60 # Note: In both cases, the given callback function will be executed.
61 EVENT_LISTENER = True
62
63 # Constants used in the examples below.
64 MY_NS = 'my_ns'
65 MY_CHANNEL = "my_channel"
66 MY_LOCK = threading.Lock()
67
68
69 def _try_func_return(func):
70     """
71     Generic wrapper function to call SDL API function and handle exceptions if they are raised.
72     """
73     try:
74         return func()
75     except RejectedByBackend as exp:
76         print(f'SDL function {func.__name__} failed: {str(exp)}')
77         # Permanent failure, just forward the exception
78         raise
79     except (NotConnected, BackendError) as exp:
80         print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
81         # Here we could have a retry logic
82
83
84 def _try_func_callback_return(func):
85     """Generic wrapper function for testing SDL APIs with callback functions.
86
87     threading.Lock is unlocked in the callback function and threading.Lock is
88     only used to demonstrate that the callback function was called.
89     """
90     global MY_LOCK
91     MY_LOCK.acquire()
92     ret = _try_func_return(func)
93     while MY_LOCK.locked():
94         time.sleep(0.01)
95     return ret
96
97
98 # Creates SDL instance. The call creates connection to the SDL database backend.
99 mysdl = _try_func_return(SyncStorage)
100
101 # Stores the last received channel and message
102 last_cb_channel = ""
103 last_cb_message = ""
104
105 # Allows program to stop receive thread at the end of execution
106 stop_thread = False
107
108
109 def cb(channel: str, message: str):
110     """An example of function that will be called when an event is received.
111
112     This function sets last_cb_channel and last_cb_message as channel and
113     message respectively. This also unlocks the global lock variable.
114
115     Args:
116         channel: Channel where the message was received
117         message: Received message
118     """
119     global last_cb_channel, last_cb_message, MY_LOCK
120     last_cb_channel = channel
121     last_cb_message = message
122     if MY_LOCK.locked():
123         MY_LOCK.release()
124
125
126 def listen_thread():
127     """An example of a listener thread that continuously calls sdl.handle_events()."""
128     global mysdl
129     global stop_thread
130     while not stop_thread:
131         message = mysdl.handle_events()
132         if message:
133             # You could process message here
134             pass
135         time.sleep(0.001)
136
137 # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
138 # channel, cb function will be called.
139 _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
140
141 # As mentioned above, there are two available methods for applications to
142 # handle notifications
143 if EVENT_LISTENER:
144     _try_func_return(mysdl.start_event_listener)
145 else:
146     thread = threading.Thread(target=listen_thread)
147     thread.start()
148
149 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
150 # type must be bytes and multiple key values can be set in one set function call.
151 _try_func_callback_return(
152     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
153 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
154
155 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
156 # 'my_value'.
157 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
158     MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
159 assert was_set is True
160 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
161 # Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
162 # value. Callback function will not be called here.
163 was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
164                                                             'my_key', b'my_value', b'my_value2'))
165 assert was_set is False
166
167 # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
168 # Note that value types must be bytes.
169 was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
170     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
171 assert was_set is True
172 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
173 # Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
174 was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
175     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
176 assert was_set is False
177
178 # Removes a key 'my_key' under given namespace.
179 _try_func_callback_return(
180     lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
181 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
182 assert my_ret_dict == {}
183 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
184
185 # Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
186 was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
187     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
188 assert was_removed is True
189 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
190 # Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
191 was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
192     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
193 assert was_removed is False
194
195 # Removes all the keys under given namespace.
196 _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
197 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
198 assert my_ret_dict != {}
199
200 _try_func_callback_return(
201     lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
202 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
203 assert my_ret_dict == {}
204 assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
205
206 stop_thread = True
207 mysdl.close()