Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
CJ-Wright committed Jul 31, 2018
1 parent 422f61d commit 1004ed3
Show file tree
Hide file tree
Showing 15 changed files with 67 additions and 54 deletions.
6 changes: 3 additions & 3 deletions run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
import sys
import pytest

if __name__ == '__main__':
if __name__ == "__main__":
# show output results from every test function
args = ['-v']
args = ["-v"]
# show the message output for skipped and expected failure tests
if len(sys.argv) > 1:
args.extend(sys.argv[1:])
print('pytest arguments: {}'.format(args))
print("pytest arguments: {}".format(args))
# # compute coverage stats for xpdAcq
# args.extend(['--cov', 'xpdAcq'])
# call pytest and exit with the return code from pytest so that
Expand Down
29 changes: 16 additions & 13 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
from setuptools import setup, find_packages


setup(name='streamz_ext',
version='0.2.0',
description='Streams',
url='http://github.com/xpdAcq/streamz_ext/',
maintainer='Christopher J. Wright',
maintainer_email='[email protected]',
license='BSD',
keywords='streams',
packages=find_packages(),
long_description=(open('README.rst').read() if exists('README.rst')
else ''),
install_requires=list(open('requirements.txt').read().strip().split('\n')),
zip_safe=False)
setup(
name="streamz_ext",
version="0.2.0",
description="Streams",
url="http://github.com/xpdAcq/streamz_ext/",
maintainer="Christopher J. Wright",
maintainer_email="[email protected]",
license="BSD",
keywords="streams",
packages=find_packages(),
long_description=(
open("README.rst").read() if exists("README.rst") else ""
),
install_requires=list(open("requirements.txt").read().strip().split("\n")),
zip_safe=False,
)
2 changes: 1 addition & 1 deletion streamz_ext/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.2.0'
__version__ = "0.2.0"

from .core import *
2 changes: 1 addition & 1 deletion streamz_ext/batch.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from streamz.batch import *
from streamz.batch import *
24 changes: 14 additions & 10 deletions streamz_ext/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from collections.abc import Sequence

from streamz.core import *
from streamz.core import (combine_latest as _combine_latest,
zip as _zip,
zip_latest as _zip_latest)
from streamz.core import (
combine_latest as _combine_latest,
zip as _zip,
zip_latest as _zip_latest,
)
from streamz.core import _global_sinks, _truthy


Expand All @@ -30,7 +32,8 @@ class starsink(Stream):
map
Stream.sink_to_list
"""
_graphviz_shape = 'trapezium'

_graphviz_shape = "trapezium"

def __init__(self, upstream, func, *args, **kwargs):
self.func = func
Expand Down Expand Up @@ -71,11 +74,12 @@ class filter(Stream):
2
4
"""

def __init__(self, upstream, predicate, *args, **kwargs):
if predicate is None:
predicate = _truthy
self.predicate = predicate
stream_name = kwargs.pop('stream_name', None)
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
self.args = args

Expand Down Expand Up @@ -112,6 +116,7 @@ def __init__(self, upstream, history=None, key=identity, **kwargs):
self.key = key
if history:
from zict import LRU

self.seen = LRU(history, self.seen)
self.non_hash_seen = deque(maxlen=history)

Expand Down Expand Up @@ -143,7 +148,6 @@ def _first(node, f):
del n



@Stream.register_api()
class combine_latest(_combine_latest):
""" Combine multiple streams together to a stream of tuples
Expand All @@ -166,14 +170,13 @@ class combine_latest(_combine_latest):
"""

def __init__(self, *upstreams, **kwargs):
first = kwargs.pop('first', None)
first = kwargs.pop("first", None)

_combine_latest.__init__(self, *upstreams, **kwargs)
if first:
_first(self, first)



@Stream.register_api()
class zip(_zip):
""" Combine streams together into a stream of tuples
Expand All @@ -193,7 +196,7 @@ class zip(_zip):
"""

def __init__(self, *upstreams, **kwargs):
first = kwargs.pop('first', None)
first = kwargs.pop("first", None)

