
logical JoinNode proto missing null_aware field (follow-up to #19635) #22065

@mithuncy

Description

Reproducible test case

Verified with datafusion 53.1.0 and datafusion-proto 53.1.0. The bug reproduces deterministically: an optimized LogicalPlan whose Join has null_aware = true returns 0 rows when executed in-process, but 2 rows after a to_proto -> from_proto round-trip.

Setup

Cargo.toml:

[package]
name = "df_repro"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = "53.1.0"
datafusion-proto = "53.1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

src/main.rs:

use datafusion::execution::context::SessionContext;
use datafusion_proto::bytes::{
    logical_plan_from_bytes_with_extension_codec,
    logical_plan_to_bytes_with_extension_codec,
};
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Inline VALUES so the leaves are LogicalPlan::Values, which the default
    // codec knows how to serialize. The data shape matches the SQL intent:
    //   t1       = {1, 2, 3}
    //   excludes = {1, NULL}     <-- NULL on the right side of NOT IN
    let sql = "
        SELECT id
        FROM (VALUES (1), (2), (3)) AS t1(id)
        WHERE id NOT IN (
            SELECT bad_id
            FROM (VALUES (CAST(1 AS INT)), (CAST(NULL AS INT))) AS excludes(bad_id)
        )
    ";

    // The unoptimized plan still has Expr::InSubquery, which the default codec
    // rejects. We must run the optimizer first; that lowers NOT IN into
    // LogicalPlan::Join { join_type: LeftAnti, null_aware: true, .. }, which is
    // the exact shape any real consumer (e.g. parallel IPC) serializes.
    let df = ctx.sql(sql).await?;
    let plan = ctx.state().optimize(df.logical_plan())?;

    // (a) Direct execution -- correct, returns 0 rows.
    let direct = ctx.execute_logical_plan(plan.clone()).await?.collect().await?;
    let direct_rows: usize = direct.iter().map(|b| b.num_rows()).sum();
    println!("direct rows: {direct_rows}");

    // (b) Round-trip through datafusion-proto -- wrong, returns 2 rows.
    let codec = DefaultLogicalExtensionCodec {};
    let bytes = logical_plan_to_bytes_with_extension_codec(&plan, &codec)?;
    let task_ctx = ctx.task_ctx();
    let plan_rt =
        logical_plan_from_bytes_with_extension_codec(&bytes, &task_ctx, &codec)?;
    let rt = ctx.execute_logical_plan(plan_rt).await?.collect().await?;
    let rt_rows: usize = rt.iter().map(|b| b.num_rows()).sum();
    println!("round-trip rows: {rt_rows}");

    assert_eq!(direct_rows, 0, "direct execution should return 0 rows");
    assert_eq!(rt_rows, 0, "round-trip should also return 0 rows -- BUG");
    Ok(())
}

Observed output

direct rows: 0
round-trip rows: 2
Path | Result
Direct execution of optimized LogicalPlan::Join (null_aware=true) | 0 rows (correct)
to_proto -> from_proto -> execute | 2 rows (wrong -- behaves as plain LeftAnti)
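To see why 2 rows is exactly what a plain LeftAnti join produces here, the sketch below models both variants over nullable INT keys (`Option<i32>`, with `None` standing for SQL NULL). It is a toy illustration of SQL three-valued `NOT IN` semantics, assuming these simplified types; `left_anti` is a hypothetical helper, not DataFusion's hash-join code.

```rust
/// Toy model of a left anti join over nullable INT keys (`None` = SQL NULL).
/// Hypothetical helper for illustration; not DataFusion's HashJoinExec logic.
fn left_anti(left: &[Option<i32>], right: &[Option<i32>], null_aware: bool) -> Vec<Option<i32>> {
    let right_has_null = right.iter().any(|v| v.is_none());
    left.iter()
        .copied()
        .filter(|l| {
            // A non-NULL left key that equals some right key is always excluded.
            let matched = matches!(l, Some(x) if right.contains(&Some(*x)));
            if matched {
                return false;
            }
            if !null_aware {
                // Plain LeftAnti: NULL equals nothing, so "no match" means keep.
                return true;
            }
            // Null-aware (NOT IN): `l NOT IN (right)` is NULL, not TRUE, when the
            // right side contains a NULL, or when `l` is NULL and the right side
            // is non-empty. WHERE discards NULL, so those rows are dropped.
            !(right_has_null || (l.is_none() && !right.is_empty()))
        })
        .collect()
}

fn main() {
    let t1 = [Some(1), Some(2), Some(3)];
    let excludes = [Some(1), None];

    let plain = left_anti(&t1, &excludes, false);
    let aware = left_anti(&t1, &excludes, true);

    println!("plain LeftAnti rows: {}", plain.len());
    println!("null-aware LeftAnti rows: {}", aware.len());
}
```

With t1 = {1, 2, 3} and excludes = {1, NULL}, the plain variant keeps 2 and 3, while the null-aware variant keeps nothing, because `2 NOT IN (1, NULL)` evaluates to NULL rather than TRUE.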

Root cause

null_aware is present on the physical HashJoinExecNode proto (added in #19635) but missing from the logical JoinNode proto. The logical encoder also drops it during destructuring, and the logical decoder has no field to restore it from.

  1. datafusion/proto/proto/datafusion.proto -- message JoinNode has 8 fields, no null_aware:

    message JoinNode {
      LogicalPlanNode left = 1;
      LogicalPlanNode right = 2;
      datafusion_common.JoinType join_type = 3;
      datafusion_common.JoinConstraint join_constraint = 4;
      repeated LogicalExprNode left_join_key = 5;
      repeated LogicalExprNode right_join_key = 6;
      datafusion_common.NullEquality null_equality = 7;
      LogicalExprNode filter = 8;
      // bool null_aware = 9;   // MISSING
    }
  2. datafusion/proto/src/logical_plan/mod.rs:1469 -- to_proto destructures with .., so null_aware falls into "the rest" and is silently discarded:

    LogicalPlan::Join(Join {
        left, right, on, filter, join_type, join_constraint,
        null_equality,
        ..   // <- null_aware is here, silently dropped
    }) => { /* builds protobuf::JoinNode without null_aware */ }
  3. datafusion/proto/src/logical_plan/mod.rs:830 -- from_proto rebuilds the join via LogicalPlanBuilder::join_with_expr_keys / join_using, neither of which carries a null_aware parameter. So even if the proto had the field, the current decoder would still drop it on the way back in.
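The encoder/decoder asymmetry in items 2 and 3 can be reproduced in miniature. The sketch below uses hypothetical stand-in types (`Join`, `JoinNode`, a cut-down `JoinType`) rather than the real datafusion-expr and datafusion-proto structs, and only shows the mechanism: a `..` pattern lets a field vanish from the serialized form without any compiler diagnostic, and the decoder then has no choice but to default it.

```rust
// Hypothetical stand-ins for illustration only; the real datafusion-expr
// `Join` and the generated `protobuf::JoinNode` have many more fields.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    LeftAnti,
}

#[derive(Debug, Clone, PartialEq)]
struct Join {
    join_type: JoinType,
    null_aware: bool,
}

/// Mirrors today's message: there is no slot for null_aware.
#[derive(Debug, Clone, PartialEq)]
struct JoinNode {
    join_type: JoinType,
}

fn to_proto(join: &Join) -> JoinNode {
    // `..` matches every field not named here, so null_aware is ignored
    // without any compiler diagnostic.
    let Join { join_type, .. } = join;
    JoinNode { join_type: *join_type }
}

fn from_proto(node: &JoinNode) -> Join {
    // The message carries nothing to restore, so the flag falls back to false.
    Join { join_type: node.join_type, null_aware: false }
}

fn main() {
    let original = Join { join_type: JoinType::LeftAnti, null_aware: true };
    let round_tripped = from_proto(&to_proto(&original));
    println!("before: {original:?}");
    println!("after:  {round_tripped:?}");
}
```

In this toy model, removing the `..` and naming every field would turn any newly added field into a compile error (E0027) in `to_proto`; whether that guard is practical upstream depends on how the real `Join` struct is declared.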

Net effect: any consumer of datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec (parallel-execution IPC, plan caching, distributed scheduling, etc.) silently loses null-aware NOT IN semantics across the round-trip.

Fix

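  1. Proto: add bool null_aware = 9; to message JoinNode in datafusion/proto/proto/datafusion.proto. Because a proto3 bool decodes as false when absent, plans written by older encoders (which never emit field 9) still decode, with null_aware = false, and older readers simply skip the unknown field, so wire compatibility is preserved.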

  2. to_proto (logical_plan/mod.rs:1469): pull null_aware out of the destructure and write it to the proto:

    LogicalPlan::Join(Join {
        left, right, on, filter, join_type, join_constraint,
        null_equality, null_aware, ..
    }) => {
        // ...
        protobuf::JoinNode { /* existing fields */, null_aware: *null_aware }
    }
  3. from_proto (logical_plan/mod.rs:830): stop using the builder paths that ignore null_aware. Construct the Join directly via Join::try_new(left, right, on, filter, join_type, join_constraint, null_equality, null_aware); its signature already accepts Expr-keyed on pairs and null_aware: bool, so it fits the decoder's data without a new builder API. Alternatively, add an Expr-keyed LogicalPlanBuilder variant that carries both null_equality and null_aware.

  4. Tests in datafusion/proto/tests/cases/roundtrip_logical_plan.rs:

    • Round-trip equality: build a LogicalPlan::Join with null_aware=true, serialize, deserialize, assert the resulting plan's null_aware is still true.
    • Execution regression: run the t1 / excludes reproducer above through to_proto -> from_proto -> execute, and assert the result is 0 rows.
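Fix items 1 and 2, the field-level part of item 3, and the first test bullet can be sketched at the wire level. The block below hand-encodes only the proposed field 9 using the protobuf varint wire format (tag = field_number << 3 | wire_type, so field 9 with wire type 0 is the byte 0x48) and checks both the round-trip property and the backward-compatibility claim from item 1. The pared-down `Join` struct, the helper names, and the one-byte-per-value parser are simplifying assumptions; the real code would go through the prost-generated `protobuf::JoinNode`.

```rust
// Protobuf wire format: tag = (field_number << 3) | wire_type.
// Field 9 with wire type 0 (varint) gives tag 72 = 0x48.
const NULL_AWARE_TAG: u8 = 9 << 3;

/// Hypothetical, pared-down Join: every other field is elided.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Join {
    null_aware: bool,
}

/// Encoder side (fix item 2): write field 9 when the flag is set.
fn encode_null_aware(join: &Join, buf: &mut Vec<u8>) {
    // proto3 scalars without `optional` are not written when they equal the
    // default, so `false` produces no bytes at all.
    if join.null_aware {
        buf.push(NULL_AWARE_TAG);
        buf.push(1);
    }
}

/// Decoder side (field-level part of fix item 3). Toy parser: assumes every
/// field is a one-byte tag followed by a one-byte varint, which holds only
/// for this example.
fn decode_null_aware(buf: &[u8]) -> bool {
    buf.chunks_exact(2)
        .find(|field| field[0] == NULL_AWARE_TAG)
        .map(|field| field[1] != 0)
        // Field absent (e.g. bytes from an older writer): proto3 default.
        .unwrap_or(false)
}

fn round_trip(join: &Join) -> Join {
    let mut buf = Vec::new();
    encode_null_aware(join, &mut buf);
    Join { null_aware: decode_null_aware(&buf) }
}

fn main() {
    // First test bullet: null_aware survives the round-trip.
    let aware = Join { null_aware: true };
    assert_eq!(round_trip(&aware), aware);

    // An older writer never emits field 9; here it only wrote a stand-in
    // field 3 with an arbitrary varint value of 5.
    let old_writer_bytes = [3u8 << 3, 5];
    assert!(!decode_null_aware(&old_writer_bytes));

    println!("round-trip ok");
}
```

In the real decoder, the decoded flag would then be passed into Join::try_new as described in item 3; the toy parser above only stands in for prost's generated decoding.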
