Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move `resolve_table_references` out of `datafusion-catalog` #14441

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion-examples/examples/sql_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::plan_err;
use datafusion::common::{plan_err, TableReference};
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::logical_expr::{
Expand All @@ -29,7 +29,6 @@ use datafusion::optimizer::{
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::TableReference;
use std::any::Any;
use std::sync::Arc;

Expand Down
248 changes: 9 additions & 239 deletions datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]

pub mod memory;
#[deprecated(
since = "46.0.0",
note = "use datafusion_sql::resolve::resolve_table_references"
)]
pub use datafusion_sql::resolve::resolve_table_references;
#[deprecated(
since = "46.0.0",
note = "use datafusion_common::{ResolvedTableReference, TableReference}"
)]
pub use datafusion_sql::{ResolvedTableReference, TableReference};
pub use memory::{
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};
use std::collections::BTreeSet;
use std::ops::ControlFlow;

mod r#async;
mod catalog;
mod dynamic_file;
Expand All @@ -43,239 +49,3 @@ pub use schema::*;
pub use session::*;
pub use table::*;
pub mod streaming;

/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
///
/// # Returns
///
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
/// element contains any CTE aliases that were defined and possibly referenced.
///
/// Both lists are deduplicated and come back in the sort order of the collected
/// object names, because they are accumulated in `BTreeSet`s.
///
/// # Errors
///
/// Returns an error if any collected object name cannot be converted into a
/// [`TableReference`] by `object_name_to_table_reference`.
///
/// ## Example
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 2);
/// assert_eq!(table_refs[0].to_string(), "bar");
/// assert_eq!(table_refs[1].to_string(), "foo");
/// assert_eq!(ctes.len(), 0);
/// ```
///
/// ## Example with CTEs
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 0);
/// assert_eq!(ctes.len(), 1);
/// assert_eq!(ctes[0].to_string(), "my_cte");
/// ```
pub fn resolve_table_references(
    statement: &datafusion_sql::parser::Statement,
    enable_ident_normalization: bool,
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
    use datafusion_sql::parser::{
        CopyToSource, CopyToStatement, Statement as DFStatement,
    };
    use datafusion_sql::planner::object_name_to_table_reference;
    use information_schema::INFORMATION_SCHEMA;
    use information_schema::INFORMATION_SCHEMA_TABLES;
    use sqlparser::ast::*;

    /// AST visitor that accumulates relation names and CTE aliases.
    struct RelationVisitor {
        /// Every non-CTE relation seen so far.
        relations: BTreeSet<ObjectName>,
        /// Every CTE alias whose defining query has been fully visited.
        all_ctes: BTreeSet<ObjectName>,
        /// Stack of CTE aliases visible at the current point of the traversal.
        ctes_in_scope: Vec<ObjectName>,
    }

    impl RelationVisitor {
        /// Record the reference to `relation`, if it's not a CTE reference.
        fn insert_relation(&mut self, relation: &ObjectName) {
            if !self.relations.contains(relation)
                && !self.ctes_in_scope.contains(relation)
            {
                self.relations.insert(relation.clone());
            }
        }
    }

    // Note: `Break = ()` and no callback below ever returns `ControlFlow::Break`,
    // so the `ControlFlow` produced by any `visit` call with this visitor carries
    // no information and is discarded explicitly with `let _ =`.
    impl Visitor for RelationVisitor {
        type Break = ();

        fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
            self.insert_relation(relation);
            ControlFlow::Continue(())
        }

        fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
            if let Some(with) = &q.with {
                for cte in &with.cte_tables {
                    // The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
                    // `WITH t AS (SELECT * FROM t) SELECT * FROM t`
                    // Where the first `t` refers to a predefined table. So we are careful here
                    // to visit the CTE first, before putting it in scope.
                    if !with.recursive {
                        // This is a bit hackish as the CTE will be visited again as part of visiting `q`,
                        // but thankfully `insert_relation` is idempotent.
                        let _ = cte.visit(self);
                    }
                    self.ctes_in_scope
                        .push(ObjectName(vec![cte.alias.name.clone()]));
                }
            }
            ControlFlow::Continue(())
        }

        fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
            if let Some(with) = &q.with {
                for _ in &with.cte_tables {
                    // Unwrap: We just pushed these in `pre_visit_query`
                    self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
                }
            }
            ControlFlow::Continue(())
        }

        fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
            // `SHOW CREATE TABLE/VIEW <name>` references `<name>` directly.
            if let Statement::ShowCreate {
                obj_type: ShowCreateObject::Table | ShowCreateObject::View,
                obj_name,
            } = statement
            {
                self.insert_relation(obj_name);
            }

            // SHOW statements will later be rewritten into a SELECT from the information_schema
            let requires_information_schema = matches!(
                statement,
                Statement::ShowFunctions { .. }
                    | Statement::ShowVariable { .. }
                    | Statement::ShowStatus { .. }
                    | Statement::ShowVariables { .. }
                    | Statement::ShowCreate { .. }
                    | Statement::ShowColumns { .. }
                    | Statement::ShowTables { .. }
                    | Statement::ShowCollation { .. }
            );
            if requires_information_schema {
                for s in INFORMATION_SCHEMA_TABLES {
                    self.relations.insert(ObjectName(vec![
                        Ident::new(INFORMATION_SCHEMA),
                        Ident::new(*s),
                    ]));
                }
            }
            ControlFlow::Continue(())
        }
    }

    let mut visitor = RelationVisitor {
        relations: BTreeSet::new(),
        all_ctes: BTreeSet::new(),
        ctes_in_scope: vec![],
    };

    /// Feeds one DataFusion statement into `visitor`, recursing through `EXPLAIN`.
    fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
        match statement {
            DFStatement::Statement(s) => {
                let _ = s.as_ref().visit(visitor);
            }
            DFStatement::CreateExternalTable(table) => {
                // Inserted unconditionally, bypassing the CTE-scope check in `insert_relation`.
                visitor.relations.insert(table.name.clone());
            }
            DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
                CopyToSource::Relation(table_name) => {
                    visitor.insert_relation(table_name);
                }
                CopyToSource::Query(query) => {
                    let _ = query.visit(visitor);
                }
            },
            DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
        }
    }

    visit_statement(statement, &mut visitor);

    let table_refs = visitor
        .relations
        .into_iter()
        .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
        .collect::<datafusion_common::Result<_>>()?;
    let ctes = visitor
        .all_ctes
        .into_iter()
        .map(|x| object_name_to_table_reference(x, enable_ident_normalization))
        .collect::<datafusion_common::Result<_>>()?;
    Ok((table_refs, ctes))
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_sql::parser::DFParser;

    /// Parses `sql`, resolves the references of its last statement with identifier
    /// normalization enabled, and renders `(table_refs, ctes)` as strings.
    fn resolved_names(sql: &str) -> (Vec<String>, Vec<String>) {
        let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
        let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
        let tables = table_refs.iter().map(|t| t.to_string()).collect();
        let cte_names = ctes.iter().map(|c| c.to_string()).collect();
        (tables, cte_names)
    }

    #[test]
    fn resolve_table_references_shadowed_cte() {
        // An interesting edge case where the `t` name is used both as an ordinary table reference
        // and as a CTE reference.
        let (tables, ctes) = resolved_names("WITH t AS (SELECT * FROM t) SELECT * FROM t");
        assert_eq!(tables, ["t"]);
        assert_eq!(ctes, ["t"]);

        // UNION is a special case where the CTE is not in scope for the second branch.
        let (tables, ctes) = resolved_names(
            "(with t as (select 1) select * from t) union (select * from t)",
        );
        assert_eq!(tables, ["t"]);
        assert_eq!(ctes, ["t"]);

        // Nested CTEs are also handled.
        // Here the first `u` is a CTE, but the second `u` is a table reference.
        // While `t` is always a CTE.
        let (tables, ctes) = resolved_names(
            "(with t as (with u as (select 1) select * from u) select * from u cross join t)",
        );
        assert_eq!(tables, ["u"]);
        assert_eq!(ctes, ["t", "u"]);
    }

    #[test]
    fn resolve_table_references_recursive_cte() {
        // A recursive CTE is in scope inside its own definition, so `nodes`
        // must never be reported as a plain table reference.
        let sql = "
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT id + 1 as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes
";
        let (tables, ctes) = resolved_names(sql);
        assert!(tables.is_empty());
        assert_eq!(ctes, ["nodes"]);
    }
}
3 changes: 1 addition & 2 deletions datafusion/core/tests/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow_schema::{Fields, SchemaBuilder};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{plan_err, DFSchema, Result, ScalarValue};
use datafusion_common::{plan_err, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::interval_arithmetic::{Interval, NullableInterval};
use datafusion_expr::{
col, lit, AggregateUDF, BinaryExpr, Expr, ExprSchemable, LogicalPlan, Operator,
Expand All @@ -41,7 +41,6 @@ use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_functions::datetime;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, plan_err, Result};
use datafusion_common::{assert_contains, plan_err, Result, TableReference};
use datafusion_expr::sqlparser::dialect::PostgreSqlDialect;
use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
Expand All @@ -36,7 +36,6 @@ use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

#[cfg(test)]
#[ctor::ctor]
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Field, Schema};

use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_common::{plan_err, Result, TableReference};
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::WindowUDF;
use datafusion_expr::{
Expand All @@ -32,7 +32,6 @@ use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_sql::{
planner::{ContextProvider, SqlToRel},
sqlparser::{dialect::GenericDialect, parser::Parser},
TableReference,
};

fn main() {
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,11 +1073,10 @@ mod tests {
use sqlparser::parser::Parser;

use datafusion_common::config::ConfigOptions;
use datafusion_common::TableReference;
use datafusion_expr::logical_plan::builder::LogicalTableSource;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};

use crate::TableReference;

use super::*;

struct TestContextProvider {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub mod parser;
pub mod planner;
mod query;
mod relation;
pub mod resolve;
mod select;
mod set_expr;
mod stack;
Expand All @@ -49,6 +50,9 @@ mod statement;
pub mod unparser;
pub mod utils;
mod values;

#[deprecated(
since = "46.0.0",
note = "use datafusion_common::{ResolvedTableReference, TableReference}"
)]
pub use datafusion_common::{ResolvedTableReference, TableReference};
pub use sqlparser;
Loading