diff --git a/Cargo.lock b/Cargo.lock index 41aef4f..d584710 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1736,7 +1736,7 @@ dependencies = [ [[package]] name = "fula-api" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "axum", @@ -1765,7 +1765,7 @@ dependencies = [ [[package]] name = "fula-blockstore" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-trait", @@ -1803,7 +1803,7 @@ dependencies = [ [[package]] name = "fula-cli" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-trait", @@ -1857,7 +1857,7 @@ dependencies = [ [[package]] name = "fula-client" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-trait", @@ -1899,7 +1899,7 @@ dependencies = [ [[package]] name = "fula-core" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-trait", @@ -1934,7 +1934,7 @@ dependencies = [ [[package]] name = "fula-crypto" -version = "0.6.7" +version = "0.6.8" dependencies = [ "aes-gcm", "anyhow", @@ -1979,7 +1979,7 @@ dependencies = [ [[package]] name = "fula-flutter" -version = "0.6.7" +version = "0.6.8" dependencies = [ "anyhow", "async-lock", @@ -2002,7 +2002,7 @@ dependencies = [ [[package]] name = "fula-js" -version = "0.6.7" +version = "0.6.8" dependencies = [ "base64 0.22.1", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 5c7fd89..b836b7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ name = "encrypted_upload_test" path = "examples/encrypted_upload_test.rs" [workspace.package] -version = "0.6.7" +version = "0.6.8" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/functionland/fula-api" diff --git a/crates/fula-client/tests/issue_34_two_client_interop.rs b/crates/fula-client/tests/issue_34_two_client_interop.rs new file mode 100644 index 0000000..6f17628 --- /dev/null +++ b/crates/fula-client/tests/issue_34_two_client_interop.rs @@ -0,0 +1,259 @@ +//! Issue #34 — two-client interop guard for the sealed-flush fix. +//! +//! The #34 fix makes `flush_forest` write back a `Sealed` pointer state into +//! the in-memory HAMT after each successful node PUT, so flushes stop +//! re-uploading the whole ever-touched tree. This file proves the fix does +//! not change MULTI-CLIENT semantics: two clients (same user key, separate +//! devices) alternating writes against one stateful mock master that honors +//! conditional PUTs (`If-None-Match: *` / `If-Match`, 412 on mismatch — +//! exactly the master/S3 contract the production conflict arbiter relies on). +//! +//! The sequence under test is the report's exact worry — "user has two +//! clients and one writes and then the other one writes": +//! +//! 1. Client A writes 3 files via `put_object_flat` (flush per put → A's +//! in-memory forest is fully SEALED). +//! 2. Client B (fresh instance, same secret) writes 3 files — B loads A's +//! committed state from the master first, so its conditional flushes +//! succeed and advance the manifest/page ETags past A's cached view. +//! 3. Client A writes a 4th file FROM ITS NOW-STALE SEALED FOREST. A's +//! flush must hit 412 on the conditional PUT, evict its cache (sealed +//! state discarded wholesale), reload B's winning state, replay its WAL +//! (the 4th file's insert), and retry to success — the pre-existing +//! NEW-7.2 reconcile protocol, with sealing now in the mix. +//! 4. After the reconcile, A can read B's writes. +//! 5. A THIRD fresh client must read all 7 files byte-exactly and list +//! exactly 7 — nothing lost, nothing corrupted, no write clobbered. +//! +//! The test asserts the 412/backoff path actually fired (via the +//! `flush_backoff_count` metric), so a future change that accidentally makes +//! step 3 conflict-free cannot turn this into a vacuous pass. + +#![cfg(not(target_arch = "wasm32"))] + +use bytes::Bytes; +use cid::multihash::Multihash; +use cid::Cid; +use fula_client::{Config, EncryptedClient, EncryptionConfig}; +use fula_crypto::keys::SecretKey; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tempfile::TempDir; +use wiremock::matchers::method; +use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate}; + +// ───────────────────────────── stateful mock ──────────────────────────────── +// Same conditional-PUT mock as tests/dir_index_v8_wal_recovery_and_guard.rs +// (duplicated here because that file is feature-gated on test-fault-injection +// and this one intentionally runs in the default suite). + +fn blake3_raw_cid(data: &[u8]) -> Cid { + let h = blake3::hash(data); + let mh = Multihash::<64>::wrap(0x1e, h.as_bytes()).expect("blake3 multihash wrap"); + Cid::new_v1(0x55, mh) +} + +type Stash = Arc>>>; + +fn header_value(req: &Request, name: &str) -> Option { + req.headers + .iter() + .find(|(k, _)| k.as_str().eq_ignore_ascii_case(name)) + .and_then(|(_, v)| v.to_str().ok()) + .map(|s| s.to_string()) +} + +/// PUT that honors `If-None-Match: *` (412 if exists) and `If-Match: ` +/// (412 if the stored body's CID differs) — the S3/master contract. +struct PutResponder { + stash: Stash, +} +impl Respond for PutResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let path = req.url.path().to_string(); + let body = req.body.clone(); + let mut s = self.stash.lock().unwrap(); + + if let Some(inm) = header_value(req, "If-None-Match") { + if inm.trim() == "*" && s.contains_key(&path) { + return ResponseTemplate::new(412); + } + } + if let Some(im) = header_value(req, "If-Match") { + let want = im.trim().trim_matches('"').to_string(); + let current = s.get(&path).map(|b| blake3_raw_cid(b).to_string()); + if current.as_deref() != Some(want.as_str()) { + return ResponseTemplate::new(412); + } + } + + let cid = blake3_raw_cid(&body); + s.insert(path, body); + ResponseTemplate::new(200).insert_header("ETag", cid.to_string()) + } +} + +struct GetResponder { + stash: Stash, +} +impl Respond for GetResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let path = req.url.path().to_string(); + match self.stash.lock().unwrap().get(&path) { + Some(bytes) => ResponseTemplate::new(200) + .insert_header("ETag", blake3_raw_cid(bytes).to_string()) + .set_body_bytes(bytes.clone()), + None => ResponseTemplate::new(404), + } + } +} + +struct HeadResponder { + stash: Stash, +} +impl Respond for HeadResponder { + fn respond(&self, req: &Request) -> ResponseTemplate { + let path = req.url.path().to_string(); + match self.stash.lock().unwrap().get(&path) { + Some(bytes) => ResponseTemplate::new(200) + .insert_header("ETag", blake3_raw_cid(bytes).to_string()) + .insert_header("Content-Length", bytes.len().to_string()), + None => ResponseTemplate::new(404), + } + } +} + +async fn start_mock() -> (MockServer, Stash) { + let server = MockServer::start().await; + let stash: Stash = Arc::new(Mutex::new(HashMap::new())); + Mock::given(method("PUT")) + .respond_with(PutResponder { stash: stash.clone() }) + .mount(&server) + .await; + Mock::given(method("GET")) + .respond_with(GetResponder { stash: stash.clone() }) + .mount(&server) + .await; + Mock::given(method("HEAD")) + .respond_with(HeadResponder { stash: stash.clone() }) + .mount(&server) + .await; + (server, stash) +} + +fn build_client(uri: &str, cache: &std::path::Path, secret: SecretKey) -> EncryptedClient { + let mut config = Config::new(uri).with_token("test-jwt"); + config.walkable_v8_writer_enabled = true; + config.block_cache_enabled = true; + config.block_cache_path = Some(cache.to_path_buf()); + EncryptedClient::new(config, EncryptionConfig::from_secret_key(secret)) + .expect("EncryptedClient::new") +} + +fn body_for(name: &str) -> Vec { + format!("issue34-two-client-unique-payload::{name}").into_bytes() +} + +#[tokio::test] +async fn two_clients_alternating_writes_compose_without_loss() { + // Isolate the WAL/state dir from the developer machine. Both clients + // share it (per-bucket WAL files), which is harmless for this sequential + // alternation: every clean flush clears the bucket's WAL. + let state = TempDir::new().expect("state dir"); + std::env::set_var("FULA_STATE_DIR", state.path()); + + let (server, _stash) = start_mock().await; + let secret = SecretKey::generate(); + let bucket = "issue34-two-client"; + + let cache_a = TempDir::new().unwrap(); + let cache_b = TempDir::new().unwrap(); + let cache_c = TempDir::new().unwrap(); + let client_a = build_client(&server.uri(), &cache_a.path().join("a.redb"), secret.clone()); + let client_b = build_client(&server.uri(), &cache_b.path().join("b.redb"), secret.clone()); + + // 1) Client A: three immediate-save puts. Each one flushes, so A's + // in-memory forest ends fully sealed. + for name in ["a-1", "a-2", "a-3"] { + client_a + .put_object_flat( + bucket, + &format!("/docs/{name}.txt"), + Bytes::from(body_for(name)), + None, + ) + .await + .unwrap_or_else(|e| panic!("client A put {name}: {e:?}")); + } + + // 2) Client B: loads A's committed state cold, then writes three files. + // B's conditional flushes advance the manifest/page ETags past A's + // cached (sealed) view. + for name in ["b-1", "b-2", "b-3"] { + client_b + .put_object_flat( + bucket, + &format!("/docs/{name}.txt"), + Bytes::from(body_for(name)), + None, + ) + .await + .unwrap_or_else(|e| panic!("client B put {name}: {e:?}")); + } + + // 3) Client A again, from its now-STALE sealed forest. The flush must + // 412, evict the sealed state, reload B's winning state, replay the + // WAL insert, and retry to success. + let backoffs_before = fula_client::flush_backoff_count(); + client_a + .put_object_flat( + bucket, + "/docs/a-4.txt", + Bytes::from(body_for("a-4")), + None, + ) + .await + .expect("client A's stale-forest put must reconcile via 412-retry, not fail"); + let backoffs_after = fula_client::flush_backoff_count(); + assert!( + backoffs_after > backoffs_before, + "the two-client conflict must actually exercise the 412/backoff \ + reconcile path (backoff count {} -> {}); if this stops firing the \ + test has gone vacuous — investigate why A's stale ETags no longer \ + conflict", + backoffs_before, + backoffs_after + ); + + // 4) Post-reconcile, A's reloaded view contains B's writes. + let b1_via_a = client_a + .get_object_flat(bucket, "/docs/b-1.txt") + .await + .expect("client A must read B's write after reconcile"); + assert_eq!(b1_via_a.to_vec(), body_for("b-1")); + + // 5) A third fresh client reads EVERYTHING byte-exactly and the forest + // lists exactly the 7 files — no lost update, no corruption. + let client_c = build_client(&server.uri(), &cache_c.path().join("c.redb"), secret.clone()); + for name in ["a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"] { + let got = client_c + .get_object_flat(bucket, &format!("/docs/{name}.txt")) + .await + .unwrap_or_else(|e| panic!("client C get {name}: {e:?}")); + assert_eq!( + got.to_vec(), + body_for(name), + "{name} must round-trip byte-exactly through the two-client dance" + ); + } + let listing = client_c + .list_files_from_forest(bucket) + .await + .expect("client C forest listing"); + assert_eq!( + listing.len(), + 7, + "forest must list exactly the 7 files written by A and B, got: {:?}", + listing.iter().map(|f| f.original_key.clone()).collect::>() + ); +} diff --git a/crates/fula-crypto/src/sharded_hamt_forest.rs b/crates/fula-crypto/src/sharded_hamt_forest.rs index 3566b12..f4115db 100644 --- a/crates/fula-crypto/src/sharded_hamt_forest.rs +++ b/crates/fula-crypto/src/sharded_hamt_forest.rs @@ -2085,9 +2085,18 @@ impl ShardedHamtPrivateForest { /// mutated nodes while untouched subtree ciphertexts stay readable. /// * If a shard's in-memory HAMT is empty, the root is recorded as /// `None` and no nodes are written. - /// * Failures leave partial state: the shard whose `Node::store` fails - /// keeps its pre-flush root pointer but its `seq` has already been - /// bumped. Callers are expected to treat flush failure as + /// * Issue #34: node persistence goes through `Node::store_sealing`, + /// which downgrades persisted `InMemory` children to + /// `ChildPtr::Sealed` in place. Consequence: a flush PUTs only the + /// nodes mutated since the previous flush (plus the shard root) — + /// NOT the whole ever-touched tree — so per-flush cost is O(path) + /// instead of O(bucket size). Persisted bytes are identical to the + /// pre-sealing writer's (same plaintexts, same content addresses). + /// * Failures leave partial state: the shard whose `store_sealing` + /// fails keeps its pre-flush root pointer but its `seq` has already + /// been bumped, and children persisted before the failure keep + /// their seal (their PUTs succeeded; content-addressed references + /// stay valid). Callers are expected to treat flush failure as /// non-recoverable and drop the in-memory forest rather than /// continuing to mutate it. pub async fn flush_dirty( @@ -2129,8 +2138,18 @@ impl ShardedHamtPrivateForest { // pair atomically (next-flush logic only updates a shard's // `root` + `root_cid` when its dirty flag is set). let (new_root, new_root_cid) = { - let guard = self.loaded_shards[idx].read().await; - match &*guard { + // Issue #34: `write()` (not `read()`) because the sealing + // store mutates pointers in place — every InMemory child + // that persists is downgraded to `ChildPtr::Sealed`, so + // the NEXT flush of this shard re-PUTs only the path + // mutated since this one (plus the root) instead of the + // whole ever-touched tree. Writers already serialize on + // the outer forest `&mut self`, so taking the shard's + // write lock here cannot deadlock with a concurrent + // writer; readers briefly block exactly as they would on + // a concurrent upsert. + let mut guard = self.loaded_shards[idx].write().await; + match &mut *guard { LoadedShard::NotLoaded => { return Err(CryptoError::Hamt(format!( "shard {} marked dirty but NotLoaded — internal invariant violation", @@ -2142,7 +2161,7 @@ impl ShardedHamtPrivateForest { if node.is_empty() { (None, None) } else { - let result = node.store(&store).await?; + let result = node.store_sealing(&store).await?; (Some(result.storage_key), result.cid) } } @@ -5391,4 +5410,477 @@ mod tests { extracted.directories.keys().collect::>() ); } + + // ===================================================================== + // Issue #34 — per-upload cost grows with bucket size (forest write + // amplification). FIXED — these are the inverted regression guards. + // + // Pre-fix mechanism (pinned at commit 2ed7fe0, measured: flush #150 + // re-PUT 17 nodes / 161.7 KB; 150 × 4 KB files cost 1,630 node PUTs / + // 12.2 MB of index uploads): `Node::set_value` marks every node on a + // mutated path `ChildPtr::InMemory`; `Node::store(&self)` re-PUT every + // InMemory node but could never downgrade them, so in a long-lived + // session EVERY flush re-uploaded the whole ever-touched subtree — + // O(N²) total for the FxFiles pattern (N × `put_object_flat` = + // N × (upsert + flush) into one directory = one hot shard). + // + // The fix: `flush_dirty` persists via `Node::store_sealing`, which + // write-backs persisted children as `ChildPtr::Sealed` (resident for + // reads, reference-only for stores). Per-flush cost is now O(mutated + // path), independent of bucket size, and persisted bytes are + // byte-identical to the legacy writer's (see + // `node::round_trip_tests::issue_34_sealing_and_legacy_store_produce_ + // identical_roots`). + // ===================================================================== + + /// PUT-call + PUT-byte counting backend, local to the #34 tests + /// (`CountingBlobBackend` counts ops but not payload bytes, and byte + /// growth is half the #34 story). + struct Issue34CountingBackend { + inner: InMemoryBackend, + puts: std::sync::atomic::AtomicU64, + put_bytes: std::sync::atomic::AtomicU64, + } + + impl Issue34CountingBackend { + fn new() -> Self { + Self { + inner: InMemoryBackend::new(), + puts: std::sync::atomic::AtomicU64::new(0), + put_bytes: std::sync::atomic::AtomicU64::new(0), + } + } + + /// `(total PUT calls, total PUT payload bytes)` so far. + fn snapshot(&self) -> (u64, u64) { + use std::sync::atomic::Ordering::Relaxed; + (self.puts.load(Relaxed), self.put_bytes.load(Relaxed)) + } + } + + #[async_trait::async_trait] + impl crate::wnfs_hamt::v7_store::BlobBackend for Issue34CountingBackend { + async fn get(&self, path: &str) -> Result> { + self.inner.get(path).await + } + + async fn put( + &self, + path: &str, + bytes: Vec, + ) -> Result { + use std::sync::atomic::Ordering::Relaxed; + self.puts.fetch_add(1, Relaxed); + self.put_bytes.fetch_add(bytes.len() as u64, Relaxed); + self.inner.put(path, bytes).await + } + } + + /// Entry weighted like production: `put_object_flat` stashes the + /// `x-fula-encryption` JSON (wrapped DEK + nonce + encrypted private + /// metadata, ~0.9 KB) into `user_metadata` before the upsert + /// (`fula-client/src/encryption.rs`), so each forest entry carries + /// ~1 KB. Per-node payloads — and therefore the re-upload bytes this + /// test measures — scale with that weight; using it keeps the numbers + /// representative of the FxFiles repro rather than artificially small. + fn issue34_entry(path: &str) -> ForestFileEntry { + let mut e = file_entry(path, 4096); + e.user_metadata.insert( + "x-fula-encryption".to_string(), + "x".repeat(900), + ); + e + } + + /// Issue #34 FIXED, guard 1: the marginal flush cost of + /// `upsert + flush_dirty` (= what every `put_object_flat` call does) + /// stays FLAT as the bucket fills — bounded PUT calls per flush + /// regardless of object count — and the full data set survives 150 + /// sealed flushes intact (read back through a fresh forest). + /// + /// Pre-fix (pinned at 2ed7fe0): per-flush PUTs grew 1 → 17 and + /// per-flush bytes grew O(N) (~1.08 KB × bucket size; 161.7 KB at + /// flush #150), totalling 1,630 PUTs / 12.2 MB for 150 × 4 KB files. + /// Post-fix: a flush re-PUTs the shard root plus the path(s) mutated + /// by that one upsert (the `F:` file entry and the `D:` parent-dir + /// entry), so the count is bounded by tree DEPTH, not tree size. + #[tokio::test] + async fn issue_34_per_flush_put_cost_stays_flat_as_bucket_fills() { + const N: usize = 150; + // Pin the shard salt so the hot-shard index is stable run-to-run. + // (The HAMT shape itself is salt-independent — node hashing is + // `H::hash(key)` over fixed path strings — so the counts below are + // deterministic either way; the pinned salt also stabilizes which + // shard the directory routes to.) + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + let mut puts_per_flush = Vec::with_capacity(N); + let mut bytes_per_flush = Vec::with_capacity(N); + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + let (p0, b0) = backend.snapshot(); + forest.flush_dirty(&backend).await.unwrap(); + let (p1, b1) = backend.snapshot(); + puts_per_flush.push(p1 - p0); + bytes_per_flush.push(b1 - b0); + } + + let (total_puts, total_bytes) = backend.snapshot(); + eprintln!( + "\n[#34 FIXED] one directory, one hot shard, flush after every \ + upsert (the `put_object_flat` pattern):" + ); + eprintln!(" upload # | HAMT-node PUTs in that flush | bytes in that flush"); + for &n in &[1usize, 10, 25, 50, 75, 100, 125, 150] { + eprintln!( + " {:>8} | {:>28} | {:>19}", + n, + puts_per_flush[n - 1], + bytes_per_flush[n - 1] + ); + } + eprintln!( + " TOTAL for {} files of 4 KB: {} node PUTs, {:.1} MB uploaded \ + (index alone, object bodies excluded; pre-fix: 1630 PUTs, 12.2 MB)", + N, + total_puts, + total_bytes as f64 / 1.0e6 + ); + + // (1) FLAT marginal PUT count: every steady-state flush is bounded + // by tree depth (root + the F:/D: paths of one upsert), never by + // bucket size. Bound of 6 = root + 2 paths × depth 2 + slack for a + // bucket-split flush; pre-fix flush #150 alone was 17 and growing + // with N. + let max_steady_puts = puts_per_flush[14..].iter().copied().max().unwrap(); + assert!( + max_steady_puts <= 6, + "issue #34 regression: a steady-state flush PUT {} nodes — \ + per-flush cost must be bounded by tree depth (≤6), not bucket \ + size. The write amplification is back; check that flush_dirty \ + still seals via store_sealing and that mutations don't unseal \ + untouched siblings.", + max_steady_puts + ); + + // (2) NO growth trend: the last 10 flushes must not PUT more than + // the flushes right after the root split settles (15-24). Pre-fix + // the late window was ~10× the early one. + let mid_puts: u64 = puts_per_flush[14..24].iter().sum(); + let late_puts: u64 = puts_per_flush[N - 10..].iter().sum(); + assert!( + late_puts <= mid_puts + 10, + "issue #34 regression: per-flush PUT count is growing with \ + bucket size again (flushes 15-24: {} total; last 10: {} total).", + mid_puts, + late_puts + ); + + // (3) Per-flush BYTES are bounded by node sizes on the mutated + // path, not by bucket size. The largest steady-state flush carries + // the root + two ~10-entry children (~1 KB metadata each) ≈ 25 KB; + // pre-fix flush #150 carried 161.7 KB and grew ~1.08 KB per object. + let max_steady_bytes = bytes_per_flush[14..].iter().copied().max().unwrap(); + assert!( + max_steady_bytes <= 60_000, + "issue #34 regression: a steady-state flush uploaded {} bytes — \ + per-flush bytes must be bounded by the mutated path's node \ + sizes (≲25 KB here), not by bucket size.", + max_steady_bytes + ); + + // (4) INTEGRITY — the fix must not lose or corrupt anything. A + // fresh forest over the final manifest (cold load path, exactly + // what another device or an app restart sees) must read every one + // of the 150 files and list the directory completely. + let final_manifest = forest.flush_dirty(&backend).await.unwrap().clone(); + let mut fresh = ShardedHamtPrivateForest::from_manifest( + final_manifest, + "documents-v8", + test_dek(), + ); + for i in 0..N { + let path = format!("/e2e/perf/file-{:03}.bin", i); + let got = fresh.get_file(&path, &backend).await.unwrap(); + let entry = got.unwrap_or_else(|| { + panic!("file {} lost after 150 sealed flushes", path) + }); + assert_eq!(entry.size, 4096, "entry metadata must survive sealing"); + assert_eq!( + entry.user_metadata.get("x-fula-encryption").map(String::len), + Some(900), + "user_metadata must survive sealing" + ); + } + let listing = fresh.list_directory("/e2e/perf", &backend).await.unwrap(); + assert_eq!( + listing.len(), + N, + "directory listing must surface all {} files after sealed flushes", + N + ); + } + + /// Issue #34 FIXED, guard 2: a long-lived session's marginal flush + /// costs the same as a fresh session's over identical data. + /// + /// Pre-fix (pinned at 2ed7fe0) the long-lived session re-PUT the whole + /// ever-touched tree (17 nodes) where a fresh forest loaded from the + /// same manifest re-PUT only the touched path (3 nodes) — the gap WAS + /// the never-downgraded `InMemory` set. Sealing closes it: after every + /// flush the session tree is reference-equivalent to a freshly-loaded + /// one (resident children are `Sealed`, which store paths treat + /// exactly like `Stored`). + #[tokio::test] + async fn issue_34_long_session_marginal_flush_matches_fresh_session() { + const N: usize = 150; + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + // The long-lived session: N × (upsert + flush), like FxFiles. + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + forest.flush_dirty(&backend).await.unwrap(); + } + + // Marginal cost of upload #151 in the long-lived session. + forest + .upsert_file(issue34_entry("/e2e/perf/file-150.bin"), &backend) + .await + .unwrap(); + let (p0, _) = backend.snapshot(); + let manifest_after = forest.flush_dirty(&backend).await.unwrap().clone(); + let (p1, _) = backend.snapshot(); + let long_session_puts = p1 - p0; + + // Fresh session over the SAME persisted state: from_manifest is the + // production load path, so every shard-root pointer deserializes as + // `Stored`/`StoredV2` and only the genuinely mutated path goes + // `InMemory` on the next write. + let mut fresh = ShardedHamtPrivateForest::from_manifest( + manifest_after, + "documents-v8", + test_dek(), + ); + fresh + .upsert_file(issue34_entry("/e2e/perf/file-151.bin"), &backend) + .await + .unwrap(); + let (q0, _) = backend.snapshot(); + fresh.flush_dirty(&backend).await.unwrap(); + let (q1, _) = backend.snapshot(); + let fresh_session_puts = q1 - q0; + + eprintln!( + "\n[#34 FIXED] marginal flush after 150 uploads — long-lived \ + session: {} node PUTs; fresh session (same data, loaded from \ + manifest): {} node PUTs", + long_session_puts, fresh_session_puts + ); + + assert!( + fresh_session_puts >= 1, + "fresh session must persist at least the touched path" + ); + // Identical workload, identical persisted state → the sealed + // long-lived session must pay the same marginal cost as a cold + // session (±2 nodes of slack for path-shape differences between + // the two inserted keys). Pre-fix the gap was 17 vs 3. + assert!( + long_session_puts <= fresh_session_puts + 2, + "issue #34 regression: the long-lived session's marginal flush \ + re-PUT {} nodes vs {} for a fresh session over the same data — \ + the never-downgraded InMemory set is accumulating again.", + long_session_puts, + fresh_session_puts + ); + + // Cross-session composition (two-client shape, sequential): the + // long-lived session wrote files 000-150, the second session + // (loaded from the first's manifest) wrote file-151. A THIRD cold + // session over the final manifest must see writes from BOTH + // sessions — proving sealed flushes from different sessions + // compose on shared storage with no loss. + let manifest_final = fresh.flush_dirty(&backend).await.unwrap().clone(); + let mut third = ShardedHamtPrivateForest::from_manifest( + manifest_final, + "documents-v8", + test_dek(), + ); + for path in [ + "/e2e/perf/file-000.bin", // session 1, first write + "/e2e/perf/file-149.bin", // session 1, last bulk write + "/e2e/perf/file-150.bin", // session 1, marginal write + "/e2e/perf/file-151.bin", // session 2's write + ] { + assert!( + third.get_file(path, &backend).await.unwrap().is_some(), + "third session must read {} written across two prior sessions", + path + ); + } + let listing = third.list_directory("/e2e/perf", &backend).await.unwrap(); + assert_eq!( + listing.len(), + N + 2, + "third session must list all files from both prior sessions" + ); + } + + /// Issue #34 — aliasing guarantee: an in-flight reader holding the + /// shard-root `Arc` from BEFORE a flush is completely unaffected by the + /// sealing write-back (its view stays the pre-seal tree; the transient + /// placeholder used inside `store_sealing` is never observable). + /// `Arc::make_mut` clones any node whose refcount > 1, so the sealer + /// mutates a private copy — this test pins that copy-on-write contract + /// for the sealing path specifically. + #[tokio::test] + async fn issue_34_in_flight_reader_arc_unaffected_by_sealing() { + const N: usize = 30; + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + } + + // Simulate an in-flight reader: grab the hot shard's root Arc the + // way resolve paths do, BEFORE the flush. + let idx = forest.shard_for_file("/e2e/perf/file-000.bin"); + let pre_seal_root: Arc = { + let guard = forest.loaded_shards[idx].read().await; + match &*guard { + LoadedShard::Loaded(node) => Arc::clone(node), + other => panic!("hot shard must be Loaded, got {:?}", std::mem::discriminant(other)), + } + }; + + // Flush — seals the live tree (cloning shared nodes via make_mut). + forest.flush_dirty(&backend).await.unwrap(); + + // The reader's pre-seal view must still resolve every entry — no + // placeholder, no missing child, values intact. + let reader = forest.reader_store_for(idx, &backend); + for i in 0..N { + let key = file_key(&format!("/e2e/perf/file-{:03}.bin", i)); + let got = pre_seal_root.get(&key, &reader).await.unwrap(); + assert!( + got.is_some(), + "in-flight reader lost entry {} during sealing", + i + ); + } + + // And the post-seal live tree reads the same set. + for i in 0..N { + let path = format!("/e2e/perf/file-{:03}.bin", i); + assert!( + forest.get_file(&path, &backend).await.unwrap().is_some(), + "post-seal tree lost entry {}", + path + ); + } + } + + /// Issue #34 — deletion over a SEALED tree: `remove_file` resolves + /// sealed children (in-memory, zero fetches), unseals exactly the + /// touched path, and the following flush persists a tree from which a + /// cold reader sees the deletion and ALL surviving files. Exercises + /// `remove_value` + `canonicalize` over `ChildPtr::Sealed`, which the + /// upsert-only guards above never reach. + #[tokio::test] + async fn issue_34_delete_after_sealed_flushes_preserves_remaining_files() { + const N: usize = 40; + const FIXED_SALT: [u8; 32] = [0x34; 32]; + + let backend = Arc::new(Issue34CountingBackend::new()); + let mut manifest = crate::private_forest::ShardManifestV7::new(16); + manifest.root.shard_salt = FIXED_SALT.to_vec(); + let mut forest = + ShardedHamtPrivateForest::from_manifest(manifest, "documents-v8", test_dek()); + + // Seed with per-file flushes so the tree is fully sealed. + for i in 0..N { + forest + .upsert_file( + issue34_entry(&format!("/e2e/perf/file-{:03}.bin", i)), + &backend, + ) + .await + .unwrap(); + forest.flush_dirty(&backend).await.unwrap(); + } + + // Delete one file from the sealed tree and flush. + let victim = "/e2e/perf/file-013.bin"; + let removed = forest.remove_file(victim, &backend).await.unwrap(); + assert!(removed.is_some(), "remove_file must find the sealed entry"); + let (p0, _) = backend.snapshot(); + let manifest_after = forest.flush_dirty(&backend).await.unwrap().clone(); + let (p1, _) = backend.snapshot(); + let delete_flush_puts = p1 - p0; + assert!( + delete_flush_puts <= 6, + "deletion flush must also be path-bounded (got {} PUTs)", + delete_flush_puts + ); + + // Cold reader: the victim is gone, all 39 others fully intact. + let mut fresh = ShardedHamtPrivateForest::from_manifest( + manifest_after, + "documents-v8", + test_dek(), + ); + assert!( + fresh.get_file(victim, &backend).await.unwrap().is_none(), + "deleted file must not resurface after a sealed flush" + ); + for i in 0..N { + let path = format!("/e2e/perf/file-{:03}.bin", i); + if path == victim { + continue; + } + assert!( + fresh.get_file(&path, &backend).await.unwrap().is_some(), + "file {} must survive a sealed delete-flush", + path + ); + } + let listing = fresh.list_directory("/e2e/perf", &backend).await.unwrap(); + assert_eq!(listing.len(), N - 1, "listing must reflect exactly one deletion"); + } } diff --git a/crates/fula-crypto/src/wnfs_hamt/node.rs b/crates/fula-crypto/src/wnfs_hamt/node.rs index ddb647f..42cafa5 100644 --- a/crates/fula-crypto/src/wnfs_hamt/node.rs +++ b/crates/fula-crypto/src/wnfs_hamt/node.rs @@ -166,6 +166,126 @@ where store.put_node(bytes).await } + /// Issue #34 — like [`store`](Self::store), but with WRITE-BACK: every + /// `InMemory` child that persists successfully is downgraded in place + /// to `ChildPtr::Sealed { storage_key, cid, node }`, so the NEXT store + /// of this tree emits the recorded reference instead of re-uploading + /// the unchanged subtree. + /// + /// Why this exists: `store(&self)` cannot record that a child was + /// persisted, so in a long-lived session the `InMemory` set grows + /// monotonically (every mutation path stays "dirty" forever) and every + /// flush re-serializes, re-encrypts, and re-PUTs the whole ever-touched + /// tree — O(N²) total upload cost for N sequential single-file puts + /// (issue #34). With sealing, a flush PUTs exactly the nodes mutated + /// since the previous flush plus this root: O(depth) per flush. + /// + /// Persisted bytes are IDENTICAL to `store`'s: a sealed child re-emits + /// the same `PointerWire::Link`/`LinkV2` its original PUT produced + /// (same plaintext → same content-addressed storage_key), so readers — + /// including pre-#34 SDKs — cannot distinguish the two writers. Pinned + /// by `issue_34_sealing_and_legacy_store_produce_identical_roots`. + /// + /// Error handling: if a child's recursive store fails, that child is + /// restored to `InMemory` and the error propagates — the in-memory + /// tree stays fully intact and a retry re-persists exactly the + /// still-unsealed remainder. Children that already sealed earlier in + /// the walk keep their seal (their PUTs succeeded; the references are + /// durable and content-addressed). + /// + /// Takes `self: &mut Arc` (same copy-on-write receiver as + /// [`set`](Self::set)) because the seal mutates pointers in place; + /// recursion is hand-boxed like `set_value` since `async_recursion` + /// and arbitrary-self receivers don't mix. + pub fn store_sealing<'a>( + self: &'a mut Arc, + store: &'a (impl HamtNodeStore + ?Sized), + ) -> AsyncBoxFut<'a, Result> + where + K: 'a, + V: 'a, + { + Box::pin(async move { + let node = Arc::make_mut(self); + let mut wire_pointers = Vec::with_capacity(node.pointers.len()); + for p in node.pointers.iter_mut() { + let wire = match p { + Pointer::Values(pairs) => PointerWire::Values( + pairs + .iter() + .map(|pr| (pr.key.clone(), pr.value.clone())) + .collect(), + ), + Pointer::Link(ChildPtr::Stored(key)) => PointerWire::Link(*key), + Pointer::Link(ChildPtr::StoredV2 { storage_key, cid }) => { + PointerWire::LinkV2 { + storage_key: *storage_key, + cid: cid.clone(), + } + } + // Already persisted by a prior sealing pass and not + // mutated since (a mutation would have re-attached the + // subtree as InMemory): emit the recorded reference, + // zero I/O. + Pointer::Link(ChildPtr::Sealed { storage_key, cid, .. }) => match cid { + Some(cid) => PointerWire::LinkV2 { + storage_key: *storage_key, + cid: cid.clone(), + }, + None => PointerWire::Link(*storage_key), + }, + Pointer::Link(child @ ChildPtr::InMemory(_)) => { + // Take the Arc out so the recursive seal can run + // with refcount-preserving `&mut Arc` semantics + // (a transient placeholder sits in the slot only + // across this block — both exits below overwrite + // it before anything else can observe the tree). + let taken = std::mem::replace( + child, + ChildPtr::Stored([0u8; super::store::STORAGE_KEY_LEN]), + ); + let ChildPtr::InMemory(mut child_arc) = taken else { + unreachable!("match arm guarantees InMemory"); + }; + match Node::::store_sealing(&mut child_arc, store).await { + Ok(result) => { + let wire = match result.cid { + Some(ref cid) => PointerWire::LinkV2 { + storage_key: result.storage_key, + cid: cid.clone(), + }, + None => PointerWire::Link(result.storage_key), + }; + *child = ChildPtr::Sealed { + storage_key: result.storage_key, + cid: result.cid, + node: child_arc, + }; + wire + } + Err(e) => { + // Restore the child so the in-memory tree + // is never left pointing at the + // placeholder; the caller can retry the + // flush and re-persist what's left. + *child = ChildPtr::InMemory(child_arc); + return Err(e); + } + } + } + }; + wire_pointers.push(wire); + } + let wire = NodeWire { + bitmask: node.bitmask, + pointers: wire_pointers, + }; + let bytes = postcard::to_allocvec(&wire) + .map_err(|e| CryptoError::Serialization(format!("encode hamt node: {e}")))?; + store.put_node(bytes).await + }) + } + //---------------------------------------------------------------------------------------------- // Iteration //---------------------------------------------------------------------------------------------- @@ -854,4 +974,267 @@ mod round_trip_tests { distinct_orders.len() ); } + + // ===================================================================== + // Issue #34 — write-amplification regression guards. + // + // History: before the fix, `store(&self)` persisted every `InMemory` + // child but could never downgrade it, so the InMemory set grew + // monotonically and every flush re-PUT the ENTIRE ever-touched tree — + // O(N²) total upload cost for N sequential puts (pinned at commit + // 74ec299 / 2ed7fe0: two consecutive stores of an unchanged 64-entry + // tree each re-PUT all ~12 nodes). The fix is `store_sealing`, which + // write-backs persisted children as `ChildPtr::Sealed`. These tests + // are the inverted pins plus the integrity guarantees of the fix. + // ===================================================================== + + use std::sync::atomic::{AtomicUsize, Ordering}; + + struct PutCountingStore { + inner: InMemoryStore, + puts: AtomicUsize, + /// When `Some(n)`, the n-th put_node call (1-based) fails once. + fail_on_put: std::sync::Mutex>, + } + + impl PutCountingStore { + fn new() -> Self { + Self { + inner: InMemoryStore::new(), + puts: AtomicUsize::new(0), + fail_on_put: std::sync::Mutex::new(None), + } + } + + fn puts(&self) -> usize { + self.puts.load(Ordering::Relaxed) + } + } + + #[async_trait::async_trait] + impl HamtNodeStore for PutCountingStore { + async fn get_node(&self, key: &StorageKey) -> Result { + self.inner.get_node(key).await + } + async fn put_node(&self, bytes: HamtNodeBytes) -> Result { + let n = self.puts.fetch_add(1, Ordering::Relaxed) + 1; + let should_fail = { + let mut guard = self.fail_on_put.lock().unwrap(); + if *guard == Some(n) { + *guard = None; // fail exactly once + true + } else { + false + } + }; + if should_fail { + return Err(CryptoError::Hamt(format!( + "injected put_node failure on call {}", + n + ))); + } + self.inner.put_node(bytes).await + } + } + + /// Build the deterministic 64-entry test tree (fixed keys → fixed + /// BLAKE3-driven shape; most root slots split past + /// HAMT_VALUES_BUCKET_SIZE = 3, producing a multi-node tree). + async fn build_issue34_tree(store: &PutCountingStore) -> Arc { + let mut root: Arc = Arc::new(TestNode::default()); + for i in 0u64..64 { + let k = format!("issue34-key-{:04}", i).into_bytes(); + root.set(k, i, store).await.unwrap(); + } + root + } + + /// Assert every entry of the 64-entry test tree reads back correctly + /// through `node` (resolving Sealed/Stored children as needed), with + /// `overridden` taking precedence for mutated keys. + async fn assert_issue34_tree_complete( + node: &TestNode, + store: &PutCountingStore, + overridden: &[(u64, u64)], + ) { + for i in 0u64..64 { + let k = format!("issue34-key-{:04}", i).into_bytes(); + let want = overridden + .iter() + .find(|(idx, _)| *idx == i) + .map(|(_, v)| *v) + .unwrap_or(i); + let got = node.get(&k, store).await.unwrap(); + assert_eq!( + got, + Some(want), + "entry {} must survive sealing round-trips intact", + i + ); + } + } + + /// Issue #34 FIXED — `store_sealing` write-backs persisted children, so + /// re-storing an unchanged tree re-PUTs ONLY the root node (1 PUT, same + /// content address), not the whole tree. The legacy `store(&self)` also + /// benefits once the tree is sealed (its `to_wire` emits the recorded + /// references). A subsequent single-key mutation re-PUTs only the + /// touched path. This is the inverted form of the original pin + /// `issue_34_store_reputs_entire_unchanged_in_memory_tree`. + #[tokio::test] + async fn issue_34_fixed_store_sealing_skips_unchanged_subtrees() { + let store = PutCountingStore::new(); + let mut root = build_issue34_tree(&store).await; + + // First sealing store: persists the full tree once. + let first_key = root.store_sealing(&store).await.unwrap().storage_key; + let first = store.puts(); + assert!( + first >= 5, + "setup degenerate: 64 entries should split into several nodes, \ + got only {} PUTs", + first + ); + + // Second sealing store, NO mutation: children are Sealed, so only + // the root is re-PUT (content-addressed → identical key, and the + // PUT itself is idempotent on the backend). + let second_key = root.store_sealing(&store).await.unwrap().storage_key; + let second = store.puts() - first; + assert_eq!(first_key, second_key, "unchanged tree → same content address"); + assert_eq!( + second, 1, + "issue #34 regression: re-storing an unchanged sealed tree must \ + PUT only the root node, got {} PUTs (pre-fix behaviour was {} — \ + the whole tree)", + second, first + ); + + // Legacy store(&self) on the sealed tree: also root-only now, and + // the same bytes → same key. (Pre-fix this was the amplifying path.) + let third_key = root.store(&store).await.unwrap().storage_key; + let third = store.puts() - first - second; + assert_eq!(first_key, third_key, "legacy store must emit identical bytes"); + assert_eq!( + third, 1, + "legacy store() of a sealed tree must also be root-only, got {}", + third + ); + + // Mutate ONE key: only the touched path unseals; the next sealing + // store re-PUTs the path (root + its branch), not the tree. + let mutated = format!("issue34-key-{:04}", 7).into_bytes(); + root.set(mutated, 7_000, &store).await.unwrap(); + let before = store.puts(); + let fourth_key = root.store_sealing(&store).await.unwrap().storage_key; + let fourth = store.puts() - before; + assert_ne!(first_key, fourth_key, "mutation must change the root address"); + assert!( + (2..=4).contains(&fourth), + "single-key mutation must re-PUT only the touched path \ + (root + 1-2 nodes), got {} PUTs", + fourth + ); + + // INTEGRITY — through the live (sealed) tree... + assert_issue34_tree_complete(&root, &store, &[(7, 7_000)]).await; + // ...and through a cold reload of the final root from the backend + // (proves the sealed flush persisted a complete, correct tree). + let reloaded: TestNode = TestNode::load(&fourth_key, &store).await.unwrap(); + assert_issue34_tree_complete(&reloaded, &store, &[(7, 7_000)]).await; + } + + /// Issue #34 — NO-CORRUPTION equivalence proof: `store_sealing` + /// persists byte-identical state to the legacy `store`. Same inserts → + /// same content-addressed root key (content addressing makes root-key + /// equality transitive over the whole tree: identical root bytes embed + /// identical child keys, which address identical child plaintexts). + /// Readers — including pre-fix SDKs — cannot distinguish the writers. + #[tokio::test] + async fn issue_34_sealing_and_legacy_store_produce_identical_roots() { + let store_legacy = PutCountingStore::new(); + let root_legacy = build_issue34_tree(&store_legacy).await; + let key_legacy = root_legacy.store(&store_legacy).await.unwrap().storage_key; + + let store_sealed = PutCountingStore::new(); + let mut root_sealed = build_issue34_tree(&store_sealed).await; + let key_sealed = root_sealed + .store_sealing(&store_sealed) + .await + .unwrap() + .storage_key; + + assert_eq!( + key_legacy, key_sealed, + "store_sealing must persist byte-identical state to store() — \ + a divergence here means the fix changed the on-disk format and \ + could corrupt interop with already-uploaded data" + ); + + // Both persisted trees round-trip completely from their backends. + let from_legacy: TestNode = + TestNode::load(&key_legacy, &store_legacy).await.unwrap(); + assert_issue34_tree_complete(&from_legacy, &store_legacy, &[]).await; + let from_sealed: TestNode = + TestNode::load(&key_sealed, &store_sealed).await.unwrap(); + assert_issue34_tree_complete(&from_sealed, &store_sealed, &[]).await; + } + + /// Issue #34 — error-path integrity: if a node PUT fails mid-seal, the + /// in-memory tree must remain fully intact (no placeholder left in any + /// slot), already-sealed children keep their seal, and a retry must + /// succeed and produce the exact same root as a never-failed run. + #[tokio::test] + async fn issue_34_store_sealing_failure_leaves_tree_intact_and_retryable() { + // Control: the root key a clean run produces. + let control_store = PutCountingStore::new(); + let mut control_root = build_issue34_tree(&control_store).await; + let control_key = control_root + .store_sealing(&control_store) + .await + .unwrap() + .storage_key; + let control_puts = control_store.puts(); + + // Failing run: inject a one-shot failure on the 3rd put_node. + let store = PutCountingStore::new(); + let mut root = build_issue34_tree(&store).await; + *store.fail_on_put.lock().unwrap() = Some(3); + let err = root.store_sealing(&store).await; + assert!(err.is_err(), "injected failure must surface"); + + // The tree must still be completely readable in memory — the + // placeholder installed during sealing must never survive an + // error exit. + assert_issue34_tree_complete(&root, &store, &[]).await; + + // Retry: succeeds, costs at most a clean run (children sealed + // before the failure are skipped), and lands on the identical + // content address. + let before_retry = store.puts(); + let retry_key = root.store_sealing(&store).await.unwrap().storage_key; + let retry_puts = store.puts() - before_retry; + assert_eq!( + retry_key, control_key, + "retry after a failed seal must converge on the same persisted \ + state as a never-failed run" + ); + // Exactly the two children whose PUTs succeeded before the injected + // failure (calls 1 and 2) keep their seal and are skipped on retry; + // everything else — including the child whose PUT failed (restored + // to InMemory) — is re-persisted. Deterministic: fixed keys → fixed + // tree shape → fixed walk order. + assert_eq!( + retry_puts, + control_puts - 2, + "retry must re-PUT everything EXCEPT the 2 children sealed \ + before the injected failure (retry: {}, clean run: {})", + retry_puts, + control_puts + ); + + // And the persisted tree is complete. + let reloaded: TestNode = TestNode::load(&retry_key, &store).await.unwrap(); + assert_issue34_tree_complete(&reloaded, &store, &[]).await; + } } diff --git a/crates/fula-crypto/src/wnfs_hamt/pointer.rs b/crates/fula-crypto/src/wnfs_hamt/pointer.rs index 8a5c9fc..2b1fb05 100644 --- a/crates/fula-crypto/src/wnfs_hamt/pointer.rs +++ b/crates/fula-crypto/src/wnfs_hamt/pointer.rs @@ -61,6 +61,32 @@ where storage_key: StorageKey, cid: Cid, }, + /// Issue #34 — persisted AND still memory-resident. + /// + /// Produced by `Node::store_sealing` when an `InMemory` child has been + /// successfully PUT: the subtree stays resident (reads resolve from + /// memory with zero I/O, exactly like `InMemory`), but store paths now + /// emit the recorded `storage_key`/`cid` reference WITHOUT re-uploading + /// the subtree. This is the write-back state that `InMemory` was + /// missing: before it existed, a flushed-but-resident subtree could + /// only be represented as `InMemory`, so every subsequent flush + /// re-serialized, re-encrypted (fresh nonce), and re-PUT the whole + /// ever-touched tree — O(N²) total upload cost for N sequential puts. + /// + /// Never serialized as a distinct wire variant: `to_wire` maps it to + /// the same `PointerWire::Link`/`LinkV2` the original PUT produced, so + /// the on-disk format is byte-identical to pre-#34 writers. Any + /// mutation through `set_value`/`remove_value`/`canonicalize` resolves + /// the Arc and re-attaches the subtree as `InMemory`, invalidating the + /// seal for exactly the changed path. + /// + /// `cid` is `Some` only when the backend attested one on the original + /// PUT (walkable-v8 master); mirrors the `LinkV2`-vs-`Link` split. + Sealed { + storage_key: StorageKey, + cid: Option, + node: Arc>, + }, } /// Each bit in the bitmask of a HAMT node maps to one `Pointer`. A `Pointer` @@ -178,6 +204,11 @@ where .await?; Ok(Arc::new(node)) } + // Issue #34 — the subtree is persisted but still resident; + // serve it from memory exactly like `InMemory`. No I/O, no + // integrity re-check needed: these are the same plaintext + // nodes the seal-time PUT serialized. + ChildPtr::Sealed { node, .. } => Ok(Arc::clone(node)), } } } @@ -291,6 +322,19 @@ where storage_key: *storage_key, cid: cid.clone(), }), + // Issue #34 — already persisted by a prior `store_sealing` + // pass; emit the recorded reference WITHOUT re-uploading the + // subtree. Wire form matches what the original PUT produced + // (`LinkV2` when the backend attested a CID, legacy `Link` + // otherwise), so persisted bytes are identical to a pre-#34 + // writer's. + Pointer::Link(ChildPtr::Sealed { storage_key, cid, .. }) => match cid { + Some(cid) => Ok(PointerWire::LinkV2 { + storage_key: *storage_key, + cid: cid.clone(), + }), + None => Ok(PointerWire::Link(*storage_key)), + }, Pointer::Link(ChildPtr::InMemory(child)) => { let result = child.store(store).await?; match result.cid { @@ -343,6 +387,11 @@ impl Clone for ChildPtr { storage_key: *storage_key, cid: cid.clone(), }, + ChildPtr::Sealed { storage_key, cid, node } => ChildPtr::Sealed { + storage_key: *storage_key, + cid: cid.clone(), + node: Arc::clone(node), + }, } } } @@ -366,6 +415,12 @@ impl Debug for ChildPtr { .field("storage_key", &hex::encode(storage_key)) .field("cid", cid) .finish(), + ChildPtr::Sealed { storage_key, cid, node } => f + .debug_struct("Sealed") + .field("storage_key", &hex::encode(storage_key)) + .field("cid", cid) + .field("node", node) + .finish(), } } } @@ -405,6 +460,17 @@ impl PartialEq for Pointer { Pointer::Link(ChildPtr::StoredV2 { storage_key: a_sk, cid: a_cid }), Pointer::Link(ChildPtr::StoredV2 { storage_key: b_sk, cid: b_cid }), ) => a_sk == b_sk && a_cid == b_cid, + // Sealed equality compares the persisted identity only + // (storage_key is the content address; cid mirrors the + // LinkV2/Link split). The resident `node` Arc is deliberately + // not compared — it is derived state, same rationale as the + // InMemory `false` arm above. Cross-variant comparison + // (Sealed vs Stored/StoredV2/InMemory) stays `false` so the + // sealed-vs-loaded distinction remains observable in tests. + ( + Pointer::Link(ChildPtr::Sealed { storage_key: a_sk, cid: a_cid, .. }), + Pointer::Link(ChildPtr::Sealed { storage_key: b_sk, cid: b_cid, .. }), + ) => a_sk == b_sk && a_cid == b_cid, _ => false, } } diff --git a/crates/fula-flutter/Cargo.toml b/crates/fula-flutter/Cargo.toml index 2ecba3c..ae656fb 100644 --- a/crates/fula-flutter/Cargo.toml +++ b/crates/fula-flutter/Cargo.toml @@ -5,7 +5,7 @@ description = "Flutter bindings for Fula decentralized storage - works on Androi # to parse `*.workspace = true` keys in its own manifest scan. Keep # these in sync with `[workspace.package]` in the root Cargo.toml. # (Same workaround as crates/fula-js.) -version = "0.6.7" +version = "0.6.8" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/functionland/fula-api" diff --git a/crates/fula-js/Cargo.toml b/crates/fula-js/Cargo.toml index db3abad..bcb8b0b 100644 --- a/crates/fula-js/Cargo.toml +++ b/crates/fula-js/Cargo.toml @@ -4,7 +4,7 @@ description = "JavaScript/TypeScript SDK for Fula decentralized storage - WASM b # Hard-coded (not workspace-inherited) because wasm-pack <= 0.13 fails # to parse `*.workspace = true` keys in its own manifest scan. Keep # these in sync with `[workspace.package]` in the root Cargo.toml. -version = "0.6.7" +version = "0.6.8" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/functionland/fula-api" diff --git a/packages/fula_client/CHANGELOG.md b/packages/fula_client/CHANGELOG.md index eb633b5..f71b249 100644 --- a/packages/fula_client/CHANGELOG.md +++ b/packages/fula_client/CHANGELOG.md @@ -5,6 +5,28 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.6.8] - 2026-06-12 + +### Fixed + +- **Forest write amplification on sequential uploads + ([#34](https://github.com/functionland/fula-api/issues/34)).** Every + `put_object_flat` (and any other per-file flush) re-uploaded the entire + ever-touched portion of the bucket's HAMT index instead of just the + changed path, because nodes persisted by a flush were never marked + clean in memory. Sequential bulk uploads into one directory therefore + cost O(N²) total index traffic (measured: 1,630 index PUTs / 12.2 MB + to store 150 × 4 KB files; per-upload latency grew 1.0 s → 3.8 s as the + bucket filled). Flushes now write back a `Sealed` pointer state after + each successful node PUT: a flush uploads only the shard root plus the + path(s) mutated since the previous flush, so per-upload cost is flat + regardless of bucket size (same workload now: 384 PUTs / 3.5 MB, ~3 + node PUTs per upload at any bucket size). The persisted format is + byte-identical to previous releases — existing buckets are read and + written exactly as before, and older SDKs interoperate unchanged. + Applies to all platforms (Android, iOS, Windows, web/wasm, fula-js) + since the fix lives in the shared `fula-crypto` core. + ## [0.6.7] - 2026-06-10 ### Fixed diff --git a/packages/fula_client/ios/fula_client.podspec b/packages/fula_client/ios/fula_client.podspec index db48d67..7f81d46 100644 --- a/packages/fula_client/ios/fula_client.podspec +++ b/packages/fula_client/ios/fula_client.podspec @@ -6,7 +6,7 @@ Pod::Spec.new do |s| s.name = 'fula_client' - s.version = '0.6.7' + s.version = '0.6.8' s.summary = 'Flutter SDK for Fula decentralized storage' s.description = <<-DESC A Flutter plugin providing client-side encryption, metadata privacy, diff --git a/packages/fula_client/pubspec.yaml b/packages/fula_client/pubspec.yaml index e873d70..048e684 100644 --- a/packages/fula_client/pubspec.yaml +++ b/packages/fula_client/pubspec.yaml @@ -1,6 +1,6 @@ name: fula_client description: Flutter SDK for Fula decentralized storage with client-side encryption, metadata privacy, and secure sharing. -version: 0.6.7 +version: 0.6.8 homepage: https://fx.land repository: https://github.com/functionland/fula-api issue_tracker: https://github.com/functionland/fula-api/issues