Skip to content

Commit

Permalink
Instrument tarpc with tracing.
Browse files Browse the repository at this point in the history
tarpc is now instrumented with tracing primitives extended with
OpenTelemetry traces. Using a compatible tracing-opentelemetry
subscriber like Jaeger, each RPC can be traced through the client,
server, amd other dependencies downstream of the server. Even for
applications not connected to a distributed tracing collector, the
instrumentation can also be ingested by regular loggers like env_logger.

 # Breaking Changes

 ## Logging

Logged events are now structured using tracing. For applications using a
logger and not a tracing subscriber, these logs may look different or
contain information in a less consumable manner. The easiest solution is
to add a tracing subscriber that logs to stdout, such as
tracing_subscriber::fmt.

 ##  Context

- Context no longer has parent_span, which was actually never needed,
  because the context sent in an RPC is inherently the parent context.
  For purposes of distributed tracing, the client side of the RPC has all
  necessary information to link the span to its parent; the server side
  need do nothing more than export the (trace ID, span ID) tuple.
- Context has a new field, SamplingDecision, which has two variants,
  Sampled and Unsampled. This field can be used by downstream systems to
  determine whether a trace needs to be exported. If the parent span is
  sampled, the expectation is that all child spans be exported, as well;
  to do otherwise could result in lossy traces being exported. Note that
  if an Openetelemetry tracing subscriber is not installed, the fallback
  context will still be used, but the Context's sampling decision will
  always be inherited by the parent Context's sampling decision.
- Context::scope has been removed. Context propagation is now done via
  tracing's task-local spans. Spans can be propagated across tasks via
  Span::in_scope. When a service receives a request, it attaches an
  Opentelemetry context to the local Span created before request handling,
  and this context contains the request deadline. This span-local deadline
  is retrieved by Context::current, but it cannot be modified so that
  future Context::current calls contain a different deadline. However, the
  deadline in the context passed into an RPC call will override it, so
  users can retrieve the current context and then modify the deadline
  field, as has been historically possible.
- Context propgation precedence changes: when an RPC is initiated, the
  current Span's Opentelemetry context takes precedence over the trace
  context passed into the RPC method. If there is no current Span, then
  the trace context argument is used as it has been historically. Note
  that Opentelemetry context propagation requires an Opentelemetry
  tracing subscriber to be installed.

 ## Server

- The server::Channel trait now has an additional required associated
  type and method which returns the underlying transport. This makes it
  more ergonomic for users to retrieve transport-specific information,
  like IP Address. BaseChannel implements Channel::transport by returning
  the underlying transport, and channel decorators like Throttler just
  delegate to the Channel::transport method of the wrapped channel.

 # References

[1] https://github.com/tokio-rs/tracing
[2] https://opentelemetry.io
[3] https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger
[4] https://github.com/env-logger-rs/env_logger
  • Loading branch information
tikue committed Apr 2, 2021
1 parent db0c778 commit 7b7c182
Show file tree
Hide file tree
Showing 23 changed files with 865 additions and 523 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
[workspace]
resolver = "2"

members = [
"example-service",
"tarpc",
"plugins",
]

