use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::From;
use std::sync::{
atomic::{AtomicU64 as StdAtomicU64, Ordering},
Arc, Mutex,
};
use std::time::{Duration, Instant as StdInstant};
use crate::atomic64::{Atomic, AtomicF64, AtomicU64};
use crate::desc::{Desc, Describer};
use crate::errors::{Error, Result};
use crate::metrics::{Collector, LocalMetric, Metric, Opts};
use crate::proto;
use crate::value::make_label_pairs;
use crate::vec::{MetricVec, MetricVecBuilder};
/// The bucket upper bounds used when a histogram is created without explicit
/// buckets: eleven increasing values from `0.005` up to `10.0`.
pub const DEFAULT_BUCKETS: &[f64; 11] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];
/// Label name that histograms refuse as a user-supplied label name
/// (see `check_bucket_label`).
pub const BUCKET_LABEL: &str = "le";
#[inline]
fn check_bucket_label(label: &str) -> Result<()> {
if label == BUCKET_LABEL {
return Err(Error::Msg(
"`le` is not allowed as label name in histograms".to_owned(),
));
}
Ok(())
}
/// Validates and normalizes a list of bucket upper bounds.
///
/// * An empty list is replaced by [`DEFAULT_BUCKETS`].
/// * The bounds must be strictly increasing; the first adjacent pair that is
///   not yields an error naming both values.
/// * A trailing `+Inf` bound is removed from the returned list.
fn check_and_adjust_buckets(mut buckets: Vec<f64>) -> Result<Vec<f64>> {
    if buckets.is_empty() {
        buckets.extend_from_slice(&DEFAULT_BUCKETS[..]);
    }

    // Look at every adjacent pair; the first out-of-order pair is reported.
    if let Some(pair) = buckets.windows(2).find(|pair| pair[0] >= pair[1]) {
        return Err(Error::Msg(format!(
            "histogram buckets must be in increasing \
             order: {} >= {}",
            pair[0], pair[1]
        )));
    }

    // `buckets` is never empty at this point, so `last()` always yields a value.
    if let Some(&tail) = buckets.last() {
        if tail.is_sign_positive() && tail.is_infinite() {
            buckets.pop();
        }
    }

    Ok(buckets)
}
/// Options used to build a [`Histogram`]: the options shared with other
/// metric kinds plus the list of bucket upper bounds.
#[derive(Clone, Debug)]
pub struct HistogramOpts {
/// Common metric options (name, help, namespace, subsystem, labels).
pub common_opts: Opts,
/// Bucket upper bounds. They are validated by `check_and_adjust_buckets`
/// when the histogram core is constructed, not when they are set here.
pub buckets: Vec<f64>,
}
impl HistogramOpts {
pub fn new<S1: Into<String>, S2: Into<String>>(name: S1, help: S2) -> HistogramOpts {
HistogramOpts {
common_opts: Opts::new(name, help),
buckets: Vec::from(DEFAULT_BUCKETS as &'static [f64]),
}
}
pub fn namespace<S: Into<String>>(mut self, namesapce: S) -> Self {
self.common_opts.namespace = namesapce.into();
self
}
pub fn subsystem<S: Into<String>>(mut self, subsystem: S) -> Self {
self.common_opts.subsystem = subsystem.into();
self
}
pub fn const_labels(mut self, const_labels: HashMap<String, String>) -> Self {
self.common_opts = self.common_opts.const_labels(const_labels);
self
}
pub fn const_label<S1: Into<String>, S2: Into<String>>(mut self, name: S1, value: S2) -> Self {
self.common_opts = self.common_opts.const_label(name, value);
self
}
pub fn variable_labels(mut self, variable_labels: Vec<String>) -> Self {
self.common_opts = self.common_opts.variable_labels(variable_labels);
self
}
pub fn variable_label<S: Into<String>>(mut self, name: S) -> Self {
self.common_opts = self.common_opts.variable_label(name);
self
}
pub fn fq_name(&self) -> String {
self.common_opts.fq_name()
}
pub fn buckets(mut self, buckets: Vec<f64>) -> Self {
self.buckets = buckets;
self
}
}
/// Describing histogram options delegates entirely to the common options;
/// the buckets do not take part in the description.
impl Describer for HistogramOpts {
fn describe(&self) -> Result<Desc> {
self.common_opts.describe()
}
}
/// Wraps plain [`Opts`] into histogram options using the default buckets.
impl From<Opts> for HistogramOpts {
    fn from(opts: Opts) -> HistogramOpts {
        let buckets = DEFAULT_BUCKETS.to_vec();
        HistogramOpts {
            common_opts: opts,
            buckets,
        }
    }
}
/// One set of histogram counters: a running sum, a running observation count
/// and one counter per bucket.
#[derive(Debug)]
struct Shard {
    sum: AtomicF64,
    count: AtomicU64,
    buckets: Vec<AtomicU64>,
}

impl Shard {
    /// Creates a shard with `num_buckets` bucket counters, everything zeroed.
    fn new(num_buckets: usize) -> Self {
        Shard {
            sum: AtomicF64::new(0.0),
            count: AtomicU64::new(0),
            buckets: (0..num_buckets).map(|_| AtomicU64::new(0)).collect(),
        }
    }
}
/// Identifies one of the two shards of a histogram.
#[derive(Debug, Clone, Copy)]
enum ShardIndex {
    First,
    Second,
}

impl ShardIndex {
    /// Returns the other shard.
    fn inverse(self) -> ShardIndex {
        if let ShardIndex::First = self {
            ShardIndex::Second
        } else {
            ShardIndex::First
        }
    }
}

/// Converts `0` / `1` into the matching shard.
///
/// # Panics
///
/// Panics for any other value.
impl From<u64> for ShardIndex {
    fn from(index: u64) -> Self {
        if index == 0 {
            return ShardIndex::First;
        }
        if index == 1 {
            return ShardIndex::Second;
        }
        panic!(
            "Invalid shard index {:?}. A histogram only has two shards.",
            index
        )
    }
}

/// Converts a shard into its array position (`First` is 0, `Second` is 1).
impl From<ShardIndex> for usize {
    fn from(index: ShardIndex) -> Self {
        if let ShardIndex::Second = index {
            1
        } else {
            0
        }
    }
}
#[derive(Debug)]
struct ShardAndCount {
inner: StdAtomicU64,
}
impl ShardAndCount {
fn new() -> Self {
ShardAndCount {
inner: StdAtomicU64::new(0),
}
}
fn flip(&self, ordering: Ordering) -> (ShardIndex, u64) {
let n = self.inner.fetch_add(1 << 63, ordering);
ShardAndCount::split_shard_index_and_count(n)
}
fn get(&self) -> (ShardIndex, u64) {
let n = self.inner.load(Ordering::Relaxed);
ShardAndCount::split_shard_index_and_count(n)
}
fn inc_by(&self, delta: u64, ordering: Ordering) -> (ShardIndex, u64) {
let n = self.inner.fetch_add(delta, ordering);
ShardAndCount::split_shard_index_and_count(n)
}
fn inc(&self, ordering: Ordering) -> (ShardIndex, u64) {
self.inc_by(1, ordering)
}
fn split_shard_index_and_count(n: u64) -> (ShardIndex, u64) {
let shard = n >> 63;
let count = n & ((1 << 63) - 1);
(shard.into(), count)
}
}
/// The shared state behind a [`Histogram`]: its description, label pairs,
/// bucket bounds and the two counter shards.
#[derive(Debug)]
pub struct HistogramCore {
/// Description produced from the histogram options.
desc: Desc,
/// Label pairs built from the description and the supplied label values.
label_pairs: Vec<proto::LabelPair>,
/// Held by `proto` and `sample_sum` so that only one of them runs at a time.
collect_lock: Mutex<()>,
/// Packed "current shard index + number of started observations".
shard_and_count: ShardAndCount,
/// The two counter shards; `shard_and_count` decides which one observers use.
shards: [Shard; 2],
/// Bucket upper bounds after validation (a trailing `+Inf` is removed).
upper_bounds: Vec<f64>,
}
impl HistogramCore {
/// Builds the core from `opts` and the concrete `label_values`.
///
/// Fails if the options cannot be described, if any variable or constant
/// label is named `le`, if the label pairs cannot be built, or if the
/// buckets are rejected by `check_and_adjust_buckets`.
pub fn new(opts: &HistogramOpts, label_values: &[&str]) -> Result<HistogramCore> {
let desc = opts.describe()?;
// `le` is reserved, so reject it in both label kinds.
for name in &desc.variable_labels {
check_bucket_label(name)?;
}
for pair in &desc.const_label_pairs {
check_bucket_label(pair.get_name())?;
}
let label_pairs = make_label_pairs(&desc, label_values)?;
let buckets = check_and_adjust_buckets(opts.buckets.clone())?;
Ok(HistogramCore {
desc,
label_pairs,
collect_lock: Mutex::new(()),
shard_and_count: ShardAndCount::new(),
// Both shards get one counter per (adjusted) bucket.
shards: [Shard::new(buckets.len()), Shard::new(buckets.len())],
upper_bounds: buckets,
})
}
/// Records a single observation `v`.
pub fn observe(&self, v: f64) {
// Announce the observation on the packed counter; the shard index that
// comes back tells us which shard to write to.
let (shard_index, _count) = self.shard_and_count.inc(Ordering::Acquire);
let shard: &Shard = &self.shards[usize::from(shard_index)];
// Pick the first bucket whose upper bound is >= `v`. If `v` exceeds
// every bound, no bucket counter is touched.
let mut iter = self
.upper_bounds
.iter()
.enumerate()
.filter(|&(_, f)| v <= *f);
if let Some((i, _)) = iter.next() {
shard.buckets[i].inc_by(1);
}
shard.sum.inc_by(v);
// The shard's own count is bumped last; `proto` waits on this count to
// know when observations on a shard have completed.
shard.count.inc_by_with_ordering(1, Ordering::Release);
}
/// Produces a protobuf snapshot of the histogram.
///
/// # Panics
///
/// Panics if the collect lock is poisoned.
pub fn proto(&self) -> proto::Histogram {
// Only one collector at a time.
let collect_guard = self.collect_lock.lock().expect("Lock poisoned");
// Toggle the shard bit. The values returned are those from *before* the
// toggle: the shard observers were using so far (now "cold") and the
// number of observations announced so far. New observers go to the
// other ("hot") shard.
let (cold_shard_index, overall_count) = self.shard_and_count.flip(Ordering::AcqRel);
let cold_shard = &self.shards[usize::from(cold_shard_index)];
let hot_shard = &self.shards[usize::from(cold_shard_index.inverse())];
// Spin until the cold shard's count equals the number of announced
// observations, resetting it to zero at the moment it does.
while overall_count
!= cold_shard.count.compare_and_swap(
overall_count,
0,
Ordering::Acquire,
)
{}
// Take the cold shard's sum, leaving zero behind.
let cold_shard_sum = cold_shard.sum.swap(0.0, Ordering::AcqRel);
let mut h = proto::Histogram::default();
h.set_sample_sum(cold_shard_sum);
h.set_sample_count(overall_count);
let mut cumulative_count = 0;
let mut buckets = Vec::with_capacity(self.upper_bounds.len());
for (i, upper_bound) in self.upper_bounds.iter().enumerate() {
// Drain each cold bucket and fold its value into the hot shard.
let cold_bucket_count = cold_shard.buckets[i].swap(0, Ordering::AcqRel);
hot_shard.buckets[i].inc_by(cold_bucket_count);
// The protobuf buckets carry cumulative counts.
cumulative_count += cold_bucket_count;
let mut b = proto::Bucket::default();
b.set_cumulative_count(cumulative_count);
b.set_upper_bound(*upper_bound);
buckets.push(b);
}
h.set_bucket(from_vec!(buckets));
// Fold the drained count and sum into the hot shard as well, so the
// hot shard now carries the full totals.
hot_shard.count.inc_by(overall_count);
hot_shard.sum.inc_by(cold_shard_sum);
drop(collect_guard);
h
}
/// Returns the sum stored in the shard that is current at the time of the
/// call, while holding the collect lock.
///
/// # Panics
///
/// Panics if the collect lock is poisoned.
fn sample_sum(&self) -> f64 {
let _guard = self.collect_lock.lock().expect("Lock poisoned");
let (shard_index, _count) = self.shard_and_count.get();
self.shards[shard_index as usize].sum.get()
}
/// Returns the count part of the packed shard-and-count value.
fn sample_count(&self) -> u64 {
self.shard_and_count.get().1
}
}
/// Newtype around `libc::timespec`, only available with the `nightly`
/// feature on Linux.
#[cfg(all(feature = "nightly", target_os = "linux"))]
pub struct Timespec(libc::timespec);

/// Prints the seconds and nanoseconds fields of the wrapped `timespec`.
#[cfg(all(feature = "nightly", target_os = "linux"))]
impl std::fmt::Debug for Timespec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Timespec(ts) = self;
        write!(
            f,
            "Timespec {{ tv_sec: {}, tv_nsec: {} }}",
            ts.tv_sec, ts.tv_nsec
        )
    }
}
/// A point in time used for measuring elapsed durations.
#[derive(Debug)]
pub enum Instant {
/// Backed by the standard library's `Instant`.
Monotonic(StdInstant),
/// Backed by a `Timespec` obtained from `get_time_coarse`; only available
/// with the `nightly` feature on Linux.
#[cfg(all(feature = "nightly", target_os = "linux"))]
MonotonicCoarse(Timespec),
}
impl Instant {
    /// Captures the current time using the standard library clock.
    pub fn now() -> Instant {
        let started = StdInstant::now();
        Instant::Monotonic(started)
    }

