#[cfg(feature = "standard-clock")]
use crate::clock::StandardClock;
use crate::clock::{BlockingClock, Clock};
use pin_project_lite::pin_project;
use std::{
future::Future,
mem,
ops::Sub,
pin::Pin,
sync::Arc,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Mutex,
},
task::{Context, Poll},
time::Duration,
};
/// Token-bucket state behind a limiter.
///
/// `value` is the balance still available for consumption. It is allowed to go
/// negative: a negative balance is the debt that `consume()` converts into a
/// waiting time.
#[derive(Debug, Clone, Copy)]
struct Bucket<I> {
    /// Instant at which `value` was last brought up to date by `refill()`.
    last_updated: I,
    /// Refill rate: `refill()` credits `speed_limit * elapsed_seconds`.
    speed_limit: f64,
    /// Length of one refill window, in seconds.
    refill: f64,
    /// Current balance; `refill()` never lets it exceed `capacity()`.
    value: f64,
}

impl<I> Bucket<I> {
    /// Largest balance the bucket may hold: one refill window's worth of
    /// throughput (`speed_limit * refill`).
    fn capacity(&self) -> f64 {
        let (rate, window_secs) = (self.speed_limit, self.refill);
        rate * window_secs
    }

    /// Deducts `size` from the balance and returns how long the caller must
    /// wait before proceeding.
    ///
    /// No wait is needed while the balance stays strictly positive. Otherwise
    /// the wait is one refill window plus the time needed to pay back the
    /// (non-positive) balance at the current speed limit.
    fn consume(&mut self, size: f64) -> Duration {
        self.value -= size;
        if self.value > 0.0 {
            return Duration::from_secs(0);
        }
        // The balance is not positive here, so subtracting the quotient adds
        // the pay-back time on top of the refill window.
        let balance_secs = self.value / self.speed_limit;
        Duration::from_secs_f64(self.refill - balance_secs)
    }

    /// Switches to `new_speed_limit` and rescales the balance accordingly.
    ///
    /// * finite -> finite: the balance is shifted by the change in capacity;
    /// * non-finite -> finite: the balance is reset to the new capacity;
    /// * anything -> non-finite: the balance is left untouched.
    fn set_speed_limit(&mut self, new_speed_limit: f64) {
        let previous_capacity = self.capacity();
        self.speed_limit = new_speed_limit;
        if !new_speed_limit.is_finite() {
            return;
        }

        let updated_capacity = self.capacity();
        self.value = if previous_capacity.is_finite() {
            self.value + (updated_capacity - previous_capacity)
        } else {
            updated_capacity
        };
    }
}

impl<I> Bucket<I>
where
    I: Copy + Sub<Output = Duration>,
{
    /// Credits the balance for the time elapsed since `last_updated`, caps it
    /// at `capacity()`, and records `now` as the new `last_updated`.
    fn refill(&mut self, now: I) {
        let since_update = now - self.last_updated;
        let replenished = self.speed_limit * since_update.as_secs_f64();
        let ceiling = self.capacity();
        self.value = ceiling.min(self.value + replenished);
        self.last_updated = now;
    }
}
/// A builder for [`Limiter`].
///
/// Holds the clock and the token-bucket parameters (speed limit and refill
/// window) that [`Builder::build`] turns into a [`Limiter`]. The clock type `C`
/// provides both the time source and the sleep primitive used by the limiter.
#[derive(Debug)]
pub struct Builder<C: Clock> {
    /// Clock handed over to the limiter by `build()`.
    clock: C,
    /// Bucket template; `build()` fills it to capacity and copies it.
    bucket: Bucket<C::Instant>,
}
impl<C: Clock> Builder<C> {
    /// Creates a builder with the given speed limit, a default-constructed
    /// clock and a refill window of 0.1 seconds.
    ///
    /// # Panics
    ///
    /// Panics if `speed_limit` is not greater than zero.
    pub fn new(speed_limit: f64) -> Self {
        let clock = C::default();
        let last_updated = clock.now();
        let mut builder = Self {
            clock,
            bucket: Bucket {
                last_updated,
                speed_limit: 0.0,
                refill: 0.1,
                value: 0.0,
            },
        };
        // Route the caller's value through the setter so it is validated.
        builder.speed_limit(speed_limit);
        builder
    }

