Start EIA-176 pipelines: company data #2949

Closed
wants to merge 20 commits into dev from eia176_start_implementation
Changes from 16 commits
Commits
826a77a
Start of reusable CSV extractor, incorporating preexisting patterns a…
davidmudrauskas Oct 6, 2023
bc6eddf
Table schema and archive objects for CSV extraction, pipeline-/form-s…
davidmudrauskas Oct 30, 2023
ff6b5bf
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Oct 30, 2023
9903674
Unit tests for CsvTableSchema
davidmudrauskas Nov 3, 2023
3a0bfe2
Full unit test coverage for CSV extractor
davidmudrauskas Nov 4, 2023
1fb52e9
Follow patterns for clobber and test file names, implement delete_sch…
davidmudrauskas Nov 6, 2023
8dbd975
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 6, 2023
1531313
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 16, 2023
9aff0a8
Update CSV extractor to just return dataframes, integrate with Dagster
davidmudrauskas Nov 16, 2023
caaa212
Combine thin CSV extraction-related class, update tests
davidmudrauskas Nov 16, 2023
fe3fbb7
Remove extraneous files, undo find-replace error
davidmudrauskas Nov 16, 2023
0b703c6
Extract one table using CSV extractor
davidmudrauskas Nov 17, 2023
cb8e7e1
Move managing zipfile and table-file map to CSV extractor client, sim…
davidmudrauskas Nov 17, 2023
4dc4ad9
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 21, 2023
87b7c51
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 22, 2023
82504f0
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Nov 27, 2023
8f4d93e
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 1, 2023
35de6a8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 1, 2023
35fabe6
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Dec 6, 2023
07b48f3
Merge branch 'dev' into eia176_start_implementation
davidmudrauskas Jan 3, 2024
257 changes: 257 additions & 0 deletions src/pudl/extract/csv.py
@@ -0,0 +1,257 @@
"""Extractor for CSV data."""
import contextlib
from collections.abc import Iterator
from csv import DictReader
from functools import lru_cache
from importlib import resources
from io import TextIOWrapper
from pathlib import Path
from zipfile import ZipFile

import pandas as pd
import sqlalchemy as sa

import pudl.helpers
import pudl.logging_helpers
from pudl.workspace.datastore import Datastore

logger = pudl.logging_helpers.get_logger(__name__)


class CsvTableSchema:
"""Provides the data definition of a table."""

def __init__(self, table_name: str):
"""Creates new instance of the table schema setting.

The table name will be set as table_name and table will have no columns.
"""
self.name = table_name
self._columns = []
self._column_types = {}
self._short_name_map = {} # short_name_map[short_name] -> long_name

def add_column(
self,
col_name: str,
col_type: sa.types.TypeEngine,
short_name: str | None = None,
):
"""Adds a new column to this table schema."""
assert col_name not in self._columns
self._columns.append(col_name)
self._column_types[col_name] = col_type
if short_name is not None:
self._short_name_map[short_name] = col_name

    def get_columns(self) -> Iterator[tuple[str, sa.types.TypeEngine]]:
"""Iterates over the (column_name, column_type) pairs."""
for col_name in self._columns:
yield (col_name, self._column_types[col_name])

def get_column_names(self) -> set[str]:
"""Returns set of long column names."""
return set(self._columns)

def get_column_rename_map(self) -> dict[str, str]:
"""Returns dictionary that maps from short to long column names."""
return dict(self._short_name_map)

def create_sa_table(self, sa_meta: sa.MetaData) -> sa.Table:
"""Creates SQLAlchemy table described by this instance.

Args:
sa_meta: new table will be written to this MetaData object.
"""
table = sa.Table(self.name, sa_meta)
for col_name, col_type in self.get_columns():
table.append_column(sa.Column(col_name, col_type))
return table


class CsvArchive:
"""Represents API for accessing files within a single CSV archive."""

def __init__(
self,
zipfile: ZipFile,
table_file_map: dict[str, str],
column_types: dict[str, dict[str, sa.types.TypeEngine]],
):
"""Constructs new instance of CsvArchive."""
self.zipfile = zipfile
self._table_file_map = table_file_map
self._column_types = column_types
self._table_schemas: dict[str, list[str]] = {}

@lru_cache
def get_table_schema(self, table_name: str) -> CsvTableSchema:
"""Returns TableSchema for a given table."""
with self.zipfile.open(self._table_file_map[table_name]) as f:
text_f = TextIOWrapper(f)
table_columns = DictReader(text_f).fieldnames

if sorted(table_columns) != sorted(self._column_types[table_name].keys()):
raise ValueError(
f"Columns extracted from CSV for {table_name} do not match expected columns"
)

schema = CsvTableSchema(table_name)
for column_name in table_columns:
col_type = self._column_types[table_name][column_name]
schema.add_column(column_name, col_type)
return schema

def load_table(self, filename: str) -> pd.DataFrame:
"""Read the data from the CSV source and return as a dataframe."""
logger.info(f"Extracting {filename} from CSV into pandas DataFrame.")
with self.zipfile.open(filename) as f:
df = pd.read_csv(f)
return df


class CsvReader:
"""Wrapper to provide standardized access to CSV files."""

def __init__(
self,
datastore: Datastore,
dataset: str,
column_types: dict[str, dict[str, sa.types.TypeEngine]],
):
"""Create a new instance of CsvReader.

This can be used for retrieving data from CSV files.

Args:
datastore: provides access to raw files on disk.
            dataset: name of the dataset (e.g. eia176); this is used to load metadata
from package_data/{dataset} subdirectory.
"""
self.datastore = datastore
self.dataset = dataset
self._table_file_map = {}
self._column_types = column_types
for row in self._open_csv_resource("table_file_map.csv"):
self._table_file_map[row["table"]] = row["filename"]

def _open_csv_resource(self, base_filename: str) -> DictReader:
"""Open the given resource file as :class:`csv.DictReader`."""
csv_path = resources.files(f"pudl.package_data.{self.dataset}") / base_filename
return DictReader(csv_path.open())

def get_table_names(self) -> list[str]:
"""Returns list of tables that this datastore provides access to."""
return list(self._table_file_map)

@lru_cache
def get_archive(self) -> CsvArchive:
"""Returns a CsvArchive instance corresponding to the dataset."""
return CsvArchive(
self.datastore.get_zipfile_resource(self.dataset),
table_file_map=self._table_file_map,
column_types=self._column_types,
)


class CsvExtractor:
"""Generalized class for extracting and loading data from CSV files into SQL database.

When subclassing from this generic extractor, one should implement dataset specific
logic in the following manner:

1. Set DATABASE_NAME class attribute. This controls what filename is used for the output
sqlite database.
2. Set DATASET class attribute. This is used to load metadata from package_data/{dataset} subdirectory.
3. Set COLUMN_TYPES to a map of tables to column names and their sqlalchemy types. This is used to generate DDL.

Dataset specific logic and transformations can be injected by overriding:

1. finalize_schema() in order to modify sqlite schema. This is called just before
the schema is written into the sqlite database. This is good place for adding
primary and/or foreign key constraints to tables.
2. postprocess() is called after data is written to sqlite. This can be used for
database level final cleanup and transformations (e.g. injecting missing IDs).

The extraction logic is invoked by calling execute() method of this class.
"""

DATABASE_NAME = None
DATASET = None
COLUMN_TYPES = {}

def __init__(self, datastore: Datastore, output_path: Path, clobber: bool = False):
"""Constructs new instance of CsvExtractor.

Args:
datastore: top-level datastore instance for accessing raw data files.
output_path: directory where the output databases should be stored.
clobber: if True, existing databases should be replaced.
"""
self.clobber = clobber
self.output_path = output_path
self.csv_reader = self.get_csv_reader(datastore)
self.sqlite_engine = sa.create_engine(self.get_db_path())
self.sqlite_meta = sa.MetaData()

def get_db_path(self) -> str:
"""Returns the connection string for the sqlite database."""
db_path = str(Path(self.output_path) / self.DATABASE_NAME)
return f"sqlite:///{db_path}"

    def get_csv_reader(self, datastore: Datastore) -> CsvReader:
"""Returns instance of CsvReader to access the data."""
return CsvReader(datastore, self.DATASET, self.COLUMN_TYPES)

def execute(self):
"""Runs the extraction of the data from csv to sqlite."""
self.delete_schema()
self.create_sqlite_tables()
self.load_table_data()
self.postprocess()

def delete_schema(self):
"""Drops all tables from the existing sqlite database."""
with contextlib.suppress(sa.exc.OperationalError):
pudl.helpers.drop_tables(
self.sqlite_engine,
clobber=self.clobber,
)

self.sqlite_engine = sa.create_engine(self.get_db_path())
self.sqlite_meta = sa.MetaData()
self.sqlite_meta.reflect(self.sqlite_engine)

def create_sqlite_tables(self):
"""Creates database schema based on the input tables."""
csv_archive = self.csv_reader.get_archive()
for tn in self.csv_reader.get_table_names():
csv_archive.get_table_schema(tn).create_sa_table(self.sqlite_meta)
self.finalize_schema(self.sqlite_meta)
self.sqlite_meta.create_all(self.sqlite_engine)

def load_table_data(self) -> None:
"""Extracts and loads csv data into sqlite."""
for table in self.csv_reader.get_table_names():
filename = self.csv_reader._table_file_map[table]
df = self.csv_reader.get_archive().load_table(filename)
coltypes = {col.name: col.type for col in self.sqlite_meta.tables[table].c}
logger.info(f"SQLite: loading {len(df)} rows into {table}.")
df.to_sql(
table,
self.sqlite_engine,
if_exists="append",
chunksize=100000,
dtype=coltypes,
index=False,
)

def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData:
"""This method is called just before the schema is written to sqlite.

You can use this method to apply dataset specific alterations to the schema,
such as adding primary and foreign key constraints.
"""
return meta

def postprocess(self):
"""This method is called after all the data is loaded into sqlite to transform raw data to targets."""
pass
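
A minimal sketch of the subclassing pattern the CsvExtractor docstring describes. Every name below (MyDatasetCsvExtractor, mydataset, my_table, the column names) is hypothetical and not part of this PR:

import sqlalchemy as sa

from pudl.extract.csv import CsvExtractor


class MyDatasetCsvExtractor(CsvExtractor):
    """Hypothetical subclass; names are illustrative only."""

    DATABASE_NAME = "mydataset.sqlite"  # filename of the output sqlite database
    DATASET = "mydataset"  # metadata loaded from package_data/mydataset
    COLUMN_TYPES = {
        "my_table": {
            "ID": sa.String,
            "VALUE": sa.Float,
        },
    }

    def finalize_schema(self, meta: sa.MetaData) -> sa.MetaData:
        # Runs just before the schema is written to sqlite: a good place
        # to add primary/foreign key constraints.
        meta.tables["my_table"].append_constraint(sa.PrimaryKeyConstraint("ID"))
        return meta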
2 changes: 1 addition & 1 deletion src/pudl/extract/dbf.py
@@ -159,7 +159,7 @@ def get_table_schema(self, table_name: str) -> DbfTableSchema:
table_columns = self.get_db_schema()[table_name]
dbf = self.get_table_dbf(table_name)
dbf_fields = [field for field in dbf.fields if field.name != "_NullFlags"]
-        if len(table_columns) != len(table_columns):
+        if len(dbf_fields) != len(table_columns):
            raise ValueError(
f"Number of DBF fields in {table_name} does not match what was "
f"found in the DBC index file for {self.partition}."
22 changes: 22 additions & 0 deletions src/pudl/extract/eia176.py
@@ -0,0 +1,22 @@
"""Extract EIA Form 176 data from CSVs.

The EIA Form 176 archive also contains CSVs for EIA Form 191 and EIA Form 757.
"""

import sqlalchemy as sa # noqa: I001

from pudl.extract.csv import CsvExtractor


class Eia176CsvExtractor(CsvExtractor):
"""Extractor for EIA Form 176 data."""

DATABASE_NAME = "eia176.sqlite"
DATASET = "eia176"
COLUMN_TYPES = {
"e176_company": {
"COMPANY_ID": sa.String,
"ACTIVITY_STATUS": sa.String,
"NAME1": sa.String,
},
}
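
For reference, a sketch of how this extractor would be invoked; it assumes Datastore accepts a local_cache_path argument as used elsewhere in PUDL, and the paths are illustrative:

from pathlib import Path

from pudl.extract.eia176 import Eia176CsvExtractor
from pudl.workspace.datastore import Datastore

# Illustrative paths; assumes a populated local datastore cache.
datastore = Datastore(local_cache_path=Path("~/pudl-work/data").expanduser())
extractor = Eia176CsvExtractor(datastore, output_path=Path("~/pudl-work/output").expanduser())
extractor.execute()  # drops existing tables, creates the schema, loads e176_company

Per csv.py above, execute() runs delete_schema(), create_sqlite_tables(), load_table_data(), and postprocess() in that order.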
1 change: 1 addition & 0 deletions src/pudl/extract/eia191.py
@@ -0,0 +1 @@
"""Extract EIA Form 191 data from CSVs."""
1 change: 1 addition & 0 deletions src/pudl/extract/eia757.py
@@ -0,0 +1 @@
"""Extract EIA Form 757 data from CSVs."""
2 changes: 2 additions & 0 deletions src/pudl/package_data/eia176/table_file_map.csv
@@ -0,0 +1,2 @@
table,filename
e176_company,all_company_176.csv
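
This mapping is consumed by CsvReader.__init__ via _open_csv_resource (see csv.py above); a small sketch of the resulting dictionary:

from csv import DictReader
from importlib import resources

# Mirrors CsvReader._open_csv_resource in src/pudl/extract/csv.py.
csv_path = resources.files("pudl.package_data.eia176") / "table_file_map.csv"
table_file_map = {row["table"]: row["filename"] for row in DictReader(csv_path.open())}
# table_file_map == {"e176_company": "all_company_176.csv"}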
2 changes: 2 additions & 0 deletions src/pudl/workspace/datastore.py
@@ -169,6 +169,8 @@ class ZenodoDoiSettings(BaseSettings):
# Sandbox DOIs are provided for reference
censusdp1tract: ZenodoDoi = "10.5281/zenodo.4127049"
# censusdp1tract: ZenodoDoi = "10.5072/zenodo.674992"
eia176: ZenodoDoi = "10.5281/zenodo.7682358"
    # eia176: ZenodoDoi = "10.5072/zenodo.1166385"
eia860: ZenodoDoi = "10.5281/zenodo.8164776"
# eia860: ZenodoDoi = "10.5072/zenodo.1222854"
eia860m: ZenodoDoi = "10.5281/zenodo.8188017"