use std::borrow::Cow;
use std::io;
#[cfg(feature = "multithreaded")]
use std::mem;
#[cfg(feature = "multithreaded")]
use std::sync::Arc;
use ahash::AHashMap;
#[cfg(feature = "multithreaded")]
use crossbeam::channel;
#[cfg(feature = "multithreaded")]
use dashmap::DashMap;
use lazy_static::lazy_static;
/// Convenience macro that evaluates to an `Err` carrying an
/// `io::Error` of kind `InvalidData` with a `format!`-style message.
macro_rules! invalid_data_error {
    ($($arg:tt)*) => {{
        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!($($arg)*),
        ))
    }};
}
// Initial capacity of the occurrence hash maps (see `Occurrences`).
const CAPACITY_HASHMAP: usize = 512;
// Suggested buffer size for buffered readers.
// NOTE(review): not referenced in this file — presumably used by the
// concrete collapsers when constructing readers; confirm at call sites.
pub(crate) const CAPACITY_READER: usize = 128 * 1024;
// Default number of whole stacks handed to each worker-thread job
// (see `CollapsePrivate::nstacks_per_job`).
pub(crate) const DEFAULT_NSTACKS_PER_JOB: usize = 100;
// Rough guess at the byte size of one stack; used to pre-size the
// per-job input buffers in `collapse_multi_threaded`.
#[cfg(feature = "multithreaded")]
const NBYTES_PER_STACK_GUESS: usize = 1024;
// Length of a Rust symbol hash suffix: "h" followed by 16 hex digits.
const RUST_HASH_LENGTH: usize = 17;
// Default worker-thread count: one per logical CPU when the
// "multithreaded" feature is enabled, otherwise fixed at 1.
#[cfg(feature = "multithreaded")]
lazy_static! {
    #[doc(hidden)]
    pub static ref DEFAULT_NTHREADS: usize = num_cpus::get();
}
#[cfg(not(feature = "multithreaded"))]
lazy_static! {
    #[doc(hidden)]
    pub static ref DEFAULT_NTHREADS: usize = 1;
}
/// Crate-private interface shared by the stack collapsers.
///
/// Implementors supply the format-specific hooks (`pre_process`,
/// `collapse_single_threaded`, `would_end_stack`, ...); this trait provides
/// the common driver logic, including the multi-threaded fan-out.
pub trait CollapsePrivate: Send + Sized {
    /// Consumes any input that precedes the stack data from `reader` before
    /// collapsing begins.
    /// NOTE(review): exact semantics are implementor-defined — confirm
    /// against the concrete collapsers.
    fn pre_process<R>(&mut self, reader: &mut R, occurrences: &mut Occurrences) -> io::Result<()>
    where
        R: io::BufRead;
    /// Collapses all stacks readable from `reader` into `occurrences` on the
    /// calling thread.
    fn collapse_single_threaded<R>(
        &mut self,
        reader: R,
        occurrences: &mut Occurrences,
    ) -> io::Result<()>
    where
        R: io::BufRead;
    /// Returns `true` if `line` terminates a stack; the multi-threaded
    /// driver uses this to split the input into whole-stack jobs.
    fn would_end_stack(&mut self, line: &[u8]) -> bool;
    /// Returns a copy of `self` with per-stack parsing state reset; each
    /// worker thread collapses with its own copy.
    fn clone_and_reset_stack_context(&self) -> Self;
    /// Whether this collapser recognizes `input` as its format.
    /// NOTE(review): not called in this file; presumably `None` means
    /// "cannot tell" — confirm at the call sites.
    fn is_applicable(&mut self, input: &str) -> Option<bool>;
    /// Number of whole stacks handed to each worker-thread job.
    fn nstacks_per_job(&self) -> usize;
    fn set_nstacks_per_job(&mut self, n: usize);
    /// Number of worker threads to use.
    fn nthreads(&self) -> usize;
    fn set_nthreads(&mut self, n: usize);
    /// Top-level driver: pre-processes leading input, collapses the rest
    /// (single- or multi-threaded depending on how `Occurrences` was
    /// constructed), then writes the sorted results to `writer`.
    fn collapse<R, W>(&mut self, mut reader: R, writer: W) -> io::Result<()>
    where
        R: io::BufRead,
        W: io::Write,
    {
        let mut occurrences = Occurrences::new(self.nthreads());
        self.pre_process(&mut reader, &mut occurrences)?;
        // `Occurrences::new` picks the concurrent variant only when
        // nthreads > 1 (and the "multithreaded" feature is on).
        if occurrences.is_concurrent() {
            self.collapse_multi_threaded(reader, &mut occurrences)?;
        } else {
            self.collapse_single_threaded(reader, &mut occurrences)?;
        }
        occurrences.write_and_clear(writer)
    }
    /// Stub for builds without the "multithreaded" feature; unreachable in
    /// practice because `Occurrences` is then never concurrent (see
    /// `collapse`).
    #[cfg(not(feature = "multithreaded"))]
    fn collapse_multi_threaded<R>(&mut self, _: R, _: &mut Occurrences) -> io::Result<()>
    where
        R: io::BufRead,
    {
        unimplemented!();
    }
    /// Collapses `reader` using `self.nthreads()` worker threads: the input
    /// is chunked into jobs of `nstacks_per_job` whole stacks and fanned out
    /// over a channel; each worker collapses its chunks into the shared
    /// concurrent map.
    #[cfg(feature = "multithreaded")]
    fn collapse_multi_threaded<R>(
        &mut self,
        mut reader: R,
        occurrences: &mut Occurrences,
    ) -> io::Result<()>
    where
        R: io::BufRead,
    {
        let nstacks_per_job = self.nstacks_per_job();
        let nthreads = self.nthreads();
        assert_ne!(nstacks_per_job, 0);
        assert!(nthreads > 1);
        assert!(occurrences.is_concurrent());
        crossbeam::thread::scope(|scope| {
            // Workers -> producer: only the first error is kept (capacity 1).
            let (tx_error, rx_error) = channel::bounded::<io::Error>(1);
            // Producer -> workers: bounded queue of raw input chunks.
            let (tx_input, rx_input) = channel::bounded::<Vec<u8>>(2 * nthreads);
            // Failing worker -> the other workers: early-shutdown signal.
            let (tx_stop, rx_stop) = channel::bounded::<()>(nthreads - 1);
            let mut handles = Vec::with_capacity(nthreads);
            for _ in 0..nthreads {
                let tx_error = tx_error.clone();
                let rx_input = rx_input.clone();
                let (tx_stop, rx_stop) = (tx_stop.clone(), rx_stop.clone());
                // Each worker gets its own folder and a clone (shared handle)
                // of the concurrent occurrence map.
                let mut folder = self.clone_and_reset_stack_context();
                let mut occurrences = occurrences.clone();
                // NOTE(review): the allows presumably silence lints fired
                // inside the `select!` expansion — confirm before removing.
                #[allow(clippy::drop_copy, clippy::zero_ptr)]
                let handle = scope.spawn(move |_| loop {
                    channel::select! {
                        recv(rx_input) -> input => {
                            let data = match input {
                                Ok(data) => data,
                                // Channel closed: no more input; exit cleanly.
                                Err(_) => return,
                            };
                            if let Err(e) = folder.collapse_single_threaded(&data[..], &mut occurrences) {
                                // Tell the other workers to stop, report the
                                // error (best effort), and exit.
                                for _ in 0..(nthreads - 1) {
                                    let _ = tx_stop.try_send(());
                                }
                                let _ = tx_error.try_send(e);
                                return;
                            }
                        },
                        recv(rx_stop) -> _ => {
                            return;
                        },
                    }
                });
                handles.push(handle);
            }
            // The producer keeps only the send side; workers hold the clones
            // of the receive side.
            drop(rx_input);
            let buf_capacity = usize::next_power_of_two(NBYTES_PER_STACK_GUESS * nstacks_per_job);
            let mut buf = Vec::with_capacity(buf_capacity);
            let (mut index, mut nstacks) = (0, 0);
            // Read line-by-line, accumulating whole stacks into `buf`; every
            // `nstacks_per_job` stacks, ship the chunk to the workers.
            loop {
                let n = reader.read_until(b'\n', &mut buf)?;
                if n == 0 {
                    // EOF: send whatever remains (possibly empty).
                    let _ = tx_input.send(buf);
                    break;
                }
                let line = &buf[index..index + n];
                index += n;
                if self.would_end_stack(line) {
                    nstacks += 1;
                    if nstacks == nstacks_per_job {
                        // Swap in a fresh buffer sized from the outgoing one
                        // so it rarely needs to regrow.
                        let buf_capacity = usize::next_power_of_two(buf.capacity());
                        let chunk = mem::replace(&mut buf, Vec::with_capacity(buf_capacity));
                        if tx_input.send(chunk).is_err() {
                            // All receivers are gone (error path); stop reading.
                            break;
                        }
                        index = 0;
                        nstacks = 0;
                    }
                    continue
                }
            }
            // Dropping the senders closes the channels: workers drain and
            // exit, and `rx_error.iter()` below terminates once every
            // `tx_error` clone is dropped.
            drop(tx_input);
            drop(tx_error);
            if let Some(e) = rx_error.iter().next() {
                return Err(e);
            }
            for handle in handles {
                handle.join().unwrap();
            }
            Ok(())
        })
        .unwrap()
    }
}
/// Accumulates collapsed stack lines and their counts.
#[derive(Clone, Debug)]
pub enum Occurrences {
    /// Plain hash map; used when collapsing on a single thread.
    SingleThreaded(AHashMap<String, usize>),
    /// Concurrent map shared (via `Arc` clones) across worker threads.
    #[cfg(feature = "multithreaded")]
    MultiThreaded(Arc<DashMap<String, usize, ahash::RandomState>>),
}
impl Occurrences {
    /// Creates an `Occurrences` appropriate for `nthreads` workers: a plain
    /// hash map for one thread, a concurrent map for more.
    ///
    /// # Panics
    ///
    /// Panics if `nthreads` is 0.
    #[cfg(feature = "multithreaded")]
    pub(crate) fn new(nthreads: usize) -> Self {
        assert_ne!(nthreads, 0);
        if nthreads == 1 {
            Self::new_single_threaded()
        } else {
            Self::new_multi_threaded()
        }
    }
    /// Creates an `Occurrences`; without the "multithreaded" feature this is
    /// always the single-threaded variant.
    ///
    /// # Panics
    ///
    /// Panics if `nthreads` is 0.
    #[cfg(not(feature = "multithreaded"))]
    pub(crate) fn new(nthreads: usize) -> Self {
        assert_ne!(nthreads, 0);
        Self::new_single_threaded()
    }
    fn new_single_threaded() -> Self {
        let map =
            AHashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
        Occurrences::SingleThreaded(map)
    }
    #[cfg(feature = "multithreaded")]
    fn new_multi_threaded() -> Self {
        let map =
            DashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
        Occurrences::MultiThreaded(Arc::new(map))
    }
    /// Inserts `key` with `count`, returning the previous count if one was
    /// present.
    pub(crate) fn insert(&mut self, key: String, count: usize) -> Option<usize> {
        use self::Occurrences::*;
        match self {
            SingleThreaded(map) => map.insert(key, count),
            #[cfg(feature = "multithreaded")]
            MultiThreaded(arc) => arc.insert(key, count),
        }
    }
    /// Adds `count` to the existing count for `key` (starting from 0 if the
    /// key is new).
    pub(crate) fn insert_or_add(&mut self, key: String, count: usize) {
        use self::Occurrences::*;
        match self {
            SingleThreaded(map) => *map.entry(key).or_insert(0) += count,
            #[cfg(feature = "multithreaded")]
            MultiThreaded(arc) => *arc.entry(key).or_insert(0) += count,
        }
    }
    /// Returns `true` for the concurrent (multi-threaded) variant.
    pub(crate) fn is_concurrent(&self) -> bool {
        use self::Occurrences::*;
        match self {
            SingleThreaded(_) => false,
            #[cfg(feature = "multithreaded")]
            MultiThreaded(_) => true,
        }
    }
    /// Writes every `"<stack> <count>"` line to `writer` in sorted order and
    /// leaves the map empty.
    ///
    /// # Panics
    ///
    /// For the concurrent variant, panics if another `Arc` handle to the map
    /// is still alive (draining while shared is not allowed).
    pub(crate) fn write_and_clear<W>(&mut self, mut writer: W) -> io::Result<()>
    where
        W: io::Write,
    {
        use self::Occurrences::*;
        match self {
            SingleThreaded(ref mut map) => {
                let mut contents: Vec<_> = map.drain().collect();
                contents.sort();
                for (key, value) in contents {
                    writeln!(writer, "{} {}", key, value)?;
                }
            }
            #[cfg(feature = "multithreaded")]
            MultiThreaded(ref mut arc) => {
                let map = match Arc::get_mut(arc) {
                    Some(map) => map,
                    None => panic!(
                        "Attempting to drain the contents of a concurrent HashMap \
                         when more than one thread has access to it, which is \
                         not allowed."
                    ),
                };
                // Swap in a fresh map so `self` is left empty, then consume
                // the old one. `into_iter` yields owned `(String, usize)`
                // pairs directly, avoiding the previous implementation's
                // double `collect` over borrowed ref-guards.
                let map = mem::replace(
                    map,
                    DashMap::with_capacity_and_hasher(
                        CAPACITY_HASHMAP,
                        ahash::RandomState::default(),
                    ),
                );
                let mut contents: Vec<_> = map.into_iter().collect();
                contents.sort();
                for (key, value) in contents {
                    writeln!(writer, "{} {}", key, value)?;
                }
            }
        }
        Ok(())
    }
}
/// Cleans up a Rust symbol that has been only partially demangled: strips a
/// trailing `::h<16 hex digits>` hash and decodes the legacy escape
/// sequences (`..` -> `::`, `$LT$` -> `<`, `$u20$` -> ` `, etc.).
///
/// Returns the input unchanged (borrowed) when it does not end in a Rust
/// hash; otherwise returns a newly built, cleaned-up string.
#[allow(clippy::cognitive_complexity)]
pub(crate) fn fix_partially_demangled_rust_symbol(symbol: &str) -> Cow<str> {
    // A Rust hash is "h" followed by 16 hex digits (RUST_HASH_LENGTH == 17).
    let is_rust_hash =
        |s: &str| s.starts_with('h') && s[1..].chars().all(|c| c.is_ascii_hexdigit());
    if symbol.len() < RUST_HASH_LENGTH || !is_rust_hash(&symbol[symbol.len() - RUST_HASH_LENGTH..])
    {
        // Fully mangled or fully demangled symbols are left untouched.
        return Cow::Borrowed(symbol);
    }
    // Drop the hash, plus the "::" separator that precedes it if present.
    let mut rest = &symbol[..symbol.len() - RUST_HASH_LENGTH];
    if rest.ends_with("::") {
        rest = &rest[..rest.len() - 2];
    }
    // Strip a leading "_" when it precedes an escape sequence (e.g. "_$LT$").
    if rest.starts_with("_$") {
        rest = &rest[1..];
    }
    // Every escape decodes to at most as many bytes as it occupies, so the
    // output never exceeds the input length.
    let mut demangled = String::with_capacity(rest.len());
    while !rest.is_empty() {
        if rest.starts_with('.') {
            if rest[1..].starts_with('.') {
                // ".." is the mangling of "::".
                demangled.push_str("::");
                rest = &rest[2..];
            } else {
                // A lone "." is kept as-is.
                demangled.push('.');
                rest = &rest[1..];
            }
        } else if rest.starts_with('$') {
            // Decode one "$...$" escape; if it is not a known escape, copy
            // the remainder verbatim and stop.
            macro_rules! demangle {
                ($($pat:expr => $demangled:expr,)*) => ({
                    $(if rest.starts_with($pat) {
                        demangled.push_str($demangled);
                        rest = &rest[$pat.len()..];
                    } else)*
                    {
                        demangled.push_str(rest);
                        break;
                    }
                })
            }
            demangle! {
                "$SP$" => "@",
                "$BP$" => "*",
                "$RF$" => "&",
                "$LT$" => "<",
                "$GT$" => ">",
                "$LP$" => "(",
                "$RP$" => ")",
                "$C$" => ",",
                "$u7e$" => "~",
                "$u20$" => " ",
                "$u27$" => "'",
                "$u3d$" => "=",
                "$u5b$" => "[",
                "$u5d$" => "]",
                "$u7b$" => "{",
                "$u7d$" => "}",
                "$u3b$" => ";",
                "$u2b$" => "+",
                "$u21$" => "!",
                "$u22$" => "\"",
            }
        } else {
            // Copy everything up to the next "$" or "." verbatim.
            let idx = match rest.char_indices().find(|&(_, c)| c == '$' || c == '.') {
                None => rest.len(),
                Some((i, _)) => i,
            };
            demangled.push_str(&rest[..idx]);
            rest = &rest[idx..];
        }
    }
    Cow::Owned(demangled)
}
/// Test and benchmark helpers shared by the collapser test suites.
#[cfg(test)]
pub(crate) mod testing {
    use std::collections::HashMap;
    use std::fmt;
    use std::fs::File;
    use std::io::Write;
    use std::io::{self, BufRead, Read};
    use std::path::{Path, PathBuf};
    use std::time::{Duration, Instant};
    use libflate::gzip::Decoder;
    use super::*;
    use crate::collapse::Collapse;
    /// Provides `Duration::as_nanos`-equivalent behavior built from
    /// `as_secs`/`subsec_nanos` (compat shim).
    pub(crate) trait DurationExt {
        fn as_nanos_compat(&self) -> u128;
    }
    impl DurationExt for Duration {
        fn as_nanos_compat(&self) -> u128 {
            self.as_secs() as u128 * 1_000_000_000 + self.subsec_nanos() as u128
        }
    }
    /// Reads each input file fully into memory, transparently gunzipping
    /// paths that end in ".gz"; returns a path -> bytes map.
    pub(crate) fn read_inputs<P>(inputs: &[P]) -> io::Result<HashMap<PathBuf, Vec<u8>>>
    where
        P: AsRef<Path>,
    {
        let mut map = HashMap::default();
        for path in inputs.iter() {
            let path = path.as_ref();
            let bytes = {
                let mut buf = Vec::new();
                let mut file = File::open(path)?;
                if path.to_str().unwrap().ends_with(".gz") {
                    let mut reader = Decoder::new(file)?;
                    reader.read_to_end(&mut buf)?;
                } else {
                    file.read_to_end(&mut buf)?;
                }
                buf
            };
            map.insert(path.to_path_buf(), bytes);
        }
        Ok(map)
    }
    /// Asserts that collapsing each input with 2..=MAX_THREADS threads
    /// produces output identical to collapsing it with 1 thread.
    pub(crate) fn test_collapse_multi<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
    where
        C: Collapse + CollapsePrivate,
        P: AsRef<Path>,
    {
        const MAX_THREADS: usize = 16;
        for (path, bytes) in read_inputs(inputs)? {
            // The single-threaded run is the reference output.
            folder.set_nthreads(1);
            let mut writer = Vec::new();
            <C as Collapse>::collapse(folder, &bytes[..], &mut writer)?;
            let expected = std::str::from_utf8(&writer[..]).unwrap();
            for n in 2..=MAX_THREADS {
                folder.set_nthreads(n);
                let mut writer = Vec::new();
                <C as Collapse>::collapse(folder, &bytes[..], &mut writer)?;
                let actual = std::str::from_utf8(&writer[..]).unwrap();
                assert_eq!(
                    actual,
                    expected,
                    "Collapsing with {} threads does not produce the same output as collapsing with 1 thread for {}",
                    n,
                    path.display()
                );
            }
        }
        Ok(())
    }
    /// Benchmarks `folder` across a range of `nstacks_per_job` settings for
    /// each input and prints the ten fastest settings per input.
    pub(crate) fn bench_nstacks<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
    where
        C: CollapsePrivate,
        P: AsRef<Path>,
    {
        // Inputs with fewer lines than this are skipped.
        const MIN_LINES: usize = 2000;
        // Timing samples taken per nstacks_per_job setting.
        const NSAMPLES: usize = 100;
        const WARMUP_SECS: usize = 3;
        let _stdout = io::stdout();
        // NOTE(review): this grabs stdout again rather than stderr —
        // presumably `io::stderr()` was intended; confirm.
        let _stderr = io::stdout();
        let mut stdout = _stdout.lock();
        let _stderr = _stderr.lock();
        // Benchmark results for a single input file.
        struct Foo<'a> {
            // The folder's default nstacks_per_job, used as the baseline.
            default: usize,
            nlines: usize,
            nstacks: usize,
            path: &'a Path,
            // nstacks_per_job -> average duration in nanoseconds.
            results: HashMap<usize, u64>,
        }
        impl<'a> Foo<'a> {
            /// Runs the benchmark for one input; returns `Ok(None)` if the
            /// input has fewer than MIN_LINES lines.
            fn new<C>(
                folder: &mut C,
                path: &'a Path,
                bytes: &[u8],
                stdout: &mut io::StdoutLock,
            ) -> io::Result<Option<Self>>
            where
                C: CollapsePrivate,
            {
                let default = folder.nstacks_per_job();
                let (nlines, nstacks) = count_lines_and_stacks(&bytes);
                if nlines < MIN_LINES {
                    return Ok(None);
                }
                let mut results = HashMap::default();
                // Try the default first, then 1..=10, then every 10th value
                // up to the input's total stack count.
                let iter = vec![default]
                    .into_iter()
                    .chain(1..=10)
                    .chain((20..=nstacks).step_by(10));
                for nstacks_per_job in iter {
                    folder.set_nstacks_per_job(nstacks_per_job);
                    let mut durations = Vec::new();
                    for _ in 0..NSAMPLES {
                        let now = Instant::now();
                        folder.collapse(&bytes[..], io::sink())?;
                        durations.push(now.elapsed().as_nanos_compat());
                    }
                    let avg_duration =
                        (durations.iter().sum::<u128>() as f64 / durations.len() as f64) as u64;
                    results.insert(nstacks_per_job, avg_duration);
                    // One progress dot per setting.
                    stdout.write_all(&[b'.'])?;
                    stdout.flush()?;
                }
                Ok(Some(Self {
                    default,
                    nlines,
                    nstacks,
                    path,
                    results,
                }))
            }
        }
        impl<'a> fmt::Display for Foo<'a> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                writeln!(
                    f,
                    "{} (nstacks: {}, lines: {})",
                    self.path.display(),
                    self.nstacks,
                    self.nlines
                )?;
                let default_duration = self.results[&self.default];
                // Sort by average duration, fastest first, and report the
                // top ten settings relative to the default.
                let mut results = self.results.iter().collect::<Vec<_>>();
                results.sort_by(|(_, d1), (_, d2)| (**d1).cmp(*d2));
                for (nstacks_per_job, duration) in results.iter().take(10) {
                    writeln!(
                        f,
                        " nstacks_per_job: {:>4} (% of total: {:>3.0}%) | time: {:.0}% of default",
                        nstacks_per_job,
                        (**nstacks_per_job as f32 / self.nstacks as f32) * 100.0,
                        **duration as f64 / default_duration as f64 * 100.0,
                    )?;
                }
                writeln!(f)?;
                Ok(())
            }
        }
        /// Counts total lines and stacks; a blank line ends a stack, and
        /// EOF terminates the final stack.
        fn count_lines_and_stacks(bytes: &[u8]) -> (usize, usize) {
            let mut reader = io::BufReader::new(bytes);
            let mut line = String::new();
            let (mut nlines, mut nstacks) = (0, 0);
            loop {
                line.clear();
                let n = reader.read_line(&mut line).unwrap();
                if n == 0 {
                    nstacks += 1;
                    break;
                }
                nlines += 1;
                if line.trim().is_empty() {
                    nstacks += 1;
                }
            }
            (nlines, nstacks)
        }
        let inputs = read_inputs(inputs)?;
        // Warm up for roughly WARMUP_SECS before taking measurements.
        let now = Instant::now();
        stdout.write_fmt(format_args!(
            "# Warming up for approximately {} seconds.\n",
            WARMUP_SECS
        ))?;
        stdout.flush()?;
        while now.elapsed() < std::time::Duration::from_secs(WARMUP_SECS as u64) {
            for (_, bytes) in inputs.iter() {
                folder.collapse(&bytes[..], io::sink())?;
            }
        }
        let mut foos = Vec::new();
        for (path, bytes) in &inputs {
            stdout.write_fmt(format_args!("# {} ", path.display()))?;
            stdout.flush()?;
            if let Some(foo) = Foo::new(folder, path, bytes, &mut stdout)? {
                foos.push(foo);
            }
            stdout.write_all(&[b'\n'])?;
            stdout.flush()?;
        }
        stdout.write_all(&[b'\n'])?;
        stdout.flush()?;
        // Print per-input reports, largest input (most stacks) first.
        foos.sort_by(|a, b| b.nstacks.cmp(&a.nstacks));
        for foo in foos {
            stdout.write_fmt(format_args!("{}", foo))?;
            stdout.flush()?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    // Asserts that fixing `$a` yields `$b`.
    macro_rules! t {
        ($a:expr, $b:expr) => {
            assert!(ok($a, $b))
        };
    }
    // Asserts that `$a` passes through unchanged.
    macro_rules! t_unchanged {
        ($a:expr) => {
            assert!(ok_unchanged($a))
        };
    }
    /// Returns whether fixing `sym` yields `expected`, printing both values
    /// on mismatch so the macro's `assert!` failure is readable.
    fn ok(sym: &str, expected: &str) -> bool {
        let result = super::fix_partially_demangled_rust_symbol(sym);
        if result == expected {
            true
        } else {
            println!("\n{}\n!=\n{}\n", result, expected);
            false
        }
    }
    /// Returns whether fixing `sym` leaves it unchanged, printing the
    /// unexpected result otherwise.
    fn ok_unchanged(sym: &str) -> bool {
        let result = super::fix_partially_demangled_rust_symbol(sym);
        if result == sym {
            true
        } else {
            println!("{} should have been unchanged, but got {}", sym, result);
            false
        }
    }
    // Partially demangled symbols (hash suffix still present, escapes
    // undecoded) get cleaned up.
    #[test]
    fn fix_partially_demangled_rust_symbols() {
        t!(
            "std::sys::unix::fs::File::open::hb90e1c1c787080f0",
            "std::sys::unix::fs::File::open"
        );
        t!("_$LT$std..fs..ReadDir$u20$as$u20$core..iter..traits..iterator..Iterator$GT$::next::hc14f1750ca79129b", "<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next");
        t!("rg::search_parallel::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h6e849b55a66fcd85", "rg::search_parallel::_{{closure}}::_{{closure}}");
        t!(
            "_$LT$F$u20$as$u20$alloc..boxed..FnBox$LT$A$GT$$GT$::call_box::h8612a2a83552fc2d",
            "<F as alloc::boxed::FnBox<A>>::call_box"
        );
        t!(
            "_$LT$$RF$std..fs..File$u20$as$u20$std..io..Read$GT$::read::h5d84059cf335c8e6",
            "<&std::fs::File as std::io::Read>::read"
        );
        t!(
            "_$LT$std..thread..JoinHandle$LT$T$GT$$GT$::join::hca6aa63e512626da",
            "<std::thread::JoinHandle<T>>::join"
        );
        t!(
            "std::sync::mpsc::shared::Packet$LT$T$GT$::recv::hfde2d9e28d13fd56",
            "std::sync::mpsc::shared::Packet<T>::recv"
        );
        t!("crossbeam_utils::thread::ScopedThreadBuilder::spawn::_$u7b$$u7b$closure$u7d$$u7d$::h8fdc7d4f74c0da05", "crossbeam_utils::thread::ScopedThreadBuilder::spawn::_{{closure}}");
    }
    // Fully mangled ("_ZN...E") symbols do not end in a bare hash, so they
    // are untouched.
    #[test]
    fn fix_partially_demangled_rust_symbol_on_fully_mangled_symbols() {
        t_unchanged!("_ZN4testE");
        t_unchanged!("_ZN4test1a2bcE");
        t_unchanged!("_ZN7inferno10flamegraph5merge6frames17hacfe2d67301633c2E");
        t_unchanged!("_ZN3std2rt19lang_start_internal17h540c897fe52ba9c5E");
        t_unchanged!("_ZN116_$LT$core..str..pattern..CharSearcher$LT$$u27$a$GT$$u20$as$u20$core..str..pattern..ReverseSearcher$LT$$u27$a$GT$$GT$15next_match_back17h09d544049dd719bbE");
        t_unchanged!("_ZN3std5panic12catch_unwind17h0562757d03ff60b3E");
        t_unchanged!("_ZN3std9panicking3try17h9c1cbc5599e1efbfE");
    }
    // Already fully demangled symbols (no hash suffix) are untouched.
    #[test]
    fn fix_partially_demangled_rust_symbol_on_fully_demangled_symbols() {
        t_unchanged!("std::sys::unix::fs::File::open");
        t_unchanged!("<F as alloc::boxed::FnBox<A>>::call_box");
        t_unchanged!("<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next");
        t_unchanged!("<rg::search::SearchWorker<W>>::search_impl");
        t_unchanged!("<grep_searcher::searcher::glue::ReadByLine<'s, M, R, S>>::run");
        t_unchanged!("<alloc::raw_vec::RawVec<T, A>>::reserve_internal");
    }
}