Skip to content

fix: support Spark 4.1 BloomFilter V2 format and bit-scattering#4196

Open
andygrove wants to merge 3 commits intoapache:mainfrom
andygrove:issue-4193
Open

fix: support Spark 4.1 BloomFilter V2 format and bit-scattering#4196
andygrove wants to merge 3 commits intoapache:mainfrom
andygrove:issue-4193

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #4193 (sub-issue of #4098).

Rationale for this change

Spark 4.1's `BloomFilter.create` defaults to a new V2 implementation (`BloomFilterImplV2`) with both a new binary format and a new bit-scattering algorithm. Comet's reader hard-rejects non-V1 bytes and Comet's writer always emits V1, so on Spark 4.1 both `BloomFilterMightContain from random input` and `bloom_filter_agg` produce wrong results vs Spark.

V1 vs V2:

V1 (Spark <= 4.0, still supported in 4.1) V2 (Spark 4.1+ default)
Binary `[version=1][numHashFn][numWords][bits...]` `[version=2][numHashFn][seed][numWords][bits...]`
Scatter `combinedHash = h1 + i*h2`, i in 1..=N, 32-bit wrap `combinedHash = (long)h1 * Integer.MAX_VALUE; for (i=0; i<N; i++) combinedHash += h2;`, 64-bit wrap
Bit index `combinedHash<0 ? ~combinedHash : combinedHash`, mod 32-bit `bitSize` same negation, mod 64-bit `bitSize`
Seed always 0 per-filter (Spark's `BloomFilterImplV2.DEFAULT_SEED` is also 0)

What changes are included in this PR?

  • `SparkBloomFilter` (`native/spark-expr/src/bloom_filter/spark_bloom_filter.rs`): added `SparkBloomFilterVersion` (V1, V2) and a `seed` field. The deserializer detects the version from the leading 4 bytes; V2 reads an extra 4-byte seed. The serializer writes the layout matching the filter's version. `put_long` / `put_binary` / `might_contain_long` now branch on version for the bit-scattering algorithm and seed murmur3 with `self.seed` (always 0 for V1; configurable for V2).
  • `BloomFilterAgg` (`bloom_filter_agg.rs`): takes a `SparkBloomFilterVersion` so the aggregator emits the version matching Spark's output.
  • Proto (`expr.proto`): added `BloomFilterVersion` enum (V1 / V2 / Unspecified) and a `version` field on `BloomFilterAgg`.
  • JVM serde (`CometBloomFilterAggregate`): sets V2 on Spark 4.1+ and V1 on Spark <= 4.0 to match Spark's `BloomFilter.create` default.
  • New Rust unit tests cover V1 and V2 round-trips, that the two scattering schemes produce different bit patterns for the same input, and that the deserializer rejects unknown versions.
  • Removes the `assume(!isSpark41Plus, ...)` guards on `BloomFilterMightContain from random input` (CometExec3_4PlusSuite) and `bloom_filter_agg` (CometExecSuite); both now run on Spark 4.1.

How are these changes tested?

  • 4 new Rust unit tests in `spark_bloom_filter::tests` (round-trip, V1≠V2 bit pattern, panic on unknown version) — all pass.
  • `CometExec3_4PlusSuite` and `CometExecSuite` bloom filter tests pass locally on both Spark 4.0 (V1 path) and Spark 4.1 (V2 path).
  • Full `CometExecSuite` (124 tests) passes on Spark 4.0 — no regressions.

Closes apache#4193.

Spark 4.1 (`BloomFilter.create`) defaults to a new V2 binary format and a
new bit-scattering algorithm in `BloomFilterImplV2`:

- V2 binary: `[version=2][numHashFunctions][seed][numWords][bits...]`
  (V1 omits the seed).
- V2 scatter: `combinedHash = (long)h1 * Integer.MAX_VALUE; for (i = 0;
  i < numHashFunctions; i++) combinedHash += h2;` then take
  `combinedHash < 0 ? ~combinedHash : combinedHash` mod bitSize. V1 uses
  `h1 + i*h2` with i in 1..=N and 32-bit arithmetic.

Comet's reader hard-rejected non-V1 bytes, and the writer always emitted
V1, so on Spark 4.1 both `BloomFilterMightContain from random input`
and `bloom_filter_agg` failed with byte/result mismatches.

This change:

- Adds `SparkBloomFilterVersion` (V1, V2) and a `seed` field to
  `SparkBloomFilter`. Deserializer detects version from the leading 4
  bytes; for V2 it reads the extra seed. Serializer writes the matching
  layout. `put_long`/`put_binary`/`might_contain_long` branch on version
  for the bit-scattering algorithm and seed murmur3 with `self.seed`
  (always 0 for V1; configurable for V2).
- Threads the version through `BloomFilterAgg::new` so the aggregator
  emits the version that matches Spark's output. `BloomFilterAggregate`
  in Spark always uses `BloomFilterImplV2.DEFAULT_SEED = 0`.
- Adds a `version` field to the `BloomFilterAgg` proto and the
  `BloomFilterVersion` enum (V1 / V2 / Unspecified).
- `CometBloomFilterAggregate` (JVM serde) sets V2 on Spark 4.1+ and V1
  on Spark <= 4.0.
- New Rust tests cover V1 and V2 round-trips, that the two scattering
  schemes produce different bit patterns for the same input, and that
  the deserializer rejects unknown versions.
- Removes the `assume(!isSpark41Plus, ...)` guards from the
  `BloomFilterMightContain from random input` and `bloom_filter_agg`
  Comet test suites; both now pass on Spark 4.1, and the V1 path still
  passes on Spark 4.0.
@andygrove andygrove marked this pull request as draft May 3, 2026 19:06
andygrove added 2 commits May 3, 2026 14:50
The original `assume(!isSpark41Plus, ...)` guards skipped on both 4.1 and 4.2.
The previous commit removed them entirely, but Spark 4.2 has a separate
`might_contain` / `bloom_filter_agg` registration issue tracked in apache#4142
(`Function identifier must be fully qualified (3-part)`). Tightening the
guards back to `assume(!isSpark42Plus, "apache#4142")` lets the tests run on 4.1
(the goal of this PR) while staying skipped on 4.2 (existing behavior).
The previous commit added a `version: SparkBloomFilterVersion` parameter to
`BloomFilterAgg::new`, but missed updating the criterion bench under
`spark-expr/benches/bloom_filter_agg.rs`. Pass V1 explicitly to match the
historic behaviour the bench was written against.
@andygrove andygrove marked this pull request as ready for review May 3, 2026 22:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Spark 4.1: bloom filter result mismatch (might_contain returns wrong answers)

1 participant