1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

//! The task queues available for the thread pool.
//!
//! A task queue has two parts: a shared [`TaskInjector`] and several
//! [`LocalQueue`]s. Unlike usual MPMC queues, [`LocalQueue`] is not required
//! to be `Sync`. The thread pool will use one [`LocalQueue`] per thread,
//! which makes it possible to do extreme optimizations and define complicated
//! data structures.

pub mod multilevel;

mod extras;
mod single_level;

pub use self::extras::Extras;

use std::time::Instant;

/// A cell containing a task and needed extra information.
///
/// Implementors give the queues access to the [`Extras`] that travel with
/// the task.
pub trait TaskCell {
    /// Gets mutable extra information.
    ///
    /// Returns a mutable reference to the [`Extras`] associated with this
    /// cell so that it can be inspected or updated in place.
    fn mut_extras(&mut self) -> &mut Extras;
}

/// A convenience trait that supports constructing a task cell of type `T`
/// from a value and the given extras.
pub trait WithExtras<T> {
    /// Consumes `self` and returns a `T` (intended to be a [`TaskCell`])
    /// with the given extras.
    ///
    /// `extras` is a closure producing the [`Extras`] rather than the value
    /// itself, so an implementation that does not need fresh extras can skip
    /// calling it.
    fn with_extras(self, extras: impl FnOnce() -> Extras) -> T;
}

/// Any [`TaskCell`] can be used where a `WithExtras` value is expected: the
/// cell is returned unchanged and the extras factory is never invoked.
impl<F: TaskCell> WithExtras<F> for F {
    fn with_extras(self, _unused_extras: impl FnOnce() -> Extras) -> F {
        // The value is already a task cell, so hand it back untouched.
        self
    }
}

/// The injector of a task queue.
///
/// A thin wrapper around `InjectorInner` that hides which queue
/// implementation (single level or multilevel) is in use.
pub(crate) struct TaskInjector<T>(InjectorInner<T>);

/// The concrete injector implementation wrapped by `TaskInjector`.
enum InjectorInner<T> {
    /// The injector of a single level queue.
    SingleLevel(single_level::TaskInjector<T>),
    /// The injector of a multilevel queue.
    Multilevel(multilevel::TaskInjector<T>),
}

impl<T: TaskCell + Send> TaskInjector<T> {
    /// Hands `task_cell` over to the injector implementation backing this
    /// queue.
    pub fn push(&self, task_cell: T) {
        match self.0 {
            InjectorInner::Multilevel(ref injector) => injector.push(task_cell),
            InjectorInner::SingleLevel(ref injector) => injector.push(task_cell),
        }
    }

    /// Returns the default [`Extras`] for the kind of queue in use:
    /// `Extras::single_level()` for a single level queue and
    /// `Extras::multilevel_default()` for a multilevel one.
    pub fn default_extras(&self) -> Extras {
        match &self.0 {
            InjectorInner::Multilevel(..) => Extras::multilevel_default(),
            InjectorInner::SingleLevel(..) => Extras::single_level(),
        }
    }
}

/// Popped task cell from a task queue.
///
/// Bundles the task cell with metadata about how it was scheduled.
pub struct Pop<T> {
    /// The task cell that was popped.
    pub task_cell: T,

    /// When the task was pushed to the queue.
    pub schedule_time: Instant,

    /// Whether the task comes from the current [`LocalQueue`] instead of being
    /// just stolen from the injector or other [`LocalQueue`]s.
    pub from_local: bool,
}

/// The local queue of a task queue.
///
/// A thin wrapper around `LocalQueueInner` that hides which queue
/// implementation (single level or multilevel) is in use.
pub(crate) struct LocalQueue<T>(LocalQueueInner<T>);

/// The concrete local queue implementation wrapped by `LocalQueue`.
enum LocalQueueInner<T> {
    /// The local queue of a single level queue.
    SingleLevel(single_level::LocalQueue<T>),
    /// The local queue of a multilevel queue.
    Multilevel(multilevel::LocalQueue<T>),
}

impl<T: TaskCell + Send> LocalQueue<T> {
    /// Puts `task_cell` into the underlying local queue.
    pub fn push(&mut self, task_cell: T) {
        match self.0 {
            LocalQueueInner::Multilevel(ref mut local) => local.push(task_cell),
            LocalQueueInner::SingleLevel(ref mut local) => local.push(task_cell),
        }
    }

    /// Takes the next task cell out of the queue.
    ///
    /// Yields `None` when no task cell is available.
    pub fn pop(&mut self) -> Option<Pop<T>> {
        match self.0 {
            LocalQueueInner::Multilevel(ref mut local) => local.pop(),
            LocalQueueInner::SingleLevel(ref mut local) => local.pop(),
        }
    }

    /// Returns the default [`Extras`] for the kind of queue in use:
    /// `Extras::single_level()` for a single level queue and
    /// `Extras::multilevel_default()` for a multilevel one.
    pub fn default_extras(&self) -> Extras {
        match &self.0 {
            LocalQueueInner::Multilevel(..) => Extras::multilevel_default(),
            LocalQueueInner::SingleLevel(..) => Extras::single_level(),
        }
    }

    /// Reports whether this local queue has work to do.
    ///
    /// Returns `true` if the local queue already holds tasks. Otherwise it
    /// tries to pull tasks from the global queue and returns whether that
    /// attempt succeeded.
    pub fn has_tasks_or_pull(&mut self) -> bool {
        match self.0 {
            LocalQueueInner::Multilevel(ref mut local) => local.has_tasks_or_pull(),
            LocalQueueInner::SingleLevel(ref mut local) => local.has_tasks_or_pull(),
        }
    }
}

/// The kinds of task queue that are supported.
pub enum QueueType {
    /// A single level work stealing queue.
    SingleLevel,
    /// A multilevel feedback queue.
    ///
    /// Carries the [`multilevel::Builder`] used to construct the queue.
    ///
    /// See <https://en.wikipedia.org/wiki/Multilevel_feedback_queue> for
    /// more details.
    Multilevel(multilevel::Builder),
}

impl Default for QueueType {
    fn default() -> QueueType {
        QueueType::SingleLevel
    }
}

/// A multilevel builder converts into the multilevel queue type that wraps
/// it.
impl From<multilevel::Builder> for QueueType {
    fn from(builder: multilevel::Builder) -> Self {
        Self::Multilevel(builder)
    }
}

/// Builds a task queue of the requested type.
///
/// Returns the shared injector together with the local queues produced for
/// `local_num` consumers.
pub(crate) fn build<T>(ty: QueueType, local_num: usize) -> (TaskInjector<T>, Vec<LocalQueue<T>>) {
    match ty {
        // A multilevel queue is constructed by the builder carried in `ty`.
        QueueType::Multilevel(builder) => builder.build(local_num),
        QueueType::SingleLevel => single_level(local_num),
    }
}

/// Creates a task queue that allows given number consumers.
fn single_level<T>(local_num: usize) -> (TaskInjector<T>, Vec<LocalQueue<T>>) {
    let (injector, locals) = single_level::create(local_num);
    (
        TaskInjector(InjectorInner::SingleLevel(injector)),
        locals
            .into_iter()
            .map(|i| LocalQueue(LocalQueueInner::SingleLevel(i)))
            .collect(),
    )
}