diff --git a/pylabrobot/thermocycling/backend.py b/pylabrobot/thermocycling/backend.py index cf0955364ae..2ba239415e9 100644 --- a/pylabrobot/thermocycling/backend.py +++ b/pylabrobot/thermocycling/backend.py @@ -53,18 +53,10 @@ async def run_protocol(self, protocol: Protocol, block_max_volume: float): async def get_block_current_temperature(self) -> List[float]: """Get the current block temperature zones in °C.""" - @abstractmethod - async def get_block_target_temperature(self) -> List[float]: - """Get the block target temperature zones in °C. May raise RuntimeError if no target is set.""" - @abstractmethod async def get_lid_current_temperature(self) -> List[float]: """Get the current lid temperature zones in °C.""" - @abstractmethod - async def get_lid_target_temperature(self) -> List[float]: - """Get the lid target temperature zones in °C. May raise RuntimeError if no target is set.""" - @abstractmethod async def get_lid_open(self) -> bool: """Return ``True`` if the lid is open.""" @@ -85,14 +77,6 @@ async def get_hold_time(self) -> float: async def get_current_cycle_index(self) -> int: """Get the zero-based index of the current cycle.""" - @abstractmethod - async def get_total_cycle_count(self) -> int: - """Get the total cycle count.""" - @abstractmethod async def get_current_step_index(self) -> int: """Get the zero-based index of the current step within the cycle.""" - - @abstractmethod - async def get_total_step_count(self) -> int: - """Get the total number of steps in the current cycle.""" diff --git a/pylabrobot/thermocycling/chatterbox.py b/pylabrobot/thermocycling/chatterbox.py index 1c45e40752d..8d45ceede97 100644 --- a/pylabrobot/thermocycling/chatterbox.py +++ b/pylabrobot/thermocycling/chatterbox.py @@ -151,33 +151,17 @@ async def get_hold_time(self) -> float: async def get_current_cycle_index(self) -> int: return 1 - async def get_total_cycle_count(self) -> int: - return 1 - async def get_current_step_index(self) -> int: # If the profile is "running", it means the simulation hasn't happened yet. # The moment get_hold_time is called, it will complete instantly. return self._state.total_steps if not self._state.is_profile_running else 1 - async def get_total_step_count(self) -> int: - return self._state.total_steps - async def get_block_current_temperature(self) -> List[float]: return self._state.block_temp - async def get_block_target_temperature(self) -> List[float]: - if self._state.block_target is None: - raise RuntimeError("Block target temperature is not set. Is a cycle running?") - return self._state.block_target - async def get_lid_current_temperature(self) -> List[float]: return self._state.lid_temp - async def get_lid_target_temperature(self) -> List[float]: - if self._state.lid_target is None: - raise RuntimeError("Lid target temperature is not set. Is a cycle running?") - return self._state.lid_target - async def get_lid_open(self) -> bool: return self._state.lid_open diff --git a/pylabrobot/thermocycling/inheco/odtc_backend.py b/pylabrobot/thermocycling/inheco/odtc_backend.py index 84e1b69665b..3e9c8b5a8bb 100644 --- a/pylabrobot/thermocycling/inheco/odtc_backend.py +++ b/pylabrobot/thermocycling/inheco/odtc_backend.py @@ -184,9 +184,6 @@ async def get_block_current_temperature(self) -> List[float]: temps = await self.get_sensor_data() return [temps.get("Mount", 0.0)] - async def get_block_target_temperature(self) -> List[float]: - raise NotImplementedError() - async def get_block_status(self) -> BlockStatus: raise NotImplementedError() @@ -208,9 +205,6 @@ async def get_lid_current_temperature(self) -> List[float]: temps = await self.get_sensor_data() return [temps.get("Lid", 0.0)] - async def get_lid_target_temperature(self) -> List[float]: - raise NotImplementedError() - # ------------------------------------------------------------------------- # Protocol # ------------------------------------------------------------------------- @@ -382,11 +376,5 @@ async def get_hold_time(self) -> float: async def get_current_cycle_index(self) -> int: raise NotImplementedError() - async def get_total_cycle_count(self) -> int: - raise NotImplementedError() - async def get_current_step_index(self) -> int: raise NotImplementedError() - - async def get_total_step_count(self) -> int: - raise NotImplementedError() diff --git a/pylabrobot/thermocycling/opentrons_backend.py b/pylabrobot/thermocycling/opentrons_backend.py index 039708f348f..34df30a87eb 100644 --- a/pylabrobot/thermocycling/opentrons_backend.py +++ b/pylabrobot/thermocycling/opentrons_backend.py @@ -126,22 +126,9 @@ def _find_module(self) -> dict: async def get_block_current_temperature(self) -> List[float]: return [cast(float, self._find_module()["currentTemperature"])] - async def get_block_target_temperature(self) -> List[float]: - target_temp = self._find_module().get("targetTemperature") - if target_temp is None: - raise RuntimeError("Block target temperature is not set. is a cycle running?") - return [cast(float, target_temp)] - async def get_lid_current_temperature(self) -> List[float]: return [cast(float, self._find_module()["lidTemperature"])] - async def get_lid_target_temperature(self) -> List[float]: - """Get the lid target temperature in °C. Raises RuntimeError if no target is active.""" - target_temp = self._find_module().get("lidTargetTemperature") - if target_temp is None: - raise RuntimeError("Lid target temperature is not set. is a cycle running?") - return [cast(float, target_temp)] - async def get_lid_open(self) -> bool: return cast(str, self._find_module()["lidStatus"]) == "open" @@ -185,10 +172,6 @@ async def get_current_cycle_index(self) -> int: raise RuntimeError("Current cycle index is not available. Is a profile running?") - async def get_total_cycle_count(self) -> int: - # https://github.com/PyLabRobot/pylabrobot/issues/632 - raise NotImplementedError('Opentrons "cycle" concept is not understood currently.') - async def get_current_step_index(self) -> int: """Get the zero-based index of the current step from the Opentrons API.""" # Opentrons API returns one-based, convert to zero-based @@ -196,9 +179,3 @@ async def get_current_step_index(self) -> int: if step_index is None: raise RuntimeError("Current step index is not available. Is a profile running?") return cast(int, step_index) - 1 - - async def get_total_step_count(self) -> int: - total_steps = self._find_module().get("totalStepCount") - if total_steps is None: - raise RuntimeError("Total step count is not available. Is a profile running?") - return cast(int, total_steps) diff --git a/pylabrobot/thermocycling/opentrons_backend_tests.py b/pylabrobot/thermocycling/opentrons_backend_tests.py index c68460a991e..2f3cba3f2b1 100644 --- a/pylabrobot/thermocycling/opentrons_backend_tests.py +++ b/pylabrobot/thermocycling/opentrons_backend_tests.py @@ -98,17 +98,13 @@ async def test_getters_return_correct_data(self, mock_list_connected_modules): mock_list_connected_modules.return_value = [mock_data] assert await self.thermocycler_backend.get_block_current_temperature() == [25.5] - assert await self.thermocycler_backend.get_block_target_temperature() == [95.0] assert await self.thermocycler_backend.get_lid_current_temperature() == [37.1] - assert await self.thermocycler_backend.get_lid_target_temperature() == [105.0] assert await self.thermocycler_backend.get_lid_open() is True assert await self.thermocycler_backend.get_lid_status() == LidStatus.HOLDING_AT_TARGET assert await self.thermocycler_backend.get_block_status() == BlockStatus.HOLDING_AT_TARGET assert await self.thermocycler_backend.get_hold_time() == 12.0 # assert await self.thermocycler_backend.get_current_cycle_index() == 1 # 2 - 1 = 1 (zero-based) - # assert await self.thermocycler_backend.get_total_cycle_count() == 10 assert await self.thermocycler_backend.get_current_step_index() == 0 # 1 - 1 = 0 (zero-based) - assert await self.thermocycler_backend.get_total_step_count() == 3 @patch("pylabrobot.thermocycling.opentrons_backend.list_connected_modules") async def test_get_hold_time_raises_if_not_running(self, mock_list_connected_modules): diff --git a/pylabrobot/thermocycling/opentrons_backend_usb.py b/pylabrobot/thermocycling/opentrons_backend_usb.py index 41daf9a002c..2ecb246e8f9 100644 --- a/pylabrobot/thermocycling/opentrons_backend_usb.py +++ b/pylabrobot/thermocycling/opentrons_backend_usb.py @@ -281,25 +281,11 @@ async def get_block_current_temperature(self) -> List[float]: plate_temp = await self._driver.get_plate_temperature() return [plate_temp.current] - async def get_block_target_temperature(self) -> List[float]: - assert self._driver is not None - plate_temp = await self._driver.get_plate_temperature() - if plate_temp.target is not None: - return [plate_temp.target] - raise RuntimeError("Block target temperature is not set.") - async def get_lid_current_temperature(self) -> List[float]: assert self._driver is not None lid_temp = await self._driver.get_lid_temperature() return [lid_temp.current] - async def get_lid_target_temperature(self) -> List[float]: - assert self._driver is not None - lid_temp = await self._driver.get_lid_temperature() - if lid_temp.target is not None: - return [lid_temp.target] - raise RuntimeError("Lid target temperature is not set.") - async def get_lid_open(self) -> bool: """Return True if the lid is open.""" assert self._driver is not None @@ -326,11 +312,5 @@ async def get_hold_time(self) -> float: async def get_current_cycle_index(self) -> int: return self._current_cycle_index if self._current_cycle_index is not None else 0 - async def get_total_cycle_count(self) -> int: - return self._total_cycle_count if self._total_cycle_count is not None else 0 - async def get_current_step_index(self) -> int: return self._current_step_index if self._current_step_index is not None else 0 - - async def get_total_step_count(self) -> int: - return self._total_step_count if self._total_step_count is not None else 0 diff --git a/pylabrobot/thermocycling/thermo_fisher/thermo_fisher_thermocycler.py b/pylabrobot/thermocycling/thermo_fisher/thermo_fisher_thermocycler.py index 86bab4c0e3d..9e0dbb0ef7d 100644 --- a/pylabrobot/thermocycling/thermo_fisher/thermo_fisher_thermocycler.py +++ b/pylabrobot/thermocycling/thermo_fisher/thermo_fisher_thermocycler.py @@ -1036,19 +1036,3 @@ async def get_lid_open(self, *args, **kwargs): async def get_lid_status(self, *args, **kwargs) -> LidStatus: raise NotImplementedError - - async def get_lid_target_temperature(self, *args, **kwargs): - # deprecated - raise NotImplementedError - - async def get_total_cycle_count(self, *args, **kwargs): - # deprecated - raise NotImplementedError - - async def get_total_step_count(self, *args, **kwargs): - # deprecated - raise NotImplementedError - - async def get_block_target_temperature(self, *args, **kwargs): - # deprecated - raise NotImplementedError diff --git a/pylabrobot/thermocycling/thermocycler.py b/pylabrobot/thermocycling/thermocycler.py index 622599d47a2..b602f4073ea 100644 --- a/pylabrobot/thermocycling/thermocycler.py +++ b/pylabrobot/thermocycling/thermocycler.py @@ -48,6 +48,11 @@ def __init__( ) Machine.__init__(self, backend=backend) self.backend: ThermocyclerBackend = backend + self._block_target_temperature: Optional[List[float]] = None + self._lid_target_temperature: Optional[List[float]] = None + self._current_protocol: Optional[Protocol] = None + self._total_cycle_count: int = 0 + self._total_step_count: int = 0 async def open_lid(self, **backend_kwargs): return await self.backend.open_lid(**backend_kwargs) @@ -61,6 +66,7 @@ async def set_block_temperature(self, temperature: List[float], **backend_kwargs Args: temperature: List of target temperatures in °C for multiple zones. """ + self._block_target_temperature = list(temperature) return await self.backend.set_block_temperature(temperature, **backend_kwargs) async def set_lid_temperature(self, temperature: List[float], **backend_kwargs): @@ -69,6 +75,7 @@ async def set_lid_temperature(self, temperature: List[float], **backend_kwargs): Args: temperature: List of target temperatures in °C for multiple zones. """ + self._lid_target_temperature = list(temperature) return await self.backend.set_lid_temperature(temperature, **backend_kwargs) async def deactivate_block(self, **backend_kwargs): @@ -96,6 +103,11 @@ async def run_protocol(self, protocol: Protocol, block_max_volume: float, **back f"Expected {num_zones}, got {len(step.temperature)} in step {i}." ) + self._current_protocol = protocol + self._block_target_temperature = list(protocol.stages[0].steps[0].temperature) + self._total_cycle_count = sum(stage.repeats for stage in protocol.stages) + self._total_step_count = sum(len(stage.steps) * stage.repeats for stage in protocol.stages) + return await self.backend.run_protocol(protocol, block_max_volume, **backend_kwargs) async def run_pcr_profile( @@ -180,7 +192,9 @@ async def get_block_current_temperature(self, **backend_kwargs) -> List[float]: async def get_block_target_temperature(self, **backend_kwargs) -> List[float]: """Get the block's target temperature(s) (°C).""" - return await self.backend.get_block_target_temperature(**backend_kwargs) + if self._block_target_temperature is None: + raise RuntimeError("Block target temperature is not set.") + return self._block_target_temperature async def get_lid_current_temperature(self, **backend_kwargs) -> List[float]: """Get the current lid temperature(s) (°C).""" @@ -188,7 +202,9 @@ async def get_lid_current_temperature(self, **backend_kwargs) -> List[float]: async def get_lid_target_temperature(self, **backend_kwargs) -> List[float]: """Get the lid's target temperature(s) (°C), if supported.""" - return await self.backend.get_lid_target_temperature(**backend_kwargs) + if self._lid_target_temperature is None: + raise RuntimeError("Lid target temperature is not set.") + return self._lid_target_temperature async def get_lid_open(self, **backend_kwargs) -> bool: """Return ``True`` if the lid is open.""" @@ -212,15 +228,15 @@ async def get_current_cycle_index(self, **backend_kwargs) -> int: async def get_total_cycle_count(self, **backend_kwargs) -> int: """Get the total number of cycles.""" - return await self.backend.get_total_cycle_count(**backend_kwargs) + return self._total_cycle_count async def get_current_step_index(self, **backend_kwargs) -> int: """Get the one-based index of the current step.""" return await self.backend.get_current_step_index(**backend_kwargs) async def get_total_step_count(self, **backend_kwargs) -> int: - """Get the total number of steps in the current cycle.""" - return await self.backend.get_total_step_count(**backend_kwargs) + """Get the total number of steps in the current protocol.""" + return self._total_step_count async def wait_for_block(self, timeout: float = 600, tolerance: float = 0.5, **backend_kwargs): """Wait until block temp reaches target ± tolerance for all zones.""" @@ -234,22 +250,23 @@ async def wait_for_block(self, timeout: float = 600, tolerance: float = 0.5, **b raise TimeoutError("Block temperature timeout.") async def wait_for_lid(self, timeout: float = 1200, tolerance: float = 0.5, **backend_kwargs): - """Wait until the lid temperature reaches target ± ``tolerance`` or the lid temperature status is idle/holding at target.""" + """Wait until the lid temperature reaches target ± ``tolerance`` or the lid status indicates it is ready.""" try: targets = await self.get_lid_target_temperature(**backend_kwargs) except RuntimeError: targets = None + start = time.time() while time.time() - start < timeout: + status = await self.get_lid_status(**backend_kwargs) + if status in [LidStatus.IDLE, LidStatus.HOLDING_AT_TARGET, "idle", "holding at target"]: + return + if targets is not None: currents = await self.get_lid_current_temperature(**backend_kwargs) if all(abs(current - target) < tolerance for current, target in zip(currents, targets)): return - else: - # If no target temperature, check status - status = await self.get_lid_status(**backend_kwargs) - if status in ["idle", "holding at target"]: - return + await asyncio.sleep(1) raise TimeoutError("Lid temperature timeout.") @@ -261,15 +278,22 @@ async def is_profile_running(self, **backend_kwargs) -> bool: step = await self.get_current_step_index(**backend_kwargs) total_steps = await self.get_total_step_count(**backend_kwargs) - # if still holding in a step, it's running + # If still holding in a step, it is running. if hold and hold > 0: return True - # if haven't reached last cycle (zero-based indexing) - if cycle < total_cycles - 1: + + # Some backends report the completed step count after finishing. + if total_steps and step >= total_steps: + return False + + # If we have not reached the last cycle, it is running. + if total_cycles and cycle < total_cycles - 1: return True - # last cycle but not last step (zero-based indexing) - if cycle == total_cycles - 1 and step < total_steps - 1: + + # Last cycle but not last step. + if total_steps and cycle == total_cycles - 1 and step < total_steps - 1: return True + return False async def wait_for_profile_completion(self, poll_interval: float = 60.0, **backend_kwargs): diff --git a/pylabrobot/thermocycling/thermocycler_tests.py b/pylabrobot/thermocycling/thermocycler_tests.py index db0d4821c6f..7f691c6a121 100644 --- a/pylabrobot/thermocycling/thermocycler_tests.py +++ b/pylabrobot/thermocycling/thermocycler_tests.py @@ -24,17 +24,13 @@ def mock_backend() -> MagicMock: mock.deactivate_lid = AsyncMock() mock.run_protocol = AsyncMock() mock.get_block_current_temperature = AsyncMock(return_value=[25.0]) - mock.get_block_target_temperature = AsyncMock(return_value=None) mock.get_lid_current_temperature = AsyncMock(return_value=[25.0]) - mock.get_lid_target_temperature = AsyncMock(return_value=None) mock.get_lid_open = AsyncMock(return_value=False) mock.get_lid_temperature_status = AsyncMock(return_value="idle") mock.get_block_status = AsyncMock(return_value="idle") mock.get_hold_time = AsyncMock(return_value=0.0) mock.get_current_cycle_index = AsyncMock(return_value=0) - mock.get_total_cycle_count = AsyncMock(return_value=0) mock.get_current_step_index = AsyncMock(return_value=0) - mock.get_total_step_count = AsyncMock(return_value=0) return mock @@ -132,9 +128,9 @@ async def test_is_profile_running_logic(self): for hold, cycle, total_cycles, step, total_steps, expected in test_cases: self.tc.backend.get_hold_time.return_value = hold # type: ignore self.tc.backend.get_current_cycle_index.return_value = cycle # type: ignore - self.tc.backend.get_total_cycle_count.return_value = total_cycles # type: ignore + self.tc._total_cycle_count = total_cycles self.tc.backend.get_current_step_index.return_value = step # type: ignore - self.tc.backend.get_total_step_count.return_value = total_steps # type: ignore + self.tc._total_step_count = total_steps print(f"Testing with hold={hold}, cycle={cycle}, total_cycles={total_cycles}, ") assert await self.tc.is_profile_running() is expected