Version history
---------------
+[3.0.0] - 2021-06-10
+
+* Simplify v2.3.0 notification callback and handle_events API to return received
+ event(s) only as a list type. Earlier single event was returned as a string type.
+ Notification callback and handle_events API changes are not backward compatible.
+
[2.3.0] - 2021-06-09
* Pack all the events in a channel to one DB notification to be in line with SDL Golang.
stop_thread = False
-def cb(channel: str, message: Union[str, List[str]]):
- """An example of function that will be called when a single event or list of
- events are received.
+def cb(channel: str, message: List[str]):
+ """An example of function that will be called when an event list is received.
This function sets last_cb_channel and last_cb_message as channel and
message respectively. This also unlocks the global lock variable.
# type must be bytes and multiple key values can be set in one set function call.
_try_func_callback_return(
lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET PUBLISH"
# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
# type must be bytes and multiple key values can be set in one set function call.
was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
assert was_set is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF PUBLISH"
# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
# value. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
assert was_set is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "SET IF NOT PUBLISH"
# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
assert my_ret_dict == {}
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE PUBLISH"
# Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
assert was_removed is True
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE IF PUBLISH"
# Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
assert my_ret_dict == {}
-assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
+assert last_cb_channel == MY_CHANNEL and last_cb_message[0] == "REMOVE ALL PUBLISH"
stop_thread = True
mysdl.close()
)
-__version__ = '2.3.0'
+__version__ = '3.0.0'
__all__ = [
pass
@abstractmethod
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
"""
This takes a callback function and one or many channels to be subscribed.
When an event is received for the given channel, the given callback function
- shall be called with channel and notifications as parameter.
+ shall be called with channel and notification(s) as parameter.
"""
pass
pass
@abstractmethod
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from notification.
for channel, events in channels_and_events.items():
self._queue.put((channel, events))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
- cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
+ cb(message[0], message[1])
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
self._listen_thread.start()
self._run_in_thread = True
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
- notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
if cb:
- cb(message[0], notifications)
- return (message[0], notifications)
+ cb(message[0], message[1])
+ return (message[0], message[1])
class FakeDictBackendLock(DbBackendLockAbc):
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
messages = message_data.split(self.event_separator)
- notification = messages[0] if len(messages) == 1 else messages
- handler(message_channel, notification)
- return message_channel, notification
+ handler(message_channel, messages)
+ return message_channel, messages
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
*channels_and_events_prepared,
)
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
@func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: Union[str, Set[str]]) -> None:
self._validate_callback(cb)
channels = [channels] if isinstance(channels, str) else list(channels)
def start_event_listener(self) -> None:
self.__dbbackend.start_event_listener()
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
return self.__dbbackend.handle_events()
@func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
pass
@abstractmethod
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: Union[str, Set[str]]) -> None:
"""
Subscribes the client to the specified channels.
When subscribing for a channel, a callback function is given as a parameter.
Whenever single notification or many notifications are received from a channel,
- this callback is called with channel and notifications as parameter. A call to
- subscribe_channel function returns immediately, callbacks will be called
+ this callback is called with channel and notification list as parameter. A call
+ to subscribe_channel function returns immediately, callbacks will be called
synchronously from a dedicated thread.
It is possible to subscribe to different channels using different callbacks. In
Args:
ns: Namespace under which this operation is targeted.
- cb: A function that is called when events on channel are received.
+ cb: A function that is called when event(s) on channel is received.
channels: One channel or multiple channels to be subscribed.
Returns:
pass
@abstractmethod
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
"""
handle_events is a non-blocking function that returns a tuple containing channel
name and message(s) received from an event. The registered callback function will
events handling starts.
Returns:
- Tuple: (channel: str, message: str) or
- Tuple: (channel: str, messages: list of str)
+ Tuple: (channel: str, message(s): list of str)
Raises:
SdlTypeError: If function's argument is of an inappropriate type.
cls.pubsub.channels = {b'{some-ns},ch1': Mock()}
def test_handle_pubsub_message(self):
- assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', 'cbn')
- self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', 'cbn')
+ assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', ['cbn'])
+ self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', ['cbn'])