From 5968e74b629b62103818cba44de86cf023434433 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 30 Apr 2026 08:26:45 +0200 Subject: [PATCH] Add chunks_done field to submissions_failed --- libs/opsqueue_python/src/common.rs | 2 ++ libs/opsqueue_python/tests/test_roundtrip.py | 28 ++++++++++++++++++ ..._add_chunks_done_to_submissions_failed.sql | 1 + opsqueue/opsqueue_example_database_schema.db | Bin 102400 -> 102400 bytes opsqueue/src/common/submission.rs | 7 +++-- 5 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 opsqueue/migrations/20260429173300_add_chunks_done_to_submissions_failed.sql diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 5a1dabb..f19eaa6 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -311,6 +311,7 @@ impl From for SubmissionFailed { id: value.id.into(), failed_at: value.failed_at, chunks_total: value.chunks_total.into(), + chunks_done: value.chunks_done.map(Into::into), metadata: value.metadata, strategic_metadata: value.strategic_metadata, failed_chunk_id: value.failed_chunk_id.into(), @@ -456,6 +457,7 @@ pub struct SubmissionCompleted { pub struct SubmissionFailed { pub id: SubmissionId, pub chunks_total: u64, + pub chunks_done: Option, pub metadata: Option, pub strategic_metadata: Option, pub failed_at: DateTime, diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index 3a61e86..ad10541 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -480,3 +480,31 @@ def consume(x: int) -> None: producer_client.cancel_submission(submission_id) assert isinstance(exc_info.value.submission, SubmissionNotCancellable.Failed) assert isinstance(exc_info.value.chunk, ChunkFailed) + + +def test_failed_submission_includes_chunks_done(opsqueue: OpsqueueProcess) -> None: + """The SubmissionFailed (in a SubmissionFailedError) includes the count of + chunks done. + + """ + url = "file:///tmp/opsqueue/test_failed_submission_includes_chunks_done" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + chunks = [1, 2, 3] + submission_id = producer_client.insert_submission(chunks, chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + + def consume(x: int) -> int | None: + if x == chunks[-1]: + raise ValueError(f"Couldn't consume {x}") + return x + + consumer_client.run_each_op( + consume, strategy=strategy_from_description("Oldest") + ) + + with background_process(run_consumer): + with pytest.raises(SubmissionFailedError) as exc_info: + producer_client.blocking_stream_completed_submission(submission_id) + assert exc_info.value.submission.chunks_done == len(chunks) - 1 diff --git a/opsqueue/migrations/20260429173300_add_chunks_done_to_submissions_failed.sql b/opsqueue/migrations/20260429173300_add_chunks_done_to_submissions_failed.sql new file mode 100644 index 0000000..c3f1cfc --- /dev/null +++ b/opsqueue/migrations/20260429173300_add_chunks_done_to_submissions_failed.sql @@ -0,0 +1 @@ +ALTER TABLE submissions_failed ADD COLUMN chunks_done INTEGER; diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index 4d6ad39aac49256a3bb2c0107d27d7f13b0ed702..ae49303b2d692551e8a9a018ee8a0af45eadaebf 100644 GIT binary patch delta 330 zcmZozz}B#UZGyC*C<6n7BoM=Z%0wMwM$wH4OZX+YxUVtrFW`6OyUUly`-``qSBqyC zPXPC|&58oMxa%vp*jdAkIdhnWa_!d(nIxv9C?sc;=4BTvq~zzNDwO0a6qhFDW)>G` z=I0eFq$OtNq^1}d7@6rBnCKc=Dj1qtnHX9bnW(OhG2d$+RK9I@is;Us`R4<2-6UEvIN4dvH;2lsXO`gNpUlAjjsG?O z1OBW0r}+2tZ{c6bKaYR%W<`Z^{>k(7*;qkt+SMljM_|MI#B<_gZ<;_7vk#f8l<`Xzc8a1 Q6ARZ~CbR95VvNr#0UTLrdH?_b delta 160 zcmZozz}B#UZGyC*FaraFBoM=Z@);t7x26C-Q~;U{l(kQtHra6 zXS$*SFaQ7m diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 9afc984..69040f1 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -176,6 +176,7 @@ pub struct SubmissionFailed { pub id: SubmissionId, pub prefix: Option, pub chunks_total: ChunkCount, + pub chunks_done: Option, pub chunk_size: ChunkSize, pub metadata: Option, pub strategic_metadata: Option, @@ -595,6 +596,7 @@ pub mod db { id AS "id: SubmissionId" , prefix , chunks_total AS "chunks_total: ChunkCount" + , chunks_done AS "chunks_done: ChunkCount" , chunk_size AS "chunk_size!: ChunkSize" , metadata , ( SELECT json_group_object(metadata_key, metadata_value) @@ -615,6 +617,7 @@ pub mod db { id: row.id, prefix: row.prefix, chunks_total: row.chunks_total, + chunks_done: row.chunks_done, chunk_size: row.chunk_size, metadata: row.metadata, strategic_metadata: row.strategic_metadata.map(|json| json.0), @@ -816,8 +819,8 @@ pub mod db { query!( " INSERT INTO submissions_failed - (id, chunks_total, prefix, metadata, failed_at, failed_chunk_id) - SELECT id, chunks_total, prefix, metadata, julianday($1), $2 FROM submissions WHERE id = $3; + (id, chunks_total, chunks_done, prefix, metadata, failed_at, failed_chunk_id) + SELECT id, chunks_total, chunks_done, prefix, metadata, julianday($1), $2 FROM submissions WHERE id = $3; DELETE FROM submissions WHERE id = $4 RETURNING *; ",