Read list type of environment variables
[ric-plt/sdlpy.git] / ricsdl-package / examples / notify.py
1 # Copyright (c) 2020 Samsung Electronics
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 #
16 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 # platform project (RICP).
18 #
19
20 """Examples on how to use Shared Data Layer (SDL) notification feature.
21
22 Execution of  these examples requires:
23  * Following Redis extension commands have been installed to runtime environment:
24    - MSETPUB
25    - SETIE
26    - SETIEMPUB
27    - SETNXMPUB
28    - DELMPUB
29    - DELIE
30    - DELIEMPUB
31    Redis v4.0 or greater is required. Older versions do not support extension modules.
32    Implementation of above commands is produced by RIC DBaaS:
33    https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34    In official RIC deployments these commands are installed by `dbaas` service to Redis
35    container(s).
36    In development environment you may want install commands manually to pod/container, which is
37    running Redis.
38  * Following environment variables are needed to set to the pod/container where the application
39    utilizing SDL is going to be run.
40      DBAAS_SERVICE_HOST = [DB service address]
41      DBAAS_SERVICE_PORT= [Comma separated list of DB service ports]. Only one port supported in
42      RIC deployments, Nokia SEP deployments can have multiple ports.
43      DBAAS_MASTER_NAME = [Comma separated list of DB names]. Needed to set only if Redis
44      sentinel is used to provide high availability for Redis DB solution. Only one DB name
45      supported in RIC deployments, Nokia SEP deployments can have multiple DB names.
46      DBAAS_SERVICE_SENTINEL_PORT = [Comma separated list of Redis sentinel port number]. Needed
47      to set only if Redis sentinel is in use. Only one port supported in RIC deployments, Nokia
48      SEP deployments can have multiple ports.
49      DBASS_CLUSTER_ADDR_LIST = [Comma separated list of DB service addresses]. Is set only if
50      more than one Redis sentinel groups are in use. Only in use in Nokia SEP deployments.
51    In official RIC deployments four first environment variables are defined in Helm configMaps
52    of the DBaaS and these configurations can be loaded automatically as environment variables
53    into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
54    The last environment variable is not for time being in use in official RIC deployments, only
55    in Nokia SEP deployments.
56 """
57
58 import threading
59 import time
60
61 from ricsdl.syncstorage import SyncStorage
62 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
63 from typing import (Union, List)
64
65 # There are two available methods for applications to handle notifications:
66 #   - EVENT_LISTENER (true):
67 #     - User calls sdl.start_event_listener() which will create an SDL managed
68 #       event loop for handling messages.
69 #   - EVENT_LISTENER (false):
70 #     - User need to call sdl.handle_events() which will return the message
71 #
72 # Note: In both cases, the given callback function will be executed.
73 EVENT_LISTENER = True
74
75 # Constants used in the examples below.
76 MY_NS = 'my_ns'
77 MY_CHANNEL = "my_channel"
78 MY_LOCK = threading.Lock()
79
80
81 def _try_func_return(func):
82     """
83     Generic wrapper function to call SDL API function and handle exceptions if they are raised.
84     """
85     try:
86         return func()
87     except RejectedByBackend as exp:
88         print(f'SDL function {func.__name__} failed: {str(exp)}')
89         # Permanent failure, just forward the exception
90         raise
91     except (NotConnected, BackendError) as exp:
92         print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
93         # Here we could have a retry logic
94
95
96 def _try_func_callback_return(func):
97     """Generic wrapper function for testing SDL APIs with callback functions.
98
99     threading.Lock is unlocked in the callback function and threading.Lock is
100     only used to demonstrate that the callback function was called.
101     """
102     global MY_LOCK
103     MY_LOCK.acquire()
104     ret = _try_func_return(func)
105     while MY_LOCK.locked():
106         time.sleep(0.01)
107     return ret
108
109
110 # Creates SDL instance. The call creates connection to the SDL database backend.
111 mysdl = _try_func_return(SyncStorage)
112
113 # Stores the last received channel and message
114 last_cb_channel = ""
115 last_cb_message = ""
116
117 # Allows program to stop receive thread at the end of execution
118 stop_thread = False
119
120
121 def cb(channel: str, message: List[str]):
122     """An example of function that will be called when an event list is received.
123
124     This function sets last_cb_channel and last_cb_message as channel and
125     message respectively. This also unlocks the global lock variable.
126
127     Args:
128         channel: Channel where the message was received
129         message: Received message
130     """
131     global last_cb_channel, last_cb_message, MY_LOCK
132     last_cb_channel = channel
133     last_cb_message = message
134     if MY_LOCK.locked():
135         MY_LOCK.release()
136
137
138 def listen_thread():
139     """An example of a listener thread that continuously calls sdl.handle_events()."""
140     global mysdl
141     global stop_thread
142     while not stop_thread:
143         message = mysdl.handle_events()
144         if message:
145             # You could process message here
146             pass
147         time.sleep(0.001)
148
149 # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
150 # channel, cb function will be called.
151 _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
152
153 # As mentioned above, there are two available methods for applications to
154 # handle notifications
155 if EVENT_LISTENER:
156     _try_func_return(mysdl.start_event_listener)
157 else:
158     thread = threading.Thread(target=listen_thread)
159     thread.start()
160
161 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
162 # type must be bytes and multiple key values can be set in one set function call.
163 _try_func_callback_return(
164     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
165 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"
166
167 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
168 # type must be bytes and multiple key values can be set in one set function call.
169 # Function publishes two events into one channel.
170 _try_func_callback_return(
171     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
172 assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]
173
174 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
175 # 'my_value'.
176 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
177     MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
178 assert was_set is True
179 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
180 # Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
181 # value. Callback function will not be called here.
182 was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
183                                                             'my_key', b'my_value', b'my_value2'))
184 assert was_set is False
185
186 # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
187 # Note that value types must be bytes.
188 was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
189     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
190 assert was_set is True
191 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
192 # Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
193 was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
194     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
195 assert was_set is False
196
197 # Removes a key 'my_key' under given namespace.
198 _try_func_callback_return(
199     lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
200 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
201 assert my_ret_dict == {}
202 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"
203
204 # Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
205 was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
206     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
207 assert was_removed is True
208 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
209 # Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
210 was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
211     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
212 assert was_removed is False
213
214 # Removes all the keys under given namespace.
215 _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
216 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
217 assert my_ret_dict != {}
218
219 _try_func_callback_return(
220     lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
221 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
222 assert my_ret_dict == {}
223 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
224
225 stop_thread = True
226 mysdl.close()