Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
Drop,
DropTipRack,
GripDirection,
Mix,
MultiHeadAspirationContainer,
MultiHeadAspirationPlate,
MultiHeadDispenseContainer,
Expand Down Expand Up @@ -8889,6 +8890,167 @@ async def head96_experimental_dispense(
du=f"{stop_flow_rate_increment:05}",
)

@_requires_head96
@need_iswap_parked
async def mix96(
self,
mix: Mix,
resource: Optional[Union[Plate, Container, List[Well]]] = None,
a1_coordinate: Optional[Coordinate] = None,
minimum_traverse_height_start: Optional[float] = None,
offset: Coordinate = Coordinate.zero(),
blowout_air_volume: float = 5.0,
lld_mode: Optional[LLDMode] = None,
descent_speed: float = 80.0,
swap_speed: float = 5.0,
settling_time: float = 0.0,
minimum_traverse_height_end: Optional[float] = None,
):
"""Position the 96-head over a target and mix in place.

Raises the single channels to safe Z, then moves X/Y over the target and descends into the
well, then runs ``mix.repetitions`` symmetric aspirate / dispense cycles: each aspirate
follows the surface down by ``surface_following_distance`` and each dispense follows it
back up, so the tip oscillates without drifting. Returns to a traverse height when done.

Z targets are in tip-bottom space (the target Z is where the tip end goes, not the stop
disk). Declare the target in exactly one of two ways:

- ``a1_coordinate``: an explicit deck Coordinate for the channel-A1 position.
- ``resource`` (+ ``offset``): like ``aspirate96`` - a Plate (head A1 over well A1), a
Container, or a list of Wells.

Args:
mix: volume, repetitions, flow_rate and optional surface_following_distance.
resource: aspirate96-style target (Plate / Container / list[Well]); mutually exclusive
with ``a1_coordinate``.
a1_coordinate: explicit channel-A1 deck target; mutually exclusive with ``resource``.
minimum_traverse_height_start: absolute tip-bottom Z before the X/Y move; None uses full Z
safety.
offset: added to the resolved target position.
blowout_air_volume: air gap taken above the well before descent and expelled above it on
exit, to clear the tips of residual on the way out; 0 skips both.
lld_mode: liquid-level-detection mode; only ``LLDMode.OFF`` is supported (the default).
descent_speed: speed for the fast descent down to just above the well.
swap_speed: speed from there into the well to the target Z.
settling_time: seconds to wait after the last cycle, before retracting out of the well.
minimum_traverse_height_end: absolute tip-bottom Z after mixing; None uses full Z safety.

Raises:
ValueError: if neither or both of ``a1_coordinate`` / ``resource`` are given,
``lld_mode`` is not ``LLDMode.OFF``, or ``settling_time`` < 0.
RuntimeError: if the 96-head holds no tips.
"""
if (a1_coordinate is None) == (resource is None):
raise ValueError("provide exactly one of a1_coordinate or resource")

lld_mode = lld_mode if lld_mode is not None else self.LLDMode.OFF
if lld_mode is not self.LLDMode.OFF:
raise ValueError("mix96 currently supports only LLDMode.OFF")

if settling_time < 0:
raise ValueError("settling_time must be >= 0")

if await self.head96_request_tip_presence() == 0:
raise RuntimeError("96-head has no tips (firmware reports none); pick up tips first")

# H0 direct-drive moves don't raise the single channels (the C0 core-96 commands do so at
# firmware level), so do it explicitly before the X/Y traverse.
await self.move_all_channels_in_z_safety()

# resolve the channel-A1 deck target (and the container top, when a resource gives us one)
if a1_coordinate is not None:
a1 = a1_coordinate + offset
z_top: Optional[float] = None
else:
anchor: Container
if isinstance(resource, Plate):
anchor = resource.get_item(0) # head A1 over well A1 (as aspirate96 resolves it)
elif isinstance(resource, list):
anchor = resource[0]
else:
assert resource is not None
anchor = resource
a1 = anchor.get_absolute_location(x="c", y="c", z="cavity_bottom") + offset
z_top = anchor.get_absolute_location(x="c", y="c", z="top").z

