006001f427da6b8d45fbeaf217bc11c1f6f9718e
[ric-plt/sdlpy.git] / ricsdl-package / examples / notify.py
1 # Copyright (c) 2020 Samsung Electronics
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 #
16 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
17 # platform project (RICP).
18 #
19
20 """Examples on how to use Shared Data Layer (SDL) notification feature.
21
22 Execution of  these examples requires:
23  * Following Redis extension commands have been installed to runtime environment:
24    - MSETPUB
25    - SETIE
26    - SETIEMPUB
27    - SETNXMPUB
28    - DELMPUB
29    - DELIE
30    - DELIEMPUB
31    Redis v4.0 or greater is required. Older versions do not support extension modules.
32    Implementation of above commands is produced by RIC DBaaS:
33    https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
34    In official RIC deployments these commands are installed by `dbaas` service to Redis
35    container(s).
36    In development environment you may want install commands manually to pod/container, which is
37    running Redis.
38  * Following environment variables are needed to set to the pod/container where the application
39    utilizing SDL is going to be run.
40      DBAAS_SERVICE_HOST = [DB service address]
41      DBAAS_SERVICE_PORT= [DB service port]
42      DBAAS_MASTER_NAME = [DB name]. Needed to set only if Redis sentinel is used to provide high
43      availability for Redis DB solution.
44      DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if Redis
45      sentinel is in use.
46      DBASS_CLUSTER_ADDR_LIST = [list of DB service addresses]. Is set only if more than one
47      Redis sentinel groups are in use.
48    In official RIC deployments four first environment variables are defined in Helm configMaps
49    of the DBaaS and these configurations can be loaded automatically as environment variables
50    into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
51    The last environment variable is not for time being in use in official RIC deployments, only
52    in Nokia SEP deployments.
53 """
54
55 import threading
56 import time
57
58 from ricsdl.syncstorage import SyncStorage
59 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
60 from typing import (Union, List)
61
62 # There are two available methods for applications to handle notifications:
63 #   - EVENT_LISTENER (true):
64 #     - User calls sdl.start_event_listener() which will create an SDL managed
65 #       event loop for handling messages.
66 #   - EVENT_LISTENER (false):
67 #     - User need to call sdl.handle_events() which will return the message
68 #
69 # Note: In both cases, the given callback function will be executed.
70 EVENT_LISTENER = True
71
72 # Constants used in the examples below.
73 MY_NS = 'my_ns'
74 MY_CHANNEL = "my_channel"
75 MY_LOCK = threading.Lock()
76
77
78 def _try_func_return(func):
79     """
80     Generic wrapper function to call SDL API function and handle exceptions if they are raised.
81     """
82     try:
83         return func()
84     except RejectedByBackend as exp:
85         print(f'SDL function {func.__name__} failed: {str(exp)}')
86         # Permanent failure, just forward the exception
87         raise
88     except (NotConnected, BackendError) as exp:
89         print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
90         # Here we could have a retry logic
91
92
93 def _try_func_callback_return(func):
94     """Generic wrapper function for testing SDL APIs with callback functions.
95
96     threading.Lock is unlocked in the callback function and threading.Lock is
97     only used to demonstrate that the callback function was called.
98     """
99     global MY_LOCK
100     MY_LOCK.acquire()
101     ret = _try_func_return(func)
102     while MY_LOCK.locked():
103         time.sleep(0.01)
104     return ret
105
106
107 # Creates SDL instance. The call creates connection to the SDL database backend.
108 mysdl = _try_func_return(SyncStorage)
109
110 # Stores the last received channel and message
111 last_cb_channel = ""
112 last_cb_message = ""
113
114 # Allows program to stop receive thread at the end of execution
115 stop_thread = False
116
117
118 def cb(channel: str, message: List[str]):
119     """An example of function that will be called when an event list is received.
120
121     This function sets last_cb_channel and last_cb_message as channel and
122     message respectively. This also unlocks the global lock variable.
123
124     Args:
125         channel: Channel where the message was received
126         message: Received message
127     """
128     global last_cb_channel, last_cb_message, MY_LOCK
129     last_cb_channel = channel
130     last_cb_message = message
131     if MY_LOCK.locked():
132         MY_LOCK.release()
133
134
135 def listen_thread():
136     """An example of a listener thread that continuously calls sdl.handle_events()."""
137     global mysdl
138     global stop_thread
139     while not stop_thread:
140         message = mysdl.handle_events()
141         if message:
142             # You could process message here
143             pass
144         time.sleep(0.001)
145
146 # Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
147 # channel, cb function will be called.
148 _try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
149
150 # As mentioned above, there are two available methods for applications to
151 # handle notifications
152 if EVENT_LISTENER:
153     _try_func_return(mysdl.start_event_listener)
154 else:
155     thread = threading.Thread(target=listen_thread)
156     thread.start()
157
158 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
159 # type must be bytes and multiple key values can be set in one set function call.
160 _try_func_callback_return(
161     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
162 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"
163
164 # Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
165 # type must be bytes and multiple key values can be set in one set function call.
166 # Function publishes two events into one channel.
167 _try_func_callback_return(
168     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
169 assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]
170
171 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
172 # 'my_value'.
173 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
174     MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
175 assert was_set is True
176 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
177 # Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
178 # value. Callback function will not be called here.
179 was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
180                                                             'my_key', b'my_value', b'my_value2'))
181 assert was_set is False
182
183 # Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
184 # Note that value types must be bytes.
185 was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
186     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
187 assert was_set is True
188 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
189 # Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
190 was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
191     MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
192 assert was_set is False
193
194 # Removes a key 'my_key' under given namespace.
195 _try_func_callback_return(
196     lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
197 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
198 assert my_ret_dict == {}
199 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"
200
201 # Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
202 was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
203     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
204 assert was_removed is True
205 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
206 # Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
207 was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
208     MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
209 assert was_removed is False
210
211 # Removes all the keys under given namespace.
212 _try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
213 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
214 assert my_ret_dict != {}
215
216 _try_func_callback_return(
217     lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
218 my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
219 assert my_ret_dict == {}
220 assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
221
222 stop_thread = True
223 mysdl.close()