Skip to content

Commit ebbdc8a

Browse files
committed
rust: add open telemetry tracing
1 parent beb94f0 commit ebbdc8a

File tree

5 files changed

+127
-16
lines changed

5 files changed

+127
-16
lines changed

Cargo.toml

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,18 @@ stderrlog = "0.6.0"
1919
structopt = "0.3.26"
2020
tokio = {version = "1.40.0", features = ["full", "test-util", "tracing", "macros", "rt-multi-thread"] }
2121
tonic = "0.12.2"
22+
# Pinned to the same 0.27 release line as `opentelemetry`; a wildcard can resolve to an incompatible major line.
opentelemetry_sdk = { version = "0.27.1", features = ["rt-tokio"] }
23+
# Pinned to the 0.27 release line so it matches `opentelemetry` and the tonic 0.12 transport used by this crate.
opentelemetry-otlp = { version = "0.27.0", features = ["grpc-tonic"] }
24+
opentelemetry = "0.27.1"
25+
tonic-tracing-opentelemetry = "0.24.3"
26+
tower = "0.5.2"
27+
tracing-opentelemetry-instrumentation-sdk = "0.24.1"
28+
http = "1.2.0"
29+
axum-tracing-opentelemetry = "0.25.0"
30+
opentelemetry-stdout = "0.27.0"
31+
tracing-subscriber = { version="0.3.19", features = ["fmt", "env-filter"]}
32+
tracing-opentelemetry = "0.28.0"
33+
tracing = "0.1.41"
2234

2335
[build-dependencies]
2436
tonic-build = "0.12.2"

src/lib.rs

+89-3
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@ use std::env;
1515
use std::sync::Arc;
1616

1717
use anyhow::Result;
18+
use log::info;
1819
use pyo3::exceptions::{PyRuntimeError, PyTimeoutError};
1920
use structopt::StructOpt;
2021
use tokio::runtime::Runtime;
2122
use tokio::task::JoinHandle;
22-
use tonic::transport::Channel;
2323
use tonic::Status;
2424

2525
pub mod torchftpb {
2626
tonic::include_proto!("torchft");
2727
}
2828

29+
use crate::net::Channel;
2930
use crate::torchftpb::manager_service_client::ManagerServiceClient;
3031
use crate::torchftpb::{CheckpointAddressRequest, ManagerQuorumRequest, ShouldCommitRequest};
3132
use pyo3::prelude::*;
@@ -301,8 +302,7 @@ impl From<Status> for StatusError {
301302
}
302303
}
303304

304-
#[pymodule]
305-
fn torchft(m: &Bound<'_, PyModule>) -> PyResult<()> {
305+
fn init_logging() -> PyResult<()> {
306306
// setup logging on import
307307
let mut log = stderrlog::new();
308308
log.verbosity(2)
@@ -316,6 +316,92 @@ fn torchft(m: &Bound<'_, PyModule>) -> PyResult<()> {
316316
log.init()
317317
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
318318

319+
Ok(())
320+
}
321+
322+
fn init_tracing() -> PyResult<()> {
323+
use opentelemetry::trace::Tracer;
324+
use opentelemetry::trace::TracerProvider as OpenTelemetryTracerProvider;
325+
use opentelemetry_otlp::WithExportConfig;
326+
use opentelemetry_sdk::trace::TracerProvider;
327+
use tracing_subscriber::layer::SubscriberExt;
328+
use tracing_subscriber::{filter::EnvFilter, Layer};
329+
330+
fn set_tracer_provider(tracer_provider: TracerProvider) -> PyResult<()> {
331+
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
332+
333+
let layer = tracing_opentelemetry::layer()
334+
.with_error_records_to_exceptions(true)
335+
.with_tracer(tracer_provider.tracer(""));
336+
337+
// Create a new tracing::Fmt layer to print the logs to stdout. It has a
338+
// default filter of `info` level and above, and `debug` and above for logs
339+
// from OpenTelemetry crates. The filter levels can be customized as needed.
340+
let filter_fmt =
341+
EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap());
342+
let fmt_layer = tracing_subscriber::fmt::layer()
343+
.with_thread_names(true)
344+
.with_filter(filter_fmt);
345+
346+
let subscriber = tracing_subscriber::registry().with(fmt_layer).with(layer);
347+
tracing::subscriber::set_global_default(subscriber)
348+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
349+
350+
info!("OpenTelemetry tracing enabled");
351+
352+
Ok(())
353+
}
354+
355+
match env::var("TORCHFT_OTEL_OTLP") {
356+
Ok(endpoint) => {
357+
let runtime = Runtime::new()?;
358+
359+
runtime.block_on(async move {
360+
info!("Enabling OpenTelemetry OTLP with {}", endpoint);
361+
let exporter = opentelemetry_otlp::SpanExporter::builder()
362+
.with_tonic()
363+
.with_endpoint(endpoint)
364+
.with_timeout(Duration::from_secs(10))
365+
.build()
366+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
367+
368+
let tracer_provider = TracerProvider::builder()
369+
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
370+
.build();
371+
372+
set_tracer_provider(tracer_provider)?;
373+
374+
Ok::<(), pyo3::PyErr>(())
375+
})?;
376+
}
377+
Err(_) => {}
378+
};
379+
match env::var("TORCHFT_OTEL_STDOUT") {
380+
Ok(_) => {
381+
info!("Enabling OpenTelemetry stdout");
382+
let exporter = opentelemetry_stdout::SpanExporter::default();
383+
let tracer_provider = TracerProvider::builder()
384+
.with_simple_exporter(exporter)
385+
.build();
386+
387+
set_tracer_provider(tracer_provider)?;
388+
}
389+
Err(_) => {}
390+
}
391+
392+
let tracer = opentelemetry::global::tracer("my_tracer");
393+
tracer.in_span("doing_work", |cx| {
394+
// Traced app logic here...
395+
});
396+
397+
Ok(())
398+
}
399+
400+
#[pymodule]
401+
fn torchft(m: &Bound<'_, PyModule>) -> PyResult<()> {
402+
init_logging()?;
403+
init_tracing()?;
404+
319405
m.add_class::<Manager>()?;
320406
m.add_class::<ManagerClient>()?;
321407
m.add_class::<Lighthouse>()?;

src/lighthouse.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use axum::{
2020
routing::{get, post},
2121
Router,
2222
};
23+
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
2324
use gethostname::gethostname;
2425
use log::{error, info};
2526
use structopt::StructOpt;
@@ -31,6 +32,7 @@ use tonic::service::Routes;
3132
use tonic::transport::server::TcpIncoming;
3233
use tonic::transport::Server;
3334
use tonic::{Request, Response, Status};
35+
use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer;
3436

3537
use crate::manager::manager_client_new;
3638
use crate::torchftpb::{
@@ -345,12 +347,17 @@ impl Lighthouse {
345347
let self_clone = self.clone();
346348
move |path| async { self_clone.kill(path).await }
347349
}),
348-
);
350+
)
351+
// include the trace context as a header in the response
352+
.layer(OtelInResponseLayer::default())
353+
// start an OpenTelemetry trace on each incoming request
354+
.layer(OtelAxumLayer::default());
349355

