Commit
A few fixes
- Minor stylistic changes.
- Use latest Tensorlake version.
- Add Function Executor lifecycle test.
- Move test_function_concurrency from the SDK package
  because it depends on Executor logic.
eabatalov committed Jan 23, 2025
1 parent d538182 commit 8e06c7b
Showing 8 changed files with 304 additions and 17 deletions.
10 changes: 5 additions & 5 deletions indexify/poetry.lock

(generated file; diff not rendered)

2 changes: 1 addition & 1 deletion indexify/pyproject.toml
@@ -23,7 +23,7 @@
 grpcio = "1.68.1"

 # Function Executor only
 grpcio-tools = "1.68.1"
-tensorlake = ">=0.1.10"
+tensorlake = ">=0.1.11"

 # Executor only
 pydantic = "2.10.4"
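After bumping the constraint, a quick way to confirm which tensorlake version actually resolved in the environment (a minimal sketch; importlib.metadata is standard library, and the >= 0.1.11 expectation comes from the diff above):

import importlib.metadata

# Prints the installed tensorlake version; expect 0.1.11 or newer.
print(importlib.metadata.version("tensorlake"))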
(changed file whose header is not rendered in this view)

@@ -38,7 +38,7 @@ def __init__(
             graph_invocation_id=request.graph_invocation_id,
             task_id=request.task_id,
         )
-        self._func_wrapper = function_wrapper
+        self._function_wrapper = function_wrapper
         self._input_loader = FunctionInputsLoader(request)
         self._response_helper = ResponseHelper(task_id=request.task_id)
         # TODO: use files for stdout, stderr capturing. This puts a natural and thus reasonable
@@ -81,8 +81,8 @@ def _run_func(self, inputs: FunctionInputs) -> RunTaskResponse:
             graph_version=self._graph_version,
             invocation_state=self._invocation_state,
         )
-        if _is_router(self._func_wrapper):
-            result: RouterCallResult = self._func_wrapper.invoke_router(
+        if _is_router(self._function_wrapper):
+            result: RouterCallResult = self._function_wrapper.invoke_router(
                 ctx, self._function_name, inputs.input
             )
             return self._response_helper.router_response(
@@ -91,12 +91,12 @@ def _run_func(self, inputs: FunctionInputs) -> RunTaskResponse:
                 stderr=self._func_stderr.getvalue(),
             )
         else:
-            result: FunctionCallResult = self._func_wrapper.invoke_fn_ser(
+            result: FunctionCallResult = self._function_wrapper.invoke_fn_ser(
                 ctx, self._function_name, inputs.input, inputs.init_value
             )
             return self._response_helper.function_response(
                 result=result,
-                is_reducer=_func_is_reducer(self._func_wrapper),
+                is_reducer=_function_is_reducer(self._function_wrapper),
                 stdout=self._func_stdout.getvalue(),
                 stderr=self._func_stderr.getvalue(),
             )
@@ -122,5 +122,5 @@ def _is_router(func_wrapper: TensorlakeFunctionWrapper) -> bool:
     )


-def _func_is_reducer(func_wrapper: TensorlakeFunctionWrapper) -> bool:
+def _function_is_reducer(func_wrapper: TensorlakeFunctionWrapper) -> bool:
     return func_wrapper.indexify_function.accumulate is not None
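For context on the rename above: _function_is_reducer keys off the wrapped function's accumulate attribute. A reducer would be declared roughly like this (a hedged sketch — the accumulate decorator parameter is inferred from the attribute checked in the diff, not verified against the SDK):

from tensorlake import tensorlake_function

# Hypothetical reducer: declaring an accumulator type sets
# indexify_function.accumulate, so _function_is_reducer(...) returns True.
@tensorlake_function(accumulate=int)
def running_total(total: int, value: int) -> int:
    return total + value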
9 changes: 5 additions & 4 deletions indexify/src/indexify/function_executor/service.py
@@ -1,4 +1,4 @@
-from typing import Iterator, Optional, Union
+from typing import Iterator, Optional

 import grpc
 import structlog
@@ -31,7 +31,7 @@ def __init__(self):
         self._graph_name: Optional[str] = None
         self._graph_version: Optional[str] = None
         self._function_name: Optional[str] = None
-        self._func_wrapper: Optional[TensorlakeFunctionWrapper] = None
+        self._function_wrapper: Optional[TensorlakeFunctionWrapper] = None
         self._invocation_state_proxy_server: Optional[InvocationStateProxyServer] = None

     def initialize(
@@ -61,9 +61,10 @@ def initialize(
         try:
             # Process user controlled input in a try-except block to not treat errors here as our
             # internal platform errors.
+            # TODO: capture stdout and stderr and report exceptions the same way as when we run a task.
             graph = graph_serializer.deserialize(request.graph.bytes)
             function = graph_serializer.deserialize(graph[request.function_name])
-            self._func_wrapper = TensorlakeFunctionWrapper(function)
+            self._function_wrapper = TensorlakeFunctionWrapper(function)
         except Exception as e:
             return InitializeResponse(success=False, customer_error=str(e))

@@ -99,7 +100,7 @@ def run_task(
             invocation_state=ProxiedInvocationState(
                 request.task_id, self._invocation_state_proxy_server
             ),
-            function_wrapper=self._func_wrapper,
+            function_wrapper=self._function_wrapper,
             logger=self._logger,
         ).run()
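One detail worth noting in initialize above: exceptions raised while deserializing user-supplied code are returned as customer errors rather than being treated as internal platform errors. The same pattern in a standalone sketch (all names here are illustrative, not the module's real API):

import pickle
from dataclasses import dataclass
from typing import Optional

@dataclass
class InitResult:
    success: bool
    customer_error: Optional[str] = None

def load_user_code(blob: bytes) -> InitResult:
    try:
        # User-controlled input: any exception here is the customer's error,
        # not an internal platform failure.
        fn = pickle.loads(blob)
    except Exception as e:
        return InitResult(success=False, customer_error=str(e))
    return InitResult(success=True)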
151 changes: 151 additions & 0 deletions indexify/tests/executor/test_function_concurrency.py
@@ -0,0 +1,151 @@
import multiprocessing
import time
import unittest

from tensorlake import Graph, tensorlake_function
from tensorlake.remote_graph import RemoteGraph
from testing import test_graph_name


@tensorlake_function()
def sleep_a(secs: float) -> str:
    time.sleep(secs)
    return "success"


@tensorlake_function()
def sleep_b(secs: float) -> str:
    time.sleep(secs)
    return "success"


def invoke_sleep_graph(graph_name: str, func_name: str, func_arg_secs: float):
    # Callers run this helper in a separate process because graph.run blocks;
    # with threads we would not get real concurrency.
    graph = RemoteGraph.by_name(graph_name)
    invocation_id = graph.run(
        block_until_done=True,
        secs=func_arg_secs,
    )
    output = graph.output(invocation_id, func_name)
    if output != ["success"]:
        raise Exception(f"Expected output to be ['success'], got {output}")


class TestRemoteGraphFunctionConcurrency(unittest.TestCase):
    def test_two_same_functions_run_with_concurrency_of_one(self):
        # This test verifies that two invocations of the same function run
        # sequentially: a single Executor keeps only one Function Executor per
        # function version, and each Function Executor runs only one task at a time.
        graph = Graph(
            name=test_graph_name(self),
            description="test",
            start_node=sleep_a,
        )
        graph = RemoteGraph.deploy(graph)

        # Pre-warm the Executor so its startup delays don't skew the timings below.
        invoke_sleep_graph(
            graph_name=test_graph_name(self), func_name="sleep_a", func_arg_secs=0.01
        )

        processes = [
            multiprocessing.Process(
                target=invoke_sleep_graph,
                kwargs={
                    "graph_name": test_graph_name(self),
                    "func_name": "sleep_a",
                    "func_arg_secs": 0.51,
                },
            ),
            multiprocessing.Process(
                target=invoke_sleep_graph,
                kwargs={
                    "graph_name": test_graph_name(self),
                    "func_name": "sleep_a",
                    "func_arg_secs": 0.51,
                },
            ),
        ]

        for process in processes:
            process.start()

        start_time = time.time()
        for process in processes:
            process.join()
            self.assertEqual(process.exitcode, 0)

        end_time = time.time()
        duration = end_time - start_time
        self.assertGreaterEqual(
            duration,
            1.0,
            "The two invocations of the same function should run sequentially",
        )

    def test_two_different_functions_run_with_concurrency_of_two(self):
        # This test verifies that two invocations of different functions run
        # concurrently: a single Executor can have a Function Executor for each
        # different function.
        graph_a_name = test_graph_name(self) + "_a"
        graph_a = Graph(
            name=graph_a_name,
            description="test",
            start_node=sleep_a,
        )
        graph_a = RemoteGraph.deploy(graph_a)

        graph_b_name = test_graph_name(self) + "_b"
        graph_b = Graph(
            name=graph_b_name,
            description="test",
            start_node=sleep_b,
        )
        graph_b = RemoteGraph.deploy(graph_b)

        # Pre-warm the Executor so its startup delays don't skew the timings below.
        invoke_sleep_graph(
            graph_name=graph_a_name, func_name="sleep_a", func_arg_secs=0.01
        )
        invoke_sleep_graph(
            graph_name=graph_b_name, func_name="sleep_b", func_arg_secs=0.01
        )

        processes = [
            multiprocessing.Process(
                target=invoke_sleep_graph,
                kwargs={
                    "graph_name": graph_a_name,
                    "func_name": "sleep_a",
                    "func_arg_secs": 0.51,
                },
            ),
            multiprocessing.Process(
                target=invoke_sleep_graph,
                kwargs={
                    "graph_name": graph_b_name,
                    "func_name": "sleep_b",
                    "func_arg_secs": 0.51,
                },
            ),
        ]

        for process in processes:
            process.start()

        start_time = time.time()
        for process in processes:
            process.join()
            self.assertEqual(process.exitcode, 0)

        end_time = time.time()
        duration = end_time - start_time
        self.assertLessEqual(
            duration,
            1.0,
            "The two invocations of different functions should run concurrently",
        )


if __name__ == "__main__":
    unittest.main()
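To exercise the new suite locally, the standard unittest entry point above should work (a sketch; these remote-graph tests assume a reachable Indexify server and Executor):

python indexify/tests/executor/test_function_concurrency.py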
120 changes: 120 additions & 0 deletions indexify/tests/executor/test_function_executor_routing.py
@@ -0,0 +1,120 @@
import unittest

from tensorlake import Graph, RemoteGraph, tensorlake_function
from testing import test_graph_name


def function_executor_id() -> str:
    # We can't use PIDs because they are reused when a process exits.
    # Use the memory address of this function object instead.
    return str(id(function_executor_id))


@tensorlake_function()
def get_function_executor_id_1() -> str:
    return function_executor_id()


@tensorlake_function()
def get_function_executor_id_2(id_from_1: str) -> str:
    return function_executor_id()


class TestFunctionExecutorRouting(unittest.TestCase):
    def test_functions_of_same_version_run_in_same_function_executor(self):
        graph = Graph(
            name=test_graph_name(self),
            description="test",
            start_node=get_function_executor_id_1,
        )
        graph = RemoteGraph.deploy(graph)

        invocation_id = graph.run(block_until_done=True)
        output = graph.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_1 = output[0]

        invocation_id = graph.run(block_until_done=True)
        output = graph.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_2 = output[0]

        self.assertEqual(function_executor_id_1, function_executor_id_2)

    def test_functions_of_different_versions_run_in_different_function_executors(self):
        graph = Graph(
            name=test_graph_name(self),
            description="test",
            start_node=get_function_executor_id_1,
            version="1.0",
        )
        graph1 = RemoteGraph.deploy(graph)

        invocation_id = graph1.run(block_until_done=True)
        output = graph1.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_1 = output[0]

        graph.version = "2.0"
        graph2 = RemoteGraph.deploy(graph)
        invocation_id = graph2.run(block_until_done=True)
        output = graph2.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_2 = output[0]

        self.assertNotEqual(function_executor_id_1, function_executor_id_2)

    def test_different_functions_of_same_graph_run_in_different_function_executors(
        self,
    ):
        graph = Graph(
            name=test_graph_name(self),
            description="test",
            start_node=get_function_executor_id_1,
        )
        graph.add_edge(get_function_executor_id_1, get_function_executor_id_2)
        graph = RemoteGraph.deploy(graph)

        invocation_id = graph.run(block_until_done=True)
        output = graph.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_1 = output[0]

        output = graph.output(invocation_id, "get_function_executor_id_2")
        self.assertEqual(len(output), 1)
        function_executor_id_2 = output[0]

        self.assertNotEqual(function_executor_id_1, function_executor_id_2)

    def test_same_functions_of_different_graphs_run_in_different_function_executors(
        self,
    ):
        graph1 = Graph(
            name=test_graph_name(self) + "_1",
            description="test",
            start_node=get_function_executor_id_1,
        )
        graph1 = RemoteGraph.deploy(graph1)

        graph2 = Graph(
            name=test_graph_name(self) + "_2",
            description="test",
            start_node=get_function_executor_id_1,
        )
        graph2 = RemoteGraph.deploy(graph2)

        invocation_id = graph1.run(block_until_done=True)
        output = graph1.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_1 = output[0]

        invocation_id = graph2.run(block_until_done=True)
        output = graph2.output(invocation_id, "get_function_executor_id_1")
        self.assertEqual(len(output), 1)
        function_executor_id_2 = output[0]

        self.assertNotEqual(function_executor_id_1, function_executor_id_2)


if __name__ == "__main__":
    unittest.main()
(remaining changed files not rendered in this view)