    /// Captures the current time using the coarse clock.
    #[cfg(all(feature = "nightly", target_os = "linux"))]
    pub fn now_coarse() -> Instant {
        let started = get_time_coarse();
        Instant::MonotonicCoarse(started)
    }

    /// On non-Linux targets there is no coarse clock, so this uses the
    /// standard library clock instead.
    #[cfg(all(feature = "nightly", not(target_os = "linux")))]
    pub fn now_coarse() -> Instant {
        let started = StdInstant::now();
        Instant::Monotonic(started)
    }

    /// Returns the time elapsed since this instant was captured.
    ///
    /// For the coarse variant the result has millisecond resolution and is
    /// clamped to zero if the computed difference is negative.
    pub fn elapsed(&self) -> Duration {
        match self {
            Instant::Monotonic(started) => started.elapsed(),
            #[cfg(all(feature = "nightly", target_os = "linux"))]
            Instant::MonotonicCoarse(started) => {
                let to_millis = |ts: &Timespec| {
                    ts.0.tv_sec * MILLIS_PER_SEC + ts.0.tv_nsec / NANOS_PER_MILLI
                };
                let now = get_time_coarse();
                let delta_ms = to_millis(&now) - to_millis(started);
                let clamped = if delta_ms < 0 { 0 } else { delta_ms as u64 };
                Duration::from_millis(clamped)
            }
        }
    }