    /// Sets the speed limit of the limiter to be built.
    ///
    /// # Panics
    ///
    /// Panics if `speed_limit` is not greater than zero.
    pub fn speed_limit(&mut self, speed_limit: f64) -> &mut Self {
        assert!(speed_limit > 0.0, "speed limit must be positive");
        let bucket = &mut self.bucket;
        bucket.speed_limit = speed_limit;
        self
    }

    /// Sets the length of the refill window.
    ///
    /// # Panics
    ///
    /// Panics if `dur` is zero.
    pub fn refill(&mut self, dur: Duration) -> &mut Self {
        let zero = Duration::from_secs(0);
        assert!(dur > zero, "refill duration must not be zero");
        let bucket = &mut self.bucket;
        bucket.refill = dur.as_secs_f64();
        self
    }

    /// Replaces the clock the limiter will use.
    pub fn clock(&mut self, clock: C) -> &mut Self {
        self.clock = clock;
        self
    }

    /// Builds the limiter.
    ///
    /// The bucket starts full and is time-stamped with the clock's current
    /// reading. The clock is moved out of the builder, leaving a
    /// default-constructed clock in its place.
    pub fn build(&mut self) -> Limiter<C> {
        let full = self.bucket.capacity();
        self.bucket.value = full;
        self.bucket.last_updated = self.clock.now();

        let bucket = self.bucket;
        let unlimited = bucket.speed_limit.is_infinite();
        let clock = mem::take(&mut self.clock);
        Limiter {
            bucket: Arc::new(Mutex::new(bucket)),
            clock,
            total_bytes_consumed: Arc::new(AtomicUsize::new(0)),
            is_unlimited: Arc::new(AtomicBool::new(unlimited)),
        }
    }
}
// Declares the `Limiter` struct. The macro arguments are spliced in right after
// the `C: Clock` bound, so a default type parameter (`= StandardClock`) is only
// supplied when the "standard-clock" feature makes that type available.
macro_rules! declare_limiter {
    ($($default_clock:tt)*) => {
        /// A speed limiter backed by a shared token bucket.
        ///
        /// Cloning a `Limiter` is cheap: the bucket, the consumed-bytes
        /// counter and the "unlimited" flag are reference-counted, so every
        /// clone observes and updates the same state.
        #[derive(Debug, Clone)]
        pub struct Limiter<C: Clock $($default_clock)*> {
            /// Token bucket shared by all clones.
            bucket: Arc<Mutex<Bucket<C::Instant>>>,
            /// Time source and sleep provider.
            clock: C,
            /// Bytes requested since creation or the last statistics reset.
            total_bytes_consumed: Arc<AtomicUsize>,
            /// `true` while the speed limit is infinite; lets the consume
            /// path skip the bucket lock entirely.
            is_unlimited: Arc<AtomicBool>,
        }
    }
}

#[cfg(feature = "standard-clock")]
declare_limiter! { = StandardClock }

#[cfg(not(feature = "standard-clock"))]
declare_limiter! {}
impl<C: Clock> Limiter<C> {
    /// Creates a limiter with the given speed limit and the builder's default
    /// settings.
    ///
    /// # Panics
    ///
    /// Panics if `speed_limit` is not greater than zero.
    pub fn new(speed_limit: f64) -> Self {
        Self::builder(speed_limit).build()
    }

    /// Starts a [`Builder`] for a limiter with the given speed limit.
    ///
    /// # Panics
    ///
    /// Panics if `speed_limit` is not greater than zero.
    pub fn builder(speed_limit: f64) -> Builder<C> {
        Builder::new(speed_limit)
    }

    /// Returns the clock used by this limiter.
    pub fn clock(&self) -> &C {
        &self.clock
    }

