-
-
Notifications
You must be signed in to change notification settings - Fork 350
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #180 from buhman/io
add asynchronous file io and path wrappers
- Loading branch information
Showing
9 changed files
with
768 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
:orphan: | ||
|
||
.. _glossary: | ||
|
||
******** | ||
Glossary | ||
******** | ||
|
||
.. glossary:: | ||
|
||
asynchronous file object | ||
This is an object with an API identical to a :term:`file object`, with | ||
the exception that all non-computational methods are async functions. | ||
|
||
The main way to create an asynchronous file object is by using the | ||
:func:`trio.open_file` function. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,3 +83,4 @@ Vital statistics: | |
* :ref:`genindex` | ||
* :ref:`modindex` | ||
* :ref:`search` | ||
* :ref:`glossary` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
from functools import partial | ||
import io | ||
|
||
import trio | ||
from trio import _core | ||
from trio._util import aiter_compat, async_wraps | ||
|
||
|
||
__all__ = ['open_file', 'wrap_file'] | ||
|
||
_FILE_SYNC_ATTRS = { | ||
'closed', | ||
'encoding', 'errors', 'fileno', 'isatty', 'newlines', | ||
'readable', 'seekable', 'writable', | ||
# not defined in *IOBase: | ||
'buffer', 'raw', 'line_buffering', 'closefd', 'name', 'mode', | ||
'getvalue', 'getbuffer', | ||
} | ||
|
||
_FILE_ASYNC_METHODS = { | ||
'flush', | ||
'read', 'read1', 'readall', 'readinto', 'readline', 'readlines', | ||
'seek', 'tell', 'truncate', | ||
'write', 'writelines', | ||
# not defined in *IOBase: | ||
'readinto1', 'peek', | ||
} | ||
|
||
|
||
class AsyncIOWrapper: | ||
"""A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous | ||
file object` interface. Wrapped methods that could block are executed in | ||
:meth:`trio.run_in_worker_thread`. | ||
All properties and methods defined in in :mod:`~io` are exposed by this | ||
wrapper, if they exist in the wrapped file object. | ||
""" | ||
|
||
def __init__(self, file): | ||
self._wrapped = file | ||
|
||
@property | ||
def wrapped(self): | ||
"""object: A reference to the wrapped file object | ||
""" | ||
|
||
return self._wrapped | ||
|
||
def __getattr__(self, name): | ||
if name in _FILE_SYNC_ATTRS: | ||
return getattr(self._wrapped, name) | ||
if name in _FILE_ASYNC_METHODS: | ||
meth = getattr(self._wrapped, name) | ||
|
||
@async_wraps(self.__class__, self._wrapped.__class__, name) | ||
async def wrapper(*args, **kwargs): | ||
func = partial(meth, *args, **kwargs) | ||
return await trio.run_in_worker_thread(func) | ||
|
||
# cache the generated method | ||
setattr(self, name, wrapper) | ||
return wrapper | ||
|
||
raise AttributeError(name) | ||
|
||
def __dir__(self): | ||
attrs = set(super().__dir__()) | ||
attrs.update(a for a in _FILE_SYNC_ATTRS if hasattr(self.wrapped, a)) | ||
attrs.update(a for a in _FILE_ASYNC_METHODS if hasattr(self.wrapped, a)) | ||
return attrs | ||
|
||
|
||
@aiter_compat | ||
def __aiter__(self): | ||
return self | ||
|
||
async def __anext__(self): | ||
line = await self.readline() | ||
if line: | ||
return line | ||
else: | ||
raise StopAsyncIteration | ||
|
||
async def __aenter__(self): | ||
return self | ||
|
||
async def __aexit__(self, typ, value, traceback): | ||
await self.close() | ||
|
||
async def detach(self): | ||
"""Like :meth:`~io.BufferedIOBase.detach`, but async. | ||
This also re-wraps the result in a new :term:`asynchronous file object` | ||
wrapper. | ||
""" | ||
|
||
raw = await trio.run_in_worker_thread(self._wrapped.detach) | ||
return wrap_file(raw) | ||
|
||
async def close(self): | ||
"""Like :meth:`~io.IOBase.close`, but async. | ||
This is also shielded from cancellation; if a cancellation scope is | ||
cancelled, the wrapped file object will still be safely closed. | ||
""" | ||
|
||
# ensure the underling file is closed during cancellation | ||
with _core.open_cancel_scope(shield=True): | ||
await trio.run_in_worker_thread(self._wrapped.close) | ||
|
||
await _core.yield_if_cancelled() | ||
|
||
|
||
async def open_file(file, mode='r', buffering=-1, encoding=None, errors=None, | ||
newline=None, closefd=True, opener=None): | ||
"""Asynchronous version of :func:`~io.open`. | ||
Returns: | ||
An :term:`asynchronous file object` | ||
Example:: | ||
async with await trio.open_file(filename) as f: | ||
async for line in f: | ||
pass | ||
assert f.closed | ||
""" | ||
# python3.5 compat | ||
if isinstance(file, trio.Path): | ||
file = file.__fspath__() | ||
|
||
_file = wrap_file(await trio.run_in_worker_thread(io.open, file, mode, | ||
buffering, encoding, errors, newline, closefd, opener)) | ||
return _file | ||
|
||
|
||
def wrap_file(file): | ||
"""This wraps any file object in a wrapper that provides an asynchronous file | ||
object interface. | ||
Args: | ||
file: a :term:`file object` | ||
Returns: | ||
An :term:`asynchronous file object` that wraps `file` | ||
Example:: | ||
async_file = trio.wrap_file(StringIO('asdf')) | ||
assert await async_file.read() == 'asdf' | ||
""" | ||
|
||
def has(attr): | ||
return hasattr(file, attr) and callable(getattr(file, attr)) | ||
|
||
if not (has('close') and (has('read') or has('write'))): | ||
raise TypeError('{} does not implement required duck-file methods: ' | ||
'close and (read or write)'.format(file)) | ||
|
||
return AsyncIOWrapper(file) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
from functools import wraps, partial | ||
import os | ||
import types | ||
import pathlib | ||
|
||
import trio | ||
from trio._util import async_wraps | ||
|
||
|
||
__all__ = ['Path'] | ||
|
||
|
||
# python3.5 compat: __fspath__ does not exist in 3.5, so unwrap any trio.Path | ||
# being passed to any wrapped method | ||
def unwrap_paths(args): | ||
new_args = [] | ||
for arg in args: | ||
if isinstance(arg, Path): | ||
arg = arg._wrapped | ||
new_args.append(arg) | ||
return new_args | ||
|
||
|
||
# re-wrap return value from methods that return new instances of pathlib.Path | ||
def rewrap_path(value): | ||
if isinstance(value, pathlib.Path): | ||
value = Path(value) | ||
return value | ||
|
||
|
||
def _forward_factory(cls, attr_name, attr): | ||
@wraps(attr) | ||
def wrapper(self, *args, **kwargs): | ||
args = unwrap_paths(args) | ||
attr = getattr(self._wrapped, attr_name) | ||
value = attr(*args, **kwargs) | ||
return rewrap_path(value) | ||
|
||
return wrapper | ||
|
||
|
||
def _forward_magic(cls, attr): | ||
sentinel = object() | ||
@wraps(attr) | ||
def wrapper(self, other=sentinel): | ||
if other is sentinel: | ||
return attr(self._wrapped) | ||
if isinstance(other, cls): | ||
other = other._wrapped | ||
value = attr(self._wrapped, other) | ||
return rewrap_path(value) | ||
return wrapper | ||
|
||
|
||
def thread_wrapper_factory(cls, meth_name): | ||
@async_wraps(cls, pathlib.Path, meth_name) | ||
async def wrapper(self, *args, **kwargs): | ||
args = unwrap_paths(args) | ||
meth = getattr(self._wrapped, meth_name) | ||
func = partial(meth, *args, **kwargs) | ||
value = await trio.run_in_worker_thread(func) | ||
return rewrap_path(value) | ||
|
||
return wrapper | ||
|
||
|
||
class AsyncAutoWrapperType(type): | ||
def __init__(cls, name, bases, attrs): | ||
super().__init__(name, bases, attrs) | ||
|
||
cls._forward = [] | ||
type(cls).generate_forwards(cls, attrs) | ||
type(cls).generate_wraps(cls, attrs) | ||
type(cls).generate_magic(cls, attrs) | ||
|
||
def generate_forwards(cls, attrs): | ||
# forward functions of _forwards | ||
for attr_name, attr in cls._forwards.__dict__.items(): | ||
if attr_name.startswith('_') or attr_name in attrs: | ||
continue | ||
|
||
if isinstance(attr, property): | ||
cls._forward.append(attr_name) | ||
elif isinstance(attr, types.FunctionType): | ||
wrapper = _forward_factory(cls, attr_name, attr) | ||
setattr(cls, attr_name, wrapper) | ||
else: | ||
raise TypeError(attr_name, type(attr)) | ||
|
||
def generate_wraps(cls, attrs): | ||
# generate wrappers for functions of _wraps | ||
for attr_name, attr in cls._wraps.__dict__.items(): | ||
if attr_name.startswith('_') or attr_name in attrs: | ||
continue | ||
|
||
if isinstance(attr, classmethod): | ||
setattr(cls, attr_name, attr) | ||
elif isinstance(attr, types.FunctionType): | ||
wrapper = thread_wrapper_factory(cls, attr_name) | ||
setattr(cls, attr_name, wrapper) | ||
else: | ||
raise TypeError(attr_name, type(attr)) | ||
|
||
def generate_magic(cls, attrs): | ||
# generate wrappers for magic | ||
for attr_name in cls._forward_magic: | ||
attr = getattr(cls._forwards, attr_name) | ||
wrapper = _forward_magic(cls, attr) | ||
setattr(cls, attr_name, wrapper) | ||
|
||
|
||
class Path(metaclass=AsyncAutoWrapperType): | ||
"""A :class:`~pathlib.Path` wrapper that executes blocking Path methods in | ||
:meth:`trio.run_in_worker_thread`. | ||
""" | ||
|
||
_wraps = pathlib.Path | ||
_forwards = pathlib.PurePath | ||
_forward_magic = [ | ||
'__str__', '__bytes__', '__truediv__', '__rtruediv__', | ||
'__eq__', '__lt__', '__le__', '__gt__', '__ge__' | ||
] | ||
|
||
def __init__(self, *args): | ||
args = unwrap_paths(args) | ||
|
||
self._wrapped = pathlib.Path(*args) | ||
|
||
def __getattr__(self, name): | ||
if name in self._forward: | ||
value = getattr(self._wrapped, name) | ||
return rewrap_path(value) | ||
raise AttributeError(name) | ||
|
||
def __dir__(self): | ||
return super().__dir__() + self._forward | ||
|
||
def __repr__(self): | ||
return 'trio.Path({})'.format(str(self)) | ||
|
||
def __fspath__(self): | ||
try: | ||
return self._wrapped.__fspath__() | ||
# python3.5 compat | ||
except AttributeError: # pragma: no cover | ||
return str(self) | ||
|
||
async def open(self, *args, **kwargs): | ||
func = partial(self._wrapped.open, *args, **kwargs) | ||
value = await trio.run_in_worker_thread(func) | ||
return trio.wrap_file(value) | ||
|
||
|
||
# The value of Path.absolute.__doc__ makes a reference to | ||
# :meth:~pathlib.Path.absolute, which does not exist. Removing this makes more | ||
# sense than inventing our own special docstring for this. | ||
del Path.absolute.__doc__ | ||
|
||
|
||
# python3.5 compat | ||
if hasattr(os, 'PathLike'): | ||
os.PathLike.register(Path) |
Oops, something went wrong.