Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions crates/bevy_asset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2087,4 +2087,82 @@ mod tests {
Some(())
});
}

#[test]
fn loading_subassets_in_parallel_results_in_one_task() {
struct TestAssetLoader;

impl AssetLoader for TestAssetLoader {
type Asset = TestAsset;
type Error = std::io::Error;
type Settings = ();

async fn load(
&self,
_reader: &mut dyn Reader,
_settings: &Self::Settings,
load_context: &mut LoadContext<'_>,
) -> core::result::Result<Self::Asset, Self::Error> {
load_context.add_labeled_asset("A".into(), SubText { text: "A".into() });
load_context.add_labeled_asset("B".into(), SubText { text: "B".into() });

Ok(TestAsset)
}

fn extensions(&self) -> &[&str] {
&["ron"]
}
}

let dir = Dir::default();
dir.insert_asset(Path::new("test.ron"), &[]);

let asset_source = AssetSource::build()
.with_reader(move || Box::new(MemoryAssetReader { root: dir.clone() }));

// Set up the app.

let mut app = App::new();

app.register_asset_source(AssetSourceId::Default, asset_source)
.add_plugins((TaskPoolPlugin::default(), AssetPlugin::default()))
.init_asset::<TestAsset>()
.init_asset::<SubText>()
.register_asset_loader(TestAssetLoader);

let asset_server = app.world().resource::<AssetServer>();
let _handle_a: Handle<SubText> = asset_server.load("test.ron#A");
let _handle_b: Handle<SubText> = asset_server.load("test.ron#B");

/// A function to take the asset events out of the world so they aren't deleted.
fn take_events(app: &mut App, events: &mut Vec<AssetEvent<SubText>>) {
events.extend(
app.world_mut()
.resource_mut::<Events<AssetEvent<SubText>>>()
.drain(),
);
}

// We have no API for determining whether two tasks were spawned (that's not exposed by the
// asset server or even the TaskPool API). So the best we can do is just stall for a bit to
// give both (hypothetical) tasks time to finish. If we were to break this feature, this
// test would become flaky.
let mut events = vec![];
for _ in 0..5 {
app.update();
take_events(&mut app, &mut events);
}

let mut received_adds = 0;
for event in events {
match event {
AssetEvent::Added { .. } => { received_adds += 1; }
AssetEvent::LoadedWithDependencies { .. } => {}
AssetEvent::Modified { .. } => panic!("None of the assets should have been modified, since there should only be one load going on"),
_ => {}
}
}

assert_eq!(received_adds, 2);
}
}
1 change: 1 addition & 0 deletions crates/bevy_asset/src/server/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub(crate) struct AssetInfos {
pub(crate) dependency_loaded_event_sender: TypeIdMap<fn(&mut World, UntypedAssetId)>,
pub(crate) dependency_failed_event_sender:
TypeIdMap<fn(&mut World, UntypedAssetId, AssetPath<'static>, AssetLoadError)>,
pub(crate) root_asset_to_loading_count: HashMap<(TypeId, AssetPath<'static>), u32>,
pub(crate) pending_tasks: HashMap<UntypedAssetId, Task<()>>,
}

Expand Down
59 changes: 58 additions & 1 deletion crates/bevy_asset/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use alloc::{
};
use atomicow::CowArc;
use bevy_ecs::prelude::*;
use bevy_platform::collections::HashSet;
use bevy_platform::collections::{hash_map::Entry, HashSet};
use bevy_tasks::IoTaskPool;
use bevy_utils::OnDrop;
use core::{any::TypeId, future::Future, panic::AssertUnwindSafe, task::Poll};
use crossbeam_channel::{Receiver, Sender};
use either::Either;
Expand Down Expand Up @@ -677,6 +678,62 @@ impl AssetServer {
}
})?;

// We want to prevent loading the same root asset multiple times
// (`get_or_create_path_handle` prevents loading an asset again if the asset is already
// loaded). So flag that the root asset is loading when we start loading the asset, and
// clear that flag when the asset stops loading.
let _loading_guard = {
let asset_type_id = loader.asset_type_id();
let root_path = path.without_label().clone_owned();

let mut infos = self.data.infos.write();
let entry = infos
.root_asset_to_loading_count
.entry((asset_type_id, root_path.clone_owned()));

match (entry, input_handle.is_some()) {
(Entry::Occupied(_), true) => {
// Bail out early. There's already a load for this asset running, and we already
// have the handle we need.
return Ok(None);
}
(Entry::Occupied(mut entry), false) => {
// There is already a load running, but the caller expects us to return an asset
// handle (or an error). So we proceed with the load anyway and just count this
// new load. For a concrete case, if we use `load_untyped_async`, we need to get
// the handle back, so we can't just bail out with no handle - we need to go
// through the rest of the loading process.
*entry.get_mut() += 1;
}
(Entry::Vacant(entry), _) => {
// There's a new load, so keep track of it.
entry.insert(1);
}
}

// On drop, we should decrement the loading count so future loads can load the asset.
let this = self;
OnDrop::new(move || {
let mut infos = this.data.infos.write();
let Entry::Occupied(mut entry) = infos
.root_asset_to_loading_count
.entry((asset_type_id, root_path))
else {
// Each load increments the loading count before starting, and we only decrement
// the count in this OnDrop (at most once per load), so this should never be
// missing.
unreachable!("No loading count associated with currently loading asset.");
};

// Decrement the loading count and remove it if the count is zero (so the entry is
// no longer occupied).
*entry.get_mut() -= 1;
if *entry.get() == 0 {
entry.remove();
}
})
};

if let Some(meta_transform) = input_handle.as_ref().and_then(|h| h.meta_transform()) {
(*meta_transform)(&mut *meta);
}
Expand Down
Loading