From 65b3b3ebeda840391d69e323d17c98e54b6fc95e Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Tue, 28 Apr 2026 14:36:37 +0400 Subject: [PATCH 1/2] feat: handle parallel chunk upload assembly race in Local and S3 devices --- .gitignore | 3 ++- src/Storage/Device/Local.php | 11 +++++++- src/Storage/Device/S3.php | 10 ++++++++ tests/Storage/Device/LocalTest.php | 40 +++++++++++++++++++++++++++--- 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 3491b43d..282ca759 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ .phpunit.result.cache tests/chunk.php .idea/ -.env \ No newline at end of file +.env +.DS_Store \ No newline at end of file diff --git a/src/Storage/Device/Local.php b/src/Storage/Device/Local.php index c25612d9..23757936 100644 --- a/src/Storage/Device/Local.php +++ b/src/Storage/Device/Local.php @@ -164,8 +164,12 @@ private function countChunks(string $tmp, string $path): int private function joinChunks(string $path, int $chunks): void { + if (\file_exists($path)) { + return; + } + $tmp = \dirname($path).DIRECTORY_SEPARATOR.'tmp_'.\basename($path); - $tmpAssemble = \dirname($path).DIRECTORY_SEPARATOR.'tmp_assemble_'.\basename($path); + $tmpAssemble = \tempnam(\dirname($path), 'tmp_assemble_'.\basename($path).'_'); $dest = \fopen($tmpAssemble, 'wb'); if ($dest === false) { @@ -195,6 +199,11 @@ private function joinChunks(string $path, int $chunks): void \fclose($dest); if (! \rename($tmpAssemble, $path)) { + if (\file_exists($path)) { + \unlink($tmpAssemble); + + return; + } \unlink($tmpAssemble); throw new Exception('Failed to finalize assembled file '.$path); } diff --git a/src/Storage/Device/S3.php b/src/Storage/Device/S3.php index d2e79c4e..5e65c095 100644 --- a/src/Storage/Device/S3.php +++ b/src/Storage/Device/S3.php @@ -203,6 +203,16 @@ public function uploadData(string $data, string $path, string $contentType, int } $metadata['parts'][$chunk] = $etag; if ($metadata['chunks'] == $chunks) { + $headers = $this->headers; + $amzHeaders = $this->amzHeaders; + + if ($this->exists($path)) { + return $metadata['chunks']; + } + + $this->headers = $headers; + $this->amzHeaders = $amzHeaders; + $this->completeMultipartUpload($path, $uploadId, $metadata['parts']); } diff --git a/tests/Storage/Device/LocalTest.php b/tests/Storage/Device/LocalTest.php index 2146d413..d141a6c2 100644 --- a/tests/Storage/Device/LocalTest.php +++ b/tests/Storage/Device/LocalTest.php @@ -524,19 +524,20 @@ public function testJoinChunksStaleAssemblyFileIsOverwritten(): void { $storage = $this->makeJoinTestStorage(); $dest = $storage->getRoot().DIRECTORY_SEPARATOR.'test.dat'; - $tmpAssemble = $storage->getRoot().DIRECTORY_SEPARATOR.'tmp_assemble_test.dat'; $storage->uploadData('AAAA', $dest, 'application/octet-stream', 1, 3); $storage->uploadData('BBBB', $dest, 'application/octet-stream', 2, 3); // Simulate a stale assembly file left by a previously crashed attempt. - \file_put_contents($tmpAssemble, 'STALE_GARBAGE_DATA'); + // With unique temp paths (tempnam), stale files at old hardcoded paths + // are naturally bypassed rather than overwritten. + $staleFile = $storage->getRoot().DIRECTORY_SEPARATOR.'tmp_assemble_test.dat'; + \file_put_contents($staleFile, 'STALE_GARBAGE_DATA'); $storage->uploadData('CCCC', $dest, 'application/octet-stream', 3, 3); $this->assertTrue(\file_exists($dest)); $this->assertSame('AAAABBBBCCCC', \file_get_contents($dest), 'Stale assembly file must not corrupt output'); - $this->assertFalse(\file_exists($tmpAssemble), 'Temp assembly file should be removed after successful rename'); $storage->delete($storage->getRoot(), true); } @@ -577,4 +578,37 @@ public function testOutOfOrderUploadWithRetry(): void $storage->delete($storage->getRoot(), true); } + + public function testParallelChunkUpload(): void + { + $storage = $this->makeJoinTestStorage(); + $dest = $storage->getRoot().DIRECTORY_SEPARATOR.'parallel.dat'; + + // Upload chunk 1 (creates temp directory) + $storage->uploadData('AAAA', $dest, 'application/octet-stream', 1, 2); + + // Upload chunk 2 (assembles the file) + $storage->uploadData('BBBB', $dest, 'application/octet-stream', 2, 2); + + // Verify file exists and is correct + $this->assertTrue(\file_exists($dest)); + $this->assertSame('AAAABBBB', \file_get_contents($dest)); + + // Simulate the race where another request already assembled the file + // by calling joinChunks directly when the file already exists + $reflection = new \ReflectionClass($storage); + $method = $reflection->getMethod('joinChunks'); + $method->setAccessible(true); + + try { + $method->invoke($storage, $dest, 2); + } catch (\Exception $e) { + $this->fail('Duplicate assembly should not throw: '.$e->getMessage()); + } + + $this->assertTrue(\file_exists($dest), 'File should still exist after duplicate assembly attempt'); + $this->assertSame('AAAABBBB', \file_get_contents($dest), 'File content must not be corrupted'); + + $storage->delete($storage->getRoot(), true); + } } From 0385c1262651c184eb309e8cc51dcaa3f174374f Mon Sep 17 00:00:00 2001 From: Torsten Dittmann Date: Thu, 7 May 2026 10:08:38 +0400 Subject: [PATCH 2/2] feat: split chunked upload phases --- src/Storage/Device.php | 27 ++++++ src/Storage/Device/Local.php | 60 +++++++++++-- src/Storage/Device/S3.php | 101 +++++++++++++++++----- src/Storage/Device/Telemetry.php | 15 ++++ tests/Storage/Device/LocalTest.php | 45 ++++++++++ tests/Storage/Device/S3SlowDownTest.php | 109 ++++++++++++++++++++++++ 6 files changed, 327 insertions(+), 30 deletions(-) diff --git a/src/Storage/Device.php b/src/Storage/Device.php index 961339c1..b323ffd0 100644 --- a/src/Storage/Device.php +++ b/src/Storage/Device.php @@ -97,6 +97,33 @@ abstract public function getPath(string $filename, ?string $prefix = null): stri */ abstract public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int; + /** + * Prepare Upload. + * + * Initialize adapter-specific upload state without transferring a chunk body. + * + * @throws Exception + */ + abstract public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void; + + /** + * Upload Chunk. + * + * Upload exactly one chunk without finalizing the full upload. + * + * @throws Exception + */ + abstract public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int; + + /** + * Finalize Upload. + * + * Complete a prepared upload once all chunks are known to be present. + * + * @throws Exception + */ + abstract public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool; + /** * Upload Data. * diff --git a/src/Storage/Device/Local.php b/src/Storage/Device/Local.php index 23757936..b3550580 100644 --- a/src/Storage/Device/Local.php +++ b/src/Storage/Device/Local.php @@ -55,16 +55,37 @@ public function getPath(string $filename, ?string $prefix = null): string * @throws Exception */ public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, '', $chunks, $metadata); + $chunksReceived = $this->uploadChunk($source, $path, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void { $this->createDirectory(\dirname($path)); + $metadata['parts'] ??= []; + $metadata['chunks'] ??= 0; + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, '', $chunks, $metadata); - // move_uploaded_file() verifies the file is not tampered with if ($chunks === 1) { - if (! \move_uploaded_file($source, $path)) { + if (! \move_uploaded_file($source, $path) && ! \rename($source, $path)) { throw new Exception('Can\'t upload file '.$path); } - return $chunks; + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = 1; + + return 1; } $tmp = \dirname($path).DIRECTORY_SEPARATOR.'tmp_'.\basename($path); @@ -84,14 +105,33 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks } $chunksReceived = $this->countChunks($tmp, $path); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = $chunksReceived; - if ($chunks === $chunksReceived) { - $this->joinChunks($path, $chunks); + return $chunksReceived; + } - return $chunksReceived; + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + if (\file_exists($path)) { + return true; } - return $chunksReceived; + if ($chunks === 1) { + return false; + } + + $tmp = \dirname($path).DIRECTORY_SEPARATOR.'tmp_'.\basename($path); + for ($i = 1; $i <= $chunks; $i++) { + $part = $tmp.DIRECTORY_SEPARATOR.\pathinfo($path, PATHINFO_FILENAME).'.part.'.$i; + if (! \file_exists($part)) { + throw new Exception('Missing chunk '.$i); + } + } + + $this->joinChunks($path, $chunks); + + return true; } /** @@ -108,7 +148,7 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks */ public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { - $this->createDirectory(\dirname($path)); + $this->prepareUpload($path, $contentType, $chunks, $metadata); if ($chunks === 1) { if (! \file_put_contents($path, $data)) { @@ -131,9 +171,11 @@ public function uploadData(string $data, string $path, string $contentType, int } $chunksReceived = $this->countChunks($tmp, $path); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = $chunksReceived; if ($chunks === $chunksReceived) { - $this->joinChunks($path, $chunks); + $this->finalizeUpload($path, $chunks, $metadata); return $chunksReceived; } diff --git a/src/Storage/Device/S3.php b/src/Storage/Device/S3.php index 5e65c095..94d3b753 100644 --- a/src/Storage/Device/S3.php +++ b/src/Storage/Device/S3.php @@ -167,7 +167,64 @@ public static function setRetryDelay(int $delay): void */ public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { - return $this->uploadData(\file_get_contents($source), $path, \mime_content_type($source), $chunk, $chunks, $metadata); + $contentType = \mime_content_type($source) ?: ''; + $this->prepareUpload($path, $contentType, $chunks, $metadata); + $chunksReceived = $this->uploadChunk($source, $path, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void + { + $metadata['parts'] ??= []; + $metadata['chunks'] ??= 0; + $metadata['content_type'] ??= $contentType; + + if ($chunks === 1 || ! empty($metadata['uploadId'])) { + return; + } + + $metadata['uploadId'] = $this->createMultipartUpload($path, $contentType); + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $data = \file_get_contents($source); + if ($data === false) { + throw new Exception('Can\'t read file '.$source); + } + + return $this->uploadChunkData($data, $path, $metadata['content_type'] ?? (\mime_content_type($source) ?: ''), $chunk, $chunks, $metadata); + } + + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + if ($this->exists($path)) { + return true; + } + + if ($chunks === 1) { + return false; + } + + if (empty($metadata['uploadId'])) { + throw new Exception('Missing multipart upload ID'); + } + + $metadata['parts'] ??= []; + for ($i = 1; $i <= $chunks; $i++) { + if (! array_key_exists($i, $metadata['parts'])) { + throw new Exception('Missing chunk '.$i); + } + } + + $this->completeMultipartUpload($path, $metadata['uploadId'], $metadata['parts']); + + return true; } /** @@ -183,38 +240,40 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks * @throws Exception */ public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, $contentType, $chunks, $metadata); + $chunksReceived = $this->uploadChunkData($data, $path, $contentType, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + private function uploadChunkData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { if ($chunk == 1 && $chunks == 1) { - return $this->write($path, $data, $contentType); + $this->write($path, $data, $contentType); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = 1; + + return 1; } - $uploadId = $metadata['uploadId'] ?? null; - if (empty($uploadId)) { - $uploadId = $this->createMultipartUpload($path, $contentType); - $metadata['uploadId'] = $uploadId; + + if (empty($metadata['uploadId'])) { + throw new Exception('Missing multipart upload ID'); } $metadata['parts'] ??= []; $metadata['chunks'] ??= 0; - $etag = $this->uploadPart($data, $path, $contentType, $chunk, $uploadId); + $etag = $this->uploadPart($data, $path, $contentType, $chunk, $metadata['uploadId']); // skip incrementing if the chunk was re-uploaded if (! array_key_exists($chunk, $metadata['parts'])) { $metadata['chunks']++; } $metadata['parts'][$chunk] = $etag; - if ($metadata['chunks'] == $chunks) { - $headers = $this->headers; - $amzHeaders = $this->amzHeaders; - - if ($this->exists($path)) { - return $metadata['chunks']; - } - - $this->headers = $headers; - $this->amzHeaders = $amzHeaders; - - $this->completeMultipartUpload($path, $uploadId, $metadata['parts']); - } return $metadata['chunks']; } @@ -307,7 +366,7 @@ protected function completeMultipartUpload(string $path, string $uploadId, array { $uri = $path !== '' ? '/'.\str_replace(['%2F', '%3F'], ['/', '?'], \rawurlencode($path)) : '/'; - \ksort($parts); + \ksort($parts, SORT_NUMERIC); $body = ''; foreach ($parts as $key => $etag) { diff --git a/src/Storage/Device/Telemetry.php b/src/Storage/Device/Telemetry.php index 206aea85..c285974c 100644 --- a/src/Storage/Device/Telemetry.php +++ b/src/Storage/Device/Telemetry.php @@ -64,6 +64,21 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks return $this->measure(__FUNCTION__, $source, $path, $chunk, $chunks, $metadata); } + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void + { + $this->measure(__FUNCTION__, $path, $contentType, $chunks, $metadata); + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + return $this->measure(__FUNCTION__, $source, $path, $chunk, $chunks, $metadata); + } + + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + return $this->measure(__FUNCTION__, $path, $chunks, $metadata); + } + public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { return $this->measure(__FUNCTION__, $data, $path, $contentType, $chunk, $chunks, $metadata); diff --git a/tests/Storage/Device/LocalTest.php b/tests/Storage/Device/LocalTest.php index d141a6c2..0df27847 100644 --- a/tests/Storage/Device/LocalTest.php +++ b/tests/Storage/Device/LocalTest.php @@ -190,6 +190,51 @@ public function testPartUpload() return $dest; } + public function testUploadChunkDoesNotFinalizeUntilFinalizeUpload(): void + { + $dest = $this->object->getPath('chunked-phase-upload.txt'); + $metadata = []; + $parts = [ + 2 => 'bbb', + 1 => 'aaa', + 3 => 'ccc', + ]; + + foreach ($parts as $chunk => $data) { + $source = __DIR__.'/chunk-'.$chunk.'.part'; + file_put_contents($source, $data); + + $this->object->uploadChunk($source, $dest, $chunk, 3, $metadata); + $this->assertFalse($this->object->exists($dest)); + } + + $this->assertSame(3, $metadata['chunks']); + $this->assertTrue($this->object->finalizeUpload($dest, 3, $metadata)); + $this->assertSame('aaabbbccc', $this->object->read($dest)); + $this->assertTrue($this->object->finalizeUpload($dest, 3, $metadata)); + + $this->object->delete($dest); + } + + public function testFinalizeUploadRequiresAllLocalChunks(): void + { + $dest = $this->object->getPath('chunked-phase-missing.txt'); + $metadata = []; + $source = __DIR__.'/chunk-missing.part'; + file_put_contents($source, 'aaa'); + + $this->object->uploadChunk($source, $dest, 1, 2, $metadata); + + try { + $this->object->finalizeUpload($dest, 2, $metadata); + $this->fail('Expected missing chunk exception'); + } catch (\Exception $e) { + $this->assertSame('Missing chunk 2', $e->getMessage()); + } finally { + $this->object->abort($dest); + } + } + public function testPartUploadRetry() { $source = __DIR__.'/../../resources/disk-a/large_file.mp4'; diff --git a/tests/Storage/Device/S3SlowDownTest.php b/tests/Storage/Device/S3SlowDownTest.php index 634c5b1f..e9c2705c 100644 --- a/tests/Storage/Device/S3SlowDownTest.php +++ b/tests/Storage/Device/S3SlowDownTest.php @@ -10,10 +10,46 @@ */ class TestableS3 extends S3 { + public array $calls = []; + + public string $completedBody = ''; + + private bool $objectExists = false; + public function exposedIsTransientError(int $statusCode, string $body): bool { return $this->isTransientError($statusCode, $body); } + + protected function call(string $operation, string $method, string $uri, string $data = '', array $parameters = [], bool $decode = true) + { + $this->calls[] = $operation; + + if ($operation === 's3:info') { + if (! $this->objectExists) { + throw new \Exception('Not found'); + } + + return (object) ['headers' => ['content-length' => 1], 'body' => '']; + } + + if ($operation === 's3:createMultipartUpload') { + return (object) ['headers' => [], 'body' => ['UploadId' => 'upload-123']]; + } + + if ($operation === 's3:uploadPart') { + return (object) ['headers' => ['etag' => 'etag-'.$parameters['partNumber']], 'body' => '']; + } + + if ($operation === 's3:completeMultipartUpload') { + $this->completedBody = $data; + $this->objectExists = true; + + return (object) ['headers' => [], 'body' => '']; + } + + return (object) ['headers' => [], 'body' => '']; + } } class S3SlowDownTest extends TestCase @@ -68,4 +104,77 @@ public function testDefaultRetrySettings(): void $this->assertSame(3, $prop('retryAttempts')); $this->assertSame(500, $prop('retryDelay')); } + + public function testPrepareUploadCreatesMultipartMetadata(): void + { + $metadata = []; + + $this->s3->prepareUpload('/root/file.txt', 'text/plain', 2, $metadata); + + $this->assertSame('upload-123', $metadata['uploadId']); + $this->assertSame([], $metadata['parts']); + $this->assertSame(0, $metadata['chunks']); + $this->assertSame(['s3:createMultipartUpload'], $this->s3->calls); + } + + public function testUploadChunkRecordsPartWithoutCompleting(): void + { + $metadata = []; + $source = __DIR__.'/s3-chunk.part'; + file_put_contents($source, 'aaa'); + + $this->s3->prepareUpload('/root/file.txt', 'text/plain', 2, $metadata); + $chunks = $this->s3->uploadChunk($source, '/root/file.txt', 1, 2, $metadata); + + $this->assertSame(1, $chunks); + $this->assertSame('etag-1', $metadata['parts'][1]); + $this->assertNotContains('s3:completeMultipartUpload', $this->s3->calls); + + unlink($source); + } + + public function testFinalizeUploadRequiresAllS3Parts(): void + { + $metadata = [ + 'uploadId' => 'upload-123', + 'parts' => [1 => 'etag-1'], + 'chunks' => 1, + ]; + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Missing chunk 2'); + $this->s3->finalizeUpload('/root/file.txt', 2, $metadata); + } + + public function testFinalizeUploadCompletesS3PartsInNumericOrder(): void + { + $metadata = [ + 'uploadId' => 'upload-123', + 'parts' => [ + 10 => 'etag-10', + 9 => 'etag-9', + 8 => 'etag-8', + 7 => 'etag-7', + 6 => 'etag-6', + 5 => 'etag-5', + 4 => 'etag-4', + 3 => 'etag-3', + 2 => 'etag-2', + 1 => 'etag-1', + ], + 'chunks' => 10, + ]; + + $this->assertTrue($this->s3->finalizeUpload('/root/file.txt', 10, $metadata)); + + $part1 = strpos($this->s3->completedBody, '1'); + $part2 = strpos($this->s3->completedBody, '2'); + $part10 = strpos($this->s3->completedBody, '10'); + + $this->assertNotFalse($part1); + $this->assertNotFalse($part2); + $this->assertNotFalse($part10); + $this->assertLessThan($part2, $part1); + $this->assertLessThan($part10, $part2); + } }