"""The module provides synchronous Shared Data Layer (SDL) interface."""
-from typing import (Dict, Set, List, Union)
+from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
from abc import ABC, abstractmethod
from ricsdl.exceptions import (
RejectedByBackend
A concrete implementation subclass 'SyncStorage' derives from this abstract class.
"""
+ @abstractmethod
+ def is_active(self):
+ """
+ Verify SDL storage healthiness.
+
+ Verify SDL connection to the backend data storage.
+
+ Args:
+ None
+
+ Returns:
+ bool: True if SDL is operational, false otherwise.
+
+ Raises:
+ None
+ """
+ pass
+
@abstractmethod
def close(self):
"""
"""
pass
+ @abstractmethod
+ def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ data_map: Dict[str, bytes]) -> None:
+ """
+ Publish event to channel after writing data.
+
+ set_and_publish function writes data to shared data layer storage and sends an
+ event to a channel. Writing is done atomically, i.e. all succeeds or fails.
+ Data to be written is given as key-value pairs. Several key-value pairs can be
+ written with one call.
+ The key is expected to be string whereas value is a byte string.
+
+ If data was set successfully, an event is sent to a channel.
+ It is possible to send several events to several channels by giving a list of
+ events
+ E.g. {"channel1": ["event1", "event3"], "channel2": ["event2"]}
+ will send event1 and event3 to channel1 and event2 to channel2.
+
+ Args:
+ ns: Namespace under which this operation is targeted.
+ channels_and_events: Channel to publish if data was set successfully.
+ data_map: Data to be written.
+
+ Returns:
+ None
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, old_data: bytes, new_data: bytes) -> bool:
+ """
+ Publish event to channel after conditionally modifying the value of a key if the
+ current value in data storage matches the user's last known value.
+
+ set_if_and_publish atomically replaces existing data with new_data in SDL if data
+ matches the old_data. If replace was done successfully, true will be returned.
+ Also, if publishing was successful, an event is published to a given channel.
+
+ Args:
+ ns (str): Namespace under which this operation is targeted.
+ channels_and_events (dict): Channel to publish if data was set successfully.
+ key (str): Key for which data modification will be executed.
+ old_data (bytes): Last known data.
+ new_data (bytes): Data to be written.
+
+ Returns:
+ bool: True for successful modification, false if the user's last known data did not
+ match the current value in data storage.
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def set_if_not_exists_and_publish(self, ns: str,
+ channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, data: bytes) -> bool:
+ """
+ Publish event to channel after writing data to SDL storage if key does not exist.
+
+ set_if_not_exists_and_publish conditionally sets the value of a key. If key
+ already exists in SDL, then it's value is not changed. Checking the key existence
+ and potential set operation is done atomically. If the set operation was done
+ successfully, an event is published to a given channel.
+
+ Args:
+ ns (str): Namespace under which this operation is targeted.
+ channels_and_events (dict): Channel to publish if data was set successfully.
+ key (str): Key to be set.
+ data (bytes): Data to be written.
+
+ Returns:
+ bool: True if key didn't exist yet and set operation was executed, false if key already
+ existed and thus its value was left untouched.
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ keys: Union[str, Set[str]]) -> None:
+ """
+ Publish event to channel after removing data.
+
+ remove_and_publish removes data from SDL. Operation is done atomically, i.e.
+ either all succeeds or fails.
+ Trying to remove a nonexisting key is not considered as an error.
+ An event is published into a given channel if remove operation is successful and
+ at least one key is removed (if several keys given). If the given key(s) doesn't
+ exist when trying to remove, no event is published.
+
+ Args:
+ ns: Namespace under which this operation is targeted.
+ channels_and_events: Channel to publish if data was removed successfully.
+ keys: One key or multiple keys, which data is to be removed.
+
+ Returns:
+ None
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, data: bytes) -> bool:
+ """
+ Publish event to channel after removing key and its data from database if the
+ current data value is expected one.
+
+ remove_if_and_publish removes data from SDL conditionally, and if remove was done
+ successfully, a given event is published to channel. If existing data matches
+ given data, key and data are removed from SDL. If remove was done successfully,
+ true is returned.
+
+ Args:
+ ns (str): Namespace under which this operation is targeted.
+ channels_and_events (dict): Channel to publish if data was removed successfully.
+ key (str): Key, which data is to be removed.
+ data (bytes): Last known value of data
+
+ Returns:
+ bool: True if successful removal, false if the user's last known data did not match the
+ current value in data storage.
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def remove_all_and_publish(self, ns: str,
+ channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
+ """
+ Publish event to channel after removing all keys under the namespace.
+
+ remove_all_and_publish removes all keys under the namespace and if successful, it
+ will publish an event to given channel. This operation is not atomic, thus it is
+ not guaranteed that all keys are removed.
+
+ Args:
+ ns (str): Namespace under which this operation is targeted.
+ channels_and_events (dict): Channel to publish if data was removed successfully.
+
+ Returns:
+ None
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
+ channels: Union[str, Set[str]]) -> None:
+ """
+ Subscribes the client to the specified channels.
+
+ subscribe_channel lets you to subscribe for events on a given channels.
+ SDL notifications are events that are published on a specific channel.
+ Both the channel and events are defined by the entity that is publishing
+ the events.
+
+ When subscribing for a channel, a callback function is given as a parameter.
+ Whenever single notification or many notifications are received from a channel,
+ this callback is called with channel and notification list as parameter. A call
+ to subscribe_channel function returns immediately, callbacks will be called
+ synchronously from a dedicated thread.
+
+ It is possible to subscribe to different channels using different callbacks. In
+ this case simply use subscribe_channel function separately for each channel.
+
+ When receiving events in callback routine, it is a good practice to return from
+ callback as quickly as possible. Also it should be noted that in case of several
+ events received from different channels, callbacks are called in series one by
+ one.
+
+ Args:
+ ns: Namespace under which this operation is targeted.
+ cb: A function that is called when event(s) on channel is received.
+ channels: One channel or multiple channels to be subscribed.
+
+ Returns:
+ None
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
+ """
+ unsubscribe_channel removes subscription from one or several channels.
+
+ Args:
+ ns: Namespace under which this operation is targeted.
+ channels: One channel or multiple channels to be unsubscribed.
+
+ Returns:
+ None
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def start_event_listener(self) -> None:
+ """
+ start_event_listener creates an event loop in a separate thread for handling
+ events from subscriptions. The registered callback function will be called
+ when an event is received.
+
+ It should be noted that subscribe_channel must be called before calling
+ start_event_listener to do at least one subscription before event loop
+ starts.
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
+ @abstractmethod
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
+ """
+ handle_events is a non-blocking function that returns a tuple containing channel
+ name and message(s) received from an event. The registered callback function will
+ still be called when an event is received.
+
+ This function is called if SDL user decides to handle notifications in its own
+ event loop. Calling this function after start_event_listener raises an exception.
+ If there are no notifications, these returns None.
+
+ It should be noted that subscribe_channel must be called before calling of the
+ handle_events in an event loop. At least one subscription must be done before
+ events handling starts.
+
+ Returns:
+ Tuple: (channel: str, message(s): list of str)
+
+ Raises:
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ pass
+
@abstractmethod
def get_lock_resource(self, ns: str, resource: str,
expiration: Union[int, float]) -> SyncLockAbc: