Integrate fsspec to enable accessing WFDB files from cloud URIs #523

Open · wants to merge 21 commits into base: main
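For reviewers, a minimal sketch of what this PR enables once merged. The bucket and record path are hypothetical, and the matching fsspec backend (here s3fs) must be installed for the protocol to resolve:

```python
import wfdb

# Hypothetical cloud location of a WFDB record (header + dat files);
# requires the s3fs package so fsspec can resolve s3:// URIs.
record = wfdb.rdrecord("s3://my-bucket/mitdb/100")
print(record.fs, record.sig_name, record.p_signal.shape)
```

Cloud paths go in `record_name`; `pn_dir` stays reserved for PhysioNet databases, which the guards added below enforce.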
8 changes: 5 additions & 3 deletions .github/workflows/test.yml
@@ -38,18 +38,20 @@ jobs:
- name: Check style
run: uv run --extra dev black --check --diff .

test-deb10-i386:
name: Python 3.7 on Debian 10 i386
test-deb11-i386:
name: Python 3.7 on Debian 11 i386
runs-on: ubuntu-latest
container: i386/debian:10
container: i386/debian:11
steps:
- name: Install dependencies
run: |
apt-get update
apt-get install -y --no-install-recommends \
python3-fsspec \
python3-matplotlib \
python3-numpy \
python3-pandas \
python3-pip \
python3-requests \
python3-scipy \
python3-soundfile \
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -16,6 +16,8 @@ dependencies = [
"soundfile >= 0.10.0",
"matplotlib >= 3.2.2",
"requests >= 2.8.1",
"fsspec >= 2023.10.0",
"aiohttp >= 3.10.11",
]
dynamic = ["version"]

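Note on the new dependencies: fsspec provides the filesystem abstraction and aiohttp backs its HTTP filesystem, but protocol-specific backends (s3fs for s3://, gcsfs for gs://, adlfs for az://) are separate packages that users install themselves. A quick availability check, as a sketch rather than part of this PR:

```python
import fsspec

try:
    # fsspec raises ImportError if no filesystem implementation is
    # installed for the requested protocol (e.g. s3fs for "s3").
    fs = fsspec.filesystem("s3", anon=True)
    print("s3 backend available:", type(fs).__name__)
except ImportError as exc:
    print("s3 backend missing:", exc)
```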
19 changes: 16 additions & 3 deletions wfdb/io/_coreio.py
@@ -1,9 +1,15 @@
import posixpath

import fsspec

from wfdb.io import _url
from wfdb.io.download import config


# Cloud protocols
CLOUD_PROTOCOLS = ["az://", "azureml://", "s3://", "gs://"]


def _open_file(
pn_dir,
file_name,
@@ -28,8 +34,9 @@ def _open_file(
The PhysioNet database directory where the file is stored, or None
if file_name is a local path.
file_name : str
The name of the file, either as a local filesystem path (if
`pn_dir` is None) or a URL path (if `pn_dir` is a string.)
The name of the file, either as a local filesystem path or cloud
URL (if `pn_dir` is None) or a PhysioNet URL path
(if `pn_dir` is a string.)
mode : str, optional
The standard I/O mode for the file ("r" by default). If `pn_dir`
is not None, this must be "r", "rt", or "rb".
@@ -47,7 +54,7 @@

"""
if pn_dir is None:
return open(
return fsspec.open(
file_name,
mode,
buffering=buffering,
@@ -56,6 +63,12 @@
newline=newline,
)
else:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)

url = posixpath.join(config.db_index_url, pn_dir, file_name)
return _url.openurl(
url,
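To illustrate the `_open_file` change: `fsspec.open` behaves like the built-in `open` for plain local paths but dispatches to the registered backend when the path carries a protocol prefix. A small sketch, where the gs:// path is hypothetical and needs gcsfs installed:

```python
import fsspec

# Plain local path: equivalent to the built-in open().
with fsspec.open("sample-data/100.hea", mode="r") as f:
    print(f.readline().strip())

# Cloud URI: dispatched to the gcsfs backend (path is hypothetical).
with fsspec.open("gs://my-bucket/mitdb/100.hea", mode="rb") as f:
    header_bytes = f.read()
```

The new `CLOUD_PROTOCOLS` guard exists only to give a clear error when such a URI is mistakenly passed as `pn_dir`.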
54 changes: 40 additions & 14 deletions wfdb/io/_signal.py
@@ -1,11 +1,13 @@
import math
import os
import posixpath
import sys

import fsspec
import numpy as np

from wfdb.io import download, _coreio, util

from wfdb.io._coreio import CLOUD_PROTOCOLS

MAX_I32 = 2147483647
MIN_I32 = -2147483648
@@ -1643,10 +1645,10 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
The name of the dat file.
dir_name : str
The full directory where the dat file(s) are located, if the dat
file(s) are local.
file(s) are local or in the cloud.
pn_dir : str
The PhysioNet directory where the dat file(s) are located, if
the dat file(s) are remote.
the dat file(s) are on a PhysioNet server.
fmt : str
The format of the dat file.
start_byte : int
@@ -1686,15 +1688,22 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
element_count = n_samp
byte_count = n_samp * BYTES_PER_SAMPLE[fmt]

# Local dat file
# Local or cloud dat file
if pn_dir is None:
with open(os.path.join(dir_name, file_name), "rb") as fp:
with fsspec.open(os.path.join(dir_name, file_name), "rb") as fp:
fp.seek(start_byte)
sig_data = np.fromfile(
sig_data = util.fromfile(
fp, dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count
)
# Stream dat file from Physionet

# Stream dat file from PhysioNet
else:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)

dtype_in = np.dtype(DATA_LOAD_TYPES[fmt])
sig_data = download._stream_dat(
file_name, pn_dir, byte_count, start_byte, dtype_in
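The switch from `np.fromfile` to `util.fromfile` matters because `np.fromfile` expects a real OS-level file object, which fsspec's remote file objects are not. The helper itself is not shown in this diff; a rough sketch of what such a wrapper can look like, as an assumption only (the actual `wfdb.io.util.fromfile` may differ):

```python
import numpy as np

def fromfile(fileobj, dtype, count=-1):
    # Read raw bytes from any file-like object (local or fsspec-backed)
    # and reinterpret them, instead of relying on np.fromfile's file descriptor.
    dtype = np.dtype(dtype)
    if count < 0:
        data = fileobj.read()
    else:
        data = fileobj.read(count * dtype.itemsize)
    return np.frombuffer(data, dtype=dtype)
```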
@@ -1840,8 +1849,9 @@ def _rd_compressed_file(
file_name : str
The name of the signal file.
dir_name : str
The full directory where the signal file is located, if local.
This argument is ignored if `pn_dir` is not None.
The full directory where the signal file is located, if this
is a local or cloud path. This argument is ignored if `pn_dir`
is not None.
pn_dir : str or None
The PhysioNet database directory where the signal file is located.
fmt : str
@@ -2585,10 +2595,10 @@ def _infer_sig_len(
The byte offset of the dat file. None is equivalent to zero.
dir_name : str
The full directory where the dat file(s) are located, if the dat
file(s) are local.
file(s) are local or on the cloud.
pn_dir : str, optional
The PhysioNet directory where the dat file(s) are located, if
the dat file(s) are remote.
the dat file(s) are on a PhysioNet server.

Returns
-------
@@ -2600,13 +2610,29 @@
sig_len * tsamps_per_frame * bytes_per_sample == file_size

"""
if pn_dir is None:
file_size = os.path.getsize(os.path.join(dir_name, file_name))
else:
from wfdb.io.record import CLOUD_PROTOCOLS

# If this is a cloud path, use posixpath to construct the path and fsspec to open file
if any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
with fsspec.open(posixpath.join(dir_name, file_name), mode="rb") as f:
file_size = f.seek(0, os.SEEK_END)

# If the PhysioNet database path is provided, construct the download path using the database version
elif pn_dir is not None:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)

file_size = download._remote_file_size(
file_name=file_name, pn_dir=pn_dir
)

# If it isn't a cloud path or a PhysioNet path, we treat as a local file
else:
file_size = os.path.getsize(os.path.join(dir_name, file_name))

if byte_offset is None:
byte_offset = 0
data_size = file_size - byte_offset
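In `_infer_sig_len`, the cloud branch gets the file size by seeking to the end of the fsspec file object, since `os.path.getsize` only works for local paths. A standalone sketch with a hypothetical URI:

```python
import os
import fsspec

def cloud_file_size(uri):
    # seek() returns the new absolute offset, so seeking to SEEK_END
    # yields the object's size in bytes without reading its contents.
    with fsspec.open(uri, mode="rb") as f:
        return f.seek(0, os.SEEK_END)

# cloud_file_size("s3://my-bucket/mitdb/100.dat")
```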
30 changes: 20 additions & 10 deletions wfdb/io/annotation.py
@@ -1,4 +1,5 @@
import copy
import fsspec
import numpy as np
import os
import pandas as pd
@@ -9,6 +10,8 @@
from wfdb.io import download
from wfdb.io import _header
from wfdb.io import record
from wfdb.io import util
from wfdb.io._coreio import CLOUD_PROTOCOLS


class Annotation(object):
@@ -1892,7 +1895,7 @@ def rdann(
----------
record_name : str
The record name of the WFDB annotation file. ie. for file '100.atr',
record_name='100'.
record_name='100'. The path to the file can be a cloud URL.
extension : str
The annotator extension of the annotation file. ie. for file
'100.atr', extension='atr'.
@@ -1936,11 +1939,17 @@
>>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000)

"""
if (pn_dir is not None) and ("." not in pn_dir):
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)
if pn_dir is not None:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)
if "." not in pn_dir:
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)

return_label_elements = check_read_inputs(
sampfrom, sampto, return_label_elements
@@ -2071,7 +2080,7 @@ def load_byte_pairs(record_name, extension, pn_dir):
----------
record_name : str
The record name of the WFDB annotation file. ie. for file '100.atr',
record_name='100'.
record_name='100'. The path to the file can be a cloud URL.
extension : str
The annotator extension of the annotation file. ie. for file
'100.atr', extension='atr'.
@@ -2086,10 +2095,11 @@
The input filestream converted to an Nx2 array of unsigned bytes.

"""
# local file
# local or cloud file
if pn_dir is None:
with open(record_name + "." + extension, "rb") as f:
filebytes = np.fromfile(f, "<u1").reshape([-1, 2])
with fsspec.open(record_name + "." + extension, "rb") as f:
filebytes = util.fromfile(f, "<u1").reshape([-1, 2])

# PhysioNet file
else:
filebytes = download._stream_annotation(
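The rdann change mirrors the record-reading functions: cloud URIs are accepted in `record_name`, and a cloud path passed as `pn_dir` is rejected early. Usage sketch with a hypothetical bucket path:

```python
import wfdb

# Annotation file read directly from the cloud (resolves .../mitdb/100.atr).
ann = wfdb.rdann("s3://my-bucket/mitdb/100", "atr")
print(len(ann.sample), ann.symbol[:5])

# Cloud paths under pn_dir are rejected with a clear error.
try:
    wfdb.rdann("100", "atr", pn_dir="s3://my-bucket/mitdb")
except ValueError as exc:
    print(exc)
```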
76 changes: 52 additions & 24 deletions wfdb/io/record.py
@@ -4,6 +4,7 @@
import os
import re

import fsspec
import numpy as np
import pandas as pd

@@ -13,6 +14,7 @@
from wfdb.io import download
from wfdb.io import header
from wfdb.io import util
from wfdb.io._coreio import CLOUD_PROTOCOLS


# -------------- WFDB Signal Calibration and Classification ---------- #
@@ -1824,27 +1826,39 @@ def rdheader(record_name, pn_dir=None, rd_segments=False):

"""
dir_name, base_record_name = os.path.split(record_name)
dir_name = os.path.abspath(dir_name)
file_name = f"{base_record_name}.hea"

# Construct the download path using the database version
if (pn_dir is not None) and ("." not in pn_dir):
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)
# If this is a cloud path, use posixpath to construct the path and fsspec to open file
if any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
with fsspec.open(posixpath.join(dir_name, file_name), mode="r") as f:
header_content = f.read()

# Read the local or remote header file.
file_name = f"{base_record_name}.hea"
if pn_dir is None:
with open(
# If the PhysioNet database path is provided, construct the download path using the database version
elif pn_dir is not None:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)

if "." not in pn_dir:
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)

header_content = download._stream_header(file_name, pn_dir)

# If it isn't a cloud path or a PhysioNet path, we treat as a local file
else:
dir_name = os.path.abspath(dir_name)
with fsspec.open(
os.path.join(dir_name, file_name),
"r",
encoding="ascii",
errors="ignore",
) as f:
header_content = f.read()
else:
header_content = download._stream_header(file_name, pn_dir)

# Separate comment and non-comment lines
header_lines, comment_lines = header.parse_header_content(header_content)
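With the dispatch above, a header-only read also works directly against a cloud prefix. A sketch with a hypothetical gs:// path (requires gcsfs):

```python
import wfdb

hea = wfdb.rdheader("gs://my-bucket/mitdb/100")
print(hea.record_name, hea.fs, hea.n_sig, hea.sig_name)
```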
@@ -2017,14 +2031,22 @@ def rdrecord(

"""
dir_name, base_record_name = os.path.split(record_name)
dir_name = os.path.abspath(dir_name)
# Update the dir_name using abspath unless it is a cloud path
if not any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
dir_name = os.path.abspath(dir_name)

# Read the header fields
if (pn_dir is not None) and ("." not in pn_dir):
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)
if pn_dir is not None:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)
if "." not in pn_dir:
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)

record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False)
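The PhysioNet path is unchanged apart from the new guard: `pn_dir` still names a PhysioNet database, and the version segment is resolved automatically when it is missing. For comparison with the cloud examples above:

```python
import wfdb

# pn_dir is a PhysioNet database; "mitdb" is expanded to a versioned
# path (e.g. "mitdb/1.0.0") before streaming. Requires network access.
record = wfdb.rdrecord("100", pn_dir="mitdb")
print(record.record_name, record.fs)
```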

@@ -2308,11 +2330,17 @@ def rdsamp(
channels=[1,3])

"""
if (pn_dir is not None) and ("." not in pn_dir):
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)
if pn_dir is not None:
# check to make sure a cloud path isn't being passed under pn_dir
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
raise ValueError(
"Cloud paths should be passed under record_name, not under pn_dir"
)
if "." not in pn_dir:
dir_list = pn_dir.split("/")
pn_dir = posixpath.join(
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
)

record = rdrecord(
record_name=record_name,
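Finally, rdsamp gains the same capability through rdrecord; partial reads and channel selection work unchanged against a cloud URI. Sketch with a hypothetical az:// container (requires adlfs):

```python
import wfdb

signals, fields = wfdb.rdsamp(
    "az://my-container/mitdb/100",
    sampfrom=0,
    sampto=1000,
    channels=[0, 1],
)
print(signals.shape, fields["sig_name"])
```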