From 8e4091c55739329f3efb08b49cb27979f7a2d529 Mon Sep 17 00:00:00 2001 From: christopher Date: Fri, 28 Sep 2018 12:19:35 -0400 Subject: [PATCH] black --- streamz_ext/clients.py | 2 +- streamz_ext/core.py | 2 -- test/test_parallel.py | 15 +++++++-------- test/test_parallel_dask.py | 4 ++-- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/streamz_ext/clients.py b/streamz_ext/clients.py index 188b8a4..aadf3de 100644 --- a/streamz_ext/clients.py +++ b/streamz_ext/clients.py @@ -13,7 +13,7 @@ def result_maybe(future_maybe): return future_maybe.result() except AttributeError: if isinstance(future_maybe, Sequence) and not isinstance( - future_maybe, str + future_maybe, str ): aa = [] for a in future_maybe: diff --git a/streamz_ext/core.py b/streamz_ext/core.py index 0111b12..12410ad 100644 --- a/streamz_ext/core.py +++ b/streamz_ext/core.py @@ -288,5 +288,3 @@ def destroy_pipeline(source_node: Stream): # some source nodes are tuples and some are bad wekrefs except (AttributeError, KeyError) as e: pass - - diff --git a/test/test_parallel.py b/test/test_parallel.py index 17e3a86..da10448 100644 --- a/test/test_parallel.py +++ b/test/test_parallel.py @@ -324,17 +324,16 @@ def test_combined_latest(backend): def delay(x): time.sleep(.5) return x + source = Stream(asynchronous=True) source2 = Stream(asynchronous=True) - futures = source.scatter(backend=backend).map(delay).combine_latest( - source2.scatter(backend=backend), emit_on=1) - futures_L = futures.sink_to_list() - L = ( - futures - .buffer(10) - .gather() - .sink_to_list() + futures = ( + source.scatter(backend=backend) + .map(delay) + .combine_latest(source2.scatter(backend=backend), emit_on=1) ) + futures_L = futures.sink_to_list() + L = futures.buffer(10).gather().sink_to_list() for i in range(5): yield source.emit(i) diff --git a/test/test_parallel_dask.py b/test/test_parallel_dask.py index f7e1913..2fac8a9 100644 --- a/test/test_parallel_dask.py +++ b/test/test_parallel_dask.py @@ -265,9 +265,9 @@ def test_double_scatter(c, s, a, b): r = sm.buffer(10).gather() L = r.sink_to_list() - print('hi') + print("hi") for i in range(5): - print('hi') + print("hi") yield source1.emit(i) yield source2.emit(i)