Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 22 additions & 65 deletions src/labthings_fastapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
TypeVar,
overload,
)
from weakref import WeakSet
import weakref
from fastapi import APIRouter, FastAPI, HTTPException, Request, Body, BackgroundTasks
from pydantic import BaseModel, create_model

from labthings_fastapi.message_broker import Message


from .middleware.url_for import URLFor
from .base_descriptor import (
Expand All @@ -68,7 +69,6 @@
)
from .thing_description import type_to_dataschema
from .thing_description._model import ActionAffordance, ActionOp, Form, LinkElement
from .utilities import labthings_data


if TYPE_CHECKING:
Expand Down Expand Up @@ -247,6 +247,20 @@
log=self.log,
)

def _publish_status(self) -> None:
"""Publish a status change event to any observers.

This should be called after each change to ``self._status``
"""
self.thing._thing_server_interface.publish(
Message(
thing=self.thing.name,
affordance=self.action.name, # type: ignore[attr-defined]
message_type="action",
payload=self._status.value,
)
)

def run(self) -> None:
"""Run the action and track progress.

Expand Down Expand Up @@ -282,7 +296,7 @@
add_thing_log_destination(self.id, self._log)
with invocation_contexts.set_invocation_id(self.id):
try:
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()

thing = self.thing
kwargs = model_to_dict(self.input)
Expand All @@ -298,21 +312,21 @@
with self._status_lock:
self._status = InvocationStatus.RUNNING
self._start_time = datetime.datetime.now()
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()

# Actually run the action
ret = action.func(thing, **kwargs, **self.dependencies)

with self._status_lock:
self._return_value = ret
self._status = InvocationStatus.COMPLETED
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()

except InvocationCancelledError:
logger.info(f"Invocation {self.id} was cancelled.")
with self._status_lock:
self._status = InvocationStatus.CANCELLED
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()
except Exception as e: # skipcq: PYL-W0703
# First log
if isinstance(e, InvocationError):
Expand All @@ -332,7 +346,7 @@
with self._status_lock:
self._status = InvocationStatus.ERROR
self._exception = e
action.emit_changed_event(self.thing, self._status.value)
self._publish_status()
finally:
with self._status_lock:
self._end_time = datetime.datetime.now()
Expand Down Expand Up @@ -521,8 +535,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 539 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

538-539 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand All @@ -535,7 +549,7 @@
invocation.output.response
):
# TODO: honour "accept" header
return invocation.output.response()

Check warning on line 552 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

552 line is not covered with tests
return invocation.output

@router.delete(
Expand All @@ -560,8 +574,8 @@
with self._invocations_lock:
try:
invocation: Any = self._invocations[id]
except KeyError as e:
raise HTTPException(

Check warning on line 578 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

577-578 lines are not covered with tests
status_code=404,
detail="No action invocation found with ID {id}",
) from e
Expand Down Expand Up @@ -748,7 +762,7 @@
"""
super().__set_name__(owner, name)
if self.name != self.func.__name__:
raise ValueError(

Check warning on line 765 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

765 line is not covered with tests
f"Action name '{self.name}' does not match function name "
f"'{self.func.__name__}'",
)
Expand Down Expand Up @@ -810,70 +824,13 @@
"""

@wraps(self.func)
def wrapped(*args: Any, **kwargs: Any) -> Any: # noqa: DOC
def wrapped(*args: Any, **kwargs: Any) -> Any: # noqa: DOC101, DOC103, DOC201
"""Acquire the lock then run `func` with supplied arguments."""
with self.context_for_func(obj):
return self.func(*args, **kwargs)

return partial(wrapped, obj)

def _observers_set(self, obj: Thing) -> WeakSet:
"""Return a set used to notify changes.

Note that we need to supply the `~lt.Thing` we are looking at, as in
general there may be more than one object of the same type, and
descriptor instances are shared between all instances of their class.

:param obj: The `~lt.Thing` on which the action is being observed.

:return: a weak set of callables to notify on changes to the action.
This is used by websocket endpoints.
"""
ld = labthings_data(obj)
if self.name not in ld.action_observers:
ld.action_observers[self.name] = WeakSet()
return ld.action_observers[self.name]

def emit_changed_event(self, obj: Thing, status: str) -> None:
"""Notify subscribers that the action status has changed.

This function is run from within the `.Invocation` thread that
is created when an action is called. It must be run from a thread
as it is communicating with the event loop via an `asyncio` blocking
portal. Async code must not use the blocking portal as it can deadlock
the event loop.

:param obj: The `~lt.Thing` on which the action is being observed.
:param status: The status of the action, to be sent to observers.
"""
obj._thing_server_interface.start_async_task_soon(
self.emit_changed_event_async,
obj,
status,
)

async def emit_changed_event_async(self, obj: Thing, value: Any) -> None:
"""Notify subscribers that the action status has changed.

This is an async function that must be run in the `anyio` event loop.
It will send messages to each observer to notify them that something
has changed.

:param obj: The `~lt.Thing` on which the action is defined.
`.ActionDescriptor` objects are unique to the class, but there may
be more than one `~lt.Thing` attached to a server with the same class.
We use ``obj`` to look up the observers of the current `~lt.Thing`.
:param value: The action status to communicate to the observers.
"""
action_name = self.name
for observer in self._observers_set(obj):
await observer.send(
{
"messageType": "actionStatus",
"data": {"action name": action_name, "status": value},
}
)

def add_to_fastapi(self, app: FastAPI, thing: Thing) -> None:
"""Add this action to a FastAPI app, bound to a particular Thing.

