[SPARK-51316][PYTHON] Allow Arrow batches in bytes instead of number of rows #50080
Conversation
@@ -112,6 +112,16 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) {
      count += 1
    }

    def sizeInBytes(): Int = {
      var i = 0
      var bytes = 0
This represents the size of a single row and should work for primitive types. But what if we have a string or binary type, which can vary in size?
This actually represents the size of the whole Arrow batch. ArrowFieldWriter is in charge of writing a single column, and here we get the size of all columns. Since we're getting the size of the buffers used in each individual ArrowFieldWriter, it should work regardless of the specific type.

cc @viirya
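For readers following along, here is a minimal, self-contained sketch of the accumulation being described, assuming each per-column writer can report the bytes its buffers currently use. The `FieldSizeReporter` trait and `batchSizeInBytes` name are illustrative stand-ins, not the actual Spark API:

```scala
// Hedged sketch: sum the per-column buffer sizes reported by each field
// writer to get the size of the whole in-progress Arrow batch.
trait FieldSizeReporter {
  def getSizeInBytes(): Int
}

def batchSizeInBytes(fields: Array[FieldSizeReporter]): Int = {
  var i = 0
  var bytes = 0
  while (i < fields.length) {
    bytes += fields(i).getSizeInBytes() // bytes used by this column so far
    i += 1
  }
  bytes
}
```

Because every column contributes its own buffer size, variable-width types such as string or binary are accounted for by whatever their buffers actually hold at that point.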
-    // DO NOT use iter.grouped(). See BatchIterator.
-    val batchIter =
-      if (batchSize > 0) new BatchIterator(inputIter, batchSize) else Iterator(inputIter)
+    val batchIter = Iterator(inputIter)
Hmm, I don't see the new iterator being "batched" by ArrowRRunner. So it is just removed?
Like ArrowPythonWithNamedArgumentRunner, which now batches rows internally with BatchedPythonArrowInput, but I don't see the same thing in ArrowRRunner. Is that intentional?
Yeah, I intentionally did not touch the R path because SparkR is deprecated. So I made this a trait and used it only for the Scalar Python UDF cases.
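As an aside on the "DO NOT use iter.grouped()" comment in the removed lines: `Iterator.grouped()` materialises each group into a `Seq` before handing it over, whereas a BatchIterator-style wrapper can expose each batch as a lazy sub-iterator over the same underlying input, so rows are only pulled as they are written out. A minimal stand-in (not Spark's actual BatchIterator) might look like this, with the usual caveat that each sub-iterator must be exhausted before asking for the next one:

```scala
// Hedged sketch: yield the input as lazy sub-iterators of at most batchSize
// elements, instead of materialising each group like iter.grouped() does.
class LazyBatchIterator[T](input: Iterator[T], batchSize: Int)
  extends Iterator[Iterator[T]] {
  require(batchSize > 0, "batchSize must be positive")

  override def hasNext: Boolean = input.hasNext

  override def next(): Iterator[T] = new Iterator[T] {
    private var consumed = 0
    override def hasNext: Boolean = consumed < batchSize && input.hasNext
    override def next(): T = {
      consumed += 1
      input.next() // pull rows lazily from the shared underlying iterator
    }
  }
}
```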
    def getSizeInBytes(): Int = {
      valueVector.setValueCount(count)
      // Before calling getBufferSizeFor, we need to call
      // `setValueCount`, see https://github.com/apache/arrow/pull/9187#issuecomment-763362710
      valueVector.getBufferSizeFor(count)
    }
Hmm, based on the API doc https://arrow.apache.org/docs/java/vector.html:

> After this step, the vector enters an immutable state. In other words, we should no longer mutate it. (Unless we reuse the vector by allocating it again. This will be discussed shortly.)

A Java Arrow field vector should not be modified after this method has been called, but I think this patch will call getSizeInBytes while inserting values into a vector. It might cause an unexpected error.
Yeah, this has been discussed a lot, e.g., apache/arrow#9187, but it has been in production for many years without an issue, so I assume this is fine.
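To make the concern concrete, here is a small, hedged illustration of the pattern under discussion, written against the Arrow Java API directly rather than the Spark code (REPL-style Scala, assuming arrow-vector and arrow-memory are on the classpath):

```scala
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.IntVector

val allocator = new RootAllocator(Long.MaxValue)
val vector = new IntVector("v", allocator)
vector.allocateNew()

// Write a few values.
(0 until 10).foreach(i => vector.setSafe(i, i))

// setValueCount must be called before getBufferSizeFor returns a meaningful
// size; this is the step the Arrow docs describe as making the vector immutable.
vector.setValueCount(10)
println(s"bytes after 10 values: ${vector.getBufferSizeFor(10)}")

// The pattern in this patch keeps writing after the size check, which the
// docs discourage but which, as noted above, has worked in practice for years.
(10 until 20).foreach(i => vector.setSafe(i, i))
vector.setValueCount(20)
println(s"bytes after 20 values: ${vector.getBufferSizeFor(20)}")

vector.close()
allocator.close()
```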
    var numRowsInBatch: Int = 0

    def underBatchSizeLimit: Boolean =
      (maxBytesPerBatch == Int.MaxValue) || (arrowWriter.sizeInBytes() < maxBytesPerBatch)
Due to the API issue https://github.com/apache/spark/pull/50080/files#r1971063958, maybe we can call ArrowWriter.bytesWritten (https://arrow.apache.org/docs/dev/java/reference/org/apache/arrow/vector/ipc/ArrowWriter.html#bytesWritten())?
Sounds good. Let me follow up on this separately if you don't mind.
Okay
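Putting the pieces together, here is a hedged sketch of the batching policy this PR introduces: rows keep being appended while both the record limit and the byte limit hold, and a batch is cut as soon as either is exceeded. The names `writeRow`, `cutBatch`, and `sizeInBytes` below are illustrative stand-ins, not the actual Spark internals:

```scala
// Hedged sketch (not the actual Spark code): batch rows into Arrow record
// batches bounded by both a row count and a byte size.
def writeBatches(
    rows: Iterator[Array[Any]],
    maxRecordsPerBatch: Int,
    maxBytesPerBatch: Int,
    writeRow: Array[Any] => Unit,   // appends one row to the current batch
    sizeInBytes: () => Int,         // size of the in-progress batch in bytes
    cutBatch: () => Unit): Unit = { // flushes the current batch downstream
  var numRowsInBatch = 0

  def underRecordLimit: Boolean =
    maxRecordsPerBatch <= 0 || numRowsInBatch < maxRecordsPerBatch

  // Mirrors underBatchSizeLimit in the diff above: the byte check is skipped
  // entirely when the limit is left at Int.MaxValue (effectively unlimited).
  def underByteLimit: Boolean =
    maxBytesPerBatch == Int.MaxValue || sizeInBytes() < maxBytesPerBatch

  while (rows.hasNext) {
    writeRow(rows.next())
    numRowsInBatch += 1
    if (!underRecordLimit || !underByteLimit) {
      cutBatch()
      numRowsInBatch = 0
    }
  }
  if (numRowsInBatch > 0) cutBatch() // flush the trailing partial batch
}
```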
    @classmethod
    def setUpClass(cls):
        MapInArrowTests.setUpClass()
        # Set it to a small odd value to exercise batching logic for all test cases
I don't get this comment. So you mean you set maxRecordsPerBatch to 3 so that it is hit earlier than maxBytesPerBatch?
Ah, I meant both. Actually, I think I should reduce the byte size some more so that both the record limit and the byte limit get exercised.
Merged to master and branch-4.0.
### What changes were proposed in this pull request?
This PR allows Arrow batches to be limited by size in bytes instead of by number of rows.

### Why are the changes needed?
We enabled `spark.sql.execution.pythonUDF.arrow.enabled` by default, and we should make sure users won't hit OOM.

### Does this PR introduce _any_ user-facing change?
Yes. The Arrow batches are now capped at 256MB in bytes by default, and users can configure this.

### How was this patch tested?
Tested by changing the default value to 1KB, and added a unit test. Also manually tested as below:

```python
from pyspark.sql.functions import pandas_udf
import pandas as pd

# spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", "1K")
# spark.conf.set("spark.sql.execution.arrow.maxBytesPerBatch", "2K")
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1")
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10")

@pandas_udf("long")
def func(s: pd.Series) -> pd.Series:
    return s

a = spark.range(100000).select(func("id")).collect()
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #50080 from HyukjinKwon/bytes-arrow.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 53fc763)