Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for an emergency killswitch. #164

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
container = pkgs.dockerTools.buildLayeredImage {
name = "lightswitch";
config = {
Entrypoint = [ "${lightswitch}/bin/lightswitch" ];
Entrypoint = [ "${lightswitch}/bin/lightswitch" "--killswitch-file-path" "/tmp/lightswitch.killswitch" ];
Env = [
"RUST_BACKTRACE=1"
];
Expand Down
6 changes: 6 additions & 0 deletions src/cli/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ pub(crate) struct CliArgs {
pub(crate) enable_deadlock_detector: bool,
#[arg(long, default_value = ProfilerConfig::default().cache_dir_base.into_os_string())]
pub(crate) cache_dir_base: PathBuf,
#[arg(
long,
help = "killswitch file to stop or prevent the profiler from starting. Required if duration is not set",
required_unless_present = "duration"
)]
pub(crate) killswitch_file_path: Option<String>,
#[arg(long, help = "force perf buffers even if ring buffers can be used")]
pub(crate) force_perf_buffer: bool,
}
211 changes: 131 additions & 80 deletions src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use std::time::Duration;
use clap::Parser;
use crossbeam_channel::bounded;
use inferno::flamegraph;
use lightswitch::collector::ThreadSafeCollector;
use lightswitch::collector::{AggregatorCollector, Collector, NullCollector, StreamingCollector};
use lightswitch::debug_info::DebugInfoManager;
use lightswitch_metadata::metadata_provider::GlobalMetadataProvider;
use nix::unistd::Uid;
use prost::Message;
use runner::Runner;
use tracing::{debug, error, info, Level};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::FmtSubscriber;
Expand All @@ -28,12 +30,13 @@ use lightswitch::debug_info::{
};
use lightswitch::profile::symbolize_profile;
use lightswitch::profile::{fold_profile, to_pprof};
use lightswitch::profiler::{Profiler, ProfilerConfig};
use lightswitch::profiler::{Profiler, ProfilerConfig, ThreadSafeProfiler};
use lightswitch::unwind_info::compact_unwind_info;
use lightswitch::unwind_info::CompactUnwindInfoBuilder;
use lightswitch_object::ObjectFile;

mod args;
mod runner;
mod validators;

use crate::args::CliArgs;
Expand Down Expand Up @@ -71,6 +74,95 @@ fn start_deadlock_detector() {
});
}

/// Reports whether the agent should run in continuous-profiling mode.
///
/// The CLI default for `--duration` is `Duration::MAX` (see the
/// `18446744073709551615` default in the help snapshot), so a duration whose
/// whole-second count equals `Duration::MAX`'s means "no finite duration was
/// requested" — i.e. run continuously until told to stop.
///
/// NOTE(review): the name is misspelled ("continous_pofiling" should be
/// "continuous_profiling"); renaming requires updating the call site in
/// `main`, so it is left as-is here.
fn is_continous_pofiling_mode(duration: &Duration) -> bool {
    // Compare seconds only, matching the CLI default exactly.
    let sentinel_secs = Duration::MAX.as_secs();
    duration.as_secs() == sentinel_secs
}

