diff --git a/.condarc b/.condarc new file mode 100644 index 0000000..c00951d --- /dev/null +++ b/.condarc @@ -0,0 +1,7 @@ +always_yes: true +show_channel_urls: true +channels: +- conda-forge +- defaults +track_features: +- nomkl \ No newline at end of file diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..8ec47f1 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,14 @@ +[run] +source = + streamz_ext +[report] +omit = + */python?.?/* + */site-packages/nose/* + # ignore _version.py and versioneer.py + .*version.* + *_version.py + */tests/utils.py + +exclude_lines = + if __name__ == '__main__': diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..3ba77ce --- /dev/null +++ b/.flake8 @@ -0,0 +1,9 @@ +[flake8] +exclude = + .git, + __pycache__, + docs/conf.py, + old, + build, + dist, +max-complexity = 25 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a087b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,94 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + +# pycharm +.idea/ +# Rever +rever/ diff --git a/.landscape.yml b/.landscape.yml new file mode 100644 index 0000000..b5bd0f1 --- /dev/null +++ b/.landscape.yml @@ -0,0 +1,3 @@ +python-targets: + - 3 +doc-warnings: true \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..a4719bc --- /dev/null +++ b/.travis.yml @@ -0,0 +1,39 @@ +language: python +sudo: false + +cache: + directories: + - $HOME/.cache/pip + - $HOME/.cache/matplotlib + +python: + - 2.7 + - 3.5 + - 3.6 +before_install: + - wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh + - chmod +x miniconda.sh + - ./miniconda.sh -b -p ~/mc + - export PATH=~/mc/bin:$PATH + - conda update conda --yes + - conda config --add channels conda-forge + + +install: + - export GIT_FULL_HASH=`git rev-parse HEAD` + - conda create --yes -n testenv python=$TRAVIS_PYTHON_VERSION + - source activate testenv + - conda install --yes --file requirements/build.txt + - conda install pip + - pip install -r requirements/pip.txt + - pip install . 
+ +script: + - set -e + - conda install --yes --file requirements/run.txt + - conda install --yes --file requirements/test.txt + - pip install codecov pytest-env + - coverage run run_tests.py + - coverage report -m + - codecov + - flake8 streamz_ext diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a18b77d --- /dev/null +++ b/LICENSE @@ -0,0 +1,62 @@ +Copyright (c) 2018, Board of Trustees of Columbia University in the +city of New York. 2017, Continuum Analytics, Inc. and contributors +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + * Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDER "AS IS". COPYRIGHT HOLDER +EXPRESSLY DISCLAIMS ANY AND ALL WARRANTIES AND CONDITIONS, EITHER +EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY, TITLE, FITNESS, ADEQUACY OR SUITABILITY +FOR A PARTICULAR PURPOSE, AND ANY WARRANTIES OF FREEDOM FROM +INFRINGEMENT OF ANY DOMESTIC OR FOREIGN PATENT, COPYRIGHTS, TRADE +SECRETS OR OTHER PROPRIETARY RIGHTS OF ANY PARTY. IN NO EVENT SHALL +COPYRIGHT HOLDER BE LIABLE TO ANY PARTY FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF +USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE OR RELATING TO THIS AGREEMENT, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +please email Prof. Simon Billinge at sb2896@columbia.edu with any questions. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +Neither the name of Continuum Analytics nor the names of any contributors +may be used to endorse or promote products derived from this software +without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF +THE POSSIBILITY OF SUCH DAMAGE. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6852d96 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +toolz +tornado +streamz diff --git a/requirements/build.txt b/requirements/build.txt new file mode 100644 index 0000000..f72d870 --- /dev/null +++ b/requirements/build.txt @@ -0,0 +1,2 @@ +python +setuptools diff --git a/requirements/pip.txt b/requirements/pip.txt new file mode 100644 index 0000000..e69de29 diff --git a/requirements/run.txt b/requirements/run.txt new file mode 100644 index 0000000..6852d96 --- /dev/null +++ b/requirements/run.txt @@ -0,0 +1,3 @@ +toolz +tornado +streamz diff --git a/requirements/test.txt b/requirements/test.txt new file mode 100644 index 0000000..6e5234f --- /dev/null +++ b/requirements/test.txt @@ -0,0 +1,4 @@ +pytest +flake8 +codecov +coverage \ No newline at end of file diff --git a/rever.xsh b/rever.xsh new file mode 100644 index 0000000..f505087 --- /dev/null +++ b/rever.xsh @@ -0,0 +1,16 @@ +$PROJECT = 'streamz_ext' +$ACTIVITIES = ['version_bump', + 'changelog', + 'tag', + 'push_tag', + 'ghrelease'] + +$VERSION_BUMP_PATTERNS = [ + ($PROJECT + '/__init__.py', r'__version__\s*=.*', "__version__ = '$VERSION'"), + ('setup.py', r'version\s*=.*,', "version='$VERSION',") + ] +$CHANGELOG_FILENAME = 'CHANGELOG.rst' +$CHANGELOG_IGNORE = ['TEMPLATE'] + +$GITHUB_ORG = 'xpdAcq' +$GITHUB_REPO = $PROJECT diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..db01d1d --- /dev/null +++ b/setup.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python + +from os.path import exists +from setuptools import setup, find_packages + + +setup(name='streamz_ext', + version='0.0.0', + description='Streams', + url='http://github.com/xpdAcq/streamz_ext/', + maintainer='Christopher J. 
Wright', + maintainer_email='cjwright4242@gmail.com', + license='BSD', + keywords='streams', + packages=find_packages(), + long_description=(open('README.rst').read() if exists('README.rst') + else ''), + install_requires=list(open('requirements.txt').read().strip().split('\n')), + zip_safe=False) diff --git a/streamz_ext/__init__.py b/streamz_ext/__init__.py new file mode 100644 index 0000000..b0aa19e --- /dev/null +++ b/streamz_ext/__init__.py @@ -0,0 +1,3 @@ +__version__ = '0.0.0' + +from .core import * diff --git a/streamz_ext/batch.py b/streamz_ext/batch.py new file mode 100644 index 0000000..0005d4d --- /dev/null +++ b/streamz_ext/batch.py @@ -0,0 +1 @@ +from streamz.batch import * \ No newline at end of file diff --git a/streamz_ext/core.py b/streamz_ext/core.py new file mode 100644 index 0000000..f3aeb17 --- /dev/null +++ b/streamz_ext/core.py @@ -0,0 +1,1260 @@ +# Note I can remove most of this once the orderedweakset is in conda-forge +from __future__ import absolute_import, division, print_function +from collections import deque, MutableMapping +from datetime import timedelta +import functools +import logging +import six +import sys +import threading +from time import time +import weakref + +import toolz +from tornado import gen +from tornado.locks import Condition +from tornado.ioloop import IOLoop +from tornado.queues import Queue +from collections import Iterable + +from streamz.compatibility import get_thread_identity +from .orderedweakset import OrderedWeakrefSet + +no_default = '--no-default--' + +_global_sinks = set() + +_html_update_streams = set() + +thread_state = threading.local() + +logger = logging.getLogger(__name__) + + +def identity(x): + return x + + +class Stream(object): + """ A Stream is an infinite sequence of data + + Streams subscribe to each other, passing and transforming data between + them. A Stream object listens for updates from upstream, reacts to these + updates, and then emits more data to flow downstream to all Stream objects + that subscribe to it. Downstream Stream objects may connect at any point + of a Stream graph to get a full view of the data coming off of that point + to do with as they will. + + Examples + -------- + >>> def inc(x): + ... return x + 1 + + >>> source = Stream() # Create a stream object + >>> s = source.map(inc).map(str) # Subscribe to make new streams + >>> s.sink(print) # take an action whenever an element reaches the end + + >>> L = list() + >>> s.sink(L.append) # or take multiple actions (streams can branch) + + >>> for i in range(5): + ... 
source.emit(i) # push data in at the source + '1' + '2' + '3' + '4' + '5' + >>> L # and the actions happen at the sinks + ['1', '2', '3', '4', '5'] + """ + _graphviz_shape = 'ellipse' + _graphviz_style = 'rounded,filled' + _graphviz_fillcolor = 'white' + _graphviz_orientation = 0 + + str_list = ['func', 'predicate', 'n', 'interval'] + + def __init__(self, upstream=None, upstreams=None, stream_name=None, + loop=None, asynchronous=False): + self.asynchronous = asynchronous + self.downstreams = OrderedWeakrefSet() + if upstreams is not None: + self.upstreams = upstreams + else: + self.upstreams = [upstream] + if loop is None: + for upstream in self.upstreams: + if upstream and upstream.loop: + loop = upstream.loop + break + self.loop = loop + for upstream in self.upstreams: + if upstream: + upstream.downstreams.add(self) + self.name = stream_name + if loop: + for upstream in self.upstreams: + if upstream: + upstream._inform_loop(loop) + + def _inform_loop(self, loop): + """ + Percolate information about an event loop to the rest of the stream + """ + if self.loop is loop: + return + elif self.loop is None: + self.loop = loop + for upstream in self.upstreams: + if upstream: + upstream._inform_loop(loop) + for downstream in self.downstreams: + if downstream: + downstream._inform_loop(loop) + else: + raise ValueError("Two different event loops active") + + @classmethod + def register_api(cls, modifier=identity): + """ Add callable to Stream API + + This allows you to register a new method onto this class. You can use + it as a decorator:: + + >>> @Stream.register_api() + ... class foo(Stream): + ... ... + + >>> Stream().foo(...) # this works now + + It attaches the callable as a normal attribute to the class object. In + doing so it respects inheritance (all subclasses of Stream will also + get the foo attribute). + + By default callables are assumed to be instance methods. If you like + you can include modifiers to apply before attaching to the class as in + the following case where we construct a ``staticmethod``. + + >>> @Stream.register_api(staticmethod) + ... class foo(Stream): + ... ... + + >>> Stream.foo(...) 
# Foo operates as a static method + """ + def _(func): + @functools.wraps(func) + def wrapped(*args, **kwargs): + return func(*args, **kwargs) + setattr(cls, func.__name__, modifier(wrapped)) + return func + return _ + + def __str__(self): + s_list = [] + if self.name: + s_list.append('{}; {}'.format(self.name, self.__class__.__name__)) + else: + s_list.append(self.__class__.__name__) + + for m in self.str_list: + s = '' + at = getattr(self, m, None) + if at: + if not callable(at): + s = str(at) + elif hasattr(at, '__name__'): + s = getattr(self, m).__name__ + elif hasattr(at.__class__, '__name__'): + s = getattr(self, m).__class__.__name__ + else: + s = None + if s: + s_list.append('{}={}'.format(m, s)) + if len(s_list) <= 2: + s_list = [term.split('=')[-1] for term in s_list] + + text = "<" + text += s_list[0] + if len(s_list) > 1: + text += ': ' + text += ', '.join(s_list[1:]) + text += '>' + return text + + __repr__ = __str__ + + def _ipython_display_(self, **kwargs): + try: + from ipywidgets import Output + import IPython + except ImportError: + return self._repr_html_() + output = Output(_view_count=0) + output_ref = weakref.ref(output) + + def update_cell(val): + output = output_ref() + if output is None: + return + with output: + IPython.display.clear_output(wait=True) + IPython.display.display(val) + + s = self.map(update_cell) + _html_update_streams.add(s) + + self.output_ref = output_ref + s_ref = weakref.ref(s) + + def remove_stream(change): + output = output_ref() + if output is None: + return + + if output._view_count == 0: + ss = s_ref() + ss.destroy() + _html_update_streams.remove(ss) # trigger gc + + output.observe(remove_stream, '_view_count') + + return output._ipython_display_(**kwargs) + + def _emit(self, x): + result = [] + for downstream in list(self.downstreams): + r = downstream.update(x, who=self) + if type(r) is list: + result.extend(r) + else: + result.append(r) + + return [element for element in result if element is not None] + + def emit(self, x, asynchronous=False): + """ Push data into the stream at this point + + This is typically done only at source Streams but can theoretically be + done at any point + """ + ts_async = getattr(thread_state, 'asynchronous', False) + if asynchronous or self.loop is None or ts_async or self.asynchronous: + if not ts_async: + thread_state.asynchronous = True + try: + result = self._emit(x) + return gen.convert_yielded(result) + finally: + thread_state.asynchronous = ts_async + else: + @gen.coroutine + def _(): + thread_state.asynchronous = True + try: + result = yield self._emit(x) + finally: + del thread_state.asynchronous + + raise gen.Return(result) + return sync(self.loop, _) + + def update(self, x, who=None): + self._emit(x) + + def gather(self): + """ This is a no-op for core streamz + + This allows gather to be used in both dask and core streams + """ + return self + + def connect(self, downstream): + ''' Connect this stream to a downstream element. + + Parameters + ---------- + downstream: Stream + The downstream stream to connect to + ''' + self.downstreams.add(downstream) + + if downstream.upstreams == [None]: + downstream.upstreams = [self] + else: + downstream.upstreams.append(self) + + def disconnect(self, downstream): + ''' Disconnect this stream from a downstream element. 
+ + Parameters + ---------- + downstream: Stream + The downstream stream to disconnect from + ''' + self.downstreams.remove(downstream) + + downstream.upstreams.remove(self) + + @property + def upstream(self): + if len(self.upstreams) != 1: + raise ValueError("Stream has multiple upstreams") + else: + return self.upstreams[0] + + def destroy(self, streams=None): + """ + Disconnect this stream from any upstream sources + """ + if streams is None: + streams = self.upstreams + for upstream in list(streams): + upstream.downstreams.remove(self) + self.upstreams.remove(upstream) + + def scatter(self, **kwargs): + from .dask import scatter + return scatter(self, **kwargs) + + def remove(self, predicate): + """ Only pass through elements for which the predicate returns False """ + return self.filter(lambda x: not predicate(x)) + + @property + def scan(self): + return self.accumulate + + @property + def concat(self): + return self.flatten + + def sink_to_list(self): + """ Append all elements of a stream to a list as they come in + + Examples + -------- + >>> source = Stream() + >>> L = source.map(lambda x: 10 * x).sink_to_list() + >>> for i in range(5): + ... source.emit(i) + >>> L + [0, 10, 20, 30, 40] + """ + L = [] + self.sink(L.append) + return L + + def frequencies(self, **kwargs): + """ Count occurrences of elements """ + def update_frequencies(last, x): + return toolz.assoc(last, x, last.get(x, 0) + 1) + + return self.scan(update_frequencies, start={}, **kwargs) + + def visualize(self, filename='mystream.png', source_node=False, **kwargs): + """Render the computation of this object's task graph using graphviz. + + Requires ``graphviz`` to be installed. + + Parameters + ---------- + filename : str, optional + The name of the file to write to disk. + source_node: bool, optional + If True then the node is the source node and we can label the + edges in their execution order. Defaults to False + kwargs: + Graph attributes to pass to graphviz like ``rankdir="LR"`` + """ + from .graph import visualize + return visualize(self, filename, source_node=source_node, **kwargs) + + def to_dataframe(self, example): + """ Convert a stream of Pandas dataframes to a DataFrame + + Examples + -------- + >>> source = Stream() + >>> sdf = source.to_dataframe(example) + >>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list() + >>> source.emit(pd.DataFrame(...)) # doctest: +SKIP + >>> source.emit(pd.DataFrame(...)) # doctest: +SKIP + >>> source.emit(pd.DataFrame(...)) # doctest: +SKIP + """ + from .dataframe import DataFrame + return DataFrame(stream=self, example=example) + + def to_batch(self, **kwargs): + """ Convert a stream of lists to a Batch + + All elements of the stream are assumed to be lists or tuples + + Examples + -------- + >>> source = Stream() + >>> batches = source.to_batch() + >>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list() + >>> source.emit([{'name': 'Alice', 'value': 1}, + ... {'name': 'Bob', 'value': 2}, + ... {'name': 'Charlie', 'value': 3}]) + >>> source.emit([{'name': 'Alice', 'value': 4}, + ... {'name': 'Bob', 'value': 5}, + ... 
{'name': 'Charlie', 'value': 6}]) + """ + from .batch import Batch + return Batch(stream=self, **kwargs) + + +@Stream.register_api() +class sink(Stream): + """ Apply a function on every element + + Examples + -------- + >>> source = Stream() + >>> L = list() + >>> source.sink(L.append) + >>> source.sink(print) + >>> source.sink(print) + >>> source.emit(123) + 123 + 123 + >>> L + [123] + + See Also + -------- + map + Stream.sink_to_list + """ + _graphviz_shape = 'trapezium' + + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # take the stream specific kwargs out + stream_name = kwargs.pop("stream_name", None) + self.kwargs = kwargs + self.args = args + + Stream.__init__(self, upstream, stream_name=stream_name) + _global_sinks.add(self) + + def update(self, x, who=None): + result = self.func(x, *self.args, **self.kwargs) + if gen.isawaitable(result): + return result + else: + return [] + + +@Stream.register_api() +class starsink(Stream): + """ Apply a function on every element, splayed out + + Each incoming element is assumed to be a tuple, which is unpacked into + the function's positional arguments. + + Examples + -------- + >>> source = Stream() + >>> source.starsink(lambda a, b: print(a + b)) + >>> source.emit((1, 10)) + 11 + + See Also + -------- + starmap + Stream.sink + """ + _graphviz_shape = 'trapezium' + + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # take the stream specific kwargs out + stream_name = kwargs.pop("stream_name", None) + self.kwargs = kwargs + self.args = args + + Stream.__init__(self, upstream, stream_name=stream_name) + _global_sinks.add(self) + + def update(self, x, who=None): + y = x + self.args + result = self.func(*y, **self.kwargs) + if gen.isawaitable(result): + return result + else: + return [] + + +@Stream.register_api() +class map(Stream): + """ Apply a function to every element in the stream + + Parameters + ---------- + func: callable + *args : + The arguments to pass to the function. + **kwargs: + Keyword arguments to pass to func + + Examples + -------- + >>> source = Stream() + >>> source.map(lambda x: 2*x).sink(print) + >>> for i in range(5): + ... source.emit(i) + 0 + 2 + 4 + 6 + 8 + """ + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # this is one of a few stream specific kwargs + stream_name = kwargs.pop('stream_name', None) + self.kwargs = kwargs + self.args = args + + Stream.__init__(self, upstream, stream_name=stream_name) + + def update(self, x, who=None): + try: + result = self.func(x, *self.args, **self.kwargs) + except Exception as e: + logger.exception(e) + raise + else: + return self._emit(result) + + +@Stream.register_api() +class starmap(Stream): + """ Apply a function to every element in the stream, splayed out + + Parameters + ---------- + func: callable + *args : + The arguments to pass to the function. + **kwargs: + Keyword arguments to pass to func + + Examples + -------- + >>> source = Stream() + >>> source.starmap(lambda a, b: a + b).sink(print) + >>> for i in range(5): + ... 
source.emit((i, i)) + 0 + 2 + 4 + 6 + 8 + """ + def __init__(self, upstream, func, *args, **kwargs): + self.func = func + # this is one of a few stream specific kwargs + stream_name = kwargs.pop('stream_name', None) + self.kwargs = kwargs + self.args = args + + Stream.__init__(self, upstream, stream_name=stream_name) + + def update(self, x, who=None): + y = x + self.args + try: + result = self.func(*y, **self.kwargs) + except Exception as e: + logger.exception(e) + raise + else: + return self._emit(result) + + +def _truthy(x): + return not not x + + +@Stream.register_api() +class filter(Stream): + """ Only pass through elements that satisfy the predicate + + Parameters + ---------- + predicate : function + The predicate. Should return True or False, where + True means that the predicate is satisfied. + + Examples + -------- + >>> source = Stream() + >>> source.filter(lambda x: x % 2 == 0).sink(print) + >>> for i in range(5): + ... source.emit(i) + 0 + 2 + 4 + """ + def __init__(self, upstream, predicate, **kwargs): + if predicate is None: + predicate = _truthy + self.predicate = predicate + + Stream.__init__(self, upstream, **kwargs) + + def update(self, x, who=None): + if self.predicate(x): + return self._emit(x) + + +@Stream.register_api() +class accumulate(Stream): + """ Accumulate results with previous state + + This performs running or cumulative reductions, applying the function + to the previous total and the new element. The function should take + two arguments, the previous accumulated state and the next element, + and it should return a new accumulated state. + + Parameters + ---------- + func: callable + start: object + Initial value. Defaults to the first submitted element + returns_state: boolean + If true then func should return both the state and the value to emit. + If false then both values are the same, and func returns one value + **kwargs: + Keyword arguments to pass to func + + Examples + -------- + >>> source = Stream() + >>> source.accumulate(lambda acc, x: acc + x).sink(print) + >>> for i in range(5): + ... source.emit(i) + 0 + 1 + 3 + 6 + 10 + """ + _graphviz_shape = 'box' + + def __init__(self, upstream, func, start=no_default, returns_state=False, + **kwargs): + self.func = func + self.kwargs = kwargs + self.state = start + self.returns_state = returns_state + # this is one of a few stream specific kwargs + stream_name = kwargs.pop('stream_name', None) + Stream.__init__(self, upstream, stream_name=stream_name) + + def update(self, x, who=None): + if self.state is no_default: + self.state = x + return self._emit(x) + else: + try: + result = self.func(self.state, x, **self.kwargs) + except Exception as e: + logger.exception(e) + raise + if self.returns_state: + state, result = result + else: + state = result + self.state = state + return self._emit(result) + + +@Stream.register_api() +class partition(Stream): + """ Partition stream into tuples of equal size + + Examples + -------- + >>> source = Stream() + >>> source.partition(3).sink(print) + >>> for i in range(10): + ... 
source.emit(i) + (0, 1, 2) + (3, 4, 5) + (6, 7, 8) + """ + _graphviz_shape = 'diamond' + + def __init__(self, upstream, n, **kwargs): + self.n = n + self.buffer = [] + Stream.__init__(self, upstream, **kwargs) + + def update(self, x, who=None): + self.buffer.append(x) + if len(self.buffer) == self.n: + result, self.buffer = self.buffer, [] + return self._emit(tuple(result)) + else: + return [] + + +@Stream.register_api() +class sliding_window(Stream): + """ Produce overlapping tuples of size n + + Examples + -------- + >>> source = Stream() + >>> source.sliding_window(3).sink(print) + >>> for i in range(8): + ... source.emit(i) + (0, 1, 2) + (1, 2, 3) + (2, 3, 4) + (3, 4, 5) + (4, 5, 6) + (5, 6, 7) + """ + _graphviz_shape = 'diamond' + + def __init__(self, upstream, n, **kwargs): + self.n = n + self.buffer = deque(maxlen=n) + Stream.__init__(self, upstream, **kwargs) + + def update(self, x, who=None): + self.buffer.append(x) + if len(self.buffer) == self.n: + return self._emit(tuple(self.buffer)) + else: + return [] + + +def convert_interval(interval): + if isinstance(interval, str): + import pandas as pd + interval = pd.Timedelta(interval).total_seconds() + return interval + + +@Stream.register_api() +class timed_window(Stream): + """ Emit a tuple of collected results every interval + + Every ``interval`` seconds this emits a tuple of all of the results + seen so far. This can help to batch data coming off of a high-volume + stream. + """ + _graphviz_shape = 'octagon' + + def __init__(self, upstream, interval, loop=None, **kwargs): + loop = loop or upstream.loop or IOLoop.current() + self.interval = convert_interval(interval) + self.buffer = [] + self.last = gen.moment + + Stream.__init__(self, upstream, loop=loop, **kwargs) + + self.loop.add_callback(self.cb) + + def update(self, x, who=None): + self.buffer.append(x) + return self.last + + @gen.coroutine + def cb(self): + while True: + L, self.buffer = self.buffer, [] + self.last = self._emit(L) + yield self.last + yield gen.sleep(self.interval) + + +@Stream.register_api() +class delay(Stream): + """ Add a time delay to results """ + _graphviz_shape = 'octagon' + + def __init__(self, upstream, interval, loop=None, **kwargs): + loop = loop or upstream.loop or IOLoop.current() + self.interval = convert_interval(interval) + self.queue = Queue() + + Stream.__init__(self, upstream, loop=loop, **kwargs) + + self.loop.add_callback(self.cb) + + @gen.coroutine + def cb(self): + while True: + last = time() + x = yield self.queue.get() + yield self._emit(x) + duration = self.interval - (time() - last) + if duration > 0: + yield gen.sleep(duration) + + def update(self, x, who=None): + return self.queue.put(x) + + +@Stream.register_api() +class rate_limit(Stream): + """ Limit the flow of data + + This stops two elements from streaming through in an interval shorter + than the provided value. + + Parameters + ---------- + interval: float + Time in seconds + """ + _graphviz_shape = 'octagon' + + def __init__(self, upstream, interval, **kwargs): + self.interval = convert_interval(interval) + self.next = 0 + + Stream.__init__(self, upstream, **kwargs) + + @gen.coroutine + def update(self, x, who=None): + now = time() + old_next = self.next + self.next = max(now, self.next) + self.interval + if now < old_next: + yield gen.sleep(old_next - now) + yield self._emit(x) + + +@Stream.register_api() +class buffer(Stream): + """ Allow results to pile up at this point in the stream + + This allows results to buffer in place at various points in the stream. 
+ This can help to smooth flow through the system when backpressure is + applied. + """ + _graphviz_shape = 'diamond' + + def __init__(self, upstream, n, loop=None, **kwargs): + loop = loop or upstream.loop or IOLoop.current() + self.queue = Queue(maxsize=n) + + Stream.__init__(self, upstream, loop=loop, **kwargs) + + self.loop.add_callback(self.cb) + + def update(self, x, who=None): + return self.queue.put(x) + + @gen.coroutine + def cb(self): + while True: + x = yield self.queue.get() + yield self._emit(x) + + +@Stream.register_api() +class zip(Stream): + """ Combine streams together into a stream of tuples + + We emit a new tuple once all streams have produced a new element. + + See also + -------- + combine_latest + zip_latest + """ + _graphviz_orientation = 270 + _graphviz_shape = 'triangle' + + def __init__(self, *upstreams, **kwargs): + self.maxsize = kwargs.pop('maxsize', 10) + self.condition = Condition() + self.literals = [(i, val) for i, val in enumerate(upstreams) + if not isinstance(val, Stream)] + + self.buffers = {upstream: deque() + for upstream in upstreams + if isinstance(upstream, Stream)} + + upstreams2 = [upstream for upstream in upstreams if isinstance(upstream, Stream)] + + Stream.__init__(self, upstreams=upstreams2, **kwargs) + + def pack_literals(self, tup): + """ Fill buffers for literals whenever we empty them """ + inp = list(tup)[::-1] + out = [] + for i, val in self.literals: + while len(out) < i: + out.append(inp.pop()) + out.append(val) + + while inp: + out.append(inp.pop()) + + return tuple(out) + + def update(self, x, who=None): + L = self.buffers[who] # get buffer for stream + L.append(x) + if len(L) == 1 and all(self.buffers.values()): + tup = tuple(self.buffers[up][0] for up in self.upstreams) + for buf in self.buffers.values(): + buf.popleft() + self.condition.notify_all() + if self.literals: + tup = self.pack_literals(tup) + return self._emit(tup) + elif len(L) > self.maxsize: + return self.condition.wait() + + +@Stream.register_api() +class combine_latest(Stream): + """ Combine multiple streams together to a stream of tuples + + This will emit a new tuple of all of the most recent elements seen from + any stream. + + Parameters + ---------- + emit_on : stream or list of streams or None + only emit upon update of the streams listed. + If None, emit on update from any stream + + See Also + -------- + zip + """ + _graphviz_orientation = 270 + _graphviz_shape = 'triangle' + + def __init__(self, *upstreams, **kwargs): + emit_on = kwargs.pop('emit_on', None) + + self.last = [None for _ in upstreams] + self.missing = set(upstreams) + if emit_on is not None: + if not isinstance(emit_on, Iterable): + emit_on = (emit_on, ) + emit_on = tuple( + upstreams[x] if isinstance(x, int) else x for x in emit_on) + self.emit_on = emit_on + else: + self.emit_on = upstreams + Stream.__init__(self, upstreams=upstreams, **kwargs) + + def update(self, x, who=None): + if self.missing and who in self.missing: + self.missing.remove(who) + + self.last[self.upstreams.index(who)] = x + if not self.missing and who in self.emit_on: + tup = tuple(self.last) + return self.emit(tup) + + +@Stream.register_api() +class flatten(Stream): + """ Flatten streams of lists or iterables into a stream of elements + + Examples + -------- + >>> source = Stream() + >>> source.flatten().sink(print) + >>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]: + ... 
source.emit(x) + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 7 + + See Also + -------- + partition + """ + def update(self, x, who=None): + L = [] + for item in x: + y = self._emit(item) + if type(y) is list: + L.extend(y) + else: + L.append(y) + return L + + +@Stream.register_api() +class unique(Stream): + """ Avoid sending through repeated elements + + This deduplicates a stream so that only new elements pass through. + You can control how much of a history is stored with the ``history=`` + parameter. For example setting ``history=1`` avoids sending through + elements when one is repeated right after the other. + + Examples + -------- + >>> source = Stream() + >>> source.unique(history=1).sink(print) + >>> for x in [1, 1, 2, 2, 2, 1, 3]: + ... source.emit(x) + 1 + 2 + 1 + 3 + """ + + def __init__(self, upstream, history=None, key=identity, **kwargs): + self.seen = dict() + self.key = key + if history: + from zict import LRU + self.seen = LRU(history, self.seen) + self.dict_seen = deque(maxlen=history) + + Stream.__init__(self, upstream, **kwargs) + + def update(self, x, who=None): + y = self.key(x) + # If y is a dict then we can't use the LRU cache; use a FIFO deque instead + if isinstance(y, MutableMapping): + if y not in self.dict_seen: + self.dict_seen.append(y) + return self._emit(x) + else: + if y not in self.seen: + self.seen[y] = 1 + return self._emit(x) + + +@Stream.register_api() +class union(Stream): + """ Combine multiple streams into one + + Every element from any of the upstream streams will immediately flow + into the output stream. They will not be combined with elements from + other streams. + + See also + -------- + Stream.zip + Stream.combine_latest + """ + def __init__(self, *upstreams, **kwargs): + super(union, self).__init__(upstreams=upstreams, **kwargs) + + def update(self, x, who=None): + return self._emit(x) + + +@Stream.register_api() +class pluck(Stream): + """ Select elements from elements in the stream. + + Parameters + ---------- + pick : object, list + The element(s) to pick from the incoming element in the stream + If an instance of list, will pick multiple elements. + + Examples + -------- + >>> source = Stream() + >>> source.pluck([0, 3]).sink(print) + >>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]: + ... source.emit(x) + (1, 4) + (4, 7) + (8, 11) + + >>> source = Stream() + >>> source.pluck('name').sink(print) + >>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]: + ... source.emit(x) + 'Alice' + 'Bob' + """ + def __init__(self, upstream, pick, **kwargs): + self.pick = pick + super(pluck, self).__init__(upstream, **kwargs) + + def update(self, x, who=None): + if isinstance(self.pick, list): + return self._emit(tuple([x[ind] for ind in self.pick])) + else: + return self._emit(x[self.pick]) + + +@Stream.register_api() +class collect(Stream): + """ + Hold elements in a cache and emit them as a collection when flushed. + + Examples + -------- + >>> source1 = Stream() + >>> source2 = Stream() + >>> collector = collect(source1) + >>> collector.sink(print) + >>> source2.sink(collector.flush) + >>> source1.emit(1) + >>> source1.emit(2) + >>> source2.emit('anything') # flushes collector + ... 
+ [1, 2] + """ + def __init__(self, upstream, cache=None, **kwargs): + if cache is None: + cache = deque() + self.cache = cache + + Stream.__init__(self, upstream, **kwargs) + + def update(self, x, who=None): + self.cache.append(x) + + def flush(self, _=None): + out = tuple(self.cache) + self._emit(out) + self.cache.clear() + + +@Stream.register_api() +class zip_latest(Stream): + """Combine multiple streams together to a stream of tuples + + The stream which this is called from is lossless. All elements from + the lossless stream are emitted regardless of when they came in. + This will emit a new tuple consisting of an element from the lossless + stream paired with the latest elements from the other streams. + A tuple is only emitted when an element on the lossless stream is + received, similar to ``combine_latest`` with the ``emit_on`` flag. + + See Also + -------- + Stream.combine_latest + Stream.zip + """ + def __init__(self, lossless, *upstreams, **kwargs): + upstreams = (lossless,) + upstreams + self.last = [None for _ in upstreams] + self.missing = set(upstreams) + self.lossless = lossless + self.lossless_buffer = deque() + Stream.__init__(self, upstreams=upstreams, **kwargs) + + def update(self, x, who=None): + idx = self.upstreams.index(who) + if who is self.lossless: + self.lossless_buffer.append(x) + + self.last[idx] = x + if self.missing and who in self.missing: + self.missing.remove(who) + + if not self.missing: + L = [] + while self.lossless_buffer: + self.last[0] = self.lossless_buffer.popleft() + L.append(self._emit(tuple(self.last))) + return L + + +@Stream.register_api() +class latest(Stream): + """ Drop held-up data and emit the latest result + + This allows you to skip intermediate elements in the stream if there is + some back pressure causing a slowdown. Use this when you only care about + the latest elements, and are willing to lose older data. + + This passes through values without modification otherwise. + + Examples + -------- + >>> source.map(f).latest().map(g) # doctest: +SKIP + """ + _graphviz_shape = 'octagon' + + def __init__(self, upstream, loop=None): + loop = loop or upstream.loop or IOLoop.current() + self.condition = Condition() + self.next = [] + + Stream.__init__(self, upstream, loop=loop) + + self.loop.add_callback(self.cb) + + def update(self, x, who=None): + self.next = [x] + self.loop.add_callback(self.condition.notify) + + @gen.coroutine + def cb(self): + while True: + yield self.condition.wait() + [x] = self.next + yield self._emit(x) + + +def sync(loop, func, *args, **kwargs): + """ + Run a coroutine in a loop running in a separate thread. 
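+ + A sketch of intended use (``fetch`` stands in for any coroutine and is + not part of this module; ``callback_timeout`` is optional, in seconds):: + + result = sync(loop, fetch, 'https://example.com', callback_timeout=5)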
+ """ + # This was taken from distrbuted/utils.py + timeout = kwargs.pop('callback_timeout', None) + + def make_coro(): + coro = gen.maybe_future(func(*args, **kwargs)) + if timeout is None: + return coro + else: + return gen.with_timeout(timedelta(seconds=timeout), coro) + + if not loop._running: + try: + return loop.run_sync(make_coro) + except RuntimeError: # loop already running + pass + + e = threading.Event() + main_tid = get_thread_identity() + result = [None] + error = [False] + + @gen.coroutine + def f(): + try: + if main_tid == get_thread_identity(): + raise RuntimeError("sync() called from thread of running loop") + yield gen.moment + thread_state.asynchronous = True + result[0] = yield make_coro() + except Exception as exc: + logger.exception(exc) + error[0] = sys.exc_info() + finally: + thread_state.asynchronous = False + e.set() + + loop.add_callback(f) + while not e.is_set(): + e.wait(1000000) + if error[0]: + six.reraise(*error[0]) + else: + return result[0] diff --git a/streamz_ext/dask.py b/streamz_ext/dask.py new file mode 100644 index 0000000..d0b1865 --- /dev/null +++ b/streamz_ext/dask.py @@ -0,0 +1 @@ +from streamz.dask import * \ No newline at end of file diff --git a/streamz_ext/dataframe.py b/streamz_ext/dataframe.py new file mode 100644 index 0000000..ac32e3c --- /dev/null +++ b/streamz_ext/dataframe.py @@ -0,0 +1 @@ +from streamz.dataframe import * \ No newline at end of file diff --git a/streamz_ext/graph.py b/streamz_ext/graph.py new file mode 100644 index 0000000..540e63b --- /dev/null +++ b/streamz_ext/graph.py @@ -0,0 +1 @@ +from streamz.graph import * \ No newline at end of file diff --git a/streamz_ext/orderedweakset.py b/streamz_ext/orderedweakset.py new file mode 100644 index 0000000..82136ec --- /dev/null +++ b/streamz_ext/orderedweakset.py @@ -0,0 +1,35 @@ +# -*- coding: utf8 -*- +# This is a copy from Stack Overflow +# https://stackoverflow.com/questions/7828444/indexable-weak-ordered-set-in-python +# Asked by Neil G https://stackoverflow.com/users/99989/neil-g +# Answered/edited by https://stackoverflow.com/users/1001643/raymond-hettinger +import collections +import weakref + + +class OrderedSet(collections.MutableSet): + def __init__(self, values=()): + self._od = collections.OrderedDict().fromkeys(values) + + def __len__(self): + return len(self._od) + + def __iter__(self): + return iter(self._od) + + def __contains__(self, value): + return value in self._od + + def add(self, value): + self._od[value] = None + + def discard(self, value): + self._od.pop(value, None) + + +class OrderedWeakrefSet(weakref.WeakSet): + def __init__(self, values=()): + super(OrderedWeakrefSet, self).__init__() + self.data = OrderedSet() + for elem in values: + self.add(elem) diff --git a/test/test_core.py b/test/test_core.py new file mode 100644 index 0000000..29dbb87 --- /dev/null +++ b/test/test_core.py @@ -0,0 +1,26 @@ +from streamz_ext import Stream + + +def test_star_sink(): + L = [] + + def add(x, y): + L.append(x + y) + + source = Stream() + source.starsink(add) + + source.emit((1, 10)) + + assert L[0] == 11 + + +def test_unique_dict(): + source = Stream() + L = source.unique(history=1).sink_to_list() + + source.emit({'a': 1}) + source.emit({'a': 1}) + source.emit({'a': 1}) + + assert L == [{'a': 1}]