support simple/cross lateral joins #14595

Open
wants to merge 8 commits into
base: main


@skyzh commented Feb 11, 2025

Which issue does this PR close?

Rationale for this change

Add partial lateral join support.

What changes are included in this PR?

The logical plan for a cross lateral join looks like:

CrossJoin
  Left
  Right (with outer column reference)

We added a new rule DecorrelateLateralJoin to handle this case. We reuse the scalar/predicate subquery handling code for decorrelating lateral joins.
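To make the pattern above concrete, here is a minimal sketch of how a rule might recognize it. It uses a hypothetical toy plan type (Plan, contains_outer_reference and is_lateral_join are illustrative names, not DataFusion's LogicalPlan API) and only models a cross join whose right side references an outer column.

```rust
// Hypothetical toy logical plan; not DataFusion's LogicalPlan.
#[derive(Debug, Clone)]
enum Plan {
    // A base table scan.
    Scan { table: &'static str },
    // A filter whose predicate may reference outer (correlated) columns.
    Filter { outer_refs: Vec<&'static str>, input: Box<Plan> },
    // An aggregate over its input (grouping details omitted).
    Aggregate { input: Box<Plan> },
    // A join without a condition, i.e. a cross join.
    CrossJoin { left: Box<Plan>, right: Box<Plan> },
}

impl Plan {
    /// True if this node or any node below it references an outer column.
    fn contains_outer_reference(&self) -> bool {
        match self {
            Plan::Scan { .. } => false,
            Plan::Filter { outer_refs, input } => {
                !outer_refs.is_empty() || input.contains_outer_reference()
            }
            Plan::Aggregate { input } => input.contains_outer_reference(),
            Plan::CrossJoin { left, right } => {
                left.contains_outer_reference() || right.contains_outer_reference()
            }
        }
    }
}

/// A cross join whose right side is correlated with the left side is the shape
/// a rule like DecorrelateLateralJoin would look for (toy approximation).
fn is_lateral_join(plan: &Plan) -> bool {
    matches!(plan, Plan::CrossJoin { right, .. } if right.contains_outer_reference())
}
```

An uncorrelated cross join is left alone, so only the lateral shape is picked up.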

Are these changes tested?

New test case in sqllogictest.

Are there any user-facing changes?

Some lateral joins are now supported. Note that some lateral join queries in joins.slt still cannot be planned; they can be fixed gradually.

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Feb 11, 2025
@skyzh (Author) commented Feb 11, 2025

cc @alamb, would you please help take a look, since you were involved in the earlier lateral join discussions? Thanks :)

@skyzh skyzh force-pushed the skyzh/lateral-planning branch from 1054e96 to ab3832f Compare February 11, 2025 01:12
@skyzh skyzh changed the title support simple lateral joins support simple/scalar lateral joins Feb 11, 2025
@skyzh skyzh changed the title support simple/scalar lateral joins support simple/cross lateral joins Feb 11, 2025
@skyzh (Author) commented Feb 12, 2025

should pass all sqllogictests :)

@alamb (Contributor) left a comment

Thank you very much for starting this work @skyzh 🙏

Do you know of a reference / description somewhere for how to plan LATERAL joins? I am not familiar with their semantics or with how we can rewrite them as subqueries.

My one smoke test was to get an explain plan in DuckDB, and it produced a plan with 2 joins, which seems different from what this PR does 🤔

I also left some code suggestions, though they are minor compared to the question of what the plan should be.

Basically, I don't know what the plans for such queries should look like, so I am not sure whether this code does the right thing.

Comment on lines +89 to +102
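use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result;
use datafusion_expr::LogicalPlan;

// Enclosing signature assumed; it matches the one used in the review suggestion below.
fn plan_contains_outer_reference(plan: &LogicalPlan) -> bool {
    // Visitor that stops at the first node containing an outer column reference.
    struct Visitor {
        contains: bool,
    }
    impl<'n> TreeNodeVisitor<'n> for Visitor {
        type Node = LogicalPlan;
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if plan.contains_outer_reference() {
                self.contains = true;
                Ok(TreeNodeRecursion::Stop)
            } else {
                Ok(TreeNodeRecursion::Continue)
            }
        }
    }
    // Walk the plan's nodes as well as any subqueries they contain.
    let mut visitor = Visitor { contains: false };
    plan.visit_with_subqueries(&mut visitor).unwrap();
    visitor.contains
}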

Contributor

I think you can do this more simply using apply_subqueries as described here: https://datafusion.apache.org/library-user-guide/query-optimizer.html#recursively-walk-an-expression-tree

Something like (untested):

fn plan_contains_outer_reference(plan: &LogicalPlan) -> bool {
    let mut contains = false;
    plan.apply_subqueries(|plan| {
        if plan.contains_outer_reference() {
            // Free function: set the captured local, there is no `self` here.
            contains = true;
            Ok(TreeNodeRecursion::Stop)
        } else {
            Ok(TreeNodeRecursion::Continue)
        }
    })
    .unwrap();
    contains
}

Author

Unfortunately, this is not equivalent to the current visitor, because apply_subqueries only visits the subqueries 🤣

Contributor

You can use apply_with_subqueries().
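The difference between the two traversals can be illustrated with a hypothetical toy tree (Node, any_in_subqueries and any_with_subqueries are made-up names, not DataFusion APIs). As we read the discussion, apply_subqueries only calls the closure on the subquery plans referenced by the current node, while apply_with_subqueries walks the node, its children, and their subqueries.

```rust
// Hypothetical toy plan node; not DataFusion's types.
struct Node {
    outer_ref: bool,        // does this node itself reference an outer column?
    children: Vec<Node>,    // regular plan inputs
    subqueries: Vec<Node>,  // plans of subqueries in this node's expressions
}

/// Like the idea behind apply_subqueries: call `f` only on the roots of the
/// subqueries referenced directly by `node`. Children are never visited.
fn any_in_subqueries(node: &Node, f: &dyn Fn(&Node) -> bool) -> bool {
    node.subqueries.iter().any(|s| f(s))
}

/// Like the idea behind apply_with_subqueries: visit `node`, all of its children
/// and all subqueries recursively, stopping at the first match.
fn any_with_subqueries(node: &Node, f: &dyn Fn(&Node) -> bool) -> bool {
    f(node)
        || node.children.iter().any(|c| any_with_subqueries(c, f))
        || node.subqueries.iter().any(|s| any_with_subqueries(s, f))
}
```

For a plan whose only outer reference sits in a regular child, the subquery-only walk returns false while the full walk returns true, which is why the visitor above is not equivalent to apply_subqueries.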

if join.join_type != JoinType::Inner {
    return Ok(Transformed::no(plan));
}
// TODO: this makes the rule quadratic in the number of nodes; in theory, we can build this property

Contributor

I believe you can have rewrite called in a bottom-up fashion by using:
https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html#method.apply_order

Then you could maybe add a contains_outer_reference type flag to DecorrelateLateralJoin
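A minimal sketch of the bottom-up idea, assuming a hypothetical arena-based tree (Tree and subtree_flags are illustrative names; this is not the OptimizerRule API). Each node's contains-outer-reference flag is computed once in post-order, so later checks become lookups instead of re-scanning a subtree at every join.

```rust
// Hypothetical toy tree stored as an arena: node i has children[i] and outer_ref[i].
struct Tree {
    outer_ref: Vec<bool>,
    children: Vec<Vec<usize>>,
}

/// For every node, whether its subtree contains an outer reference.
/// One post-order pass visits each node exactly once, so this is O(n).
fn subtree_flags(tree: &Tree, root: usize) -> Vec<bool> {
    fn visit(tree: &Tree, node: usize, flags: &mut [bool]) -> bool {
        let mut any = tree.outer_ref[node];
        for &child in &tree.children[node] {
            // `|=` does not short-circuit, so every child's flag gets filled in.
            any |= visit(tree, child, flags);
        }
        flags[node] = any;
        any
    }
    let mut flags = vec![false; tree.outer_ref.len()];
    visit(tree, root, &mut flags);
    flags
}
```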

Author

In 24a375d, I decided to use TreeNodeRecursion::Jump to skip all children, which makes it O(n).
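To see why skipping children helps, consider this toy cost model. All names are hypothetical, and Recursion only imitates the Continue/Jump/Stop meaning of TreeNodeRecursion in a top-down walk. It assumes the rule scans a join's entire subtree at every join it visits; with Jump, nested joins below an already-handled join are not rescanned.

```rust
// Toy imitation of TreeNodeRecursion for a top-down walk (not the real enum).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Recursion {
    Continue, // visit this node's children next
    Jump,     // skip this node's children, continue with siblings
    Stop,     // end the whole walk
}

struct Node {
    is_join: bool,
    children: Vec<Node>,
}

/// Pre-order walk honouring Continue/Jump/Stop. Returns false if stopped.
fn walk(node: &Node, f: &mut dyn FnMut(&Node) -> Recursion) -> bool {
    match f(node) {
        Recursion::Stop => false,
        Recursion::Jump => true,
        Recursion::Continue => {
            for child in &node.children {
                if !walk(child, f) {
                    return false;
                }
            }
            true
        }
    }
}

fn size(node: &Node) -> usize {
    1 + node.children.iter().map(size).sum::<usize>()
}

/// Work done by a rule that scans a join's subtree each time it visits a join.
/// `jump` controls whether a handled join skips its children.
fn rule_work(root: &Node, jump: bool) -> usize {
    let mut work = 0;
    walk(root, &mut |n: &Node| {
        work += 1; // visiting the node itself
        if n.is_join {
            work += size(n); // e.g. checking the subtree for outer references
            if jump {
                return Recursion::Jump;
            }
        }
        Recursion::Continue
    });
    work
}
```

On a chain of three nested joins, this toy does 8 units of work with Jump versus 22 without it; in this model the gap grows quadratically with the depth of the chain.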

statement ok
CREATE TABLE t0(v0 BIGINT, v1 BIGINT);

statement ok

Contributor

Thanks!

Could we please add some negative cases to these tests too, such as:

  1. When there are two subqueries
  2. When the subqueries have no correlation

Author

I only added (2) in c353246. I have some test cases for (1) in the comments, but I don't plan to support them right now.

@skyzh (Author) commented Feb 13, 2025

Okay, I just realized that the plan is not correct. I'll need to do more transformations to make it work :(

So back to the semantics of the lateral join,

SELECT *
  FROM t0,
  LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0);

is basically saying that "for every row in t0, run the lateral query", which is equivalent to the following program:

for row in t0:
  lateral_result = run_query(SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0)
  for row2 in lateral_result:
    output.append(row ++ row2) 

Note that sum produces one row containing NULL even when the right table has no matching rows.
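The loop above can be written as a small runnable sketch over in-memory rows (Row and lateral_nested_loop are illustrative names; NULL is modelled as None). It evaluates the correlated aggregate once per row of t0, which is the reference semantics any decorrelated plan has to reproduce.

```rust
// Rows of t0(v0, v1) and t1(v0, v1); NULL is modelled as None.
type Row = (i64, i64);

/// SELECT * FROM t0, LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0),
/// evaluated literally: run the subquery once for every row of t0.
fn lateral_nested_loop(t0: &[Row], t1: &[Row]) -> Vec<(i64, i64, Option<i64>)> {
    let mut output = Vec::new();
    for &(v0, v1) in t0 {
        // The subquery: filter t1 on the correlated predicate, then sum(v1).
        let matching: Vec<i64> = t1.iter().filter(|r| r.0 == v0).map(|r| r.1).collect();
        // An ungrouped aggregate always yields one row; sum over no rows is NULL.
        let sum = if matching.is_empty() { None } else { Some(matching.iter().sum::<i64>()) };
        output.push((v0, v1, sum));
    }
    output
}
```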

The plan produced by this patch is:

Inner Join t0.v0 = t1.v0
  Scan t0
  Aggregate: groupBy=[[t1.v0]], aggr=[[sum(t1.v1)]]
    Scan t1

If t0 = (0, 0), (1, 1) and t1 = (0, 0), the output should have 2 rows, while this plan will only produce 1 row.

The correct plan should be:

Left Outer Join t0.v0 = t1.v0
  Scan t0
  Aggregate: groupBy=[[t1.v0]], aggr=[[sum(t1.v1)]]
    Scan t1
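To check the row counts for the two plans above, the following sketch evaluates both on the example data (t0 = (0, 0), (1, 1) and t1 = (0, 0)). The helper names are hypothetical and a HashMap stands in for the hash aggregate; this is not DataFusion's execution code.

```rust
use std::collections::HashMap;

type Row = (i64, i64);

/// Aggregate: groupBy=[[t1.v0]], aggr=[[sum(t1.v1)]]. Only v0 values present in t1 get a group.
fn sum_by_v0(t1: &[Row]) -> HashMap<i64, i64> {
    let mut groups: HashMap<i64, i64> = HashMap::new();
    for &(v0, v1) in t1 {
        *groups.entry(v0).or_insert(0) += v1;
    }
    groups
}

/// Inner Join t0.v0 = t1.v0: rows of t0 without a matching group are dropped.
fn inner_join_plan(t0: &[Row], t1: &[Row]) -> Vec<(i64, i64, Option<i64>)> {
    let groups = sum_by_v0(t1);
    t0.iter()
        .filter_map(|&(v0, v1)| groups.get(&v0).map(|&s| (v0, v1, Some(s))))
        .collect()
}

/// Left Outer Join t0.v0 = t1.v0: unmatched rows of t0 are kept with a NULL sum.
fn left_outer_join_plan(t0: &[Row], t1: &[Row]) -> Vec<(i64, i64, Option<i64>)> {
    let groups = sum_by_v0(t1);
    t0.iter().map(|&(v0, v1)| (v0, v1, groups.get(&v0).copied())).collect()
}
```

The inner-join plan returns only (0, 0, Some(0)), while the left outer join also keeps (1, 1, None), matching the two rows required by the lateral semantics.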

Aggregations like count(*) also need to be handled separately; I already see some code for that in the codebase.
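The count(*) problem shows up with the same left outer join plan: for an unmatched row the lateral query returns 0, but the outer join produces NULL. The sketch below makes the mismatch visible on the example data. The helper names are hypothetical, and the unwrap_or(0) fix-up is one common approach (substituting the aggregate's value on empty input), assumed here rather than taken from DataFusion's existing code.

```rust
use std::collections::HashMap;

type Row = (i64, i64);

/// Reference semantics: SELECT count(*) FROM t1 WHERE t0.v0 = t1.v0, per row of t0.
/// count over an empty input is 0, never NULL.
fn lateral_count(t0: &[Row], t1: &[Row]) -> Vec<(i64, i64)> {
    t0.iter()
        .map(|&(v0, _)| (v0, t1.iter().filter(|r| r.0 == v0).count() as i64))
        .collect()
}

/// Decorrelated plan: group t1 by v0 with count(*), then left outer join from t0.
/// Unmatched rows of t0 get NULL (None) instead of 0.
fn decorrelated_count(t0: &[Row], t1: &[Row]) -> Vec<(i64, Option<i64>)> {
    let mut groups: HashMap<i64, i64> = HashMap::new();
    for &(v0, _) in t1 {
        *groups.entry(v0).or_insert(0) += 1;
    }
    t0.iter().map(|&(v0, _)| (v0, groups.get(&v0).copied())).collect()
}

/// Assumed fix-up: replace the NULL produced for unmatched rows with count's empty-input value.
fn fix_count(rows: Vec<(i64, Option<i64>)>) -> Vec<(i64, i64)> {
    rows.into_iter().map(|(v0, c)| (v0, c.unwrap_or(0))).collect()
}
```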

Back to the DuckDB plan: DuckDB uses Hyper-style subquery unnesting. The idea of Hyper is to first eliminate duplicates on the join column. Therefore, when the left table has many duplicated rows, the subquery only needs to be computed once for all such rows (if it turns out to be a nested loop join). So DuckDB eliminates the duplicates from the left table, computes the subquery result, and finally joins it back to the original table, which is planned like:

^1:
Left Outer Join t0.v0 = t1.v0
  DistinctScan/DelimScan t0
  Aggregate: groupBy=[[t1.v0]], aggr=[[sum(t1.v1)]]
    Scan t1

This computes the right-side row for every distinct value in the left table, so we get (distinct t0.*, sum(t1.v1)) at this step. Then the result of the subquery is deduplicated (this step seems unnecessary?):

^2:
Dedup (HashJoin)
  ^1

Finally, the result is joined back to the original table:

HashJoin
  Scan t0
  ^2

The result should be the same as the corrected plan I mentioned before.
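A simplified sketch of the duplicate-elimination strategy described above, again over in-memory rows (the function name is hypothetical). A real plan would perform the per-key evaluation and the join back with an aggregate and a hash join rather than a loop, and DuckDB's actual operators (such as its DelimScan) are only approximated here by a BTreeSet of distinct keys.

```rust
use std::collections::{BTreeSet, HashMap};

type Row = (i64, i64);

/// SELECT * FROM t0, LATERAL (SELECT sum(v1) FROM t1 WHERE t0.v0 = t1.v0),
/// evaluated by deduplicating the correlated column first.
/// Returns the output rows and how many times the subquery was evaluated.
fn lateral_with_dedup(t0: &[Row], t1: &[Row]) -> (Vec<(i64, i64, Option<i64>)>, usize) {
    // Step 1: distinct values of the correlated column of t0.
    let keys: BTreeSet<i64> = t0.iter().map(|r| r.0).collect();

    // Step 2: evaluate the subquery once per distinct key.
    let mut evaluations = 0usize;
    let mut results: HashMap<i64, Option<i64>> = HashMap::new();
    for &k in &keys {
        evaluations += 1;
        let matching: Vec<i64> = t1.iter().filter(|r| r.0 == k).map(|r| r.1).collect();
        let sum = if matching.is_empty() { None } else { Some(matching.iter().sum::<i64>()) };
        results.insert(k, sum);
    }

    // Step 3: join the per-key results back to every row of the original t0.
    let output: Vec<(i64, i64, Option<i64>)> =
        t0.iter().map(|&(v0, v1)| (v0, v1, results[&v0])).collect();
    (output, evaluations)
}
```

With three rows of t0 sharing v0 = 0, the subquery runs only twice (for keys 0 and 1), yet the output still has one row per row of t0, the same as the left outer join plan.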

@alamb alamb marked this pull request as draft February 13, 2025 10:29
@alamb (Contributor) commented Feb 13, 2025

So back to the semantics of the lateral join,

Thank you @skyzh, this is an amazing description. Hopefully we can include it in the source code eventually, once we work through this issue.

Ideally, when someone finds the lateral join support in DataFusion, they would also have enough background via comments and links to understand what is going on.

Signed-off-by: Alex Chi Z
@skyzh skyzh force-pushed the skyzh/lateral-planning branch from 449ae6b to c353246 Compare February 16, 2025 23:01
@skyzh (Author) commented Feb 16, 2025

The current one-pass subquery unnesting implementation in PullUpCorrelatedExpr does not handle cases such as when we have joins in subqueries:

SELECT * FROM t0, LATERAL (SELECT 1), LATERAL (SELECT * FROM t1 WHERE t0.v0 = t1.v0);

Just noting it down here so we can improve it in the future. We could implement rule-based unnesting that pushes down the lateral join operator.

@skyzh (Author) commented Feb 16, 2025

For multiple subqueries like:

SELECT * FROM t0, LATERAL (SELECT * FROM t1 WHERE t0.v0 = t1.v0 AND t1.v1 > (SELECT SUM(v1) FROM t2 WHERE t1.v0 = t2.v0));

I'm not planning to support it in this patch, as I don't think the current one-pass decorrelation process can handle it correctly without a significant refactor :(

@skyzh (Author) commented Feb 16, 2025

I think this patch will not regress existing supported cases, as they get fully decorrelated by the scalar subquery rule and the predicate subquery rule. The patch unlocks a new set of plans produced by lateral joins, though we may have missed some cases and could generate incorrect plans for some of the newly supported ones. Among the queries that can now be optimized into a physical plan, count(*) is not handled correctly. The queries we know we don't support yet are described in the comments above.

@skyzh skyzh marked this pull request as ready for review February 16, 2025 23:18
@skyzh (Author) commented Feb 16, 2025

And ready for review again :)

After trying to understand what's happening in decorrelate.rs, I think we need a new code path to support the variety of logical plans produced by lateral joins. The key is to make the decorrelation code aware of joins while decorrelating, instead of first gathering the information and then generating a join at the top.

If we are going towards making the optimizer able to unnest arbitrary subqueries and lateral joins, then we will likely have a meta rule that recursively applies the following rules top-down:

  • Convert join operators into LogicalApply if the right side of the join contains outer column references. This can also be done in the SQL->logical phase when we encounter a lateral join. In the future, we could also convert correlated filter predicates (EXISTS/IN) and scalar subqueries into the apply operator.
  • Have a set of rules like: push down apply->join, push down apply->aggregation, push down apply->filter, etc.
  • Apply these rules top-down until no outer column reference is in the plan tree.
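A toy version of the proposed meta rule, assuming a hypothetical Apply node (LogicalApply is only proposed above; the enum, the three rules and unnest are illustrative, not DataFusion code). The rules follow identities of the kind used in SQL Server-style unnesting: an Apply whose right side is uncorrelated becomes a cross join, Apply is pushed below a correlated filter, and a filter over a cross join becomes the join condition. The driver repeats top-down passes until the plan stops changing.

```rust
// Hypothetical toy plan with an Apply (dependent join) node; not DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    // `correlated` marks a predicate that still references the outer (left) side.
    Filter { pred: &'static str, correlated: bool, input: Box<Plan> },
    // Apply: evaluate `right` once per row of `left`; `right` may reference `left`.
    Apply { left: Box<Plan>, right: Box<Plan> },
    // Ordinary join; `cond: None` is a cross join.
    Join { cond: Option<&'static str>, left: Box<Plan>, right: Box<Plan> },
}

fn has_outer_ref(p: &Plan) -> bool {
    match p {
        Plan::Scan(_) => false,
        Plan::Filter { correlated, input, .. } => *correlated || has_outer_ref(input),
        Plan::Apply { left, right } | Plan::Join { left, right, .. } => {
            has_outer_ref(left) || has_outer_ref(right)
        }
    }
}

/// Try one rule at the root of `p`; None if no rule matches.
fn rewrite_root(p: &Plan) -> Option<Plan> {
    match p {
        // An Apply with an uncorrelated right side is just a cross join.
        Plan::Apply { left, right } if !has_outer_ref(right) => Some(Plan::Join {
            cond: None,
            left: left.clone(),
            right: right.clone(),
        }),
        // Push Apply below a correlated filter: Apply(L, Filter_p(R)) => Filter_p(Apply(L, R)).
        Plan::Apply { left, right } => match &**right {
            Plan::Filter { pred, correlated: true, input } => Some(Plan::Filter {
                pred: *pred,
                correlated: false, // now evaluated over left x right
                input: Box::new(Plan::Apply { left: left.clone(), right: input.clone() }),
            }),
            _ => None,
        },
        // A filter directly above a cross join becomes the join condition.
        Plan::Filter { pred, correlated: false, input } => match &**input {
            Plan::Join { cond: None, left, right } => Some(Plan::Join {
                cond: Some(*pred),
                left: left.clone(),
                right: right.clone(),
            }),
            _ => None,
        },
        _ => None,
    }
}

/// One top-down pass: rewrite the root if a rule matches, otherwise recurse into children.
fn step(p: &Plan) -> Plan {
    if let Some(q) = rewrite_root(p) {
        return q;
    }
    match p {
        Plan::Scan(_) => p.clone(),
        Plan::Filter { pred, correlated, input } => Plan::Filter {
            pred: *pred,
            correlated: *correlated,
            input: Box::new(step(input)),
        },
        Plan::Apply { left, right } => Plan::Apply {
            left: Box::new(step(left)),
            right: Box::new(step(right)),
        },
        Plan::Join { cond, left, right } => Plan::Join {
            cond: *cond,
            left: Box::new(step(left)),
            right: Box::new(step(right)),
        },
    }
}

/// Repeat passes until no rule changes the plan.
fn unnest(mut p: Plan) -> Plan {
    loop {
        let next = step(&p);
        if next == p {
            return p;
        }
        p = next;
    }
}
```

For Apply(t0, Filter[t0.v0 = t1.v0](t1)) the driver ends with an ordinary Join on t0.v0 = t1.v0 and no Apply left; shapes that no toy rule covers are simply left in place.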

We can use either the Hyper unnesting rules (which we implemented in CMU-DB's optd optimizer) or the SQL Server unnesting rules (which we implemented in risinglight).

This meta unnesting rule is more powerful than what we have right now (the predicate subquery decorrelation rule and the scalar subquery unnesting rule), and we could eventually replace these two rules with it.

@skyzh (Author) commented Feb 16, 2025

@skyzh skyzh marked this pull request as draft February 16, 2025 23:39
@skyzh skyzh marked this pull request as ready for review February 16, 2025 23:44
@skyzh skyzh requested a review from alamb February 19, 2025 16:24
@alamb alamb mentioned this pull request Feb 25, 2025
3 participants