Todo cleanup

Sushisource committed Feb 8, 2025
1 parent 6f9801d commit 7b4e70c

Showing 4 changed files with 40 additions and 25 deletions.
41 changes: 38 additions & 3 deletions client/src/retry.rs
@@ -1,5 +1,6 @@
use crate::{
raw::IsUserLongPoll, Client, IsWorkerTaskLongPoll, NamespacedClient, Result, RetryConfig,
raw::IsUserLongPoll, Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result,
RetryConfig,
};
use backoff::{backoff::Backoff, exponential::ExponentialBackoff, Clock, SystemClock};
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -58,24 +59,27 @@ impl<SG> RetryClient<SG> {
request: Option<&Request<R>>,
) -> CallInfo {
let mut call_type = CallType::Normal;
let mut retry_short_circuit = None;
if let Some(r) = request.as_ref() {
let ext = r.extensions();
if ext.get::<IsUserLongPoll>().is_some() {
call_type = CallType::UserLongPoll;
} else if ext.get::<IsWorkerTaskLongPoll>().is_some() {
call_type = CallType::TaskLongPoll;
}

retry_short_circuit = ext.get::<NoRetryOnMatching>().cloned();
}
let retry_cfg = if call_type == CallType::TaskLongPoll {
RetryConfig::task_poll_retry_policy()
} else {
(*self.retry_config).clone()
};
// TODO: Set retry short-circuits
CallInfo {
call_type,
call_name,
retry_cfg,
retry_short_circuit,
}
}

@@ -112,6 +116,7 @@ pub(crate) struct TonicErrorHandler<C: Clock> {
call_type: CallType,
call_name: &'static str,
have_retried_goaway_cancel: bool,
retry_short_circuit: Option<NoRetryOnMatching>,
}
impl TonicErrorHandler<SystemClock> {
fn new(call_info: CallInfo, throttle_cfg: RetryConfig) -> Self {
@@ -140,6 +145,7 @@ where
backoff: call_info.retry_cfg.into_exp_backoff(clock),
throttle_backoff: throttle_cfg.into_exp_backoff(throttle_clock),
have_retried_goaway_cancel: false,
retry_short_circuit: call_info.retry_short_circuit,
}
}

@@ -165,11 +171,12 @@
}
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug)]
pub(crate) struct CallInfo {
pub call_type: CallType,
call_name: &'static str,
retry_cfg: RetryConfig,
retry_short_circuit: Option<NoRetryOnMatching>,
}

#[doc(hidden)]
@@ -200,6 +207,12 @@ where
return RetryPolicy::ForwardError(e);
}

if let Some(sc) = self.retry_short_circuit.as_ref() {
if (sc.predicate)(&e) {
return RetryPolicy::ForwardError(e);
}
}

// Task polls are OK with being cancelled or running into the timeout because there's
// nothing to do but retry anyway
let long_poll_allowed = self.call_type == CallType::TaskLongPoll
@@ -306,6 +319,7 @@ mod tests {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
@@ -333,6 +347,7 @@ mod tests {
call_type: CallType::TaskLongPoll,
call_name,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
@@ -358,6 +373,7 @@
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: None,
},
RetryConfig {
initial_interval: Duration::from_millis(2),
@@ -388,6 +404,25 @@
}
}

#[tokio::test]
async fn retry_short_circuit() {
let mut err_handler = TonicErrorHandler::new_with_clock(
CallInfo {
call_type: CallType::TaskLongPoll,
call_name: POLL_WORKFLOW_METH_NAME,
retry_cfg: TEST_RETRY_CONFIG,
retry_short_circuit: Some(NoRetryOnMatching {
predicate: |s: &Status| s.code() == Code::ResourceExhausted,
}),
},
TEST_RETRY_CONFIG,
FixedClock(Instant::now()),
FixedClock(Instant::now()),
);
let result = err_handler.handle(1, Status::new(Code::ResourceExhausted, "leave me alone"));
assert_matches!(result, RetryPolicy::ForwardError(_))
}

#[rstest::rstest]
#[tokio::test]
async fn task_poll_retries_forever<R>(
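For context on how the new short-circuit is meant to be used from the caller's side, a minimal sketch follows. Only the predicate field (a plain fn(&Status) -> bool, as exercised in the retry_short_circuit test) and the fact that the value is read back out of the request extensions are confirmed by this diff; the struct definition as written and the disable_retry_on_resource_exhausted helper are hypothetical.

// Sketch only: the real NoRetryOnMatching lives in the client crate and is not
// shown in this diff; its shape is inferred from the usage above.
use tonic::{Code, Request, Status};

#[derive(Clone, Copy, Debug)]
pub struct NoRetryOnMatching {
    pub predicate: fn(&Status) -> bool,
}

// Hypothetical helper: attach the predicate as a request extension so the retry
// layer can find it with extensions().get::<NoRetryOnMatching>() when building
// CallInfo, as shown above.
fn disable_retry_on_resource_exhausted<T>(req: &mut Request<T>) {
    req.extensions_mut().insert(NoRetryOnMatching {
        predicate: |s: &Status| s.code() == Code::ResourceExhausted,
    });
}
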
16 changes: 0 additions & 16 deletions core-api/src/worker.rs
@@ -218,22 +218,6 @@ impl WorkerConfigBuilder {
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
}

// TODO: Does this really matter with how slot suppliers work now?
// let max_wft_polls = self
// .max_concurrent_wft_polls
// .unwrap_or(MAX_CONCURRENT_WFT_POLLS_DEFAULT);
//
// // It wouldn't make any sense to have more outstanding polls than workflows we can possibly
// // cache. If we allow this at low values it's possible for sticky pollers to reserve all
// // available slots, crowding out the normal queue and gumming things up.
// if let Some(max_cache) = self.max_cached_workflows {
// if max_cache > 0 && max_wft_polls > max_cache {
// return Err(
// "`max_concurrent_wft_polls` cannot exceed `max_cached_workflows`".to_owned(),
// );
// }
// }

if self.use_worker_versioning.unwrap_or_default()
&& self
.worker_build_id
7 changes: 2 additions & 5 deletions core/src/pollers/poll_buffer.rs
@@ -298,7 +298,6 @@ impl PollScalerReportHandle {
return;
}
if let Some(scaling_decision) = res.scaling_decision() {
warn!("Got sd {:?}", scaling_decision);
match scaling_decision.poller_delta.cmp(&0) {
cmp::Ordering::Less => self.change_target(
usize::saturating_sub,
@@ -316,18 +315,16 @@ impl PollScalerReportHandle {
// We want to avoid scaling down on empty polls if the server has never made any scaling
// decisions - otherwise we might never scale up again.
else if self.ever_saw_scaling_decision.load(Ordering::Relaxed) && res.is_empty() {
warn!("Removing poller - empty response");
self.change_target(usize::saturating_sub, 1);
}
}
Err(e) => {
// We should only see (and react to) errors in autoscaling mode
if matches!(self.behavior, PollerBehavior::Autoscaling { .. }) {
// TODO: Make debug before merge
warn!("Got error from server: {:?}", e);
debug!("Got error from server while polling: {:?}", e);
// TODO (REVIEW): A concern here is we can bounce off of ratelimiter
// because server keeps telling us "scale up!" and then we hit ratelimit
// and halve again.
// and halve again. Not necessarily a huge issue, but open to ideas to fix.
if e.code() == Code::ResourceExhausted {
// Scale down significantly for resource exhaustion
self.change_target(usize::saturating_div, 2);
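As a side note on the autoscaling behaviour discussed in the comments above, here is a minimal standalone sketch of how the poller target moves in the three cases (server-issued delta, empty poll after a scaling decision has been seen, ResourceExhausted error). The real PollScalerReportHandle routes these through change_target and clamps to its configured poller bounds; the free functions and the i32 delta type below are illustrative assumptions, not the actual API.

use std::cmp::Ordering;

// Illustrative only: mirrors the arithmetic in the diff above, without the
// clamping that change_target applies.
fn apply_delta(target: usize, poller_delta: i32) -> usize {
    match poller_delta.cmp(&0) {
        Ordering::Less => target.saturating_sub(poller_delta.unsigned_abs() as usize),
        Ordering::Greater => target.saturating_add(poller_delta as usize),
        Ordering::Equal => target,
    }
}

fn on_empty_poll(target: usize) -> usize {
    // Mirrors change_target(usize::saturating_sub, 1): shed one poller on an
    // empty poll once the server has ever issued a scaling decision.
    target.saturating_sub(1)
}

fn on_resource_exhausted(target: usize) -> usize {
    // Mirrors change_target(usize::saturating_div, 2): halve the target when
    // the server reports ResourceExhausted.
    target.saturating_div(2)
}
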
1 change: 0 additions & 1 deletion tests/manual_tests.rs
@@ -118,7 +118,6 @@ async fn poller_load_spiky() {
})
.await;
info!("Initial load ran for {:?}", start_processing.elapsed());
// TODO: Maybe send signals for round two
ah.abort();
// Wait a minute for poller count to drop
tokio::time::sleep(Duration::from_secs(60)).await;
