// Timer entry: internal state shared between a `Delay` instance and the timer.
use atomic::AtomicU64;
use timer::{HandlePriv, Inner};
use Error;

use crossbeam_utils::CachePadded;
use futures::task::AtomicTask;
use futures::Poll;

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::u64;

/// Internal state shared between a `Delay` instance and the timer.
///
/// This struct is used as a node in two intrusive data structures:
///
/// * An atomic stack used to signal to the timer thread that the entry state
///   has changed. The timer thread will observe the entry on this stack and
///   perform any actions as necessary.
///
/// * A doubly linked list used **only** by the timer thread. Each slot in the
///   timer wheel is a head pointer to the list of entries that must be
///   processed during that timer tick.
///
/// Several fields are `UnsafeCell`s or raw pointers. The access rules written
/// on each field are conventions; the compiler does not enforce them.
#[derive(Debug)]
pub(crate) struct Entry {
    /// Deadline and duration of the owning `Delay`.
    ///
    /// Only accessed from `Registration` (through `time_ref` / `time_mut`).
    time: CachePadded<UnsafeCell<Time>>,

    /// Timer internals. Using a weak pointer allows the timer to shutdown
    /// without all `Delay` instances having completed.
    ///
    /// When `None`, the entry has not yet been linked with a timer instance.
    /// `transition_to_error` stores an empty `Weak` here, so an entry in the
    /// error state also reports itself as registered.
    inner: Option<Weak<Inner>>,

    /// Tracks the entry state. This value contains the following information:
    ///
    /// * The deadline at which the entry must be "fired".
    /// * A flag indicating if the entry has already been fired.
    /// * Whether or not the entry transitioned to the error state.
    ///
    /// Encoding used in this file: the top bit is the `ELAPSED` flag, the
    /// all-ones value is `ERROR` (which therefore also has `ELAPSED` set), and
    /// any other value is a deadline as produced by
    /// `Inner::normalize_deadline`.
    ///
    /// `Entry::new` initializes `state` to `0`; `register_with` then stores
    /// the normalized deadline (or `ELAPSED` if the deadline already passed).
    /// When a timer is reset to a different instant, this value is changed.
    state: AtomicU64,

    /// Task to notify once the deadline is reached (or the entry errors).
    task: AtomicTask,

    /// True when the entry is queued in the "process" stack. This value
    /// is set before pushing the value and unset after popping the value.
    ///
    /// TODO: This could possibly be rolled up into `state`.
    pub(super) queued: AtomicBool,

    /// Next entry in the "process" linked list (the atomic stack described
    /// on the struct).
    ///
    /// Access to this field is coordinated by the `queued` flag.
    ///
    /// Represents a strong Arc ref.
    pub(super) next_atomic: UnsafeCell<*mut Entry>,

    /// When the entry expires, relative to the `start` of the timer
    /// (Inner::start). This is only used by the timer.
    ///
    /// A `Delay` instance can be reset to a different deadline by the thread
    /// that owns the `Delay` instance. In this case, the timer thread will not
    /// immediately know that this has happened. The timer thread must know the
    /// last deadline that it saw as it uses this value to locate the entry in
    /// its wheel.
    ///
    /// Once the timer thread observes that the instant has changed, it updates
    /// the wheel and sets this value. The idea is that this value eventually
    /// converges to the value of `state` as the timer thread makes updates.
    ///
    /// Read and written through `when_internal` / `set_when_internal`.
    when: UnsafeCell<Option<u64>>,

    /// Next entry in the State's linked list.
    ///
    /// NOTE(review): presumably the timer-only doubly linked list described
    /// on the struct, despite the `_stack` suffix — confirm against the timer
    /// implementation.
    ///
    /// This is only accessed by the timer
    pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,