    /// Changes the speed limit of this limiter and all of its clones.
    ///
    /// An infinite value switches the limiter to unlimited mode. The
    /// positivity of `speed_limit` is only checked in builds with debug
    /// assertions enabled.
    pub fn set_speed_limit(&self, speed_limit: f64) {
        debug_assert!(speed_limit > 0.0, "speed limit must be positive");
        {
            let mut bucket = self.bucket.lock().unwrap();
            bucket.set_speed_limit(speed_limit);
        }
        let unlimited = speed_limit.is_infinite();
        self.is_unlimited.store(unlimited, Ordering::Relaxed);
    }

    /// Returns the current speed limit.
    pub fn speed_limit(&self) -> f64 {
        let bucket = self.bucket.lock().unwrap();
        bucket.speed_limit
    }

    /// Returns the number of bytes requested through this limiter (and its
    /// clones) since creation or the last [`reset_statistics`] call.
    ///
    /// Bytes are counted when they are requested, before any waiting happens.
    ///
    /// [`reset_statistics`]: Limiter::reset_statistics
    pub fn total_bytes_consumed(&self) -> usize {
        self.total_bytes_consumed.load(Ordering::Relaxed)
    }

    /// Resets the consumed-bytes counter to zero.
    pub fn reset_statistics(&self) {
        self.total_bytes_consumed.store(0, Ordering::Relaxed);
    }

    /// Records `byte_size` bytes as consumed and returns how long the caller
    /// has to wait for them.
    fn consume_duration(&self, byte_size: usize) -> Duration {
        self.total_bytes_consumed
            .fetch_add(byte_size, Ordering::Relaxed);

        let unlimited = self.is_unlimited.load(Ordering::Relaxed);
        if unlimited {
            // Unlimited mode never touches the bucket.
            Duration::from_secs(0)
        } else {
            #[allow(clippy::cast_precision_loss)]
            let requested = byte_size as f64;

            // The clock is read while the lock is held, so refills are applied
            // in the same order as the timestamps are taken.
            let mut guard = self.bucket.lock().unwrap();
            guard.refill(self.clock.now());
            guard.consume(requested)
        }
    }

    /// Consumes `byte_size` bytes and returns a future that resolves once the
    /// caller is allowed to proceed.
    ///
    /// The bytes are accounted for immediately; only the waiting is deferred
    /// to the returned future.
    pub fn consume(&self, byte_size: usize) -> Consume<C, ()> {
        let future = match self.consume_duration(byte_size) {
            wait if wait == Duration::from_secs(0) => None,
            wait => Some(self.clock.sleep(wait)),
        };
        Consume {
            future,
            result: Some(()),
        }
    }

    /// Wraps `resource` so that it is throttled by this limiter.
    pub fn limit<R>(self, resource: R) -> Resource<R, C> {
        Resource::new(self, resource)
    }

    /// Number of limiter handles currently sharing the bucket (test helper).
    #[cfg(test)]
    fn shared_count(&self) -> usize {
        Arc::strong_count(&self.bucket)
    }
}
impl<C: BlockingClock> Limiter<C> {
    /// Consumes `byte_size` bytes, blocking the current thread until the
    /// caller is allowed to proceed.
    ///
    /// The clock's `blocking_sleep` is always invoked, even when the computed
    /// waiting time is zero.
    pub fn blocking_consume(&self, byte_size: usize) {
        self.clock
            .blocking_sleep(self.consume_duration(byte_size));
    }
}
/// Future returned by [`Limiter::consume`].
///
/// Resolves to the stored result once the optional delay has elapsed.
#[derive(Debug)]
pub struct Consume<C: Clock, R> {
    /// Delay to wait for; `None` when no waiting is required.
    future: Option<C::Delay>,
    /// Value yielded on completion; `None` once it has been handed out.
    result: Option<R>,
}

#[allow(clippy::use_self)]
impl<C: Clock, R> Consume<C, R> {
    /// Transforms the value this future resolves to, keeping the same delay.
    ///
    /// `f` is applied right away to the stored result (if it is still
    /// present), not when the future completes.
    pub fn map<T, F: FnOnce(R) -> T>(self, f: F) -> Consume<C, T> {
        let Self { future, result } = self;
        Consume {
            future,
            result: result.map(f),
        }
    }
}

impl<C: Clock, R: Unpin> Future for Consume<C, R> {
    type Output = R;