    /// Returns the elapsed time in seconds as a floating point number.
    #[inline]
    pub fn elapsed_sec(&self) -> f64 {
        let elapsed = self.elapsed();
        duration_to_seconds(elapsed)
    }
}
#[cfg(all(feature = "nightly", target_os = "linux"))]
use self::coarse::*;
/// Access to the Linux coarse monotonic clock (`nightly` feature only).
#[cfg(all(feature = "nightly", target_os = "linux"))]
mod coarse {
    use crate::histogram::Timespec;
    pub use libc::timespec;
    use libc::{clock_gettime, CLOCK_MONOTONIC_COARSE};

    /// Number of nanoseconds in one millisecond.
    pub const NANOS_PER_MILLI: i64 = 1_000_000;
    /// Number of milliseconds in one second.
    pub const MILLIS_PER_SEC: i64 = 1_000;

    /// Reads `CLOCK_MONOTONIC_COARSE`.
    ///
    /// # Panics
    ///
    /// Panics if `clock_gettime` returns a non-zero status.
    pub fn get_time_coarse() -> Timespec {
        let mut raw = timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        // SAFETY: `raw` is a live, exclusively borrowed `timespec` for the
        // whole duration of the call, which is all `clock_gettime` needs.
        let status = unsafe { clock_gettime(CLOCK_MONOTONIC_COARSE, &mut raw) };
        assert_eq!(status, 0);
        Timespec(raw)
    }
}
/// Measures the time since its creation and records it into a [`Histogram`],
/// either explicitly or, if that never happened, when it is dropped.
#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct HistogramTimer {
/// The histogram the measured duration is recorded into.
histogram: Histogram,
/// Set once a duration has been taken, so that `Drop` does not record again.
observed: bool,
/// When the timer was started.
start: Instant,
}
impl HistogramTimer {
fn new(histogram: Histogram) -> Self {
Self {
histogram,
observed: false,
start: Instant::now(),
}
}
#[cfg(feature = "nightly")]
fn new_coarse(histogram: Histogram) -> Self {
HistogramTimer {
histogram,
observed: false,
start: Instant::now_coarse(),
}
}
pub fn observe_duration(self) {
self.stop_and_record();
}
pub fn stop_and_record(self) -> f64 {
let mut timer = self;
timer.observe(true)
}
pub fn stop_and_discard(self) -> f64 {
let mut timer = self;
timer.observe(false)
}
fn observe(&mut self, record: bool) -> f64 {
let v = self.start.elapsed_sec();
self.observed = true;
if record {
self.histogram.observe(v);
}
v
}
}
/// A timer that was never stopped explicitly records its duration on drop.
impl Drop for HistogramTimer {
    fn drop(&mut self) {
        if self.observed {
            return;
        }
        self.observe(true);
    }
}
/// A histogram metric. Cloning is cheap: clones share the same
/// reference-counted [`HistogramCore`].
#[derive(Clone, Debug)]
pub struct Histogram {
/// Shared state; every clone points to the same core.
core: Arc<HistogramCore>,
}
impl Histogram {
    /// Creates a histogram from `opts` without any label values.
    pub fn with_opts(opts: HistogramOpts) -> Result<Histogram> {
        let no_label_values: &[&str] = &[];
        Self::with_opts_and_label_values(&opts, no_label_values)
    }

