-
Notifications
You must be signed in to change notification settings - Fork 62
[sled-diagnostics] use ParallelTaskSet for multiple commands #8151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
edb0440
6a5a376
b5eb158
dee2f7e
ef87515
c1ff648
f89127e
10de4e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,8 @@ | |
|
|
||
| //! Diagnostics for an Oxide sled that exposes common support commands. | ||
|
|
||
| use futures::{StreamExt, stream::FuturesUnordered}; | ||
| use std::sync::Arc; | ||
|
|
||
| use slog::Logger; | ||
|
|
||
| #[macro_use] | ||
|
|
@@ -21,6 +22,7 @@ cfg_if::cfg_if! { | |
|
|
||
| pub mod logs; | ||
| pub use logs::{LogError, LogsHandle}; | ||
| use tokio::{sync::Semaphore, task::JoinSet}; | ||
|
|
||
| mod queries; | ||
| pub use crate::queries::{ | ||
|
|
@@ -29,6 +31,41 @@ pub use crate::queries::{ | |
| }; | ||
| use queries::*; | ||
|
|
||
| /// Max number of commands to run in parallel | ||
| const MAX_PARALLELISM: usize = 50; | ||
|
||
|
|
||
| struct MultipleCommands<T> { | ||
| semaphore: Arc<Semaphore>, | ||
| set: JoinSet<T>, | ||
| } | ||
|
|
||
| impl<T: 'static + Send> MultipleCommands<T> { | ||
| fn new() -> MultipleCommands<T> { | ||
| let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM)); | ||
|
||
| let set = JoinSet::new(); | ||
|
|
||
| Self { semaphore, set } | ||
| } | ||
|
|
||
| fn add_command<F>(&mut self, command: F) | ||
| where | ||
| F: std::future::Future<Output = T> + Send + 'static, | ||
| { | ||
| let semaphore = Arc::clone(&self.semaphore); | ||
| let _abort_handle = self.set.spawn(async move { | ||
| // Hold onto the permit until the command finishes executing | ||
| let _permit = | ||
| semaphore.acquire_owned().await.expect("semaphore acquire"); | ||
| command.await | ||
| }); | ||
| } | ||
|
|
||
| /// Wait for all commands to execute and return their output. | ||
| async fn join_all(self) -> Vec<T> { | ||
| self.set.join_all().await | ||
| } | ||
| } | ||
|
|
||
| /// List all zones on a sled. | ||
| pub async fn zoneadm_info() | ||
| -> Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError> { | ||
|
|
@@ -38,33 +75,31 @@ pub async fn zoneadm_info() | |
| /// Retrieve various `ipadm` command output for the system. | ||
| pub async fn ipadm_info() | ||
| -> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> { | ||
| [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] | ||
| .into_iter() | ||
| .map(|c| async move { | ||
| execute_command_with_timeout(c, DEFAULT_TIMEOUT).await | ||
| }) | ||
| .collect::<FuturesUnordered<_>>() | ||
| .collect::<Vec<Result<_, _>>>() | ||
| .await | ||
| let mut commands = MultipleCommands::new(); | ||
| for command in | ||
| [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()] | ||
| { | ||
| commands | ||
| .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) | ||
| } | ||
| commands.join_all().await | ||
| } | ||
|
|
||
| /// Retrieve various `dladm` command output for the system. | ||
| pub async fn dladm_info() | ||
| -> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> { | ||
| [ | ||
| let mut commands = MultipleCommands::new(); | ||
| for command in [ | ||
| dladm_show_phys(), | ||
| dladm_show_ether(), | ||
| dladm_show_link(), | ||
| dladm_show_vnic(), | ||
| dladm_show_linkprop(), | ||
| ] | ||
| .into_iter() | ||
| .map(|c| async move { | ||
| execute_command_with_timeout(c, DEFAULT_TIMEOUT).await | ||
| }) | ||
| .collect::<FuturesUnordered<_>>() | ||
| .collect::<Vec<Result<_, _>>>() | ||
| .await | ||
| ] { | ||
| commands | ||
| .add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT)) | ||
| } | ||
| commands.join_all().await | ||
| } | ||
|
|
||
| pub async fn nvmeadm_info() | ||
|
|
@@ -83,14 +118,14 @@ pub async fn pargs_oxide_processes( | |
| Err(e) => return vec![Err(e.into())], | ||
| }; | ||
|
|
||
| pids.iter() | ||
| .map(|pid| pargs_process(*pid)) | ||
| .map(|c| async move { | ||
| execute_command_with_timeout(c, DEFAULT_TIMEOUT).await | ||
| }) | ||
| .collect::<FuturesUnordered<_>>() | ||
| .collect::<Vec<Result<_, _>>>() | ||
| .await | ||
| let mut commands = MultipleCommands::new(); | ||
| for pid in pids { | ||
| commands.add_command(execute_command_with_timeout( | ||
| pargs_process(pid), | ||
| DEFAULT_TIMEOUT, | ||
| )); | ||
| } | ||
| commands.join_all().await | ||
| } | ||
|
|
||
| pub async fn pstack_oxide_processes( | ||
|
|
@@ -104,14 +139,14 @@ pub async fn pstack_oxide_processes( | |
| Err(e) => return vec![Err(e.into())], | ||
| }; | ||
|
|
||
| pids.iter() | ||
| .map(|pid| pstack_process(*pid)) | ||
| .map(|c| async move { | ||
| execute_command_with_timeout(c, DEFAULT_TIMEOUT).await | ||
| }) | ||
| .collect::<FuturesUnordered<_>>() | ||
| .collect::<Vec<Result<_, _>>>() | ||
| .await | ||
| let mut commands = MultipleCommands::new(); | ||
| for pid in pids { | ||
| commands.add_command(execute_command_with_timeout( | ||
| pstack_process(pid), | ||
| DEFAULT_TIMEOUT, | ||
| )); | ||
| } | ||
| commands.join_all().await | ||
| } | ||
|
|
||
| pub async fn pfiles_oxide_processes( | ||
|
|
@@ -125,14 +160,14 @@ pub async fn pfiles_oxide_processes( | |
| Err(e) => return vec![Err(e.into())], | ||
| }; | ||
|
|
||
| pids.iter() | ||
| .map(|pid| pfiles_process(*pid)) | ||
| .map(|c| async move { | ||
| execute_command_with_timeout(c, DEFAULT_TIMEOUT).await | ||
| }) | ||
| .collect::<FuturesUnordered<_>>() | ||
| .collect::<Vec<Result<_, _>>>() | ||
| .await | ||
| let mut commands = MultipleCommands::new(); | ||
| for pid in pids { | ||
| commands.add_command(execute_command_with_timeout( | ||
| pfiles_process(pid), | ||
| DEFAULT_TIMEOUT, | ||
| )); | ||
| } | ||
| commands.join_all().await | ||
| } | ||
|
|
||
| /// Retrieve various `zfs` command output for the system. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.