Skip to content

Commit

Permalink
Refactor cache_filter to expect caches to post cb
Browse files Browse the repository at this point in the history
Signed-off-by: Raven Black <[email protected]>
  • Loading branch information
ravenblackx committed Sep 17, 2024
1 parent 02ae005 commit 948e1b4
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 232 deletions.
69 changes: 18 additions & 51 deletions source/extensions/filters/http/cache/cache_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -303,38 +303,19 @@ CacheFilter::resolveLookupStatus(absl::optional<CacheEntryStatus> cache_entry_st

void CacheFilter::getHeaders(Http::RequestHeaderMap& request_headers) {
  ASSERT(lookup_, "CacheFilter is trying to call getHeaders with no LookupContext");
  // Caches are required to post their completion callback to this filter's
  // dispatcher rather than invoking it inline. This toggle is true only while
  // lookup_->getHeaders is on the stack, so the ASSERT in the callback fires
  // if a cache calls back directly instead of posting.
  callback_called_directly_ = true;
  // The dispatcher is captured so the callback can verify it is running on
  // the filter's worker thread.
  lookup_->getHeaders([this, &request_headers, &dispatcher = decoder_callbacks_->dispatcher()](
                          LookupResult&& result, bool end_stream) {
    ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
           "caches must post the callback to the filter's dispatcher");
    onHeaders(std::move(result), request_headers, end_stream);
  });
  callback_called_directly_ = false;
}

void CacheFilter::getBody() {
ASSERT(lookup_, "CacheFilter is trying to call getBody with no LookupContext");
ASSERT(!remaining_ranges_.empty(), "No reason to call getBody when there's no body to get.");
// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
// is captured and used to make sure the CacheFilter is still alive before accessing it in the
// posted callback.
CacheFilterWeakPtr self = weak_from_this();

// We don't want to request more than a buffer-size at a time from the cache.
uint64_t fetch_size_limit = encoder_callbacks_->encoderBufferLimit();
Expand All @@ -347,41 +328,27 @@ void CacheFilter::getBody() {
? (remaining_ranges_[0].begin() + fetch_size_limit)
: remaining_ranges_[0].end()};

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getBody(fetch_range, [self, &dispatcher = decoder_callbacks_->dispatcher()](
callback_called_directly_ = true;
lookup_->getBody(fetch_range, [this, &dispatcher = decoder_callbacks_->dispatcher()](
Buffer::InstancePtr&& body, bool end_stream) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
dispatcher.post([self, body = std::move(body), end_stream]() mutable {
if (CacheFilterSharedPtr cache_filter = self.lock()) {
cache_filter->onBody(std::move(body), end_stream);
}
});
ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
"caches must post the callback to the filter's dispatcher");
onBody(std::move(body), end_stream);
});
callback_called_directly_ = false;
}