    /// Creates a histogram from `opts` with the given label values.
    fn with_opts_and_label_values(
        opts: &HistogramOpts,
        label_values: &[&str],
    ) -> Result<Histogram> {
        HistogramCore::new(opts, label_values).map(|core| Histogram {
            core: Arc::new(core),
        })
    }
}
impl Histogram {
    /// Records a single observation.
    pub fn observe(&self, v: f64) {
        let core = &self.core;
        core.observe(v)
    }

    /// Starts a timer that records into this histogram.
    pub fn start_timer(&self) -> HistogramTimer {
        let target = self.clone();
        HistogramTimer::new(target)
    }

    /// Starts a timer based on the coarse clock that records into this histogram.
    #[cfg(feature = "nightly")]
    pub fn start_coarse_timer(&self) -> HistogramTimer {
        let target = self.clone();
        HistogramTimer::new_coarse(target)
    }

    /// Runs `f`, records how long it took (in seconds) and returns its result.
    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let started = Instant::now();
        let output = f();
        self.observe(started.elapsed_sec());
        output
    }

    /// Like `observe_closure_duration`, but timed with the coarse clock.
    #[cfg(feature = "nightly")]
    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let started = Instant::now_coarse();
        let output = f();
        self.observe(started.elapsed_sec());
        output
    }

    /// Creates a [`LocalHistogram`] that buffers observations for this histogram.
    pub fn local(&self) -> LocalHistogram {
        let target = self.clone();
        LocalHistogram::new(target)
    }

    /// Returns the sample sum reported by the core.
    pub fn get_sample_sum(&self) -> f64 {
        let core = &self.core;
        core.sample_sum()
    }

    /// Returns the sample count reported by the core.
    pub fn get_sample_count(&self) -> u64 {
        let core = &self.core;
        core.sample_count()
    }
}
/// Builds the protobuf metric: the label pairs plus a histogram snapshot.
impl Metric for Histogram {
    fn metric(&self) -> proto::Metric {
        let core = &self.core;
        let mut metric = proto::Metric::default();
        metric.set_label(from_vec!(core.label_pairs.clone()));
        metric.set_histogram(core.proto());
        metric
    }
}
/// A histogram collects into exactly one metric family holding one metric.
impl Collector for Histogram {
    fn desc(&self) -> Vec<&Desc> {
        let desc = &self.core.desc;
        vec![desc]
    }

    fn collect(&self) -> Vec<proto::MetricFamily> {
        let desc = &self.core.desc;
        let mut family = proto::MetricFamily::default();
        family.set_name(desc.fq_name.clone());
        family.set_help(desc.help.clone());
        family.set_field_type(proto::MetricType::HISTOGRAM);
        family.set_metric(from_vec!(vec![self.metric()]));
        vec![family]
    }
}
/// Builder that lets a `MetricVec` create [`Histogram`] children from
/// [`HistogramOpts`].
#[derive(Clone, Debug)]
pub struct HistogramVecBuilder {}
impl MetricVecBuilder for HistogramVecBuilder {
type M = Histogram;
type P = HistogramOpts;
// Creates one child histogram for the given label values.
fn build(&self, opts: &HistogramOpts, vals: &[&str]) -> Result<Histogram> {
Histogram::with_opts_and_label_values(opts, vals)
}
}
/// A collection of [`Histogram`]s that share options and differ in their
/// label values.
pub type HistogramVec = MetricVec<HistogramVecBuilder>;

impl HistogramVec {
    /// Creates a histogram vector from `opts`, using `label_names` as the
    /// variable label names.
    pub fn new(opts: HistogramOpts, label_names: &[&str]) -> Result<HistogramVec> {
        let variable_names: Vec<String> =
            label_names.iter().map(|&name| name.to_owned()).collect();
        let opts = opts.variable_labels(variable_names);
        let metric_vec: HistogramVec =
            MetricVec::create(proto::MetricType::HISTOGRAM, HistogramVecBuilder {}, opts)?;
        Ok(metric_vec)
    }

