Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
CJ-Wright committed Sep 18, 2018
1 parent 5be3026 commit 98c548e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="streamz_ext",
version='0.2.1',
version="0.2.1",
description="Streams",
url="http://github.com/xpdAcq/streamz_ext/",
maintainer="Christopher J. Wright",
Expand Down
28 changes: 17 additions & 11 deletions streamz_ext/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from collections import Sequence

NULL_COMPUTE = '~~NULL_COMPUTE~~'
NULL_COMPUTE = "~~NULL_COMPUTE~~"


def return_null(func):
Expand All @@ -33,8 +33,9 @@ def inner(x, *args, **kwargs):
def filter_null_wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
if (any(a == NULL_COMPUTE for a in args)
or any(v == NULL_COMPUTE for v in kwargs.values())):
if any(a == NULL_COMPUTE for a in args) or any(
v == NULL_COMPUTE for v in kwargs.values()
):
return NULL_COMPUTE
else:
return func(*args, **kwargs)
Expand Down Expand Up @@ -144,8 +145,13 @@ class gather(ParallelStream):
def update(self, x, who=None):
client = self.default_client()
result = yield client.gather(x, asynchronous=True)
if not ((isinstance(result, Sequence) and any(
r == NULL_COMPUTE for r in result)) or result == NULL_COMPUTE):
if not (
(
isinstance(result, Sequence)
and any(r == NULL_COMPUTE for r in result)
)
or result == NULL_COMPUTE
):
result2 = yield self._emit(result)
raise gen.Return(result2)

Expand All @@ -168,12 +174,12 @@ def update(self, x, who=None):
@ParallelStream.register_api()
class accumulate(ParallelStream):
def __init__(
self,
upstream,
func,
start=core.no_default,
returns_state=False,
**kwargs
self,
upstream,
func,
start=core.no_default,
returns_state=False,
**kwargs
):
self.func = filter_null_wrapper(func)
self.state = start
Expand Down
18 changes: 10 additions & 8 deletions test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ def f(acc, i):

L = (
scatter(source, backend=backend)
.scan(f, returns_state=True)
.gather()
.sink_to_list()
.scan(f, returns_state=True)
.gather()
.sink_to_list()
)
for i in range(3):
yield source.emit(i)
Expand Down Expand Up @@ -141,10 +141,10 @@ def test_buffer(backend):
source = Stream(asynchronous=True)
L = (
source.scatter(backend=backend)
.map(slowinc, delay=0.5)
.buffer(5)
.gather()
.sink_to_list()
.map(slowinc, delay=0.5)
.buffer(5)
.gather()
.sink_to_list()
)

start = time.time()
Expand Down Expand Up @@ -186,7 +186,9 @@ def test_filter(backend):
@gen_test()
def test_filter_map(backend):
source = Stream(asynchronous=True)
futures = scatter(source, backend=backend).filter(lambda x: x % 2 == 0).map(inc)
futures = (
scatter(source, backend=backend).filter(lambda x: x % 2 == 0).map(inc)
)
futures_L = futures.sink_to_list()
L = futures.gather().sink_to_list()

Expand Down
11 changes: 5 additions & 6 deletions test/test_parallel_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ def test_buffer(c, s, a, b):
source = Stream(asynchronous=True)
L = (
source.scatter()
.map(slowinc, delay=0.5)
.buffer(5)
.gather()
.sink_to_list()
.map(slowinc, delay=0.5)
.buffer(5)
.gather()
.sink_to_list()
)

start = time.time()
Expand Down Expand Up @@ -211,8 +211,7 @@ def test_filter(c, s, a, b):
@gen_cluster(client=True)
def test_filter_map(c, s, a, b):
source = Stream(asynchronous=True)
futures = scatter(source).filter(
lambda x: x % 2 == 0).map(inc)
futures = scatter(source).filter(lambda x: x % 2 == 0).map(inc)
futures_L = futures.sink_to_list()
L = futures.gather().sink_to_list()

Expand Down

0 comments on commit 98c548e

Please sign in to comment.