#![warn(missing_docs)]
#[macro_use]
extern crate slog;
extern crate thread_local;
extern crate take_mut;
use slog::{Record, RecordStatic, Level, SingleKV, KV, BorrowedKV};
use slog::{Serializer, OwnedKVList, Key};
use slog::Drain;
use std::{io, thread};
use std::error::Error;
use std::fmt;
use std::sync;
use std::sync::{mpsc, Mutex};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use take_mut::take;
/// Serializer that gathers every key-value pair it is given into one owned,
/// `Send`-able `KV` value, so the pairs can be moved to another thread.
struct ToSendSerializer {
/// Pairs gathered so far; every `emit_*` call re-boxes this together with
/// the newly emitted pair.
kv: Box<KV + Send>,
}
impl ToSendSerializer {
    /// Creates a serializer whose collection starts out as the unit value
    /// `()`, i.e. with no pairs recorded yet.
    fn new() -> Self {
        let empty: Box<KV + Send> = Box::new(());
        ToSendSerializer { kv: empty }
    }

    /// Consumes the serializer and hands back everything gathered so far.
    fn finish(self) -> Box<KV + Send> {
        let ToSendSerializer { kv } = self;
        kv
    }
}
impl Serializer for ToSendSerializer {
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, ()))));
Ok(())
}
fn emit_none(&mut self, key: Key) -> slog::Result {
let val: Option<()> = None;
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
let val = val.to_owned();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
fn emit_arguments(
&mut self,
key: Key,
val: &fmt::Arguments,
) -> slog::Result {
let val = fmt::format(*val);
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
#[cfg(feature = "nested-values")]
fn emit_serde(&mut self, key: Key, value: &slog::SerdeValue) -> slog::Result {
let val = value.to_sendable();
take(&mut self.kv, |kv| Box::new((kv, SingleKV(key, val))));
Ok(())
}
}
/// Errors reported by the asynchronous drains in this file.
#[derive(Debug)]
pub enum AsyncError {
/// Produced by the `From` conversions of `mpsc::TrySendError` and
/// `std::sync::TryLockError`.
Full,
/// Produced by the `From` conversions of `mpsc::SendError` and
/// `std::sync::PoisonError`; carries the boxed underlying error.
Fatal(Box<std::error::Error>),
}
/// Any failed non-blocking send is reported as `AsyncError::Full`; the
/// message that could not be sent is discarded.
impl<T> From<mpsc::TrySendError<T>> for AsyncError {
    fn from(_unsent: mpsc::TrySendError<T>) -> Self {
        AsyncError::Full
    }
}
/// Any failed `try_lock` is reported as `AsyncError::Full`; the details of
/// the lock error are discarded.
impl<T> From<std::sync::TryLockError<T>> for AsyncError {
    fn from(_lock_err: std::sync::TryLockError<T>) -> Self {
        AsyncError::Full
    }
}
/// A failed blocking send becomes `AsyncError::Fatal` wrapping a
/// `BrokenPipe` I/O error; the message that could not be sent is discarded.
impl<T> From<mpsc::SendError<T>> for AsyncError {
    fn from(_unsent: mpsc::SendError<T>) -> Self {
        let cause = io::Error::new(
            io::ErrorKind::BrokenPipe,
            "The logger thread terminated",
        );
        AsyncError::Fatal(Box::new(cause))
    }
}
impl<T> From<std::sync::PoisonError<T>> for AsyncError {
fn from(err: std::sync::PoisonError<T>) -> AsyncError {
AsyncError::Fatal(Box::new(
io::Error::new(io::ErrorKind::BrokenPipe, err.description()),
))
}
}
/// `Result` alias whose error type is `AsyncError`.
pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
/// Builder that collects the settings for an `AsyncCore` and its worker
/// thread before the thread is spawned.
pub struct AsyncCoreBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
/// Capacity handed to `mpsc::sync_channel` when the worker is spawned.
chan_size: usize,
/// Copied into the built `AsyncCore`, where it selects `send` (true) or
/// `try_send` (false).
blocking: bool,
/// Drain that the worker thread forwards every received record to.
drain: D,
/// Optional name given to the worker thread.
thread_name: Option<String>,
}
impl<D> AsyncCoreBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Starts a builder around `drain` with a channel size of 128,
    /// non-blocking sends and no worker thread name.
    fn new(drain: D) -> Self {
        AsyncCoreBuilder {
            chan_size: 128,
            blocking: false,
            drain,
            thread_name: None,
        }
    }

    /// Sets the name given to the worker thread.
    ///
    /// # Panics
    ///
    /// Panics if `name` contains a NUL (`'\0'`) character.
    pub fn thread_name(mut self, name: String) -> Self {
        // `\\0` in the literal prints as the two characters `\0`.
        assert!(name.find('\0').is_none(), "Name with '\\0' in it passed");
        self.thread_name = Some(name);
        self
    }

    /// Sets the capacity of the bounded channel between the logging threads
    /// and the worker thread.
    pub fn chan_size(mut self, s: usize) -> Self {
        self.chan_size = s;
        self
    }

    /// Chooses whether the built `AsyncCore` sends records with a blocking
    /// `send` (`true`) or a non-blocking `try_send` (`false`).
    pub fn blocking(mut self, blocking: bool) -> Self {
        self.blocking = blocking;
        self
    }

    /// Creates the channel and spawns the worker thread that forwards every
    /// received record to the wrapped drain.
    ///
    /// Returns the worker's join handle and the sending half of the channel.
    ///
    /// # Panics
    ///
    /// Panics if the operating system refuses to spawn the thread.
    fn spawn_thread(
        self,
    ) -> (thread::JoinHandle<()>, mpsc::SyncSender<AsyncMsg>) {
        let (tx, rx) = mpsc::sync_channel(self.chan_size);
        let mut builder = thread::Builder::new();
        if let Some(thread_name) = self.thread_name {
            builder = builder.name(thread_name);
        }
        let drain = self.drain;
        let join = builder
            .spawn(move || loop {
                let r = match rx.recv() {
                    Ok(AsyncMsg::Record(r)) => r,
                    // `Finish` is the orderly shutdown request. A receive
                    // error means every sender has been dropped, so no
                    // further record can arrive: stop quietly instead of
                    // panicking inside the worker thread.
                    Ok(AsyncMsg::Finish) | Err(_) => return,
                };
                // Rebuild a borrowed `Record` from the owned data that was
                // shipped across the channel.
                let rs = RecordStatic {
                    location: &*r.location,
                    level: r.level,
                    tag: &r.tag,
                };
                drain
                    .log(
                        &Record::new(
                            &rs,
                            &format_args!("{}", r.msg),
                            BorrowedKV(&r.kv),
                        ),
                        &r.logger_values,
                    )
                    .unwrap();
            })
            .unwrap();
        (join, tx)
    }

    /// Builds the `AsyncCore`; identical to `build_no_guard`.
    pub fn build(self) -> AsyncCore {
        self.build_no_guard()
    }

    /// Spawns the worker and builds an `AsyncCore` that keeps the worker's
    /// join handle itself.
    pub fn build_no_guard(self) -> AsyncCore {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        AsyncCore {
            ref_sender: Mutex::new(tx),
            tl_sender: thread_local::ThreadLocal::new(),
            join: Mutex::new(Some(join)),
            blocking,
        }
    }

    /// Spawns the worker and builds an `AsyncCore` together with an
    /// `AsyncGuard`.
    ///
    /// The guard, not the core, receives the worker's join handle, along
    /// with its own clone of the channel sender.
    pub fn build_with_guard(self) -> (AsyncCore, AsyncGuard) {
        let blocking = self.blocking;
        let (join, tx) = self.spawn_thread();
        (
            AsyncCore {
                ref_sender: Mutex::new(tx.clone()),
                tl_sender: thread_local::ThreadLocal::new(),
                join: Mutex::new(None),
                blocking,
            },
            AsyncGuard {
                join: Some(join),
                tx,
            },
        )
    }
}
/// Guard returned by the `build_with_guard` constructors; its `Drop`
/// implementation sends `AsyncMsg::Finish` to the worker and joins it.
pub struct AsyncGuard {
/// Join handle of the worker thread; taken out when the guard is dropped.
join: Option<thread::JoinHandle<()>>,
/// Sender used to deliver the `Finish` message on drop.
tx: mpsc::SyncSender<AsyncMsg>,
}
impl Drop for AsyncGuard {
    /// Asks the worker thread to finish and waits for it to exit.
    ///
    /// Both a failed send and a failed join are ignored.
    fn drop(&mut self) {
        // Best effort: the send result is deliberately discarded.
        let _ = self.tx.send(AsyncMsg::Finish);
        let worker = self.join.take().unwrap();
        // A join error (the worker panicked) is discarded as well.
        let _ = worker.join();
    }
}
/// Drain that turns each record into an owned `AsyncRecord` and sends it
/// over a channel to a worker thread.
pub struct AsyncCore {
/// Reference sender; cloned once for each thread that logs through this drain.
ref_sender: Mutex<mpsc::SyncSender<AsyncMsg>>,
/// Per-thread clone of `ref_sender`, created on a thread's first use.
tl_sender: thread_local::ThreadLocal<mpsc::SyncSender<AsyncMsg>>,
/// Worker join handle: `Some` when built without a guard, `None` when an
/// `AsyncGuard` holds the handle instead.
join: Mutex<Option<thread::JoinHandle<()>>>,
/// When true records are sent with `send`, otherwise with `try_send`.
blocking: bool,
}
impl AsyncCore {
pub fn new<D>(drain: D) -> Self
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
D: std::panic::RefUnwindSafe,
{
AsyncCoreBuilder::new(drain).build()
}
pub fn custom<
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
>(
drain: D,
) -> AsyncCoreBuilder<D> {
AsyncCoreBuilder::new(drain)
}
fn get_sender(
&self,
) -> Result<
&mpsc::SyncSender<AsyncMsg>,
std::sync::PoisonError<sync::MutexGuard<mpsc::SyncSender<AsyncMsg>>>,
> {
self.tl_sender
.get_or_try(|| Ok(Box::new(self.ref_sender.lock()?.clone())))
}
fn send(&self, r: AsyncRecord) -> AsyncResult<()> {
let sender = self.get_sender()?;
if self.blocking {
sender.send(AsyncMsg::Record(r))?;
} else {
sender.try_send(AsyncMsg::Record(r))?;
}
Ok(())
}
}
impl Drain for AsyncCore {
    type Ok = ();
    type Err = AsyncError;