fn collect_profiles_adhoc(
args: &CliArgs,
profiler: ThreadSafeProfiler,
collector: ThreadSafeCollector,
metadata_provider: ThreadSafeGlobalMetadataProvider,
) -> Result<(), Box<dyn Error>> {
let profile_duration = profiler.lock().unwrap().run(); // blocks the main thread

let collector = collector.lock().unwrap();
let (mut profile, procs, objs) = collector.finish();

// If we need to send the profile to the backend there's nothing else to do.
match args.sender {
ProfileSender::Remote | ProfileSender::None => {
return Ok(());
}
_ => {}
}

// Otherwise let's symbolize the profile and write it to disk.
if args.symbolizer == Symbolizer::Local {
profile = symbolize_profile(&profile, procs, objs);
}

let profile_path = args.profile_path.clone().unwrap_or(PathBuf::from(""));
match args.profile_format {
ProfileFormat::FlameGraph => {
let folded = fold_profile(profile);
let mut options: flamegraph::Options<'_> = flamegraph::Options::default();
let data = folded.as_bytes();
let profile_name = args
.profile_name
.clone()
.unwrap_or_else(|| "flame.svg".into());
let profile_path = profile_path.clone().join(profile_name);
let f = File::create(&profile_path).unwrap();
match flamegraph::from_reader(&mut options, data, f) {
Ok(_) => {
eprintln!(
"Flamegraph profile successfully written to {}",
profile_path.to_string_lossy()
);
}
Err(e) => {
error!("Failed generate flamegraph: {:?}", e);
}
}
}
ProfileFormat::Pprof => {
let mut buffer = Vec::new();
let pprof_profile = to_pprof(
profile,
procs,
objs,
&metadata_provider,
profile_duration,
args.sample_freq,
);
pprof_profile.encode(&mut buffer).unwrap();
let profile_name = args
.profile_name
.clone()
.unwrap_or_else(|| "profile.pb".into());
let profile_path = profile_path.clone().join(profile_name);
let mut pprof_file = File::create(&profile_path).unwrap();

match pprof_file.write_all(&buffer) {
Ok(_) => {
eprintln!(
"Pprof profile successfully written to {}",
profile_path.to_string_lossy()
);
}
Err(e) => {
error!("Failed generate pprof: {:?}", e);
}
}
}
ProfileFormat::None => {
// Do nothing
}
}
Ok(())
}

fn main() -> Result<(), Box<dyn Error>> {
panic_thread_hook();
let args = CliArgs::parse();
Expand Down Expand Up @@ -119,7 +211,7 @@ fn main() -> Result<(), Box<dyn Error>> {
std::process::exit(1);
}

let server_url = args.server_url.unwrap_or(DEFAULT_SERVER_URL.into());
let server_url = args.server_url.clone().unwrap_or(DEFAULT_SERVER_URL.into());

let metadata_provider: ThreadSafeGlobalMetadataProvider =
Arc::new(Mutex::new(GlobalMetadataProvider::default()));
Expand All @@ -137,7 +229,7 @@ fn main() -> Result<(), Box<dyn Error>> {
)),
}));

