From e664e62ce6c84e61609f073be836cf31ce653333 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sat, 10 Aug 2024 01:16:34 +0800 Subject: [PATCH 1/5] Support backup request policy --- src/brpc/backup_request_policy.h | 40 ++++++++++ src/brpc/channel.cpp | 3 + src/brpc/channel.h | 7 ++ src/brpc/controller.cpp | 11 +++ src/brpc/controller.h | 7 +- test/brpc_channel_unittest.cpp | 131 ++++++++++++++++++++++++++++++- 6 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 src/brpc/backup_request_policy.h diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h new file mode 100644 index 0000000000..c8d04fbf82 --- /dev/null +++ b/src/brpc/backup_request_policy.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef BRPC_BACKUP_REQUEST_POLICY_H +#define BRPC_BACKUP_REQUEST_POLICY_H + +#include "brpc/controller.h" + +namespace brpc { + +class BackupRequestPolicy { +public: + virtual ~BackupRequestPolicy() = default; + + // Return the time in milliseconds in which another request + // will be sent if RPC does not finish. + virtual int32_t GetBackupRequestMs() const = 0; + + // Return true if the backup request should be sent. 
+ virtual bool DoBackup(const Controller* controller) const = 0; +}; + +} + +#endif // BRPC_BACKUP_REQUEST_POLICY_H diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index 5fc6609637..dbeedf878b 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -55,6 +55,7 @@ ChannelOptions::ChannelOptions() , log_succeed_without_server(true) , use_rdma(false) , auth(NULL) + , backup_request_policy(NULL) , retry_policy(NULL) , ns_filter(NULL) {} @@ -497,6 +498,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, cntl->_connect_timeout_ms = _options.connect_timeout_ms; if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) { cntl->set_backup_request_ms(_options.backup_request_ms); + cntl->_backup_request_policy = _options.backup_request_policy; } if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) { cntl->set_connection_type(_options.connection_type); @@ -536,6 +538,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, // Currently we cannot handle retry and backup request correctly cntl->set_max_retry(0); cntl->set_backup_request_ms(-1); + cntl->_backup_request_policy = NULL; } if (cntl->backup_request_ms() >= 0 && diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 651c5084bb..304c6114c3 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -34,6 +34,7 @@ #include "brpc/controller.h" // brpc::Controller #include "brpc/details/profiler_linker.h" #include "brpc/retry_policy.h" +#include "brpc/backup_request_policy.h" #include "brpc/naming_service_filter.h" namespace brpc { @@ -112,6 +113,12 @@ struct ChannelOptions { // Default: NULL const Authenticator* auth; + // Customize the backup request time and whether to send backup request. + // Priority: `backup_request_policy' > `backup_request_ms'. + // This object is NOT owned by channel and should remain valid when channel is used. 
+ // Default: NULL + const BackupRequestPolicy* backup_request_policy; + // Customize the error code that should be retried. The interface is // defined in src/brpc/retry_policy.h // This object is NOT owned by channel and should remain valid when diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 98e25ae2c6..858432afbb 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -258,6 +258,7 @@ void Controller::ResetPods() { _connection_type = CONNECTION_TYPE_UNKNOWN; _timeout_ms = UNSET_MAGIC_NUM; _backup_request_ms = UNSET_MAGIC_NUM; + _backup_request_policy = NULL; _connect_timeout_ms = UNSET_MAGIC_NUM; _real_timeout_ms = UNSET_MAGIC_NUM; _deadline_us = -1; @@ -344,6 +345,11 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) { } } +int64_t Controller::backup_request_ms() const { + return NULL != _backup_request_policy ? + _backup_request_policy->GetBackupRequestMs() : _backup_request_ms; +} + void Controller::set_max_retry(int max_retry) { if (max_retry > MAX_RETRY_COUNT) { LOG(WARNING) << "Retry count can't be larger than " @@ -1259,6 +1265,11 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, cntl->timeout_ms(), butil::endpoint2str(cntl->remote_side()).c_str()); } else if (error_code == EBACKUPREQUEST) { + const BackupRequestPolicy* policy = cntl->_backup_request_policy; + if (NULL != policy && !policy->DoBackup(cntl)) { + // No need to do backup request. 
+ return bthread_id_unlock(id); + } cntl->SetFailed(error_code, "Reached backup timeout=%" PRId64 "ms @%s", cntl->backup_request_ms(), butil::endpoint2str(cntl->remote_side()).c_str()); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 5b2132b4f2..ce6100f068 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -47,6 +47,7 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" +#include "brpc/backup_request_policy.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -180,7 +181,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set/get the delay to send backup request in milliseconds. Use // ChannelOptions.backup_request_ms on unset. void set_backup_request_ms(int64_t timeout_ms); - int64_t backup_request_ms() const { return _backup_request_ms; } + int64_t backup_request_ms() const; // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset. // <=0 means no retry. @@ -670,7 +671,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); struct ClientSettings { int32_t timeout_ms; int32_t backup_request_ms; - int max_retry; + int max_retry; int32_t tos; ConnectionType connection_type; CompressType request_compress_type; @@ -800,6 +801,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _timeout_ms; int32_t _connect_timeout_ms; int32_t _backup_request_ms; + // Copied from `Channel' which might be destroyed after CallMethod. + const BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call int64_t _real_timeout_ms; // Deadline of this RPC (since the Epoch in microseconds). 
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index d43a0f4b95..6e12efa98c 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -310,7 +310,8 @@ class ChannelTest : public ::testing::Test{ bool single_server, bool short_connection, const brpc::Authenticator* auth = NULL, - std::string connection_group = std::string()) { + std::string connection_group = std::string(), + bool use_backup_request_policy = false) { brpc::ChannelOptions opt; if (short_connection) { opt.connection_type = brpc::CONNECTION_TYPE_SHORT; @@ -318,6 +319,9 @@ class ChannelTest : public ::testing::Test{ opt.auth = auth; opt.max_retry = 0; opt.connection_group = connection_group; + if (use_backup_request_policy) { + opt.backup_request_policy = &_backup_request_policy; + } if (single_server) { EXPECT_EQ(0, channel->Init(_ep, &opt)); } else { @@ -1918,6 +1922,110 @@ class ChannelTest : public ::testing::Test{ StopAndJoin(); } + void TestBackupRequest(bool single_server, bool async, + bool short_connection) { + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection << std::endl; + + ASSERT_EQ(0, StartAccept(_ep)); + brpc::Channel channel; + SetUpChannel(&channel, single_server, short_connection); + + const int RETRY_NUM = 1; + test::EchoRequest req; + test::EchoResponse res; + brpc::Controller cntl; + req.set_message(__FUNCTION__); + + cntl.set_max_retry(RETRY_NUM); + cntl.set_backup_request_ms(10); // 10ms + cntl.set_timeout_ms(100); // 100ms + req.set_sleep_us(50000); // 50ms + CallMethod(&channel, &cntl, &req, &res, async); + ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); + ASSERT_TRUE(cntl.has_backup_request()); + ASSERT_EQ(RETRY_NUM, cntl.retried_count()); + bthread_usleep(70000); // wait for the sleep task to finish + + if (short_connection) { + // Sleep to let `_messenger' detect `Socket' being `SetFailed' + const int64_t start_time = butil::gettimeofday_us(); + while 
(_messenger.ConnectionCount() != 0) { + EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + bthread_usleep(1000); + } + } else { + EXPECT_GE(1ul, _messenger.ConnectionCount()); + } + StopAndJoin(); + } + + class BackupRequestPolicyImpl : public brpc::BackupRequestPolicy { + public: + int32_t GetBackupRequestMs() const override { + return 10; + } + + // Return true if the backup request should be sent. + bool DoBackup(const brpc::Controller*) const override { + return backup; + } + + bool backup{true}; + + }; + + void TestBackupRequestPolicy(bool single_server, bool async, + bool short_connection) { + ASSERT_EQ(0, StartAccept(_ep)); + for (int i = 0; i < 3; ++i) { + bool backup = i != 1; + std::cout << " *** single=" << single_server + << " async=" << async + << " short=" << short_connection + << " backup=" << backup + << std::endl; + + brpc::Channel channel; + SetUpChannel(&channel, single_server, short_connection, NULL, "", true); + + const int RETRY_NUM = 1; + test::EchoRequest req; + test::EchoResponse res; + brpc::Controller cntl; + req.set_message(__FUNCTION__); + + _backup_request_policy.backup = i == 0; + if (i == 2) { + // use `set_backup_request_ms'. + // Although _backup_request_policy.DoBackup return false, it is ignored. + cntl.set_backup_request_ms(10); // 10ms + } + cntl.set_max_retry(RETRY_NUM); + cntl.set_timeout_ms(100); // 100ms + req.set_sleep_us(50000); // 50ms + CallMethod(&channel, &cntl, &req, &res, async); + ASSERT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText(); + ASSERT_EQ(backup, cntl.has_backup_request()); + ASSERT_EQ(backup ? 
RETRY_NUM : 0, cntl.retried_count()); + bthread_usleep(70000); // wait for the sleep task to finish + + if (short_connection) { + // Sleep to let `_messenger' detect `Socket' being `SetFailed' + const int64_t start_time = butil::gettimeofday_us(); + while (_messenger.ConnectionCount() != 0) { + EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + bthread_usleep(1000); + } + } else { + EXPECT_GE(1ul, _messenger.ConnectionCount()); + } + } + + StopAndJoin(); + } + butil::EndPoint _ep; butil::TempFile _server_list; std::string _naming_url; @@ -1930,6 +2038,7 @@ class ChannelTest : public ::testing::Test{ bool _close_fd_once; MyEchoService _svc; + BackupRequestPolicyImpl _backup_request_policy; }; class MyShared : public brpc::SharedObject { @@ -2597,6 +2706,26 @@ TEST_F(ChannelTest, retry_backoff) { } } +TEST_F(ChannelTest, backup_request) { + for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <= 1; ++k) { // Flag ShortConnection + TestBackupRequest(i, j, k); + } + } + } +} + +TEST_F(ChannelTest, backup_request_policy) { + for (int i = 0; i <= 1; ++i) { // Flag SingleServer + for (int j = 0; j <= 1; ++j) { // Flag Asynchronous + for (int k = 0; k <= 1; ++k) { // Flag ShortConnection + TestBackupRequestPolicy(i, j, k); + } + } + } +} + TEST_F(ChannelTest, multiple_threads_single_channel) { srand(time(NULL)); ASSERT_EQ(0, StartAccept(_ep)); From c8a88bf5b7fd7222ee7e174641745a95a847175a Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Sat, 10 Aug 2024 17:55:54 +0800 Subject: [PATCH 2/5] Support Controller::set_backup_request_policy --- src/brpc/channel.cpp | 3 ++- src/brpc/channel.h | 7 +++++-- src/brpc/controller.cpp | 11 +++++++++-- src/brpc/controller.h | 6 +++++- test/brpc_channel_unittest.cpp | 15 +++++---------- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index dbeedf878b..c15611ea8f 100644 --- 
a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -496,7 +496,8 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, // overriding connect_timeout_ms does not make sense, just use the // one in ChannelOptions cntl->_connect_timeout_ms = _options.connect_timeout_ms; - if (cntl->backup_request_ms() == UNSET_MAGIC_NUM) { + if (cntl->backup_request_ms() == UNSET_MAGIC_NUM && + NULL == cntl->_backup_request_policy) { cntl->set_backup_request_ms(_options.backup_request_ms); cntl->_backup_request_policy = _options.backup_request_policy; } diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 304c6114c3..59e4c80230 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -56,11 +56,12 @@ struct ChannelOptions { int32_t timeout_ms; // Send another request if RPC does not finish after so many milliseconds. - // Overridable by Controller.set_backup_request_ms(). + // Overridable by Controller.set_backup_request_ms() or + // Controller.set_backup_request_policy(). // The request will be sent to a different server by best effort. // If timeout_ms is set and backup_request_ms >= timeout_ms, backup request // will never be sent. - // backup request does NOT imply server-side cancelation. + // backup request does NOT imply server-side cancellation. // Default: -1 (disabled) // Maximum: 0x7fffffff (roughly 30 days) int32_t backup_request_ms; @@ -115,6 +116,8 @@ struct ChannelOptions { // Customize the backup request time and whether to send backup request. // Priority: `backup_request_policy' > `backup_request_ms'. + // Overridable by Controller.set_backup_request_ms() or + // Controller.set_backup_request_policy(). // This object is NOT owned by channel and should remain valid when channel is used. 
// Default: NULL const BackupRequestPolicy* backup_request_policy; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 858432afbb..285638c273 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -346,8 +346,13 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) { } int64_t Controller::backup_request_ms() const { - return NULL != _backup_request_policy ? - _backup_request_policy->GetBackupRequestMs() : _backup_request_ms; + int timeout_ms = NULL != _backup_request_policy ? + _backup_request_policy->GetBackupRequestMs() : _backup_request_ms; + if (timeout_ms > 0x7fffffff) { + timeout_ms = 0x7fffffff; + LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)"; + } + return timeout_ms; } void Controller::set_max_retry(int max_retry) { @@ -1324,6 +1329,7 @@ CallId Controller::call_id() { void Controller::SaveClientSettings(ClientSettings* s) const { s->timeout_ms = _timeout_ms; s->backup_request_ms = _backup_request_ms; + s->backup_request_policy = _backup_request_policy; s->max_retry = _max_retry; s->tos = _tos; s->connection_type = _connection_type; @@ -1336,6 +1342,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const { void Controller::ApplyClientSettings(const ClientSettings& s) { set_timeout_ms(s.timeout_ms); set_backup_request_ms(s.backup_request_ms); + set_backup_request_policy(s.backup_request_policy); set_max_retry(s.max_retry); set_type_of_service(s.tos); set_connection_type(s.connection_type); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index ce6100f068..2e160fd099 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -181,6 +181,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set/get the delay to send backup request in milliseconds. Use // ChannelOptions.backup_request_ms on unset. 
void set_backup_request_ms(int64_t timeout_ms); + void set_backup_request_policy(const BackupRequestPolicy* policy) { + _backup_request_policy = policy; + } int64_t backup_request_ms() const; // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset. @@ -671,6 +674,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); struct ClientSettings { int32_t timeout_ms; int32_t backup_request_ms; + const BackupRequestPolicy* backup_request_policy; int max_retry; int32_t tos; ConnectionType connection_type; @@ -801,7 +805,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _timeout_ms; int32_t _connect_timeout_ms; int32_t _backup_request_ms; - // Copied from `Channel' which might be destroyed after CallMethod. + // Priority: `_backup_request_policy' > `_backup_request_ms'. const BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call int64_t _real_timeout_ms; diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 6e12efa98c..dd7285e71d 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -1979,8 +1979,8 @@ class ChannelTest : public ::testing::Test{ void TestBackupRequestPolicy(bool single_server, bool async, bool short_connection) { ASSERT_EQ(0, StartAccept(_ep)); - for (int i = 0; i < 3; ++i) { - bool backup = i != 1; + for (int i = 0; i < 2; ++i) { + bool backup = i == 0; std::cout << " *** single=" << single_server << " async=" << async << " short=" << short_connection @@ -1996,12 +1996,7 @@ class ChannelTest : public ::testing::Test{ brpc::Controller cntl; req.set_message(__FUNCTION__); - _backup_request_policy.backup = i == 0; - if (i == 2) { - // use `set_backup_request_ms'. - // Although _backup_request_policy.DoBackup return false, it is ignored. 
- cntl.set_backup_request_ms(10); // 10ms - } + _backup_request_policy.backup = backup; cntl.set_max_retry(RETRY_NUM); cntl.set_timeout_ms(100); // 100ms req.set_sleep_us(50000); // 50ms @@ -2015,11 +2010,11 @@ class ChannelTest : public ::testing::Test{ // Sleep to let `_messenger' detect `Socket' being `SetFailed' const int64_t start_time = butil::gettimeofday_us(); while (_messenger.ConnectionCount() != 0) { - EXPECT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); + ASSERT_LT(butil::gettimeofday_us(), start_time + 100000L/*100ms*/); bthread_usleep(1000); } } else { - EXPECT_GE(1ul, _messenger.ConnectionCount()); + ASSERT_GE(1ul, _messenger.ConnectionCount()); } } From c69a4e351130e3ad703e517e6fd78da5d3a79914 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Mon, 19 Aug 2024 23:48:39 +0800 Subject: [PATCH 3/5] Pass Controller to GetBackupRequestMs and update cn/client.md --- docs/cn/client.md | 26 ++++++++++++++++++++++++++ src/brpc/backup_request_policy.h | 2 +- src/brpc/controller.cpp | 2 +- src/brpc/controller.h | 2 +- test/brpc_channel_unittest.cpp | 2 +- 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/docs/cn/client.md b/docs/cn/client.md index 27f1fa7025..e0c47c6262 100755 --- a/docs/cn/client.md +++ b/docs/cn/client.md @@ -708,6 +708,32 @@ options.retry_policy = &g_my_retry_policy; - [brpc::RpcRetryPolicyWithFixedBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(固定时间间隔退策略)和[brpc::RpcRetryPolicyWithJitteredBackoff](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h)(随机时间间隔退策略)继承了[brpc::RpcRetryPolicy](https://github.com/apache/brpc/blob/master/src/brpc/retry_policy.h),使用框架默认的DoRetry。 - 在pthread中进行重试退避(实际上通过bthread_usleep实现)会阻塞pthread,所以默认不会在pthread上进行重试退避。 +### backup request + +如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 + 
+ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启)。Controller.set_backup_request_ms()可修改某次RPC的值。 + +用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup request)。 比如根据延时调节backup_request_ms或者根据错误率调节发起backup request的百分比。 + +ChannelOptions.backup_request_policy同样影响该Channel上所有RPC。Controller.set_backup_request_policy()可修改某次RPC的策略。backup_request_policy优先级高于backup_request_ms。 + +brpc::BackupRequestPolicy接口如下: + +```c++ +class BackupRequestPolicy { +public: + virtual ~BackupRequestPolicy() = default; + + // Return the time in milliseconds in which another request + // will be sent if RPC does not finish. + virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0; + + // Return true if the backup request should be sent. + virtual bool DoBackup(const Controller* controller) const = 0; +}; +``` + ### 重试应当保守 由于成本的限制,大部分线上server的冗余度是有限的,主要是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。默认的重试是比较安全的: 只要连接不断RPC就不会重试,一般不会产生大量的重试请求。用户可以通过RetryPolicy定制重试策略,但也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。 diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h index c8d04fbf82..774e72178a 100644 --- a/src/brpc/backup_request_policy.h +++ b/src/brpc/backup_request_policy.h @@ -29,7 +29,7 @@ class BackupRequestPolicy { // Return the time in milliseconds in which another request // will be sent if RPC does not finish. - virtual int32_t GetBackupRequestMs() const = 0; + virtual int32_t GetBackupRequestMs(const Controller* controller) const = 0; // Return true if the backup request should be sent. 
virtual bool DoBackup(const Controller* controller) const = 0; diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 285638c273..19e7f771de 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -347,7 +347,7 @@ void Controller::set_backup_request_ms(int64_t timeout_ms) { int64_t Controller::backup_request_ms() const { int timeout_ms = NULL != _backup_request_policy ? - _backup_request_policy->GetBackupRequestMs() : _backup_request_ms; + _backup_request_policy->GetBackupRequestMs(this) : _backup_request_ms; if (timeout_ms > 0x7fffffff) { timeout_ms = 0x7fffffff; LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)"; diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 2e160fd099..70ae4afdfd 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -47,7 +47,6 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" -#include "brpc/backup_request_policy.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -72,6 +71,7 @@ class RPCSender; class StreamSettings; class MongoContext; class RetryPolicy; +class BackupRequestPolicy; class InputMessageBase; class ThriftStub; namespace policy { diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index dd7285e71d..0d25dd7db1 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -1963,7 +1963,7 @@ class ChannelTest : public ::testing::Test{ class BackupRequestPolicyImpl : public brpc::BackupRequestPolicy { public: - int32_t GetBackupRequestMs() const override { + int32_t GetBackupRequestMs(const brpc::Controller*) const override { return 10; } From 0b9208fac2cc89bcbd9bad1804b9a49b8c361ba6 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Wed, 21 Aug 2024 22:43:44 +0800 Subject: [PATCH 4/5] Feedback call info --- docs/cn/client.md | 11 +++++------ src/brpc/backup_request_policy.h | 3 +++ src/brpc/channel.h | 2 +- src/brpc/controller.cpp | 10 +++++++++- src/brpc/controller.h | 10 ++++------ 
test/brpc_channel_unittest.cpp | 2 ++ 6 files changed, 24 insertions(+), 14 deletions(-) diff --git a/docs/cn/client.md b/docs/cn/client.md index e0c47c6262..0cf3dc7519 100755 --- a/docs/cn/client.md +++ b/docs/cn/client.md @@ -584,10 +584,6 @@ r34717后Controller.has_backup_request()获知是否发送过backup_request。 如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request。 -工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 - -ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启),Controller.set_backup_request_ms()可修改某次RPC的值。 - ### 没到超时 超时后RPC会尽快结束。 @@ -710,11 +706,11 @@ options.retry_policy = &g_my_retry_policy; ### backup request -如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 +工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。注意如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。 ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启)。Controller.set_backup_request_ms()可修改某次RPC的值。 -用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup request)。 比如根据延时调节backup_request_ms或者根据错误率调节发起backup request的百分比。 +用户可以通过继承[brpc::BackupRequestPolicy](https://github.com/apache/brpc/blob/master/src/brpc/backup_request_policy.h)自定义策略(backup_request_ms和熔断backup request)。 比如根据延时调节backup_request_ms或者根据错误率熔断部分backup request。 ChannelOptions.backup_request_policy同样影响该Channel上所有RPC。Controller.set_backup_request_policy()可修改某次RPC的策略。backup_request_policy优先级高于backup_request_ms。 @@ -731,6 +727,9 @@ public: // Return true if the backup request should be sent. 
virtual bool DoBackup(const Controller* controller) const = 0; + + // Called when an RPC ends; users can collect call information to adjust the policy. + virtual void OnRPCEnd(const Controller* controller) = 0; }; ``` diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h index 774e72178a..ea254f1dbf 100644 --- a/src/brpc/backup_request_policy.h +++ b/src/brpc/backup_request_policy.h @@ -33,6 +33,9 @@ class BackupRequestPolicy { // Return true if the backup request should be sent. virtual bool DoBackup(const Controller* controller) const = 0; + + // Called when an RPC ends; users can collect call information to adjust the policy. + virtual void OnRPCEnd(const Controller* controller) = 0; }; } diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 59e4c80230..a5412eaa68 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -120,7 +120,7 @@ struct ChannelOptions { // Controller.set_backup_request_policy(). // This object is NOT owned by channel and should remain valid when channel is used. // Default: NULL - const BackupRequestPolicy* backup_request_policy; + BackupRequestPolicy* backup_request_policy; // Customize the error code that should be retried. 
The interface is // defined in src/brpc/retry_policy.h diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 19e7f771de..18cb9b109f 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -980,6 +980,14 @@ void Controller::EndRPC(const CompletionInfo& info) { CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid)); } } + +void Controller::OnRPCEnd(int64_t end_time_us) { + _end_time_us = end_time_us; + if (NULL != _backup_request_policy) { + _backup_request_policy->OnRPCEnd(this); + } +} + void Controller::RunDoneInBackupThread(void* arg) { static_cast(arg)->DoneInBackupThread(); } @@ -1270,7 +1278,7 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, cntl->timeout_ms(), butil::endpoint2str(cntl->remote_side()).c_str()); } else if (error_code == EBACKUPREQUEST) { - const BackupRequestPolicy* policy = cntl->_backup_request_policy; + BackupRequestPolicy* policy = cntl->_backup_request_policy; if (NULL != policy && !policy->DoBackup(cntl)) { // No need to do backup request. return bthread_id_unlock(id); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 70ae4afdfd..9b3c0201ae 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -181,7 +181,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Set/get the delay to send backup request in milliseconds. Use // ChannelOptions.backup_request_ms on unset. 
void set_backup_request_ms(int64_t timeout_ms); - void set_backup_request_policy(const BackupRequestPolicy* policy) { + void set_backup_request_policy(BackupRequestPolicy* policy) { _backup_request_policy = policy; } int64_t backup_request_ms() const; @@ -674,7 +674,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); struct ClientSettings { int32_t timeout_ms; int32_t backup_request_ms; - const BackupRequestPolicy* backup_request_policy; + BackupRequestPolicy* backup_request_policy; int max_retry; int32_t tos; ConnectionType connection_type; @@ -742,9 +742,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); _end_time_us = begin_time_us; } - void OnRPCEnd(int64_t end_time_us) { - _end_time_us = end_time_us; - } + void OnRPCEnd(int64_t end_time_us); static void RunDoneInBackupThread(void*); void DoneInBackupThread(); @@ -806,7 +804,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); int32_t _connect_timeout_ms; int32_t _backup_request_ms; // Priority: `_backup_request_policy' > `_backup_request_ms'. - const BackupRequestPolicy* _backup_request_policy; + BackupRequestPolicy* _backup_request_policy; // If this rpc call has retry/backup request,this var save the real timeout for current call int64_t _real_timeout_ms; // Deadline of this RPC (since the Epoch in microseconds). 
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 0d25dd7db1..b9437820e9 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -1972,6 +1972,8 @@ class ChannelTest : public ::testing::Test{ return backup; } + void OnRPCEnd(const brpc::Controller*) override {} + bool backup{true}; }; From 6a69c2d1447e8e37f206fa1ff2e74e3e6dadca15 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Fri, 20 Sep 2024 22:16:07 +0800 Subject: [PATCH 5/5] Avoid to block the timer thread in HandleSocketFailed --- src/brpc/controller.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 18cb9b109f..afebb3c282 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -617,6 +617,13 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, goto END_OF_RPC; } if (_error_code == EBACKUPREQUEST) { + if (NULL != _backup_request_policy && !_backup_request_policy->DoBackup(this)) { + // No need to do backup request. + _error_code = saved_error; + CHECK_EQ(0, bthread_id_unlock(info.id)); + return; + } + // Reset timeout if needed int rc = 0; if (timeout_ms() >= 0) { @@ -1278,11 +1285,6 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, cntl->timeout_ms(), butil::endpoint2str(cntl->remote_side()).c_str()); } else if (error_code == EBACKUPREQUEST) { - BackupRequestPolicy* policy = cntl->_backup_request_policy; - if (NULL != policy && !policy->DoBackup(cntl)) { - // No need to do backup request. - return bthread_id_unlock(id); - } cntl->SetFailed(error_code, "Reached backup timeout=%" PRId64 "ms @%s", cntl->backup_request_ms(), butil::endpoint2str(cntl->remote_side()).c_str());