[profile.dev]
split-debuginfo = "unpacked"
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ process, and no context switching between different languages.
Some other features of tarpc:
- Pluggable transport: any type impling `Stream<Item = Request> + Sink<Response>` can be
used as a transport to connect the client and server.
- `Send + 'static` optional: if the transport doesn't require it, neither does tarpc!
- Cascading cancellation: dropping a request will send a cancellation message to the server.
The server will cease any unfinished work on the request, subsequently cancelling any of its
own requests, repeating for the entire chain of transitive dependencies.
Expand All @@ -51,6 +50,14 @@ Some other features of tarpc:
requests sent by the server that use the request context will propagate the request deadline.
For example, if a server is handling a request with a 10s deadline, does 2s of work, then
sends a request to another server, that server will see an 8s deadline.
- Distributed tracing: tarpc is instrumented with [tracing](https://github.com/tokio-rs/tracing)
primitives extended with [OpenTelemetry](https://opentelemetry.io/) traces. Using a compatible
tracing subscriber like
[Jaeger](https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger),
each RPC can be traced through the client, server, amd other dependencies downstream of the
server. Even for applications not connected to a distributed tracing collector, the
instrumentation can also be ingested by regular loggers like
[env_logger](https://github.com/env-logger-rs/env_logger/).
- Serde serialization: enabling the `serde1` Cargo feature will make service requests and
responses `Serialize + Deserialize`. It's entirely optional, though: in-memory transports can
be used, as well, so the price of serialization doesn't have to be paid when it's not needed.
Expand Down
12 changes: 10 additions & 2 deletions example-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@ readme = "../README.md"
description = "An example server built on tarpc."

[dependencies]
clap = "2.33"
env_logger = "0.8"
anyhow = "1.0"
clap = "3.0.0-beta.2"
log = "0.4"
futures = "0.3"
opentelemetry = { version = "0.13", features = ["rt-tokio"] }
opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
rand = "0.8"
serde = { version = "1.0" }
tarpc = { path = "../tarpc", features = ["full"] }
tokio = { version = "1", features = ["macros", "net", "rt-multi-thread"] }
tokio-serde = { version = "0.8", features = ["json"] }
tracing = { version = "0.1" }
tracing-appender = "0.1"
tracing-opentelemetry = "0.12"
tracing-subscriber = "0.2"

[lib]
name = "service"
Expand Down
82 changes: 37 additions & 45 deletions example-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,49 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use clap::{App, Arg};
use std::{io, net::SocketAddr};
use clap::Clap;
use service::{init_tracing, WorldClient};
use std::{net::SocketAddr, time::Duration};
use tarpc::{client, context, tokio_serde::formats::Json};
use tokio::time::sleep;
use tracing::Instrument;

#[derive(Clap)]
struct Flags {
/// Sets the server address to connect to.
#[clap(long)]
server_addr: SocketAddr,
/// Sets the name to say hello to.
#[clap(long)]
name: String,
}

#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init();

let flags = App::new("Hello Client")
.version("0.1")
.author("Tim <[email protected]>")
.about("Say hello!")
.arg(
Arg::with_name("server_addr")
.long("server_addr")
.value_name("ADDRESS")
.help("Sets the server address to connect to.")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("name")
.short("n")
.long("name")
.value_name("STRING")
.help("Sets the name to say hello to.")
.required(true)
.takes_value(true),
)
.get_matches();

let server_addr = flags.value_of("server_addr").unwrap();
let server_addr = server_addr
.parse::<SocketAddr>()
.unwrap_or_else(|e| panic!(r#"--server_addr value "{}" invalid: {}"#, server_addr, e));

let name = flags.value_of("name").unwrap().into();

let mut transport = tarpc::serde_transport::tcp::connect(server_addr, Json::default);
transport.config_mut().max_frame_length(usize::MAX);
async fn main() -> anyhow::Result<()> {
let flags = Flags::parse();
let _uninstall = init_tracing("Tarpc Example Client")?;

let transport = tarpc::serde_transport::tcp::connect(flags.server_addr, Json::default);

// WorldClient is generated by the service attribute. It has a constructor `new` that takes a
// config and any Transport as input.
let client = service::WorldClient::new(client::Config::default(), transport.await?).spawn()?;

// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), name).await?;

println!("{}", hello);
let client = WorldClient::new(client::Config::default(), transport.await?).spawn()?;

let hello = async move {
// Send the request twice, just to be safe! ;)
tokio::select! {
hello1 = client.hello(context::current(), format!("{}1", flags.name)) => { hello1 }
hello2 = client.hello(context::current(), format!("{}2", flags.name)) => { hello2 }
}
}
.instrument(tracing::info_span!("Two Hellos"))
.await;

tracing::info!("{:?}", hello);

// Let the background span processor finish.
sleep(Duration::from_micros(1)).await;
opentelemetry::global::shutdown_tracer_provider();

Ok(())
}
28 changes: 28 additions & 0 deletions example-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,38 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use std::env;
use tracing_subscriber::{fmt::format::FmtSpan, prelude::*};

/// This is the service definition. It looks a lot like a trait definition.
/// It defines one RPC, hello, which takes one arg, name, and returns a String.
#[tarpc::service]
pub trait World {
/// Returns a greeting for name.
async fn hello(name: String) -> String;
}

/// Initializes an OpenTelemetry tracing subscriber with a Jaeger backend.
pub fn init_tracing(
service_name: &str,
) -> anyhow::Result<tracing_appender::non_blocking::WorkerGuard> {
env::set_var("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "12");

let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name(service_name)
.with_max_packet_size(2usize.pow(13))
.install_batch(opentelemetry::runtime::Tokio)?;
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(
tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_writer(non_blocking),
)
.with(tracing_opentelemetry::layer().with_tracer(tracer))
.try_init()?;

Ok(guard)
}
54 changes: 25 additions & 29 deletions example-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

use clap::{App, Arg};
use clap::Clap;
use futures::{future, prelude::*};
use service::World;
use rand::{
distributions::{Distribution, Uniform},
thread_rng,
};
use service::{init_tracing, World};
use std::{
io,
net::{IpAddr, SocketAddr},
time::Duration,
};
use tarpc::{
context,
server::{self, Channel, Incoming},
tokio_serde::formats::Json,
};
use tokio::time;

#[derive(Clap)]
struct Flags {
/// Sets the port number to listen on.
#[clap(long)]
port: u16,
}

// This is the type that implements the generated World trait. It is the business logic
// and is used to start the server.
Expand All @@ -25,35 +37,19 @@ struct HelloServer(SocketAddr);
#[tarpc::server]
impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await;
format!("Hello, {}! You are connected from {:?}.", name, self.0)
}
}

#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init();

let flags = App::new("Hello Server")
.version("0.1")
.author("Tim <[email protected]>")
.about("Say hello!")
.arg(
Arg::with_name("port")
.short("p")
.long("port")
.value_name("NUMBER")
.help("Sets the port number to listen on")
.required(true)
.takes_value(true),
)
.get_matches();

let port = flags.value_of("port").unwrap();
let port = port
.parse()
.unwrap_or_else(|e| panic!(r#"--port value "{}" invalid: {}"#, port, e));
async fn main() -> anyhow::Result<()> {
let flags = Flags::parse();
let _uninstall = init_tracing("Tarpc Example Server")?;

let server_addr = (IpAddr::from([0, 0, 0, 0]), port);
let server_addr = (IpAddr::from([0, 0, 0, 0]), flags.port);

// JSON transport is provided by the json_transport tarpc module. It makes it easy
// to start up a serde-powered json serialization strategy over TCP.
Expand All @@ -64,12 +60,12 @@ async fn main() -> io::Result<()> {
.filter_map(|r| future::ready(r.ok()))
.map(server::BaseChannel::with_defaults)
// Limit channels to 1 per IP.
.max_channels_per_key(1, |t| t.as_ref().peer_addr().unwrap().ip())
.max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
// serve is generated by the service attribute. It takes as input any type implementing
// the generated World trait.
.map(|channel| {
let server = HelloServer(channel.as_ref().as_ref().peer_addr().unwrap());
channel.requests().execute(server.serve())
let server = HelloServer(channel.transport().peer_addr().unwrap());
channel.execute(server.serve())
})
// Max 10 channels.
.buffer_unordered(10)
Expand Down
2 changes: 1 addition & 1 deletion plugins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ proc-macro = true
assert-type-eq = "0.1.0"
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
tarpc = { path = "../tarpc" }
tarpc = { path = "../tarpc", features = ["serde1"] }
24 changes: 22 additions & 2 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
None
};

let methods = rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>();
let request_names = methods
.iter()
.map(|m| format!("{}.{}", ident, m))
.collect::<Vec<_>>();

ServiceGenerator {
response_fut_name,
service_ident: ident,
Expand All @@ -278,7 +284,8 @@ pub fn service(attr: TokenStream, input: TokenStream) -> TokenStream {
vis,
args,
method_attrs: &rpcs.iter().map(|rpc| &*rpc.attrs).collect::<Vec<_>>(),
method_idents: &rpcs.iter().map(|rpc| &rpc.ident).collect::<Vec<_>>(),
method_idents: &methods,
request_names: &*request_names,
attrs,
rpcs,
return_types: &rpcs
Expand Down Expand Up @@ -441,6 +448,7 @@ struct ServiceGenerator<'a> {
camel_case_idents: &'a [Ident],
future_types: &'a [Type],
method_idents: &'a [&'a Ident],
request_names: &'a [String],
method_attrs: &'a [&'a [Attribute]],
args: &'a [&'a [PatType]],
return_types: &'a [&'a Type],
Expand Down Expand Up @@ -524,6 +532,7 @@ impl<'a> ServiceGenerator<'a> {
camel_case_idents,
arg_pats,
method_idents,
request_names,
..
} = self;

Expand All @@ -534,6 +543,16 @@ impl<'a> ServiceGenerator<'a> {
type Resp = #response_ident;
type Fut = #response_fut_ident<S>;

fn method(&self, req: &#request_ident) -> Option<&'static str> {
Some(match req {
#(
#request_ident::#camel_case_idents{..} => {
#request_names
}
)*
})
}

fn serve(self, ctx: tarpc::context::Context, req: #request_ident) -> Self::Fut {
match req {
#(
Expand Down Expand Up @@ -714,6 +733,7 @@ impl<'a> ServiceGenerator<'a> {
method_attrs,
vis,
method_idents,
request_names,
args,
return_types,
arg_pats,
Expand All @@ -729,7 +749,7 @@ impl<'a> ServiceGenerator<'a> {
#vis fn #method_idents(&self, ctx: tarpc::context::Context, #( #args ),*)
-> impl std::future::Future<Output = std::io::Result<#return_types>> + '_ {
let request = #request_ident::#camel_case_idents { #( #arg_pats ),* };
let resp = self.0.call(ctx, request);
let resp = self.0.call(ctx, #request_names, request);
async move {
match resp.await? {
#response_ident::#camel_case_idents(msg) => std::result::Result::Ok(msg),
Expand Down
Loading

0 comments on commit 7b7c182

Please sign in to comment.