Skip to content

Commit

Permalink
use apply with multiple args
Browse files Browse the repository at this point in the history
  • Loading branch information
CJ-Wright committed Sep 26, 2018
1 parent fe36aa6 commit d5b5e65
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 25 deletions.
16 changes: 9 additions & 7 deletions streamz_ext/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,26 @@
from .core import identity


def result_maybe(future_maybe):
if isinstance(future_maybe, Sequence):
aa = []
for a in future_maybe:
aa.append(result_maybe(a))
return aa
def result_maybe(future_maybe, top=False):
try:
return future_maybe.result()
except AttributeError:
if isinstance(future_maybe, Sequence):
aa = []
for a in future_maybe:
aa.append(result_maybe(a, top=False))
if isinstance(future_maybe, tuple):
aa = tuple(aa)
return aa
return future_maybe



def delayed_execution(func):
@wraps(func)
def inner(*args, **kwargs):
args = tuple([result_maybe(v) for v in args])
kwargs = {k: result_maybe(v) for k, v in kwargs.items()}
print("delayed", func, args)
return func(*args, **kwargs)

return inner
Expand Down
11 changes: 11 additions & 0 deletions streamz_ext/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
from zstreamz.core import _global_sinks, _truthy


def apply(func, args, args2=None, kwargs=None):
if args2:
args = args + args2
if kwargs:
return func(*args, **kwargs)
else:
return func(*args)


def scatter(self, **kwargs):
from .parallel import scatter

Expand Down Expand Up @@ -277,3 +286,5 @@ def destroy_pipeline(source_node: Stream):
# some source nodes are tuples and some are bad wekrefs
except (AttributeError, KeyError) as e:
pass


21 changes: 9 additions & 12 deletions streamz_ext/parallel.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from concurrent.futures import Future
from functools import wraps

from zstreamz import Stream
from streamz_ext import apply
from zstreamz.core import _truthy, args_kwargs
from streamz_ext.core import get_io_loop
from streamz_ext.clients import DEFAULT_BACKENDS
from operator import getitem

from tornado import gen

from dask.compatibility import apply

from . import core, sources
from .core import Stream, identity
from .core import Stream

from collections import Sequence

Expand All @@ -38,7 +37,6 @@ def inner(*args, **kwargs):
):
return NULL_COMPUTE
else:
print("filter", args)
return func(*args, **kwargs)

return inner
Expand Down Expand Up @@ -160,10 +158,11 @@ def update(self, x, who=None):
class map(ParallelStream):
def __init__(self, upstream, func, *args, **kwargs):
self.func = filter_null_wrapper(func)
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
self.args = args

ParallelStream.__init__(self, upstream)
ParallelStream.__init__(self, upstream, stream_name=stream_name)

def update(self, x, who=None):
client = self.default_client()
Expand All @@ -185,8 +184,9 @@ def __init__(
self.func = filter_null_wrapper(func)
self.state = start
self.returns_state = returns_state
stream_name = kwargs.pop("stream_name", None)
self.kwargs = kwargs
ParallelStream.__init__(self, upstream)
ParallelStream.__init__(self, upstream, stream_name=stream_name)

def update(self, x, who=None):
if self.state is core.no_default:
Expand Down Expand Up @@ -215,12 +215,9 @@ def __init__(self, upstream, func, *args, **kwargs):

ParallelStream.__init__(self, upstream, stream_name=stream_name)

def update(self, x, who=None):
if not isinstance(x, tuple):
x = (x,)
y = x + self.args
def update(self, x: Future, who=None):
client = self.default_client()
result = client.submit(apply, self.func, y, self.kwargs)
result = client.submit(apply, self.func, x, self.args, self.kwargs)
return self._emit(result)


Expand Down
2 changes: 1 addition & 1 deletion test/test_batch.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from streamz_ext.batch import *

try:
from streamz.tests.test_batch import *
from zstreamz.tests.test_batch import *
except ImportError:
pass
2 changes: 1 addition & 1 deletion test/test_core.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import operator as op

try:
from streamz.tests.test_core import *
from zstreamz.tests.test_core import *
except ImportError as e:
pass
from streamz_ext import Stream, destroy_pipeline
Expand Down
2 changes: 1 addition & 1 deletion test/test_dask.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from streamz_ext.dask import *

try:
from streamz.tests.test_dask import *
from zstreamz.tests.test_dask import *
except ImportError:
pass
2 changes: 1 addition & 1 deletion test/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from streamz_ext.graph import *

try:
from streamz.tests.test_graph import *
from zstreamz.tests.test_graph import *
except ImportError:
pass

Expand Down
2 changes: 1 addition & 1 deletion test/test_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from streamz_ext import Stream

try:
from streamz.tests.test_link import *
from zstreamz.tests.test_link import *
except ImportError:
pass

Expand Down
2 changes: 1 addition & 1 deletion test/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def f(acc, i):
def test_zip(backend):
a = Stream(asynchronous=True)
b = Stream(asynchronous=True)
c = scatter(a, backend=backend).zip(scatter(b, backend="thread"))
c = scatter(a, backend=backend).zip(scatter(b, backend=backend))

L = c.gather().sink_to_list()

Expand Down

0 comments on commit d5b5e65

Please sign in to comment.