Skip to content

Unparser doesn't resolve column names in outer query correctly #23138

Description

@drewrip

Describe the bug

This is a follow to #22961 where I described bugs in the Unparser that is creating invalid SQL. In general, I'm still seeing a lot of errors in generated SQL due to column references not being appropriately qualified. This usually is showing up where we have an unnamed subquery.

To Reproduce

This is a simple reproducer that uses DuckDB because DataFusion seems to accept the SQL. It targets datafusion main so that we can @goutamadwant 's changes from #23002 .

The Rust project:

datafusion = { git = "https://github.com/apache/datafusion.git" }
duckdb = { version = "1.10503.1", features = ["bundled"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{
    CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, SchemaProvider,
};
use datafusion::datasource::empty::EmptyTable;
use datafusion::prelude::*;
use datafusion::sql::unparser::Unparser;
use datafusion::sql::unparser::dialect::DuckDBDialect;
use duckdb::Connection;

const QUERY: &str = r#"
SELECT * FROM
(
SELECT
        order_id
    FROM
        "warehouse"."main"."order_items"
) oi
JOIN (
    SELECT
        order_id,
        coalesce(discount_pct, 0) AS discount_pct_2
    FROM
        "warehouse"."main"."orders"
) o USING (order_id)
"#;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let order_items_schema = Arc::new(Schema::new(vec![Field::new(
        "order_id",
        DataType::Int32,
        true,
    )]));

    let orders_schema = Arc::new(Schema::new(vec![
        Field::new("order_id", DataType::Int32, false),
        Field::new("customer_id", DataType::Int32, true),
        Field::new("discount_pct", DataType::Decimal128(5, 2), true),
    ]));

    let ctx = SessionContext::new();

    let schema_provider = Arc::new(MemorySchemaProvider::new());
    schema_provider.register_table(
        "order_items".to_string(),
        Arc::new(EmptyTable::new(order_items_schema)),
    )?;
    schema_provider.register_table(
        "orders".to_string(),
        Arc::new(EmptyTable::new(orders_schema)),
    )?;

    let catalog = Arc::new(MemoryCatalogProvider::new());
    catalog.register_schema("main", schema_provider)?;
    ctx.register_catalog("warehouse", catalog);

    let dialect = DuckDBDialect::new();
    let unparser = Unparser::new(&dialect);
    let conn = Connection::open("warehouse.duckdb")?;

    {
        let unopt_plan = ctx.sql(QUERY).await?.into_unoptimized_plan();
        let sql = unparser.plan_to_sql(&unopt_plan)?;
        let mut stmt = conn.prepare(&sql.to_string())?;
        match stmt.query([]) {
            Ok(_) => {
                println!("original query is valid");
            }
            Err(e) => {
                println!("original query is not valid: {e}");
            }
        }
    }

    let plan = ctx.sql(QUERY).await?.into_optimized_plan()?;
    let sql = unparser.plan_to_sql(&plan)?;
    match conn.execute(&sql.to_string(), []) {
        Ok(_) => {
            println!("success");
        }
        Err(e) => {
            println!("failed: {e}");
            println!("Optimized sql =\n{sql}");
            println!("Optimized plan =\n{}", plan.display_indent());
        }
    }

    Ok(())
}

This yields the following error:

original query is valid
failed: Binder Error: Referenced table "o" not found!
Candidate tables: "oi"

LINE 1: ... "warehouse"."main"."order_items" AS "oi" INNER JOIN (SELECT "o"."order_id", CASE WHEN "__common_expr_1" IS NOT NULL...
                                                                        ^
Optimized sql =
SELECT "o"."order_id", "o"."discount_pct_2" FROM "warehouse"."main"."order_items" AS "oi" INNER JOIN (SELECT "o"."order_id", CASE WHEN "__common_expr_1" IS NOT NULL THEN "__common_expr_1" ELSE 0.00 END AS "discount_pct_2" FROM (SELECT CAST("o"."discount_pct" AS DECIMAL(22,2)) AS "__common_expr_1", "o"."order_id" FROM "warehouse"."main"."orders" AS "o")) AS "o" ON "oi"."order_id" = "o"."order_id"
Optimized plan =
Projection: o.order_id, o.discount_pct_2
  Inner Join: oi.order_id = o.order_id
    SubqueryAlias: oi
      TableScan: warehouse.main.order_items projection=[order_id]
    SubqueryAlias: o
      Projection: warehouse.main.orders.order_id, CASE WHEN __common_expr_1 IS NOT NULL THEN __common_expr_1 ELSE Decimal128(0.00,22,2) END AS discount_pct_2
        Projection: CAST(warehouse.main.orders.discount_pct AS Decimal128(22, 2)) AS __common_expr_1, warehouse.main.orders.order_id
          TableScan: warehouse.main.orders projection=[order_id, discount_pct]

Expected behavior

The Unparser should generate valid SQL. Specifically, we shouldn't be referencing "o" outside of the unnamed subquery.

SELECT
    "o"."order_id",
    "o"."discount_pct_2"
FROM
    "warehouse"."main"."order_items" AS "oi"
    INNER JOIN (
        SELECT
            "o"."order_id",                   <------ "o" doesn't exist here
            CASE
                WHEN "__common_expr_1" IS NOT NULL THEN "__common_expr_1"
                ELSE 0.00
            END AS "discount_pct_2"
        FROM
            (
                SELECT
                    CAST("o"."discount_pct" AS DECIMAL(22, 2)) AS "__common_expr_1",
                    "o"."order_id"
                FROM
                    "warehouse"."main"."orders" AS "o"
            )
    ) AS "o" ON "oi"."order_id" = "o"."order_id"

Additional context

The LogicalPlan that triggers this invalid SQL in this case is a direct result of the CSE pass. However, it does not seem to be doing anything wrong, we just can't accurately unparse the resulting LogicalPlan.

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions