Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(task): cancel timeout task in case of memory leak #2143

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/rpc/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,12 @@ bool rpc_client_matcher::on_recv_reply(network *net, uint64_t key, message_ex *r
DCHECK_NOTNULL(timeout_task, "rpc timeout task cannot be nullptr");

if (timeout_task != task::get_current_task()) {
timeout_task->cancel(false); // no need to wait
timeout_task->cancel_delay_timer();
timeout_task->cancel(false); // No need to wait.
}

auto req = call->get_request();
auto spec = task_spec::get(req->local_rpc_code);
auto *req = call->get_request();
auto *spec = task_spec::get(req->local_rpc_code);

// if rpc is early terminated with empty reply
if (nullptr == reply) {
Expand Down
3 changes: 3 additions & 0 deletions src/runtime/api_layer1.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ dsn_threadpool_code_register.
; thread/worker count
worker_count = 2

; the number of threads used for timers per worker
timer_thread_count_per_worker = 1

; task worker provider name
worker_factory_name =

Expand Down
8 changes: 5 additions & 3 deletions src/runtime/fault_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,12 @@ void fault_injector::install(service_spec &spec)

s_fj_opts = new fj_opt[dsn::task_code::max() + 1];
fj_opt default_opt;
read_config("task..default", default_opt);
read_config("task..default", default_opt, nullptr);

for (int i = 0; i <= dsn::task_code::max(); i++) {
if (i == TASK_CODE_INVALID)
if (i == TASK_CODE_INVALID) {
continue;
}

std::string section_name = fmt::format("task.{}", dsn::task_code(i));
task_spec *spec = task_spec::get(i);
Expand All @@ -330,8 +331,9 @@ void fault_injector::install(service_spec &spec)
fj_opt &lopt = s_fj_opts[i];
read_config(section_name.c_str(), lopt, &default_opt);

if (!lopt.fault_injection_enabled)
if (!lopt.fault_injection_enabled) {
continue;
}

spec->on_task_enqueue.put_back(fault_on_task_enqueue, "fault_injector");
spec->on_task_begin.put_back(fault_on_task_begin, "fault_injector");
Expand Down
75 changes: 59 additions & 16 deletions src/task/simple_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@

#include "simple_task_queue.h"

#include <cstdio>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

#include "boost/asio/basic_deadline_timer.hpp"
Expand All @@ -43,6 +44,7 @@
#include "boost/asio/io_service.hpp"
#include "boost/date_time/posix_time/posix_time_duration.hpp"
#include "boost/system/detail/error_code.hpp"
#include "fmt/core.h"
#include "runtime/tool_api.h"
#include "task.h"
#include "task/task_queue.h"
Expand All @@ -63,26 +65,30 @@ simple_timer_service::simple_timer_service(service_node *node, timer_service *in
{
}

void simple_timer_service::start()
void simple_timer_service::start(int thread_count)
{
if (_is_running) {
return;
}

_worker = std::thread([this]() {
task::set_tls_dsn_context(node(), nullptr);
for (uint32_t i = 0; i < thread_count; ++i) {
_workers.emplace_back([this]() {
task::set_tls_dsn_context(node(), nullptr);

char buffer[128];
sprintf(buffer, "%s.timer", get_service_node_name(node()));
std::string name(fmt::format("{}.timer", get_service_node_name(node())));

task_worker::set_name(buffer);
task_worker::set_priority(worker_priority_t::THREAD_xPRIORITY_ABOVE_NORMAL);
task_worker::set_name(name.c_str());
task_worker::set_priority(worker_priority_t::THREAD_xPRIORITY_ABOVE_NORMAL);

LOG_INFO("{} thread started", name);

boost::asio::io_service::work work(_ios);
boost::system::error_code ec;
_ios.run(ec);
CHECK(!ec, "io_service in simple_timer_service run failed: {}", ec.message());
});
}

boost::asio::io_service::work work(_ios);
boost::system::error_code ec;
_ios.run(ec);
CHECK(!ec, "io_service in simple_timer_service run failed: {}", ec.message());
});
_is_running = true;
}

Expand All @@ -93,24 +99,61 @@ void simple_timer_service::stop()
}

_ios.stop();
_worker.join();

for (auto &worker : _workers) {
worker.join();
}

_is_running = false;
}

namespace {

class simple_delay_timer : public dsn::task::delay_timer
{
public:
explicit simple_delay_timer(std::shared_ptr<boost::asio::deadline_timer> timer)
: _timer(std::move(timer))
{
}
~simple_delay_timer() override = default;

void cancel() override
{
if (!_timer) {
return;
}

_timer->cancel();
}

private:
const std::shared_ptr<boost::asio::deadline_timer> _timer;

DISALLOW_COPY_AND_ASSIGN(simple_delay_timer);
DISALLOW_MOVE_AND_ASSIGN(simple_delay_timer);
};

} // anonymous namespace

void simple_timer_service::add_timer(task *task)
{
std::shared_ptr<boost::asio::deadline_timer> timer(new boost::asio::deadline_timer(_ios));
timer->expires_from_now(boost::posix_time::milliseconds(task->delay_milliseconds()));
task->set_delay(0);

task->set_delay_timer(std::make_unique<simple_delay_timer>(timer));

timer->async_wait([task, timer](const boost::system::error_code &ec) {
if (!ec) {
task->enqueue();
} else if (ec != ::boost::asio::error::operation_aborted) {
} else if (ec != boost::asio::error::operation_aborted) {
LOG_FATAL("timer failed for task {}, err = {}", task->spec().name, ec.message());
}

// to consume the added ref count by task::enqueue for add_timer
task->reset_delay_timer();

// To consume the added ref count by task::enqueue for add_timer.
task->release_ref();
});
}
Expand Down
17 changes: 9 additions & 8 deletions src/task/simple_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <algorithm>
#include <thread>
#include <vector>

#include "boost/asio/io_service.hpp"
#include "task_code.h"
Expand All @@ -48,11 +49,11 @@ class simple_task_queue : public task_queue

~simple_task_queue() override = default;

virtual void enqueue(task *task) override;
virtual task *dequeue(/*inout*/ int &batch_size) override;
void enqueue(task *task) override;
task *dequeue(/*inout*/ int &batch_size) override;

private:
typedef utils::blocking_priority_queue<task *, TASK_PRIORITY_COUNT> tqueue;
using tqueue = utils::blocking_priority_queue<task *, TASK_PRIORITY_COUNT>;
tqueue _samples;
};

Expand All @@ -61,18 +62,18 @@ class simple_timer_service : public timer_service
public:
simple_timer_service(service_node *node, timer_service *inner_provider);

~simple_timer_service() override { stop(); }
~simple_timer_service() override = default;

// after milliseconds, the provider should call task->enqueue()
virtual void add_timer(task *task) override;
void add_timer(task *task) override;

virtual void start() override;
void start(int thread_count) override;

virtual void stop() override;
void stop() override;

private:
boost::asio::io_service _ios;
std::thread _worker;
std::vector<std::thread> _workers;
bool _is_running;
};

