diff --git a/setup.py b/setup.py index 65d2c69..64fa7d7 100755 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="streamz_ext", - version='0.2.1', + version="0.2.1", description="Streams", url="http://github.com/xpdAcq/streamz_ext/", maintainer="Christopher J. Wright", diff --git a/streamz_ext/parallel.py b/streamz_ext/parallel.py index e161921..ac612ac 100644 --- a/streamz_ext/parallel.py +++ b/streamz_ext/parallel.py @@ -15,7 +15,7 @@ from collections import Sequence -NULL_COMPUTE = '~~NULL_COMPUTE~~' +NULL_COMPUTE = "~~NULL_COMPUTE~~" def return_null(func): @@ -33,8 +33,9 @@ def inner(x, *args, **kwargs): def filter_null_wrapper(func): @wraps(func) def inner(*args, **kwargs): - if (any(a == NULL_COMPUTE for a in args) - or any(v == NULL_COMPUTE for v in kwargs.values())): + if any(a == NULL_COMPUTE for a in args) or any( + v == NULL_COMPUTE for v in kwargs.values() + ): return NULL_COMPUTE else: return func(*args, **kwargs) @@ -144,8 +145,13 @@ class gather(ParallelStream): def update(self, x, who=None): client = self.default_client() result = yield client.gather(x, asynchronous=True) - if not ((isinstance(result, Sequence) and any( - r == NULL_COMPUTE for r in result)) or result == NULL_COMPUTE): + if not ( + ( + isinstance(result, Sequence) + and any(r == NULL_COMPUTE for r in result) + ) + or result == NULL_COMPUTE + ): result2 = yield self._emit(result) raise gen.Return(result2) @@ -168,12 +174,12 @@ def update(self, x, who=None): @ParallelStream.register_api() class accumulate(ParallelStream): def __init__( - self, - upstream, - func, - start=core.no_default, - returns_state=False, - **kwargs + self, + upstream, + func, + start=core.no_default, + returns_state=False, + **kwargs ): self.func = filter_null_wrapper(func) self.state = start diff --git a/test/test_parallel.py b/test/test_parallel.py index cf49024..d6be14a 100644 --- a/test/test_parallel.py +++ b/test/test_parallel.py @@ -71,9 +71,9 @@ def f(acc, i): L = ( scatter(source, backend=backend) - .scan(f, returns_state=True) - .gather() - .sink_to_list() + .scan(f, returns_state=True) + .gather() + .sink_to_list() ) for i in range(3): yield source.emit(i) @@ -141,10 +141,10 @@ def test_buffer(backend): source = Stream(asynchronous=True) L = ( source.scatter(backend=backend) - .map(slowinc, delay=0.5) - .buffer(5) - .gather() - .sink_to_list() + .map(slowinc, delay=0.5) + .buffer(5) + .gather() + .sink_to_list() ) start = time.time() @@ -186,7 +186,9 @@ def test_filter(backend): @gen_test() def test_filter_map(backend): source = Stream(asynchronous=True) - futures = scatter(source, backend=backend).filter(lambda x: x % 2 == 0).map(inc) + futures = ( + scatter(source, backend=backend).filter(lambda x: x % 2 == 0).map(inc) + ) futures_L = futures.sink_to_list() L = futures.gather().sink_to_list() diff --git a/test/test_parallel_dask.py b/test/test_parallel_dask.py index 868896a..5ff7c1b 100644 --- a/test/test_parallel_dask.py +++ b/test/test_parallel_dask.py @@ -117,10 +117,10 @@ def test_buffer(c, s, a, b): source = Stream(asynchronous=True) L = ( source.scatter() - .map(slowinc, delay=0.5) - .buffer(5) - .gather() - .sink_to_list() + .map(slowinc, delay=0.5) + .buffer(5) + .gather() + .sink_to_list() ) start = time.time() @@ -211,8 +211,7 @@ def test_filter(c, s, a, b): @gen_cluster(client=True) def test_filter_map(c, s, a, b): source = Stream(asynchronous=True) - futures = scatter(source).filter( - lambda x: x % 2 == 0).map(inc) + futures = scatter(source).filter(lambda x: x % 2 == 0).map(inc) futures_L = futures.sink_to_list() L = futures.gather().sink_to_list()