Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ anyhow = "1.0.31"
ape = "0.3.0"
app_dirs = "1.1.1"
base64 = "0.12.1"
crossbeam-channel = "0.4"
diesel = { version = "1.4.4", features = ["sqlite", "r2d2"] }
diesel_migrations = { version = "1.4", features = ["sqlite"] }
flame = { version = "0.2.2", optional = true }
Expand Down
146 changes: 84 additions & 62 deletions src/index/update.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::*;
use crossbeam_channel::{Receiver, Sender};
use diesel;
use diesel::prelude::*;
#[cfg(feature = "profile-index")]
Expand All @@ -7,14 +8,14 @@ use log::{error, info};
use rayon::prelude::*;
use regex::Regex;
use std::fs;
use std::path::Path;
use std::sync::mpsc::*;
use std::path::{Path, PathBuf};
use std::time;

use crate::config::MiscSettings;
use crate::db::{directories, misc_settings, songs, DB};
use crate::index::metadata;
use crate::vfs::VFSSource;
use metadata::SongTags;

const INDEX_BUILDING_INSERT_BUFFER_SIZE: usize = 1000; // Insertions in each transaction
const INDEX_BUILDING_CLEAN_BUFFER_SIZE: usize = 500; // Insertions in each transaction
Expand Down Expand Up @@ -82,12 +83,12 @@ impl IndexUpdater {
}

#[cfg_attr(feature = "profile-index", flame)]
fn push_song(&mut self, song: NewSong) -> Result<()> {
fn push_song(&self, song: NewSong) -> Result<()> {
self.song_sender.send(song).map_err(Error::new)
}

#[cfg_attr(feature = "profile-index", flame)]
fn push_directory(&mut self, directory: NewDirectory) -> Result<()> {
fn push_directory(&self, directory: NewDirectory) -> Result<()> {
self.directory_sender.send(directory).map_err(Error::new)
}

Expand All @@ -103,7 +104,7 @@ impl IndexUpdater {
Ok(None)
}

fn populate_directory(&mut self, parent: Option<&Path>, path: &Path) -> Result<()> {
fn populate_directory(&self, parent: Option<&Path>, path: &Path) -> Result<()> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard(format!(
"dir: {}",
Expand Down Expand Up @@ -148,13 +149,22 @@ impl IndexUpdater {

// Sub directories
let mut sub_directories = Vec::new();
let mut song_files = Vec::new();

let files = match fs::read_dir(path) {
Ok(files) => files,
Err(e) => {
error!("Directory read error for `{}`: {}", path.display(), e);
return Err(e.into());
}
};

// Insert content
for file in fs::read_dir(path)? {
for file in files {
let file_path = match file {
Ok(ref f) => f.path(),
_ => {
error!("File read error within {}", path_string);
Err(e) => {
error!("File read error within `{}`: {}", path_string, e);
break;
}
};
Expand All @@ -174,54 +184,64 @@ impl IndexUpdater {
continue;
}

if let Some(file_path_string) = file_path.to_str() {
if let Some(tags) = metadata::read(file_path.as_path()) {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}

if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}

if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}

let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};

self.push_song(song)?;
}
song_files.push(file_path);
}

let song_metadata = |path: PathBuf| -> Option<(String, SongTags)> {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("song_metadata");

path.to_str().and_then(|file_path_string| {
metadata::read(&path).map(|m| (file_path_string.to_owned(), m))
})
};
let song_tags = song_files
.into_par_iter()
.filter_map(song_metadata)
.collect::<Vec<_>>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could avoid the allocation here by doing a fold.

Copy link
Owner

@agersant agersant Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good enough for me and very readable in this form. I like this a lot better than what it was before.


for (file_path_string, tags) in song_tags {
if tags.year.is_some() {
inconsistent_directory_year |=
directory_year.is_some() && directory_year != tags.year;
directory_year = tags.year;
}

if tags.album.is_some() {
inconsistent_directory_album |=
directory_album.is_some() && directory_album != tags.album;
directory_album = tags.album.as_ref().cloned();
}

if tags.album_artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.album_artist;
directory_artist = tags.album_artist.as_ref().cloned();
} else if tags.artist.is_some() {
inconsistent_directory_artist |=
directory_artist.is_some() && directory_artist != tags.artist;
directory_artist = tags.artist.as_ref().cloned();
}

let song = NewSong {
path: file_path_string.to_owned(),
parent: path_string.to_owned(),
disc_number: tags.disc_number.map(|n| n as i32),
track_number: tags.track_number.map(|n| n as i32),
title: tags.title,
duration: tags.duration.map(|n| n as i32),
artist: tags.artist,
album_artist: tags.album_artist,
album: tags.album,
year: tags.year,
artwork: artwork.as_ref().cloned(),
};

self.push_song(song)?;
}

// Insert directory
let directory = {
#[cfg(feature = "profile-index")]
let _guard = flame::start_guard("create_directory");

if inconsistent_directory_year {
directory_year = None;
}
Expand All @@ -246,11 +266,10 @@ impl IndexUpdater {
self.push_directory(directory)?;

// Populate subdirectories
for sub_directory in sub_directories {
self.populate_directory(Some(path), &sub_directory)?;
}

Ok(())
sub_directories
.into_par_iter()
.map(|sub_directory| self.populate_directory(Some(path), &sub_directory))
Copy link
Owner

@agersant agersant Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, could we use .for_each() here instead of map and collect? I'm also unsure how this collect() compacts the Iter of results into a single value.

Copy link
Contributor Author

@lnicola lnicola Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's the same as the similar Iterator method. It propagates one of the Err results (note how you were using ? before, exiting at the first error). With my other error printing change, we should get an error if the channel was closed. It probably doesn't matter much, but it seemed more clear to exit like this than to not handle the errors.

I'll add a comment here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.collect() // propagate an error to the caller if one of them failed
}
}

Expand Down Expand Up @@ -322,8 +341,8 @@ pub fn populate(db: &DB) -> Result<()> {
Regex::new(&settings.index_album_art_pattern)?
};

let (directory_sender, directory_receiver) = channel();
let (song_sender, song_receiver) = channel();
let (directory_sender, directory_receiver) = crossbeam_channel::unbounded();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to crossbeam-channel because it's Sender is Sync (and it's also faster and more reliable than the std one).

let (song_sender, song_receiver) = crossbeam_channel::unbounded();

let songs_db = db.clone();
let directories_db = db.clone();
Expand All @@ -337,10 +356,13 @@ pub fn populate(db: &DB) -> Result<()> {
});

{
let mut updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
for target in mount_points.values() {
updater.populate_directory(None, target.as_path())?;
}
let updater = IndexUpdater::new(album_art_pattern, directory_sender, song_sender)?;
let mount_points = mount_points.values().collect::<Vec<_>>();
mount_points
.iter()
.par_bridge()
.map(|target| updater.populate_directory(None, target.as_path()))
.collect::<Result<()>>()?;
Copy link
Owner

@agersant agersant Jul 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like the same collect magic! Does something implement Into<Result> for Iter<Result>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it returns an error if one of the mounts failed.

}

match directories_thread.join() {
Expand Down