Reproducible test case
Verified on `datafusion = "53.1.0"` + `datafusion-proto = "53.1.0"`. The bug reproduces deterministically: a `LogicalPlan` with `null_aware = true` returns 0 rows when executed in-process, but returns 2 rows after a `to_proto` -> `from_proto` round-trip.
Setup
`Cargo.toml`:

```toml
[package]
name = "df_repro"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = "53.1.0"
datafusion-proto = "53.1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
`src/main.rs`:

```rust
use datafusion::execution::context::SessionContext;
use datafusion_proto::bytes::{
    logical_plan_from_bytes_with_extension_codec,
    logical_plan_to_bytes_with_extension_codec,
};
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Inline VALUES so the leaves are LogicalPlan::Values, which the default
    // codec knows how to serialize. The data shape matches the SQL intent:
    //   t1       = {1, 2, 3}
    //   excludes = {1, NULL}   <-- NULL on the right side of NOT IN
    let sql = "
        SELECT id
        FROM (VALUES (1), (2), (3)) AS t1(id)
        WHERE id NOT IN (
            SELECT bad_id
            FROM (VALUES (CAST(1 AS INT)), (CAST(NULL AS INT))) AS excludes(bad_id)
        )
    ";

    // The unoptimized plan still contains Expr::InSubquery, which the default
    // codec rejects, so run the optimizer first. It lowers NOT IN into
    // LogicalPlan::Join { join_type: LeftAnti, null_aware: true, .. }, which is
    // the exact shape any real consumer (e.g. parallel IPC) serializes.
    let df = ctx.sql(sql).await?;
    let plan = ctx.state().optimize(df.logical_plan())?;

    // (a) Direct execution: correct, returns 0 rows.
    let direct = ctx.execute_logical_plan(plan.clone()).await?.collect().await?;
    let direct_rows: usize = direct.iter().map(|b| b.num_rows()).sum();
    println!("direct rows: {direct_rows}");

    // (b) Round-trip through datafusion-proto: wrong, returns 2 rows.
    let codec = DefaultLogicalExtensionCodec {};
    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
    let task_ctx = ctx.task_ctx();
    let plan_rt =
        logical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec)?;
    let rt = ctx.execute_logical_plan(plan_rt).await?.collect().await?;
    let rt_rows: usize = rt.iter().map(|b| b.num_rows()).sum();
    println!("round-trip rows: {rt_rows}");

    assert_eq!(direct_rows, 0, "direct execution should return 0 rows");
    assert_eq!(rt_rows, 0, "round-trip should also return 0 rows -- BUG");
    Ok(())
}
```
Observed output
```
direct rows: 0
round-trip rows: 2
```
| Path | Result |
|---|---|
| Direct execution of optimized `LogicalPlan::Join` (`null_aware = true`) | 0 rows (correct) |
| `to_proto` -> `from_proto` -> execute | 2 rows (wrong: behaves as plain `LeftAnti`) |
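Why 0 versus 2: with a NULL in the subquery, `id NOT IN (1, NULL)` is FALSE for `id = 1` and UNKNOWN (not TRUE) for `id = 2, 3`, so no row passes. A plain anti-join just ignores the NULL key and keeps 2 and 3. The std-only sketch below models both semantics on the reproducer's data; `plain_left_anti` and `null_aware_left_anti` are hypothetical helpers (not DataFusion APIs), and the model assumes non-NULL left keys, which holds for `t1`.

```rust
/// Plain LeftAnti: keep a left row unless some right key equals it.
/// A NULL right key never compares equal, so it is effectively ignored.
fn plain_left_anti(left: &[i32], right: &[Option<i32>]) -> Vec<i32> {
    left.iter()
        .copied()
        .filter(|&l| !right.contains(&Some(l)))
        .collect()
}

/// Null-aware LeftAnti (SQL `NOT IN`), assuming non-NULL left keys:
/// if the right side contains a NULL, `l NOT IN (...)` is UNKNOWN for every
/// non-matching `l` (and FALSE for matching ones), so no row survives.
fn null_aware_left_anti(left: &[i32], right: &[Option<i32>]) -> Vec<i32> {
    if right.contains(&None) {
        return Vec::new();
    }
    plain_left_anti(left, right)
}

fn main() {
    let t1 = [1, 2, 3];
    let excludes = [Some(1), None];
    println!("null-aware: {:?}", null_aware_left_anti(&t1, &excludes)); // []
    println!("plain:      {:?}", plain_left_anti(&t1, &excludes)); // [2, 3]
}
```

The plain variant's output has exactly the 2 rows seen after the round-trip.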
Root cause
`null_aware` is present on the physical `HashJoinExecNode` proto (added in #19635) but missing from the logical `JoinNode` proto. The logical encoder also drops it during destructuring, and the logical decoder cannot restore it because the proto has no field for it.
- `datafusion/proto/proto/datafusion.proto` -- `message JoinNode` has 8 fields, no `null_aware`:

  ```protobuf
  message JoinNode {
    LogicalPlanNode left = 1;
    LogicalPlanNode right = 2;
    datafusion_common.JoinType join_type = 3;
    datafusion_common.JoinConstraint join_constraint = 4;
    repeated LogicalExprNode left_join_key = 5;
    repeated LogicalExprNode right_join_key = 6;
    datafusion_common.NullEquality null_equality = 7;
    LogicalExprNode filter = 8;
    // bool null_aware = 9;  // MISSING
  }
  ```
- `datafusion/proto/src/logical_plan/mod.rs:1469` -- `to_proto` destructures with `..`, so `null_aware` falls into "the rest" and is silently discarded:

  ```rust
  LogicalPlan::Join(Join {
      left, right, on, filter, join_type, join_constraint,
      null_equality,
      .. // <- null_aware lands here and is silently dropped
  }) => { /* builds protobuf::JoinNode without null_aware */ }
  ```
- `datafusion/proto/src/logical_plan/mod.rs:830` -- `from_proto` rebuilds the join via `LogicalPlanBuilder::join_with_expr_keys` / `join_using`, neither of which takes a `null_aware` parameter. So even if the proto had the field, the current decoder would still drop it on the way back in.
Net effect: any consumer of `datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec` (parallel-execution IPC, plan caching, distributed scheduling, etc.) silently loses null-aware `NOT IN` semantics across the round-trip.
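The failure is silent because nothing on either side is forced to notice the missing flag: `..` swallows the new field at compile time, and the decoder simply has no value to read. The std-only model below reproduces that shape. `Join`, `JoinNode`, `JoinType`, `to_proto` and `from_proto` here are hypothetical stand-ins with only the fields needed for the illustration, not the real `datafusion` / `datafusion-proto` types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    LeftAnti,
}

/// Stand-in for the logical `Join` node.
#[derive(Debug, Clone, PartialEq)]
struct Join {
    join_type: JoinType,
    null_aware: bool,
}

/// Stand-in for the generated proto message: like the current
/// `JoinNode`, it has no `null_aware` field.
#[derive(Debug, Clone, PartialEq)]
struct JoinNode {
    join_type: JoinType,
}

/// Mirrors the current encoder: `..` absorbs `null_aware`, so adding the
/// field to `Join` never caused a compile error here.
fn to_proto(plan: &Join) -> JoinNode {
    let Join { join_type, .. } = plan;
    JoinNode { join_type: *join_type }
}

/// Mirrors the current decoder: there is nothing to read the flag from,
/// so the plan comes back as a plain anti-join.
fn from_proto(node: &JoinNode) -> Join {
    Join { join_type: node.join_type, null_aware: false }
}

fn main() {
    let plan = Join { join_type: JoinType::LeftAnti, null_aware: true };
    let rt = from_proto(&to_proto(&plan));
    println!("before: {plan:?}");
    println!("after:  {rt:?}"); // null_aware is now false
}
```

Destructuring without `..` (for example `let Join { join_type, null_aware } = plan;`, with `_` for fields deliberately ignored) turns this class of bug into a compile error, because Rust rejects a struct pattern that omits a field.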
Fix
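- Proto: add `bool null_aware = 9;` to `message JoinNode` in `datafusion/proto/proto/datafusion.proto`. Because a proto3 `bool` defaults to `false`, plans written by older writers (which never emit field 9) decode as `null_aware = false`, i.e. with their current behavior, so wire compatibility is preserved.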
- `to_proto` (`logical_plan/mod.rs:1469`): pull `null_aware` out of the destructure and write it to the proto:

  ```rust
  LogicalPlan::Join(Join {
      left, right, on, filter, join_type, join_constraint,
      null_equality, null_aware, ..
  }) => {
      // ... existing key/filter conversion ...
      protobuf::JoinNode {
          // ... existing fields ...
          null_aware: *null_aware,
      }
  }
  ```
- `from_proto` (`logical_plan/mod.rs:830`): stop using the builder paths that ignore `null_aware`. Construct the `Join` directly via `Join::try_new(left, right, on, filter, join_type, join_constraint, null_equality, null_aware)`, passing the decoded `null_aware` value. Its signature already takes `Expr`-keyed `on` pairs and `null_aware: bool`, so it fits the decoder's data without a new builder API. Alternatively, add an `Expr`-keyed `LogicalPlanBuilder` variant that carries `null_equality` + `null_aware`.
- Tests in `datafusion/proto/tests/cases/roundtrip_logical_plan.rs`:
  - Round-trip equality: build a `LogicalPlan::Join` with `null_aware = true`, serialize, deserialize, and assert that the resulting plan's `null_aware` is still `true`.
  - Execution regression: run the `t1` / `excludes` reproducer above through `to_proto` -> `from_proto` -> execute, and assert that the result is 0 rows.
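The wire-compatibility argument for `bool null_aware = 9;` can be checked at the byte level: a proto3 writer omits a `bool` field whose value is `false`, and a reader that finds no field 9 falls back to `false`. The sketch below hand-rolls just the two protobuf wire types it needs (varint and length-delimited) to show this; it is not prost's API, and the 4-byte "old" payload is an arbitrary stand-in for a `JoinNode` from an older writer (an opaque field 1, `left`).

```rust
const NULL_AWARE_FIELD: u64 = 9; // proposed `bool null_aware = 9;`

/// Append `v` as a protobuf base-128 varint.
fn put_varint(buf: &mut Vec<u8>, mut v: u64) {
    while v >= 0x80 {
        buf.push(((v & 0x7f) as u8) | 0x80);
        v >>= 7;
    }
    buf.push(v as u8);
}

/// Read one varint at `*pos`, advancing it; `None` on truncated input.
fn get_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut value = 0u64;
    let mut shift = 0u32;
    loop {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(value);
        }
        shift += 7;
        if shift >= 64 {
            return None;
        }
    }
}

/// proto3 writer behaviour: a scalar `bool` is only emitted when `true`.
fn write_null_aware(buf: &mut Vec<u8>, null_aware: bool) {
    if null_aware {
        put_varint(buf, NULL_AWARE_FIELD << 3); // key = (field << 3) | wire type 0
        put_varint(buf, 1);
    }
}

/// Reader behaviour: skip other fields, default `null_aware` to `false`.
fn read_null_aware(buf: &[u8]) -> Option<bool> {
    let mut pos = 0;
    let mut null_aware = false;
    while pos < buf.len() {
        let key = get_varint(buf, &mut pos)?;
        match key & 0x7 {
            0 => {
                let v = get_varint(buf, &mut pos)?;
                if key >> 3 == NULL_AWARE_FIELD {
                    null_aware = v != 0;
                }
            }
            2 => {
                // Length-delimited (e.g. nested `left`/`right` plans): skip payload.
                let len = usize::try_from(get_varint(buf, &mut pos)?).ok()?;
                pos = pos.checked_add(len).filter(|&end| end <= buf.len())?;
            }
            _ => return None, // other wire types are out of scope for this sketch
        }
    }
    Some(null_aware)
}

fn main() {
    let old_bytes = vec![0x0a, 0x02, 0xaa, 0xbb]; // field 1, length 2, payload
    println!("old writer: {:?}", read_null_aware(&old_bytes)); // Some(false)

    let mut new_bytes = old_bytes.clone();
    write_null_aware(&mut new_bytes, true); // appends 0x48 0x01
    println!("new writer: {:?}", read_null_aware(&new_bytes)); // Some(true)
}
```

In the other direction, protobuf readers skip unknown fields, so an older reader accepts a new payload but still decodes it as a plain anti-join.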