
load data before comms and nccl abort to solve issues with empty partitions #464

Merged
2 changes: 1 addition & 1 deletion jvm/README.md
@@ -95,7 +95,7 @@ repository, usually in your `~/.m2/repository`.
Add the artifact jar to Spark, for example:
```bash
ML_JAR="target/rapids-4-spark-ml_2.12-23.08.0-SNAPSHOT.jar"
PLUGIN_JAR="~/.m2/repository/com/nvidia/rapids-4-spark_2.12/23.08.1-SNAPSHOT/rapids-4-spark_2.12-23.08.1-SNAPSHOT.jar"
PLUGIN_JAR="~/.m2/repository/com/nvidia/rapids-4-spark_2.12/23.08.2-SNAPSHOT/rapids-4-spark_2.12-23.08.2-SNAPSHOT.jar"

$SPARK_HOME/bin/spark-shell --master $SPARK_MASTER \
--driver-memory 20G \
2 changes: 1 addition & 1 deletion notebooks/databricks/README.md
@@ -46,7 +46,7 @@ If you already have a Databricks account, you can run the example notebooks on a
spark.task.resource.gpu.amount 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
- spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-23.08.1.jar:/databricks/spark/python
+ spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-23.08.2.jar:/databricks/spark/python
spark.sql.execution.arrow.maxRecordsPerBatch 100000
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
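For readers who want the same settings from a notebook rather than the cluster UI, here is a rough PySpark sketch that mirrors only the keys quoted above. Setting them programmatically (instead of in the cluster's Spark config) is an assumption of this example, and options such as `spark.plugins` and `spark.executorEnv.PYTHONPATH` normally must be in place before the cluster starts.

```python
# Illustrative only: mirrors the cluster Spark config shown in this diff.
# On Databricks these keys are normally set in the cluster's Spark config or
# init script, not in user code; plugin and executor-env settings will not
# take effect on an already-running session.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.task.resource.gpu.amount", "1")
    .config("spark.databricks.delta.preview.enabled", "true")
    .config("spark.python.worker.reuse", "true")
    .config(
        "spark.executorEnv.PYTHONPATH",
        "/databricks/jars/rapids-4-spark_2.12-23.08.2.jar:/databricks/spark/python",
    )
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100000")
    .config("spark.rapids.memory.gpu.minAllocFraction", "0.0001")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .getOrCreate()
)
```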
4 changes: 2 additions & 2 deletions notebooks/databricks/init-pip-cuda-11.8.sh
@@ -3,9 +3,9 @@
SPARK_RAPIDS_ML_ZIP=/dbfs/path/to/zip/file
# IMPORTANT: specify RAPIDS_VERSION fully 23.8.0 and not 23.8
# also RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
- # while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.1 and not 23.8.1)
+ # while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=23.8.0
- SPARK_RAPIDS_VERSION=23.08.1
+ SPARK_RAPIDS_VERSION=23.08.2

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar

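To make the leading-zero rule in the comments above concrete (the Python `RAPIDS_VERSION` drops the leading zero in the month field, while the `SPARK_RAPIDS_VERSION` jar version keeps it), here is a tiny hypothetical helper. It is not part of the repository and only illustrates the formatting convention, not a mapping between the two variables, which track different patch releases.

```python
# Hypothetical helper (not in the repo): convert a jar-style version such as
# "23.08.0" to the Python-package style "23.8.0" by dropping leading zeros.
def python_style_version(jar_version: str) -> str:
    return ".".join(str(int(field)) for field in jar_version.split("."))

assert python_style_version("23.08.0") == "23.8.0"
assert python_style_version("23.08.2") == "23.8.2"
```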
2 changes: 1 addition & 1 deletion python/benchmark/databricks/gpu_cluster_spec.sh
@@ -9,7 +9,7 @@ cat <<EOF
"spark.task.cpus": "1",
"spark.databricks.delta.preview.enabled": "true",
"spark.python.worker.reuse": "true",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-23.08.1.jar:/databricks/spark/python",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-23.08.2.jar:/databricks/spark/python",
"spark.sql.files.minPartitionNum": "2",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.executor.cores": "8",
4 changes: 2 additions & 2 deletions python/benchmark/databricks/init-pip-cuda-11.8.sh
@@ -4,9 +4,9 @@ SPARK_RAPIDS_ML_ZIP=/dbfs/path/to/spark-rapids-ml.zip
BENCHMARK_ZIP=/dbfs/path/to/benchmark.zip
# IMPORTANT: specify rapids fully 23.8.0 and not 23.8
# also RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
- # while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.1 and not 23.8.1)
+ # while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=23.8.0
- SPARK_RAPIDS_VERSION=23.08.1
+ SPARK_RAPIDS_VERSION=23.08.2

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar

2 changes: 1 addition & 1 deletion python/run_benchmark.sh
@@ -99,7 +99,7 @@ EOF

if [[ $cluster_type == "gpu_etl" ]]
then
- SPARK_RAPIDS_VERSION=23.08.1
+ SPARK_RAPIDS_VERSION=23.08.2
rapids_jar=${rapids_jar:-rapids-4-spark_2.12-$SPARK_RAPIDS_VERSION.jar}
if [ ! -f $rapids_jar ]; then
echo "downloading spark rapids jar"
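The truncated block above falls back to downloading the jar when it is not already on disk. As a non-authoritative sketch of that step, written in Python rather than the script's bash and reusing the Maven URL pattern from the init scripts earlier in this diff:

```python
# Sketch only: download the RAPIDS Accelerator jar if it is not already
# present, following the Maven URL pattern used by the init scripts above.
from pathlib import Path
from urllib.request import urlretrieve

SPARK_RAPIDS_VERSION = "23.08.2"
rapids_jar = Path(f"rapids-4-spark_2.12-{SPARK_RAPIDS_VERSION}.jar")

if not rapids_jar.exists():
    print("downloading spark rapids jar")
    url = (
        "https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/"
        f"{SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-{SPARK_RAPIDS_VERSION}-cuda11.jar"
    )
    urlretrieve(url, str(rapids_jar))
```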
8 changes: 7 additions & 1 deletion python/src/spark_rapids_ml/common/cuml_context.py
@@ -150,7 +150,13 @@ def __exit__(self, *args: Any) -> None:
if not self.enable:
    return
assert self._nccl_comm is not None
- self._nccl_comm.destroy()
+
+ # if no exception, clean up nicely; otherwise abort
+ if not args[0]:
+     self._nccl_comm.destroy()
+ else:
+     self._nccl_comm.abort()
+
del self._nccl_comm

del self._handle
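The change above picks the NCCL teardown path based on whether the `with` block raised: `destroy()` on a normal exit, `abort()` when an exception (such as the new empty-partition `RuntimeError` in `core.py`) is propagating, presumably so one failing worker does not leave its peers blocked in a collective shutdown. A minimal standalone sketch of that `__exit__` pattern follows; `FakeComm` and `CommGuard` are invented stand-ins for illustration, not spark-rapids-ml classes.

```python
# Minimal sketch of the destroy-vs-abort pattern above. FakeComm stands in for
# the real NCCL communicator; it is not part of spark-rapids-ml.
from typing import Any, Optional, Type


class FakeComm:
    def destroy(self) -> None:
        print("destroy: graceful teardown")

    def abort(self) -> None:
        print("abort: tear down without waiting on peers")


class CommGuard:
    def __enter__(self) -> "CommGuard":
        self._comm: Optional[FakeComm] = FakeComm()
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]], *exc_info: Any) -> None:
        assert self._comm is not None
        # exc_type is None only when the with-block exited cleanly, matching
        # the `if not args[0]` check in the PR.
        if exc_type is None:
            self._comm.destroy()
        else:
            self._comm.abort()
        del self._comm


with CommGuard():
    pass  # clean exit -> destroy()

try:
    with CommGuard():
        raise RuntimeError("worker failed")  # exception -> abort()
except RuntimeError:
    pass
```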
58 changes: 28 additions & 30 deletions python/src/spark_rapids_ml/core.py
@@ -503,7 +503,6 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
from pyspark import BarrierTaskContext

logger = get_logger(cls)
logger.info("Initializing cuml context")

import cupy as cp

@@ -522,38 +521,37 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
# set gpu device
_CumlCommon.set_gpu_device(context, is_local)

+ # handle the input
+ # inputs = [(X, Optional(y)), (X, Optional(y))]
+ logger.info("Loading data into python worker memory")
+ inputs = []
+ sizes = []
+ for pdf in pdf_iter:
+     sizes.append(pdf.shape[0])
+     if multi_col_names:
+         features = np.array(pdf[multi_col_names], order=array_order)
+     else:
+         features = np.array(list(pdf[alias.data]), order=array_order)
+     # experiments indicate it is faster to convert to numpy array and then to cupy array than directly
+     # invoking cupy array on the list
+     if cuda_managed_mem_enabled:
+         features = cp.array(features)
+
+     label = pdf[alias.label] if alias.label in pdf.columns else None
+     row_number = (
+         pdf[alias.row_number] if alias.row_number in pdf.columns else None
+     )
+     inputs.append((features, label, row_number))
+
+ if len(sizes) == 0 or all(sz == 0 for sz in sizes):
+     raise RuntimeError(
+         "A python worker received no data. Please increase amount of data or use fewer workers."
+     )
+
+ logger.info("Initializing cuml context")
with CumlContext(
    partition_id, num_workers, context, enable_nccl, require_ucx
) as cc:
-     # handle the input
-     # inputs = [(X, Optional(y)), (X, Optional(y))]
-     logger.info("Loading data into python worker memory")
-     inputs = []
-     sizes = []
-     for pdf in pdf_iter:
-         sizes.append(pdf.shape[0])
-         if multi_col_names:
-             features = np.array(pdf[multi_col_names], order=array_order)
-         else:
-             features = np.array(list(pdf[alias.data]), order=array_order)
-         # experiments indicate it is faster to convert to numpy array and then to cupy array than directly
-         # invoking cupy array on the list
-         if cuda_managed_mem_enabled:
-             features = cp.array(features)
-
-         label = pdf[alias.label] if alias.label in pdf.columns else None
-         row_number = (
-             pdf[alias.row_number]
-             if alias.row_number in pdf.columns
-             else None
-         )
-         inputs.append((features, label, row_number))
-
-     if len(sizes) == 0 or all(sz == 0 for sz in sizes):
-         raise RuntimeError(
-             "A python worker received no data. Please increase amount of data or use fewer workers."
-         )
-
    params[param_alias.handle] = cc.handle
    params[param_alias.part_sizes] = sizes
    params[param_alias.num_cols] = dimension
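The reordering above is the heart of this PR: each worker now drains its input iterator and validates it before any NCCL/UCX communicator is created, so an empty partition raises immediately rather than failing inside the comms context. A simplified, non-authoritative sketch of that control flow is below; `open_comms` and `train_partition` are placeholder names for illustration, not spark-rapids-ml APIs.

```python
# Simplified sketch of the post-PR control flow: load data, check for empty
# partitions, and only then set up communications. Placeholder names only.
from contextlib import contextmanager
from typing import Iterator, List, Tuple

import numpy as np
import pandas as pd


@contextmanager
def open_comms():  # stand-in for CumlContext(...)
    print("initializing cuml context")
    try:
        yield "handle"
    finally:
        print("tearing down cuml context")


def train_partition(pdf_iter: Iterator[pd.DataFrame]) -> None:
    # 1. Load the partition's data into worker memory first.
    inputs: List[Tuple[np.ndarray, None]] = []
    sizes: List[int] = []
    for pdf in pdf_iter:
        sizes.append(pdf.shape[0])
        inputs.append((np.asarray(pdf), None))

    # 2. Fail fast on empty partitions, before NCCL is ever involved.
    if len(sizes) == 0 or all(sz == 0 for sz in sizes):
        raise RuntimeError(
            "A python worker received no data. "
            "Please increase amount of data or use fewer workers."
        )

    # 3. Only now open the communicator and run the distributed step.
    with open_comms() as handle:
        print(f"training on {sum(sizes)} rows with {handle}")


# An empty partition now fails before open_comms() ever runs:
try:
    train_partition(iter([pd.DataFrame()]))
except RuntimeError as exc:
    print(exc)
```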
2 changes: 1 addition & 1 deletion python/tests/test_nearest_neighbors.py
@@ -326,7 +326,7 @@ def test_nearest_neighbors(
random_state=0,
) # make_blobs creates a random dataset of isotropic gaussian blobs.

- # set average norm to be 1 to allow comparisons with default error thresholds
+ # set average norm sq to be 1 to allow comparisons with default error thresholds
Collaborator Author commented on this line: This was still wrong so correcting in this PR.

# below
root_ave_norm_sq = np.sqrt(np.average(np.linalg.norm(X, ord=2, axis=1) ** 2))
X = X / root_ave_norm_sq
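The corrected comment (and the code under it) scales X so that the average squared row norm, not the average norm, equals 1. A quick numpy check of that identity on synthetic data, separate from the test's `make_blobs` setup:

```python
# Illustrative check: dividing by the root of the mean squared row norm makes
# the mean squared row norm equal to 1.
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 50))

root_ave_norm_sq = np.sqrt(np.average(np.linalg.norm(X, ord=2, axis=1) ** 2))
X = X / root_ave_norm_sq

assert np.isclose(np.average(np.linalg.norm(X, ord=2, axis=1) ** 2), 1.0)
```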