350356
// register the GRPC service
351357
let routes = Routes::from(app).add_service(LighthouseServiceServer::new(self));
352358

353359
Server::builder()
360+
.layer(OtelGrpcLayer::default())
354361
// allow non-GRPC connections
355362
.accept_http1(true)
356363
.add_routes(routes)
@@ -571,9 +578,7 @@ mod tests {
571578
use super::*;
572579
use std::ops::Sub;
573580

574-
use tonic::transport::Channel;
575-
576-
use crate::net::connect;
581+
use crate::net::{connect, Channel};
577582
use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
578583

579584
async fn lighthouse_client_new(addr: String) -> Result<LighthouseServiceClient<Channel>> {

src/manager.rs

+8-6
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ use tokio::sync::Mutex;
1616
use tokio::task::JoinSet;
1717
use tokio::time::sleep;
1818
use tonic::transport::server::TcpIncoming;
19-
use tonic::transport::Channel;
2019
use tonic::transport::Server;
2120
use tonic::{Request, Response, Status};
21+
use tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer;
2222

23-
use crate::net::connect;
23+
use crate::net::{connect, Channel};
2424
use crate::timeout::try_parse_grpc_timeout;
2525
use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
2626
use crate::torchftpb::manager_service_client::ManagerServiceClient;
@@ -64,17 +64,18 @@ pub async fn manager_client_new(
6464
connect_timeout: Duration,
6565
) -> Result<ManagerServiceClient<Channel>> {
6666
info!("ManagerClient: establishing connection to {}", &addr);
67-
let conn = connect(addr, connect_timeout).await?;
68-
Ok(ManagerServiceClient::new(conn))
67+
let channel = connect(addr, connect_timeout).await?;
68+
69+
Ok(ManagerServiceClient::new(channel))
6970
}
7071

7172
pub async fn lighthouse_client_new(
7273
addr: String,
7374
connect_timeout: Duration,
7475
) -> Result<LighthouseServiceClient<Channel>> {
7576
info!("LighthouseClient: establishing connection to {}", &addr);
76-
let conn = connect(addr, connect_timeout).await?;
77-
Ok(LighthouseServiceClient::new(conn))
77+
let channel = connect(addr, connect_timeout).await?;
78+
Ok(LighthouseServiceClient::new(channel))
7879
}
7980

8081
impl Manager {
@@ -146,6 +147,7 @@ impl Manager {
146147
TcpIncoming::from_listener(listener, true, None).map_err(|e| anyhow::anyhow!(e))?;
147148

148149
Server::builder()
150+
.layer(OtelGrpcLayer::default())
149151
.add_service(ManagerServiceServer::new(self))
150152
.serve_with_incoming(incoming)
151153
.await

src/net.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
use std::time::Duration;
22

33
use anyhow::Result;
4-
use tonic::transport::{Channel, Endpoint};
4+
use tonic::transport::Endpoint;
5+
use tonic_tracing_opentelemetry::middleware::client::{OtelGrpcLayer, OtelGrpcService};
6+
use tower::ServiceBuilder;
57

68
use crate::retry::{retry_backoff, ExponentialBackoff};
79

10+
/// Client channel type used by this crate: a `tonic` transport channel wrapped
/// in the OpenTelemetry gRPC client middleware (`OtelGrpcService`).
pub type Channel = OtelGrpcService<tonic::transport::Channel>;
11+
812
pub async fn connect_once(addr: String, connect_timeout: Duration) -> Result<Channel> {
9-
let conn = Endpoint::new(addr)?
13+
let channel = Endpoint::new(addr)?
1014
.connect_timeout(connect_timeout)
1115
// Enable HTTP2 keep alives
1216
.http2_keep_alive_interval(Duration::from_secs(60))
@@ -16,7 +20,9 @@ pub async fn connect_once(addr: String, connect_timeout: Duration) -> Result<Cha
1620
.keep_alive_while_idle(true)
1721
.connect()
1822
.await?;
19-
Ok(conn)
23+
let channel = ServiceBuilder::new().layer(OtelGrpcLayer).service(channel);
24+
25+
Ok(channel)
2026
}
2127

2228
pub async fn connect(addr: String, connect_timeout: Duration) -> Result<Channel> {

0 commit comments

Comments
 (0)