Skip to content

Commit

Permalink
Split gRPC Server and client settings and set connection backoff to 1…
Browse files Browse the repository at this point in the history
…00 ms

The split is needed to move Function Executor into Tensorlake repo
so FE code only include grpc server settings.

The 100 ms channel connection backoff is needed to disable defaul exponential
backoff logic in grpc client which results in very long connection durations
if the first attempt to connect to grpc server wasn't successfull.

This reduces duration of SDK tests by about 4x.
  • Loading branch information
eabatalov committed Jan 23, 2025
1 parent 8e06c7b commit c453e23
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 13 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ sqlite*

# Miscellaneous
/executor-py/~

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from grpc.aio import AioRpcError

from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC
from indexify.function_executor.proto.function_executor_pb2 import (
HealthCheckRequest,
HealthCheckResponse,
Expand All @@ -13,6 +12,8 @@
FunctionExecutorStub,
)

from .server.client_configuration import HEALTH_CHECK_TIMEOUT_SEC

HEALTH_CHECK_POLL_PERIOD_SEC = 10


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# We send function inputs and outputs over gRPC.
# -1 means unlimited. We don't want to limit the size of data customers are using.
# The effective max message size in this case is about 1.9 GB, see the max payload test.
# This is due to internal hard gRPC limits. When we want to increase the message sizes
# we'll have to implement chunking for large messages.
_MAX_GRPC_MESSAGE_LENGTH = -1

# Optimize the channels for low latency connection establishement as we are running on the same host.
_RECONNECT_BACKOFF_MS = 100

GRPC_CHANNEL_OPTIONS = [
("grpc.max_receive_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.min_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
("grpc.max_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
("grpc.initial_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
]

# If a health check takes more than this duration then the server is considered unhealthy.
HEALTH_CHECK_TIMEOUT_SEC = 5
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import grpc

from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS

from .client_configuration import GRPC_CHANNEL_OPTIONS
from .function_executor_server import FunctionExecutorServer


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,3 @@
("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.so_reuseport", _REUSE_SERVER_PORT),
]

GRPC_CHANNEL_OPTIONS = GRPC_SERVER_OPTIONS

# If a health check takes more than this duration then the server is considered unhealthy.
HEALTH_CHECK_TIMEOUT_SEC = 5
2 changes: 1 addition & 1 deletion indexify/src/indexify/function_executor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import grpc

from .proto.configuration import GRPC_SERVER_OPTIONS
from .proto.function_executor_pb2_grpc import add_FunctionExecutorServicer_to_server
from .proto.server_configuration import GRPC_SERVER_OPTIONS
from .service import Service

# Temporary limit until we have a better way to control this.
Expand Down
2 changes: 1 addition & 1 deletion indexify/tests/function_executor/test_health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
run_task,
)

from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC
from indexify.function_executor.proto.function_executor_pb2 import (
HealthCheckRequest,
HealthCheckResponse,
Expand All @@ -30,6 +29,7 @@

# Lower - faster tests but more CPU usage.
HEALTH_CHECK_POLL_PERIOD_SEC = 0.1
HEALTH_CHECK_TIMEOUT_SEC = 5


@tensorlake_function()
Expand Down
5 changes: 3 additions & 2 deletions indexify/tests/function_executor/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import grpc
from tensorlake.functions_sdk.object_serializer import CloudPickleSerializer

from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS
from indexify.function_executor.proto.function_executor_pb2 import (
FunctionOutput,
RunTaskRequest,
Expand All @@ -15,6 +14,7 @@
from indexify.function_executor.proto.function_executor_pb2_grpc import (
FunctionExecutorStub,
)
from indexify.function_executor.proto.server_configuration import GRPC_SERVER_OPTIONS

# Default Executor range is 50000:51000.
# Use a value outside of this range to not conflict with other tests.
Expand Down Expand Up @@ -43,9 +43,10 @@ def __exit__(self, exc_type, exc_value, traceback):


def rpc_channel(context_manager: FunctionExecutorProcessContextManager) -> grpc.Channel:
# The GRPC_SERVER_OPTIONS include the maximum message size which we need to set in the client channel.
channel: grpc.Channel = grpc.insecure_channel(
f"localhost:{context_manager.port}",
options=GRPC_CHANNEL_OPTIONS,
options=GRPC_SERVER_OPTIONS,
)
try:
SERVER_STARTUP_TIMEOUT_SEC = 5
Expand Down

0 comments on commit c453e23

Please sign in to comment.