Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failure to create an index with ingest v2 returns 429 #5719

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quickwit/quickwit-ingest/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ impl From<IngestFailure> for IngestServiceError {
IngestFailureReason::CircuitBreaker => {
IngestServiceError::RateLimited(RateLimitingCause::CircuitBreaker)
}
IngestFailureReason::Unavailable => {
IngestServiceError::Unavailable("internal service unavailable".to_string())
}
}
}
}
Expand Down
62 changes: 44 additions & 18 deletions quickwit/quickwit-ingest/src/ingest_v2/debouncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ use std::collections::HashMap;
use std::sync::Arc;

use quickwit_proto::control_plane::{
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsSubrequest,
ControlPlaneError, GetOrCreateOpenShardsFailure, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsSubrequest,
};
use quickwit_proto::ingest::ShardIds;
use quickwit_proto::types::{IndexId, SourceId};
use quickwit_proto::types::{IndexId, SourceId, SubrequestId};
use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
use tracing::error;

#[derive(Default)]
struct Debouncer(Arc<RwLock<()>>);
struct Debouncer(Arc<RwLock<Option<GetOrCreateOpenShardsErrorDebounced>>>);

impl Debouncer {
fn acquire(&self) -> Result<PermitGuard, BarrierGuard> {
Expand All @@ -37,14 +39,14 @@ impl Debouncer {
}

#[derive(Debug)]
pub(super) struct PermitGuard(#[allow(dead_code)] OwnedRwLockWriteGuard<()>);
pub(super) struct PermitGuard(OwnedRwLockWriteGuard<Option<GetOrCreateOpenShardsErrorDebounced>>);

#[derive(Debug)]
pub(super) struct BarrierGuard(Arc<RwLock<()>>);
pub(super) struct BarrierGuard(Arc<RwLock<Option<GetOrCreateOpenShardsErrorDebounced>>>);

impl BarrierGuard {
pub async fn wait(self) {
let _ = self.0.read().await;
async fn wait(self) -> Option<GetOrCreateOpenShardsErrorDebounced> {
self.0.read().await.clone()
}
}

Expand Down Expand Up @@ -96,31 +98,55 @@ impl DebouncedGetOrCreateOpenShardsRequest {
subrequest: GetOrCreateOpenShardsSubrequest,
permit: PermitGuard,
) {
self.rendezvous
.permits
.insert(subrequest.subrequest_id, permit);
self.subrequests.push(subrequest);
self.rendezvous.permits.push(permit);
}

pub fn push_barrier(&mut self, barrier: BarrierGuard) {
self.rendezvous.barriers.push(barrier);
pub fn push_barrier(&mut self, subrequest_id: u32, barrier: BarrierGuard) {
self.rendezvous.barriers.insert(subrequest_id, barrier);
}
}

#[derive(Debug, Clone)]
pub enum GetOrCreateOpenShardsErrorDebounced {
ControlPlaneError(ControlPlaneError),
Failure(GetOrCreateOpenShardsFailure),
}

#[derive(Default)]
pub(super) struct Rendezvous {
permits: Vec<PermitGuard>,
barriers: Vec<BarrierGuard>,
permits: HashMap<SubrequestId, PermitGuard>,
barriers: HashMap<SubrequestId, BarrierGuard>,
}

impl Rendezvous {
/// Releases the permits and waits for the barriers to be lifted.
pub async fn wait(mut self) {
pub fn write_error(
&mut self,
subrequest_id: SubrequestId,
response: GetOrCreateOpenShardsErrorDebounced,
) {
if let Some(permit) = self.permits.get_mut(&subrequest_id) {
*permit.0 = Some(response)
} else {
error!("no permit found for subrequest, please report");
}
}

/// Releases the permits and waits for the barriers to be lifted, returning errors if any.
pub async fn wait(mut self) -> HashMap<SubrequestId, GetOrCreateOpenShardsErrorDebounced> {
// Releasing the permits before waiting for the barriers is necessary to avoid
// dead locks.
self.permits.clear();

for barrier in self.barriers {
barrier.wait().await;
let mut responses = HashMap::with_capacity(self.barriers.len());
for (sub_request_id, barrier) in self.barriers {
if let Some(error) = barrier.wait().await {
responses.insert(sub_request_id, error);
}
}
responses
}
}

Expand Down Expand Up @@ -203,15 +229,15 @@ mod tests {
GetOrCreateOpenShardsSubrequest {
index_id: "test-index".to_string(),
source_id: "test-source-foo".to_string(),
..Default::default()
subrequest_id: 1,
},
permit,
);

let barrier = debouncer
.acquire("test-index", "test-source-foo")
.unwrap_err();
debounced_request.push_barrier(barrier);
debounced_request.push_barrier(2, barrier);

let (request_opt, rendezvous) = debounced_request.take();
let request = request_opt.unwrap();
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use quickwit_proto::ingest::{CommitTypeV2, DocBatchV2};
use quickwit_proto::types::{DocUid, DocUidGenerator, IndexId, NodeId, SubrequestId};
use serde::Serialize;
use tracing::{error, info};
use workbench::pending_subrequests;

pub use self::fetch::{FetchStreamError, MultiFetchStream};
pub use self::ingester::{wait_for_ingester_decommission, wait_for_ingester_status, Ingester};
Expand Down
Loading