Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions crates/bevy_ecs/src/query/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,14 @@ impl<T: SparseSetIndex> Clone for FilteredAccessSet<T> {
}

impl<T: SparseSetIndex> FilteredAccessSet<T> {
/// Creates an empty [`FilteredAccessSet`].
pub const fn new() -> Self {
Self {
combined_access: Access::new(),
filtered_accesses: Vec::new(),
}
}

/// Returns a reference to the unfiltered access of the entire set.
#[inline]
pub fn combined_access(&self) -> &Access<T> {
Expand Down Expand Up @@ -1411,10 +1419,7 @@ impl<T: SparseSetIndex> FilteredAccessSet<T> {

impl<T: SparseSetIndex> Default for FilteredAccessSet<T> {
fn default() -> Self {
Self {
combined_access: Default::default(),
filtered_accesses: Vec::new(),
}
Self::new()
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/bevy_ecs/src/schedule/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
component::{ComponentId, Tick},
error::{BevyError, ErrorContext, Result},
prelude::{IntoSystemSet, SystemSet},
query::Access,
query::{Access, FilteredAccessSet},
schedule::{BoxedCondition, InternedSystemSet, NodeId, SystemTypeSet},
system::{ScheduleSystem, System, SystemIn, SystemParamValidationError},
world::{unsafe_world_cell::UnsafeWorldCell, DeferredWorld, World},
Expand Down Expand Up @@ -174,6 +174,10 @@ impl System for ApplyDeferred {
const { &Access::new() }
}

fn component_access_set(&self) -> &FilteredAccessSet<ComponentId> {
const { &FilteredAccessSet::new() }
}

fn archetype_component_access(&self) -> &Access<ArchetypeComponentId> {
// This system accesses no archetype components.
const { &Access::new() }
Expand Down
122 changes: 79 additions & 43 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloc::{boxed::Box, vec::Vec};
use bevy_platform::sync::Arc;
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
use bevy_utils::{default, syncunsafecell::SyncUnsafeCell};
use bevy_utils::syncunsafecell::SyncUnsafeCell;
use concurrent_queue::ConcurrentQueue;
use core::{any::Any, panic::AssertUnwindSafe};
use fixedbitset::FixedBitSet;
Expand All @@ -13,10 +13,8 @@ use std::sync::{Mutex, MutexGuard};
use tracing::{info_span, Span};

use crate::{
archetype::ArchetypeComponentId,
error::{default_error_handler, BevyError, ErrorContext, Result},
prelude::Resource,
query::Access,
schedule::{is_apply_deferred, BoxedCondition, ExecutorKind, SystemExecutor, SystemSchedule},
system::ScheduleSystem,
world::{unsafe_world_cell::UnsafeWorldCell, World},
Expand Down Expand Up @@ -62,8 +60,13 @@ impl<'env, 'sys> Environment<'env, 'sys> {
/// Per-system data used by the [`MultiThreadedExecutor`].
// Copied here because it can't be read from the system when it's running.
struct SystemTaskMetadata {
/// The [`ArchetypeComponentId`] access of the system.
archetype_component_access: Access<ArchetypeComponentId>,
/// The set of systems whose `component_access_set()` conflicts with this one.
conflicting_systems: FixedBitSet,
/// The set of systems whose `component_access_set()` conflicts with this system's conditions.
/// Note that this is separate from `conflicting_systems` to handle the case where
/// a system is skipped by an earlier system set condition or system stepping,
/// and needs access to run its conditions but not for itself.
condition_conflicting_systems: FixedBitSet,
/// Indices of the systems that directly depend on the system.
dependents: Vec<usize>,
/// Is `true` if the system does not access `!Send` data.
Expand Down Expand Up @@ -97,8 +100,8 @@ pub struct MultiThreadedExecutor {
pub struct ExecutorState {
/// Metadata for scheduling and running system tasks.
system_task_metadata: Vec<SystemTaskMetadata>,
/// Union of the accesses of all currently running systems.
active_access: Access<ArchetypeComponentId>,
/// The set of systems whose `component_access_set()` conflicts with this system set's conditions.
set_condition_conflicting_systems: Vec<FixedBitSet>,
/// Returns `true` if a system with non-`Send` access is running.
local_thread_running: bool,
/// Returns `true` if an exclusive system is running.
Expand Down Expand Up @@ -164,7 +167,8 @@ impl SystemExecutor for MultiThreadedExecutor {
state.system_task_metadata = Vec::with_capacity(sys_count);
for index in 0..sys_count {
state.system_task_metadata.push(SystemTaskMetadata {
archetype_component_access: default(),
conflicting_systems: FixedBitSet::with_capacity(sys_count),
condition_conflicting_systems: FixedBitSet::with_capacity(sys_count),
dependents: schedule.system_dependents[index].clone(),
is_send: schedule.systems[index].is_send(),
is_exclusive: schedule.systems[index].is_exclusive(),
Expand All @@ -174,6 +178,60 @@ impl SystemExecutor for MultiThreadedExecutor {
}
}

{
#[cfg(feature = "trace")]
let _span = info_span!("calculate conflicting systems").entered();
for index1 in 0..sys_count {
let system1 = &schedule.systems[index1];
for index2 in 0..index1 {
let system2 = &schedule.systems[index2];
if !system2
.component_access_set()
.is_compatible(system1.component_access_set())
{
state.system_task_metadata[index1]
.conflicting_systems
.insert(index2);
state.system_task_metadata[index2]
.conflicting_systems
.insert(index1);
}
}

for index2 in 0..sys_count {
let system2 = &schedule.systems[index2];
if schedule.system_conditions[index1].iter().any(|condition| {
!system2
.component_access_set()
.is_compatible(condition.component_access_set())
}) {
state.system_task_metadata[index1]
.condition_conflicting_systems
.insert(index2);
}
}
}

state.set_condition_conflicting_systems.clear();
state.set_condition_conflicting_systems.reserve(set_count);
for set_idx in 0..set_count {
let mut conflicting_systems = FixedBitSet::with_capacity(sys_count);
for sys_index in 0..sys_count {
let system = &schedule.systems[sys_index];
if schedule.set_conditions[set_idx].iter().any(|condition| {
!system
.component_access_set()
.is_compatible(condition.component_access_set())
}) {
conflicting_systems.insert(sys_index);
}
}
state
.set_condition_conflicting_systems
.push(conflicting_systems);
}
}

state.num_dependencies_remaining = Vec::with_capacity(sys_count);
}

Expand Down Expand Up @@ -257,7 +315,6 @@ impl SystemExecutor for MultiThreadedExecutor {

debug_assert!(state.ready_systems.is_clear());
debug_assert!(state.running_systems.is_clear());
state.active_access.clear();
state.evaluated_sets.clear();
state.skipped_systems.clear();
state.completed_systems.clear();
Expand Down Expand Up @@ -345,9 +402,9 @@ impl ExecutorState {
fn new() -> Self {
Self {
system_task_metadata: Vec::new(),
set_condition_conflicting_systems: Vec::new(),
num_running_systems: 0,
num_dependencies_remaining: Vec::new(),
active_access: default(),
local_thread_running: false,
exclusive_running: false,
evaluated_sets: FixedBitSet::new(),
Expand All @@ -368,8 +425,6 @@ impl ExecutorState {
self.finish_system_and_handle_dependents(result);
}

self.rebuild_active_access();

// SAFETY:
// - `finish_system_and_handle_dependents` has updated the currently running systems.
// - `rebuild_active_access` locks access for all currently running systems.
Expand Down Expand Up @@ -488,37 +543,30 @@ impl ExecutorState {
{
for condition in &mut conditions.set_conditions[set_idx] {
condition.update_archetype_component_access(world);
if !condition
.archetype_component_access()
.is_compatible(&self.active_access)
{
return false;
}
}
if !self.set_condition_conflicting_systems[set_idx].is_disjoint(&self.running_systems) {
return false;
}
}

for condition in &mut conditions.system_conditions[system_index] {
condition.update_archetype_component_access(world);
if !condition
.archetype_component_access()
.is_compatible(&self.active_access)
{
return false;
}
}
if !system_meta
.condition_conflicting_systems
.is_disjoint(&self.running_systems)
{
return false;
}

if !self.skipped_systems.contains(system_index) {
system.update_archetype_component_access(world);
if !system
.archetype_component_access()
.is_compatible(&self.active_access)
if !system_meta
.conflicting_systems
.is_disjoint(&self.running_systems)
{
return false;
}

self.system_task_metadata[system_index]
.archetype_component_access
.clone_from(system.archetype_component_access());
}

true
Expand Down Expand Up @@ -648,9 +696,6 @@ impl ExecutorState {
context.system_completed(system_index, res, system);
};

self.active_access
.extend(&system_meta.archetype_component_access);

if system_meta.is_send {
context.scope.spawn(task);
} else {
Expand Down Expand Up @@ -741,15 +786,6 @@ impl ExecutorState {
}
}
}

fn rebuild_active_access(&mut self) {
self.active_access.clear();
for index in self.running_systems.ones() {
let system_meta = &self.system_task_metadata[index];
self.active_access
.extend(&system_meta.archetype_component_access);
}
}
}

fn apply_deferred(
Expand Down
6 changes: 6 additions & 0 deletions crates/bevy_ecs/src/system/adapter_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ where
self.system.component_access()
}

fn component_access_set(
&self,
) -> &crate::query::FilteredAccessSet<crate::component::ComponentId> {
self.system.component_access_set()
}

#[inline]
fn archetype_component_access(
&self,
Expand Down
38 changes: 25 additions & 13 deletions crates/bevy_ecs/src/system/combinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
archetype::ArchetypeComponentId,
component::{ComponentId, Tick},
prelude::World,
query::Access,
query::{Access, FilteredAccessSet},
schedule::InternedSystemSet,
system::{input::SystemInput, SystemIn, SystemParamValidationError},
world::unsafe_world_cell::UnsafeWorldCell,
Expand Down Expand Up @@ -114,21 +114,21 @@ pub struct CombinatorSystem<Func, A, B> {
a: A,
b: B,
name: Cow<'static, str>,
component_access: Access<ComponentId>,
component_access_set: FilteredAccessSet<ComponentId>,
archetype_component_access: Access<ArchetypeComponentId>,
}

impl<Func, A, B> CombinatorSystem<Func, A, B> {
/// Creates a new system that combines two inner systems.
///
/// The returned system will only be usable if `Func` implements [`Combine<A, B>`].
pub const fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
pub fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
Self {
_marker: PhantomData,
a,
b,
name,
component_access: Access::new(),
component_access_set: FilteredAccessSet::default(),
archetype_component_access: Access::new(),
}
}
Expand All @@ -148,7 +148,11 @@ where
}

fn component_access(&self) -> &Access<ComponentId> {
&self.component_access
self.component_access_set.combined_access()
}

fn component_access_set(&self) -> &FilteredAccessSet<ComponentId> {
&self.component_access_set
}

fn archetype_component_access(&self) -> &Access<ArchetypeComponentId> {
Expand Down Expand Up @@ -211,8 +215,10 @@ where
fn initialize(&mut self, world: &mut World) {
self.a.initialize(world);
self.b.initialize(world);
self.component_access.extend(self.a.component_access());
self.component_access.extend(self.b.component_access());
self.component_access_set
.extend(self.a.component_access_set().clone());
self.component_access_set
.extend(self.b.component_access_set().clone());
}

fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
Expand Down Expand Up @@ -343,7 +349,7 @@ pub struct PipeSystem<A, B> {
a: A,
b: B,
name: Cow<'static, str>,
component_access: Access<ComponentId>,
component_access_set: FilteredAccessSet<ComponentId>,
archetype_component_access: Access<ArchetypeComponentId>,
}

Expand All @@ -354,12 +360,12 @@ where
for<'a> B::In: SystemInput<Inner<'a> = A::Out>,
{
/// Creates a new system that pipes two inner systems.
pub const fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
pub fn new(a: A, b: B, name: Cow<'static, str>) -> Self {
Self {
a,
b,
name,
component_access: Access::new(),
component_access_set: FilteredAccessSet::default(),
archetype_component_access: Access::new(),
}
}
Expand All @@ -379,7 +385,11 @@ where
}

fn component_access(&self) -> &Access<ComponentId> {
&self.component_access
self.component_access_set.combined_access()
}

fn component_access_set(&self) -> &FilteredAccessSet<ComponentId> {
&self.component_access_set
}

fn archetype_component_access(&self) -> &Access<ArchetypeComponentId> {
Expand Down Expand Up @@ -443,8 +453,10 @@ where
fn initialize(&mut self, world: &mut World) {
self.a.initialize(world);
self.b.initialize(world);
self.component_access.extend(self.a.component_access());
self.component_access.extend(self.b.component_access());
self.component_access_set
.extend(self.a.component_access_set().clone());
self.component_access_set
.extend(self.b.component_access_set().clone());
}

fn update_archetype_component_access(&mut self, world: UnsafeWorldCell) {
Expand Down
Loading