Skip to content

Commit

Permalink
move filter to parallel and remove thread
Browse files Browse the repository at this point in the history
  • Loading branch information
CJ-Wright committed Sep 18, 2018
1 parent 55f2238 commit 5be3026
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 512 deletions.
74 changes: 63 additions & 11 deletions streamz_ext/parallel.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from functools import wraps

from streamz import Stream
from streamz.core import _truthy
from streamz_ext.core import get_io_loop
from streamz_ext.clients import DEFAULT_BACKENDS
from operator import getitem
Expand All @@ -9,6 +13,34 @@
from . import core, sources
from .core import Stream, identity

from collections import Sequence

NULL_COMPUTE = '~~NULL_COMPUTE~~'


def return_null(func):
@wraps(func)
def inner(x, *args, **kwargs):
tv = func(x, *args, **kwargs)
if tv:
return x
else:
return NULL_COMPUTE

return inner


def filter_null_wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
if (any(a == NULL_COMPUTE for a in args)
or any(v == NULL_COMPUTE for v in kwargs.values())):
return NULL_COMPUTE
else:
return func(*args, **kwargs)

return inner


class ParallelStream(Stream):
""" A Parallel stream using multiple backends
Expand Down Expand Up @@ -112,14 +144,16 @@ class gather(ParallelStream):
def update(self, x, who=None):
client = self.default_client()
result = yield client.gather(x, asynchronous=True)
result2 = yield self._emit(result)
raise gen.Return(result2)
if not ((isinstance(result, Sequence) and any(
r == NULL_COMPUTE for r in result)) or result == NULL_COMPUTE):
result2 = yield self._emit(result)
raise gen.Return(result2)


@ParallelStream.register_api()
class map(ParallelStream):
def __init__(self, upstream, func, *args, **kwargs):
self.func = func
self.func = filter_null_wrapper(func)
self.kwargs = kwargs
self.args = args

Expand All @@ -134,14 +168,14 @@ def update(self, x, who=None):
@ParallelStream.register_api()
class accumulate(ParallelStream):
def __init__(
self,
upstream,
func,
start=core.no_default,
returns_state=False,
**kwargs
self,
upstream,
func,
start=core.no_default,
returns_state=False,
**kwargs
):
self.func = func
self.func = filter_null_wrapper(func)
self.state = start
self.returns_state = returns_state
self.kwargs = kwargs
Expand All @@ -166,7 +200,7 @@ def update(self, x, who=None):
@ParallelStream.register_api()
class starmap(ParallelStream):
def __init__(self, upstream, func, **kwargs):
self.func = func
self.func = filter_null_wrapper(func)
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs

Expand All @@ -178,6 +212,24 @@ def update(self, x, who=None):
return self._emit(result)


@ParallelStream.register_api()
class filter(ParallelStream):
def __init__(self, upstream, predicate, *args, **kwargs):
if predicate is None:
predicate = _truthy
self.predicate = return_null(predicate)
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
self.args = args

ParallelStream.__init__(self, upstream, stream_name=stream_name)

def update(self, x, who=None):
client = self.default_client()
result = client.submit(self.predicate, x, *self.args, **self.kwargs)
return self._emit(result)


@ParallelStream.register_api()
class buffer(ParallelStream, core.buffer):
pass
Expand Down
Loading

0 comments on commit 5be3026

Please sign in to comment.