Add support for notifications
[ric-plt/sdlpy.git] / ricsdl-package / examples / notify.py
diff --git a/ricsdl-package/examples/notify.py b/ricsdl-package/examples/notify.py
new file mode 100755 (executable)
index 0000000..7d2f203
--- /dev/null
@@ -0,0 +1,208 @@
+# Copyright (c) 2020 Samsung Electronics
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#
+
+"""Examples on how to use Shared Data Layer (SDL) notification feature.
+
+Execution of  these examples requires:
+ * Following Redis extension commands have been installed to runtime environment:
+   - MSETPUB
+   - SETIE
+   - SETIEPUB
+   - SETNXPUB
+   - DELPUB
+   - DELIE
+   - DELIEPUB
+   Redis v4.0 or greater is required. Older versions do not support extension modules.
+   Implementation of above commands is produced by RIC DBaaS:
+   https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
+   In official RIC deployments these commands are installed by `dbaas` service to Redis
+   container(s).
+   In development environment you may want install commands manually to pod/container, which is
+   running Redis.
+ * Following environment variables are needed to set to the pod/container where the application
+   utilizing SDL is going to be run.
+     DBAAS_SERVICE_HOST = [redis server address]
+     DBAAS_SERVICE_PORT= [redis server port]
+     DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
+     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
+     is in use.
+"""
+
+import threading
+import time
+
+from ricsdl.syncstorage import SyncStorage
+from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+
+# There are two available methods for applications to handle notifications:
+#   - EVENT_LISTENER (true):
+#     - User calls sdl.start_event_listener() which will create an SDL managed
+#       event loop for handling messages.
+#   - EVENT_LISTENER (false):
+#     - User need to call sdl.handle_messages() which will return the message
+#
+# Note: In both cases, the given callback function will be executed.
+EVENT_LISTENER = True
+
+# Constants used in the examples below.
+MY_NS = 'my_ns'
+MY_CHANNEL = "my_channel"
+MY_LOCK = threading.Lock()
+
+
+def _try_func_return(func):
+    """
+    Generic wrapper function to call SDL API function and handle exceptions if they are raised.
+    """
+    try:
+        return func()
+    except RejectedByBackend as exp:
+        print(f'SDL function {func.__name__} failed: {str(exp)}')
+        # Permanent failure, just forward the exception
+        raise
+    except (NotConnected, BackendError) as exp:
+        print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
+        # Here we could have a retry logic
+
+
+def _try_func_callback_return(func):
+    """Generic wrapper function for testing SDL APIs with callback functions.
+
+    threading.Lock is unlocked in the callback function and threading.Lock is
+    only used to demonstrate that the callback function was called.
+    """
+    global MY_LOCK
+    MY_LOCK.acquire()
+    ret = _try_func_return(func)
+    while MY_LOCK.locked():
+        time.sleep(0.01)
+    return ret
+
+
+# Creates SDL instance. The call creates connection to the SDL database backend.
+mysdl = _try_func_return(SyncStorage)
+
+# Stores the last received channel and message
+last_cb_channel = ""
+last_cb_message = ""
+
+# Allows program to stop receive thread at the end of execution
+stop_thread = False
+
+
+def cb(channel: str, message: str):
+    """An example of function that will be called when an event is received.
+
+    This function sets last_cb_channel and last_cb_message as channel and
+    message respectively. This also unlocks the global lock variable.
+
+    Args:
+        channel: Channel where the message was received
+        message: Received message
+    """
+    global last_cb_channel, last_cb_message, MY_LOCK
+    last_cb_channel = channel
+    last_cb_message = message
+    if MY_LOCK.locked():
+        MY_LOCK.release()
+
+
+def listen_thread():
+    """An example of a listener thread that continuously calls sdl.handle_events()."""
+    global mysdl
+    global stop_thread
+    while not stop_thread:
+        message = mysdl.handle_events()
+        if message:
+            # You could process message here
+            pass
+        time.sleep(0.001)
+
+
+# As mentioned above, there are two available methods for applications to
+# handle notifications
+if EVENT_LISTENER:
+    _try_func_return(mysdl.start_event_listener)
+else:
+    thread = threading.Thread(target=listen_thread)
+    thread.start()
+
+# Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
+# channel, cb function will be called.
+_try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
+
+# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
+# type must be bytes and multiple key values can be set in one set function call.
+_try_func_callback_return(
+    lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
+
+# Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
+# 'my_value'.
+was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
+assert was_set is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
+# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
+# value. Callback function will not be called here.
+was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
+                                                            'my_key', b'my_value', b'my_value2'))
+assert was_set is False
+
+# Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
+# Note that value types must be bytes.
+was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
+assert was_set is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
+# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
+was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
+assert was_set is False
+
+# Removes a key 'my_key' under given namespace.
+_try_func_callback_return(
+    lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
+assert my_ret_dict == {}
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
+
+# Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
+was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
+    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
+assert was_removed is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
+# Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
+was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
+    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
+assert was_removed is False
+
+# Removes all the keys under given namespace.
+_try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
+assert my_ret_dict != {}
+
+_try_func_callback_return(
+    lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
+assert my_ret_dict == {}
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
+
+stop_thread = True
+mysdl.close()