From 07b5ff12739337d891b96733d746af339d2a69e2 Mon Sep 17 00:00:00 2001
From: christopher
Date: Thu, 11 Jan 2018 13:34:54 -0500
Subject: [PATCH] WIP: get repo ready for release

---
 .condarc                      |    7 +
 .coveragerc                   |   14 +
 .flake8                       |    9 +
 .gitignore                    |   94 +++
 .landscape.yml                |    3 +
 .travis.yml                   |   39 +
 LICENSE                       |   62 ++
 requirements.txt              |    3 +
 requirements/build.txt        |    2 +
 requirements/pip.txt          |    0
 requirements/run.txt          |    3 +
 requirements/test.txt         |    4 +
 rever.xsh                     |   16 +
 setup.py                      |   19 +
 streamz_ext/__init__.py       |    3 +
 streamz_ext/batch.py          |    1 +
 streamz_ext/core.py           | 1260 +++++++++++++++++++++++++++++++++
 streamz_ext/dask.py           |    1 +
 streamz_ext/dataframe.py      |    1 +
 streamz_ext/graph.py          |    1 +
 streamz_ext/orderedweakset.py |   35 +
 test/test_core.py             |   26 +
 22 files changed, 1603 insertions(+)
 create mode 100644 .condarc
 create mode 100644 .coveragerc
 create mode 100644 .flake8
 create mode 100644 .gitignore
 create mode 100644 .landscape.yml
 create mode 100644 .travis.yml
 create mode 100644 LICENSE
 create mode 100644 requirements.txt
 create mode 100644 requirements/build.txt
 create mode 100644 requirements/pip.txt
 create mode 100644 requirements/run.txt
 create mode 100644 requirements/test.txt
 create mode 100644 rever.xsh
 create mode 100755 setup.py
 create mode 100644 streamz_ext/__init__.py
 create mode 100644 streamz_ext/batch.py
 create mode 100644 streamz_ext/core.py
 create mode 100644 streamz_ext/dask.py
 create mode 100644 streamz_ext/dataframe.py
 create mode 100644 streamz_ext/graph.py
 create mode 100644 streamz_ext/orderedweakset.py
 create mode 100644 test/test_core.py

diff --git a/.condarc b/.condarc
new file mode 100644
index 0000000..c00951d
--- /dev/null
+++ b/.condarc
@@ -0,0 +1,7 @@
+always_yes: true
+show_channel_urls: true
+channels:
+- conda-forge
+- defaults
+track_features:
+- nomkl
\ No newline at end of file
diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..8ec47f1
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,14 @@
+[run]
+source =
+    streamz_ext
+[report]
+omit =
+    */python?.?/*
+    */site-packages/nose/*
+    # ignore _version.py and versioneer.py
+    .*version.*
+    *_version.py
+    */tests/utils.py
+
+exclude_lines =
+    if __name__ == '__main__':
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..3ba77ce
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,9 @@
+[flake8]
+exclude =
+    .git,
+    __pycache__,
+    docs/conf.py,
+    old,
+    build,
+    dist,
+max-complexity = 25
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2a087b7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,94 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# IPython Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# dotenv
+.env
+
+# virtualenv
+venv/
+ENV/
+
+# Spyder project settings
+.spyderproject
+
+# Rope project settings
+.ropeproject
+
+# pycharm
+.idea/
+# Rever
+rever/
diff --git a/.landscape.yml b/.landscape.yml
new file mode 100644
index 0000000..b5bd0f1
--- /dev/null
+++ b/.landscape.yml
@@ -0,0 +1,3 @@
+python-targets:
+  - 3
+doc-warnings: true
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..a4719bc
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,39 @@
+language: python
+sudo: false
+
+cache:
+  directories:
+    - $HOME/.cache/pip
+    - $HOME/.cache/matplotlib
+
+python:
+  - 2.7
+  - 3.5
+  - 3.6
+before_install:
+  - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh
+  - chmod +x miniconda.sh
+  - ./miniconda.sh -b -p ~/mc
+  - export PATH=~/mc/bin:$PATH
+  - conda update conda --yes
+  - conda config --add channels conda-forge
+
+
+install:
+  - export GIT_FULL_HASH=`git rev-parse HEAD`
+  - conda create --yes -n testenv python=$TRAVIS_PYTHON_VERSION
+  - source activate testenv
+  - conda install --yes --file requirements/build.txt
+  - conda install pip
+  - pip install -r requirements/pip.txt
+  - pip install .
+
+script:
+  - set -e
+  - conda install --yes --file requirements/run.txt
+  - conda install --yes --file requirements/test.txt
+  - pip install codecov pytest-env
+  - coverage run run_tests.py
+  - coverage report -m
+  - codecov
+  - flake8 streamz_ext
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..a18b77d
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,62 @@
+Copyright (c) 2018, Board of Trustees of Columbia University in the
+city of New York. 2017, Continuum Analytics, Inc. and contributors
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+
+ * Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the distribution.
+
+ * Neither the name of the copyright holder nor the names of its
+   contributors may be used to endorse or promote products derived from
+   this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDER "AS IS". COPYRIGHT HOLDER
+EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES AND CONDITIONS, EITHER
+EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY, TITLE, FITNESS, ADEQUACY OR SUITABILITY
+FOR A PARTICULAR PURPOSE, AND ANY WARRANTIES OF FREEDOM FROM
+INFRINGEMENT OF ANY DOMESTIC OR FOREIGN PATENT, COPYRIGHTS, TRADE
+SECRETS OR OTHER PROPRIETARY RIGHTS OF ANY PARTY. IN NO EVENT SHALL
+COPYRIGHT HOLDER BE LIABLE TO ANY PARTY FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE OR RELATING TO THIS AGREEMENT, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
+
+please email Prof. Simon Billinge at sb2896@columbia.edu with any questions.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice,
+this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+Neither the name of Continuum Analytics nor the names of any contributors
+may be used to endorse or promote products derived from this software
+without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6852d96
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+toolz
+tornado
+streamz
diff --git a/requirements/build.txt b/requirements/build.txt
new file mode 100644
index 0000000..f72d870
--- /dev/null
+++ b/requirements/build.txt
@@ -0,0 +1,2 @@
+python
+setuptools
diff --git a/requirements/pip.txt b/requirements/pip.txt
new file mode 100644
index 0000000..e69de29
diff --git a/requirements/run.txt b/requirements/run.txt
new file mode 100644
index 0000000..6852d96
--- /dev/null
+++ b/requirements/run.txt
@@ -0,0 +1,3 @@
+toolz
+tornado
+streamz
diff --git a/requirements/test.txt b/requirements/test.txt
new file mode 100644
index 0000000..6e5234f
--- /dev/null
+++ b/requirements/test.txt
@@ -0,0 +1,4 @@
+pytest
+flake8
+codecov
+coverage
\ No newline at end of file
diff --git a/rever.xsh b/rever.xsh
new file mode 100644
index 0000000..f505087
--- /dev/null
+++ b/rever.xsh
@@ -0,0 +1,16 @@
+$PROJECT = 'streamz_ext'
+$ACTIVITIES = ['version_bump',
+               'changelog',
+               'tag',
+               'push_tag',
+               'ghrelease']
+
+$VERSION_BUMP_PATTERNS = [
+    ($PROJECT + '/__init__.py', '__version__\s*=.*', "__version__ = '$VERSION'"),
+    ('setup.py', 'version\s*=.*,', "version='$VERSION',")
+    ]
+$CHANGELOG_FILENAME = 'CHANGELOG.rst'
+$CHANGELOG_IGNORE = ['TEMPLATE']
+
+$GITHUB_ORG = 'xpdAcq'
+$GITHUB_REPO = $PROJECT
diff --git a/setup.py b/setup.py
new file mode 100755
index 0000000..db01d1d
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+
+from os.path import exists
+from setuptools import setup, find_packages
+
+
+setup(name='streamz_ext',
+      version='0.0.0',
+      description='Streams',
+      url='http://github.com/xpdAcq/streamz_ext/',
+      maintainer='Christopher J. Wright',
+      maintainer_email='cjwright4242@gmail.com',
+      license='BSD',
+      keywords='streams',
+      packages=find_packages(),
+      long_description=(open('README.rst').read() if exists('README.rst')
+                        else ''),
+      install_requires=list(open('requirements.txt').read().strip().split('\n')),
+      zip_safe=False)
diff --git a/streamz_ext/__init__.py b/streamz_ext/__init__.py
new file mode 100644
index 0000000..b0aa19e
--- /dev/null
+++ b/streamz_ext/__init__.py
@@ -0,0 +1,3 @@
+__version__ = '0.0.0'
+
+from .core import *
diff --git a/streamz_ext/batch.py b/streamz_ext/batch.py
new file mode 100644
index 0000000..0005d4d
--- /dev/null
+++ b/streamz_ext/batch.py
@@ -0,0 +1 @@
+from streamz.batch import *
\ No newline at end of file
diff --git a/streamz_ext/core.py b/streamz_ext/core.py
new file mode 100644
index 0000000..f3aeb17
--- /dev/null
+++ b/streamz_ext/core.py
@@ -0,0 +1,1260 @@
+# Note I can remove most of this once the orderedweakset is in conda-forge
+from __future__ import absolute_import, division, print_function
+from collections import deque, MutableMapping
+from datetime import timedelta
+import functools
+import logging
+import six
+import sys
+import threading
+from time import time
+import weakref
+
+import toolz
+from tornado import gen
+from tornado.locks import Condition
+from tornado.ioloop import IOLoop
+from tornado.queues import Queue
+from collections import Iterable
+
+from streamz.compatibility import get_thread_identity
+from .orderedweakset import OrderedWeakrefSet
+
+no_default = '--no-default--'
+
+_global_sinks = set()
+
+_html_update_streams = set()
+
+thread_state = threading.local()
+
+logger = logging.getLogger(__name__)
+
+
+def identity(x):
+    return x
+
+
+class Stream(object):
+    """ A Stream is an infinite sequence of data
+
+    Streams subscribe to each other, passing and transforming data between
+    them. A Stream object listens for updates from upstream, reacts to these
+    updates, and then emits more data to flow downstream to all Stream
+    objects that subscribe to it. Downstream Stream objects may connect at
+    any point of a Stream graph to get a full view of the data coming off of
+    that point to do with as they will.
+
+    Examples
+    --------
+    >>> def inc(x):
+    ...     return x + 1
+
+    >>> source = Stream()  # Create a stream object
+    >>> s = source.map(inc).map(str)  # Subscribe to make new streams
+    >>> s.sink(print)  # take an action whenever an element reaches the end
+
+    >>> L = list()
+    >>> s.sink(L.append)  # or take multiple actions (streams can branch)
+
+    >>> for i in range(5):
+    ...     source.emit(i)  # push data in at the source
+    '1'
+    '2'
+    '3'
+    '4'
+    '5'
+    >>> L  # and the actions happen at the sinks
+    ['1', '2', '3', '4', '5']
+    """
+    _graphviz_shape = 'ellipse'
+    _graphviz_style = 'rounded,filled'
+    _graphviz_fillcolor = 'white'
+    _graphviz_orientation = 0
+
+    str_list = ['func', 'predicate', 'n', 'interval']
+
+    def __init__(self, upstream=None, upstreams=None, stream_name=None,
+                 loop=None, asynchronous=False):
+        self.asynchronous = asynchronous
+        self.downstreams = OrderedWeakrefSet()
+        if upstreams is not None:
+            self.upstreams = upstreams
+        else:
+            self.upstreams = [upstream]
+        if loop is None:
+            for upstream in self.upstreams:
+                if upstream and upstream.loop:
+                    loop = upstream.loop
+                    break
+        self.loop = loop
+        for upstream in self.upstreams:
+            if upstream:
+                upstream.downstreams.add(self)
+        self.name = stream_name
+        if loop:
+            for upstream in self.upstreams:
+                if upstream:
+                    upstream._inform_loop(loop)
+
+    def _inform_loop(self, loop):
+        """
+        Percolate information about an event loop to the rest of the stream
+        """
+        if self.loop is loop:
+            return
+        elif self.loop is None:
+            self.loop = loop
+            for upstream in self.upstreams:
+                if upstream:
+                    upstream._inform_loop(loop)
+            for downstream in self.downstreams:
+                if downstream:
+                    downstream._inform_loop(loop)
+        else:
+            raise ValueError("Two different event loops active")
+
+    @classmethod
+    def register_api(cls, modifier=identity):
+        """ Add callable to Stream API
+
+        This allows you to register a new method onto this class. You can
+        use it as a decorator::
+
+            >>> @Stream.register_api()
+            ... class foo(Stream):
+            ...     ...
+
+            >>> Stream().foo(...)  # this works now
+
+        It attaches the callable as a normal attribute to the class object.
+        In doing so it respects inheritance (all subclasses of Stream will
+        also get the foo attribute).
+
+        By default callables are assumed to be instance methods. If you like
+        you can include modifiers to apply before attaching to the class as
+        in the following case where we construct a ``staticmethod``.
+
+            >>> @Stream.register_api(staticmethod)
+            ... class foo(Stream):
+            ...     ...
+
+            >>> Stream.foo(...)  # Foo operates as a static method
+        """
+        def _(func):
+            @functools.wraps(func)
+            def wrapped(*args, **kwargs):
+                return func(*args, **kwargs)
+            setattr(cls, func.__name__, modifier(wrapped))
+            return func
+        return _
+
+    def __str__(self):
+        s_list = []
+        if self.name:
+            s_list.append('{}; {}'.format(self.name, self.__class__.__name__))
+        else:
+            s_list.append(self.__class__.__name__)
+
+        for m in self.str_list:
+            s = ''
+            at = getattr(self, m, None)
+            if at:
+                if not callable(at):
+                    s = str(at)
+                elif hasattr(at, '__name__'):
+                    s = getattr(self, m).__name__
+                elif hasattr(at.__class__, '__name__'):
+                    s = getattr(self, m).__class__.__name__
+                else:
+                    s = None
+            if s:
+                s_list.append('{}={}'.format(m, s))
+        if len(s_list) <= 2:
+            s_list = [term.split('=')[-1] for term in s_list]
+
+        text = "<"
+        text += s_list[0]
+        if len(s_list) > 1:
+            text += ': '
+            text += ', '.join(s_list[1:])
+        text += '>'
+        return text
+
+    __repr__ = __str__
+
+    def _ipython_display_(self, **kwargs):
+        try:
+            from ipywidgets import Output
+            import IPython
+        except ImportError:
+            return self._repr_html_()
+        output = Output(_view_count=0)
+        output_ref = weakref.ref(output)
+
+        def update_cell(val):
+            output = output_ref()
+            if output is None:
+                return
+            with output:
+                IPython.display.clear_output(wait=True)
+                IPython.display.display(val)
+
+        s = self.map(update_cell)
+        _html_update_streams.add(s)
+
+        self.output_ref = output_ref
+        s_ref = weakref.ref(s)
+
+        def remove_stream(change):
+            output = output_ref()
+            if output is None:
+                return
+
+            if output._view_count == 0:
+                ss = s_ref()
+                ss.destroy()
+                _html_update_streams.remove(ss)  # trigger gc
+
+        output.observe(remove_stream, '_view_count')
+
+        return output._ipython_display_(**kwargs)
+
+    def _emit(self, x):
+        result = []
+        for downstream in list(self.downstreams):
+            r = downstream.update(x, who=self)
+            if type(r) is list:
+                result.extend(r)
+            else:
+                result.append(r)
+
+        return [element for element in result if element is not None]
+
+    def emit(self, x, asynchronous=False):
+        """ Push data into the stream at this point
+
+        This is typically done only at source Streams but can theoretically
+        be done at any point
+        """
+        ts_async = getattr(thread_state, 'asynchronous', False)
+        if asynchronous or self.loop is None or ts_async or self.asynchronous:
+            if not ts_async:
+                thread_state.asynchronous = True
+            try:
+                result = self._emit(x)
+                return gen.convert_yielded(result)
+            finally:
+                thread_state.asynchronous = ts_async
+        else:
+            @gen.coroutine
+            def _():
+                thread_state.asynchronous = True
+                try:
+                    result = yield self._emit(x)
+                finally:
+                    del thread_state.asynchronous
+
+                raise gen.Return(result)
+            return sync(self.loop, _)
+
+    def update(self, x, who=None):
+        self._emit(x)
+
+    def gather(self):
+        """ This is a no-op for core streamz
+
+        This allows gather to be used in both dask and core streams
+        """
+        return self
+
+    def connect(self, downstream):
+        ''' Connect this stream to a downstream element.
+
+        Parameters
+        ----------
+        downstream: Stream
+            The downstream stream to connect to
+        '''
+        self.downstreams.add(downstream)
+
+        if downstream.upstreams == [None]:
+            downstream.upstreams = [self]
+        else:
+            downstream.upstreams.append(self)
+
+    def disconnect(self, downstream):
+        ''' Disconnect this stream from a downstream element.
+
+        Parameters
+        ----------
+        downstream: Stream
+            The downstream stream to disconnect from
+        '''
+        self.downstreams.remove(downstream)
+
+        downstream.upstreams.remove(self)
+
+    @property
+    def upstream(self):
+        if len(self.upstreams) != 1:
+            raise ValueError("Stream has multiple upstreams")
+        else:
+            return self.upstreams[0]
+
+    def destroy(self, streams=None):
+        """
+        Disconnect this stream from any upstream sources
+        """
+        if streams is None:
+            streams = self.upstreams
+        for upstream in list(streams):
+            upstream.downstreams.remove(self)
+            self.upstreams.remove(upstream)
+
+    def scatter(self, **kwargs):
+        from .dask import scatter
+        return scatter(self, **kwargs)
+
+    def remove(self, predicate):
+        """ Only pass through elements for which the predicate returns False """
+        return self.filter(lambda x: not predicate(x))
+
+    @property
+    def scan(self):
+        return self.accumulate
+
+    @property
+    def concat(self):
+        return self.flatten
+
+    def sink_to_list(self):
+        """ Append all elements of a stream to a list as they come in
+
+        Examples
+        --------
+        >>> source = Stream()
+        >>> L = source.map(lambda x: 10 * x).sink_to_list()
+        >>> for i in range(5):
+        ...     source.emit(i)
+        >>> L
+        [0, 10, 20, 30, 40]
+        """
+        L = []
+        self.sink(L.append)
+        return L
+
+    def frequencies(self, **kwargs):
+        """ Count occurrences of elements """
+        def update_frequencies(last, x):
+            return toolz.assoc(last, x, last.get(x, 0) + 1)
+
+        return self.scan(update_frequencies, start={}, **kwargs)
+
+    def visualize(self, filename='mystream.png', source_node=False, **kwargs):
+        """Render the computation of this object's task graph using graphviz.
+
+        Requires ``graphviz`` to be installed.
+
+        Parameters
+        ----------
+        filename : str, optional
+            The name of the file to write to disk.
+        source_node: bool, optional
+            If True then the node is the source node and we can label the
+            edges in their execution order. Defaults to False
+        kwargs:
+            Graph attributes to pass to graphviz like ``rankdir="LR"``
+        """
+        from .graph import visualize
+        return visualize(self, filename, source_node=source_node, **kwargs)
+
+    def to_dataframe(self, example):
+        """ Convert a stream of Pandas dataframes to a DataFrame
+
+        Examples
+        --------
+        >>> source = Stream()
+        >>> sdf = source.to_dataframe()
+        >>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list()
+        >>> source.emit(pd.DataFrame(...))  # doctest: +SKIP
+        >>> source.emit(pd.DataFrame(...))  # doctest: +SKIP
+        >>> source.emit(pd.DataFrame(...))  # doctest: +SKIP
+        """
+        from .dataframe import DataFrame
+        return DataFrame(stream=self, example=example)
+
+    def to_batch(self, **kwargs):
+        """ Convert a stream of lists to a Batch
+
+        All elements of the stream are assumed to be lists or tuples
+
+        Examples
+        --------
+        >>> source = Stream()
+        >>> batches = source.to_batch()
+        >>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
+        >>> source.emit([{'name': 'Alice', 'value': 1},
+        ...              {'name': 'Bob', 'value': 2},
+        ...              {'name': 'Charlie', 'value': 3}])
+        >>> source.emit([{'name': 'Alice', 'value': 4},
+        ...              {'name': 'Bob', 'value': 5},
+        ...              {'name': 'Charlie', 'value': 6}])
+        """
+        from .batch import Batch
+        return Batch(stream=self, **kwargs)
+
+
+@Stream.register_api()
+class sink(Stream):
+    """ Apply a function on every element
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> L = list()
+    >>> source.sink(L.append)
+    >>> source.sink(print)
+    >>> source.sink(print)
+    >>> source.emit(123)
+    123
+    123
+    >>> L
+    [123]
+
+    See Also
+    --------
+    map
+    Stream.sink_to_list
+    """
+    _graphviz_shape = 'trapezium'
+
+    def __init__(self, upstream, func, *args, **kwargs):
+        self.func = func
+        # take the stream specific kwargs out
+        stream_name = kwargs.pop("stream_name", None)
+        self.kwargs = kwargs
+        self.args = args
+
+        Stream.__init__(self, upstream, stream_name=stream_name)
+        _global_sinks.add(self)
+
+    def update(self, x, who=None):
+        result = self.func(x, *self.args, **self.kwargs)
+        if gen.isawaitable(result):
+            return result
+        else:
+            return []
+
+
+@Stream.register_api()
+class starsink(Stream):
+    """ Apply a function on every element, unpacking the element as arguments
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> L = list()
+    >>> def add(x, y):
+    ...     L.append(x + y)
+    >>> source.starsink(add)
+    >>> source.emit((1, 10))
+    >>> source.emit((2, 20))
+    >>> source.emit((3, 30))
+    >>> L
+    [11, 22, 33]
+
+    See Also
+    --------
+    starmap
+    Stream.sink_to_list
+    """
+    _graphviz_shape = 'trapezium'
+
+    def __init__(self, upstream, func, *args, **kwargs):
+        self.func = func
+        # take the stream specific kwargs out
+        stream_name = kwargs.pop("stream_name", None)
+        self.kwargs = kwargs
+        self.args = args
+
+        Stream.__init__(self, upstream, stream_name=stream_name)
+        _global_sinks.add(self)
+
+    def update(self, x, who=None):
+        y = x + self.args
+        result = self.func(*y, **self.kwargs)
+        if gen.isawaitable(result):
+            return result
+        else:
+            return []
+
+
+@Stream.register_api()
+class map(Stream):
+    """ Apply a function to every element in the stream
+
+    Parameters
+    ----------
+    func: callable
+    *args :
+        The arguments to pass to the function.
+    **kwargs:
+        Keyword arguments to pass to func
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.map(lambda x: 2*x).sink(print)
+    >>> for i in range(5):
+    ...     source.emit(i)
+    0
+    2
+    4
+    6
+    8
+    """
+    def __init__(self, upstream, func, *args, **kwargs):
+        self.func = func
+        # this is one of a few stream specific kwargs
+        stream_name = kwargs.pop('stream_name', None)
+        self.kwargs = kwargs
+        self.args = args
+
+        Stream.__init__(self, upstream, stream_name=stream_name)
+
+    def update(self, x, who=None):
+        try:
+            result = self.func(x, *self.args, **self.kwargs)
+        except Exception as e:
+            logger.exception(e)
+            raise
+        else:
+            return self._emit(result)
+
+
+@Stream.register_api()
+class starmap(Stream):
+    """ Apply a function to every element in the stream, splayed out
+
+    Parameters
+    ----------
+    func: callable
+    *args :
+        The arguments to pass to the function.
+    **kwargs:
+        Keyword arguments to pass to func
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.starmap(lambda a, b: a + b).sink(print)
+    >>> for i in range(5):
+    ...     source.emit((i, i))
+    0
+    2
+    4
+    6
+    8
+    """
+    def __init__(self, upstream, func, *args, **kwargs):
+        self.func = func
+        # this is one of a few stream specific kwargs
+        stream_name = kwargs.pop('stream_name', None)
+        self.kwargs = kwargs
+        self.args = args
+
+        Stream.__init__(self, upstream, stream_name=stream_name)
+
+    def update(self, x, who=None):
+        y = x + self.args
+        try:
+            result = self.func(*y, **self.kwargs)
+        except Exception as e:
+            logger.exception(e)
+            raise
+        else:
+            return self._emit(result)
+
+
+def _truthy(x):
+    return not not x
+
+
+@Stream.register_api()
+class filter(Stream):
+    """ Only pass through elements that satisfy the predicate
+
+    Parameters
+    ----------
+    predicate : function
+        The predicate. Should return True or False, where
+        True means that the predicate is satisfied.
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.filter(lambda x: x % 2 == 0).sink(print)
+    >>> for i in range(5):
+    ...     source.emit(i)
+    0
+    2
+    4
+    """
+    def __init__(self, upstream, predicate, **kwargs):
+        if predicate is None:
+            predicate = _truthy
+        self.predicate = predicate
+
+        Stream.__init__(self, upstream, **kwargs)
+
+    def update(self, x, who=None):
+        if self.predicate(x):
+            return self._emit(x)
+
+
+@Stream.register_api()
+class accumulate(Stream):
+    """ Accumulate results with previous state
+
+    This performs running or cumulative reductions, applying the function
+    to the previous total and the new element. The function should take
+    two arguments, the previous accumulated state and the next element and
+    it should return a new accumulated state.
+
+    Parameters
+    ----------
+    func: callable
+    start: object
+        Initial value. Defaults to the first submitted element
+    returns_state: boolean
+        If true then func should return both the state and the value to emit
+        If false then both values are the same, and func returns one value
+    **kwargs:
+        Keyword arguments to pass to func
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.accumulate(lambda acc, x: acc + x).sink(print)
+    >>> for i in range(1, 5):
+    ...     source.emit(i)
+    1
+    3
+    6
+    10
+    """
+    _graphviz_shape = 'box'
+
+    def __init__(self, upstream, func, start=no_default, returns_state=False,
+                 **kwargs):
+        self.func = func
+        self.kwargs = kwargs
+        self.state = start
+        self.returns_state = returns_state
+        # this is one of a few stream specific kwargs
+        stream_name = kwargs.pop('stream_name', None)
+        Stream.__init__(self, upstream, stream_name=stream_name)
+
+    def update(self, x, who=None):
+        if self.state is no_default:
+            self.state = x
+            return self._emit(x)
+        else:
+            try:
+                result = self.func(self.state, x, **self.kwargs)
+            except Exception as e:
+                logger.exception(e)
+                raise
+            if self.returns_state:
+                state, result = result
+            else:
+                state = result
+            self.state = state
+            return self._emit(result)
+
+
+@Stream.register_api()
+class partition(Stream):
+    """ Partition stream into tuples of equal size
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.partition(3).sink(print)
+    >>> for i in range(10):
+    ...     source.emit(i)
+    (0, 1, 2)
+    (3, 4, 5)
+    (6, 7, 8)
+    """
+    _graphviz_shape = 'diamond'
+
+    def __init__(self, upstream, n, **kwargs):
+        self.n = n
+        self.buffer = []
+        Stream.__init__(self, upstream, **kwargs)
+
+    def update(self, x, who=None):
+        self.buffer.append(x)
+        if len(self.buffer) == self.n:
+            result, self.buffer = self.buffer, []
+            return self._emit(tuple(result))
+        else:
+            return []
+
+
+@Stream.register_api()
+class sliding_window(Stream):
+    """ Produce overlapping tuples of size n
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.sliding_window(3).sink(print)
+    >>> for i in range(8):
+    ...     source.emit(i)
+    (0, 1, 2)
+    (1, 2, 3)
+    (2, 3, 4)
+    (3, 4, 5)
+    (4, 5, 6)
+    (5, 6, 7)
+    """
+    _graphviz_shape = 'diamond'
+
+    def __init__(self, upstream, n, **kwargs):
+        self.n = n
+        self.buffer = deque(maxlen=n)
+        Stream.__init__(self, upstream, **kwargs)
+
+    def update(self, x, who=None):
+        self.buffer.append(x)
+        if len(self.buffer) == self.n:
+            return self._emit(tuple(self.buffer))
+        else:
+            return []
+
+
+def convert_interval(interval):
+    if isinstance(interval, str):
+        import pandas as pd
+        interval = pd.Timedelta(interval).total_seconds()
+    return interval
+
+
+@Stream.register_api()
+class timed_window(Stream):
+    """ Emit a tuple of collected results every interval
+
+    Every ``interval`` seconds this emits a tuple of all of the results
+    seen so far. This can help to batch data coming off of a high-volume
+    stream.
+    """
+    _graphviz_shape = 'octagon'
+
+    def __init__(self, upstream, interval, loop=None, **kwargs):
+        loop = loop or upstream.loop or IOLoop.current()
+        self.interval = convert_interval(interval)
+        self.buffer = []
+        self.last = gen.moment
+
+        Stream.__init__(self, upstream, loop=loop, **kwargs)
+
+        self.loop.add_callback(self.cb)
+
+    def update(self, x, who=None):
+        self.buffer.append(x)
+        return self.last
+
+    @gen.coroutine
+    def cb(self):
+        while True:
+            L, self.buffer = self.buffer, []
+            self.last = self._emit(L)
+            yield self.last
+            yield gen.sleep(self.interval)
+
+
+@Stream.register_api()
+class delay(Stream):
+    """ Add a time delay to results """
+    _graphviz_shape = 'octagon'
+
+    def __init__(self, upstream, interval, loop=None, **kwargs):
+        loop = loop or upstream.loop or IOLoop.current()
+        self.interval = convert_interval(interval)
+        self.queue = Queue()
+
+        Stream.__init__(self, upstream, loop=loop, **kwargs)
+
+        self.loop.add_callback(self.cb)
+
+    @gen.coroutine
+    def cb(self):
+        while True:
+            last = time()
+            x = yield self.queue.get()
+            yield self._emit(x)
+            duration = self.interval - (time() - last)
+            if duration > 0:
+                yield gen.sleep(duration)
+
+    def update(self, x, who=None):
+        return self.queue.put(x)
+
+
+@Stream.register_api()
+class rate_limit(Stream):
+    """ Limit the flow of data
+
+    This stops two elements from streaming through in an interval shorter
+    than the provided value.
+
+    Parameters
+    ----------
+    interval: float
+        Time in seconds
+    """
+    _graphviz_shape = 'octagon'
+
+    def __init__(self, upstream, interval, **kwargs):
+        self.interval = convert_interval(interval)
+        self.next = 0
+
+        Stream.__init__(self, upstream, **kwargs)
+
+    @gen.coroutine
+    def update(self, x, who=None):
+        now = time()
+        old_next = self.next
+        self.next = max(now, self.next) + self.interval
+        if now < old_next:
+            yield gen.sleep(old_next - now)
+        yield self._emit(x)
+
+
+@Stream.register_api()
+class buffer(Stream):
+    """ Allow results to pile up at this point in the stream
+
+    This allows results to buffer in place at various points in the stream.
+    This can help to smooth flow through the system when backpressure is
+    applied.
+    """
+    _graphviz_shape = 'diamond'
+
+    def __init__(self, upstream, n, loop=None, **kwargs):
+        loop = loop or upstream.loop or IOLoop.current()
+        self.queue = Queue(maxsize=n)
+
+        Stream.__init__(self, upstream, loop=loop, **kwargs)
+
+        self.loop.add_callback(self.cb)
+
+    def update(self, x, who=None):
+        return self.queue.put(x)
+
+    @gen.coroutine
+    def cb(self):
+        while True:
+            x = yield self.queue.get()
+            yield self._emit(x)
+
+
+@Stream.register_api()
+class zip(Stream):
+    """ Combine streams together into a stream of tuples
+
+    We emit a new tuple once all streams have produced a new element.
+
+    See also
+    --------
+    combine_latest
+    zip_latest
+    """
+    _graphviz_orientation = 270
+    _graphviz_shape = 'triangle'
+
+    def __init__(self, *upstreams, **kwargs):
+        self.maxsize = kwargs.pop('maxsize', 10)
+        self.condition = Condition()
+        self.literals = [(i, val) for i, val in enumerate(upstreams)
+                         if not isinstance(val, Stream)]
+
+        self.buffers = {upstream: deque()
+                        for upstream in upstreams
+                        if isinstance(upstream, Stream)}
+
+        upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)]
+
+        Stream.__init__(self, upstreams=upstreams2, **kwargs)
+
+    def pack_literals(self, tup):
+        """ Fill buffers for literals whenever we empty them """
+        inp = list(tup)[::-1]
+        out = []
+        for i, val in self.literals:
+            while len(out) < i:
+                out.append(inp.pop())
+            out.append(val)
+
+        while inp:
+            out.append(inp.pop())
+
+        return tuple(out)
+
+    def update(self, x, who=None):
+        L = self.buffers[who]  # get buffer for stream
+        L.append(x)
+        if len(L) == 1 and all(self.buffers.values()):
+            tup = tuple(self.buffers[up][0] for up in self.upstreams)
+            for buf in self.buffers.values():
+                buf.popleft()
+            self.condition.notify_all()
+            if self.literals:
+                tup = self.pack_literals(tup)
+            return self._emit(tup)
+        elif len(L) > self.maxsize:
+            return self.condition.wait()
+
+
+@Stream.register_api()
+class combine_latest(Stream):
+    """ Combine multiple streams together to a stream of tuples
+
+    This will emit a new tuple of all of the most recent elements seen from
+    any stream.
+
+    Parameters
+    ----------
+    emit_on : stream or list of streams or None
+        only emit upon update of the streams listed.
+        If None, emit on update from any stream
+
+    See Also
+    --------
+    zip
+    """
+    _graphviz_orientation = 270
+    _graphviz_shape = 'triangle'
+
+    def __init__(self, *upstreams, **kwargs):
+        emit_on = kwargs.pop('emit_on', None)
+
+        self.last = [None for _ in upstreams]
+        self.missing = set(upstreams)
+        if emit_on is not None:
+            if not isinstance(emit_on, Iterable):
+                emit_on = (emit_on, )
+            emit_on = tuple(
+                upstreams[x] if isinstance(x, int) else x for x in emit_on)
+            self.emit_on = emit_on
+        else:
+            self.emit_on = upstreams
+        Stream.__init__(self, upstreams=upstreams, **kwargs)
+
+    def update(self, x, who=None):
+        if self.missing and who in self.missing:
+            self.missing.remove(who)
+
+        self.last[self.upstreams.index(who)] = x
+        if not self.missing and who in self.emit_on:
+            tup = tuple(self.last)
+            return self._emit(tup)
+
+
+@Stream.register_api()
+class flatten(Stream):
+    """ Flatten streams of lists or iterables into a stream of elements
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.flatten().sink(print)
+    >>> for x in [[1, 2, 3], [4, 5], [6, 7]]:
+    ...     source.emit(x)
+    1
+    2
+    3
+    4
+    5
+    6
+    7
+
+    See Also
+    --------
+    partition
+    """
+    def update(self, x, who=None):
+        L = []
+        for item in x:
+            y = self._emit(item)
+            if type(y) is list:
+                L.extend(y)
+            else:
+                L.append(y)
+        return L
+
+
+@Stream.register_api()
+class unique(Stream):
+    """ Avoid sending through repeated elements
+
+    This deduplicates a stream so that only new elements pass through.
+    You can control how much of a history is stored with the ``history=``
+    parameter. For example setting ``history=1`` avoids sending through
+    elements when one is repeated right after the other.
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.unique(history=1).sink(print)
+    >>> for x in [1, 1, 2, 2, 2, 1, 3]:
+    ...     source.emit(x)
+    1
+    2
+    1
+    3
+    """
+
+    def __init__(self, upstream, history=None, key=identity, **kwargs):
+        self.seen = dict()
+        self.key = key
+        self.dict_seen = deque(maxlen=history)
+        if history:
+            from zict import LRU
+            self.seen = LRU(history, self.seen)
+
+        Stream.__init__(self, upstream, **kwargs)
+
+    def update(self, x, who=None):
+        y = self.key(x)
+        # If y is a dict then we can't use the LRU cache; use a FIFO deque instead
+        if isinstance(y, MutableMapping):
+            if y not in self.dict_seen:
+                self.dict_seen.append(y)
+                return self._emit(x)
+        else:
+            if y not in self.seen:
+                self.seen[y] = 1
+                return self._emit(x)
+
+
+@Stream.register_api()
+class union(Stream):
+    """ Combine multiple streams into one
+
+    Every element from any of the upstream streams will immediately flow
+    into the output stream. They will not be combined with elements from
+    other streams.
+
+    See also
+    --------
+    Stream.zip
+    Stream.combine_latest
+    """
+    def __init__(self, *upstreams, **kwargs):
+        super(union, self).__init__(upstreams=upstreams, **kwargs)
+
+    def update(self, x, who=None):
+        return self._emit(x)
+
+
+@Stream.register_api()
+class pluck(Stream):
+    """ Select elements from elements in the stream.
+
+    Parameters
+    ----------
+    pick : object, list
+        The element(s) to pick from the incoming element in the stream
+        If an instance of list, will pick multiple elements.
+
+    Examples
+    --------
+    >>> source = Stream()
+    >>> source.pluck([0, 3]).sink(print)
+    >>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
+    ...     source.emit(x)
+    (1, 4)
+    (4, 7)
+    (8, 11)
+
+    >>> source = Stream()
+    >>> source.pluck('name').sink(print)
+    >>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
+    ...     source.emit(x)
+    'Alice'
+    'Bob'
+    """
+    def __init__(self, upstream, pick, **kwargs):
+        self.pick = pick
+        super(pluck, self).__init__(upstream, **kwargs)
+
+    def update(self, x, who=None):
+        if isinstance(self.pick, list):
+            return self._emit(tuple([x[ind] for ind in self.pick]))
+        else:
+            return self._emit(x[self.pick])
+
+
+@Stream.register_api()
+class collect(Stream):
+    """
+    Hold elements in a cache and emit them as a collection when flushed.
+
+    Examples
+    --------
+    >>> source1 = Stream()
+    >>> source2 = Stream()
+    >>> collector = collect(source1)
+    >>> collector.sink(print)
+    >>> source2.sink(collector.flush)
+    >>> source1.emit(1)
+    >>> source1.emit(2)
+    >>> source2.emit('anything')  # flushes collector
+    ...
+    [1, 2]
+    """
+    def __init__(self, upstream, cache=None, **kwargs):
+        if cache is None:
+            cache = deque()
+        self.cache = cache
+
+        Stream.__init__(self, upstream, **kwargs)
+
+    def update(self, x, who=None):
+        self.cache.append(x)
+
+    def flush(self, _=None):
+        out = tuple(self.cache)
+        self._emit(out)
+        self.cache.clear()
+
+
+@Stream.register_api()
+class zip_latest(Stream):
+    """Combine multiple streams together to a stream of tuples
+
+    The stream which this is called from is lossless. All elements from
+    the lossless stream are emitted regardless of when they came in.
+    This will emit a new tuple consisting of an element from the lossless
+    stream paired with the latest elements from the other streams.
+    Elements are only emitted when an element on the lossless stream is
+    received, similar to ``combine_latest`` with the ``emit_on`` flag.
+
+    See Also
+    --------
+    Stream.combine_latest
+    Stream.zip
+    """
+    def __init__(self, lossless, *upstreams, **kwargs):
+        upstreams = (lossless,) + upstreams
+        self.last = [None for _ in upstreams]
+        self.missing = set(upstreams)
+        self.lossless = lossless
+        self.lossless_buffer = deque()
+        Stream.__init__(self, upstreams=upstreams, **kwargs)
+
+    def update(self, x, who=None):
+        idx = self.upstreams.index(who)
+        if who is self.lossless:
+            self.lossless_buffer.append(x)
+
+        self.last[idx] = x
+        if self.missing and who in self.missing:
+            self.missing.remove(who)
+
+        if not self.missing:
+            L = []
+            while self.lossless_buffer:
+                self.last[0] = self.lossless_buffer.popleft()
+                L.append(self._emit(tuple(self.last)))
+            return L
+
+
+@Stream.register_api()
+class latest(Stream):
+    """ Drop held-up data and emit the latest result
+
+    This allows you to skip intermediate elements in the stream if there is
+    some back pressure causing a slowdown. Use this when you only care about
+    the latest elements, and are willing to lose older data.
+
+    This passes through values without modification otherwise.
+
+    Examples
+    --------
+    >>> source.map(f).latest().map(g)  # doctest: +SKIP
+    """
+    _graphviz_shape = 'octagon'
+
+    def __init__(self, upstream, loop=None):
+        loop = loop or upstream.loop or IOLoop.current()
+        self.condition = Condition()
+        self.next = []
+
+        Stream.__init__(self, upstream, loop=loop)
+
+        self.loop.add_callback(self.cb)
+
+    def update(self, x, who=None):
+        self.next = [x]
+        self.loop.add_callback(self.condition.notify)
+
+    @gen.coroutine
+    def cb(self):
+        while True:
+            yield self.condition.wait()
+            [x] = self.next
+            yield self._emit(x)
+
+
+def sync(loop, func, *args, **kwargs):
+    """
+    Run coroutine in loop running in separate thread.
+    """
+    # This was taken from distributed/utils.py
+    timeout = kwargs.pop('callback_timeout', None)
+
+    def make_coro():
+        coro = gen.maybe_future(func(*args, **kwargs))
+        if timeout is None:
+            return coro
+        else:
+            return gen.with_timeout(timedelta(seconds=timeout), coro)
+
+    if not loop._running:
+        try:
+            return loop.run_sync(make_coro)
+        except RuntimeError:  # loop already running
+            pass
+
+    e = threading.Event()
+    main_tid = get_thread_identity()
+    result = [None]
+    error = [False]
+
+    @gen.coroutine
+    def f():
+        try:
+            if main_tid == get_thread_identity():
+                raise RuntimeError("sync() called from thread of running loop")
+            yield gen.moment
+            thread_state.asynchronous = True
+            result[0] = yield make_coro()
+        except Exception as exc:
+            logger.exception(exc)
+            error[0] = sys.exc_info()
+        finally:
+            thread_state.asynchronous = False
+            e.set()
+
+    loop.add_callback(f)
+    while not e.is_set():
+        e.wait(1000000)
+    if error[0]:
+        six.reraise(*error[0])
+    else:
+        return result[0]
diff --git a/streamz_ext/dask.py b/streamz_ext/dask.py
new file mode 100644
index 0000000..d0b1865
--- /dev/null
+++ b/streamz_ext/dask.py
@@ -0,0 +1 @@
+from streamz.dask import *
\ No newline at end of file
diff --git a/streamz_ext/dataframe.py b/streamz_ext/dataframe.py
new file mode 100644
index 0000000..ac32e3c
--- /dev/null
+++ b/streamz_ext/dataframe.py
@@ -0,0 +1 @@
+from streamz.dataframe import *
\ No newline at end of file
diff --git a/streamz_ext/graph.py b/streamz_ext/graph.py
new file mode 100644
index 0000000..540e63b
--- /dev/null
+++ b/streamz_ext/graph.py
@@ -0,0 +1 @@
+from streamz.graph import *
\ No newline at end of file
diff --git a/streamz_ext/orderedweakset.py b/streamz_ext/orderedweakset.py
new file mode 100644
index 0000000..82136ec
--- /dev/null
+++ b/streamz_ext/orderedweakset.py
@@ -0,0 +1,35 @@
+# -*- coding: utf8 -*-
+# This is a copy from Stack Overflow
+# https://stackoverflow.com/questions/7828444/indexable-weak-ordered-set-in-python
+# Asked by Neil G https://stackoverflow.com/users/99989/neil-g
+# Answered/edited by https://stackoverflow.com/users/1001643/raymond-hettinger
+import collections
+import weakref
+
+
+class OrderedSet(collections.MutableSet):
+    def __init__(self, values=()):
+        self._od = collections.OrderedDict().fromkeys(values)
+
+    def __len__(self):
+        return len(self._od)
+
+    def __iter__(self):
+        return iter(self._od)
+
+    def __contains__(self, value):
+        return value in self._od
+
+    def add(self, value):
+        self._od[value] = None
+
+    def discard(self, value):
+        self._od.pop(value, None)
+
+
+class OrderedWeakrefSet(weakref.WeakSet):
+    def __init__(self, values=()):
+        super(OrderedWeakrefSet, self).__init__()
+        self.data = OrderedSet()
+        for elem in values:
+            self.add(elem)
diff --git a/test/test_core.py b/test/test_core.py
new file mode 100644
index 0000000..29dbb87
--- /dev/null
+++ b/test/test_core.py
@@ -0,0 +1,26 @@
+from streamz_ext import Stream
+
+
+def test_star_sink():
+    L = []
+
+    def add(x, y):
+        L.append(x + y)
+
+    source = Stream()
+    source.starsink(add)
+
+    source.emit((1, 10))
+
+    assert L[0] == 11
+
+
+def test_unique_dict():
+    source = Stream()
+    L = source.unique(history=1).sink_to_list()
+
+    source.emit({'a': 1})
+    source.emit({'a': 1})
+    source.emit({'a': 1})
+
+    assert L == [{'a': 1}]
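
Usage note (editorial, not part of the patch): a minimal sketch of the API these
files provide, assuming the package is installed as ``streamz_ext`` and that
``emit`` runs on the synchronous, loop-less path. ``starmap`` splays each tuple
element out into the function's arguments, and ``unique(history=1)`` drops an
element that was just seen:

    from streamz_ext import Stream

    # Build a small pipeline: unpack (x, y) pairs, sum them, and suppress
    # immediately repeated results with a one-element history.
    source = Stream()
    results = (source
               .starmap(lambda x, y: x + y)  # splay each tuple into arguments
               .unique(history=1)            # drop a result equal to the last one
               .sink_to_list())

    for pair in [(1, 2), (2, 1), (3, 4)]:
        source.emit(pair)

    # (2, 1) also sums to 3, which unique(history=1) suppresses
    assert results == [3, 7]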