Expand Down
2 changes: 1 addition & 1 deletion src/task/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ task::~task()
}
}

bool task::set_retry(bool enqueue_immediately /*= true*/)
bool task::set_retry(bool enqueue_immediately)
{
task_state RUNNING_STATE = TASK_STATE_RUNNING;
if (_state.compare_exchange_strong(
Expand Down
33 changes: 31 additions & 2 deletions src/task/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <chrono>
#include <functional>
#include <list>
#include <memory>
#include <tuple>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -215,12 +216,38 @@ class task : public ref_counter, public extensible_object<task, 4>
// return:
// true : change task state from TASK_STATE_RUNNING to TASK_STATE_READY succeed
// false : change task state failed
bool set_retry(bool enqueue_immediately = true);
bool set_retry(bool enqueue_immediately);

void set_error_code(error_code err) { _error = err; }
void set_delay(int delay_milliseconds = 0) { _delay_milliseconds = delay_milliseconds; }
void set_delay(int delay_milliseconds) { _delay_milliseconds = delay_milliseconds; }
void set_tracker(task_tracker *tracker) { _context_tracker.set_tracker(tracker, this); }

// Control the timer that is used to launch a task if it is delayed.
class delay_timer
{
public:
delay_timer() = default;
virtual ~delay_timer() = default;

// Cancel the timer.
virtual void cancel() = 0;

private:
DISALLOW_COPY_AND_ASSIGN(delay_timer);
DISALLOW_MOVE_AND_ASSIGN(delay_timer);
};

void set_delay_timer(std::unique_ptr<delay_timer> timer) { _delay_timer = std::move(timer); }
void cancel_delay_timer()
{
if (!_delay_timer) {
return;
}

_delay_timer->cancel();
}
void reset_delay_timer() { _delay_timer.reset(); }

uint64_t id() const { return _task_id; }
task_state state() const { return _state.load(std::memory_order_acquire); }
task_code code() const { return _spec->code; }
Expand Down Expand Up @@ -312,6 +339,8 @@ class task : public ref_counter, public extensible_object<task, 4>
service_node *_node;
trackable_task _context_tracker; // when tracker is gone, the task is cancelled automatically

std::unique_ptr<delay_timer> _delay_timer;

public:
// used by task queue only
task *next;
Expand Down
Loading
Loading