Following SDL APIs are defined so that multiple channel-event pairs can be
given as function argument but actual SDL implementation utilized such a DBAAS
(Redis) module what expect to get only one channel-event pair. Fix the
implementation of these SDL APIs to use the correct DBAAS module what support
multiple channel-event pairs:
* set_if_and_publish()
* set_if_not_exists_and_publish()
* remove_if_and_publish()
Please note that in runtime environment DBAAS service needs to run on DBAAS
image version 0.4.0 or newer. Older images do not have multiple channel-event
pairs support as a Redis module.
Updated also start_event_listener() and handle_events() description to
emphasize that channel-event subscription must be done before calling those
functions, otherwise open-source Redis client application could throw an
exception due to not having any subscription when it receives some event, for
example a ping event in case of the DBAAS Sentinel deployment.
Issue-ID: RIC-758
Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I75d4e3316d7387b7a1a735fe3eb357de7ef794bb
Version history
---------------
Version history
---------------
+[2.1.1] - 2021-03-09
+
+* Take DBAAS multi-channel publishing Redis modules into use.
+
[2.1.0] - 2020-08-26
* Add support for notifications
[2.1.0] - 2020-08-26
* Add support for notifications
-Notifications functionality provide SDL clients the possibility to receive notifications about data changes in SDL namespaces. SDL client receiving notifications about data changes is referred to as "subscriber", while the SDL client modifying data and publishing notifications is referred to as "publisher".
+Notifications functionality provide SDL clients the possibility to receive
+notifications about data changes in SDL namespaces. SDL client receiving
+notifications about data changes is referred to as "subscriber", while the SDL
+client modifying data and publishing notifications is referred to as
+"publisher".
* Following Redis extension commands have been installed to runtime environment:
- MSETPUB
- SETIE
* Following Redis extension commands have been installed to runtime environment:
- MSETPUB
- SETIE
- - SETIEPUB
- - SETNXPUB
- - DELPUB
+ - SETIEMPUB
+ - SETNXMPUB
+ - DELMPUB
Redis v4.0 or greater is required. Older versions do not support extension modules.
Implementation of above commands is produced by RIC DBaaS:
https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
Redis v4.0 or greater is required. Older versions do not support extension modules.
Implementation of above commands is produced by RIC DBaaS:
https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
+# Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
+# channel, cb function will be called.
+_try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
# As mentioned above, there are two available methods for applications to
# handle notifications
# As mentioned above, there are two available methods for applications to
# handle notifications
thread = threading.Thread(target=listen_thread)
thread.start()
thread = threading.Thread(target=listen_thread)
thread.start()
-# Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
-# channel, cb function will be called.
-_try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
-
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
_try_func_callback_return(
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
_try_func_callback_return(
* Following Redis extension commands have been installed to runtime environment:
- MSETPUB
- SETIE
* Following Redis extension commands have been installed to runtime environment:
- MSETPUB
- SETIE
- - SETIEPUB
- - SETNXPUB
- - DELPUB
+ - SETIEMPUB
+ - SETNXMPUB
+ - DELMPUB
Redis v4.0 or greater is required. Older versions do not support extension modules.
Implementation of above commands is produced by RIC DBaaS:
https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
Redis v4.0 or greater is required. Older versions do not support extension modules.
Implementation of above commands is produced by RIC DBaaS:
https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
values = self.__redis.mget(db_keys)
for idx, val in enumerate(values):
# return only key values, which has a value
values = self.__redis.mget(db_keys)
for idx, val in enumerate(values):
# return only key values, which has a value
ret[keys[idx]] = val
return ret
ret[keys[idx]] = val
return ret
channels_and_events_prepared = []
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
channels_and_events_prepared = []
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
- ret = self.__redis.execute_command("SETIEPUB", db_key, new_data, old_data,
+ ret = self.__redis.execute_command("SETIEMPUB", db_key, new_data, old_data,
*channels_and_events_prepared)
return ret == b"OK"
*channels_and_events_prepared)
return ret == b"OK"
db_key = self._add_key_ns_prefix(ns, key)
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
db_key = self._add_key_ns_prefix(ns, key)
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
- ret = self.__redis.execute_command("SETNXPUB", db_key, data,
+ ret = self.__redis.execute_command("SETNXMPUB", db_key, data,
*channels_and_events_prepared)
return ret == b"OK"
*channels_and_events_prepared)
return ret == b"OK"
db_key = self._add_key_ns_prefix(ns, key)
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
db_key = self._add_key_ns_prefix(ns, key)
channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
with _map_to_sdl_exception():
- ret = self.__redis.execute_command("DELIEPUB", db_key, data,
+ ret = self.__redis.execute_command("DELIEMPUB", db_key, data,
*channels_and_events_prepared)
return bool(ret)
*channels_and_events_prepared)
return bool(ret)
events from subscriptions. The registered callback function will be called
when an event is received.
events from subscriptions. The registered callback function will be called
when an event is received.
+ It should be noted that subscribe_channel must be called before calling
+ start_event_listener to do at least one subscription before event loop
+ starts.
+
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
NotConnected: If SDL is not connected to the backend data storage.
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
NotConnected: If SDL is not connected to the backend data storage.
event loop. Calling this function after start_event_listener raises an exception.
If there are no notifications, these returns None.
event loop. Calling this function after start_event_listener raises an exception.
If there are no notifications, these returns None.
+ It should be noted that subscribe_channel must be called before calling of the
+ handle_events in an event loop. At least one subscription must be done before
+ events handling starts.
+
Returns:
Tuple: (channel: str, message: str)
Returns:
Tuple: (channel: str, message: str)
self.mock_redis.execute_command.return_value = b"OK"
ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
self.new_data)
self.mock_redis.execute_command.return_value = b"OK"
ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
self.new_data, self.old_data,
*self.channels_and_events_redis)
assert ret is True
self.new_data, self.old_data,
*self.channels_and_events_redis)
assert ret is True
self.mock_redis.execute_command.return_value = None
ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
self.new_data)
self.mock_redis.execute_command.return_value = None
ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
self.new_data, self.old_data,
*self.channels_and_events_redis)
assert ret is False
self.new_data, self.old_data,
*self.channels_and_events_redis)
assert ret is False
self.mock_redis.execute_command.return_value = b"OK"
ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
self.mock_redis.execute_command.return_value = b"OK"
ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
self.new_data,
*self.channels_and_events_redis)
assert ret is True
self.new_data,
*self.channels_and_events_redis)
assert ret is True
self.mock_redis.execute_command.return_value = None
ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
self.mock_redis.execute_command.return_value = None
ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
self.new_data,
*self.channels_and_events_redis)
assert ret is False
self.new_data,
*self.channels_and_events_redis)
assert ret is False
self.mock_redis.execute_command.return_value = 1
ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
self.mock_redis.execute_command.return_value = 1
ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
self.new_data,
*self.channels_and_events_redis)
assert ret is True
self.new_data,
*self.channels_and_events_redis)
assert ret is True
self.mock_redis.execute_command.return_value = 0
ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
self.mock_redis.execute_command.return_value = 0
ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
self.new_data)
- self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
self.new_data,
*self.channels_and_events_redis)
assert ret is False
self.new_data,
*self.channels_and_events_redis)
assert ret is False