    /// Creates a [`LocalHistogramVec`] backed by a clone of this vector.
    pub fn local(&self) -> LocalHistogramVec {
        LocalHistogramVec::new(self.clone())
    }
}
/// Creates `count` bucket bounds, the first at `start` and each following
/// one `width` larger than the previous.
///
/// # Errors
///
/// Returns an error if `count` is zero or `width` is zero or negative.
pub fn linear_buckets(start: f64, width: f64, count: usize) -> Result<Vec<f64>> {
    if count == 0 {
        return Err(Error::Msg(format!(
            "LinearBuckets needs a positive count, count: {}",
            count
        )));
    }
    if width <= 0.0 {
        return Err(Error::Msg(format!(
            "LinearBuckets needs a width greater then 0, width: {}",
            width
        )));
    }

    let mut buckets = Vec::with_capacity(count);
    for step in 0..count {
        buckets.push(start + width * (step as f64));
    }
    Ok(buckets)
}
/// Creates `count` bucket bounds, the first at `start` and each following
/// one `factor` times the previous.
///
/// # Errors
///
/// Returns an error if `count` is zero, `start` is zero or negative, or
/// `factor` is less than or equal to 1.
pub fn exponential_buckets(start: f64, factor: f64, count: usize) -> Result<Vec<f64>> {
    if count == 0 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a positive count, count: {}",
            count
        )));
    }
    if start <= 0.0 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a positive start value, \
             start: {}",
            start
        )));
    }
    if factor <= 1.0 {
        return Err(Error::Msg(format!(
            "exponential_buckets needs a factor greater than 1, \
             factor: {}",
            factor
        )));
    }

    // Each bound is obtained by multiplying the previous one by `factor`.
    let bounds = std::iter::successors(Some(start), |&bound| Some(bound * factor));
    let mut buckets = Vec::with_capacity(count);
    buckets.extend(bounds.take(count));
    Ok(buckets)
}
/// Converts a [`Duration`] into seconds as an `f64`, including the
/// sub-second part.
#[inline]
pub fn duration_to_seconds(d: Duration) -> f64 {
    let whole_seconds = d.as_secs() as f64;
    let fractional_seconds = f64::from(d.subsec_nanos()) / 1e9;
    whole_seconds + fractional_seconds
}
/// Buffered, not yet flushed observations for one [`Histogram`].
#[derive(Clone, Debug)]
pub struct LocalHistogramCore {
/// The histogram that `flush` writes into.
histogram: Histogram,
/// Buffered per-bucket counts, one entry per upper bound of `histogram`.
counts: Vec<u64>,
/// Number of buffered observations.
count: u64,
/// Sum of the buffered observations.
sum: f64,
}
#[derive(Debug)]
pub struct LocalHistogram {
core: RefCell<LocalHistogramCore>,
}
impl Clone for LocalHistogram {
fn clone(&self) -> LocalHistogram {
let core = self.core.clone();
let lh = LocalHistogram { core };
lh.clear();
lh
}
}
/// Measures the time since its creation and records it into a
/// [`LocalHistogram`], either explicitly or, if that never happened, when it
/// is dropped.
#[must_use = "Timer should be kept in a variable otherwise it cannot observe duration"]
#[derive(Debug)]
pub struct LocalHistogramTimer {
/// The local histogram the measured duration is recorded into.
local: LocalHistogram,
/// Set once a duration has been taken, so that `Drop` does not record again.
observed: bool,
/// When the timer was started.
start: Instant,
}
impl LocalHistogramTimer {
fn new(histogram: LocalHistogram) -> Self {
Self {
local: histogram,
observed: false,
start: Instant::now(),
}
}
#[cfg(feature = "nightly")]
fn new_coarse(histogram: LocalHistogram) -> Self {
Self {
local: histogram,
observed: false,
start: Instant::now_coarse(),
}
}
pub fn observe_duration(self) {
self.stop_and_record();
}
pub fn stop_and_record(self) -> f64 {
let mut timer = self;
timer.observe(true)
}
pub fn stop_and_discard(self) -> f64 {
let mut timer = self;
timer.observe(false)
}
fn observe(&mut self, record: bool) -> f64 {
let v = self.start.elapsed_sec();
self.observed = true;
if record {
self.local.observe(v);
}
v
}
}
/// A timer that was never stopped explicitly records its duration on drop.
impl Drop for LocalHistogramTimer {
    fn drop(&mut self) {
        if self.observed {
            return;
        }
        self.observe(true);
    }
}
impl LocalHistogramCore {
    /// Creates an empty buffer for `histogram` with one slot per upper bound.
    fn new(histogram: Histogram) -> LocalHistogramCore {
        let num_buckets = histogram.core.upper_bounds.len();
        LocalHistogramCore {
            histogram,
            counts: vec![0; num_buckets],
            count: 0,
            sum: 0.0,
        }
    }

    /// Buffers a single observation `v`.
    pub fn observe(&mut self, v: f64) {
        // First bucket whose upper bound is >= `v`, if there is one.
        let bucket = self
            .histogram
            .core
            .upper_bounds
            .iter()
            .position(|&upper_bound| v <= upper_bound);
        if let Some(i) = bucket {
            self.counts[i] += 1;
        }
        self.count += 1;
        self.sum += v;
    }

    /// Discards everything that is buffered.
    pub fn clear(&mut self) {
        for slot in self.counts.iter_mut() {
            *slot = 0;
        }
        self.count = 0;
        self.sum = 0.0;
    }

    /// Pushes the buffered observations into the underlying histogram and
    /// clears the buffer. Does nothing when the buffer is empty.
    pub fn flush(&mut self) {
        if self.count == 0 {
            return;
        }

        {
            let core = &self.histogram.core;
            // Announce all buffered observations at once; the returned shard
            // index selects the shard to write to.
            let (shard_index, _count) = core
                .shard_and_count
                .inc_by(self.count, Ordering::Acquire);
            let shard = &core.shards[usize::from(shard_index)];

            let pending = self.counts.iter().enumerate().filter(|&(_, &n)| n > 0);
            for (i, &n) in pending {
                shard.buckets[i].inc_by(n);
            }
            shard.sum.inc_by(self.sum);
            // As in `HistogramCore::observe`, the shard count is updated last.
            shard
                .count
                .inc_by_with_ordering(self.count, Ordering::Release);
        }

        self.clear()
    }

    /// Sum of the buffered observations.
    fn sample_sum(&self) -> f64 {
        self.sum
    }

    /// Number of buffered observations.
    fn sample_count(&self) -> u64 {
        self.count
    }
}
impl LocalHistogram {
    /// Creates a local histogram with an empty buffer for `histogram`.
    fn new(histogram: Histogram) -> LocalHistogram {
        LocalHistogram {
            core: RefCell::new(LocalHistogramCore::new(histogram)),
        }
    }

    /// Buffers a single observation.
    pub fn observe(&self, v: f64) {
        let mut core = self.core.borrow_mut();
        core.observe(v);
    }

    /// Starts a timer that records into a clone of this local histogram.
    pub fn start_timer(&self) -> LocalHistogramTimer {
        let target = self.clone();
        LocalHistogramTimer::new(target)
    }

