Commit

[testing-on-gke] Support instance-id and more complex configurations (#2359)

* Support special cases

- Support instance_id, a unique ID for the current test run that allows
  multiple runs to execute concurrently (illustrated below):
   1. Support the env variable instance_id in the run script.
   2. Support instance_id as an argument in the fio/dlio
      run_tests.py and parse_logs.py scripts.
   3. Pass instance_id into the fio/dlio pod yaml configs.
- Support multiple combinations of blockSize, numThreads, and
  filesPerThread for a given fileSize in fio tests.

* Address self-review comment
gargnitingoogle authored Aug 26, 2024
1 parent 697baff commit dc187d1
Showing 9 changed files with 129 additions and 67 deletions.
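Taken together, these changes thread a single per-run ID through both drivers and their Helm charts. As a rough usage sketch — not part of this commit — the ID could be minted in the ldap-yyyymmdd-hhmmss shape that the new values.yaml defaults suggest; `generate_instance_id`, the config path, and the assumption that the fio driver takes the same flags as the dlio one are all hypothetical here:

```python
# Hedged sketch: mint a unique per-run instance ID and pass it to both test
# drivers. generate_instance_id is a hypothetical helper; the scripts only
# require that --instance-id be unique per test run.
import datetime
import getpass
import subprocess


def generate_instance_id() -> str:
  # Mirrors the ldap-yyyymmdd-hhmmss placeholder in the new values.yaml files.
  user = getpass.getuser()
  stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
  return f"{user}-{stamp}"


instance_id = generate_instance_id()
for driver in ("dlio", "fio"):
  # run_tests.py (and later parse_logs.py) now require --instance-id.
  subprocess.run(
      [
          "python3",
          f"{driver}/run_tests.py",
          "--workload-config",
          "workloads.json",  # hypothetical config path
          "--instance-id",
          instance_id,
      ],
      check=True,
  )
```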
51 changes: 31 additions & 20 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -23,7 +23,7 @@
sys.path.append("../")
from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed

_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"

record = {
"pod_name": "",
@@ -44,7 +44,7 @@
}


-def downloadDlioOutputs(dlioWorkloads):
+def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
for dlioWorkload in dlioWorkloads:
print(f"Downloading DLIO logs from the bucket {dlioWorkload.bucket}...")
result = subprocess.run(
@@ -55,7 +55,7 @@ def downloadDlioOutputs(dlioWorkloads):
"cp",
"-r",
"--no-user-output-enabled", # do not print names of files being copied
f"gs://{dlioWorkload.bucket}/logs",
f"gs://{dlioWorkload.bucket}/logs/{instanceId}",
_LOCAL_LOGS_LOCATION,
],
capture_output=False,
@@ -92,6 +92,11 @@ def downloadDlioOutputs(dlioWorkloads):
),
required=True,
)
+parser.add_argument(
+    "--instance-id",
+    help="unique string ID for current test-run",
+    required=True,
+)
args = parser.parse_args()

try:
@@ -102,7 +107,7 @@ def downloadDlioOutputs(dlioWorkloads):
dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
-downloadDlioOutputs(dlioWorkloads)
+downloadDlioOutputs(dlioWorkloads, args.instance_id)

"""
"{num_files_train}-{mean_file_size}-{batch_size}":
@@ -120,7 +125,7 @@ def downloadDlioOutputs(dlioWorkloads):
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

-for root, _, files in os.walk(_LOCAL_LOGS_LOCATION):
+for root, _, files in os.walk(_LOCAL_LOGS_LOCATION + "/" + args.instance_id):
if files:
print(f"Parsing directory {root} ...")
per_epoch_stats_file = root + "/per_epoch_stats.json"
@@ -153,9 +158,9 @@ def downloadDlioOutputs(dlioWorkloads):

if key not in output:
output[key] = {
"num_files_train": part_list[2],
"mean_file_size": part_list[3],
"batch_size": part_list[4],
"num_files_train": part_list[-3],
"mean_file_size": part_list[-2],
"batch_size": part_list[-1],
"records": {
"local-ssd": [],
"gcsfuse-generic": [],
@@ -167,7 +172,7 @@ def downloadDlioOutputs(dlioWorkloads):
r = record.copy()
r["pod_name"] = summary_data["hostname"]
r["epoch"] = i + 1
r["scenario"] = "-".join(part_list[5:])
r["scenario"] = root.split("/")[-1]
r["train_au_percentage"] = round(
summary_data["metric"]["train_au_percentage"][i], 2
)
@@ -221,7 +226,7 @@ def downloadDlioOutputs(dlioWorkloads):
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse"
" Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU"
" (core),Pod,Start,End,GcsfuseMountOptions\n"
" (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
)

for key in output:
@@ -242,19 +247,25 @@
):
for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
try:
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0
except:
raise
output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
)
else:
for i in range(len(record_set["records"][scenario])):
@@ -264,7 +275,7 @@ def downloadDlioOutputs(dlioWorkloads):
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
)

output_file.close()
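A note on the index changes above: each pod's output folder is now named `<numFilesTrain>-<recordLength>-<batchSize>` under the instance ID, so the parser reads the hyphen-split parts from the end and takes the scenario from the last path component. A minimal sketch with hypothetical values follows; the code that actually builds `part_list` is collapsed in this view, so the exact split input is an assumption:

```python
# Hedged illustration of the new trailing-component parsing.
dir_name = "500-102400-4"        # hypothetical <numFilesTrain>-<recordLength>-<batchSize>
part_list = dir_name.split("-")
num_files_train = part_list[-3]  # "500"
mean_file_size = part_list[-2]   # "102400"
batch_size = part_list[-1]       # "4"

# The scenario is now simply the last path component of the walked directory,
# which also handles scenario names that themselves contain hyphens:
root = "../../bin/dlio-logs/logs/ldap-20240826-120000/2024-08-26-12-30/500-102400-4/gcsfuse-no-file-cache"
scenario = root.split("/")[-1]   # "gcsfuse-no-file-cache"
```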
16 changes: 12 additions & 4 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/run_tests.py
@@ -34,21 +34,22 @@ def run_command(command: str):
print(result.stderr)


def createHelmInstallCommands(dlioWorkloads: set) -> list:
"""Create helm install commands for the given dlioWorkload objects."""
def createHelmInstallCommands(dlioWorkloads: set, instanceId: str):
"""Create helm install commands for the given set of dlioWorkload objects."""
helm_commands = []
for dlioWorkload in dlioWorkloads:
for batchSize in dlioWorkload.batchSizes:
commands = [
(
'helm install'
-f' {dlioWorkload.bucket}-{batchSize}-{dlioWorkload.scenario} unet3d-loading-test'
+f' dlio-unet3d-{dlioWorkload.scenario}-{dlioWorkload.numFilesTrain}-{dlioWorkload.recordLength}-{batchSize} unet3d-loading-test'
),
f'--set bucketName={dlioWorkload.bucket}',
f'--set scenario={dlioWorkload.scenario}',
f'--set dlio.numFilesTrain={dlioWorkload.numFilesTrain}',
f'--set dlio.recordLength={dlioWorkload.recordLength}',
f'--set dlio.batchSize={batchSize}',
+f'--set instanceId={instanceId}',
]

helm_command = ' '.join(commands)
@@ -60,7 +61,9 @@ def main(args) -> None:
dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
-helmInstallCommands = createHelmInstallCommands(dlioWorkloads)
+helmInstallCommands = createHelmInstallCommands(
+    dlioWorkloads, args.instance_id
+)
for helmInstallCommand in helmInstallCommands:
print(f'{helmInstallCommand}')
if not args.dry_run:
@@ -81,6 +84,11 @@ def main(args) -> None:
help='Runs DLIO Unet3d tests using this JSON workload configuration.',
required=True,
)
+parser.add_argument(
+    '--instance-id',
+    help='unique string ID for current test-run',
+    required=True,
+)
parser.add_argument(
'-n',
'--dry-run',
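For orientation, one generated command now looks roughly like the sketch below (all values hypothetical). The Helm release name is derived from the workload parameters rather than the bucket and batch size alone, so several parameter combinations can be installed against the same bucket:

```python
# Hedged sketch of one command emitted by createHelmInstallCommands
# (hypothetical values throughout):
helm_command = ' '.join([
    'helm install',
    'dlio-unet3d-gcsfuse-generic-500-102400-4 unet3d-loading-test',
    '--set bucketName=gke-dlio-test-data',
    '--set scenario=gcsfuse-generic',
    '--set dlio.numFilesTrain=500',
    '--set dlio.recordLength=102400',
    '--set dlio.batchSize=4',
    '--set instanceId=ldap-20240826-120000',
])
print(helm_command)
```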
@@ -16,7 +16,7 @@
apiVersion: v1
kind: Pod
metadata:
-name: dlio-tester-{{ .Values.dlio.numFilesTrain }}-{{ .Values.dlio.recordLength }}-{{ .Values.dlio.batchSize }}-{{ .Values.scenario }}
+name: dlio-tester-{{ .Values.scenario }}-{{ .Values.dlio.numFilesTrain }}-{{ .Values.dlio.recordLength }}-{{ .Values.dlio.batchSize }}
{{- if ne .Values.scenario "local-ssd" }}
annotations:
gke-gcsfuse/volumes: "true"
@@ -73,6 +73,8 @@ spec:
sleep 300
{{ end }}
+outputDir=/logs/{{ .Values.instanceId }}/{{ .Values.dlio.numFilesTrain }}-{{ .Values.dlio.recordLength }}-{{ .Values.dlio.batchSize }}/{{ .Values.scenario }}
echo "Testing {{ .Values.scenario }}"
mpirun -np 8 dlio_benchmark workload=unet3d_a100 \
++workload.train.epochs=4 \
@@ -84,14 +86,14 @@ spec:
++workload.reader.batch_size={{ .Values.dlio.batchSize }} \
++workload.dataset.record_length={{ .Values.dlio.recordLength }} \
++workload.reader.read_threads={{ .Values.dlio.readThreads }} \
-++workload.output.folder=/logs/{{ .Values.dlio.numFilesTrain }}-{{ .Values.dlio.recordLength }}-{{ .Values.dlio.batchSize }}/{{ .Values.scenario }}
+++workload.output.folder=${outputDir}
# dump the gcsfuse-mount-configuration to a file in output-directory.
{{ if eq .Values.scenario "gcsfuse-generic"}}
echo "{{ .Values.gcsfuse.mountOptions }}" > /logs/{{ .Values.dlio.numFilesTrain }}-{{ .Values.dlio.recordLength }}-{{ .Values.dlio.batchSize }}/{{ .Values.scenario }}/gcsfuse_mount_options
echo "{{ .Values.gcsfuse.mountOptions }}" > ${outputDir}/gcsfuse_mount_options
{{ end }}
-gsutil -m cp -R /logs gs://{{ .Values.bucketName }}/logs/$(date +"%Y-%m-%d-%H-%M")
+gsutil -m cp -R /logs/{{ .Values.instanceId }} gs://{{ .Values.bucketName }}/logs/{{ .Values.instanceId }}/$(date +"%Y-%m-%d-%H-%M")
volumeMounts:
- name: dshm
mountPath: /dev/shm
@@ -22,6 +22,7 @@ bucketName: gke-dlio-test-data
# scenario controls the kind of storage that is used for the load testing. local-ssd means directly on LSSD; gcsfuse-generic means on a gcsfuse mount with gcsfuse.mountOptions sent from the caller; gcsfuse-no-file-cache and gcsfuse-file-cache mean as their name suggests.
scenario: local-ssd
nodeType: n2-standard-96
+instanceId: ldap-yyyymmdd-hhmmss

resourceLimits:
cpu: 0
@@ -16,7 +16,7 @@
apiVersion: v1
kind: Pod
metadata:
-name: fio-tester-{{ .Values.fio.readType }}-{{ lower .Values.fio.fileSize }}-{{ lower .Values.fio.blockSize }}-{{ .Values.scenario }}
+name: fio-tester-{{ .Values.instanceId }}-{{ .Values.scenario }}-{{ .Values.fio.readType }}-{{ lower .Values.fio.fileSize }}-{{ lower .Values.fio.blockSize }}-{{ .Values.fio.numThreads }}-{{ .Values.fio.filesPerThread }}
{{- if ne .Values.scenario "local-ssd" }}
annotations:
gke-gcsfuse/volumes: "true"
@@ -45,7 +45,12 @@ spec:
echo "Install dependencies..."
apt-get update
apt-get install -y libaio-dev gcc make git time wget
+no_of_files_per_thread={{ .Values.fio.filesPerThread }}
+block_size={{ .Values.fio.blockSize }}
+file_size={{ .Values.fio.fileSize }}
+num_of_threads={{ .Values.fio.numThreads }}
{{ if eq .Values.scenario "local-ssd" }}
echo "Installing gsutil..."
apt-get update && apt-get install -y apt-transport-https ca-certificates gnupg curl
@@ -105,12 +110,8 @@ spec:
echo "Setup default values..."
epoch=4
-no_of_files_per_thread={{ .Values.fio.filesPerThread }}
read_type={{ .Values.fio.readType }}
pause_in_seconds=20
-block_size={{ .Values.fio.blockSize }}
-file_size={{ .Values.fio.fileSize }}
-num_of_threads={{ .Values.fio.numThreads }}
workload_dir=/data
# Cleaning the pagecache, dentries and inode cache before the starting the workload.
@@ -125,18 +126,18 @@ spec:
time ls -R $workload_dir 1> /dev/null
echo "Run fio tests..."
-mkdir -p /data/fio-output/{{ .Values.scenario }}/$read_type
+output_dir=/data/fio-output/{{ .Values.instanceId }}/${file_size}-{{ lower .Values.fio.blockSize}}-${num_of_threads}-${no_of_files_per_thread}/{{ .Values.scenario }}/$read_type
+mkdir -p ${output_dir}
# dump the gcsfuse-mount-configuration to a file in output-directory.
{{ if eq .Values.scenario "gcsfuse-generic" }}
echo "{{ .Values.gcsfuse.mountOptions }}" > /data/fio-output/{{ .Values.scenario }}/$read_type/gcsfuse_mount_options
echo "{{ .Values.gcsfuse.mountOptions }}" > ${output_dir}/gcsfuse_mount_options
{{ end }}
for i in $(seq $epoch); do
echo "[Epoch ${i}] start time:" `date +%s`
free -mh # Memory usage before workload start.
-NUMJOBS=$num_of_threads NRFILES=$no_of_files_per_thread FILE_SIZE=$file_size BLOCK_SIZE=$block_size READ_TYPE=$read_type DIR=$workload_dir fio ${filename} --alloc-size=1048576 --output-format=json --output="/data/fio-output/{{ .Values.scenario }}/${read_type}/epoch${i}.json"
+NUMJOBS=$num_of_threads NRFILES=$no_of_files_per_thread FILE_SIZE=$file_size BLOCK_SIZE=$block_size READ_TYPE=$read_type DIR=$workload_dir fio ${filename} --alloc-size=1048576 --output-format=json --output="${output_dir}/epoch${i}.json"
free -mh # Memory usage after workload completion.
echo "[Epoch ${i}] end time:" `date +%s`
@@ -154,7 +155,7 @@ spec:
done
{{ if eq .Values.scenario "local-ssd" }}
-gsutil -m cp -R /data/fio-output/local-ssd gs://{{ .Values.bucketName }}/fio-output
+gsutil -m cp -R /data/fio-output/{{ .Values.instanceId }}/* gs://{{ .Values.bucketName }}/fio-output/{{ .Values.instanceId }}/
{{ end }}
echo "fio job completed!"
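The fio side now writes one output tree per instance and per parameter combination. A hedged sketch of where a single epoch's JSON lands, with hypothetical values for every variable:

```python
# Hedged sketch of the fio output layout after this change.
instance_id = "ldap-20240826-120000"   # hypothetical
file_size, block_size = "64K", "16k"   # blockSize is lowercased by the chart
num_threads, files_per_thread = 50, 20
scenario, read_type, epoch = "gcsfuse-generic", "read", 1

output_path = (
    f"/data/fio-output/{instance_id}/"
    f"{file_size}-{block_size}-{num_threads}-{files_per_thread}/"
    f"{scenario}/{read_type}/epoch{epoch}.json"
)
# -> /data/fio-output/ldap-20240826-120000/64K-16k-50-20/gcsfuse-generic/read/epoch1.json
```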
@@ -22,6 +22,7 @@ bucketName: gke-dlio-test-data
# scenario controls the kind of storage that is used for the load testing. local-ssd means directly on LSSD; gcsfuse-generic means on a gcsfuse mount with gcsfuse.mountOptions sent from the caller; gcsfuse-no-file-cache and gcsfuse-file-cache mean as their name suggests.
scenario: local-ssd
nodeType: n2-standard-96
+instanceId: ldap-yyyymmdd-hhmmss

resourceLimits:
cpu: 0
(The remaining changed files are collapsed and not shown here.)