# traverse to start height; None retracts to full (stop-disk) Z safety, a value is the
# tip-bottom height the rest of the method works in
if minimum_traverse_height_start is None:
await self.head96_move_to_z_safety()
else:
await self.head96_move_tool_z(minimum_traverse_height_start, speed=descent_speed)

# move X, Y; X acceleration_level=1 below y=200 mm, the low-Y zone where the head is
# cantilevered forward off the X-drive and wobbles most
await asyncio.gather(
self.head96_move_x(a1.x, acceleration_level=1 if a1.y <= 200.0 else 3),
self.head96_move_y(a1.y),
)

# the tip oscillates between the floor (a1.z) and mix_start (floor + sf), starting at
# mix_start so the first aspirate can follow the surface down without hitting the bottom.
sf = 0.0 if mix.surface_following_distance is None else mix.surface_following_distance
mix_floor = a1.z
mix_start = a1.z + sf

# 2-stage Z descent in tip-bottom space: descent_speed to the swap-start height just above
# the well, aspirate blowout_air_volume, then swap_speed down to mix_start; move_tool_z lands
# the tip end each move.
z_clearance = 5.0
swap_start_z = (z_top if z_top is not None else mix_start) + z_clearance
await self.head96_move_tool_z(swap_start_z, speed=descent_speed)
if blowout_air_volume:
await self.head96_experimental_aspirate(
blowout_air_volume,
flow_rate=mix.flow_rate,
minimum_height=mix_floor,
surface_following_distance=0,
requires_tip=False,
)
await self.head96_move_tool_z(mix_start, speed=swap_speed)

# symmetric mix cycles (no per-cycle drift): each aspirate follows the surface down by sf
# to the floor, each dispense back up to mix_start. minimum_height is the tip-bottom floor;
# the experimental commands convert it to the stop-disk reference.
for _ in range(mix.repetitions):
await self.head96_experimental_aspirate(
mix.volume,
flow_rate=mix.flow_rate,
minimum_height=mix_floor,
surface_following_distance=sf,
requires_tip=False,
)
await self.head96_experimental_dispense(
mix.volume,
flow_rate=mix.flow_rate,
minimum_height=mix_floor,
surface_following_distance=sf,
requires_tip=False,
)

# settle in place (tip still in the liquid) after the last cycle
if settling_time:
await asyncio.sleep(settling_time)

# careful exit at swap_speed back up to the swap-start height (mirrors the descent), before
# the fast traverse out
await self.head96_move_tool_z(swap_start_z, speed=swap_speed)

if blowout_air_volume:
await self.head96_experimental_dispense(
blowout_air_volume,
flow_rate=mix.flow_rate,
requires_tip=False,
)

# traverse to end height; None retracts to full (stop-disk) Z safety, a value is the
# tip-bottom height the rest of the method works in
if minimum_traverse_height_end is None:
await self.head96_move_to_z_safety()
else:
await self.head96_move_tool_z(minimum_traverse_height_end, speed=descent_speed)

# # # Granular commands # # #

