Support per-workload gcsfuse mount options
gargnitingoogle committed Sep 10, 2024
1 parent 9b50a76 commit 4392eab
Showing 13 changed files with 342 additions and 126 deletions.
58 changes: 51 additions & 7 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/dlio_workload.py
@@ -8,13 +8,23 @@

def validateDlioWorkload(workload: dict, name: str):
"""Validates the given json workload object."""
if 'dlioWorkload' not in workload:
print(f"{name} does not have 'dlioWorkload' key in it.")
return False

if 'bucket' not in workload:
print(f"{name} does not have 'bucket' key in it.")
return False
for requiredWorkloadAttribute, expectedType in {
'bucket': str,
'gcsfuseMountOptions': str,
'dlioWorkload': dict,
}.items():
if requiredWorkloadAttribute not in workload:
print(f"{name} does not have '{requiredWorkloadAttribute}' key in it.")
return False
if type(workload[requiredWorkloadAttribute]) is not expectedType:
print(
f"In {name}, '{requiredWorkloadAttribute}' is of type"
f" '{type(workload[requiredWorkloadAttribute])}', expected {expectedType}."
)
return False
if expectedType == str and ' ' in workload[requiredWorkloadAttribute]:
print(f"{name} has space in the value of '{requiredWorkloadAttribute}'")
return False

if 'fioWorkload' in workload:
print(f"{name} has 'fioWorkload' key in it, which is unexpected.")
@@ -73,6 +83,14 @@ class DlioWorkload:
4. bucket (str): Name of a GCS bucket to read input files from.
5. batchSizes (set of ints): a set of ints representing multiple batch-size
values to test.
6. gcsfuseMountOptions (str): gcsfuse mount options as a single string in
compact stringified format, to be used for the test scenario
"gcsfuse-generic". The individual config/cli flag values should be separated
by commas. Each cli flag should be of the form "<flag>[=<value>]", while each
config-file flag should be of the form
"<config>[:<subconfig>[:<subsubconfig>[...]]]:<value>". For example, a legal
value would be:
"implicit-dirs,file_mode=777,file-cache:enable-parallel-downloads:true,metadata-cache:ttl-secs:-1".
"""

def __init__(
@@ -82,12 +100,14 @@ def __init__(
recordLength: int,
bucket: str,
batchSizes: list,
gcsfuseMountOptions: str,
):
self.scenario = scenario
self.numFilesTrain = numFilesTrain
self.recordLength = recordLength
self.bucket = bucket
self.batchSizes = set(batchSizes)
self.gcsfuseMountOptions = gcsfuseMountOptions


def ParseTestConfigForDlioWorkloads(testConfigFileName: str):
@@ -119,6 +139,30 @@ def ParseTestConfigForDlioWorkloads(testConfigFileName: str):
dlioWorkload['recordLength'],
workload['bucket'],
dlioWorkload['batchSizes'],
workload['gcsfuseMountOptions'],
)
)
return dlioWorkloads


def DlioChartNamePodName(
dlioWorkload: DlioWorkload, instanceID: str, batchSize: int
) -> (str, str, str):
shortenScenario = {
'local-ssd': 'ssd',
'gcsfuse-generic': 'gcsfuse',
}
shortForScenario = shortenScenario.get(dlioWorkload.scenario, 'other')

hashOfWorkload = str(hash((instanceID, batchSize, dlioWorkload))).replace(
'-', ''
)
return (
f'dlio-unet3d-{shortForScenario}-{dlioWorkload.recordLength}-{hashOfWorkload}',
f'dlio-tester-{shortForScenario}-{dlioWorkload.recordLength}-{hashOfWorkload}',
f'{instanceID}/{dlioWorkload.numFilesTrain}-{dlioWorkload.recordLength}-{batchSize}-{hashOfWorkload}/{dlioWorkload.scenario}',
)
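
As a quick illustration (not part of the changed files), the new DlioChartNamePodName helper above derives the helm chart name, pod name, and output-directory prefix for each workload/batch-size pair. A minimal sketch, assuming dlio_workload.py is importable and using made-up bucket, instance-id, and mount-option values:

import dlio_workload

# Placeholder workload; the bucket and mount options below are illustrative only.
workload = dlio_workload.DlioWorkload(
    scenario='gcsfuse-generic',
    numFilesTrain=1000,
    recordLength=10000,
    bucket='dummy-bucket',
    batchSizes=[200],
    gcsfuseMountOptions='implicit-dirs,metadata-cache:ttl-secs:-1',
)
chartName, podName, outputDirPrefix = dlio_workload.DlioChartNamePodName(
    workload, 'test-instance-1', 200
)
print(chartName)        # dlio-unet3d-gcsfuse-10000-<hash>
print(podName)          # dlio-tester-gcsfuse-10000-<hash>
print(outputDirPrefix)  # test-instance-1/1000-10000-200-<hash>/gcsfuse-generic
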
perfmetrics/scripts/testing_on_gke/examples/dlio/dlio_workload_test.py
@@ -9,9 +9,20 @@ class DlioWorkloadTest(unittest.TestCase):
def test_validate_dlio_workload_empty(self):
self.assertFalse(validateDlioWorkload(({}), "empty-dlio-workload"))

def test_validate_dlio_workload_invalid_no_bucket(self):
def test_validate_dlio_workload_invalid_missing_bucket(self):
self.assertFalse(
validateDlioWorkload(({"dlioWorkload": {}}), "invalid-dlio-workload-1")
validateDlioWorkload(
({"dlioWorkload": {}, "gcsfuseMountOptions": ""}),
"invalid-dlio-workload-missing-bucket",
)
)

def test_validate_dlio_workload_invalid_bucket_contains_space(self):
self.assertFalse(
validateDlioWorkload(
({"dlioWorkload": {}, "gcsfuseMountOptions": "", "bucket": " "}),
"invalid-dlio-workload-bucket-contains-space",
)
)

def test_validate_dlio_workload_invalid_no_dlioWorkloadSpecified(self):
@@ -22,7 +33,11 @@ def test_validate_dlio_workload_invalid_no_dlioWorkloadSpecified(self):
def test_validate_dlio_workload_invalid_commented_out_dlioWorkload(self):
self.assertFalse(
validateDlioWorkload(
({"_dlioWorkload": {}, "bucket": "dummy-bucket"}),
({
"_dlioWorkload": {},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
}),
"commented-out-dlio-workload",
)
)
@@ -34,6 +49,7 @@ def test_validate_dlio_workload_invalid_mixed_dlioWorkload_fioWorkload(self):
"dlioWorkload": {},
"fioWorkload": {},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
}),
"mixed-dlio/fio-workload",
)
@@ -46,6 +62,7 @@ def test_validate_dlio_workload_invalid_missing_numFilesTrain(self):
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -62,6 +79,7 @@ def test_validate_dlio_workload_invalid_unsupported_numFilesTrain(self):
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -77,6 +95,7 @@ def test_validate_dlio_workload_invalid_missing_recordLength(self):
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -93,6 +112,7 @@ def test_validate_dlio_workload_invalid_unsupported_recordLength(self):
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -101,13 +121,69 @@ def test_validate_dlio_workload_invalid_unsupported_recordLength(self):
)
pass

def test_validate_dlio_workload_invalid_missing_gcsfuseMountOptions(self):
workload = dict({
"dlioWorkload": {
"numFilesTrain": 1000,
"recordLength": 100,
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
})
self.assertFalse(
validateDlioWorkload(
workload, "invalid-dlio-workload-missing-gcsfuseMountOptions"
)
)
pass

def test_validate_dlio_workload_invalid_unsupported_gcsfuseMountOptions(
self,
):
workload = dict({
"dlioWorkload": {
"numFilesTrain": 1000,
"recordLength": 10000,
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": 100,
})
self.assertFalse(
validateDlioWorkload(
workload, "invalid-dlio-workload-unsupported-gcsfuseMountOptions1"
)
)
pass

def test_validate_dlio_workload_invalid_gcsfuseMountOptions_contains_space(
self,
):
workload = dict({
"dlioWorkload": {
"numFilesTrain": 1000,
"recordLength": 10000,
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "abc def",
})
self.assertFalse(
validateDlioWorkload(
workload,
"invalid-dlio-workload-unsupported-gcsfuseMountOptions-contains-space",
)
)
pass

def test_validate_dlio_workload_invalid_missing_batchSizes(self):
workload = dict({
"dlioWorkload": {
"numFilesTrain": 1000,
"recordLength": 10000,
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -124,6 +200,7 @@ def test_validate_dlio_workload_invalid_unsupported_batchSizes1(self):
"batchSizes": ["100"],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -140,6 +217,7 @@ def test_validate_dlio_workload_invalid_unsupported_batchSizes2(self):
"batchSizes": [0, -1],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertFalse(
validateDlioWorkload(
@@ -156,6 +234,7 @@ def test_validate_dlio_workload_valid_single_batchSize(self):
"batchSizes": [100],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertTrue(validateDlioWorkload(workload, "valid-dlio-workload-2"))
pass
@@ -168,6 +247,7 @@ def test_validate_dlio_workload_valid_multiple_batchSizes(self):
"batchSizes": [100, 200],
},
"bucket": "dummy-bucket",
"gcsfuseMountOptions": "implicit-dirs,cache-max-size:-1",
})
self.assertTrue(validateDlioWorkload(workload, "valid-dlio-workload-2"))
pass
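
Putting the validation rules above together, a workload entry that passes validateDlioWorkload now needs a dict-valued 'dlioWorkload' plus string-valued 'bucket' and 'gcsfuseMountOptions', none containing spaces. A minimal sketch with placeholder values, mirroring the valid-dlio-workload tests above:

from dlio_workload import validateDlioWorkload

# Placeholder bucket name and mount options, taken from the tests above.
workload = {
    'dlioWorkload': {
        'numFilesTrain': 1000,
        'recordLength': 10000,
        'batchSizes': [100, 200],
    },
    'bucket': 'dummy-bucket',
    'gcsfuseMountOptions': 'implicit-dirs,cache-max-size:-1',
}
print(validateDlioWorkload(workload, 'sample-workload'))  # Expected: True
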
11 changes: 5 additions & 6 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -168,15 +168,14 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
continue

for i in range(summary_data["epochs"]):
test_name = summary_data["hostname"]
part_list = test_name.split("-")
key = "-".join(part_list[2:5])
key = root.split("/")[-2]
key_split = key.split("-")

if key not in output:
output[key] = {
"num_files_train": part_list[-3],
"mean_file_size": part_list[-2],
"batch_size": part_list[-1],
"num_files_train": key_split[-4],
"mean_file_size": key_split[-3],
"batch_size": key_split[-2],
"records": {
"local-ssd": [],
"gcsfuse-generic": [],
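
For context on the new index arithmetic, the key parsed here mirrors the outputDirPrefix layout produced by DlioChartNamePodName in dlio_workload.py, i.e. '<instanceID>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>'. A small sketch with illustrative values shows how the indices line up:

# root is the directory holding a run's output; the path below is illustrative.
root = 'output/test-instance-1/1000-10000-200-123456789/gcsfuse-generic'
key = root.split('/')[-2]        # '1000-10000-200-123456789'
key_split = key.split('-')
num_files_train = key_split[-4]  # '1000'
mean_file_size = key_split[-3]   # '10000'
batch_size = key_split[-2]       # '200'
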
52 changes: 13 additions & 39 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/run_tests.py
@@ -27,13 +27,6 @@
import dlio_workload


# The default value of gcsfuse-mount-options to be used
# for "gcsfuse-generic" scenario.
# For description of how to specify the value for this,
# look at the description of the argparser argument for gcsfuse-mount-options.
_DEFAULT_GCSFUSE_MOUNT_OPTIONS = 'implicit-dirs'


def run_command(command: str):
"""Runs the given string command as a subprocess."""
result = subprocess.run(command.split(' '), capture_output=True, text=True)
@@ -49,20 +42,17 @@ def escapeCommasInString(unescapedStr: str) -> str:
def createHelmInstallCommands(
dlioWorkloads: set,
instanceId: str,
gcsfuseMountOptions: str,
machineType: str,
) -> list:
"""Creates helm install commands for the given dlioWorkload objects."""
helm_commands = []
if not gcsfuseMountOptions:
gcsfuseMountOptions = _DEFAULT_GCSFUSE_MOUNT_OPTIONS
for dlioWorkload in dlioWorkloads:
for batchSize in dlioWorkload.batchSizes:
chartName, podName, outputDirPrefix = dlio_workload.DlioChartNamePodName(
dlioWorkload, instanceId, batchSize
)
commands = [
(
'helm install'
f' dlio-unet3d-{dlioWorkload.scenario}-{dlioWorkload.numFilesTrain}-{dlioWorkload.recordLength}-{batchSize} unet3d-loading-test'
),
f'helm install {chartName} unet3d-loading-test',
f'--set bucketName={dlioWorkload.bucket}',
f'--set scenario={dlioWorkload.scenario}',
f'--set dlio.numFilesTrain={dlioWorkload.numFilesTrain}',
@@ -71,9 +61,11 @@ def createHelmInstallCommands(
f'--set instanceId={instanceId}',
(
'--set'
f' gcsfuse.mountOptions={escapeCommasInString(gcsfuseMountOptions)}'
f' gcsfuse.mountOptions={escapeCommasInString(dlioWorkload.gcsfuseMountOptions)}'
),
f'--set nodeType={machineType}',
f'--set podName={podName}',
f'--set outputDirPrefix={outputDirPrefix}',
]

helm_command = ' '.join(commands)
@@ -88,7 +80,6 @@ def main(args) -> None:
helmInstallCommands = createHelmInstallCommands(
dlioWorkloads,
args.instance_id,
args.gcsfuse_mount_options,
args.machine_type,
)
for helmInstallCommand in helmInstallCommands:
@@ -121,21 +112,6 @@ def main(args) -> None:
),
required=True,
)
parser.add_argument(
'--gcsfuse-mount-options',
metavar='GCSFuse mount options',
help=(
'GCSFuse mount-options, in a compact stringified'
' format, to be set for the '
' scenario "gcsfuse-generic". The individual config/cli flag values'
' should be separated by comma. Each cli flag should be of the form'
' "<name>[=<value>]". Each config-file flag should be of form'
' "<config>[:<subconfig>[:<subsubconfig>[...]]]:<value>". For'
' example, a sample value would be:'
' "implicit-dirs,file_mode=777,file-cache:enable-parallel-downloads:true,metadata-cache:ttl-secs:-1".'
),
required=False,
)
parser.add_argument(
'--machine-type',
metavar='Machine-type of the GCE VM or GKE cluster node',
@@ -153,19 +129,17 @@
)

args = parser.parse_args()
for argument in ['instance_id', 'gcsfuse_mount_options', 'machine_type']:
value = getattr(args, argument)
if ' ' in value:
raise Exception(
f'Argument {argument} (value="{value}") contains space in it, which'
' is not supported.'
)
for argument in ['machine_type', 'instance_id']:
for argument in ['instance_id', 'machine_type']:
value = getattr(args, argument)
if len(value) == 0 or str.isspace(value):
raise Exception(
f'Argument {argument} (value="{value}") is empty or contains only'
' spaces.'
)
if ' ' in value:
raise Exception(
f'Argument {argument} (value="{value}") contains space in it, which'
' is not supported.'
)

main(args)
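
To preview the helm commands that createHelmInstallCommands now builds per workload, with the mount options taken from each workload rather than a global flag, a minimal sketch with placeholder values, assuming run_tests.py and dlio_workload.py are importable and that run_tests.py only parses CLI arguments under a __main__ guard (not visible in this diff):

import dlio_workload
import run_tests

# Placeholder workload; bucket, instance id, and machine type are made up.
workload = dlio_workload.DlioWorkload(
    'gcsfuse-generic', 1000, 10000, 'dummy-bucket', [200],
    'implicit-dirs,metadata-cache:ttl-secs:-1',
)
for command in run_tests.createHelmInstallCommands(
    [workload], 'test-instance-1', 'n2-standard-96'
):
  print(command)
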