Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/media/src/sources/screen_capture.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use cap_flags::FLAGS;
use cidre::cm;
use cpal::traits::{DeviceTrait, HostTrait};
use ffmpeg::{format::Sample, frame::Audio, ChannelLayout};
use ffmpeg_sys_next::AV_TIME_BASE_Q;
Expand Down
8 changes: 1 addition & 7 deletions crates/rendering/src/decoder/avassetreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
path::PathBuf,
rc::Rc,
sync::{mpsc, Arc},
time::Instant,
};

use cidre::{
Expand All @@ -13,9 +12,8 @@ use cidre::{
};
use ffmpeg::{format, frame, Rational};
use tokio::{runtime::Handle as TokioHandle, sync::oneshot};
use tracing::{debug, info};

use super::{pts_to_frame, DecodedFrame, VideoDecoderMessage, FRAME_CACHE_SIZE};
use super::{pts_to_frame, VideoDecoderMessage, FRAME_CACHE_SIZE};

#[derive(Clone)]
struct ProcessedFrame {
Expand Down Expand Up @@ -189,22 +187,19 @@ impl CachedFrame {

pub struct AVAssetReaderDecoder {
inner: cap_video_decode::AVAssetReaderDecoder,
last_decoded_frame: Option<(u32, CachedFrame)>,
is_done: bool,
}

impl AVAssetReaderDecoder {
fn new(path: PathBuf, tokio_handle: TokioHandle) -> Result<Self, String> {
Ok(Self {
inner: cap_video_decode::AVAssetReaderDecoder::new(path, tokio_handle)?,
last_decoded_frame: None,
is_done: false,
})
}

fn reset(&mut self, requested_time: f32) {
let _ = self.inner.reset(requested_time);
self.last_decoded_frame = None;
}

pub fn spawn(
Expand Down Expand Up @@ -306,7 +301,6 @@ impl AVAssetReaderDecoder {
number: current_frame,
};

this.last_decoded_frame = Some((current_frame, cache_frame.clone()));
this.is_done = false;

// Handles frame skips.
Expand Down
226 changes: 117 additions & 109 deletions crates/rendering/src/decoder/ffmpeg.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
use std::{
cell::RefCell,
collections::BTreeMap,
path::PathBuf,
rc::Rc,
sync::{mpsc, Arc},
};

use ffmpeg::{codec, format, frame, software, Codec};
use ffmpeg_sys_next::{avcodec_find_decoder, AVHWDeviceType};
use log::debug;
use tokio::sync::oneshot;
use tracing::error;

use super::{pts_to_frame, DecodedFrame, VideoDecoderMessage, FRAME_CACHE_SIZE};
use super::{pts_to_frame, VideoDecoderMessage, FRAME_CACHE_SIZE};

#[derive(Clone)]
struct CachedFrame {
data: CachedFrameData,
struct ProcessedFrame {
number: u32,
data: Arc<Vec<u8>>,
}

impl CachedFrame {
fn process(&mut self, width: u32, height: u32) -> Arc<Vec<u8>> {
match &mut self.data {
CachedFrameData::Raw(frame) => {
fn process(&mut self, width: u32, height: u32) -> ProcessedFrame {
match self {
Self::Raw { frame, number } => {
let rgb_frame = if frame.format() != format::Pixel::RGBA {
// Reinitialize the scaler with the new input format
let mut scaler =
Expand Down Expand Up @@ -48,21 +50,24 @@ impl CachedFrame {
frame_buffer.extend_from_slice(&line_data[0..width * 4]);
}

let data = Arc::new(frame_buffer);
let data = ProcessedFrame {
data: Arc::new(frame_buffer),
number: *number,
};

self.data = CachedFrameData::Processed(data.clone());
*self = Self::Processed(data.clone());

data
}
CachedFrameData::Processed(data) => data.clone(),
Self::Processed(data) => data.clone(),
}
}
}

#[derive(Clone)]
enum CachedFrameData {
Raw(frame::Video),
Processed(Arc<Vec<u8>>),
enum CachedFrame {
Raw { frame: frame::Video, number: u32 },
Processed(ProcessedFrame),
}

pub struct FfmpegDecoder;
Expand Down Expand Up @@ -95,12 +100,11 @@ impl FfmpegDecoder {
// frames that are within render_more_margin of this frame won't trigger decode.
let mut last_active_frame = None::<u32>;

let mut last_decoded_frame = None::<u32>;
let mut last_sent_frame = None::<(u32, DecodedFrame)>;
let last_sent_frame = Rc::new(RefCell::new(None::<ProcessedFrame>));

let mut peekable_requests = PeekableReceiver { rx, peeked: None };

let mut frames = this.frames().peekable();
let mut frames = this.frames();

let _ = ready_tx.send(Ok(()));

Expand All @@ -114,143 +118,147 @@ impl FfmpegDecoder {
let mut sender = if let Some(cached) = cache.get_mut(&requested_frame) {
let data = cached.process(width, height);

sender.send(data.clone()).ok();
last_sent_frame = Some((requested_frame, data));
sender.send(data.data.clone()).ok();
*last_sent_frame.borrow_mut() = Some(data);
continue;
} else {
Some(sender)
let last_sent_frame = last_sent_frame.clone();
Some(move |data: ProcessedFrame| {
*last_sent_frame.borrow_mut() = Some(data.clone());
let _ = sender.send(data.data);
})
};

let cache_min = requested_frame.saturating_sub(FRAME_CACHE_SIZE as u32 / 2);
let cache_max = requested_frame + FRAME_CACHE_SIZE as u32 / 2;

if requested_frame == 0
|| last_sent_frame
.borrow()
.as_ref()
.map(|last| {
requested_frame < last.0 ||
// seek forward for big jumps. this threshold is arbitrary but should be derived from i-frames in future
requested_frame - last.0 > FRAME_CACHE_SIZE as u32
requested_frame < last.number
// seek forward for big jumps. this threshold is arbitrary but should be derived from i-frames in future
|| requested_frame - last.number > FRAME_CACHE_SIZE as u32
})
.unwrap_or(true)
{
debug!("seeking to {}", requested_frame);

this.reset(requested_time);
frames = this.frames().peekable();

last_decoded_frame = None;
let _ = this.reset(requested_time);
frames = this.frames();
}

last_active_frame = Some(requested_frame);

let mut exit = false;

loop {
if peekable_requests.peek().is_some() {
break;
}
for frame in &mut frames {
let Ok(frame) = frame.map_err(|e| format!("read frame / {e}")) else {
continue;
};

let Some(frame) = frames.next() else {
break;
let current_frame =
pts_to_frame(frame.pts().unwrap() - start_time, time_base, fps);

let mut cache_frame = CachedFrame::Raw {
frame,
number: current_frame,
};

// Handles frame skips.
// We use the cache instead of last_sent_frame as newer non-matching frames could have been decoded.
if let Some(most_recent_prev_frame) =
cache.iter_mut().rev().find(|v| *v.0 < requested_frame)
{
let frame = match frame {
Ok(f) => f,
Err(e) => {
error!("Error decoding frame: {}", e);
break;
}
};

let current_frame =
pts_to_frame(frame.pts().unwrap() - start_time, time_base, fps);

// Handles frame skips. requested_frame == last_decoded_frame should be handled by the frame cache.
if let Some((last_decoded_frame, sender)) = last_decoded_frame
.filter(|last_decoded_frame| {
requested_frame > *last_decoded_frame
&& requested_frame < current_frame
})
.and_then(|l| Some((l, sender.take()?)))
{
let Some(data) = cache
.get_mut(&last_decoded_frame)
.map(|f| f.process(width, height))
else {
break;
};

last_sent_frame = Some((last_decoded_frame, data.clone()));
sender.send(data).ok();
if let Some(sender) = sender.take() {
(sender)(most_recent_prev_frame.1.process(width, height));
}
}

last_decoded_frame = Some(current_frame);

let exceeds_cache_bounds = current_frame > cache_max;
let too_small_for_cache_bounds = current_frame < cache_min;
let exceeds_cache_bounds = current_frame > cache_max;
let too_small_for_cache_bounds = current_frame < cache_min;

if !too_small_for_cache_bounds {
let mut cache_frame = CachedFrame {
data: CachedFrameData::Raw(frame),
};
let cache_frame = if !too_small_for_cache_bounds {
if current_frame == requested_frame {
if let Some(sender) = sender.take() {
let data = cache_frame.process(width, height);
// info!("sending frame {requested_frame}");

if current_frame == requested_frame {
if let Some(sender) = sender.take() {
let data = cache_frame.process(width, height);
last_sent_frame = Some((current_frame, data.clone()));
sender.send(data).ok();
(sender)(data);

break;
}
break;
}
}

if cache.len() >= FRAME_CACHE_SIZE {
if let Some(last_active_frame) = &last_active_frame {
let frame = if requested_frame > *last_active_frame {
*cache.keys().next().unwrap()
} else if requested_frame < *last_active_frame {
*cache.keys().next_back().unwrap()
} else {
let min = *cache.keys().min().unwrap();
let max = *cache.keys().max().unwrap();
if cache.len() >= FRAME_CACHE_SIZE {
if let Some(last_active_frame) = &last_active_frame {
let frame = if requested_frame > *last_active_frame {
*cache.keys().next().unwrap()
} else if requested_frame < *last_active_frame {
*cache.keys().next_back().unwrap()
} else {
let min = *cache.keys().min().unwrap();
let max = *cache.keys().max().unwrap();

if current_frame > max {
min
} else {
max
}
};
if current_frame > max {
min
} else {
max
}
};

cache.remove(&frame);
} else {
cache.clear()
}
cache.remove(&frame);
} else {
cache.clear()
}

cache.insert(current_frame, cache_frame);
}

exit = exit || exceeds_cache_bounds;
}
}
cache.insert(current_frame, cache_frame);
cache.get_mut(&current_frame).unwrap()
} else {
&mut cache_frame
};

if current_frame > requested_frame && sender.is_some() {
// not inlining this is important so that last_sent_frame is dropped before the sender is invoked
let last_sent_frame = last_sent_frame.borrow().clone();

// handles the case where the cache doesn't contain a frame so we fallback to the previously sent one
if let Some(last_sent_frame) = &last_sent_frame {
if last_sent_frame.0 < requested_frame {
sender.take().map(|s| s.send(last_sent_frame.1.clone()));
if let Some((sender, last_sent_frame)) =
last_sent_frame.and_then(|l| Some((sender.take()?, l)))
{
// info!(
// "sending previous frame {} for {requested_frame}",
// last_sent_frame.0
// );

(sender)(last_sent_frame);
} else if let Some(sender) = sender.take() {
// info!(
// "sending forward frame {current_frame} for {requested_frame}",
// );

(sender)(cache_frame.process(width, height));
}
}
}

if exit {
continue;
exit = exit || exceeds_cache_bounds;

if exit {
break;
}
}

if let Some((sender, last_sent_frame)) =
sender.take().zip(last_sent_frame.clone())
// not inlining this is important so that last_sent_frame is dropped before the sender is invoked
let last_sent_frame = last_sent_frame.borrow().clone();
if let Some((sender, last_sent_frame)) = sender.take().zip(last_sent_frame)
{
sender.send(last_sent_frame.1).ok();
// info!(
// "sending hail mary frame {} for {requested_frame}",
// last_sent_frame.0
// );

(sender)(last_sent_frame);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/video-decode/src/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
.video()
.map_err(|e| format!("video decoder / {e}"))?;

decoder.set_time_base(input_stream.time_base());

let width = decoder.width();
let height = decoder.height();

Expand Down Expand Up @@ -112,7 +114,7 @@
}
}

impl<'a> Iterator for FramesIter<'a> {

Check warning on line 117 in crates/video-decode/src/ffmpeg.rs

View workflow job for this annotation

GitHub Actions / Clippy

the following explicit lifetimes could be elided: 'a

warning: the following explicit lifetimes could be elided: 'a --> crates/video-decode/src/ffmpeg.rs:117:6 | 117 | impl<'a> Iterator for FramesIter<'a> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes help: elide the lifetimes | 117 - impl<'a> Iterator for FramesIter<'a> { 117 + impl Iterator for FramesIter<'_> { |
type Item = Result<avframe::Video, avutil::error::Error>;

fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -131,9 +133,9 @@
Err(e) => return Some(Err(e)),
}

let Some((stream, packet)) = self.packets.next() else {
return None;
};

Check warning on line 138 in crates/video-decode/src/ffmpeg.rs

View workflow job for this annotation

GitHub Actions / Clippy

this `let...else` may be rewritten with the `?` operator

warning: this `let...else` may be rewritten with the `?` operator --> crates/video-decode/src/ffmpeg.rs:136:13 | 136 | / let Some((stream, packet)) = self.packets.next() else { 137 | | return None; 138 | | }; | |______________^ help: replace it with: `let (stream, packet) = self.packets.next()?;` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#question_mark = note: `#[warn(clippy::question_mark)]` on by default

if stream.index() != self.stream_index {
continue;
Expand Down
Loading
Loading