-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Implement task dumps for current-thread runtime. #5608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
1222dfb
d9aa093
66cd9c9
206b3d4
b998f74
8a727a1
80dbaf7
c132b33
9c86604
306a547
725bf3f
c20af63
52c8ca9
fdc01e1
144f378
c824389
0c655bb
578c3c4
ac7fa6b
a246b11
a62f9ee
f695499
bb6990f
51d15f0
178bf74
0a8d240
b6d093a
1ff355b
9e0dd91
9a5e7fa
f536db5
d3cda94
a29974d
51f970f
8a8dceb
b0b1160
8318d93
0dc4b68
04bd5e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| //! This example demonstrates tokio's experimental taskdumping functionality. | ||
|
|
||
| #[cfg(all( | ||
| tokio_unstable, | ||
| target_os = "linux", | ||
| any(target_arch = "aarch64", target_arch = "i686", target_arch = "x86_64") | ||
| ))] | ||
| #[tokio::main(flavor = "current_thread")] | ||
| async fn main() { | ||
| use std::hint::black_box; | ||
|
|
||
| #[inline(never)] | ||
| async fn a() { | ||
| black_box(b()).await | ||
| } | ||
|
|
||
| #[inline(never)] | ||
| async fn b() { | ||
| black_box(c()).await | ||
| } | ||
|
|
||
| #[inline(never)] | ||
| async fn c() { | ||
| black_box(tokio::task::yield_now()).await | ||
| } | ||
|
|
||
| tokio::spawn(a()); | ||
| tokio::spawn(b()); | ||
| tokio::spawn(c()); | ||
|
|
||
| let handle = tokio::runtime::Handle::current(); | ||
| let dump = handle.dump(); | ||
|
|
||
| for (i, task) in dump.tasks().iter().enumerate() { | ||
| let trace = task.trace(); | ||
| println!("task {i} trace:"); | ||
| println!("{trace}"); | ||
| } | ||
| } | ||
|
|
||
| #[cfg(not(all( | ||
| tokio_unstable, | ||
| target_os = "linux", | ||
| any(target_arch = "aarch64", target_arch = "i686", target_arch = "x86_64") | ||
| )))] | ||
| fn main() { | ||
| println!("task dumps are not available") | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,66 @@ | ||||||
| //! Snapshots of runtime state. | ||||||
| use std::fmt; | ||||||
|
|
||||||
| /// A snapshot of a runtime's state. | ||||||
| #[derive(Debug)] | ||||||
| pub struct Dump { | ||||||
| tasks: Tasks, | ||||||
| } | ||||||
|
|
||||||
| /// Snapshots of tasks. | ||||||
| #[derive(Debug)] | ||||||
| pub struct Tasks { | ||||||
| tasks: Vec<Task>, | ||||||
| } | ||||||
|
|
||||||
| /// A snapshot of a task. | ||||||
| #[derive(Debug)] | ||||||
| pub struct Task { | ||||||
| trace: Trace, | ||||||
| } | ||||||
|
|
||||||
| /// An execution trace of a task's last poll. | ||||||
| #[derive(Debug)] | ||||||
| pub struct Trace { | ||||||
| inner: super::task::trace::Trace, | ||||||
| } | ||||||
|
|
||||||
| impl Dump { | ||||||
| pub(crate) fn new(tasks: Vec<Task>) -> Self { | ||||||
| Self { | ||||||
| tasks: Tasks { tasks }, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Tasks in this snapshot. | ||||||
| pub fn tasks(&self) -> &Tasks { | ||||||
| &self.tasks | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl Tasks { | ||||||
| /// Iterate over tasks. | ||||||
| pub fn iter(&self) -> impl Iterator<Item = &Task> { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One possible improvement here is to use impl Trait to simplify the definition of methods that return iterators. For example:
Suggested change
'_ means that the lifetime of the returned iterator is bound to the lifetime of the Tasks object |
||||||
| self.tasks.iter() | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl Task { | ||||||
| pub(crate) fn new(trace: super::task::trace::Trace) -> Self { | ||||||
| Self { | ||||||
| trace: Trace { inner: trace }, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// A trace of this task's state. | ||||||
| pub fn trace(&self) -> &Trace { | ||||||
| &self.trace | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| impl fmt::Display for Trace { | ||||||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||||
| self.inner.fmt(f) | ||||||
| } | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -252,6 +252,14 @@ impl Handle { | |
| /// [`tokio::time`]: crate::time | ||
| #[track_caller] | ||
| pub fn block_on<F: Future>(&self, future: F) -> F::Output { | ||
| #[cfg(all( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't require this change for the PR to land, but generally, these inline cfg flags are hard to maintain. I tend to prefer using cfg to define APIs and then using the API regardless of the cfg in code like this. For example, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly, many of these cfgs could be reduced to a single There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm concerned that reducing these to just |
||
| tokio_unstable, | ||
| feature = "taskdump", | ||
| target_os = "linux", | ||
| any(target_arch = "aarch64", target_arch = "i686", target_arch = "x86_64") | ||
| ))] | ||
| let future = super::task::trace::Trace::root(future); | ||
|
|
||
| #[cfg(all(tokio_unstable, feature = "tracing"))] | ||
| let future = | ||
| crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); | ||
|
|
@@ -274,6 +282,13 @@ impl Handle { | |
| F::Output: Send + 'static, | ||
| { | ||
| let id = crate::runtime::task::Id::next(); | ||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "taskdump", | ||
| target_os = "linux", | ||
| any(target_arch = "aarch64", target_arch = "i686", target_arch = "x86_64") | ||
| ))] | ||
| let future = super::task::trace::Trace::root(future); | ||
| #[cfg(all(tokio_unstable, feature = "tracing"))] | ||
| let future = crate::util::trace::task(future, "task", _name, id.as_u64()); | ||
| self.inner.spawn(future, id) | ||
|
|
@@ -321,6 +336,20 @@ cfg_metrics! { | |
| } | ||
| } | ||
|
|
||
| cfg_taskdump! { | ||
| impl Handle { | ||
| /// Capture a snapshot of this runtime's state. | ||
| pub fn dump(&self) -> crate::runtime::Dump { | ||
| match &self.inner { | ||
| scheduler::Handle::CurrentThread(handle) => handle.dump(), | ||
| #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] | ||
| scheduler::Handle::MultiThread(_) => | ||
| unimplemented!("taskdumps are unsupported on the multi-thread runtime"), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Error returned by `try_current` when no Runtime has been started | ||
| #[derive(Debug)] | ||
| pub struct TryCurrentError { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -233,6 +233,11 @@ cfg_rt! { | |
| mod defer; | ||
| pub(crate) use defer::Defer; | ||
|
|
||
| cfg_taskdump! { | ||
| pub mod dump; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not critical for this PR, but we probably want to reconsider the API details before stabilization. Today, |
||
| pub use dump::Dump; | ||
| } | ||
|
|
||
| mod handle; | ||
| pub use handle::{EnterGuard, Handle, TryCurrentError}; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -377,6 +377,51 @@ impl Handle { | |
| handle | ||
| } | ||
|
|
||
| /// Capture a snapshot of this runtime's state. | ||
| #[cfg(all( | ||
| tokio_unstable, | ||
| feature = "taskdump", | ||
| target_os = "linux", | ||
| any(target_arch = "aarch64", target_arch = "i686", target_arch = "x86_64") | ||
| ))] | ||
| pub(crate) fn dump(&self) -> crate::runtime::Dump { | ||
| use crate::runtime::dump; | ||
| use task::trace::trace_current_thread; | ||
|
|
||
| let mut traces = vec![]; | ||
|
|
||
| // todo: how to make this work outside of a runtime context? | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You will most likely have to send a signal somehow. |
||
| CURRENT.with(|maybe_context| { | ||
| // drain the local queue | ||
| let context = if let Some(context) = maybe_context { | ||
| context | ||
| } else { | ||
| return; | ||
| }; | ||
| let mut maybe_core = context.core.borrow_mut(); | ||
| let core = if let Some(core) = maybe_core.as_mut() { | ||
| core | ||
| } else { | ||
| return; | ||
| }; | ||
| let local = &mut core.tasks; | ||
|
|
||
| let mut injection = self.shared.queue.lock(); | ||
| let injection = if let Some(injection) = injection.as_mut() { | ||
| injection | ||
| } else { | ||
| return; | ||
| }; | ||
|
|
||
| traces = trace_current_thread(&self.shared.owned, local, injection) | ||
| .into_iter() | ||
| .map(dump::Task::new) | ||
| .collect(); | ||
| }); | ||
|
|
||
| dump::Dump::new(traces) | ||
| } | ||
|
|
||
| fn pop(&self) -> Option<task::Notified<Arc<Handle>>> { | ||
| match self.shared.queue.lock().as_mut() { | ||
| Some(queue) => queue.pop_front(), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.