    /// Waits for the delay (if any), then yields the stored result.
    ///
    /// Once the result has been handed out, further polls return
    /// `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        if let Some(delay) = this.future.as_mut() {
            if Pin::new(delay).poll(cx).is_pending() {
                return Poll::Pending;
            }
        }

        match this.result.take() {
            Some(value) => Poll::Ready(value),
            None => Poll::Pending,
        }
    }
}

#[cfg(feature = "fused-future")]
impl<C: Clock, R: Unpin> futures_core::future::FusedFuture for Consume<C, R> {
    /// The future is terminated as soon as its result has been taken.
    fn is_terminated(&self) -> bool {
        self.result.is_none()
    }
}
pin_project! {
/// A resource of type `R` bundled with the [`Limiter`] that throttles it.
pub struct Resource<R, C: Clock> {
// Limiter charged by `poll_limited` for the length of every ready result.
limiter: Limiter<C>,
// The wrapped resource; marked `#[pin]` so it can be polled through a
// pinned projection.
#[pin]
resource: R,
// Delay still owed for the most recently charged result, if any. It must
// complete before the resource is polled again.
waiter: Option<Consume<C, ()>>,
}
}
impl<R, C: Clock> Resource<R, C> {
    /// Wraps `resource` so that it is throttled by `limiter`.
    pub fn new(limiter: Limiter<C>, resource: R) -> Self {
        let waiter = None;
        Self {
            limiter,
            resource,
            waiter,
        }
    }

    /// Unwraps the value, returning the underlying resource.
    pub fn into_inner(self) -> R {
        let Self { resource, .. } = self;
        resource
    }

    /// Returns a shared reference to the underlying resource.
    pub fn get_ref(&self) -> &R {
        let Self { resource, .. } = self;
        resource
    }

    /// Returns a mutable reference to the underlying resource.
    pub fn get_mut(&mut self) -> &mut R {
        let Self { resource, .. } = self;
        resource
    }

    /// Returns a pinned shared reference to the underlying resource.
    pub fn get_pin_ref(self: Pin<&Self>) -> Pin<&R> {
        let projection = self.project_ref();
        projection.resource
    }

    /// Returns a pinned mutable reference to the underlying resource.
    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
        let projection = self.project();
        projection.resource
    }
}
impl<R, C: Clock> Resource<R, C> {
    /// Polls the wrapped resource through `poll`, charging the limiter for
    /// whatever it produces.
    ///
    /// If a delay from a previous call is still outstanding it is polled
    /// first, and the resource is not touched until that delay has completed.
    /// When `poll` returns a ready value, `length` reports its size; a
    /// non-zero size is consumed from the limiter and the resulting delay is
    /// stored for the next call. The ready value itself is returned right
    /// away.
    #[allow(dead_code)]
    pub(crate) fn poll_limited<T>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        length: impl FnOnce(&T) -> usize,
        poll: impl FnOnce(Pin<&mut R>, &mut Context<'_>) -> Poll<T>,
    ) -> Poll<T> {
        let this = self.project();

        // Settle the debt of the previous operation before doing more work.
        if let Some(pending) = this.waiter.as_mut() {
            if Pin::new(pending).poll(cx).is_pending() {
                return Poll::Pending;
            }
            *this.waiter = None;
        }

        let outcome = poll(this.resource, cx);
        match &outcome {
            Poll::Ready(item) => {
                let consumed = length(item);
                if consumed > 0 {
                    *this.waiter = Some(this.limiter.consume(consumed));
                }
            }
            Poll::Pending => {}
        }
        outcome
    }
}
#[cfg(test)]
mod tests_with_manual_clock {
use super::*;
use crate::clock::{Clock, ManualClock, Nanoseconds};
use futures_executor::LocalPool;
use futures_util::task::SpawnExt;
use std::{future::Future, thread::panicking};
/// Cloneable part of the test fixture that is handed to every spawned task.
#[derive(Clone)]
struct SharedFixture {
    limiter: Limiter<ManualClock>,
}

impl SharedFixture {
    /// Current reading of the manual clock, as a raw nanosecond count.
    fn now(&self) -> u64 {
        let instant = self.limiter.clock().now();
        instant.0
    }

