Alternative approaches to "fan-out" style RepartitionExec #14287

Open
westonpace opened this issue Jan 24, 2025 · 6 comments
Labels
enhancement New feature or request

Comments

@westonpace
Member

Is your feature request related to a problem or challenge?

RepartitionExec is often used to fan out batches from a single partition into multiple partitions. For example, if we are scanning a very big Parquet file, we use the RepartitionExec to take the batches we receive from the file and fan them out to multiple partitions so that the data can be processed in parallel. Note: these RepartitionExec nodes are often not set up by hand but rather inserted by the plan optimizer.

The current approach sets up a channel per partition and (I believe) emits batches in a round-robin order. This works well when the consumer is faster than the producer (typical in simple queries) or the workload is evenly balanced. However, when the workload is skewed this leads to problems.

  • The query can use too much memory because the data builds up in the channels of the slower consumers. (Note: if all consumers are slow then all outputs will fill and that does trigger the producer to pause).
  • There are potential performance disadvantages because we have cores that are ready to do processing but their queue is empty and meanwhile there are cores that are busy and have deep queues.
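
To make the skew problem concrete, here is a minimal deterministic sketch of round-robin fan-out. It is a toy model, not DataFusion code: the function name `round_robin_depths`, the per-consumer `costs` (ticks needed per batch), and the unbounded queues with no producer gating are all assumptions made for illustration.

```rust
use std::collections::VecDeque;

/// Toy model (hypothetical, not DataFusion's implementation).
/// Each tick the producer emits one batch to the next partition in turn.
/// Consumer `i` completes one batch every `costs[i]` ticks.
/// Returns the queue depth of every partition after `ticks` steps.
fn round_robin_depths(costs: &[u64], ticks: u64) -> Vec<usize> {
    let n = costs.len();
    let mut queues: Vec<VecDeque<u64>> = vec![VecDeque::new(); n];
    for tick in 0..ticks {
        // Producer: strict round-robin, regardless of how deep a queue is.
        queues[(tick as usize) % n].push_back(tick);
        // Consumers: partition i drains one batch whenever its cost period elapses.
        for (i, q) in queues.iter_mut().enumerate() {
            if (tick + 1) % costs[i] == 0 {
                q.pop_front();
            }
        }
    }
    queues.iter().map(|q| q.len()).collect()
}
```

With three fast consumers and one slow one, `round_robin_depths(&[1, 1, 1, 16], 64)` leaves the fast queues empty while the slow partition's queue keeps growing, which is the memory build-up described above.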

Describe the solution you'd like

Work stealing queues come to mind. I think there's some literature on putting these to use in databases. They can be designed fairly efficiently. Maybe there are some solid Rust implementations (building one from scratch might be a bit annoying).

Otherwise, a simple and slow mutex-bound MPMC queue might be a nice alternative to at least avoid the memory issues (if not fix the performance issues).
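
A rough sketch of what such a mutex-bound MPMC queue could look like, using only the Rust standard library. The names `BoundedQueue` and `fan_out` are hypothetical, it uses blocking threads rather than the async streams DataFusion operators use, and it assumes a capacity greater than zero. The point is only that a full queue blocks the producer, so buffered data never exceeds `capacity` batches.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct State<T> {
    items: VecDeque<T>,
    closed: bool,
}

/// Hypothetical bounded MPMC queue guarded by a single mutex.
struct BoundedQueue<T> {
    state: Mutex<State<T>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        BoundedQueue {
            state: Mutex::new(State { items: VecDeque::new(), closed: false }),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    /// Blocks while the queue is full; this is what bounds memory use.
    fn push(&self, item: T) {
        let mut st = self.state.lock().unwrap();
        while st.items.len() >= self.capacity {
            st = self.not_full.wait(st).unwrap();
        }
        st.items.push_back(item);
        self.not_empty.notify_one();
    }

    /// Blocks while the queue is empty; returns None once closed and drained.
    fn pop(&self) -> Option<T> {
        let mut st = self.state.lock().unwrap();
        loop {
            if let Some(item) = st.items.pop_front() {
                self.not_full.notify_one();
                return Some(item);
            }
            if st.closed {
                return None;
            }
            st = self.not_empty.wait(st).unwrap();
        }
    }

    /// Signals that no more items will be pushed.
    fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.not_empty.notify_all();
    }
}

/// Pushes `n_batches` through one shared queue to `n_consumers` threads and
/// returns how many batches each consumer processed.
fn fan_out(n_batches: usize, n_consumers: usize, capacity: usize) -> Vec<usize> {
    let q = Arc::new(BoundedQueue::<usize>::new(capacity));
    let handles: Vec<_> = (0..n_consumers)
        .map(|_| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                let mut n = 0usize;
                while q.pop().is_some() {
                    n += 1;
                }
                n
            })
        })
        .collect();
    for b in 0..n_batches {
        q.push(b);
    }
    q.close();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Calling `fan_out(100, 4, 8)` returns four per-consumer counts that sum to 100; how the batches split between consumers depends on thread scheduling, so only the total is deterministic.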

There could be plenty of other approaches as well.

Describe alternatives you've considered

I don't have a good workaround at the moment.

Additional context

A smattering of Discord conversation: https://discord.com/channels/885562378132000778/1331914577935597579

@westonpace westonpace added the enhancement New feature or request label Jan 24, 2025
@alamb
Contributor

alamb commented Jan 25, 2025

Thanks @westonpace for filing this -- I agree there are likely some improvements in this area that would be beneficial

I believe @crepererum spent quite a bit of time on the current RepartitionExec so maybe he has some comments to share

Also, I was just speaking with @ozankabak the other day and this exact topic came up (Improvements in RepartitionExec). I can't remember if he said @jayzhan211 was thinking about it or not 🤔

@alamb
Contributor

alamb commented Jan 25, 2025

BTW my suggestion for a first step would be to get some example query / test case that shows where the current algorithm doesn't work very well. Then we can evaluate potential solutions in the context of how they affect that example

@berkaysynnada
Contributor

We have designed a poll-based repartition mechanism that polls its input whenever any of the output partitions are polled. This approach deviates from the round-robin pattern, and instead ensures a truly even workload distribution for consumer partitions. A batch is sent to the partition that has completed its computation and is ready to process the next data.

This mechanism also exhibits prefetching behavior, similar to SortPreservingMerge, although the prefetching is limited to a single batch (or potentially up to the number of partitions—this will be finalized based on benchmark results).
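
The distribution rule described above (a batch goes to whichever partition is ready next) can be illustrated with a small deterministic model. This is only a sketch of the idea, not the OnDemandRepartition implementation: the function `on_demand_counts` and the per-consumer `costs` are hypothetical, the producer is assumed never to be the bottleneck, and prefetching is not modeled.

```rust
/// Toy model (hypothetical): each batch is handed to the consumer that
/// becomes idle first, where consumer `i` needs `costs[i]` ticks per batch.
/// Returns how many batches each consumer received.
fn on_demand_counts(costs: &[u64], n_batches: usize) -> Vec<usize> {
    let mut free_at = vec![0u64; costs.len()];
    let mut counts = vec![0usize; costs.len()];
    for _ in 0..n_batches {
        // The earliest-idle consumer polls next; ties go to the lowest index.
        let i = (0..costs.len())
            .min_by_key(|&i| (free_at[i], i))
            .expect("at least one consumer");
        free_at[i] += costs[i];
        counts[i] += 1;
    }
    counts
}
```

For example, `on_demand_counts(&[1, 3], 8)` gives the fast consumer six batches and the slow one two, so the work tracks each consumer's pace instead of being split evenly.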

The implementation is currently underway, and the initial benchmark results are very promising. Theoretically, this approach should perform better, especially in scenarios where the producer outpaces the consumers, which I believe is the case @westonpace mentions in the issue description.

@Weijun-H is working on the implementation, and I hope we open the PR in the coming weeks once it is in a robust and optimized state.

@crepererum
Contributor

IIRC the RepartitionExec has the following requirements:

  • performance: "one bit lock" is probably a no-go, esp. in a wide MPMC setup.
  • feed ALL consumers: Some downstream operators assume that they can more or less poll all consumer channels. At least we've seen deadlocks in the past when RepartitionExec assumes that all consumers are polled at the same rate. Hence you MUST feed empty consumers.
  • skewed inputs: Inputs may be skewed, i.e. provide different data rates and lengths.
  • limited buffering: A simple implementation that we've once had just had a tokio task per input and polled them until they are empty. That's NOT gonna work in many production systems since you're going to blow up memory, esp. when the outputs are eventually used with LIMIT clauses and/or slow processing nodes. This is the reason for the current "distributor channels" construct.

The work stealing approach sounds reasonable but is also somewhat a hack. I think if you don't know the output polling rate, then distributing data to the different outputs at a fixed rate (that's what round-robin does) isn't a great idea. I think finding a fast MPMC channel w/ a fixed capacity (to implement limited buffering) might be good.
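
As a rough illustration of the limited-buffering point, the toy simulation below models a single shared queue with a fixed capacity. Everything here is an assumption for illustration (the name `shared_queue_sim`, the tick-based timing, the `per_tick` producer rate); it is not the "distributor channels" construct and ignores the lock-contention concern listed above.

```rust
use std::collections::VecDeque;

/// Toy model (hypothetical): one shared queue with a fixed capacity.
/// The producer offers up to `per_tick` batches each tick but stops when the
/// queue is full. Consumer `i` is busy for `costs[i]` ticks per batch.
/// Returns (peak queue depth, batches taken per consumer).
fn shared_queue_sim(
    costs: &[u64],
    per_tick: usize,
    capacity: usize,
    ticks: u64,
) -> (usize, Vec<usize>) {
    let mut queue: VecDeque<u64> = VecDeque::new();
    let mut busy_until = vec![0u64; costs.len()];
    let mut counts = vec![0usize; costs.len()];
    let mut peak = 0usize;
    for tick in 0..ticks {
        // Producer: backpressure, never buffer more than `capacity` batches.
        for _ in 0..per_tick {
            if queue.len() < capacity {
                queue.push_back(tick);
            }
        }
        peak = peak.max(queue.len());
        // Consumers: every idle consumer takes the oldest available batch.
        for i in 0..costs.len() {
            if busy_until[i] <= tick {
                if let Some(_batch) = queue.pop_front() {
                    busy_until[i] = tick + costs[i];
                    counts[i] += 1;
                }
            }
        }
    }
    (peak, counts)
}
```

With `shared_queue_sim(&[1, 1, 1, 16], 4, 4, 32)` the buffered data never exceeds four batches, and the slow consumer simply takes fewer batches rather than accumulating a backlog.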

@westonpace
Member Author

BTW my suggestion for a first step would be to get some example query / test case that shows where the current algorithm doesn't work very well. Then we can evaluate potential solutions in the context of how they affect that example

I'll make an attempt this week to create a reproducer that triggers the memory issues we were seeing. The filter was "string contains" (or possibly "not string contains") on a string column that contained "LLM prompts" (e.g. paragraph sized English prose generally less than 1KB)

@Weijun-H
Member

Weijun-H commented Jan 27, 2025

BTW my suggestion for a first step would be to get some example query / test case that shows where the current algorithm doesn't work very well. Then we can evaluate potential solutions in the context of how they affect that example

I'll make an attempt this week to create a reproducer that triggers the memory issues we were seeing. The filter was "string contains" (or possibly "not string contains") on a string column that contained "LLM prompts" (e.g. paragraph sized English prose generally less than 1KB)

I have been working on OnDemandRepartition for the past few weeks; these use cases would be useful for the benchmark.


5 participants