Migrate pudl_datastore to use Click.
zaneselvans committed Dec 3, 2023
1 parent f0d7022 commit 3d741d3
Showing 2 changed files with 169 additions and 124 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -121,7 +121,7 @@ keywords = [
[project.scripts]
metadata_to_rst = "pudl.convert.metadata_to_rst:metadata_to_rst"
ferc_to_sqlite = "pudl.ferc_to_sqlite.cli:main"
pudl_datastore = "pudl.workspace.datastore:main"
pudl_datastore = "pudl.workspace.datastore:pudl_datastore"
pudl_etl = "pudl.etl.cli:pudl_etl"
pudl_setup = "pudl.workspace.setup_cli:main"
pudl_check_fks = "pudl.etl.check_foreign_keys:pudl_check_fks"
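For context, a [project.scripts] entry of the form name = "module:function" tells pip to generate a console script named name that imports and calls function. The sketch below is only illustrative of what the regenerated pudl_datastore wrapper effectively does after this change; the real shim is produced by pip/setuptools at install time, and only the import path comes from the diff above.

import sys

from pudl.workspace.datastore import pudl_datastore

if __name__ == "__main__":
    # Click commands are callable: invoking one parses sys.argv and runs the CLI.
    sys.exit(pudl_datastore())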
291 changes: 168 additions & 123 deletions src/pudl/workspace/datastore.py
@@ -1,8 +1,8 @@
"""Datastore manages file retrieval for PUDL datasets."""
import argparse
import hashlib
import io
import json
import pathlib
import re
import sys
import zipfile
@@ -12,6 +12,7 @@
from typing import Annotated, Any, Self
from urllib.parse import ParseResult, urlparse

import click
import datapackage
import requests
from google.auth.exceptions import DefaultCredentialsError
@@ -412,114 +413,22 @@ def get_zipfile_file_names(self, zip_file: zipfile.ZipFile):
        return zipfile.ZipFile.namelist(zip_file)


class ParseKeyValues(argparse.Action):
    """Transforms k1=v1,k2=v2,...

    into dict(k1=v1, k2=v2, ...).
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Parses the argument value into dict."""
        d = getattr(namespace, self.dest, {})
        if isinstance(values, str):
            values = [values]
        for val in values:
            for kv in val.split(","):
                k, v = kv.split("=")
                d[k] = v
        setattr(namespace, self.dest, d)


def parse_command_line():
    """Collect the command line arguments."""
    known_datasets = "\n".join(
        [f" - {x}" for x in ZenodoFetcher().get_known_datasets()]
    )

    dataset_msg = f"""
Available Datasets:
{known_datasets}"""

    parser = argparse.ArgumentParser(
        description="Download and cache ETL source data from Zenodo.",
        epilog=dataset_msg,
        formatter_class=argparse.RawTextHelpFormatter,
    )

    parser.add_argument(
        "--dataset",
        help="Download the specified dataset only. See below for available options. "
        "The default is to download all datasets, which may take hours depending on "
        "network speed.",
    )
    parser.add_argument(
        "--validate",
        help="Validate locally cached datapackages, but don't download anything.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--loglevel",
        help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
        default="INFO",
    )
    parser.add_argument(
        "--logfile",
        default=None,
        type=str,
        help="If specified, write logs to this file.",
    )
    parser.add_argument(
        "--quiet",
        help="Do not send logging messages to stdout.",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--gcs-cache-path",
        type=str,
        help="""Load datastore resources from Google Cloud Storage. Should be gs://bucket[/path_prefix].
The main zenodo cache bucket is gs://zenodo-cache.catalyst.coop.
If specified without --bypass-local-cache, the local cache will be populated from the GCS cache.
If specified with --bypass-local-cache, the GCS cache will be populated by Zenodo.""",
    )
    parser.add_argument(
        "--bypass-local-cache",
        action="store_true",
        default=False,
        help="""If enabled, the local file cache for datastore will not be used.""",
    )
    parser.add_argument(
        "--partition",
        default={},
        action=ParseKeyValues,
        metavar="KEY=VALUE,...",
        help="Only retrieve resources matching these conditions.",
    )
    parser.add_argument(
        "--list-partitions",
        action="store_true",
        default=False,
        help="List available partition keys and values for each dataset.",
    )

    return parser.parse_args()


def print_partitions(dstore: Datastore, datasets: list[str]) -> None:
    """Prints known partition keys and their values for each of the datasets."""
    for single_ds in datasets:
        parts = dstore.get_datapackage_descriptor(single_ds).get_partitions()
        partitions = dstore.get_datapackage_descriptor(single_ds).get_partitions()

        print(f"\nPartitions for {single_ds} ({dstore.get_doi(single_ds)}):")
        for pkey in sorted(parts):
            print(f' {pkey}: {", ".join(str(x) for x in sorted(parts[pkey]))}')
        if not parts:
        print(f"\nPartitions for {single_ds} ({ZenodoFetcher().get_doi(single_ds)}):")
        for partition_key in sorted(partitions):
            print(
                f' {partition_key}: {", ".join(str(x) for x in sorted(partitions[partition_key]))}'
            )
        if not partitions:
            print(" -- no known partitions --")


def validate_cache(
    dstore: Datastore, datasets: list[str], args: argparse.Namespace
    dstore: Datastore, datasets: list[str], partition: dict[str, str]
) -> None:
    """Validate elements in the datastore cache.
@@ -530,7 +439,7 @@ def validate_cache(
        num_invalid = 0
        descriptor = dstore.get_datapackage_descriptor(single_ds)
        for res, content in dstore.get_resources(
            single_ds, cached_only=True, **args.partition
            single_ds, cached_only=True, **partition
        ):
            try:
                num_total += 1
@@ -547,49 +456,185 @@


def fetch_resources(
    dstore: Datastore, datasets: list[str], args: argparse.Namespace
    dstore: Datastore,
    datasets: list[str],
    partition: dict[str, int | str],
    gcs_cache_path: str,
    bypass_local_cache: bool,
) -> None:
    """Retrieve all matching resources and store them in the cache."""
    for single_ds in datasets:
        for res, contents in dstore.get_resources(
            single_ds, skip_optimally_cached=True, **args.partition
            single_ds, skip_optimally_cached=True, **partition
        ):
            logger.info(f"Retrieved {res}.")
            # If the gcs_cache_path is specified and we don't want
            # to bypass the local cache, populate the local cache.
            if args.gcs_cache_path and not args.bypass_local_cache:
            if gcs_cache_path and not bypass_local_cache:
                dstore._cache.add(res, contents)


def main():
    """Cache datasets."""
    args = parse_command_line()

    pudl.logging_helpers.configure_root_logger(
        logfile=args.logfile, loglevel=args.loglevel
    )
def _parse_key_values(
    ctx: click.core.Context,
    param: click.Option,
    values: str,
) -> dict[str, str]:
    """Parse key-value pairs into a Python dictionary.

    Transforms a command line argument of the form: k1=v1,k2=v2,k3=v3...
    into: {k1:v1, k2:v2, k3:v3, ...}
    """
    out_dict = {}
    for val in values:
        for key_value in val.split(","):
            key, value = key_value.split("=")
            out_dict[key] = value
    return out_dict
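To make the parsing behavior above concrete, the following illustrative snippet (not part of this commit) shows how repeated keys and repeated --partition values collapse into a single dict. The callback never touches ctx or param, so None placeholders are enough for a quick check:

from pudl.workspace.datastore import _parse_key_values

# Two --partition values, as Click passes them when multiple=True (a tuple of strings).
values = ("state=ca,year=2021", "year=2022")

# Later assignments win, so year=2022 overrides year=2021.
assert _parse_key_values(None, None, values) == {"state": "ca", "year": "2022"}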


@click.command(
    context_settings={"help_option_names": ["-h", "--help"]},
)
@click.option(
    "--dataset",
    "-d",
    type=click.Choice(ZenodoFetcher().get_known_datasets()),
    default=list(ZenodoFetcher().get_known_datasets()),
    multiple=True,
    help=(
        "Specifies what dataset to work with. The default is to download all datasets. "
        "Note that downloading all datasets may take hours depending on network speed. "
        "This option may be applied multiple times to specify multiple datasets."
    ),
)
@click.option(
    "--validate",
    is_flag=True,
    default=False,
    help="Validate the contents of locally cached data, but don't download anything.",
)
@click.option(
    "--list-partitions",
    help=(
        "List the available partition keys and values for each dataset specified "
        "using the --dataset argument, or all datasets if --dataset is not used."
    ),
    is_flag=True,
    default=False,
)
@click.option(
    "--partition",
    "-p",
    multiple=True,
    help=(
        "Only operate on dataset partitions matching these conditions. The argument "
        "should have the form: key1=val1,key2=val2,... Conditions are combined with "
        "a boolean AND, functionally meaning each key can only appear once. "
        "If a key is repeated, only the last value is used. "
        "So state=ca,year=2022 will retrieve all California data for 2022, and "
        "state=ca,year=2021,year=2022 will also retrieve California data for 2022, "
        "while state=ca by itself will retrieve all years of California data."
    ),
    callback=_parse_key_values,
)
@click.option(
    "--bypass-local-cache",
    is_flag=True,
    default=False,
    help=(
        "If enabled, locally cached data will not be used. Instead, a new copy will be "
        "downloaded from Zenodo or the GCS cache if specified."
    ),
)
@click.option(
    "--gcs-cache-path",
    type=str,
    help=(
        "Load cached inputs from Google Cloud Storage if possible. This is usually "
        "much faster and more reliable than downloading from Zenodo directly. The "
        "path should be a URL of the form gs://bucket[/path_prefix]. Internally we use "
        "gs://internal-zenodo-cache.catalyst.coop. A public cache is available at "
        "gs://zenodo-cache.catalyst.coop but requires GCS authentication and a billing "
        "project to pay data egress costs."
    ),
)
@click.option(
    "--logfile",
    help="If specified, write logs to this file.",
    type=click.Path(
        exists=False,
        resolve_path=True,
        path_type=pathlib.Path,
    ),
)
@click.option(
    "--loglevel",
    default="INFO",
    type=click.Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False
    ),
)
def pudl_datastore(
    dataset: list[str],
    validate: bool,
    list_partitions: bool,
    partition: dict[str, int | str],
    gcs_cache_path: str,
    bypass_local_cache: bool,
    logfile: pathlib.Path,
    loglevel: str,
):
    """Manage the raw data inputs to the PUDL data processing pipeline.

    Download all the raw FERC Form 2 data:
        pudl_datastore --dataset ferc2

    Download the raw FERC Form 2 data only for 2021:
        pudl_datastore --dataset ferc2 --partition year=2021

    Re-download the raw FERC Form 2 data for 2021 even if you already have it:
        pudl_datastore --dataset ferc2 --partition year=2021 --bypass-local-cache

    Validate all California EPA CEMS data in the local datastore:
        pudl_datastore --dataset epacems --validate --partition state=ca

    List the available partitions in the EIA-860 and EIA-923 datasets:
        pudl_datastore --dataset eia860 --dataset eia923 --list-partitions
    """
    pudl.logging_helpers.configure_root_logger(logfile=logfile, loglevel=loglevel)

    cache_path = None
    if not args.bypass_local_cache:
    if not bypass_local_cache:
        cache_path = PudlPaths().input_dir

    dstore = Datastore(
        gcs_cache_path=args.gcs_cache_path,
        gcs_cache_path=gcs_cache_path,
        local_cache_path=cache_path,
    )

    datasets = [args.dataset] if args.dataset else dstore.get_known_datasets()

    if args.partition:
        logger.info(f"Only retrieving resources for partition: {args.partition}")
    if partition:
        logger.info(f"Only considering resource partitions: {partition}")

    if args.list_partitions:
        print_partitions(dstore, datasets)
    elif args.validate:
        validate_cache(dstore, datasets, args)
    if list_partitions:
        print_partitions(dstore, dataset)
    elif validate:
        validate_cache(dstore, dataset, partition)
    else:
        fetch_resources(dstore, datasets, args)
        fetch_resources(
            dstore=dstore,
            datasets=dataset,
            partition=partition,
            gcs_cache_path=gcs_cache_path,
            bypass_local_cache=bypass_local_cache,
        )

    return 0


if __name__ == "__main__":
    sys.exit(main())
    sys.exit(pudl_datastore())
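Because the entry point is now a Click command, it can also be exercised in-process with Click's test runner instead of shelling out. A minimal sketch (assumes the pudl package is installed and the requested dataset metadata is reachable; not part of this commit):

from click.testing import CliRunner

from pudl.workspace.datastore import pudl_datastore

runner = CliRunner()
# List the known partitions for one dataset without downloading any resources.
result = runner.invoke(pudl_datastore, ["--dataset", "ferc2", "--list-partitions"])
print(result.exit_code)
print(result.output)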