    /// Previous entry in the State's linked list.
    ///
    /// This is only accessed by the timer and is used to unlink a canceled
    /// entry.
    ///
    /// This is a weak reference.
    pub(super) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
///
/// Held inside `Entry::time` and exposed through `Entry::time_ref` and
/// `Entry::time_mut`.
#[derive(Debug)]
pub(crate) struct Time {
    /// Instant at which the entry should fire. `Entry::register_with` and
    /// `Entry::reset` read this and normalize it against the timer.
    pub(crate) deadline: Instant,
    /// Duration associated with the delay. Not read anywhere in this file;
    /// presumably consumed by the owner of the `Delay` — confirm at the caller.
    pub(crate) duration: Duration,
}

/// Bit 63 of the entry state: set once the entry has elapsed (fired,
/// canceled, or errored).
const ELAPSED: u64 = 0x8000_0000_0000_0000;

/// Entry state value marking the error state. Every bit is set, so a state
/// equal to `ERROR` also carries the `ELAPSED` flag.
const ERROR: u64 = u64::MAX;

// ===== impl Entry =====

impl Entry {
    /// Builds an entry for the given `deadline` / `duration` that is not yet
    /// associated with any timer.
    ///
    /// The new entry has `inner == None`, a `state` of `0`, the `queued` flag
    /// cleared, and all intrusive links empty. Use `Entry::register` or
    /// `Entry::register_with` to attach it to a timer.
    pub fn new(deadline: Instant, duration: Duration) -> Entry {
        let time = Time { deadline, duration };

        Entry {
            // Owner-side data.
            time: CachePadded::new(UnsafeCell::new(time)),
            inner: None,

            // Shared state.
            state: AtomicU64::new(0),
            task: AtomicTask::new(),
            queued: AtomicBool::new(false),

            // Intrusive links and timer-side bookkeeping start out empty.
            next_atomic: UnsafeCell::new(ptr::null_mut()),
            when: UnsafeCell::new(None),
            next_stack: UnsafeCell::new(None),
            prev_stack: UnsafeCell::new(ptr::null()),
        }
    }

    /// Shared view of the deadline/duration pair stored in this entry.
    ///
    /// Only called by `Registration`.
    pub fn time_ref(&self) -> &Time {
        let raw: *mut Time = self.time.get();

        // SAFETY: relies on the convention that `time` is only touched from
        // `Registration`, so no conflicting mutable access is expected while
        // this reference is alive. Not enforced by the type system.
        unsafe { &*raw }
    }

    /// Mutable view of the deadline/duration pair stored in this entry.
    ///
    /// Only called by `Registration`.
    ///
    /// NOTE(review): this hands out `&mut Time` from `&self`; callers must
    /// make sure no other reference obtained from `time_ref` / `time_mut` is
    /// live at the same time.
    pub fn time_mut(&self) -> &mut Time {
        let raw: *mut Time = self.time.get();

        // SAFETY: relies on the convention that `time` is only touched from
        // `Registration`. Not enforced by the type system.
        unsafe { &mut *raw }
    }

    /// Reports whether `inner` has been populated.
    ///
    /// This is `true` after a successful registration and also after
    /// `transition_to_error`, which stores an empty `Weak` in `inner`.
    pub fn is_registered(&self) -> bool {
        match self.inner {
            Some(_) => true,
            None => false,
        }
    }

    /// Registers the entry with the timer returned by
    /// `HandlePriv::try_current`.
    ///
    /// If no handle is available, the entry is moved straight to the error
    /// state instead.
    ///
    /// Only called by `Registration`.
    ///
    /// # Panics
    ///
    /// On the failure path this unwraps `Arc::get_mut(me)`, so `me` must be
    /// the only reference to the entry.
    pub fn register(me: &mut Arc<Self>) {
        match HandlePriv::try_current() {
            Ok(handle) => Entry::register_with(me, handle),
            // Could not associate the entry with a timer.
            Err(_) => Arc::get_mut(me).unwrap().transition_to_error(),
        }
    }

