diff --git a/src/Storage/Device.php b/src/Storage/Device.php index 961339c1..b323ffd0 100644 --- a/src/Storage/Device.php +++ b/src/Storage/Device.php @@ -97,6 +97,33 @@ abstract public function getPath(string $filename, ?string $prefix = null): stri */ abstract public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int; + /** + * Prepare Upload. + * + * Initialize adapter-specific upload state without transferring a chunk body. + * + * @throws Exception + */ + abstract public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void; + + /** + * Upload Chunk. + * + * Upload exactly one chunk without finalizing the full upload. + * + * @throws Exception + */ + abstract public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int; + + /** + * Finalize Upload. + * + * Complete a prepared upload once all chunks are known to be present. + * + * @throws Exception + */ + abstract public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool; + /** * Upload Data. * diff --git a/src/Storage/Device/Local.php b/src/Storage/Device/Local.php index 23757936..b3550580 100644 --- a/src/Storage/Device/Local.php +++ b/src/Storage/Device/Local.php @@ -55,16 +55,37 @@ public function getPath(string $filename, ?string $prefix = null): string * @throws Exception */ public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, '', $chunks, $metadata); + $chunksReceived = $this->uploadChunk($source, $path, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void { $this->createDirectory(\dirname($path)); + $metadata['parts'] ??= []; + $metadata['chunks'] ??= 0; + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, '', $chunks, $metadata); - // move_uploaded_file() verifies the file is not tampered with if ($chunks === 1) { - if (! \move_uploaded_file($source, $path)) { + if (! \move_uploaded_file($source, $path) && ! \rename($source, $path)) { throw new Exception('Can\'t upload file '.$path); } - return $chunks; + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = 1; + + return 1; } $tmp = \dirname($path).DIRECTORY_SEPARATOR.'tmp_'.\basename($path); @@ -84,14 +105,33 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks } $chunksReceived = $this->countChunks($tmp, $path); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = $chunksReceived; - if ($chunks === $chunksReceived) { - $this->joinChunks($path, $chunks); + return $chunksReceived; + } - return $chunksReceived; + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + if (\file_exists($path)) { + return true; } - return $chunksReceived; + if ($chunks === 1) { + return false; + } + + $tmp = \dirname($path).DIRECTORY_SEPARATOR.'tmp_'.\basename($path); + for ($i = 1; $i <= $chunks; $i++) { + $part = $tmp.DIRECTORY_SEPARATOR.\pathinfo($path, PATHINFO_FILENAME).'.part.'.$i; + if (! \file_exists($part)) { + throw new Exception('Missing chunk '.$i); + } + } + + $this->joinChunks($path, $chunks); + + return true; } /** @@ -108,7 +148,7 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks */ public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { - $this->createDirectory(\dirname($path)); + $this->prepareUpload($path, $contentType, $chunks, $metadata); if ($chunks === 1) { if (! \file_put_contents($path, $data)) { @@ -131,9 +171,11 @@ public function uploadData(string $data, string $path, string $contentType, int } $chunksReceived = $this->countChunks($tmp, $path); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = $chunksReceived; if ($chunks === $chunksReceived) { - $this->joinChunks($path, $chunks); + $this->finalizeUpload($path, $chunks, $metadata); return $chunksReceived; } diff --git a/src/Storage/Device/S3.php b/src/Storage/Device/S3.php index 9129ba7d..c84fc418 100644 --- a/src/Storage/Device/S3.php +++ b/src/Storage/Device/S3.php @@ -167,7 +167,64 @@ public static function setRetryDelay(int $delay): void */ public function upload(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { - return $this->uploadData(\file_get_contents($source), $path, \mime_content_type($source), $chunk, $chunks, $metadata); + $contentType = \mime_content_type($source) ?: ''; + $this->prepareUpload($path, $contentType, $chunks, $metadata); + $chunksReceived = $this->uploadChunk($source, $path, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void + { + $metadata['parts'] ??= []; + $metadata['chunks'] ??= 0; + $metadata['content_type'] ??= $contentType; + + if ($chunks === 1 || ! empty($metadata['uploadId'])) { + return; + } + + $metadata['uploadId'] = $this->createMultipartUpload($path, $contentType); + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $data = \file_get_contents($source); + if ($data === false) { + throw new Exception('Can\'t read file '.$source); + } + + return $this->uploadChunkData($data, $path, $metadata['content_type'] ?? (\mime_content_type($source) ?: ''), $chunk, $chunks, $metadata); + } + + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + if ($this->exists($path)) { + return true; + } + + if ($chunks === 1) { + return false; + } + + if (empty($metadata['uploadId'])) { + throw new Exception('Missing multipart upload ID'); + } + + $metadata['parts'] ??= []; + for ($i = 1; $i <= $chunks; $i++) { + if (! array_key_exists($i, $metadata['parts'])) { + throw new Exception('Missing chunk '.$i); + } + } + + $this->completeMultipartUpload($path, $metadata['uploadId'], $metadata['parts']); + + return true; } /** @@ -183,38 +240,40 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks * @throws Exception */ public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + $this->prepareUpload($path, $contentType, $chunks, $metadata); + $chunksReceived = $this->uploadChunkData($data, $path, $contentType, $chunk, $chunks, $metadata); + + if ($chunks === $chunksReceived) { + $this->finalizeUpload($path, $chunks, $metadata); + } + + return $chunksReceived; + } + + private function uploadChunkData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { if ($chunk == 1 && $chunks == 1) { - return $this->write($path, $data, $contentType); + $this->write($path, $data, $contentType); + $metadata['parts'][$chunk] = true; + $metadata['chunks'] = 1; + + return 1; } - $uploadId = $metadata['uploadId'] ?? null; - if (empty($uploadId)) { - $uploadId = $this->createMultipartUpload($path, $contentType); - $metadata['uploadId'] = $uploadId; + + if (empty($metadata['uploadId'])) { + throw new Exception('Missing multipart upload ID'); } $metadata['parts'] ??= []; $metadata['chunks'] ??= 0; - $etag = $this->uploadPart($data, $path, $contentType, $chunk, $uploadId); + $etag = $this->uploadPart($data, $path, $contentType, $chunk, $metadata['uploadId']); // skip incrementing if the chunk was re-uploaded if (! array_key_exists($chunk, $metadata['parts'])) { $metadata['chunks']++; } $metadata['parts'][$chunk] = $etag; - if ($metadata['chunks'] == $chunks) { - $headers = $this->headers; - $amzHeaders = $this->amzHeaders; - - if ($this->exists($path)) { - return $metadata['chunks']; - } - - $this->headers = $headers; - $this->amzHeaders = $amzHeaders; - - $this->completeMultipartUpload($path, $uploadId, $metadata['parts']); - } return $metadata['chunks']; } @@ -307,7 +366,7 @@ protected function completeMultipartUpload(string $path, string $uploadId, array { $uri = $path !== '' ? '/'.\str_replace(['%2F', '%3F'], ['/', '?'], \rawurlencode($path)) : '/'; - \ksort($parts); + \ksort($parts, SORT_NUMERIC); $body = ''; foreach ($parts as $key => $etag) { diff --git a/src/Storage/Device/Telemetry.php b/src/Storage/Device/Telemetry.php index 206aea85..c285974c 100644 --- a/src/Storage/Device/Telemetry.php +++ b/src/Storage/Device/Telemetry.php @@ -64,6 +64,21 @@ public function upload(string $source, string $path, int $chunk = 1, int $chunks return $this->measure(__FUNCTION__, $source, $path, $chunk, $chunks, $metadata); } + public function prepareUpload(string $path, string $contentType, int $chunks = 1, array &$metadata = []): void + { + $this->measure(__FUNCTION__, $path, $contentType, $chunks, $metadata); + } + + public function uploadChunk(string $source, string $path, int $chunk = 1, int $chunks = 1, array &$metadata = []): int + { + return $this->measure(__FUNCTION__, $source, $path, $chunk, $chunks, $metadata); + } + + public function finalizeUpload(string $path, int $chunks = 1, array &$metadata = []): bool + { + return $this->measure(__FUNCTION__, $path, $chunks, $metadata); + } + public function uploadData(string $data, string $path, string $contentType, int $chunk = 1, int $chunks = 1, array &$metadata = []): int { return $this->measure(__FUNCTION__, $data, $path, $contentType, $chunk, $chunks, $metadata); diff --git a/tests/Storage/Device/LocalTest.php b/tests/Storage/Device/LocalTest.php index d141a6c2..0df27847 100644 --- a/tests/Storage/Device/LocalTest.php +++ b/tests/Storage/Device/LocalTest.php @@ -190,6 +190,51 @@ public function testPartUpload() return $dest; } + public function testUploadChunkDoesNotFinalizeUntilFinalizeUpload(): void + { + $dest = $this->object->getPath('chunked-phase-upload.txt'); + $metadata = []; + $parts = [ + 2 => 'bbb', + 1 => 'aaa', + 3 => 'ccc', + ]; + + foreach ($parts as $chunk => $data) { + $source = __DIR__.'/chunk-'.$chunk.'.part'; + file_put_contents($source, $data); + + $this->object->uploadChunk($source, $dest, $chunk, 3, $metadata); + $this->assertFalse($this->object->exists($dest)); + } + + $this->assertSame(3, $metadata['chunks']); + $this->assertTrue($this->object->finalizeUpload($dest, 3, $metadata)); + $this->assertSame('aaabbbccc', $this->object->read($dest)); + $this->assertTrue($this->object->finalizeUpload($dest, 3, $metadata)); + + $this->object->delete($dest); + } + + public function testFinalizeUploadRequiresAllLocalChunks(): void + { + $dest = $this->object->getPath('chunked-phase-missing.txt'); + $metadata = []; + $source = __DIR__.'/chunk-missing.part'; + file_put_contents($source, 'aaa'); + + $this->object->uploadChunk($source, $dest, 1, 2, $metadata); + + try { + $this->object->finalizeUpload($dest, 2, $metadata); + $this->fail('Expected missing chunk exception'); + } catch (\Exception $e) { + $this->assertSame('Missing chunk 2', $e->getMessage()); + } finally { + $this->object->abort($dest); + } + } + public function testPartUploadRetry() { $source = __DIR__.'/../../resources/disk-a/large_file.mp4'; diff --git a/tests/Storage/Device/S3SlowDownTest.php b/tests/Storage/Device/S3SlowDownTest.php index 634c5b1f..e9c2705c 100644 --- a/tests/Storage/Device/S3SlowDownTest.php +++ b/tests/Storage/Device/S3SlowDownTest.php @@ -10,10 +10,46 @@ */ class TestableS3 extends S3 { + public array $calls = []; + + public string $completedBody = ''; + + private bool $objectExists = false; + public function exposedIsTransientError(int $statusCode, string $body): bool { return $this->isTransientError($statusCode, $body); } + + protected function call(string $operation, string $method, string $uri, string $data = '', array $parameters = [], bool $decode = true) + { + $this->calls[] = $operation; + + if ($operation === 's3:info') { + if (! $this->objectExists) { + throw new \Exception('Not found'); + } + + return (object) ['headers' => ['content-length' => 1], 'body' => '']; + } + + if ($operation === 's3:createMultipartUpload') { + return (object) ['headers' => [], 'body' => ['UploadId' => 'upload-123']]; + } + + if ($operation === 's3:uploadPart') { + return (object) ['headers' => ['etag' => 'etag-'.$parameters['partNumber']], 'body' => '']; + } + + if ($operation === 's3:completeMultipartUpload') { + $this->completedBody = $data; + $this->objectExists = true; + + return (object) ['headers' => [], 'body' => '']; + } + + return (object) ['headers' => [], 'body' => '']; + } } class S3SlowDownTest extends TestCase @@ -68,4 +104,77 @@ public function testDefaultRetrySettings(): void $this->assertSame(3, $prop('retryAttempts')); $this->assertSame(500, $prop('retryDelay')); } + + public function testPrepareUploadCreatesMultipartMetadata(): void + { + $metadata = []; + + $this->s3->prepareUpload('/root/file.txt', 'text/plain', 2, $metadata); + + $this->assertSame('upload-123', $metadata['uploadId']); + $this->assertSame([], $metadata['parts']); + $this->assertSame(0, $metadata['chunks']); + $this->assertSame(['s3:createMultipartUpload'], $this->s3->calls); + } + + public function testUploadChunkRecordsPartWithoutCompleting(): void + { + $metadata = []; + $source = __DIR__.'/s3-chunk.part'; + file_put_contents($source, 'aaa'); + + $this->s3->prepareUpload('/root/file.txt', 'text/plain', 2, $metadata); + $chunks = $this->s3->uploadChunk($source, '/root/file.txt', 1, 2, $metadata); + + $this->assertSame(1, $chunks); + $this->assertSame('etag-1', $metadata['parts'][1]); + $this->assertNotContains('s3:completeMultipartUpload', $this->s3->calls); + + unlink($source); + } + + public function testFinalizeUploadRequiresAllS3Parts(): void + { + $metadata = [ + 'uploadId' => 'upload-123', + 'parts' => [1 => 'etag-1'], + 'chunks' => 1, + ]; + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Missing chunk 2'); + $this->s3->finalizeUpload('/root/file.txt', 2, $metadata); + } + + public function testFinalizeUploadCompletesS3PartsInNumericOrder(): void + { + $metadata = [ + 'uploadId' => 'upload-123', + 'parts' => [ + 10 => 'etag-10', + 9 => 'etag-9', + 8 => 'etag-8', + 7 => 'etag-7', + 6 => 'etag-6', + 5 => 'etag-5', + 4 => 'etag-4', + 3 => 'etag-3', + 2 => 'etag-2', + 1 => 'etag-1', + ], + 'chunks' => 10, + ]; + + $this->assertTrue($this->s3->finalizeUpload('/root/file.txt', 10, $metadata)); + + $part1 = strpos($this->s3->completedBody, '1'); + $part2 = strpos($this->s3->completedBody, '2'); + $part10 = strpos($this->s3->completedBody, '10'); + + $this->assertNotFalse($part1); + $this->assertNotFalse($part2); + $this->assertNotFalse($part10); + $this->assertLessThan($part2, $part1); + $this->assertLessThan($part10, $part2); + } }