    /// Sleeps for `nanos` nanoseconds on the manual clock.
    fn sleep(&self, nanos: u64) -> impl Future<Output = ()> + '_ {
        let duration = Duration::from_nanos(nanos);
        self.limiter.clock().sleep(duration)
    }

    /// Consumes `bytes` bytes from the shared limiter.
    fn consume(&self, bytes: usize) -> impl Future<Output = ()> + '_ {
        let limiter = &self.limiter;
        limiter.consume(bytes)
    }
}
struct Fixture {
shared: SharedFixture,
pool: LocalPool,
}
impl Fixture {
fn new() -> Self {
Self {
shared: SharedFixture {
limiter: Limiter::builder(512.0)
.refill(Duration::from_secs(1))
.build(),
},
pool: LocalPool::new(),
}
}
fn spawn<F, G>(&self, f: F)
where
F: FnOnce(SharedFixture) -> G,
G: Future<Output = ()> + Send + 'static,
{
self.pool.spawner().spawn(f(self.shared.clone())).unwrap();
}
fn set_time(&mut self, time: u64) {
self.shared.limiter.clock().set_time(Nanoseconds(time));
self.pool.run_until_stalled();
}
fn set_speed_limit(&self, limit: f64) {
self.shared.limiter.set_speed_limit(limit);
}
fn total_bytes_consumed(&self) -> usize {
self.shared.limiter.total_bytes_consumed()
}
}
impl Drop for Fixture {
fn drop(&mut self) {
if !panicking() {
assert_eq!(self.shared.limiter.shared_count(), 1);
}
}
}
/// Consuming less than the bucket capacity never has to wait.
#[test]
fn under_limit_single_thread() {
    let mut fx = Fixture::new();

    // 50 + 51 + ... + 55 = 315 bytes, below the 512-byte capacity.
    fx.spawn(|sfx| async move {
        for bytes in 50..=55 {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), 0);
        }
    });

    fx.set_time(0);
    assert_eq!(fx.total_bytes_consumed(), 315);
}
/// A single task that overdraws the bucket is put to sleep until the debt is
/// paid back.
#[test]
fn over_limit_single_thread() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        // 1_177_734_375 ns = (200 + 201 + 202) / 512 seconds.
        let steps: [(usize, u64); 6] = [
            (200, 0),
            (201, 0),
            (202, 1_177_734_375),
            (203, 1_177_734_375),
            (204, 1_177_734_375),
            (205, 2_373_046_875),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 5] = [
        (0, 603),
        (1_177_734_374, 603),
        (1_177_734_375, 1215),
        (2_373_046_874, 1215),
        (2_373_046_875, 1215),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
/// Two tasks sharing one limiter: the debt run up by one task also delays the
/// other.
#[test]
fn over_limit_multi_thread() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        let steps: [(usize, u64); 5] = [
            (200, 0),
            (202, 0),
            (204, 1_183_593_750),
            (206, 1_183_593_750),
            (208, 2_384_765_625),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    fx.spawn(|sfx| async move {
        let steps: [(usize, u64); 5] = [
            (201, 1_576_171_875),
            (203, 2_781_250_000),
            (205, 2_781_250_000),
            (207, 2_781_250_000),
            (209, 3_994_140_625),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 11] = [
        (0, 807),
        (1_183_593_749, 807),
        (1_183_593_750, 1221),
        (1_576_171_874, 1221),
        (1_576_171_875, 1424),
        (2_384_765_624, 1424),
        (2_384_765_625, 1424),
        (2_781_249_999, 1424),
        (2_781_250_000, 2045),
        (3_994_140_624, 2045),
        (3_994_140_625, 2045),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
/// Two tasks with different request sizes sharing one limiter.
#[test]
fn over_limit_multi_thread_2() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        let steps: [(usize, u64); 5] = [
            (300, 0),
            (301, 1_173_828_125),
            (302, 1_173_828_125),
            (303, 2_550_781_250),
            (304, 2_550_781_250),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    fx.spawn(|sfx| async move {
        let steps: [(usize, u64); 5] = [
            (100, 1_369_140_625),
            (101, 2_748_046_875),
            (102, 2_748_046_875),
            (103, 2_748_046_875),
            (104, 3_945_312_500),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 6] = [
        (0, 701),
        (1_173_828_125, 1306),
        (1_369_140_625, 1407),
        (2_550_781_250, 1711),
        (2_748_046_875, 2020),
        (3_945_312_500, 2020),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
/// Two tasks sharing one limiter, each yielding for one nanosecond between
/// consecutive requests.
#[test]
fn over_limit_multi_thread_yielded() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        let steps: [(usize, u64); 5] = [
            (300, 0),
            (301, 1_369_140_625),
            (302, 1_369_140_626),
            (303, 2_748_046_875),
            (304, 2_748_046_876),
        ];
        for (index, &(bytes, resumed_at)) in steps.iter().enumerate() {
            // Yield for 1 ns between consecutive requests.
            if index > 0 {
                sfx.sleep(1).await;
            }
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    fx.spawn(|sfx| async move {
        let steps: [(usize, u64); 5] = [
            (100, 0),
            (101, 1_566_406_250),
            (102, 2_947_265_625),
            (103, 2_947_265_626),
            (104, 2_947_265_627),
        ];
        for (index, &(bytes, resumed_at)) in steps.iter().enumerate() {
            if index > 0 {
                sfx.sleep(1).await;
            }
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 11] = [
        (0, 400),
        (1, 802),
        (1_369_140_625, 802),
        (1_369_140_626, 1104),
        (1_566_406_250, 1407),
        (1_566_406_251, 1509),
        (2_748_046_875, 1509),
        (2_748_046_876, 1813),
        (2_947_265_625, 1813),
        (2_947_265_626, 1916),
        (2_947_265_627, 2020),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
// A long idle period refills the bucket only up to its capacity: after the
// 10-second pause the task gets one full bucket, not ten seconds' worth.
#[test]
fn hiatus() {
let mut fx = Fixture::new();
fx.spawn(|sfx| async move {
sfx.consume(400).await;
assert_eq!(sfx.now(), 0);
sfx.consume(401).await;
// 1_564_453_125 ns = (400 + 401) / 512 seconds.
assert_eq!(sfx.now(), 1_564_453_125);
// Stay idle for 10 seconds.
sfx.sleep(10_000_000_000).await;
assert_eq!(sfx.now(), 11_564_453_125);
// The bucket is full again, so 402 bytes pass without waiting.
sfx.consume(402).await;
assert_eq!(sfx.now(), 11_564_453_125);
sfx.consume(403).await;
// 13_136_718_750 ns = 11_564_453_125 ns + (402 + 403) / 512 seconds.
assert_eq!(sfx.now(), 13_136_718_750);
});
fx.set_time(0);
assert_eq!(fx.total_bytes_consumed(), 801);
fx.set_time(1_564_453_125);
assert_eq!(fx.total_bytes_consumed(), 801);
fx.set_time(11_564_453_125);
assert_eq!(fx.total_bytes_consumed(), 1606);
fx.set_time(13_136_718_750);
assert_eq!(fx.total_bytes_consumed(), 1606);
}
/// Requests far larger than the bucket capacity are admitted at once and paid
/// for by a proportionally long wait.
#[test]
fn burst() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        // 9_765_625_000 ns = 5000 / 512 seconds.
        let steps: [(usize, u64); 3] = [
            (5000, 9_765_625_000),
            (5001, 19_533_203_125),
            (5002, 29_302_734_375),
        ];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 4] = [
        (0, 5000),
        (9_765_625_000, 10001),
        (19_533_203_125, 15003),
        (29_302_734_375, 15003),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
/// Changing the speed limit while a task is running takes effect for the
/// requests that follow.
#[test]
fn change_speed_limit() {
    /// Steps the clock through `checkpoints`, given as
    /// (clock reading in ns, expected total bytes consumed).
    fn expect_totals(fx: &mut Fixture, checkpoints: &[(u64, usize)]) {
        for &(time, total) in checkpoints {
            fx.set_time(time);
            assert_eq!(fx.total_bytes_consumed(), total);
        }
    }

    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        for _ in 0..20 {
            sfx.consume(256).await;
        }
    });

    // Initial limit of 512 bytes per second.
    expect_totals(
        &mut fx,
        &[
            (0, 512),
            (500_000_000, 512),
            (1_000_000_000, 1024),
            (1_500_000_000, 1024),
        ],
    );

    fx.set_speed_limit(256.0);
    expect_totals(
        &mut fx,
        &[
            (1_500_000_001, 1024),
            (2_000_000_000, 1280),
            (2_500_000_000, 1280),
            (3_000_000_000, 1280),
            (3_500_000_000, 1280),
            (4_000_000_000, 1536),
            (4_500_000_000, 1536),
        ],
    );

    fx.set_speed_limit(1024.0);
    expect_totals(
        &mut fx,
        &[
            (4_500_000_001, 1536),
            (5_000_000_000, 2560),
            (5_500_000_000, 2560),
            (6_000_000_000, 3584),
            (6_500_000_000, 3584),
            (7_000_000_000, 4608),
            (7_500_000_000, 4608),
            (8_000_000_000, 5120),
        ],
    );
}
/// One task issuing many small requests competes with one issuing a few large
/// ones.
#[test]
fn thousand_cuts() {
    let mut fx = Fixture::new();

    fx.spawn(|sfx| async move {
        for _ in 0..64 {
            sfx.consume(16).await;
        }
    });

    fx.spawn(|sfx| async move {
        // (bytes to consume, clock reading in ns once the consume resolves)
        let steps: [(usize, u64); 2] = [(555, 2_083_984_375), (556, 3_201_171_875)];
        for &(bytes, resumed_at) in steps.iter() {
            sfx.consume(bytes).await;
            assert_eq!(sfx.now(), resumed_at);
        }
    });

    // (clock reading in ns, total bytes consumed after running the tasks)
    let checkpoints: [(u64, usize); 8] = [
        (0, 1067),
        (1_000_000_000, 1083),
        (2_000_000_000, 1083),
        (2_083_984_375, 1639),
        (3_000_000_000, 2055),
        (3_201_171_875, 2055),
        (4_000_000_000, 2055),
        (4_169_921_875, 2135),
    ];
    for &(time, total) in checkpoints.iter() {
        fx.set_time(time);
        assert_eq!(fx.total_bytes_consumed(), total);
    }
}
// Switching to an infinite speed limit removes all waiting for new requests,
// and switching back to a finite limit starts from a full bucket.
#[test]
fn set_infinite_speed_limit() {
let mut fx = Fixture::new();
fx.spawn(|sfx| async move {
for _ in 0..1000 {
sfx.consume(512).await;
}
sfx.sleep(1).await;
for _ in 0..1000 {
sfx.consume(512).await;
}
sfx.sleep(1).await;
sfx.consume(512).await;
sfx.consume(512).await;
});
// Limited to 512 bytes per second: one 512-byte request per second.
fx.set_time(0);
assert_eq!(fx.total_bytes_consumed(), 512);
fx.set_time(1_000_000_000);
assert_eq!(fx.total_bytes_consumed(), 1024);
fx.set_speed_limit(std::f64::INFINITY);
// The task is still sleeping on the delay computed before the change.
fx.set_time(1_500_000_000);
assert_eq!(fx.total_bytes_consumed(), 1024);
// Once it wakes up, the rest of the first batch (1000 * 512 bytes in total)
// goes through without waiting.
fx.set_time(2_000_000_000);
assert_eq!(fx.total_bytes_consumed(), 512_000);
// Setting an infinite limit again changes nothing.
fx.set_speed_limit(std::f64::INFINITY);
fx.set_time(2_000_000_001);
assert_eq!(fx.total_bytes_consumed(), 1_024_000);
// Back to a finite limit: requests are throttled to one per second again.
fx.set_speed_limit(512.0);
fx.set_time(2_000_000_002);
assert_eq!(fx.total_bytes_consumed(), 1_024_512);
fx.set_time(3_000_000_002);
assert_eq!(fx.total_bytes_consumed(), 1_025_024);
fx.set_time(4_000_000_002);
assert_eq!(fx.total_bytes_consumed(), 1_025_024);
}
}
#[cfg(test)]
#[cfg(feature = "standard-clock")]
mod tests_with_standard_clock {
use super::*;
use futures_executor::LocalPool;
use futures_util::{future::join_all, task::SpawnExt};
use rand::{thread_rng, Rng};
use std::time::Instant;
// Wall-clock test of the async `consume` path: for 1, 2, 4, 8 and 16
// concurrent tasks, checks that the measured throughput stays within 80%..125%
// of the configured speed limit. Every thread count is run at two speed limits
// (`target` and `2 * target`) for about 2 seconds each, hence the 20 seconds
// announced below.
#[test]
fn rate() {
eprintln!("tests_with_standard_clock::rate() will run for 20 seconds, please be patient");
let mut pool = LocalPool::new();
let sp = pool.spawner();
for &i in &[1, 2, 4, 8, 16] {
// Scale the speed limit with the number of tasks.
let target = i * 10_240;
let limiter = <Limiter>::new(target as f64);
for &speed_limit in &[target, target * 2] {
limiter.reset_statistics();
limiter.set_speed_limit(speed_limit as f64);
let start = Instant::now();
let handles = (0..i).map(|_| {
let limiter = limiter.clone();
sp.spawn_with_handle(async move {
// Each task keeps issuing randomly sized requests for 2 seconds.
let until = Instant::now() + Duration::from_secs(2);
while Instant::now() < until {
let size = thread_rng().gen_range(1, 1 + target / 10);
limiter.consume(size).await;
}
})
.unwrap()
});
pool.run_until(join_all(handles));
// All task-local clones of the limiter must have been dropped by now.
assert_eq!(limiter.shared_count(), 1);
let elapsed = start.elapsed();
let speed = limiter.total_bytes_consumed() as f64 / elapsed.as_secs_f64();
let diff_ratio = speed / speed_limit as f64;
eprintln!(
"rate: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
i, speed_limit, speed, elapsed
);
assert!(0.80 <= diff_ratio && diff_ratio <= 1.25);
// The last wait may overrun the 2-second deadline, but not by this much.
assert!(elapsed <= Duration::from_secs(4));
}
}
}
// Wall-clock test of `blocking_consume`: same scenario as `rate()`, but with
// 1, 2, 4, 8 and 16 OS threads instead of async tasks. The measured throughput
// must stay within 80%..125% of the configured speed limit.
#[test]
fn block() {
eprintln!("tests_with_standard_clock::block() will run for 20 seconds, please be patient");
for &i in &[1, 2, 4, 8, 16] {
// Scale the speed limit with the number of threads.
let target = i * 10_240;
let limiter = <Limiter>::new(target as f64);
for &speed_limit in &[target, target * 2] {
limiter.reset_statistics();
limiter.set_speed_limit(speed_limit as f64);
let start = Instant::now();
// Collect eagerly so that all threads are started before any is joined.
let handles = (0..i)
.map(|_| {
let limiter = limiter.clone();
std::thread::spawn(move || {
// Each thread keeps issuing randomly sized requests for 2 seconds.
let until = Instant::now() + Duration::from_secs(2);
while Instant::now() < until {
let size = thread_rng().gen_range(1, 1 + target / 10);
limiter.blocking_consume(size);
}
})
})
.collect::<Vec<_>>();
for jh in handles {
jh.join().unwrap();
}
// All per-thread clones of the limiter must have been dropped by now.
assert_eq!(limiter.shared_count(), 1);
let elapsed = start.elapsed();
let speed = limiter.total_bytes_consumed() as f64 / elapsed.as_secs_f64();
let diff_ratio = speed / speed_limit as f64;
eprintln!(
"block: {} threads, expected speed {} B/s, actual speed {:.0} B/s, elapsed {:?}",
i, speed_limit, speed, elapsed
);
assert!(0.80 <= diff_ratio && diff_ratio <= 1.25);
// The last wait may overrun the 2-second deadline, but not by this much.
assert!(elapsed <= Duration::from_secs(4));
}
}
}
}