Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

let mut envelopes = Vec::with_capacity(size);
Expand Down Expand Up @@ -139,6 +140,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-fill the stack
Expand Down Expand Up @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-generate envelopes
Expand Down
19 changes: 14 additions & 5 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::stack_provider::StackProvider;
use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::MemoryChecker;

Expand Down Expand Up @@ -230,8 +230,14 @@ where
{
stack.push(envelope).await?;
} else {
self.push_stack(ProjectKeyPair::from_envelope(&envelope), Some(envelope))
.await?;
// Since we have initialization code that creates all the necessary stacks, we assume
// that any new stack that is added during the envelope buffer's lifecycle, is recreated.
self.push_stack(
StackCreationType::New,
ProjectKeyPair::from_envelope(&envelope),
Some(envelope),
)
.await?;
}
self.priority_queue
.change_priority_by(&project_key_pair, |prio| {
Expand Down Expand Up @@ -355,14 +361,17 @@ where
/// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
async fn push_stack(
&mut self,
stack_creation_type: StackCreationType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make StackCreationType an enum with data and remove project_key_pair and envelope parameters, something like

enum StackOrigin {
    Existing(ProjectKeyPair),
    NewEnvelope(Box<Envelope>),
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is a confusing enumerator since it doesn't depict what is going on. I would rather always create an empty stack and specify the context in which it is created.

project_key_pair: ProjectKeyPair,
envelope: Option<Box<Envelope>>,
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope
.as_ref()
.map_or(Instant::now(), |e| e.meta().start_time());

let mut stack = self.stack_provider.create_stack(project_key_pair);
let mut stack = self
.stack_provider
.create_stack(stack_creation_type, project_key_pair);
if let Some(envelope) = envelope {
stack.push(envelope).await?;
}
Expand Down Expand Up @@ -411,7 +420,7 @@ where
/// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
for project_key_pair in project_key_pairs {
self.push_stack(project_key_pair, None)
self.push_stack(StackCreationType::Initialization, project_key_pair, None)
.await
.expect("Pushing an empty stack raised an error");
}
Expand Down
9 changes: 8 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl SqliteEnvelopeStack {
max_batches: usize,
own_key: ProjectKey,
sampling_key: ProjectKey,
check_disk: bool,
) -> Self {
Self {
envelope_store,
Expand All @@ -64,7 +65,7 @@ impl SqliteEnvelopeStack {
sampling_key,
batches_buffer: VecDeque::with_capacity(max_batches),
batches_buffer_size: 0,
check_disk: true,
check_disk,
}
}

Expand Down Expand Up @@ -247,6 +248,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelope = mock_envelope(Instant::now());
Expand All @@ -263,6 +265,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(4);
Expand Down Expand Up @@ -315,6 +318,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

// We pop with an invalid db.
Expand All @@ -334,6 +338,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

// We pop with no elements.
Expand All @@ -351,6 +356,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(5);
Expand Down Expand Up @@ -388,6 +394,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(15);
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/services/buffer/stack_provider/memory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::stack_provider::{InitializationState, StackProvider};
use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::utils::MemoryChecker;

#[derive(Debug)]
Expand All @@ -23,7 +25,7 @@ impl StackProvider for MemoryStackProvider {
InitializationState::empty()
}

fn create_stack(&self, _: ProjectKeyPair) -> Self::Stack {
fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack {
MemoryEnvelopeStack::new()
}

Expand Down
14 changes: 13 additions & 1 deletion relay-server/src/services/buffer/stack_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ impl InitializationState {
}
}

/// The creation type for the [`EnvelopeStack`].
pub enum StackCreationType {
/// An [`EnvelopeStack`] that is created during initialization.
Initialization,
/// An [`EnvelopeStack`] that is created when an envelope is received.
New,
}

/// A provider of [`EnvelopeStack`] instances that is responsible for creating them.
pub trait StackProvider: std::fmt::Debug {
/// The implementation of [`EnvelopeStack`] that this manager creates.
Expand All @@ -37,7 +45,11 @@ pub trait StackProvider: std::fmt::Debug {
fn initialize(&self) -> impl Future<Output = InitializationState>;

/// Creates an [`EnvelopeStack`].
fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack;
fn create_stack(
&self,
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack;

/// Returns `true` if the store used by this [`StackProvider`] has space to add new
/// stacks or items to the stacks.
Expand Down
21 changes: 19 additions & 2 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::stack_provider::{InitializationState, StackProvider};
use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::SqliteEnvelopeStack;

#[derive(Debug)]
Expand All @@ -28,6 +30,11 @@ impl SqliteStackProvider {
max_disk_size: config.spool_envelopes_max_disk_size(),
})
}

/// Returns `true` when there might be data residing on disk, `false` otherwise.
fn assume_data_on_disk(stack_creation_type: StackCreationType) -> bool {
matches!(stack_creation_type, StackCreationType::Initialization)
}
}

impl StackProvider for SqliteStackProvider {
Expand All @@ -46,13 +53,23 @@ impl StackProvider for SqliteStackProvider {
}
}

fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack {
fn create_stack(
&self,
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack {
SqliteEnvelopeStack::new(
self.envelope_store.clone(),
self.disk_batch_size,
self.max_batches,
project_key_pair.own_key,
project_key_pair.sampling_key,
// We want to check the disk by default if we are creating the stack for the first time,
// since we might have some data on disk.
// On the other hand, if we are recreating a stack, it means that we popped it because
// it was empty, or we never had data on disk for that stack, so we assume by default
// that there is no need to check disk until some data is spooled.
Self::assume_data_on_disk(stack_creation_type),
)
}

Expand Down