Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions pylabrobot/thermocycling/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,10 @@ async def run_protocol(self, protocol: Protocol, block_max_volume: float):
async def get_block_current_temperature(self) -> List[float]:
"""Get the current block temperature zones in °C."""

@abstractmethod
async def get_block_target_temperature(self) -> List[float]:
"""Get the block target temperature zones in °C. May raise RuntimeError if no target is set."""

@abstractmethod
async def get_lid_current_temperature(self) -> List[float]:
"""Get the current lid temperature zones in °C."""

@abstractmethod
async def get_lid_target_temperature(self) -> List[float]:
"""Get the lid target temperature zones in °C. May raise RuntimeError if no target is set."""

@abstractmethod
async def get_lid_open(self) -> bool:
"""Return ``True`` if the lid is open."""
Expand All @@ -85,14 +77,6 @@ async def get_hold_time(self) -> float:
async def get_current_cycle_index(self) -> int:
"""Get the zero-based index of the current cycle."""

@abstractmethod
async def get_total_cycle_count(self) -> int:
"""Get the total cycle count."""

@abstractmethod
async def get_current_step_index(self) -> int:
"""Get the zero-based index of the current step within the cycle."""

@abstractmethod
async def get_total_step_count(self) -> int:
"""Get the total number of steps in the current cycle."""
16 changes: 0 additions & 16 deletions pylabrobot/thermocycling/chatterbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,33 +151,17 @@ async def get_hold_time(self) -> float:
async def get_current_cycle_index(self) -> int:
return 1

async def get_total_cycle_count(self) -> int:
return 1

async def get_current_step_index(self) -> int:
# If the profile is "running", it means the simulation hasn't happened yet.
# The moment get_hold_time is called, it will complete instantly.
return self._state.total_steps if not self._state.is_profile_running else 1

async def get_total_step_count(self) -> int:
return self._state.total_steps

async def get_block_current_temperature(self) -> List[float]:
return self._state.block_temp

async def get_block_target_temperature(self) -> List[float]:
if self._state.block_target is None:
raise RuntimeError("Block target temperature is not set. Is a cycle running?")
return self._state.block_target

async def get_lid_current_temperature(self) -> List[float]:
return self._state.lid_temp

async def get_lid_target_temperature(self) -> List[float]:
if self._state.lid_target is None:
raise RuntimeError("Lid target temperature is not set. Is a cycle running?")
return self._state.lid_target

async def get_lid_open(self) -> bool:
return self._state.lid_open

Expand Down
12 changes: 0 additions & 12 deletions pylabrobot/thermocycling/inheco/odtc_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,6 @@ async def get_block_current_temperature(self) -> List[float]:
temps = await self.get_sensor_data()
return [temps.get("Mount", 0.0)]

async def get_block_target_temperature(self) -> List[float]:
raise NotImplementedError()

async def get_block_status(self) -> BlockStatus:
raise NotImplementedError()

Expand All @@ -208,9 +205,6 @@ async def get_lid_current_temperature(self) -> List[float]:
temps = await self.get_sensor_data()
return [temps.get("Lid", 0.0)]

async def get_lid_target_temperature(self) -> List[float]:
raise NotImplementedError()

# -------------------------------------------------------------------------
# Protocol
# -------------------------------------------------------------------------
Expand Down Expand Up @@ -382,11 +376,5 @@ async def get_hold_time(self) -> float:
async def get_current_cycle_index(self) -> int:
raise NotImplementedError()

async def get_total_cycle_count(self) -> int:
raise NotImplementedError()

async def get_current_step_index(self) -> int:
raise NotImplementedError()

async def get_total_step_count(self) -> int:
raise NotImplementedError()
23 changes: 0 additions & 23 deletions pylabrobot/thermocycling/opentrons_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,9 @@ def _find_module(self) -> dict:
async def get_block_current_temperature(self) -> List[float]:
return [cast(float, self._find_module()["currentTemperature"])]

async def get_block_target_temperature(self) -> List[float]:
target_temp = self._find_module().get("targetTemperature")
if target_temp is None:
raise RuntimeError("Block target temperature is not set. is a cycle running?")
return [cast(float, target_temp)]

async def get_lid_current_temperature(self) -> List[float]:
return [cast(float, self._find_module()["lidTemperature"])]