Expand Down Expand Up @@ -940,14 +897,14 @@
try:
responses[200]["model"] = self.output_model
pass
except AttributeError:
print(f"Failed to generate response model for action {self.name}")

Check warning on line 901 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

900-901 lines are not covered with tests
# Add an additional media type if we may return a file
if hasattr(self.output_model, "media_type"):
responses[200]["content"][self.output_model.media_type] = {}

Check warning on line 904 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

904 line is not covered with tests
# Now we can add the endpoint to the app.
if thing.path is None:
raise NotConnectedToServerError(

Check warning on line 907 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

907 line is not covered with tests
"Can't add the endpoint without thing.path!"
)
app.post(
Expand Down Expand Up @@ -995,7 +952,7 @@
"""
path = path or thing.path
if path is None:
raise NotConnectedToServerError("Can't generate forms without a path!")

Check warning on line 955 in src/labthings_fastapi/actions.py

View workflow job for this annotation

GitHub Actions / coverage

955 line is not covered with tests
forms = [
Form[ActionOp](href=path + self.name, op=[ActionOp.invokeaction]),
]
Expand Down
2 changes: 1 addition & 1 deletion src/labthings_fastapi/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ReadOnlyPropertyError(AttributeError):
class PropertyNotObservableError(RuntimeError):
"""The property is not observable.

This exception is raised when `~lt.Thing.observe_property` is called with a
This exception is raised when trying to observe
property that is not observable. Currently, only data properties are
observable: functional properties (using a getter/setter) may not be
observed.
Expand Down
122 changes: 122 additions & 0 deletions src/labthings_fastapi/message_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Handle pub-sub style events.

Both properties and actions can emit events that may be observed. This module handles
all the pub-sub messaging in LabThings.
"""

import anyio
from pydantic.dataclasses import dataclass
from typing import Any, Literal
from weakref import WeakSet

from anyio.abc import ObjectSendStream


@dataclass
class Message:
"""A pub-sub event message.

This is the message that is sent when a property or action generates
an event.

This is a pydantic dataclass, so we validate the message. This might
change in the future for performance reasons.

:param thing: The name of the Thing generating the event.
:param affordance: The name of the affordance generating the event.
:param message: The message to send.
"""

thing: str
affordance: str
message_type: Literal["property", "action", "event"]
payload: Any


class MessageBroker:
r"""A class that relays pub/sub messages.

This class takes care of relaying messages to streams that have subscribed to them.
It does not format messages or handle any details of e.g. websocket protocol.

Subscriptions require an `ObjectSendStream[Message]` and each time a `Message`
matching the subscription parameters (``thing`` and ``affordance``) is published,
it will be sent on that stream.

The broker does not validate thing or affordance names: that's up to the code
calling `MessageBroker.subscribe`\ .
"""

def __init__(self) -> None:
"""Initialise the message broker."""
# Note that we use a weak set below, so that when a websocket disconnects,
# its stream is removed automatically.
self._subscriptions: dict[
str, dict[str, WeakSet[ObjectSendStream[Message]]]
] = {}

def subscribe(
self, thing: str, affordance: str, stream: ObjectSendStream[Message]
) -> None:
"""Subscribe to messages from a particular affordance.

Note that this method is not async - it just registers the stream and so
can be run from any thread.

:param thing: The name of the `.Thing` being subscribed to.
:param affordance: The name of the affordance being subscribed to.
:param stream: A stream to send the messages to.
:raises TypeError: if the `thing` argument is not a string.
"""
if not isinstance(thing, str):
raise TypeError(f"The `thing` argument should be a string, not {thing}.")
if thing not in self._subscriptions:
self._subscriptions[thing] = {}
if affordance not in self._subscriptions[thing]:
self._subscriptions[thing][affordance] = WeakSet()
self._subscriptions[thing][affordance].add(stream)

def unsubscribe(
self, thing: str, affordance: str, stream: ObjectSendStream[Message]
) -> None:
"""Unsubscribe a stream from messages from a particular affordance.

:param thing: The name of the `.Thing` being unsubscribed from.
:param affordance: The name of the affordance being unsubscribed from.
:param stream: The stream to unsubscribe.
:raises KeyError: if there is no such subscription.
:raises TypeError: if the `thing` argument is not a string.
"""
if not isinstance(thing, str):
raise TypeError(f"The `thing` argument should be a string, not {thing}.")
try:
self._subscriptions[thing][affordance].discard(stream)
except KeyError as e:
raise e

async def publish(self, message: Message) -> None:
"""Publish a message.

This async method will relay the message to any subscriber streams.

:param message: the message to send.
"""
try:
subscriptions = self._subscriptions[message.thing][message.affordance]
except KeyError:
return # No subscribers for this thing.
for stream in subscriptions:
await stream.send(message)

async def close_streams(self) -> None:
"""Close all streams that are subscribed to receive messages.

This should be called when the server shuts down.
"""
# We use a task group so we shut down all streams concurrently, rather
# than waiting for each one to close.
async with anyio.create_task_group() as tg:
for thing_subs in self._subscriptions.values():
for subs in thing_subs.values():
for stream in subs:
tg.start_soon(stream.aclose)
Loading
Loading