    /// Starts a coarse-clock timer that records into a clone of this local histogram.
    #[cfg(feature = "nightly")]
    pub fn start_coarse_timer(&self) -> LocalHistogramTimer {
        let target = self.clone();
        LocalHistogramTimer::new_coarse(target)
    }

    /// Runs `f`, buffers how long it took (in seconds) and returns its result.
    pub fn observe_closure_duration<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let started = Instant::now();
        let output = f();
        self.observe(started.elapsed_sec());
        output
    }

    /// Like `observe_closure_duration`, but timed with the coarse clock.
    #[cfg(feature = "nightly")]
    pub fn observe_closure_duration_coarse<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        let started = Instant::now_coarse();
        let output = f();
        self.observe(started.elapsed_sec());
        output
    }

    /// Discards the buffered observations.
    pub fn clear(&self) {
        let mut core = self.core.borrow_mut();
        core.clear();
    }

    /// Pushes the buffered observations into the underlying histogram.
    pub fn flush(&self) {
        let mut core = self.core.borrow_mut();
        core.flush();
    }

    /// Sum of the buffered (not yet flushed) observations.
    pub fn get_sample_sum(&self) -> f64 {
        let core = self.core.borrow();
        core.sample_sum()
    }

    /// Number of buffered (not yet flushed) observations.
    pub fn get_sample_count(&self) -> u64 {
        let core = self.core.borrow();
        core.sample_count()
    }
}
/// The trait method forwards to the inherent `LocalHistogram::flush`.
impl LocalMetric for LocalHistogram {
fn flush(&self) {
LocalHistogram::flush(self);
}
}
/// Dropping a local histogram flushes whatever is still buffered.
impl Drop for LocalHistogram {
fn drop(&mut self) {
self.flush()
}
}
/// A [`HistogramVec`] front-end that hands out [`LocalHistogram`]s and keeps
/// them in a local map.
#[derive(Debug)]
pub struct LocalHistogramVec {
/// The underlying histogram vector.
vec: HistogramVec,
/// Local histograms keyed by the hash of their label values.
local: HashMap<u64, LocalHistogram>,
}
impl LocalHistogramVec {
    /// Creates an empty local vector on top of `vec`, pre-sizing the local
    /// map to the number of children `vec` currently has.
    fn new(vec: HistogramVec) -> LocalHistogramVec {
        let capacity = vec.v.children.read().len();
        LocalHistogramVec {
            local: HashMap::with_capacity(capacity),
            vec,
        }
    }

    /// Returns the local histogram for `vals`, creating it on first use.
    ///
    /// # Panics
    ///
    /// Panics if the label values cannot be hashed by the underlying vector.
    pub fn with_label_values<'a>(&'a mut self, vals: &[&str]) -> &'a LocalHistogram {
        let hash = self.vec.v.hash_label_values(vals).unwrap();
        let LocalHistogramVec { vec, local } = self;
        local
            .entry(hash)
            .or_insert_with(|| vec.with_label_values(vals).local())
    }

    /// Drops the local histogram for `vals` (if any) and deletes the label
    /// values from the underlying vector.
    pub fn remove_label_values(&mut self, vals: &[&str]) -> Result<()> {
        let inner = &self.vec.v;
        let hash = inner.hash_label_values(vals)?;
        self.local.remove(&hash);
        inner.delete_label_values(vals)
    }

    /// Flushes every local histogram.
    pub fn flush(&self) {
        self.local.values().for_each(|histogram| histogram.flush());
    }
}
/// The trait method forwards to the inherent `LocalHistogramVec::flush`.
impl LocalMetric for LocalHistogramVec {
fn flush(&self) {
LocalHistogramVec::flush(self)
}
}
/// Cloning creates a fresh local vector over a clone of the underlying
/// vector; the local histograms themselves are not copied.
impl Clone for LocalHistogramVec {
fn clone(&self) -> LocalHistogramVec {
LocalHistogramVec::new(self.vec.clone())
}
}
#[cfg(test)]
mod tests {
use std::f64::{EPSILON, INFINITY};
use std::thread;
use std::time::Duration;
use super::*;
use crate::metrics::{Collector, Metric};
/// Exercises direct observation, explicit timers and drop-recorded timers,
/// then checks the collected protobuf for both default and custom buckets.
#[test]
fn test_histogram() {
    let opts = HistogramOpts::new("test1", "test help")
        .const_label("a", "1")
        .const_label("b", "2");
    let histogram = Histogram::with_opts(opts).unwrap();
    histogram.observe(1.0);

    // Explicitly stopped timer.
    let timer = histogram.start_timer();
    thread::sleep(Duration::from_millis(100));
    timer.observe_duration();

    // Timer moved into another thread and recorded when it is dropped there.
    let timer = histogram.start_timer();
    let handler = thread::spawn(move || {
        let _timer = timer;
        thread::sleep(Duration::from_millis(400));
    });
    assert!(handler.join().is_ok());

    let mut mfs = histogram.collect();
    assert_eq!(mfs.len(), 1);

    let mf = mfs.pop().unwrap();
    let m = mf.get_metric().get(0).unwrap();
    assert_eq!(m.get_label().len(), 2);
    let proto_histogram = m.get_histogram();
    assert_eq!(proto_histogram.get_sample_count(), 3);
    assert!(proto_histogram.get_sample_sum() >= 1.5);
    assert_eq!(proto_histogram.get_bucket().len(), DEFAULT_BUCKETS.len());

    let buckets = vec![1.0, 2.0, 3.0];
    let opts = HistogramOpts::new("test2", "test help").buckets(buckets.clone());
    let histogram = Histogram::with_opts(opts).unwrap();
    let mut mfs = histogram.collect();
    assert_eq!(mfs.len(), 1);

    let mf = mfs.pop().unwrap();
    let m = mf.get_metric().get(0).unwrap();
    assert_eq!(m.get_label().len(), 0);
    let proto_histogram = m.get_histogram();
    assert_eq!(proto_histogram.get_sample_count(), 0);
    // Compare the absolute difference; without `abs()` any negative sum
    // would satisfy the check.
    assert!((proto_histogram.get_sample_sum() - 0.0).abs() < EPSILON);
    assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
}
// Checks that coarse timers (stopped explicitly or recorded on drop in
// another thread) and a timed closure each add one sample with a non-zero
// total sum. Only built with the `nightly` feature.
#[test]
#[cfg(feature = "nightly")]
fn test_histogram_coarse_timer() {
let opts = HistogramOpts::new("test1", "test help");
let histogram = Histogram::with_opts(opts).unwrap();
// Explicitly stopped coarse timer.
let timer = histogram.start_coarse_timer();
thread::sleep(Duration::from_millis(100));
timer.observe_duration();
// Coarse timer recorded when dropped inside the spawned thread.
let timer = histogram.start_coarse_timer();
let handler = thread::spawn(move || {
let _timer = timer;
thread::sleep(Duration::from_millis(400));
});
assert!(handler.join().is_ok());
histogram.observe_closure_duration(|| {
thread::sleep(Duration::from_millis(400));
});
let mut mfs = histogram.collect();
assert_eq!(mfs.len(), 1);
let mf = mfs.pop().unwrap();
let m = mf.get_metric().get(0).unwrap();
let proto_histogram = m.get_histogram();
assert_eq!(proto_histogram.get_sample_count(), 3);
assert!((proto_histogram.get_sample_sum() - 0.0) > EPSILON);
}
// Repeatedly captures regular and coarse instants, occasionally yielding the
// thread, and checks that the elapsed time is never below zero.
#[test]
#[cfg(feature = "nightly")]
fn test_instant_on_smp() {
let zero = Duration::from_millis(0);
for i in 0..100_000 {
let now = Instant::now();
let now_coarse = Instant::now_coarse();
// Yield every 100 iterations.
if i % 100 == 0 {
thread::yield_now();
}
assert!(now.elapsed() >= zero);
assert!(now_coarse.elapsed() >= zero);
}
}
/// Table test for `check_and_adjust_buckets`: each case lists the input
/// buckets, whether they are accepted and the expected length of the result.
#[test]
fn test_buckets_invalidation() {
    let cases = vec![
        // Empty input falls back to the defaults.
        (vec![], true, DEFAULT_BUCKETS.len()),
        (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, 2.0], true, 7),
        // Not increasing.
        (vec![-2.0, -1.0, -0.5, 10.0, 0.5, 1.0, 2.0], false, 7),
        // The trailing +Inf is removed.
        (vec![-2.0, -1.0, -0.5, 0.0, 0.5, 1.0, INFINITY], true, 6),
    ];