async def get_lid_target_temperature(self) -> List[float]:
"""Get the lid target temperature in °C. Raises RuntimeError if no target is active."""
target_temp = self._find_module().get("lidTargetTemperature")
if target_temp is None:
raise RuntimeError("Lid target temperature is not set. is a cycle running?")
return [cast(float, target_temp)]

async def get_lid_open(self) -> bool:
return cast(str, self._find_module()["lidStatus"]) == "open"

Expand Down Expand Up @@ -185,20 +172,10 @@ async def get_current_cycle_index(self) -> int:

raise RuntimeError("Current cycle index is not available. Is a profile running?")

async def get_total_cycle_count(self) -> int:
# https://github.com/PyLabRobot/pylabrobot/issues/632
raise NotImplementedError('Opentrons "cycle" concept is not understood currently.')

async def get_current_step_index(self) -> int:
"""Get the zero-based index of the current step from the Opentrons API."""
# Opentrons API returns one-based, convert to zero-based
step_index = self._find_module().get("currentStepIndex")
if step_index is None:
raise RuntimeError("Current step index is not available. Is a profile running?")
return cast(int, step_index) - 1

async def get_total_step_count(self) -> int:
total_steps = self._find_module().get("totalStepCount")
if total_steps is None:
raise RuntimeError("Total step count is not available. Is a profile running?")
return cast(int, total_steps)
4 changes: 0 additions & 4 deletions pylabrobot/thermocycling/opentrons_backend_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,13 @@ async def test_getters_return_correct_data(self, mock_list_connected_modules):
mock_list_connected_modules.return_value = [mock_data]

assert await self.thermocycler_backend.get_block_current_temperature() == [25.5]
assert await self.thermocycler_backend.get_block_target_temperature() == [95.0]
assert await self.thermocycler_backend.get_lid_current_temperature() == [37.1]
assert await self.thermocycler_backend.get_lid_target_temperature() == [105.0]
assert await self.thermocycler_backend.get_lid_open() is True
assert await self.thermocycler_backend.get_lid_status() == LidStatus.HOLDING_AT_TARGET
assert await self.thermocycler_backend.get_block_status() == BlockStatus.HOLDING_AT_TARGET
assert await self.thermocycler_backend.get_hold_time() == 12.0
# assert await self.thermocycler_backend.get_current_cycle_index() == 1 # 2 - 1 = 1 (zero-based)
# assert await self.thermocycler_backend.get_total_cycle_count() == 10
assert await self.thermocycler_backend.get_current_step_index() == 0 # 1 - 1 = 0 (zero-based)
assert await self.thermocycler_backend.get_total_step_count() == 3

@patch("pylabrobot.thermocycling.opentrons_backend.list_connected_modules")
async def test_get_hold_time_raises_if_not_running(self, mock_list_connected_modules):
Expand Down
20 changes: 0 additions & 20 deletions pylabrobot/thermocycling/opentrons_backend_usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,11 @@ async def get_block_current_temperature(self) -> List[float]:
plate_temp = await self._driver.get_plate_temperature()
return [plate_temp.current]

async def get_block_target_temperature(self) -> List[float]:
assert self._driver is not None
plate_temp = await self._driver.get_plate_temperature()
if plate_temp.target is not None:
return [plate_temp.target]
raise RuntimeError("Block target temperature is not set.")

async def get_lid_current_temperature(self) -> List[float]:
assert self._driver is not None
lid_temp = await self._driver.get_lid_temperature()
return [lid_temp.current]

async def get_lid_target_temperature(self) -> List[float]:
assert self._driver is not None
lid_temp = await self._driver.get_lid_temperature()
if lid_temp.target is not None:
return [lid_temp.target]
raise RuntimeError("Lid target temperature is not set.")

async def get_lid_open(self) -> bool:
"""Return True if the lid is open."""
assert self._driver is not None
Expand All @@ -326,11 +312,5 @@ async def get_hold_time(self) -> float:
async def get_current_cycle_index(self) -> int:
return self._current_cycle_index if self._current_cycle_index is not None else 0

async def get_total_cycle_count(self) -> int:
return self._total_cycle_count if self._total_cycle_count is not None else 0

async def get_current_step_index(self) -> int:
return self._current_step_index if self._current_step_index is not None else 0

