145 changes: 103 additions & 42 deletions sled-diagnostics/src/lib.rs
@@ -4,7 +4,8 @@

//! Diagnostics for an Oxide sled that exposes common support commands.

use futures::{StreamExt, stream::FuturesUnordered};
use std::sync::Arc;

use slog::Logger;

#[macro_use]
@@ -21,6 +22,7 @@ cfg_if::cfg_if! {

pub mod logs;
pub use logs::{LogError, LogsHandle};
use tokio::{sync::Semaphore, task::JoinSet};

mod queries;
pub use crate::queries::{
@@ -29,6 +31,59 @@ pub use crate::queries::{
};
use queries::*;

/// Max number of commands to run in parallel
const MAX_PARALLELISM: usize = 50;
Member: How did we arrive at this number?


trait ParallelCommandExecution {
type Output;

/// Add a command to the set of commands to be executed.
async fn add_command<F>(&mut self, command: F)
where
F: std::future::Future<Output = Self::Output> + Send + 'static;
}

struct MultipleCommands<T> {
semaphore: Arc<Semaphore>,
set: JoinSet<T>,
}

impl<T: 'static> MultipleCommands<T> {
fn new() -> MultipleCommands<T> {
let semaphore = Arc::new(Semaphore::new(MAX_PARALLELISM));
Member:

On the subject of the concurrency limit, it seems to me that the functions in this crate that run multiple commands fall into two categories: those that run a fairly small, fixed number of commands, like ipadm_info and dladm_info, and those which run a command or set of commands against every Oxide process PID (pargs_oxide_processes etc.).

For the functions that run commands against every Oxide process PID, the concurrency limit is certainly useful, as there may be basically any number of PIDs. But something like ipadm_info will always spawn exactly 3 processes, which is below the concurrency limit, and all this faffing around with a Semaphore is unnecessary.

I kind of wonder if the class of functions that spawn a fixed set of commands should eschew the use of MultipleCommands and just construct a JoinSet and spawn their 3 or 5 tasks or whatever. In practice, any overhead from the semaphore acquire/release/drop and stuff is probably insignificant compared to "actually spawning a child process", so this probably doesn't actually matter, but we could avoid doing it...up to you.
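(For illustration only: a minimal sketch of that direct-JoinSet variant for ipadm_info, assuming the crate's existing query helpers and DEFAULT_TIMEOUT; the name ipadm_info_direct is hypothetical and this is not part of the change.)

async fn ipadm_info_direct()
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
    // Hypothetical sketch: only three commands, well under MAX_PARALLELISM,
    // so spawn each directly onto a JoinSet with no semaphore involved.
    let mut set = tokio::task::JoinSet::new();
    for command in
        [ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
    {
        set.spawn(execute_command_with_timeout(command, DEFAULT_TIMEOUT));
    }
    set.join_all().await
}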

let set = JoinSet::new();

Self { semaphore, set }
}

/// Wait for all commands to execute and return their output.
async fn join_all(self) -> Vec<T> {
self.set.join_all().await
}
}

impl<T> ParallelCommandExecution for MultipleCommands<T>
where
T: Send + 'static,
{
type Output = T;

async fn add_command<F>(&mut self, command: F)
where
F: std::future::Future<Output = Self::Output> + Send + 'static,
{
let permit = Arc::clone(&self.semaphore)
.acquire_owned()
.await
.expect("semaphore acquire");
let _abort_handle = self.set.spawn(async move {
Member:

So, this will not even spawn the tasks until there's capacity in the concurrency limit. What happens if you change it to:

Suggested change (current):
    let permit = Arc::clone(&self.semaphore)
        .acquire_owned()
        .await
        .expect("semaphore acquire");
    let _abort_handle = self.set.spawn(async move {

Suggested change (proposed):
    let semaphore = self.semaphore.clone();
    let _abort_handle = self.set.spawn(async move {
        let permit = semaphore.acquire().await.expect("semaphore acquire");

This way, all the tasks are spawned immediately, and the task adding commands to the set can do so synchronously (changing add_command to a normal fn) and then just wait for them to all come back. Right now, the task that adds commands has to get woken up a bunch of times to add the next one to the set; I wonder whether changing this to spawn everything synchronously would make a meaningful difference in performance...
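(A hedged sketch of how add_command might look with that suggestion applied: the task is spawned immediately and the permit is acquired inside it, so the method no longer needs to be async. The synchronous signature and exact body are assumptions, not the final code in the PR.)

fn add_command<F>(&mut self, command: F)
where
    F: std::future::Future<Output = T> + Send + 'static,
{
    // Spawn unconditionally; the permit acquired inside the task is what
    // caps how many commands actually run at once. It is released when the
    // task finishes and the guard is dropped.
    let semaphore = Arc::clone(&self.semaphore);
    let _abort_handle = self.set.spawn(async move {
        let _permit =
            semaphore.acquire().await.expect("semaphore acquire");
        command.await
    });
}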

Contributor Author:

With this change I see the following...

stable rust + 1gb RSS: took 24.416534527s

beta rust + 1gb RSS: took 3.829577938s

Over chat I told you that with beta rust and the JoinSet we saw 3.674288649s, so the difference is negligible, but overall I think your suggestion is the better design. I am going to push a commit to this branch.

Member:

ah well, it was worth a shot, i guess! thanks for giving it a spin!

let res = command.await;
drop(permit);
res
});
}
}

/// List all zones on a sled.
pub async fn zoneadm_info()
-> Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError> {
@@ -38,33 +93,33 @@
/// Retrieve various `ipadm` command output for the system.
pub async fn ipadm_info()
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
[ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
.into_iter()
.map(|c| async move {
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
let mut commands = MultipleCommands::new();
for command in
[ipadm_show_interface(), ipadm_show_addr(), ipadm_show_prop()]
{
commands
.add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
.await
}
commands.join_all().await
}

/// Retrieve various `dladm` command output for the system.
pub async fn dladm_info()
-> Vec<Result<SledDiagnosticsCmdOutput, SledDiagnosticsCmdError>> {
[
let mut commands = MultipleCommands::new();
for command in [
dladm_show_phys(),
dladm_show_ether(),
dladm_show_link(),
dladm_show_vnic(),
dladm_show_linkprop(),
]
.into_iter()
.map(|c| async move {
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
] {
commands
.add_command(execute_command_with_timeout(command, DEFAULT_TIMEOUT))
.await
}
commands.join_all().await
}

pub async fn nvmeadm_info()
@@ -83,14 +138,16 @@ pub async fn pargs_oxide_processes(
Err(e) => return vec![Err(e.into())],
};

pids.iter()
.map(|pid| pargs_process(*pid))
.map(|c| async move {
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
let mut commands = MultipleCommands::new();
for pid in pids {
commands
.add_command(execute_command_with_timeout(
pargs_process(pid),
DEFAULT_TIMEOUT,
))
.await;
}
commands.join_all().await
}

pub async fn pstack_oxide_processes(
@@ -104,14 +161,16 @@ pub async fn pstack_oxide_processes(
Err(e) => return vec![Err(e.into())],
};

pids.iter()
.map(|pid| pstack_process(*pid))
.map(|c| async move {
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
let mut commands = MultipleCommands::new();
for pid in pids {
commands
.add_command(execute_command_with_timeout(
pstack_process(pid),
DEFAULT_TIMEOUT,
))
.await;
}
commands.join_all().await
}

pub async fn pfiles_oxide_processes(
@@ -125,14 +184,16 @@ pub async fn pfiles_oxide_processes(
Err(e) => return vec![Err(e.into())],
};

pids.iter()
.map(|pid| pfiles_process(*pid))
.map(|c| async move {
execute_command_with_timeout(c, DEFAULT_TIMEOUT).await
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<_, _>>>()
.await
let mut commands = MultipleCommands::new();
for pid in pids {
commands
.add_command(execute_command_with_timeout(
pfiles_process(pid),
DEFAULT_TIMEOUT,
))
.await;
}
commands.join_all().await
}

/// Retrieve various `zfs` command output for the system.