    for (buckets, expect_ok, expected_len) in cases {
        let outcome = check_and_adjust_buckets(buckets);
        assert_eq!(outcome.is_ok(), expect_ok);
        if !expect_ok {
            continue;
        }
        assert_eq!(outcome.unwrap().len(), expected_len);
    }
}
/// Table tests for `linear_buckets` and `exponential_buckets`: each case
/// lists the three arguments, whether the call succeeds and the expected
/// buckets on success.
#[test]
fn test_buckets_functions() {
    let linear_cases = vec![
        (
            -15.0,
            5.0,
            6,
            true,
            vec![-15.0, -10.0, -5.0, 0.0, 5.0, 10.0],
        ),
        (-15.0, 0.0, 6, false, vec![]),
        (-15.0, 5.0, 0, false, vec![]),
    ];
    for (start, width, count, expect_ok, expected) in linear_cases {
        let outcome = linear_buckets(start, width, count);
        assert_eq!(outcome.is_ok(), expect_ok);
        if let Ok(buckets) = outcome {
            assert_eq!(buckets, expected);
        }
    }

    let exponential_cases = vec![
        (100.0, 1.2, 3, true, vec![100.0, 120.0, 144.0]),
        (100.0, 0.5, 3, false, vec![]),
        (100.0, 1.2, 0, false, vec![]),
    ];
    for (start, factor, count, expect_ok, expected) in exponential_cases {
        let outcome = exponential_buckets(start, factor, count);
        assert_eq!(outcome.is_ok(), expect_ok);
        if let Ok(buckets) = outcome {
            assert_eq!(buckets, expected);
        }
    }
}
/// Checks `duration_to_seconds` against a few millisecond inputs.
#[test]
fn test_duration_to_seconds() {
    let cases = vec![(1000, 1.0), (1100, 1.1), (100_111, 100.111)];
    for (millis, expected_seconds) in cases {
        let got = duration_to_seconds(Duration::from_millis(millis));
        let difference = (got - expected_seconds).abs();
        assert!(difference < EPSILON);
    }
}
// Checks `remove_label_values` on a histogram vector: it fails before the
// child exists, succeeds once it does, and fails for a wrong number of
// values or for values that were never used.
#[test]
fn test_histogram_vec_with_label_values() {
let vec = HistogramVec::new(
HistogramOpts::new("test_histogram_vec", "test histogram vec help"),
&["l1", "l2"],
)
.unwrap();
// Nothing has been created yet.
assert!(vec.remove_label_values(&["v1", "v2"]).is_err());
vec.with_label_values(&["v1", "v2"]).observe(1.0);
assert!(vec.remove_label_values(&["v1", "v2"]).is_ok());
// Wrong number of label values.
assert!(vec.remove_label_values(&["v1"]).is_err());
// Label values that were never used.
assert!(vec.remove_label_values(&["v1", "v3"]).is_err());
}
/// Checks that a histogram vector honours custom buckets and that a child
/// reports the expected labels, count, sum and bucket number.
#[test]
fn test_histogram_vec_with_opts_buckets() {
    let labels = ["l1", "l2"];
    let buckets = vec![1.0, 2.0, 3.0];
    let vec = HistogramVec::new(
        HistogramOpts::new("test_histogram_vec", "test histogram vec help")
            .buckets(buckets.clone()),
        &labels,
    )
    .unwrap();

    let histogram = vec.with_label_values(&["v1", "v2"]);
    histogram.observe(1.0);

    let m = histogram.metric();
    assert_eq!(m.get_label().len(), labels.len());

    let proto_histogram = m.get_histogram();
    assert_eq!(proto_histogram.get_sample_count(), 1);
    // Compare the absolute difference; without `abs()` any sum below 1.0
    // would satisfy the check.
    assert!((proto_histogram.get_sample_sum() - 1.0).abs() < EPSILON);
    assert_eq!(proto_histogram.get_bucket().len(), buckets.len())
}
/// Checks that a local histogram only affects the underlying histogram on
/// `flush` or drop, and that `clear` discards buffered observations.
#[test]
fn test_histogram_local() {
    let buckets = vec![1.0, 2.0, 3.0];
    let opts = HistogramOpts::new("test_histogram_local", "test histogram local help")
        .buckets(buckets.clone());
    let histogram = Histogram::with_opts(opts).unwrap();
    let local = histogram.local();

    let check = |count, sum| {
        let m = histogram.metric();
        let proto_histogram = m.get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), count);
        // Compare the absolute difference; without `abs()` a sum that is too
        // small would go unnoticed.
        assert!((proto_histogram.get_sample_sum() - sum).abs() < EPSILON);
    };

    // Buffered only: nothing is visible yet.
    local.observe(1.0);
    local.observe(4.0);
    check(0, 0.0);

    local.flush();
    check(2, 5.0);

    // Cleared observations never reach the histogram.
    local.observe(2.0);
    local.clear();
    check(2, 5.0);

    // Dropping flushes what is buffered.
    local.observe(2.0);
    drop(local);
    check(3, 7.0);
}
/// Checks flushing through a local histogram vector: per-child flush,
/// vector-wide flush, and flush on drop after the child was removed and
/// recreated.
#[test]
fn test_histogram_vec_local() {
    let vec = HistogramVec::new(
        HistogramOpts::new("test_histogram_vec_local", "test histogram vec help"),
        &["l1", "l2"],
    )
    .unwrap();
    let mut local_vec = vec.local();

    // Nothing exists yet, so removal fails on both the vector and its local view.
    vec.remove_label_values(&["v1", "v2"]).unwrap_err();
    local_vec.remove_label_values(&["v1", "v2"]).unwrap_err();

    let check = |count, sum| {
        let ms = vec.collect()[0].take_metric();
        let proto_histogram = ms[0].get_histogram();
        assert_eq!(proto_histogram.get_sample_count(), count);
        // Compare the absolute difference; without `abs()` a sum that is too
        // small would go unnoticed.
        assert!((proto_histogram.get_sample_sum() - sum).abs() < EPSILON);
    };

    {
        // Flush through the child.
        let h = local_vec.with_label_values(&["v1", "v2"]);
        h.observe(1.0);
        h.flush();
        check(1, 1.0);
    }

    {
        // Flush through the vector.
        local_vec.with_label_values(&["v1", "v2"]).observe(4.0);
        local_vec.flush();
        check(2, 5.0);
    }

    {
        // Remove and recreate the child; dropping the local vector flushes it.
        local_vec.remove_label_values(&["v1", "v2"]).unwrap();
        local_vec.with_label_values(&["v1", "v2"]).observe(2.0);
        drop(local_vec);
        check(1, 2.0);
    }
}
// Observes `1.0` from a background thread while the main thread collects
// repeatedly, and checks that every collected snapshot is consistent: the
// sample count equals both the single bucket's cumulative count and the sum.
#[test]
fn atomic_observe_across_collects() {
let done = Arc::new(std::sync::atomic::AtomicBool::default());
let histogram =
Histogram::with_opts(HistogramOpts::new("test_name", "test help").buckets(vec![1.0]))
.unwrap();
let done_clone = done.clone();
let histogram_clone = histogram.clone();
// Keeps observing in batches until told to stop.
let observing_thread = std::thread::spawn(move || loop {
if done_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
for _ in 0..1_000_000 {
histogram_clone.observe(1.0);
}
});
let mut sample_count = 0;
let mut cumulative_count = 0;
let mut sample_sum = 0;
// Collect concurrently; stop early on the first inconsistent snapshot so
// that the offending values are reported below.
for _ in 0..1_000_000 {
let metric = &histogram.collect()[0].take_metric()[0];
let proto = metric.get_histogram();
sample_count = proto.get_sample_count();
sample_sum = proto.get_sample_sum() as u64;
cumulative_count = proto.get_bucket()[0].get_cumulative_count();
if sample_count != cumulative_count {
break;
}
if sample_count != sample_sum {
break;
}
}
// Stop the observer before asserting so the thread is always joined.
done.store(true, std::sync::atomic::Ordering::Relaxed);
observing_thread.join().unwrap();
if sample_count != cumulative_count {
panic!(
"Histogram invariant violated: For a histogram with a single \
bucket observing values below the bucket's upper bound only \
the histogram's count should always be equal to the buckets's \
cumulative count, got {:?} and {:?} instead.",
sample_count, cumulative_count,
);
}
if sample_count != sample_sum {
panic!(
"Histogram invariant violated: For a histogram which is only \
ever observing a value of `1.0` the sample count should equal \
the sum, instead got: {:?} and {:?}",
sample_count, sample_sum,
)
}
}
/// Creating a plain histogram from options that declare a variable label
/// must fail with `InconsistentCardinality` (one label expected, none given).
#[test]
fn test_error_on_inconsistent_label_cardinality() {
    let opts = histogram_opts!(
        "example_histogram",
        "Used as an example",
        vec![0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 5.0]
    )
    .variable_label("example_variable");

    match Histogram::with_opts(opts) {
        Err(Error::InconsistentCardinality { expect, got }) => {
            assert_eq!(1, expect);
            assert_eq!(0, got);
        }
        _ => panic!("Expected InconsistentCardinality error."),
    }
}
}