let debug_info_manager: Box<dyn DebugInfoManager> = match args.debug_info_backend {
let debug_info_manager: Box<dyn DebugInfoManager + Send> = match args.debug_info_backend {
DebugInfoBackend::None => Box::new(DebugInfoBackendNull {}),
DebugInfoBackend::Copy => Box::new(DebugInfoBackendFilesystem {
path: PathBuf::from("/tmp"),
Expand All @@ -153,7 +245,7 @@ fn main() -> Result<(), Box<dyn Error>> {
!args.force_perf_buffer && system_info.available_bpf_features.has_ring_buf;

let profiler_config = ProfilerConfig {
cache_dir_base: args.cache_dir_base,
cache_dir_base: args.cache_dir_base.clone(),
libbpf_debug: args.libbpf_debug,
bpf_logging: args.bpf_logging,
duration: args.duration,
Expand All @@ -170,89 +262,48 @@ fn main() -> Result<(), Box<dyn Error>> {
..Default::default()
};

let (stop_signal_sender, stop_signal_receive) = bounded(1);
let (profiler_stop_signal_sender, profiler_stop_signal_receiver) = bounded(1);
let profiler: ThreadSafeProfiler = Arc::new(Mutex::new(Profiler::new(
Copy link
Collaborator Author

@patnebe patnebe Feb 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some thoughts on this.

Wrapping the Profiler in an Arc<Mutex<>> + Profiler::run() being a blocking call means that the mutex won't be unlocked until Profiler::run() returns or the thread running the profiler exits. One side effect is that only one instance of the profiler can be run at any given time. This sounds like desirable behaviour, but maybe there are reasons why we wouldn't want this.

profiler_config,
profiler_stop_signal_receiver,
collector.clone(),
)));
profiler.lock().unwrap().profile_pids(args.pids.clone());

ctrlc::set_handler(move || {
info!("received Ctrl+C, stopping...");
let _ = stop_signal_sender.send(());
})
.expect("Error setting Ctrl-C handler");
if is_continous_pofiling_mode(&args.duration) {
info!("Starting in continuous profiling mode...");

let mut p: Profiler = Profiler::new(profiler_config, stop_signal_receive);
p.profile_pids(args.pids);
let profile_duration = p.run(collector.clone());
let (runner_stop_signal_sender, runner_stop_signal_receiver) = bounded(1);

let collector = collector.lock().unwrap();
let (mut profile, procs, objs) = collector.finish();
ctrlc::set_handler(move || {
info!("received Ctrl+C, stopping runner...");
let _ = runner_stop_signal_sender.send(());
})
.expect("Error setting Ctrl-C handler");

let killswitch_file = args.killswitch_file_path.expect("");
let mut runner = Runner::new(
profiler,
killswitch_file,
runner_stop_signal_receiver,
profiler_stop_signal_sender,
);

// If we need to send the profile to the backend there's nothing else to do.
match args.sender {
ProfileSender::Remote | ProfileSender::None => {
return Ok(());
}
_ => {}
}
runner.run(); // Blocks the main thread

// Otherwise let's symbolize the profile and write it to disk.
if args.symbolizer == Symbolizer::Local {
profile = symbolize_profile(&profile, procs, objs);
// return early
// TODO: confirm this is the desired behavior
return Ok(());
}

let profile_path = args.profile_path.unwrap_or(PathBuf::from(""));

match args.profile_format {
ProfileFormat::FlameGraph => {
let folded = fold_profile(profile);
let mut options: flamegraph::Options<'_> = flamegraph::Options::default();
let data = folded.as_bytes();
let profile_name = args.profile_name.unwrap_or_else(|| "flame.svg".into());
let profile_path = profile_path.join(profile_name);
let f = File::create(&profile_path).unwrap();
match flamegraph::from_reader(&mut options, data, f) {
Ok(_) => {
eprintln!(
"Flamegraph profile successfully written to {}",
profile_path.to_string_lossy()
);
}
Err(e) => {
error!("Failed generate flamegraph: {:?}", e);
}
}
}
ProfileFormat::Pprof => {
let mut buffer = Vec::new();
let pprof_profile = to_pprof(
profile,
procs,
objs,
&metadata_provider,
profile_duration,
args.sample_freq,
);
pprof_profile.encode(&mut buffer).unwrap();
let profile_name = args.profile_name.unwrap_or_else(|| "profile.pb".into());
let profile_path = profile_path.join(profile_name);
let mut pprof_file = File::create(&profile_path).unwrap();

match pprof_file.write_all(&buffer) {
Ok(_) => {
eprintln!(
"Pprof profile successfully written to {}",
profile_path.to_string_lossy()
);
}
Err(e) => {
error!("Failed generate pprof: {:?}", e);
}
}
}
ProfileFormat::None => {
// Do nothing
}
}
// Adhoc profiling mode
ctrlc::set_handler(move || {
info!("received Ctrl+C, stopping...");
let _ = profiler_stop_signal_sender.send(());
})
.expect("Error setting Ctrl-C handler");

Ok(())
collect_profiles_adhoc(&args, profiler, collector, metadata_provider)
}

fn show_unwind_info(path: &str) {
Expand Down Expand Up @@ -299,7 +350,7 @@ mod tests {
cmd.arg("--help");
cmd.assert().success();
let actual = String::from_utf8(cmd.unwrap().stdout).unwrap();
insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS]\n\nOptions:\n --pids <PIDS>\n Specific PIDs to profile\n\n --tids <TIDS>\n Specific TIDs to profile (these can be outside the PIDs selected above)\n\n --show-unwind-info <PATH_TO_BINARY>\n Show unwind info for given binary\n\n --show-info <PATH_TO_BINARY>\n Show build ID for given binary\n\n -D, --duration <DURATION>\n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging <LOGGING>\n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq <SAMPLE_FREQ_IN_HZ>\n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format <PROFILE_FORMAT>\n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path <PROFILE_PATH>\n Path for the generated profile\n\n --profile-name <PROFILE_NAME>\n Name for the generated profile\n\n --sender <SENDER>\n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. 
Used for kernel tests\n - local-disk\n - remote\n\n --server-url <SERVER_URL>\n \n\n --perf-buffer-bytes <PERF_BUFFER_BYTES>\n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-stacks <MAPSIZE_STACKS>\n max number of individual stacks to capture before aggregation\n \n [default: 100000]\n\n --mapsize-aggregated-stacks <MAPSIZE_AGGREGATED_STACKS>\n max number of unique stacks after aggregation\n \n [default: 10000]\n\n --mapsize-rate-limits <MAPSIZE_RATE_LIMITS>\n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer <SYMBOLIZER>\n [default: local]\n [possible values: local, none]\n\n --debug-info-backend <DEBUG_INFO_BACKEND>\n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb <MAX_NATIVE_UNWIND_INFO_SIZE_MB>\n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base <CACHE_DIR_BASE>\n [default: /tmp]\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#);
insta::assert_yaml_snapshot!(actual, @r#""Usage: lightswitch [OPTIONS]\n\nOptions:\n --pids <PIDS>\n Specific PIDs to profile\n\n --tids <TIDS>\n Specific TIDs to profile (these can be outside the PIDs selected above)\n\n --show-unwind-info <PATH_TO_BINARY>\n Show unwind info for given binary\n\n --show-info <PATH_TO_BINARY>\n Show build ID for given binary\n\n -D, --duration <DURATION>\n How long this agent will run in seconds\n \n [default: 18446744073709551615]\n\n --libbpf-debug\n Enable libbpf logs. This includes the BPF verifier output\n\n --bpf-logging\n Enable BPF programs logging\n\n --logging <LOGGING>\n Set lightswitch's logging level\n \n [default: info]\n [possible values: trace, debug, info, warn, error]\n\n --sample-freq <SAMPLE_FREQ_IN_HZ>\n Per-CPU Sampling Frequency in Hz\n \n [default: 19]\n\n --profile-format <PROFILE_FORMAT>\n Output file for Flame Graph in SVG format\n \n [default: flame-graph]\n [possible values: none, flame-graph, pprof]\n\n --profile-path <PROFILE_PATH>\n Path for the generated profile\n\n --profile-name <PROFILE_NAME>\n Name for the generated profile\n\n --sender <SENDER>\n Where to write the profile\n \n [default: local-disk]\n\n Possible values:\n - none: Discard the profile. 
Used for kernel tests\n - local-disk\n - remote\n\n --server-url <SERVER_URL>\n \n\n --perf-buffer-bytes <PERF_BUFFER_BYTES>\n Size of each profiler perf buffer, in bytes (must be a power of 2)\n \n [default: 524288]\n\n --mapsize-info\n Print eBPF map sizes after creation\n\n --mapsize-stacks <MAPSIZE_STACKS>\n max number of individual stacks to capture before aggregation\n \n [default: 100000]\n\n --mapsize-aggregated-stacks <MAPSIZE_AGGREGATED_STACKS>\n max number of unique stacks after aggregation\n \n [default: 10000]\n\n --mapsize-rate-limits <MAPSIZE_RATE_LIMITS>\n max number of rate limit entries\n \n [default: 5000]\n\n --exclude-self\n Do not profile the profiler (myself)\n\n --symbolizer <SYMBOLIZER>\n [default: local]\n [possible values: local, none]\n\n --debug-info-backend <DEBUG_INFO_BACKEND>\n [default: none]\n [possible values: none, copy, remote]\n\n --max-native-unwind-info-size-mb <MAX_NATIVE_UNWIND_INFO_SIZE_MB>\n approximate max size in megabytes used for the BPF maps that hold unwind information\n \n [default: 2147483647]\n\n --enable-deadlock-detector\n enable parking_lot's deadlock detector\n\n --cache-dir-base <CACHE_DIR_BASE>\n [default: /tmp]\n\n --killswitch-file-path <KILLSWITCH_FILE_PATH>\n killswitch file to stop or prevent the profiler from starting. Required if duration is not set\n\n --force-perf-buffer\n force perf buffers even if ring buffers can be used\n\n -h, --help\n Print help (see a summary with '-h')\n""#);
}

#[rstest]
Expand Down
Loading