async def head96_dispensing_drive_move_to_home_volume(
Expand Down
85 changes: 84 additions & 1 deletion pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from pylabrobot.arms.standard import CartesianCoords
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.standard import GripDirection, Pickup
from pylabrobot.liquid_handling.standard import GripDirection, Mix, Pickup
from pylabrobot.plate_reading import PlateReader
from pylabrobot.plate_reading.chatterbox import PlateReaderChatterboxBackend
from pylabrobot.resources import (
Expand Down Expand Up @@ -169,6 +169,24 @@ def _make_head96_information(star):
)


def _stub_mix96_motion(star):
"""Stub the 96-head primitives mix96 orchestrates so tests can assert the arguments it passes
without touching firmware. Tips present; iSWAP already parked via setUp."""
star._head96_information = _make_head96_information(star)
star.head96_request_tip_presence = unittest.mock.AsyncMock(return_value=1)
for method in (
"move_all_channels_in_z_safety",
"head96_move_to_z_safety",
"head96_move_z",
"head96_move_x",
"head96_move_y",
"head96_move_tool_z",
"head96_experimental_aspirate",
"head96_experimental_dispense",
):
setattr(star, method, unittest.mock.AsyncMock())


class TestPipChannelInformationParsing(unittest.TestCase):
"""VW (pip channel hardware-configuration) response parsing.

Expand Down Expand Up @@ -1307,6 +1325,71 @@ async def test_head96_experimental_aspirate_minimum_height_defaults_to_floor(sel
]
)

async def test_mix96_floor_maps_to_minimum_height_with_offset(self):
"""mix96 sends the resolved tip-bottom floor (well cavity_bottom + offset.z) as the
experimental command's minimum_height - guards offset.z reaching the floor."""
_stub_mix96_motion(self.STAR)
offset_z = 2.0
await self.STAR.mix96(
Mix(volume=50, repetitions=1, flow_rate=100),
resource=self.plate,
offset=Coordinate(0, 0, offset_z),
)
well = self.plate.get_item(0)
expected_floor = well.get_absolute_location(x="c", y="c", z="cavity_bottom").z + offset_z
self.assertEqual(
self.STAR.head96_experimental_aspirate.call_args.kwargs["minimum_height"], expected_floor
)

async def test_mix96_stroke_starts_surface_following_above_floor(self):
"""The careful (swap_speed) descent lands at floor + surface_following_distance and that
distance reaches the aspirate, so the stroke spans [floor, floor+sf], never below floor."""
_stub_mix96_motion(self.STAR)
floor_z, sf = 100.0, 8.0
await self.STAR.mix96(
Mix(volume=50, repetitions=1, flow_rate=100, surface_following_distance=sf),
a1_coordinate=Coordinate(500, 300, floor_z),
swap_speed=5.0,
)
# move_tool_z calls: [0] fast to swap-start, [1] careful to mix_start, [2] exit retract
careful_descent = self.STAR.head96_move_tool_z.call_args_list[1]
self.assertEqual(careful_descent.args[0], floor_z + sf)
self.assertEqual(careful_descent.kwargs["speed"], 5.0)
self.assertEqual(
self.STAR.head96_experimental_aspirate.call_args.kwargs["surface_following_distance"], sf
)

async def test_mix96_specified_traverse_heights_are_tip_bottom_moves(self):
"""A specified minimum_traverse_height_start/end is a tip-bottom Z (head96_move_tool_z), like
the rest of the method; only the None default retracts to stop-disk Z safety. Guards against a
geometric (tip-bottom) traverse height being driven as a stop-disk position."""
_stub_mix96_motion(self.STAR)
start_z, end_z = 250.0, 240.0
await self.STAR.mix96(
Mix(volume=50, repetitions=1, flow_rate=100),
a1_coordinate=Coordinate(500, 300, 100.0),
minimum_traverse_height_start=start_z,
minimum_traverse_height_end=end_z,
)
self.STAR.head96_move_to_z_safety.assert_not_called()
tool_z_targets = [call.args[0] for call in self.STAR.head96_move_tool_z.call_args_list]
self.assertEqual(tool_z_targets[0], start_z) # first tool move is the start traverse
self.assertEqual(tool_z_targets[-1], end_z) # last tool move is the end traverse

async def test_mix96_zero_blowout_skips_air_gap_calls(self):
"""blowout_air_volume=0 issues no firmware aspirate/dispense for the air gap: every
experimental aspirate/dispense is a mix-cycle stroke (mix.volume), none a zero-volume blow-out."""
_stub_mix96_motion(self.STAR)
await self.STAR.mix96(
Mix(volume=50, repetitions=1, flow_rate=100),
a1_coordinate=Coordinate(500, 300, 100.0),
blowout_air_volume=0.0,
)
asp_vols = [call.args[0] for call in self.STAR.head96_experimental_aspirate.call_args_list]
disp_vols = [call.args[0] for call in self.STAR.head96_experimental_dispense.call_args_list]
self.assertEqual(asp_vols, [50]) # one cycle aspirate, no blow-out aspirate
self.assertEqual(disp_vols, [50]) # one cycle dispense, no blow-out dispense

async def test_core_96_dispense_quadrant(self):
"""Test that each quadrant of a 384-well plate produces the correct firmware command.

Expand Down
Loading