_zip.__init__(self, *upstreams, **kwargs)
if first:
Expand Down Expand Up @@ -222,8 +225,9 @@ class zip_latest(_zip_latest):
Stream.combine_latest
Stream.zip
"""

def __init__(self, *upstreams, **kwargs):
first = kwargs.pop('first', None)
first = kwargs.pop("first", None)

_zip_latest.__init__(self, *upstreams, **kwargs)
if first:
Expand Down
2 changes: 1 addition & 1 deletion streamz_ext/dask.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from streamz.dask import *
from streamz.dask import *
2 changes: 1 addition & 1 deletion streamz_ext/dataframe.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from streamz.dataframe import *
from streamz.dataframe import *
2 changes: 1 addition & 1 deletion streamz_ext/graph.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from streamz.graph import *
from streamz.graph import *
3 changes: 2 additions & 1 deletion streamz_ext/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def micro_link(input_graph, output_graph):
input_graph[name].connect(output_graph[name])
# TODO: idiomatic way to do this? (essentially left update)
input_graph.update(
{k: v for k, v in output_graph.items() if k not in input_graph})
{k: v for k, v in output_graph.items() if k not in input_graph}
)


# TODO: support bypass? Maybe that should be a separate pipeline optimization
Expand Down
2 changes: 1 addition & 1 deletion streamz_ext/sources.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from streamz.sources import *
from streamz.sources import *
1 change: 1 addition & 0 deletions test/test_batch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from streamz_ext.batch import *

try:
from streamz.tests.test_batch import *
except ImportError:
Expand Down
20 changes: 10 additions & 10 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ def test_unique_dict():
source = Stream()
L = source.unique(history=1).sink_to_list()

source.emit({'a': 1})
source.emit({'a': 1})
source.emit({'b': 1})
source.emit({"a": 1})
source.emit({"a": 1})
source.emit({"b": 1})

assert L == [{'a': 1}, {'b': 1}]
assert L == [{"a": 1}, {"b": 1}]


def test_unique_list():
source = Stream()
L = source.unique(history=1).sink_to_list()

source.emit(['a'])
source.emit(['a'])
source.emit(['b'])
source.emit(["a"])
source.emit(["a"])
source.emit(["b"])

assert L == [['a'], ['b']]
assert L == [["a"], ["b"]]


def test_execution_order():
Expand All @@ -50,7 +50,7 @@ def test_execution_order():
b = s.pluck(1)
a = s.pluck(0)
l = a.combine_latest(b, emit_on=a).sink_to_list()
z = [(1, 'red'), (2, 'blue'), (3, 'green')]
z = [(1, "red"), (2, "blue"), (3, "green")]
for zz in z:
s.emit(zz)
L.append((l,))
Expand All @@ -63,7 +63,7 @@ def test_execution_order():
a = s.pluck(0)
b = s.pluck(1)
l = a.combine_latest(b, emit_on=a).sink_to_list()
z = [(1, 'red'), (2, 'blue'), (3, 'green')]
z = [(1, "red"), (2, "blue"), (3, "green")]
for zz in z:
s.emit(zz)
L2.append((l,))
Expand Down
1 change: 1 addition & 0 deletions test/test_dask.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from streamz_ext.dask import *

try:
from streamz.tests.test_dask import *
except ImportError:
Expand Down
1 change: 1 addition & 0 deletions test/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from streamz_ext.graph import *

try:
from streamz.tests.test_graph import *
except ImportError:
Expand Down
24 changes: 13 additions & 11 deletions test/test_link.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from streamz_ext.link import *
from streamz_ext import Stream

try:
from streamz.tests.test_link import *
except ImportError:
Expand All @@ -10,21 +11,22 @@ def test_link():
def make_a():
source = Stream()
out_a = source.map(lambda x: x + 1)
return {'in_a': source, 'out_a': out_a}
return {"in_a": source, "out_a": out_a}

def make_b():
out_a = Stream()
out_b = out_a.map(lambda x: x * 2)
return {'out_a': out_a, 'out_b': out_b}
return {"out_a": out_a, "out_b": out_b}

a = make_a()
b = make_b()
L = b['out_b'].sink_to_list()
L = b["out_b"].sink_to_list()
for i in range(10):
a['in_a'].emit(i)
a["in_a"].emit(i)
assert len(L) == 0
link(a, b)
for i in range(10):
a['in_a'].emit(i)
a["in_a"].emit(i)
assert L == [(i + 1) * 2 for i in range(10)]


Expand Down Expand Up @@ -54,29 +56,29 @@ def test_double_link():
def make_a():
source = Stream()
out_a = source.map(lambda x: x + 1)
return {'in_a': source, 'out_a': out_a}
return {"in_a": source, "out_a": out_a}

def make_b():
out_a = Stream()
out_b = out_a.map(lambda x: x * 2)
return {'out_a': out_a, 'out_b': out_b}
return {"out_a": out_a, "out_b": out_b}

def make_c():
out_a = Stream()
out_b = Stream()
out_c = out_a.zip(out_b).map(sum)
return {'out_a': out_a, 'out_b': out_b, 'out_c': out_c}
return {"out_a": out_a, "out_b": out_b, "out_c": out_c}

a = make_a()
b = make_b()
L = b['out_b'].sink_to_list()
L = b["out_b"].sink_to_list()
ab = link(a, b)
c = make_c()
L2 = c['out_c'].sink_to_list()
L2 = c["out_c"].sink_to_list()
abc = link(ab, c)

for i in range(10):
a['in_a'].emit(i)
a["in_a"].emit(i)
assert L == [(i + 1) * 2 for i in range(10)]
assert L2 == [((i + 1) * 2) + i + 1 for i in range(10)]

Expand Down

0 comments on commit 1004ed3

Please sign in to comment.