void CacheFilter::getTrailers() {
ASSERT(lookup_, "CacheFilter is trying to call getTrailers with no LookupContext");

// If the cache posts a callback to the dispatcher then the CacheFilter is destroyed for any
// reason (e.g client disconnected and HTTP stream terminated), then there is no guarantee that
// the posted callback will run before the filter is deleted. Hence, a weak_ptr to the CacheFilter
// is captured and used to make sure the CacheFilter is still alive before accessing it in the
// posted callback.
CacheFilterWeakPtr self = weak_from_this();

// The dispatcher needs to be captured because there's no guarantee that
// decoder_callbacks_->dispatcher() is thread-safe.
lookup_->getTrailers([self, &dispatcher = decoder_callbacks_->dispatcher()](
callback_called_directly_ = true;
lookup_->getTrailers([this, &dispatcher = decoder_callbacks_->dispatcher()](
Http::ResponseTrailerMapPtr&& trailers) {
// The callback is posted to the dispatcher to make sure it is called on the worker thread.
// The lambda must be mutable as it captures trailers as a unique_ptr.
dispatcher.post([self, trailers = std::move(trailers)]() mutable {
if (CacheFilterSharedPtr cache_filter = self.lock()) {
cache_filter->onTrailers(std::move(trailers));
}
});
ASSERT(!callback_called_directly_ && dispatcher.isThreadSafe(),
"caches must post the callback to the filter's dispatcher");
onTrailers(std::move(trailers));
});
callback_called_directly_ = false;
}

void CacheFilter::onHeaders(LookupResult&& result, Http::RequestHeaderMap& request_headers,
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/cache/cache_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class CacheFilter : public Http::PassThroughFilter,
FilterState filter_state_ = FilterState::Initial;

bool is_head_request_ = false;
// This toggle is used to detect callbacks being called directly and not posted.
bool callback_called_directly_ = false;
// The status of the insert operation or header update, or decision not to insert or update.
// If it's too early to determine the final status, this is empty.
absl::optional<InsertStatus> insert_status_;
Expand Down
114 changes: 55 additions & 59 deletions source/extensions/filters/http/cache/cache_insert_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CacheInsertFragment {
// on_complete is called when the cache completes the operation.
virtual void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) PURE;
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete) PURE;

virtual ~CacheInsertFragment() = default;
};
Expand All @@ -27,14 +27,14 @@ class CacheInsertFragmentBody : public CacheInsertFragment {
CacheInsertFragmentBody(const Buffer::Instance& buffer, bool end_stream)
: buffer_(buffer), end_stream_(end_stream) {}

void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) override {
void send(InsertContext& context,
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete)
override {
size_t sz = buffer_.length();
context.insertBody(
std::move(buffer_),
[on_complete, end_stream = end_stream_, sz](bool cache_success) {
on_complete(cache_success, end_stream, sz);
[cb = std::move(on_complete), end_stream = end_stream_, sz](bool cache_success) mutable {
std::move(cb)(cache_success, end_stream, sz);
},
end_stream_);
}
Expand All @@ -52,14 +52,15 @@ class CacheInsertFragmentTrailers : public CacheInsertFragment {
Http::ResponseTrailerMapImpl::copyFrom(*trailers_, trailers);
}

void
send(InsertContext& context,
std::function<void(bool cache_success, bool end_stream, size_t sz)> on_complete) override {
void send(InsertContext& context,
absl::AnyInvocable<void(bool cache_success, bool end_stream, size_t sz)> on_complete)
override {
// While zero isn't technically true for the size of trailers, it doesn't
// matter at this point because watermarks after the stream is complete
// aren't useful.
context.insertTrailers(
*trailers_, [on_complete](bool cache_success) { on_complete(cache_success, true, 0); });
context.insertTrailers(*trailers_, [cb = std::move(on_complete)](bool cache_success) mutable {
std::move(cb)(cache_success, true, 0);
});
}

private:
Expand All @@ -72,7 +73,7 @@ CacheInsertQueue::CacheInsertQueue(std::shared_ptr<HttpCache> cache,
: dispatcher_(encoder_callbacks.dispatcher()), insert_context_(std::move(insert_context)),
low_watermark_bytes_(encoder_callbacks.encoderBufferLimit() / 2),
high_watermark_bytes_(encoder_callbacks.encoderBufferLimit()),
encoder_callbacks_(encoder_callbacks), abort_callback_(abort), cache_(cache) {}
encoder_callbacks_(encoder_callbacks), abort_callback_(std::move(abort)), cache_(cache) {}

void CacheInsertQueue::insertHeaders(const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata, bool end_stream) {
Expand Down Expand Up @@ -123,59 +124,54 @@ void CacheInsertQueue::insertTrailers(const Http::ResponseTrailerMap& trailers)
}

// Completion handler for the fragment most recently sent to the cache's
// InsertContext. Caches must post this callback to the filter's dispatcher,
// so it always runs on the worker thread and no mutex guarding is needed.
void CacheInsertQueue::onFragmentComplete(bool cache_success, bool end_stream, size_t sz) {
  ASSERT(dispatcher_.isThreadSafe());
  fragment_in_flight_ = false;
  if (aborting_) {
    // Parent filter was destroyed, so we can quit this operation.
    fragments_.clear();
    self_ownership_.reset();
    return;
  }
  ASSERT(queue_size_bytes_ >= sz, "queue can't be emptied by more than its size");
  queue_size_bytes_ -= sz;
  if (watermarked_ && queue_size_bytes_ <= low_watermark_bytes_) {
    if (encoder_callbacks_.has_value()) {
      encoder_callbacks_.value().get().onEncoderFilterBelowWriteBufferLowWatermark();
    }
    watermarked_ = false;
  }
  if (!cache_success) {
    // canceled by cache; unwatermark if necessary, inform the filter if
    // it's still around, and delete the queue.
    if (watermarked_) {
      if (encoder_callbacks_.has_value()) {
        encoder_callbacks_.value().get().onEncoderFilterBelowWriteBufferLowWatermark();
      }
      watermarked_ = false;
    }
    fragments_.clear();
    // Clearing self-ownership might provoke the destructor, so take a copy of the
    // abort callback to avoid reading from 'this' after it may be deleted.
    auto abort_callback = std::move(abort_callback_);
    self_ownership_.reset();
    std::move(abort_callback)();
    return;
  }
  if (end_stream) {
    ASSERT(fragments_.empty(), "ending a stream with the queue not empty is a bug");
    ASSERT(!watermarked_, "being over the high watermark when the queue is empty makes no sense");
    self_ownership_.reset();
    return;
  }
  if (!fragments_.empty()) {
    // If there's more in the queue, push the next fragment to the cache.
    auto fragment = std::move(fragments_.front());
    fragments_.pop_front();
    fragment_in_flight_ = true;
    fragment->send(*insert_context_, [this](bool cache_success, bool end_stream, size_t sz) {
      onFragmentComplete(cache_success, end_stream, sz);
    });
  }
}

void CacheInsertQueue::setSelfOwned(std::unique_ptr<CacheInsertQueue> self) {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/cache/cache_insert_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Cache {

using OverHighWatermarkCallback = std::function<void()>;
using UnderLowWatermarkCallback = std::function<void()>;
// Move-only: the queue hands the abort callback off (rather than copying it)
// before releasing self-ownership, hence absl::AnyInvocable.
using AbortInsertCallback = absl::AnyInvocable<void()>;
class CacheInsertFragment;

// This queue acts as an intermediary between CacheFilter and the cache
Expand Down
40 changes: 25 additions & 15 deletions source/extensions/filters/http/cache/http_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,20 @@ struct CacheInfo {
bool supports_range_requests_ = false;
};

// Callback types for cache lookups, insertions, and header updates. These are
// absl::AnyInvocable rather than std::function so cache implementations can
// capture move-only state (e.g. unique_ptrs) in the callbacks they wrap.
using LookupBodyCallback = absl::AnyInvocable<void(Buffer::InstancePtr&&, bool end_stream)>;
using LookupHeadersCallback = absl::AnyInvocable<void(LookupResult&&, bool end_stream)>;
using LookupTrailersCallback = absl::AnyInvocable<void(Http::ResponseTrailerMapPtr&&)>;
using InsertCallback = absl::AnyInvocable<void(bool success_ready_for_more)>;
using UpdateHeadersCallback = absl::AnyInvocable<void(bool)>;

// Manages the lifetime of an insertion.
class InsertContext {
public:
// Accepts response_headers for caching. Only called once.
//
// Implementations MUST call insert_complete(true) on success, or
// insert_complete(false) to attempt to abort the insertion. This
// call may be made asynchronously, but any async operation that can
// Implementations MUST post to the filter's dispatcher insert_complete(true)
// on success, or insert_complete(false) to attempt to abort the insertion.
// This call may be made asynchronously, but any async operation that can
// potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertHeaders(const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata, InsertCallback insert_complete,
Expand All @@ -149,17 +150,17 @@ class InsertContext {
// InsertContextPtr. A cache can abort the insertion by passing 'false' into
// ready_for_next_fragment.
//
// The cache implementation MUST call ready_for_next_fragment. This call may be
// made asynchronously, but any async operation that can potentially silently
// fail must include a timeout, to avoid memory leaks.
// The cache implementation MUST post ready_for_next_fragment to the filter's
// dispatcher. This post may be made asynchronously, but any async operation
// that can potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertBody(const Buffer::Instance& fragment, InsertCallback ready_for_next_fragment,
bool end_stream) PURE;

// Inserts trailers into the cache.
//
// The cache implementation MUST call insert_complete. This call may be
// made asynchronously, but any async operation that can potentially silently
// fail must include a timeout, to avoid memory leaks.
// The cache implementation MUST post insert_complete to the filter's dispatcher.
// This call may be made asynchronously, but any async operation that can
// potentially silently fail must include a timeout, to avoid memory leaks.
virtual void insertTrailers(const Http::ResponseTrailerMap& trailers,
InsertCallback insert_complete) PURE;

Expand Down Expand Up @@ -199,6 +200,9 @@ class LookupContext {
// implementation should wait until that is known before calling the callback,
// and must pass a LookupResult with range_details_->satisfiable_ = false
// if the request is invalid.
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getHeaders(LookupHeadersCallback&& cb) PURE;

// Reads the next fragment from the cache, calling cb when the fragment is ready.
Expand Down Expand Up @@ -228,11 +232,17 @@ class LookupContext {
// getBody requests bytes 0-23 .......... callback with bytes 0-9
// getBody requests bytes 10-23 .......... callback with bytes 10-19
// getBody requests bytes 20-23 .......... callback with bytes 20-23
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getBody(const AdjustedByteRange& range, LookupBodyCallback&& cb) PURE;

// Get the trailers from the cache. Only called if the request reached the end of
// the body and LookupBodyCallback did not pass true for end_stream. The
// Http::ResponseTrailerMapPtr passed to cb must not be null.
//
// A cache that posts the callback must wrap it such that if the LookupContext is
// destroyed before the callback is executed, the callback is not executed.
virtual void getTrailers(LookupTrailersCallback&& cb) PURE;

// This routine is called prior to a LookupContext being destroyed. LookupContext is responsible
Expand All @@ -248,7 +258,7 @@ class LookupContext {
// 5. [Other thread] RPC completes and calls RPCLookupContext::onRPCDone.
// --> RPCLookupContext's destructor and onRpcDone cause a data race in RPCLookupContext.
// onDestroy() should cancel any outstanding async operations and, if necessary,
// it should block on that cancellation to avoid data races. InsertContext must not invoke any
// it should block on that cancellation to avoid data races. LookupContext must not invoke any
// callbacks to the CacheFilter after having onDestroy() invoked.
virtual void onDestroy() PURE;

Expand Down Expand Up @@ -289,7 +299,7 @@ class HttpCache {
virtual void updateHeaders(const LookupContext& lookup_context,
const Http::ResponseHeaderMap& response_headers,
const ResponseMetadata& metadata,
std::function<void(bool)> on_complete) PURE;
UpdateHeadersCallback on_complete) PURE;

// Returns statically known information about a cache.
virtual CacheInfo cacheInfo() const PURE;
Expand Down
Loading

0 comments on commit 948e1b4

Please sign in to comment.