Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tracing spans #82

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
464 changes: 380 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion cas_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ async-trait = "0.1.9"
anyhow = "1"
http = "1.1.0"
tempfile = "3.13.0"
tracing = "0.1.31"
tracing = "0.1.40"
tracing-opentelemetry = "0.27.0"
bytes = "1"
itertools = "0.10"
reqwest = { version = "0.12.7", features = ["json"] }
Expand Down
22 changes: 20 additions & 2 deletions cas_client/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use std::time::Duration;

use anyhow::anyhow;
use error_printer::OptionPrinter;
use http::Extensions;
use reqwest::header::{HeaderValue, AUTHORIZATION};
use reqwest::{Request, Response};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Middleware, Next};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::{default_on_request_failure, default_on_request_success, RetryTransientMiddleware, Retryable};
use tracing::warn;
use tracing::{info_span, warn, Instrument};
use utils::auth::{AuthConfig, TokenProvider};

use crate::CasClientError;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub fn build_auth_http_client(
.maybe_with(auth_middleware)
.maybe_with(Some(retry_middleware))
.maybe_with(logging_middleware)
.maybe_with(Some(TraceMiddleware))
.build())
}

Expand All @@ -72,6 +74,7 @@ pub fn build_http_client(
Ok(ClientBuilder::new(reqwest_client)
.maybe_with(Some(retry_middleware))
.maybe_with(logging_middleware)
.maybe_with(Some(TraceMiddleware))
.build())
}

Expand Down Expand Up @@ -177,6 +180,21 @@ impl Middleware for AuthMiddleware {
}
}

pub struct TraceMiddleware;

#[async_trait::async_trait]
impl Middleware for TraceMiddleware {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> reqwest_middleware::Result<Response> {
let span = info_span!("client::request", method = %req.method(), url = req.url().as_str());
next.run(req, extensions).instrument(span).await
}
}

#[cfg(test)]
mod tests {
use std::time::SystemTime;
Expand Down Expand Up @@ -264,6 +282,6 @@ mod tests {
assert!(logs_contain("Status Code: 500. Retrying..."));
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(3, mock.hits());
assert_eq!(start_time.elapsed().unwrap() > Duration::from_secs(0), true);
assert!(start_time.elapsed().unwrap() > Duration::from_secs(0));
}
}
4 changes: 4 additions & 0 deletions cas_client/src/http_shard_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use mdb_shard::shard_file_reconstructor::FileReconstructor;
use merklehash::{HashedWrite, MerkleHash};
use reqwest::Url;
use reqwest_middleware::ClientWithMiddleware;
use tracing::instrument;
use utils::auth::AuthConfig;