    /// Copies `record` and `logger_values` into an owned `AsyncRecord` and
    /// sends it to the worker thread.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        // Gather the record's own key-value pairs into an owned, sendable form.
        let mut ser = ToSendSerializer::new();
        let serialized = record.kv().serialize(record, &mut ser);
        serialized.expect("`ToSendSerializer` can't fail");

        let msg = fmt::format(*record.msg());
        let location = Box::new(*record.location());
        let tag = String::from(record.tag());
        let owned = AsyncRecord {
            msg,
            level: record.level(),
            location,
            tag,
            logger_values: logger_values.clone(),
            kv: ser.finish(),
        };
        self.send(owned)
    }
}
/// Owned copy of one log record, in the form that is sent to the worker thread.
struct AsyncRecord {
/// The record's message, already formatted into a `String`.
msg: String,
/// The record's level.
level: Level,
/// Boxed copy of the record's location.
location: Box<slog::RecordLocation>,
/// Owned copy of the record's tag.
tag: String,
/// Clone of the logger's key-value list.
logger_values: OwnedKVList,
/// The record's own key-value pairs, as gathered by `ToSendSerializer`.
kv: Box<KV + Send>,
}
/// Message sent over the channel to the worker thread.
enum AsyncMsg {
/// A record for the worker to pass to the wrapped drain.
Record(AsyncRecord),
/// Tells the worker thread to return from its loop.
Finish,
}
impl Drop for AsyncCore {
    /// If this core still owns the worker's join handle, asks the worker to
    /// finish and waits for it to exit.
    ///
    /// Every failure along the way (poisoned locks, failed send, failed join)
    /// is ignored.
    fn drop(&mut self) {
        let handle = match self.join.lock() {
            Ok(mut slot) => slot.take(),
            Err(_) => return,
        };
        let worker = match handle {
            Some(worker) => worker,
            // No handle stored here (an `AsyncGuard` may own it): nothing to do.
            None => return,
        };
        match self.get_sender() {
            Ok(sender) => {
                let _ = sender.send(AsyncMsg::Finish);
            }
            // Without a sender the worker cannot be told to stop; give up
            // without joining.
            Err(_) => return,
        }
        let _ = worker.join();
    }
}
/// What `Async` does when a record cannot be queued; mapped to the
/// `blocking` / `inc_dropped` settings by `AsyncBuilder::overflow_strategy`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum OverflowStrategy {
/// Non-blocking sends; dropped records are counted so they can be reported.
DropAndReport,
/// Non-blocking sends; dropped records are not counted.
Drop,
/// Blocking sends; dropped records are not counted.
Block,
/// Placeholder variant; passing it to `AsyncBuilder::overflow_strategy` panics.
#[doc(hidden)]
DoNotMatchAgainstThisAndReadTheDocs,
}
/// Builder for `Async`: wraps an `AsyncCoreBuilder` and adds the
/// dropped-record counting setting.
pub struct AsyncBuilder<D>
where
D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
/// Builder for the underlying `AsyncCore`.
core: AsyncCoreBuilder<D>,
/// Whether the built `Async` counts records that were dropped because the
/// core reported `AsyncError::Full`.
inc_dropped: bool,
}
impl<D> AsyncBuilder<D>
where
    D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
{
    /// Starts a builder around `drain` with the core builder's defaults and
    /// dropped-record counting switched on.
    fn new(drain: D) -> AsyncBuilder<D> {
        let core = AsyncCoreBuilder::new(drain);
        AsyncBuilder {
            core,
            inc_dropped: true,
        }
    }

    /// Sets the channel size on the underlying core builder.
    pub fn chan_size(self, s: usize) -> Self {
        let AsyncBuilder { core, inc_dropped } = self;
        AsyncBuilder {
            core: core.chan_size(s),
            inc_dropped,
        }
    }

    /// Selects how a record that cannot be queued is handled.
    ///
    /// # Panics
    ///
    /// Panics when given the hidden placeholder variant.
    pub fn overflow_strategy(self, overflow_strategy: OverflowStrategy) -> Self {
        let (blocking, inc_dropped) = match overflow_strategy {
            OverflowStrategy::DropAndReport => (false, true),
            OverflowStrategy::Drop => (false, false),
            OverflowStrategy::Block => (true, false),
            OverflowStrategy::DoNotMatchAgainstThisAndReadTheDocs => panic!("Invalid variant"),
        };
        AsyncBuilder {
            core: self.core.blocking(blocking),
            inc_dropped,
        }
    }

    /// Sets the worker thread name on the underlying core builder.
    pub fn thread_name(self, name: String) -> Self {
        let AsyncBuilder { core, inc_dropped } = self;
        AsyncBuilder {
            core: core.thread_name(name),
            inc_dropped,
        }
    }

    /// Builds the `Async` drain; identical to `build_no_guard`.
    pub fn build(self) -> Async {
        self.build_no_guard()
    }

    /// Builds the `Async` drain on a core built with `build_no_guard`.
    pub fn build_no_guard(self) -> Async {
        let AsyncBuilder { core, inc_dropped } = self;
        Async {
            core: core.build_no_guard(),
            dropped: AtomicUsize::new(0),
            inc_dropped,
        }
    }

    /// Builds the `Async` drain together with the `AsyncGuard` produced by
    /// the core builder.
    pub fn build_with_guard(self) -> (Async, AsyncGuard) {
        let AsyncBuilder { core, inc_dropped } = self;
        let (core, guard) = core.build_with_guard();
        let drain = Async {
            core,
            dropped: AtomicUsize::new(0),
            inc_dropped,
        };
        (drain, guard)
    }
}
/// Drain layered on `AsyncCore` that absorbs `AsyncError::Full` and can
/// count and report the records dropped that way.
pub struct Async {
/// Underlying drain that actually sends records to the worker thread.
core: AsyncCore,
/// Number of dropped records not yet reported.
dropped: AtomicUsize,
/// Whether a record dropped because of `AsyncError::Full` increments `dropped`.
inc_dropped: bool,
}
impl Async {
    /// Builds an `Async` drain around `drain` with the builder's default
    /// settings.
    pub fn default<D>(drain: D) -> Self
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        Self::new(drain).build()
    }

    /// Returns a builder for configuring an `Async` drain around `drain`.
    pub fn new<D>(drain: D) -> AsyncBuilder<D>
    where
        D: slog::Drain<Err = slog::Never, Ok = ()> + Send + 'static,
    {
        AsyncBuilder::new(drain)
    }

    /// Logs a report about the records dropped so far, if there are any.
    ///
    /// The counter is reset before the report is sent. If the report itself
    /// is rejected with `AsyncError::Full`, the count plus one is added back
    /// and `Ok(())` is returned. Any other error is passed to the caller.
    fn push_dropped(&self, logger_values: &OwnedKVList) -> AsyncResult<()> {
        let dropped = self.dropped.swap(0, Ordering::Relaxed);
        if dropped == 0 {
            return Ok(());
        }
        let report = self.core.log(
            &record!(
                slog::Level::Error,
                "slog-async",
                &format_args!(
                    "slog-async: logger dropped messages \
                     due to channel \
                     overflow"
                ),
                b!("count" => dropped)
            ),
            logger_values,
        );
        match report {
            Err(AsyncError::Full) => {
                self.dropped.fetch_add(dropped + 1, Ordering::Relaxed);
                Ok(())
            }
            other => other,
        }
    }
}
impl Drain for Async {
    type Ok = ();
    type Err = AsyncError;

    /// Reports any pending dropped-record count, then forwards `record` to
    /// the core.
    ///
    /// `AsyncError::Full` from the core is absorbed (and counted when
    /// `inc_dropped` is set); every other error is returned.
    fn log(
        &self,
        record: &Record,
        logger_values: &OwnedKVList,
    ) -> AsyncResult<()> {
        self.push_dropped(logger_values)?;
        match self.core.log(record, logger_values) {
            Err(AsyncError::Full) => {
                if self.inc_dropped {
                    self.dropped.fetch_add(1, Ordering::Relaxed);
                }
                Ok(())
            }
            other => other,
        }
    }
}
impl Drop for Async {
fn drop(&mut self) {
let _ = self.push_dropped(&o!().into());
}
}