    /// Registers the entry with the timer behind `handle`.
    ///
    /// Steps, in order:
    ///
    /// 1. Resolve the timer internals from `handle`; on failure the entry
    ///    transitions to the error state.
    /// 2. Increment the timer's count of active timeouts; on failure the entry
    ///    transitions to the error state.
    /// 3. Store the weak timer reference in `inner`.
    /// 4. Store the initial `state`: `ELAPSED` if the normalized deadline is
    ///    not after `inner.elapsed()`, otherwise the normalized deadline.
    /// 5. If not already elapsed, queue the entry with the timer; if queueing
    ///    fails the entry is put in the error state via `error`.
    ///
    /// Only called by `Registration`.
    ///
    /// # Panics
    ///
    /// Panics if the entry is already registered, or if `me` is not the only
    /// reference to the entry (`Arc::get_mut` is unwrapped).
    pub fn register_with(me: &mut Arc<Self>, handle: HandlePriv) {
        assert!(!me.is_registered(), "only register an entry once");

        let deadline = me.time_ref().deadline;

        let inner = if let Some(inner) = handle.inner() {
            inner
        } else {
            // Could not associate the entry with a timer.
            Arc::get_mut(me).unwrap().transition_to_error();
            return;
        };

        // Count this entry among the timer's active timeouts. `Drop` performs
        // the matching decrement once `inner` has been stored below.
        if inner.increment().is_err() {
            Arc::get_mut(me).unwrap().transition_to_error();
            return;
        }

        // Associate the entry with the timer.
        Arc::get_mut(me).unwrap().inner = Some(handle.into_inner());

        let when = inner.normalize_deadline(deadline);
        let already_elapsed = when <= inner.elapsed();
        let initial = if already_elapsed { ELAPSED } else { when };

        // Relaxed OK: At this point, there are no other threads that have
        // access to this entry.
        me.state.store(initial, Relaxed);

        if already_elapsed {
            // Nothing for the timer to track.
            return;
        }

        if inner.queue(me).is_err() {
            // The timer has shutdown, transition the entry to the error state.
            me.error();
        }
    }

    /// Puts an exclusively owned entry into the error state.
    ///
    /// `state` becomes `ERROR`, and `inner` becomes an empty `Weak`, so the
    /// entry counts as registered but can never be upgraded to a live timer.
    fn transition_to_error(&mut self) {
        self.state = AtomicU64::new(ERROR);
        self.inner = Some(Weak::new());
    }

    /// The current entry state as known by the timer. This is not the value of
    /// `state`, but lets the timer know how to converge its state to `state`.
    pub fn when_internal(&self) -> Option<u64> {
        unsafe { (*self.when.get()) }
    }

    /// Overwrites the deadline the timer has recorded for this entry.
    pub fn set_when_internal(&self, when: Option<u64>) {
        let slot: *mut Option<u64> = self.when.get();

        // SAFETY: relies on the convention that `when` is only used by the
        // timer. Not enforced by the type system.
        unsafe {
            *slot = when;
        }
    }

    /// Called by `Timer` to load the current value of `state` for processing.
    ///
    /// Returns `None` when the `ELAPSED` flag is set (which includes the
    /// error state), otherwise `Some` with the stored deadline.
    pub fn load_state(&self) -> Option<u64> {
        match self.state.load(SeqCst) {
            state if is_elapsed(state) => None,
            deadline => Some(deadline),
        }
    }

    /// Returns `true` if the `ELAPSED` flag is currently set in `state`.
    /// This is also the case for an entry in the error state.
    pub fn is_elapsed(&self) -> bool {
        is_elapsed(self.state.load(SeqCst))
    }

