Skip to content

Commit 1aa56aa

Browse files
committed
upgrade column pruning to projection pushdown
Signed-off-by: Runji Wang <[email protected]>
1 parent f4942e8 commit 1aa56aa

7 files changed

+231
-109
lines changed

src/agg.rs

+8-5
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@ pub type AggSet = Vec<Expr>;
1111
pub fn analyze_aggs(egraph: &EGraph, enode: &Expr) -> AggSet {
1212
use Expr::*;
1313
let x = |i: &Id| egraph[*i].data.aggs.clone();
14-
if let Max(_) | Min(_) | Sum(_) | Avg(_) | Count(_) = enode {
15-
return vec![enode.clone()];
14+
match enode {
15+
Max(_) | Min(_) | Sum(_) | Avg(_) | Count(_) => vec![enode.clone()],
16+
// merge the set from all children
17+
Nested(_) | List(_) | Neg(_) | Not(_) | IsNull(_) | Add(_) | Sub(_) | Mul(_) | Div(_)
18+
| Eq(_) | NotEq(_) | Gt(_) | Lt(_) | GtEq(_) | LtEq(_) | And(_) | Or(_) | Xor(_)
19+
| Asc(_) | Desc(_) => enode.children().iter().flat_map(x).collect(),
20+
// ignore plan nodes
21+
_ => vec![],
1622
}
17-
// merge the set from all children
18-
// TODO: ignore plan nodes
19-
enode.children().iter().flat_map(x).collect()
2023
}
2124

2225
#[derive(Debug, PartialEq, Eq)]

src/lib.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ define_language! {
7272
// output = aggs || group_keys
7373

7474
// internal functions
75-
"prune" = Prune([Id; 2]), // (prune node child)
76-
// do column prune on `child`
77-
// with the used columns in `node`
75+
"column-merge" = ColumnMerge([Id; 2]), // (column-merge list1 list2)
76+
// returns the sorted, deduplicated list of columns referenced by `list1` or `list2`
77+
"column-prune" = ColumnPrune([Id; 2]), // (column-prune filter list)
78+
// removes every element of `list` whose column set is not a subset of the columns of `filter`
7879
"empty" = Empty(Id), // (empty child)
7980
// returns empty chunk
8081
// with the same schema as `child`
@@ -92,6 +93,21 @@ impl Expr {
9293
}
9394
}
9495

96+
trait ExprExt {
97+
fn as_list(&self) -> &[Id];
98+
}
99+
100+
impl<D> ExprExt for egg::EClass<Expr, D> {
101+
fn as_list(&self) -> &[Id] {
102+
self.iter()
103+
.find_map(|e| match e {
104+
Expr::List(list) => Some(list),
105+
_ => None,
106+
})
107+
.expect("not list")
108+
}
109+
}
110+
95111
/// The unified analysis for all rules.
96112
#[derive(Default)]
97113
pub struct ExprAnalysis;

src/plan.rs

+92-65
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ pub fn rules() -> Vec<Rewrite> {
1212
let mut rules = vec![];
1313
rules.extend(cancel_rules());
1414
rules.extend(merge_rules());
15-
rules.extend(pushdown_rules());
15+
rules.extend(predicate_pushdown_rules());
16+
rules.extend(projection_pushdown_rules());
1617
rules.extend(join_rules());
1718
rules
1819
}
@@ -25,7 +26,6 @@ fn cancel_rules() -> Vec<Rewrite> { vec![
2526
rw!("filter-true"; "(filter true ?child)" => "?child"),
2627
rw!("filter-false"; "(filter false ?child)" => "(empty ?child)"),
2728
rw!("inner-join-false"; "(join inner false ?l ?r)" => "(empty (join inner false ?l ?r))"),
28-
rw!("identical-proj"; "(proj ?exprs ?child)" => "?child" if schema_is_eq("?exprs", "?child")),
2929

3030
rw!("proj-on-empty"; "(proj ?exprs (empty ?c))" => "(empty ?exprs)"),
3131
rw!("filter-on-empty"; "(filter ?cond (empty ?c))" => "(empty ?c)"),
@@ -53,32 +53,30 @@ fn merge_rules() -> Vec<Rewrite> { vec![
5353
]}
5454

5555
#[rustfmt::skip]
56-
fn pushdown_rules() -> Vec<Rewrite> { vec![
57-
pushdown("proj", "?exprs", "limit", "?limit ?offset"),
58-
pushdown("limit", "?limit ?offset", "proj", "?exprs"),
56+
fn predicate_pushdown_rules() -> Vec<Rewrite> { vec![
5957
pushdown("filter", "?cond", "order", "?keys"),
6058
pushdown("filter", "?cond", "limit", "?limit ?offset"),
6159
pushdown("filter", "?cond", "topn", "?limit ?offset ?keys"),
6260
rw!("pushdown-filter-join";
6361
"(filter ?cond (join inner ?on ?left ?right))" =>
6462
"(join inner (and ?on ?cond) ?left ?right)"
6563
),
66-
rw!("pushdown-join-left";
64+
rw!("pushdown-filter-join-left";
6765
"(join inner (and ?cond1 ?cond2) ?left ?right)" =>
6866
"(join inner ?cond2 (filter ?cond1 ?left) ?right)"
6967
if columns_is_subset("?cond1", "?left")
7068
),
71-
rw!("pushdown-join-left-1";
69+
rw!("pushdown-filter-join-left-1";
7270
"(join inner ?cond1 ?left ?right)" =>
7371
"(join inner true (filter ?cond1 ?left) ?right)"
7472
if columns_is_subset("?cond1", "?left")
7573
),
76-
rw!("pushdown-join-right";
74+
rw!("pushdown-filter-join-right";
7775
"(join inner (and ?cond1 ?cond2) ?left ?right)" =>
7876
"(join inner ?cond2 ?left (filter ?cond1 ?right))"
7977
if columns_is_subset("?cond1", "?right")
8078
),
81-
rw!("pushdown-join-right-1";
79+
rw!("pushdown-filter-join-right-1";
8280
"(join inner ?cond1 ?left ?right)" =>
8381
"(join inner true ?left (filter ?cond1 ?right))"
8482
if columns_is_subset("?cond1", "?right")
@@ -159,6 +157,7 @@ pub fn analyze_columns(egraph: &EGraph, enode: &Expr) -> ColumnSet {
159157
Column(col) => [*col].into_iter().collect(),
160158
Proj([exprs, _]) => x(exprs).clone(),
161159
Agg([exprs, group_keys, _]) => x(exprs).union(x(group_keys)).cloned().collect(),
160+
ColumnPrune([filter, _]) => x(filter).clone(), // inaccurate: an upper bound, the pruned list may reference fewer columns than `filter`
162161
_ => {
163162
// merge the columns from all children
164163
(enode.children().iter())
@@ -178,74 +177,100 @@ pub fn merge(to: &mut ColumnSet, from: ColumnSet) -> DidMerge {
178177
}
179178
}
180179

181-
/// Column pruning rules remove unused columns from a plan.
182-
///
183-
/// We introduce an internal node [`Expr::Prune`]
184-
/// to top-down traverse the plan tree and collect all used columns.
180+
/// Rules that push projections down the plan and prune unused columns.
185181
#[rustfmt::skip]
186-
pub fn column_pruning_rules() -> Vec<Rewrite> { vec![
187-
// projection is the source of prune node
188-
// note that this rule may be applied for a lot of times,
189-
// so it's not recommand to apply column pruning with other rules together.
190-
rw!("prune-gen";
191-
"(proj ?exprs ?child)" =>
192-
"(proj ?exprs (prune ?exprs ?child))"
182+
pub fn projection_pushdown_rules() -> Vec<Rewrite> { vec![
183+
rw!("identical-proj";
184+
"(proj ?exprs ?child)" => "?child"
185+
if schema_is_eq("?exprs", "?child")
193186
),
194-
// then it is pushed down through the plan node tree,
195-
// merging all used columns along the way
196-
rw!("prune-limit";
197-
"(prune ?set (limit ?limit ?offset ?child))" =>
198-
"(limit ?limit ?offset (prune ?set ?child))"
187+
pushdown("proj", "?exprs", "limit", "?limit ?offset"),
188+
pushdown("limit", "?limit ?offset", "proj", "?exprs"),
189+
rw!("pushdown-proj-order";
190+
"(proj ?exprs (order ?keys ?child))" =>
191+
"(proj ?exprs (order ?keys (proj (column-merge ?exprs ?keys) ?child)))"
199192
),
200-
// note that we use `list` to represent the union of multiple column sets.
201-
// because the column set of `list` is calculated by union all its children.
202-
// see `analyze_columns()`.
203-
rw!("prune-order";
204-
"(prune ?set (order ?keys ?child))" =>
205-
"(order ?keys (prune (list ?set ?keys) ?child))"
193+
rw!("pushdown-proj-topn";
194+
"(proj ?exprs (topn ?limit ?offset ?keys ?child))" =>
195+
"(proj ?exprs (topn ?limit ?offset ?keys (proj (column-merge ?exprs ?keys) ?child)))"
206196
),
207-
rw!("prune-filter";
208-
"(prune ?set (filter ?cond ?child))" =>
209-
"(filter ?cond (prune (list ?set ?cond) ?child))"
197+
rw!("pushdown-proj-filter";
198+
"(proj ?exprs (filter ?cond ?child))" =>
199+
"(proj ?exprs (filter ?cond (proj (column-merge ?exprs ?cond) ?child)))"
210200
),
211-
rw!("prune-agg";
212-
"(prune ?set (agg ?aggs ?groupby ?child))" =>
213-
"(agg ?aggs ?groupby (prune (list ?set ?aggs ?groupby) ?child))"
201+
rw!("pushdown-proj-agg";
202+
"(agg ?aggs ?groupby ?child)" =>
203+
"(agg ?aggs ?groupby (proj (column-merge ?aggs ?groupby) ?child))"
214204
),
215-
rw!("prune-join";
216-
"(prune ?set (join ?type ?on ?left ?right))" =>
217-
"(join ?type ?on
218-
(prune (list ?set ?on) ?left)
219-
(prune (list ?set ?on) ?right)
220-
)"
205+
rw!("pushdown-proj-join";
206+
"(proj ?exprs (join ?type ?on ?left ?right))" =>
207+
"(proj ?exprs (join ?type ?on
208+
(proj (column-prune ?left (column-merge ?exprs ?on)) ?left)
209+
(proj (column-prune ?right (column-merge ?exprs ?on)) ?right)
210+
))"
221211
),
222-
// projection and scan is the sink of prune node
223-
rw!("prune-proj";
224-
"(prune ?set (proj ?exprs ?child))" =>
225-
"(proj (prune ?set ?exprs) ?child))"
212+
// column pruning
213+
rw!("pushdown-proj-scan";
214+
"(proj ?exprs (scan ?table ?columns))" =>
215+
"(proj ?exprs (scan ?table (column-prune ?exprs ?columns)))"
226216
),
227-
rw!("prune-scan";
228-
"(prune ?set (scan ?table ?columns))" =>
229-
"(scan ?table (prune ?set ?columns))"
217+
// evaluate 'column-merge' and 'column-prune'
218+
rw!("column-merge";
219+
"(column-merge ?list1 ?list2)" =>
220+
{ ColumnMerge {
221+
lists: [var("?list1"), var("?list2")],
222+
}}
230223
),
231-
// finally the prune is applied to a list of expressions
232-
rw!("prune-list";
233-
"(prune ?set ?list)" =>
234-
{ PruneList {
235-
set: var("?set"),
224+
rw!("column-prune";
225+
"(column-prune ?filter ?list)" =>
226+
{ ColumnPrune {
227+
filter: var("?filter"),
236228
list: var("?list"),
237229
}}
238230
if is_list("?list")
239231
),
240232
]}
241233

242-
/// Remove unused columns in `set` from `list`.
243-
struct PruneList {
244-
set: Var,
234+
/// Builds a list of the columns referenced by either of `lists` (their union, sorted by column name).
235+
struct ColumnMerge {
236+
lists: [Var; 2],
237+
}
238+
239+
impl Applier<Expr, ExprAnalysis> for ColumnMerge {
240+
fn apply_one(
241+
&self,
242+
egraph: &mut EGraph,
243+
eclass: Id,
244+
subst: &Subst,
245+
_searcher_ast: Option<&PatternAst<Expr>>,
246+
_rule_name: Symbol,
247+
) -> Vec<Id> {
248+
let list1 = &egraph[subst[self.lists[0]]].data.columns;
249+
let list2 = &egraph[subst[self.lists[1]]].data.columns;
250+
let mut list: Vec<&Column> = list1.union(list2).collect();
251+
list.sort_unstable_by_key(|c| c.as_str());
252+
let list = list
253+
.into_iter()
254+
.map(|col| egraph.lookup(Expr::Column(col.clone())).unwrap())
255+
.collect();
256+
let id = egraph.add(Expr::List(list));
257+
258+
// copied from `Pattern::apply_one`
259+
if egraph.union(eclass, id) {
260+
vec![eclass]
261+
} else {
262+
vec![]
263+
}
264+
}
265+
}
266+
267+
/// Removes every element of `list` whose column set is not a subset of the columns of `filter`.
268+
struct ColumnPrune {
269+
filter: Var,
245270
list: Var,
246271
}
247272

248-
impl Applier<Expr, ExprAnalysis> for PruneList {
273+
impl Applier<Expr, ExprAnalysis> for ColumnPrune {
249274
fn apply_one(
250275
&self,
251276
egraph: &mut EGraph,
@@ -254,10 +279,10 @@ impl Applier<Expr, ExprAnalysis> for PruneList {
254279
_searcher_ast: Option<&PatternAst<Expr>>,
255280
_rule_name: Symbol,
256281
) -> Vec<Id> {
257-
let used_columns = &egraph[subst[self.set]].data.columns;
258-
let list = egraph[subst[self.list]].nodes[0].as_list();
282+
let columns = &egraph[subst[self.filter]].data.columns;
283+
let list = egraph[subst[self.list]].as_list();
259284
let pruned = (list.iter().cloned())
260-
.filter(|id| !egraph[*id].data.columns.is_disjoint(used_columns))
285+
.filter(|id| egraph[*id].data.columns.is_subset(columns))
261286
.collect();
262287
let id = egraph.add(Expr::List(pruned));
263288

@@ -273,7 +298,9 @@ impl Applier<Expr, ExprAnalysis> for PruneList {
273298
/// Returns true if the variable is a list.
274299
fn is_list(v: &str) -> impl Fn(&mut EGraph, Id, &Subst) -> bool {
275300
let v = var(v);
276-
// we have no rule to rewrite a list,
277-
// so it should only contains one `Expr::List` in `nodes`.
278-
move |egraph, _, subst| matches!(egraph[subst[v]].nodes.first(), Some(Expr::List(_)))
301+
move |egraph, _, subst| {
302+
egraph[subst[v]]
303+
.iter()
304+
.any(|node| matches!(node, Expr::List(_)))
305+
}
279306
}

src/schema.rs

-4
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@ pub fn analyze_schema(egraph: &EGraph, enode: &Expr) -> Schema {
3030
Proj([exprs, _]) => x(exprs)?,
3131
Agg([exprs, group_keys, _]) => concat(x(exprs)?, x(group_keys)?),
3232

33-
// prune node may changes the schema, but we don't know the exact result for now
34-
// so just return `None` to indicate "unknown"
35-
Prune(_) => return None,
36-
3733
// not plan node
3834
_ => return None,
3935
})
File renamed without changes.

tests/8_column_pruning.rs

-32
This file was deleted.

0 commit comments

Comments
 (0)