Skip to content

Commit

Permalink
Refactor download_file
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Mar 9, 2025
1 parent e7d1843 commit 294aa66
Show file tree
Hide file tree
Showing 21 changed files with 401 additions and 146 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Development

### Refactor
- Refactor `download_file` to take explicit `cache_dir`, `ttl`, `client_kwargs` and `cache_disable` arguments instead of a `Settings` object; add `download_files` for concurrent downloads

## 0.106.0 - 2025-03-05

### Fix
Expand Down
15 changes: 13 additions & 2 deletions tests/util/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@

def test_create_fsspec_filesystem() -> None:
"""Test if a fsspec filesystem can be created."""
fs1 = NetworkFilesystemManager.get(settings=Settings(), ttl=CacheExpiry.METAINDEX)
fs2 = NetworkFilesystemManager.get(settings=Settings(), ttl=CacheExpiry.METAINDEX)
default_settings = Settings()
fs1 = NetworkFilesystemManager.get(
cache_dir=default_settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=default_settings.fsspec_client_kwargs,
cache_disable=default_settings.cache_disable,
)
fs2 = NetworkFilesystemManager.get(
cache_dir=default_settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=default_settings.fsspec_client_kwargs,
cache_disable=default_settings.cache_disable,
)
assert id(fs1) == id(fs2)
9 changes: 7 additions & 2 deletions wetterdienst/provider/dwd/dmo/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,13 @@ def __init__(

def _all(self) -> pl.LazyFrame:
"""Get all stations from DMO."""
log.info(f"Downloading file {self._url}.")
payload = download_file(self._url, self.settings, CacheExpiry.METAINDEX)
payload = download_file(
url=self._url,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)
text = StringIO(payload.read().decode(encoding="latin-1"))
lines = text.readlines()
header = lines.pop(0)
Expand Down
7 changes: 6 additions & 1 deletion wetterdienst/provider/dwd/mosmix/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ def __init__(self, settings: Settings) -> None:
self.nsmap = None
self.iter_elems = None

self.dwdfs = NetworkFilesystemManager.get(settings=settings, ttl=CacheExpiry.FIVE_MINUTES)
self.dwdfs = NetworkFilesystemManager.get(
cache_dir=settings.cache_dir,
ttl=CacheExpiry.FIVE_MINUTES,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)

def download(self, url: str) -> BytesIO:
"""Download kml file as bytes.
Expand Down
9 changes: 7 additions & 2 deletions wetterdienst/provider/dwd/mosmix/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,13 @@ def __init__(

def _all(self) -> pl.LazyFrame:
"""Read the MOSMIX station catalog from the DWD server and return a DataFrame."""
log.info(f"Downloading file {self._url}.")
payload = download_file(self._url, self.settings, CacheExpiry.METAINDEX)
payload = download_file(
url=self._url,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)
text = StringIO(payload.read().decode(encoding="latin-1"))
lines = text.readlines()
header = lines.pop(0)
Expand Down
9 changes: 7 additions & 2 deletions wetterdienst/provider/dwd/observation/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ def _download_climate_observations_data(remote_file: str, settings: Settings) ->


def __download_climate_observations_data(remote_file: str, settings: Settings) -> bytes:
log.info(f"Downloading file {remote_file}.")
file = download_file(remote_file, settings=settings, ttl=CacheExpiry.FIVE_MINUTES)
file = download_file(
url=remote_file,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.FIVE_MINUTES,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
try:
zfs = ZipFileSystem(file)
except BadZipFile as e:
Expand Down
49 changes: 31 additions & 18 deletions wetterdienst/provider/dwd/observation/metaindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import datetime as dt
import logging
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO, StringIO
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo
Expand All @@ -22,7 +21,7 @@
_create_file_index_for_dwd_server,
)
from wetterdienst.provider.dwd.observation.metadata import DWD_URBAN_DATASETS, DwdObservationMetadata
from wetterdienst.util.network import download_file
from wetterdienst.util.network import download_file, download_files

if TYPE_CHECKING:
from wetterdienst.core.timeseries.metadata import DatasetModel
Expand Down Expand Up @@ -105,8 +104,13 @@ def _create_meta_index_for_climate_observations(
msg = f"No meta file was found amongst the files at {url}."
raise MetaFileNotFoundError(msg)
meta_file = df_files.get_column("url").to_list()[0]
log.info(f"Downloading file {meta_file}.")
payload = download_file(meta_file, settings=settings, ttl=CacheExpiry.METAINDEX)
payload = download_file(
url=meta_file,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
return _read_meta_df(payload)


Expand Down Expand Up @@ -167,10 +171,20 @@ def _create_meta_index_for_subdaily_extreme_wind(period: Period, settings: Setti
.get_column("url")
.first()
)
log.info(f"Downloading file {meta_file_fx3}.")
payload_fx3 = download_file(meta_file_fx3, settings=settings, ttl=CacheExpiry.METAINDEX)
log.info(f"Downloading file {meta_file_fx6}.")
payload_fx6 = download_file(meta_file_fx6, settings=settings, ttl=CacheExpiry.METAINDEX)
payload_fx3 = download_file(
url=meta_file_fx3,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
payload_fx6 = download_file(
url=meta_file_fx6,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
df_fx3 = _read_meta_df(payload_fx3)
df_fx6 = _read_meta_df(payload_fx6)
df_fx6 = df_fx6.join(df_fx3.select("station_id"), on=["station_id"], how="inner")
Expand All @@ -188,18 +202,17 @@ def _create_meta_index_for_1minute_historical_precipitation(settings: Settings)
)
files_and_station_ids = df_files.select(["url", "station_id"]).collect().to_struct().to_list()
log.info(f"Downloading {len(files_and_station_ids)} files for 1minute precipitation historical metadata.")
with ThreadPoolExecutor() as executor:
metadata_files = executor.map(
lambda file_and_station_id: download_file(
url=file_and_station_id["url"],
settings=settings,
ttl=CacheExpiry.NO_CACHE,
),
files_and_station_ids,
)
remote_files = [file_and_station_id["url"] for file_and_station_id in files_and_station_ids]
files = download_files(
urls=remote_files,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.NO_CACHE,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
dfs = [
_parse_geo_metadata((file, file_and_station_id["station_id"]))
for file, file_and_station_id in zip(metadata_files, files_and_station_ids, strict=False)
for file, file_and_station_id in zip(files, files_and_station_ids, strict=False)
]
df = pl.concat(dfs)
df = df.with_columns(
Expand Down
23 changes: 14 additions & 9 deletions wetterdienst/provider/dwd/radar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,13 @@ def _download_generic_data(self, url: str) -> Iterator[RadarResult]: # noqa: C9
ttl = CacheExpiry.FIVE_MINUTES
if not self._should_cache_download(url):
ttl = CacheExpiry.NO_CACHE
log.info(f"Downloading file {url}.")
data = download_file(url=url, ttl=ttl, settings=self.settings)
data = download_file(
url=url,
cache_dir=self.settings.cache_dir,
ttl=ttl,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)

# RadarParameter.FX_REFLECTIVITY
if url.endswith(Extension.TAR_BZ2.value):
Expand Down Expand Up @@ -498,7 +503,13 @@ def _download_generic_data(self, url: str) -> Iterator[RadarResult]: # noqa: C9

def _download_radolan_data(self, url: str, start_date: dt.datetime, end_date: dt.datetime) -> Iterator[RadarResult]:
"""Download RADOLAN_CDC data for a given datetime."""
archive_in_bytes = self.__download_radolan_data(url=url, settings=self.settings)
archive_in_bytes = download_file(
url=url,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.TWELVE_HOURS,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)

for result in self._extract_radolan_data(archive_in_bytes):
if not result.timestamp:
Expand All @@ -512,12 +523,6 @@ def _download_radolan_data(self, url: str, start_date: dt.datetime, end_date: dt

yield result

@staticmethod
def __download_radolan_data(url: str, settings: Settings) -> BytesIO:
"""Download RADOLAN_CDC data for a given datetime."""
log.info(f"Downloading file {url}.")
return download_file(url=url, ttl=CacheExpiry.TWELVE_HOURS, settings=settings)

@staticmethod
def _extract_radolan_data(archive_in_bytes: BytesIO) -> Iterator[RadarResult]:
"""Extract the RADOLAN_CDC data from the archive."""
Expand Down
33 changes: 19 additions & 14 deletions wetterdienst/provider/dwd/road/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from __future__ import annotations

import logging
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from functools import reduce
from tempfile import NamedTemporaryFile
Expand All @@ -25,7 +24,7 @@
from wetterdienst.metadata.cache import CacheExpiry
from wetterdienst.provider.dwd.metadata import _METADATA
from wetterdienst.util.eccodes import check_pdbufr
from wetterdienst.util.network import download_file, list_remote_files_fsspec
from wetterdienst.util.network import download_file, download_files, list_remote_files_fsspec

if TYPE_CHECKING:
from io import BytesIO
Expand Down Expand Up @@ -259,25 +258,26 @@ def _collect_data_by_station_group(
parameters: list[ParameterModel],
) -> pl.DataFrame:
"""Collect data from DWD Road Weather stations."""
remote_files = self._create_file_index_for_dwd_road_weather_station(road_weather_station_group)
df_files = self._create_file_index_for_dwd_road_weather_station(road_weather_station_group)
if self.sr.start_date:
remote_files = remote_files.filter(
df_files = df_files.filter(
pl.col("date").is_between(self.sr.start_date, self.sr.end_date),
)
remote_files = remote_files.get_column("filename").to_list()
remote_files = df_files.get_column("filename").to_list()
filenames_and_files = self._download_road_weather_observations(remote_files, self.sr.settings)
return self._parse_dwd_road_weather_data(filenames_and_files, parameters)

@staticmethod
def _download_road_weather_observations(remote_files: list[str], settings: Settings) -> list[tuple[str, BytesIO]]:
"""Download the road weather station data from a given file and returns a DataFrame."""
log.info(f"Downloading {len(remote_files)} files from DWD Road Weather.")
with ThreadPoolExecutor() as p:
files_in_bytes = p.map(
lambda file: download_file(url=file, settings=settings, ttl=CacheExpiry.TWELVE_HOURS),
remote_files,
)
return list(zip(remote_files, files_in_bytes, strict=False))
files = download_files(
urls=remote_files,
cache_dir=settings.cache_dir,
ttl=CacheExpiry.TWELVE_HOURS,
client_kwargs=settings.fsspec_client_kwargs,
cache_disable=settings.cache_disable,
)
return list(zip(remote_files, files, strict=False))

def _parse_dwd_road_weather_data(
self,
Expand Down Expand Up @@ -434,8 +434,13 @@ def __init__(
)

def _all(self) -> pl.LazyFrame:
log.info(f"Downloading file {self._endpoint}.")
payload = download_file(self._endpoint, self.settings, CacheExpiry.METAINDEX)
payload = download_file(
url=self._endpoint,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)
df = pl.read_excel(source=payload, sheet_name="Tabelle1", infer_schema_length=0)
df = df.rename(mapping=self._column_mapping)
df = df.select(pl.col(col) for col in self._column_mapping.values())
Expand Down
27 changes: 21 additions & 6 deletions wetterdienst/provider/ea/hydrology/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ def _collect_station_parameter_or_dataset(
) -> pl.DataFrame:
"""Collect data for a station, parameter or dataset."""
url = self._url.format(station_id=station_id)
log.info(f"Downloading file {url}.")
payload = download_file(url=url, settings=self.sr.stations.settings, ttl=CacheExpiry.NO_CACHE)
payload = download_file(
url=url,
cache_dir=self.sr.stations.settings.cache_dir,
ttl=CacheExpiry.NO_CACHE,
client_kwargs=self.sr.stations.settings.fsspec_client_kwargs,
cache_disable=self.sr.stations.settings.cache_disable,
)
df_measures = pl.read_json(
payload,
schema={
Expand Down Expand Up @@ -170,8 +175,13 @@ def _collect_station_parameter_or_dataset(
except IndexError:
return pl.DataFrame()
readings_url = f"{readings_id_url}/readings.json"
log.info(f"Downloading file {readings_url}.")
payload = download_file(url=readings_url, settings=self.sr.stations.settings, ttl=CacheExpiry.FIVE_MINUTES)
payload = download_file(
url=readings_url,
cache_dir=self.sr.stations.settings.cache_dir,
ttl=CacheExpiry.FIVE_MINUTES,
client_kwargs=self.sr.stations.settings.fsspec_client_kwargs,
cache_disable=self.sr.stations.settings.cache_disable,
)
df = pl.read_json(
payload,
schema={
Expand Down Expand Up @@ -244,8 +254,13 @@ def __init__(

def _all(self) -> pl.LazyFrame:
"""Acquire all stations and filter for stations that have wanted resolution and parameter combinations."""
log.info(f"Acquiring station listing from {self._url}")
payload = download_file(self._url, self.settings, CacheExpiry.FIVE_MINUTES)
payload = download_file(
url=self._url,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.FIVE_MINUTES,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)
df = pl.read_json(
payload,
schema={
Expand Down
27 changes: 21 additions & 6 deletions wetterdienst/provider/eaufrance/hubeau/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,13 @@ def _get_dynamic_frequency(
parameter: ParameterModel,
) -> tuple[int, Literal["m", "H"]]:
url = self._endpoint_freq.format(station_id=station_id, grandeur_hydro=parameter.name_original)
log.info(f"Downloading file {url}.")
response = download_file(url=url, settings=self.sr.stations.settings, ttl=CacheExpiry.METAINDEX)
response = download_file(
url=url,
cache_dir=self.sr.stations.settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=self.sr.stations.settings.fsspec_client_kwargs,
cache_disable=self.sr.stations.settings.cache_disable,
)
values_dict = json.load(response)["data"]
try:
second_date = values_dict[1]["date_obs"]
Expand Down Expand Up @@ -154,8 +159,13 @@ def _collect_station_parameter_or_dataset(
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
)
log.info(f"Downloading file {url}.")
response = download_file(url=url, settings=self.sr.stations.settings)
response = download_file(
url=url,
cache_dir=self.sr.stations.settings.cache_dir,
ttl=CacheExpiry.FIVE_MINUTES,
client_kwargs=self.sr.stations.settings.fsspec_client_kwargs,
cache_disable=self.sr.stations.settings.cache_disable,
)
df = pl.read_json(
response,
schema={
Expand Down Expand Up @@ -230,8 +240,13 @@ def __init__(

def _all(self) -> pl.LazyFrame:
""":return:"""
log.info(f"Downloading file {self._endpoint}.")
response = download_file(url=self._endpoint, settings=self.settings, ttl=CacheExpiry.METAINDEX)
response = download_file(
url=self._endpoint,
cache_dir=self.settings.cache_dir,
ttl=CacheExpiry.METAINDEX,
client_kwargs=self.settings.fsspec_client_kwargs,
cache_disable=self.settings.cache_disable,
)
df_raw = pl.read_json(
response,
schema={
Expand Down
Loading

0 comments on commit 294aa66

Please sign in to comment.