    /// Marks the entry as elapsed and notifies its task, provided the
    /// deadline currently stored in `state` is not later than `when`.
    ///
    /// Does nothing if the entry is already elapsed (or errored), or if its
    /// stored deadline is greater than `when`.
    pub fn fire(&self, when: u64) {
        let mut observed = self.state.load(SeqCst);

        // Retry until the flag is set by us, or the entry no longer qualifies.
        while !is_elapsed(observed) && observed <= when {
            let previous = self
                .state
                .compare_and_swap(observed, observed | ELAPSED, SeqCst);

            if previous == observed {
                // We performed the transition; wake the waiting task.
                self.task.notify();
                return;
            }

            // Lost a race; re-evaluate against the value that is there now.
            observed = previous;
        }
    }

    pub fn error(&self) {
        // Only transition to the error state if not currently elapsed
        let mut curr = self.state.load(SeqCst);

        loop {
            if is_elapsed(curr) {
                return;
            }

            let next = ERROR;

            let actual = self.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        self.task.notify();
    }

    /// Cancels the entry by setting the `ELAPSED` flag.
    ///
    /// If the flag was not set before and the timer is still alive, the entry
    /// is queued so the timer can process the change. A failure to queue is
    /// ignored.
    pub fn cancel(entry: &Arc<Entry>) {
        let previous = entry.state.fetch_or(ELAPSED, SeqCst);

        if is_elapsed(previous) {
            // Already elapsed or errored: nothing more to do.
            return;
        }

        // If registered with a timer instance that is still alive, tell it.
        if let Some(inner) = entry.upgrade_inner() {
            let _ = inner.queue(entry);
        }
    }

    pub fn poll_elapsed(&self) -> Poll<(), Error> {
        use futures::Async::NotReady;

        let mut curr = self.state.load(SeqCst);

        if is_elapsed(curr) {
            if curr == ERROR {
                return Err(Error::shutdown());
            } else {
                return Ok(().into());
            }
        }

        self.task.register();

        curr = self.state.load(SeqCst).into();

        if is_elapsed(curr) {
            if curr == ERROR {
                return Err(Error::shutdown());
            } else {
                return Ok(().into());
            }
        }

        Ok(NotReady)
    }

    /// Only called by `Registration`
    pub fn reset(entry: &mut Arc<Entry>) {
        if !entry.is_registered() {
            return;
        }

        let inner = match entry.upgrade_inner() {
            Some(inner) => inner,
            None => return,
        };

        let deadline = entry.time_ref().deadline;
        let when = inner.normalize_deadline(deadline);
        let elapsed = inner.elapsed();

        let mut curr = entry.state.load(SeqCst);
        let mut notify;

        loop {
            // In these two cases, there is no work to do when resetting the
            // timer. If the `Entry` is in an error state, then it cannot be
            // used anymore. If resetting the entry to the current value, then
            // the reset is a noop.
            if curr == ERROR || curr == when {
                return;
            }

            let next;

            if when <= elapsed {
                next = ELAPSED;
                notify = !is_elapsed(curr);
            } else {
                next = when;
                notify = true;
            }

            let actual = entry.state.compare_and_swap(curr, next, SeqCst);

            if curr == actual {
                break;
            }

            curr = actual;
        }

        if notify {
            let _ = inner.queue(entry);
        }
    }

    fn upgrade_inner(&self) -> Option<Arc<Inner>> {
        self.inner.as_ref().and_then(|inner| inner.upgrade())
    }
}

fn is_elapsed(state: u64) -> bool {
    state & ELAPSED == ELAPSED
}

/// Releases this entry's slot in the timer's count of active timeouts.
impl Drop for Entry {
    fn drop(&mut self) {
        // Entries that were never registered, are in the error state (empty
        // `Weak`), or whose timer is already gone have nothing to release.
        if let Some(inner) = self.upgrade_inner() {
            inner.decrement();
        }
    }
}

// NOTE(review): `Entry` holds `UnsafeCell`s and raw pointers, so it is not
// `Send`/`Sync` automatically. These impls assert that every user follows the
// access conventions documented on the fields (`time` only from
// `Registration`, `next_atomic` guarded by `queued`, and `when` /
// `next_stack` / `prev_stack` only from the timer). The compiler does not
// check any of this.
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}