diff --git a/.gitignore b/.gitignore index 2ba4e260a..67047dec1 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,3 @@ sqlite* # Miscellaneous /executor-py/~ - diff --git a/indexify/src/indexify/executor/function_executor/health_checker.py b/indexify/src/indexify/executor/function_executor/health_checker.py index 841cf9cbc..0f6c77703 100644 --- a/indexify/src/indexify/executor/function_executor/health_checker.py +++ b/indexify/src/indexify/executor/function_executor/health_checker.py @@ -4,7 +4,6 @@ from grpc.aio import AioRpcError -from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC from indexify.function_executor.proto.function_executor_pb2 import ( HealthCheckRequest, HealthCheckResponse, @@ -13,6 +12,8 @@ FunctionExecutorStub, ) +from .server.client_configuration import HEALTH_CHECK_TIMEOUT_SEC + HEALTH_CHECK_POLL_PERIOD_SEC = 10 diff --git a/indexify/src/indexify/executor/function_executor/server/client_configuration.py b/indexify/src/indexify/executor/function_executor/server/client_configuration.py new file mode 100644 index 000000000..e77c4195b --- /dev/null +++ b/indexify/src/indexify/executor/function_executor/server/client_configuration.py @@ -0,0 +1,20 @@ +# We send function inputs and outputs over gRPC. +# -1 means unlimited. We don't want to limit the size of data customers are using. +# The effective max message size in this case is about 1.9 GB, see the max payload test. +# This is due to internal hard gRPC limits. When we want to increase the message sizes +# we'll have to implement chunking for large messages. +_MAX_GRPC_MESSAGE_LENGTH = -1 + +# Optimize the channels for low latency connection establishement as we are running on the same host. +_RECONNECT_BACKOFF_MS = 100 + +GRPC_CHANNEL_OPTIONS = [ + ("grpc.max_receive_message_length", _MAX_GRPC_MESSAGE_LENGTH), + ("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH), + ("grpc.min_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS), + ("grpc.max_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS), + ("grpc.initial_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS), +] + +# If a health check takes more than this duration then the server is considered unhealthy. +HEALTH_CHECK_TIMEOUT_SEC = 5 diff --git a/indexify/src/indexify/executor/function_executor/server/subprocess_function_executor_server.py b/indexify/src/indexify/executor/function_executor/server/subprocess_function_executor_server.py index 91cf8b74d..16cc4ec50 100644 --- a/indexify/src/indexify/executor/function_executor/server/subprocess_function_executor_server.py +++ b/indexify/src/indexify/executor/function_executor/server/subprocess_function_executor_server.py @@ -3,8 +3,7 @@ import grpc -from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS - +from .client_configuration import GRPC_CHANNEL_OPTIONS from .function_executor_server import FunctionExecutorServer diff --git a/indexify/src/indexify/function_executor/proto/configuration.py b/indexify/src/indexify/function_executor/proto/server_configuration.py similarity index 86% rename from indexify/src/indexify/function_executor/proto/configuration.py rename to indexify/src/indexify/function_executor/proto/server_configuration.py index 709508bdb..006d4f977 100644 --- a/indexify/src/indexify/function_executor/proto/configuration.py +++ b/indexify/src/indexify/function_executor/proto/server_configuration.py @@ -17,8 +17,3 @@ ("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH), ("grpc.so_reuseport", _REUSE_SERVER_PORT), ] - -GRPC_CHANNEL_OPTIONS = GRPC_SERVER_OPTIONS - -# If a health check takes more than this duration then the server is considered unhealthy. -HEALTH_CHECK_TIMEOUT_SEC = 5 diff --git a/indexify/src/indexify/function_executor/server.py b/indexify/src/indexify/function_executor/server.py index e125f39d5..bf32e5899 100644 --- a/indexify/src/indexify/function_executor/server.py +++ b/indexify/src/indexify/function_executor/server.py @@ -2,8 +2,8 @@ import grpc -from .proto.configuration import GRPC_SERVER_OPTIONS from .proto.function_executor_pb2_grpc import add_FunctionExecutorServicer_to_server +from .proto.server_configuration import GRPC_SERVER_OPTIONS from .service import Service # Temporary limit until we have a better way to control this. diff --git a/indexify/tests/function_executor/test_health_check.py b/indexify/tests/function_executor/test_health_check.py index 4f76ebf90..8f87d9dc1 100644 --- a/indexify/tests/function_executor/test_health_check.py +++ b/indexify/tests/function_executor/test_health_check.py @@ -15,7 +15,6 @@ run_task, ) -from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC from indexify.function_executor.proto.function_executor_pb2 import ( HealthCheckRequest, HealthCheckResponse, @@ -30,6 +29,7 @@ # Lower - faster tests but more CPU usage. HEALTH_CHECK_POLL_PERIOD_SEC = 0.1 +HEALTH_CHECK_TIMEOUT_SEC = 5 @tensorlake_function() diff --git a/indexify/tests/function_executor/testing.py b/indexify/tests/function_executor/testing.py index a7b73dfd6..db12465af 100644 --- a/indexify/tests/function_executor/testing.py +++ b/indexify/tests/function_executor/testing.py @@ -5,7 +5,6 @@ import grpc from tensorlake.functions_sdk.object_serializer import CloudPickleSerializer -from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS from indexify.function_executor.proto.function_executor_pb2 import ( FunctionOutput, RunTaskRequest, @@ -15,6 +14,7 @@ from indexify.function_executor.proto.function_executor_pb2_grpc import ( FunctionExecutorStub, ) +from indexify.function_executor.proto.server_configuration import GRPC_SERVER_OPTIONS # Default Executor range is 50000:51000. # Use a value outside of this range to not conflict with other tests. @@ -43,9 +43,10 @@ def __exit__(self, exc_type, exc_value, traceback): def rpc_channel(context_manager: FunctionExecutorProcessContextManager) -> grpc.Channel: + # The GRPC_SERVER_OPTIONS include the maximum message size which we need to set in the client channel. channel: grpc.Channel = grpc.insecure_channel( f"localhost:{context_manager.port}", - options=GRPC_CHANNEL_OPTIONS, + options=GRPC_SERVER_OPTIONS, ) try: SERVER_STARTUP_TIMEOUT_SEC = 5 diff --git a/tensorlake b/tensorlake index bf95aca9c..ee0c33036 160000 --- a/tensorlake +++ b/tensorlake @@ -1 +1 @@ -Subproject commit bf95aca9cdcb3467f56485c42831b366b6579f68 +Subproject commit ee0c330365f3fd13e6b4072a8c99235143ec5476