use crate::error::{CasClientError, Result};
Expand Down Expand Up @@ -40,6 +41,7 @@ impl HttpShardClient {

#[async_trait]
impl RegistrationClient for HttpShardClient {
#[instrument(skip_all, name = "shard_client::upload_shard", fields(hash = hash.hex()))]
async fn upload_shard(
&self,
prefix: &str,
Expand Down Expand Up @@ -82,6 +84,7 @@ impl RegistrationClient for HttpShardClient {

#[async_trait]
impl FileReconstructor<CasClientError> for HttpShardClient {
#[instrument(skip_all, name = "shard_client::get_file_reconstruction_info", fields(hash = file_hash.hex()))]
async fn get_file_reconstruction_info(
&self,
file_hash: &MerkleHash,
Expand Down Expand Up @@ -118,6 +121,7 @@ impl FileReconstructor<CasClientError> for HttpShardClient {

#[async_trait]
impl ShardDedupProber<CasClientError> for HttpShardClient {
#[instrument(skip_all, name = "shard_client::get_dedup_shards")]
async fn get_dedup_shards(
&self,
prefix: &str,
Expand Down
22 changes: 17 additions & 5 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use http::header::RANGE;
use merklehash::MerkleHash;
use reqwest::{StatusCode, Url};
use reqwest_middleware::ClientWithMiddleware;
use tracing::{debug, error, info};
use tracing::{debug, error, info, info_span, instrument, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use utils::auth::AuthConfig;
use utils::ThreadPool;

Expand Down Expand Up @@ -163,6 +164,7 @@ impl ReconstructionClient for RemoteClient {

#[async_trait]
impl Reconstructable for RemoteClient {
#[instrument(skip_all, name = "remote_client::get_reconstruction")]
async fn get_reconstruction(
&self,
file_id: &MerkleHash,
Expand Down Expand Up @@ -196,6 +198,7 @@ impl Reconstructable for RemoteClient {
impl Client for RemoteClient {}

impl RemoteClient {
#[instrument(skip_all, name = "remote_client::batch_get_reconstruction")]
async fn batch_get_reconstruction(
&self,
file_ids: impl Iterator<Item = &MerkleHash>,
Expand Down Expand Up @@ -229,6 +232,8 @@ impl RemoteClient {
Ok(query_reconstruction_response)
}

#[instrument(skip_all, name = "remote_client::upload_xorb", fields(key = key.hash.hex(), num_chunks = chunk_and_boundaries.len()
))]
pub async fn upload(
&self,
key: &Key,
Expand Down Expand Up @@ -282,10 +287,16 @@ impl RemoteClient {
} else {
terms.iter().fold(0, |acc, x| acc + x.unpacked_length as u64)
};

let futs_iter = terms
.into_iter()
.map(|term| get_one_term(http_client.clone(), self.chunk_cache.clone(), term, fetch_info.clone()));
let ctx = Span::current().context();
let futs_iter = terms.into_iter().map(|term| {
let term_span = info_span!(
"remote_client::reconstruct_term_task",
hash = format!("{}", term.hash),
num_chunks = term.range.end - term.range.start
);
term_span.set_parent(ctx.clone());
get_one_term(http_client.clone(), self.chunk_cache.clone(), term, fetch_info.clone()).instrument(term_span)
});
let mut futs_buffered_enumerated =
futures::stream::iter(futs_iter).buffered(NUM_CONCURRENT_RANGE_GETS).enumerate();

Expand Down Expand Up @@ -428,6 +439,7 @@ impl Write for ThreadSafeBuffer {
/// use the provided http_client to make requests to S3/blob store using the url and url_range
/// parts of a CASReconstructionFetchInfo. The url_range part is used directly in an http Range header
/// value (see fn `range_header`).
#[instrument(skip_all, name = "remote_client::download_range")]
async fn download_range(
http_client: Arc<ClientWithMiddleware>,
fetch_term: &CASReconstructionFetchInfo,
Expand Down
6 changes: 5 additions & 1 deletion chunk_cache/src/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use cas_types::{ChunkRange, Key};
use error_printer::ErrorPrinter;
use file_utils::SafeFileCreator;
use merklehash::MerkleHash;
use tracing::{debug, warn};
use tracing::{debug, instrument, warn};
#[cfg(feature = "analysis")]
use utils::output_bytes;

Expand Down Expand Up @@ -239,6 +239,7 @@ impl DiskCache {
Ok(CacheState::new(state, num_items, total_bytes))
}

#[instrument(skip_all, name = "disk_cache::get_impl")]
fn get_impl(&self, key: &Key, range: &ChunkRange) -> OptionResult<Vec<u8>, ChunkCacheError> {
if range.start >= range.end {
return Err(ChunkCacheError::InvalidArguments);
Expand Down Expand Up @@ -325,6 +326,7 @@ impl DiskCache {
Ok(None)
}

#[instrument(skip_all, name = "disk_cache::put_impl")]
fn put_impl(
&self,
key: &Key,
Expand Down Expand Up @@ -737,6 +739,7 @@ fn try_parse_cache_file(file_result: io::Result<DirEntry>, capacity: u64) -> Opt
}

/// removes a file but disregards a "NotFound" error if the file is already gone
#[instrument(skip_all, name = "disk_cache::remove_file")]
fn remove_file(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
if let Err(e) = std::fs::remove_file(path) {
if e.kind() != ErrorKind::NotFound {
Expand All @@ -747,6 +750,7 @@ fn remove_file(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
}

/// removes a directory but disregards a "NotFound" error if the directory is already gone
#[instrument(skip_all, name = "disk_cache::remove_dir")]
fn remove_dir(path: impl AsRef<Path>) -> Result<(), ChunkCacheError> {
if let Err(e) = std::fs::remove_dir(path) {
if e.kind() != ErrorKind::NotFound {
Expand Down
1 change: 1 addition & 0 deletions data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ xet_error = { path = "../xet_error" }
tokio = { version = "1.36", features = ["full"] }
anyhow = "1"
tracing = "0.1.*"
tracing-opentelemetry = "0.27.0"
async-trait = "0.1.53"
clap = { version = "3.1.6", features = ["derive"] }
http = "0.2.8"
Expand Down
Loading