async def get_total_step_count(self) -> int:
return self._total_step_count if self._total_step_count is not None else 0
Original file line number Diff line number Diff line change
Expand Up @@ -1036,19 +1036,3 @@ async def get_lid_open(self, *args, **kwargs):

async def get_lid_status(self, *args, **kwargs) -> LidStatus:
raise NotImplementedError

async def get_lid_target_temperature(self, *args, **kwargs):
# deprecated
raise NotImplementedError

async def get_total_cycle_count(self, *args, **kwargs):
# deprecated
raise NotImplementedError

async def get_total_step_count(self, *args, **kwargs):
# deprecated
raise NotImplementedError

async def get_block_target_temperature(self, *args, **kwargs):
# deprecated
raise NotImplementedError
56 changes: 40 additions & 16 deletions pylabrobot/thermocycling/thermocycler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def __init__(
)
Machine.__init__(self, backend=backend)
self.backend: ThermocyclerBackend = backend
self._block_target_temperature: Optional[List[float]] = None
self._lid_target_temperature: Optional[List[float]] = None
self._current_protocol: Optional[Protocol] = None
self._total_cycle_count: int = 0
self._total_step_count: int = 0

async def open_lid(self, **backend_kwargs):
return await self.backend.open_lid(**backend_kwargs)
Expand All @@ -61,6 +66,7 @@ async def set_block_temperature(self, temperature: List[float], **backend_kwargs
Args:
temperature: List of target temperatures in °C for multiple zones.
"""
self._block_target_temperature = list(temperature)
return await self.backend.set_block_temperature(temperature, **backend_kwargs)

async def set_lid_temperature(self, temperature: List[float], **backend_kwargs):
Expand All @@ -69,6 +75,7 @@ async def set_lid_temperature(self, temperature: List[float], **backend_kwargs):
Args:
temperature: List of target temperatures in °C for multiple zones.
"""
self._lid_target_temperature = list(temperature)
return await self.backend.set_lid_temperature(temperature, **backend_kwargs)

async def deactivate_block(self, **backend_kwargs):
Expand Down Expand Up @@ -96,6 +103,11 @@ async def run_protocol(self, protocol: Protocol, block_max_volume: float, **back
f"Expected {num_zones}, got {len(step.temperature)} in step {i}."
)

self._current_protocol = protocol
self._block_target_temperature = list(protocol.stages[0].steps[0].temperature)
self._total_cycle_count = sum(stage.repeats for stage in protocol.stages)
self._total_step_count = sum(len(stage.steps) * stage.repeats for stage in protocol.stages)

return await self.backend.run_protocol(protocol, block_max_volume, **backend_kwargs)

async def run_pcr_profile(
Expand Down Expand Up @@ -180,15 +192,19 @@ async def get_block_current_temperature(self, **backend_kwargs) -> List[float]:

async def get_block_target_temperature(self, **backend_kwargs) -> List[float]:
"""Get the block's target temperature(s) (°C)."""
return await self.backend.get_block_target_temperature(**backend_kwargs)
if self._block_target_temperature is None:
raise RuntimeError("Block target temperature is not set.")
return self._block_target_temperature

async def get_lid_current_temperature(self, **backend_kwargs) -> List[float]:
"""Get the current lid temperature(s) (°C)."""
return await self.backend.get_lid_current_temperature(**backend_kwargs)

async def get_lid_target_temperature(self, **backend_kwargs) -> List[float]:
"""Get the lid's target temperature(s) (°C), if supported."""
return await self.backend.get_lid_target_temperature(**backend_kwargs)
if self._lid_target_temperature is None:
raise RuntimeError("Lid target temperature is not set.")
return self._lid_target_temperature

async def get_lid_open(self, **backend_kwargs) -> bool:
"""Return ``True`` if the lid is open."""
Expand All @@ -212,15 +228,15 @@ async def get_current_cycle_index(self, **backend_kwargs) -> int:

async def get_total_cycle_count(self, **backend_kwargs) -> int:
"""Get the total number of cycles."""
return await self.backend.get_total_cycle_count(**backend_kwargs)
return self._total_cycle_count

async def get_current_step_index(self, **backend_kwargs) -> int:
"""Get the one-based index of the current step."""
return await self.backend.get_current_step_index(**backend_kwargs)

async def get_total_step_count(self, **backend_kwargs) -> int:
"""Get the total number of steps in the current cycle."""
return await self.backend.get_total_step_count(**backend_kwargs)
"""Get the total number of steps in the current protocol."""
return self._total_step_count

async def wait_for_block(self, timeout: float = 600, tolerance: float = 0.5, **backend_kwargs):
"""Wait until block temp reaches target ± tolerance for all zones."""
Expand All @@ -234,22 +250,23 @@ async def wait_for_block(self, timeout: float = 600, tolerance: float = 0.5, **b
raise TimeoutError("Block temperature timeout.")

async def wait_for_lid(self, timeout: float = 1200, tolerance: float = 0.5, **backend_kwargs):
"""Wait until the lid temperature reaches target ± ``tolerance`` or the lid temperature status is idle/holding at target."""
"""Wait until the lid temperature reaches target ± ``tolerance`` or the lid status indicates it is ready."""
try:
targets = await self.get_lid_target_temperature(**backend_kwargs)
except RuntimeError:
targets = None

start = time.time()
while time.time() - start < timeout:
status = await self.get_lid_status(**backend_kwargs)
if status in [LidStatus.IDLE, LidStatus.HOLDING_AT_TARGET, "idle", "holding at target"]:
return

if targets is not None:
currents = await self.get_lid_current_temperature(**backend_kwargs)
if all(abs(current - target) < tolerance for current, target in zip(currents, targets)):
return
else:
# If no target temperature, check status
status = await self.get_lid_status(**backend_kwargs)
if status in ["idle", "holding at target"]:
return

await asyncio.sleep(1)
raise TimeoutError("Lid temperature timeout.")

Expand All @@ -261,15 +278,22 @@ async def is_profile_running(self, **backend_kwargs) -> bool:
step = await self.get_current_step_index(**backend_kwargs)
total_steps = await self.get_total_step_count(**backend_kwargs)

# if still holding in a step, it's running
# If still holding in a step, it is running.
if hold and hold > 0:
return True
# if haven't reached last cycle (zero-based indexing)
if cycle < total_cycles - 1:

# Some backends report the completed step count after finishing.
if total_steps and step >= total_steps:
return False

# If we have not reached the last cycle, it is running.
if total_cycles and cycle < total_cycles - 1:
return True
# last cycle but not last step (zero-based indexing)
if cycle == total_cycles - 1 and step < total_steps - 1:

# Last cycle but not last step.
if total_steps and cycle == total_cycles - 1 and step < total_steps - 1:
return True

return False

async def wait_for_profile_completion(self, poll_interval: float = 60.0, **backend_kwargs):
Expand Down
8 changes: 2 additions & 6 deletions pylabrobot/thermocycling/thermocycler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ def mock_backend() -> MagicMock:
mock.deactivate_lid = AsyncMock()
mock.run_protocol = AsyncMock()
mock.get_block_current_temperature = AsyncMock(return_value=[25.0])
mock.get_block_target_temperature = AsyncMock(return_value=None)
mock.get_lid_current_temperature = AsyncMock(return_value=[25.0])
mock.get_lid_target_temperature = AsyncMock(return_value=None)
mock.get_lid_open = AsyncMock(return_value=False)
mock.get_lid_temperature_status = AsyncMock(return_value="idle")
mock.get_block_status = AsyncMock(return_value="idle")
mock.get_hold_time = AsyncMock(return_value=0.0)
mock.get_current_cycle_index = AsyncMock(return_value=0)
mock.get_total_cycle_count = AsyncMock(return_value=0)
mock.get_current_step_index = AsyncMock(return_value=0)
mock.get_total_step_count = AsyncMock(return_value=0)
return mock


Expand Down Expand Up @@ -132,9 +128,9 @@ async def test_is_profile_running_logic(self):
for hold, cycle, total_cycles, step, total_steps, expected in test_cases:
self.tc.backend.get_hold_time.return_value = hold # type: ignore
self.tc.backend.get_current_cycle_index.return_value = cycle # type: ignore
self.tc.backend.get_total_cycle_count.return_value = total_cycles # type: ignore
self.tc._total_cycle_count = total_cycles
self.tc.backend.get_current_step_index.return_value = step # type: ignore
self.tc.backend.get_total_step_count.return_value = total_steps # type: ignore
self.tc._total_step_count = total_steps
print(f"Testing with hold={hold}, cycle={cycle}, total_cycles={total_cycles}, ")
assert await self.tc.is_profile_running() is expected

Expand Down