In earlier implementation (commit
db775397) single event was passed to
to application notification callback as a string type and if there was
multiple notification then as a list of string. Similarly
'handle_events' function was either returning a single event as string
or a list of string. In a code it is unpleasant to handle variables what
can be either string or list type, because in application you need to
check always the variable type what comes from SDL for these functions
and based on the type either handle the value as a string or list of
string. That's why by this commit change event(s) to be passed as a list
to the notification callback what application has registered and also
now 'handle_events' function returns always received notifications a
list. Because of type change this code change is not backward
compatible.
Issue-ID: RIC-795
Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: I49f2bd1012d93f184945c891df03ed83ed83d544
Version history
---------------
Version history
---------------
+[3.0.0] - 2021-06-10
+
+* Simplify v2.3.0 notification callback and handle_events API to return received
+ event(s) only as a list type. Earlier single event was returned as a string type.
+ Notification callback and handle_events API changes are not backward compatible.
+
[2.3.0] - 2021-06-09
* Pack all the events in a channel to one DB notification to be in line with SDL Golang.
[2.3.0] - 2021-06-09
* Pack all the events in a channel to one DB notification to be in line with SDL Golang.
-def cb(channel: str, message: Union[str, List[str]]):
- """An example of function that will be called when a single event or list of
- events are received.
+def cb(channel: str, message: List[str]):
+ """An example of function that will be called when an event list is received.
This function sets last_cb_channel and last_cb_message as channel and
message respectively. This also unlocks the global lock variable.
This function sets last_cb_channel and last_cb_message as channel and
message respectively. This also unlocks the global lock variable.
# type must be bytes and multiple key values can be set in one set function call.
_try_func_callback_return(
lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
# type must be bytes and multiple key values can be set in one set function call.
_try_func_callback_return(
lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
assert was_set is True
was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
assert was_set is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
# value. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
# value. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
assert was_set is True
was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
assert was_set is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
assert my_ret_dict == {}
lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
assert my_ret_dict == {}
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"
# Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
assert was_removed is True
# Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
assert was_removed is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
# Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
# Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
assert my_ret_dict == {}
lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
assert my_ret_dict == {}
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
stop_thread = True
mysdl.close()
stop_thread = True
mysdl.close()
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
"""
This takes a callback function and one or many channels to be subscribed.
When an event is received for the given channel, the given callback function
channels: List[str]) -> None:
"""
This takes a callback function and one or many channels to be subscribed.
When an event is received for the given channel, the given callback function
- shall be called with channel and notifications as parameter.
+ shall be called with channel and notification(s) as parameter.
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from notification.
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from notification.
for channel, events in channels_and_events.items():
self._queue.put((channel, events))
for channel, events in channels_and_events.items():
self._queue.put((channel, events))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
- cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
+ cb(message[0], message[1])
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
self._listen_thread.start()
self._run_in_thread = True
self._listen_thread.start()
self._run_in_thread = True
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
- notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
- cb(message[0], notifications)
- return (message[0], notifications)
+ cb(message[0], message[1])
+ return (message[0], message[1])
class FakeDictBackendLock(DbBackendLockAbc):
class FakeDictBackendLock(DbBackendLockAbc):
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
messages = message_data.split(self.event_separator)
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
messages = message_data.split(self.event_separator)
- notification = messages[0] if len(messages) == 1 else messages
- handler(message_channel, notification)
- return message_channel, notification
+ handler(message_channel, messages)
+ return message_channel, messages
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
*channels_and_events_prepared,
)
*channels_and_events_prepared,
)
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
@func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
@func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: Union[str, Set[str]]) -> None:
self._validate_callback(cb)
channels = [channels] if isinstance(channels, str) else list(channels)
channels: Union[str, Set[str]]) -> None:
self._validate_callback(cb)
channels = [channels] if isinstance(channels, str) else list(channels)
def start_event_listener(self) -> None:
self.__dbbackend.start_event_listener()
def start_event_listener(self) -> None:
self.__dbbackend.start_event_listener()
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
return self.__dbbackend.handle_events()
@func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
return self.__dbbackend.handle_events()
@func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: Union[str, Set[str]]) -> None:
"""
Subscribes the client to the specified channels.
channels: Union[str, Set[str]]) -> None:
"""
Subscribes the client to the specified channels.
When subscribing for a channel, a callback function is given as a parameter.
Whenever single notification or many notifications are received from a channel,
When subscribing for a channel, a callback function is given as a parameter.
Whenever single notification or many notifications are received from a channel,
- this callback is called with channel and notifications as parameter. A call to
- subscribe_channel function returns immediately, callbacks will be called
+ this callback is called with channel and notification list as parameter. A call
+ to subscribe_channel function returns immediately, callbacks will be called
synchronously from a dedicated thread.
It is possible to subscribe to different channels using different callbacks. In
synchronously from a dedicated thread.
It is possible to subscribe to different channels using different callbacks. In
Args:
ns: Namespace under which this operation is targeted.
Args:
ns: Namespace under which this operation is targeted.
- cb: A function that is called when events on channel are received.
+ cb: A function that is called when event(s) on channel is received.
channels: One channel or multiple channels to be subscribed.
Returns:
channels: One channel or multiple channels to be subscribed.
Returns:
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from an event. The registered callback function will
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from an event. The registered callback function will
events handling starts.
Returns:
events handling starts.
Returns:
- Tuple: (channel: str, message: str) or
- Tuple: (channel: str, messages: list of str)
+ Tuple: (channel: str, message(s): list of str)
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
cls.pubsub.channels = {b'{some-ns},ch1': Mock()}
def test_handle_pubsub_message(self):
cls.pubsub.channels = {b'{some-ns},ch1': Mock()}
def test_handle_pubsub_message(self):
- assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', 'cbn')
- self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', 'cbn')
+ assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', ['cbn'])
+ self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', ['cbn'])