Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split gRPC Server and client settings and set connection backoff to 100 ms #1185

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,3 @@ sqlite*

# Miscellaneous
/executor-py/~

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from grpc.aio import AioRpcError

from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC
from indexify.function_executor.proto.function_executor_pb2 import (
HealthCheckRequest,
HealthCheckResponse,
Expand All @@ -13,6 +12,8 @@
FunctionExecutorStub,
)

from .server.client_configuration import HEALTH_CHECK_TIMEOUT_SEC

HEALTH_CHECK_POLL_PERIOD_SEC = 10


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# We send function inputs and outputs over gRPC.
# -1 means unlimited. We don't want to limit the size of data customers are using.
# The effective max message size in this case is about 1.9 GB, see the max payload test.
# This is due to internal hard gRPC limits. When we want to increase the message sizes
# we'll have to implement chunking for large messages.
_MAX_GRPC_MESSAGE_LENGTH = -1

# Optimize the channels for low latency connection establishement as we are running on the same host.
_RECONNECT_BACKOFF_MS = 100

GRPC_CHANNEL_OPTIONS = [
("grpc.max_receive_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.min_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
("grpc.max_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
("grpc.initial_reconnect_backoff_ms", _RECONNECT_BACKOFF_MS),
]

# If a health check takes more than this duration then the server is considered unhealthy.
HEALTH_CHECK_TIMEOUT_SEC = 5
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import grpc

from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS

from .client_configuration import GRPC_CHANNEL_OPTIONS
from .function_executor_server import FunctionExecutorServer


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,3 @@
("grpc.max_send_message_length", _MAX_GRPC_MESSAGE_LENGTH),
("grpc.so_reuseport", _REUSE_SERVER_PORT),
]

GRPC_CHANNEL_OPTIONS = GRPC_SERVER_OPTIONS

# If a health check takes more than this duration then the server is considered unhealthy.
HEALTH_CHECK_TIMEOUT_SEC = 5
2 changes: 1 addition & 1 deletion indexify/src/indexify/function_executor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import grpc

from .proto.configuration import GRPC_SERVER_OPTIONS
from .proto.function_executor_pb2_grpc import add_FunctionExecutorServicer_to_server
from .proto.server_configuration import GRPC_SERVER_OPTIONS
from .service import Service

# Temporary limit until we have a better way to control this.
Expand Down
2 changes: 1 addition & 1 deletion indexify/tests/function_executor/test_health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
run_task,
)

from indexify.function_executor.proto.configuration import HEALTH_CHECK_TIMEOUT_SEC
from indexify.function_executor.proto.function_executor_pb2 import (
HealthCheckRequest,
HealthCheckResponse,
Expand All @@ -30,6 +29,7 @@

# Lower - faster tests but more CPU usage.
HEALTH_CHECK_POLL_PERIOD_SEC = 0.1
HEALTH_CHECK_TIMEOUT_SEC = 5


@tensorlake_function()
Expand Down
5 changes: 3 additions & 2 deletions indexify/tests/function_executor/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import grpc
from tensorlake.functions_sdk.object_serializer import CloudPickleSerializer

from indexify.function_executor.proto.configuration import GRPC_CHANNEL_OPTIONS
from indexify.function_executor.proto.function_executor_pb2 import (
FunctionOutput,
RunTaskRequest,
Expand All @@ -15,6 +14,7 @@
from indexify.function_executor.proto.function_executor_pb2_grpc import (
FunctionExecutorStub,
)
from indexify.function_executor.proto.server_configuration import GRPC_SERVER_OPTIONS

# Default Executor range is 50000:51000.
# Use a value outside of this range to not conflict with other tests.
Expand Down Expand Up @@ -43,9 +43,10 @@ def __exit__(self, exc_type, exc_value, traceback):


def rpc_channel(context_manager: FunctionExecutorProcessContextManager) -> grpc.Channel:
# The GRPC_SERVER_OPTIONS include the maximum message size which we need to set in the client channel.
channel: grpc.Channel = grpc.insecure_channel(
f"localhost:{context_manager.port}",
options=GRPC_CHANNEL_OPTIONS,
options=GRPC_SERVER_OPTIONS,
)
try:
SERVER_STARTUP_TIMEOUT_SEC = 5
Expand Down
Loading