-
-
Notifications
You must be signed in to change notification settings - Fork 350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add asynchronous file io and path wrappers #180
Merged
Merged
Changes from all commits
Commits
Show all changes
57 commits
Select commit
Hold shift + click to select a range
9cbc366
trio.io: initial implementation
buhman b401562
trio.io: wrap open in AsyncGeneratorContextManager
buhman f20874f
trio.io: wrap heap classes
buhman f7f7bcc
io: replace async_generator_context with closing
buhman b0668dd
tests: add io tests
buhman ab54e72
test_io: fix python3.5 compatibility
buhman 4d3614f
io: fix warnings
buhman 6dc85bb
trio.io: docs wip
buhman 7b704a2
io: re-add AsyncIOBase context manager
buhman e5b69c8
io: add aiter compatibility wrapper
buhman 77fde94
io: rename public API
buhman 482ca8f
trio: rename trio.io to trio._file_io
buhman 1dad9ee
_file_io: gaurantee the underlying file is closed during cancellation
buhman bc5c085
_file_io: move _method_factory to _helpers module
buhman d828f39
trio: initial path implementation
buhman df0b6cb
_path: fix python3.5 compatibility
buhman 39b4d0c
_path: remove WindowsPath and PosixPath wrappers
buhman efb739c
_file_io: promote wrapper_factory cls re-wrapping to _helpers
buhman 86543fb
_path: special-case open method
buhman 95a1966
_path: implement __dir__
buhman 7f88ca0
_file_io: despecialize AsyncIO; remove wrapper type
buhman 13d0735
_file_io/_helpers: consolidate __name__ magic in async_wraps
buhman 84c0238
async_wraps: add __doc__
buhman 2a26335
test_file_io: clean up tests
buhman 066d1f2
_file_io: remove closing
buhman d585856
_file_io: compute available attributes in __dir__
buhman f91c717
_file_io: add methods and attributes not defined in *IOBase
buhman 45d8ad8
docs: improve file_io documentation
buhman f217d73
trio: rename AsyncIO to AsyncIOWrapper
buhman c7203b6
path: add more tests
buhman 266bedb
path: add proper support for path magic
buhman 3126442
path: add tests for repr and forward rewrap
buhman d0d9b05
path: add type tests
buhman 9cdf33e
path: fix python 3.5 compatibility
buhman da7b8ec
path: add documentation
buhman 179393a
file_io: make AsyncIOWrapper private
buhman ed6bd54
file_io: remove package
buhman 67e8f9c
test_file_io: use wrap_file instead of private _wrapped
buhman 39c33f0
path: rename AsyncPath to Path
buhman 5034e5d
path: replace __new__ with __init__
buhman d9cac31
test_file_io: cleanup
buhman 953db74
file_io: make _FILE_SYNC_ATTRS and _FILE_SYNC_METHODS sets instead of…
buhman bc8c317
file_io: add support for duck-files
buhman b70c413
path: add comment for 3.5-compatibility in __fspath__
buhman db69f48
path: fix comparison magic
buhman 04ed0ad
path: add tests for paths being initialized from other paths
buhman 00d6eb8
path: fix python3.5 compatibility
buhman f2b237f
test_file_io: skip coverage on fake methods
buhman d9c09af
path: improve comments
buhman 7555d3b
file_io: make duck-file definition more restrictive
buhman 66836f3
test_path: improve magic tests
buhman 6042f8b
path: use more strict compatibility logic
buhman 8f076d9
path: unwrap any trio.Path argument to any wrapped method
buhman e63b4cc
path: de-genericify path return value rewrapping logic
buhman f0e5fa4
path: re-wrap property return values
buhman e17ac07
path: add support for __truediv__, __rtruediv__
buhman e74396c
test_path: fix windows compatibility
buhman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
:orphan: | ||
|
||
.. _glossary: | ||
|
||
******** | ||
Glossary | ||
******** | ||
|
||
.. glossary:: | ||
|
||
asynchronous file object | ||
This is an object with an API identical to a :term:`file object`, with | ||
the exception that all non-computational methods are async functions. | ||
|
||
The main way to create an asynchronous file object is by using the | ||
:func:`trio.open_file` function. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,3 +83,4 @@ Vital statistics: | |
* :ref:`genindex` | ||
* :ref:`modindex` | ||
* :ref:`search` | ||
* :ref:`glossary` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
from functools import partial | ||
import io | ||
|
||
import trio | ||
from trio import _core | ||
from trio._util import aiter_compat, async_wraps | ||
|
||
|
||
__all__ = ['open_file', 'wrap_file'] | ||
|
||
_FILE_SYNC_ATTRS = { | ||
'closed', | ||
'encoding', 'errors', 'fileno', 'isatty', 'newlines', | ||
'readable', 'seekable', 'writable', | ||
# not defined in *IOBase: | ||
'buffer', 'raw', 'line_buffering', 'closefd', 'name', 'mode', | ||
'getvalue', 'getbuffer', | ||
} | ||
|
||
_FILE_ASYNC_METHODS = { | ||
'flush', | ||
'read', 'read1', 'readall', 'readinto', 'readline', 'readlines', | ||
'seek', 'tell', 'truncate', | ||
'write', 'writelines', | ||
# not defined in *IOBase: | ||
'readinto1', 'peek', | ||
} | ||
|
||
|
||
class AsyncIOWrapper: | ||
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous | ||
file object` interface. Wrapped methods that could block are executed in | ||
:meth:`trio.run_in_worker_thread`. | ||
|
||
All properties and methods defined in in :mod:`~io` are exposed by this | ||
wrapper, if they exist in the wrapped file object. | ||
|
||
""" | ||
|
||
def __init__(self, file): | ||
self._wrapped = file | ||
|
||
@property | ||
def wrapped(self): | ||
"""object: A reference to the wrapped file object | ||
|
||
""" | ||
|
||
return self._wrapped | ||
|
||
def __getattr__(self, name): | ||
if name in _FILE_SYNC_ATTRS: | ||
return getattr(self._wrapped, name) | ||
if name in _FILE_ASYNC_METHODS: | ||
meth = getattr(self._wrapped, name) | ||
|
||
@async_wraps(self.__class__, self._wrapped.__class__, name) | ||
async def wrapper(*args, **kwargs): | ||
func = partial(meth, *args, **kwargs) | ||
return await trio.run_in_worker_thread(func) | ||
|
||
# cache the generated method | ||
setattr(self, name, wrapper) | ||
return wrapper | ||
|
||
raise AttributeError(name) | ||
|
||
def __dir__(self): | ||
attrs = set(super().__dir__()) | ||
attrs.update(a for a in _FILE_SYNC_ATTRS if hasattr(self.wrapped, a)) | ||
attrs.update(a for a in _FILE_ASYNC_METHODS if hasattr(self.wrapped, a)) | ||
return attrs | ||
|
||
|
||
@aiter_compat | ||
def __aiter__(self): | ||
return self | ||
|
||
async def __anext__(self): | ||
line = await self.readline() | ||
if line: | ||
return line | ||
else: | ||
raise StopAsyncIteration | ||
|
||
async def __aenter__(self): | ||
return self | ||
|
||
async def __aexit__(self, typ, value, traceback): | ||
await self.close() | ||
|
||
async def detach(self): | ||
"""Like :meth:`~io.BufferedIOBase.detach`, but async. | ||
|
||
This also re-wraps the result in a new :term:`asynchronous file object` | ||
wrapper. | ||
|
||
""" | ||
|
||
raw = await trio.run_in_worker_thread(self._wrapped.detach) | ||
return wrap_file(raw) | ||
|
||
async def close(self): | ||
"""Like :meth:`~io.IOBase.close`, but async. | ||
|
||
This is also shielded from cancellation; if a cancellation scope is | ||
cancelled, the wrapped file object will still be safely closed. | ||
|
||
""" | ||
|
||
# ensure the underling file is closed during cancellation | ||
with _core.open_cancel_scope(shield=True): | ||
await trio.run_in_worker_thread(self._wrapped.close) | ||
|
||
await _core.yield_if_cancelled() | ||
|
||
|
||
async def open_file(file, mode='r', buffering=-1, encoding=None, errors=None, | ||
newline=None, closefd=True, opener=None): | ||
"""Asynchronous version of :func:`~io.open`. | ||
|
||
Returns: | ||
An :term:`asynchronous file object` | ||
|
||
Example:: | ||
|
||
async with await trio.open_file(filename) as f: | ||
async for line in f: | ||
pass | ||
|
||
assert f.closed | ||
|
||
""" | ||
# python3.5 compat | ||
if isinstance(file, trio.Path): | ||
file = file.__fspath__() | ||
|
||
_file = wrap_file(await trio.run_in_worker_thread(io.open, file, mode, | ||
buffering, encoding, errors, newline, closefd, opener)) | ||
return _file | ||
|
||
|
||
def wrap_file(file): | ||
"""This wraps any file object in a wrapper that provides an asynchronous file | ||
object interface. | ||
|
||
Args: | ||
file: a :term:`file object` | ||
|
||
Returns: | ||
An :term:`asynchronous file object` that wraps `file` | ||
|
||
Example:: | ||
|
||
async_file = trio.wrap_file(StringIO('asdf')) | ||
|
||
assert await async_file.read() == 'asdf' | ||
|
||
""" | ||
|
||
def has(attr): | ||
return hasattr(file, attr) and callable(getattr(file, attr)) | ||
|
||
if not (has('close') and (has('read') or has('write'))): | ||
raise TypeError('{} does not implement required duck-file methods: ' | ||
'close and (read or write)'.format(file)) | ||
|
||
return AsyncIOWrapper(file) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
from functools import wraps, partial | ||
import os | ||
import types | ||
import pathlib | ||
|
||
import trio | ||
from trio._util import async_wraps | ||
|
||
|
||
__all__ = ['Path'] | ||
|
||
|
||
# python3.5 compat: __fspath__ does not exist in 3.5, so unwrap any trio.Path | ||
# being passed to any wrapped method | ||
def unwrap_paths(args): | ||
new_args = [] | ||
for arg in args: | ||
if isinstance(arg, Path): | ||
arg = arg._wrapped | ||
new_args.append(arg) | ||
return new_args | ||
|
||
|
||
# re-wrap return value from methods that return new instances of pathlib.Path | ||
def rewrap_path(value): | ||
if isinstance(value, pathlib.Path): | ||
value = Path(value) | ||
return value | ||
|
||
|
||
def _forward_factory(cls, attr_name, attr): | ||
@wraps(attr) | ||
def wrapper(self, *args, **kwargs): | ||
args = unwrap_paths(args) | ||
attr = getattr(self._wrapped, attr_name) | ||
value = attr(*args, **kwargs) | ||
return rewrap_path(value) | ||
|
||
return wrapper | ||
|
||
|
||
def _forward_magic(cls, attr): | ||
sentinel = object() | ||
@wraps(attr) | ||
def wrapper(self, other=sentinel): | ||
if other is sentinel: | ||
return attr(self._wrapped) | ||
if isinstance(other, cls): | ||
other = other._wrapped | ||
value = attr(self._wrapped, other) | ||
return rewrap_path(value) | ||
return wrapper | ||
|
||
|
||
def thread_wrapper_factory(cls, meth_name): | ||
@async_wraps(cls, pathlib.Path, meth_name) | ||
async def wrapper(self, *args, **kwargs): | ||
args = unwrap_paths(args) | ||
meth = getattr(self._wrapped, meth_name) | ||
func = partial(meth, *args, **kwargs) | ||
value = await trio.run_in_worker_thread(func) | ||
return rewrap_path(value) | ||
|
||
return wrapper | ||
|
||
|
||
class AsyncAutoWrapperType(type): | ||
def __init__(cls, name, bases, attrs): | ||
super().__init__(name, bases, attrs) | ||
|
||
cls._forward = [] | ||
type(cls).generate_forwards(cls, attrs) | ||
type(cls).generate_wraps(cls, attrs) | ||
type(cls).generate_magic(cls, attrs) | ||
|
||
def generate_forwards(cls, attrs): | ||
# forward functions of _forwards | ||
for attr_name, attr in cls._forwards.__dict__.items(): | ||
if attr_name.startswith('_') or attr_name in attrs: | ||
continue | ||
|
||
if isinstance(attr, property): | ||
cls._forward.append(attr_name) | ||
elif isinstance(attr, types.FunctionType): | ||
wrapper = _forward_factory(cls, attr_name, attr) | ||
setattr(cls, attr_name, wrapper) | ||
else: | ||
raise TypeError(attr_name, type(attr)) | ||
|
||
def generate_wraps(cls, attrs): | ||
# generate wrappers for functions of _wraps | ||
for attr_name, attr in cls._wraps.__dict__.items(): | ||
if attr_name.startswith('_') or attr_name in attrs: | ||
continue | ||
|
||
if isinstance(attr, classmethod): | ||
setattr(cls, attr_name, attr) | ||
elif isinstance(attr, types.FunctionType): | ||
wrapper = thread_wrapper_factory(cls, attr_name) | ||
setattr(cls, attr_name, wrapper) | ||
else: | ||
raise TypeError(attr_name, type(attr)) | ||
|
||
def generate_magic(cls, attrs): | ||
# generate wrappers for magic | ||
for attr_name in cls._forward_magic: | ||
attr = getattr(cls._forwards, attr_name) | ||
wrapper = _forward_magic(cls, attr) | ||
setattr(cls, attr_name, wrapper) | ||
|
||
|
||
class Path(metaclass=AsyncAutoWrapperType): | ||
"""A :class:`~pathlib.Path` wrapper that executes blocking Path methods in | ||
:meth:`trio.run_in_worker_thread`. | ||
|
||
""" | ||
|
||
_wraps = pathlib.Path | ||
_forwards = pathlib.PurePath | ||
_forward_magic = [ | ||
'__str__', '__bytes__', '__truediv__', '__rtruediv__', | ||
'__eq__', '__lt__', '__le__', '__gt__', '__ge__' | ||
] | ||
|
||
def __init__(self, *args): | ||
args = unwrap_paths(args) | ||
|
||
self._wrapped = pathlib.Path(*args) | ||
|
||
def __getattr__(self, name): | ||
if name in self._forward: | ||
value = getattr(self._wrapped, name) | ||
return rewrap_path(value) | ||
raise AttributeError(name) | ||
|
||
def __dir__(self): | ||
return super().__dir__() + self._forward | ||
|
||
def __repr__(self): | ||
return 'trio.Path({})'.format(str(self)) | ||
|
||
def __fspath__(self): | ||
try: | ||
return self._wrapped.__fspath__() | ||
# python3.5 compat | ||
except AttributeError: # pragma: no cover | ||
return str(self) | ||
|
||
async def open(self, *args, **kwargs): | ||
func = partial(self._wrapped.open, *args, **kwargs) | ||
value = await trio.run_in_worker_thread(func) | ||
return trio.wrap_file(value) | ||
|
||
|
||
# The value of Path.absolute.__doc__ makes a reference to | ||
# :meth:~pathlib.Path.absolute, which does not exist. Removing this makes more | ||
# sense than inventing our own special docstring for this. | ||
del Path.absolute.__doc__ | ||
|
||
|
||
# python3.5 compat | ||
if hasattr(os, 'PathLike'): | ||
os.PathLike.register(Path) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having a glossary is an interesting idea... we should probably add checkpoint and some other key terms (maybe nursery), but that can be a separate PR and I think I like it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