Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

let mut envelopes = Vec::with_capacity(size);
Expand Down Expand Up @@ -139,6 +140,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-fill the stack
Expand Down Expand Up @@ -179,6 +181,7 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
);

// Pre-generate envelopes
Expand Down
19 changes: 14 additions & 5 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::SqliteEnvelopeStoreError;
use crate::services::buffer::stack_provider::memory::MemoryStackProvider;
use crate::services::buffer::stack_provider::sqlite::SqliteStackProvider;
use crate::services::buffer::stack_provider::StackProvider;
use crate::services::buffer::stack_provider::{StackCreationType, StackProvider};
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
use crate::utils::MemoryChecker;

Expand Down Expand Up @@ -230,8 +230,14 @@ where
{
stack.push(envelope).await?;
} else {
self.push_stack(ProjectKeyPair::from_envelope(&envelope), Some(envelope))
.await?;
// Since we have initialization code that creates all the necessary stacks, we assume
// that any new stack that is added during the envelope buffer's lifecycle, is recreated.
self.push_stack(
StackCreationType::New,
ProjectKeyPair::from_envelope(&envelope),
Some(envelope),
)
.await?;
}
self.priority_queue
.change_priority_by(&project_key_pair, |prio| {
Expand Down Expand Up @@ -355,14 +361,17 @@ where
/// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
async fn push_stack(
&mut self,
stack_creation_type: StackCreationType,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make StackCreationType an enum with data and remove project_key_pair and envelope parameters, something like

enum StackOrigin {
    Existing(ProjectKeyPair),
    NewEnvelope(Box<Envelope>),
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is a confusing enumerator since it doesn't depict what is going on. I would rather always create an empty stack and specify the context in which it is created.

project_key_pair: ProjectKeyPair,
envelope: Option<Box<Envelope>>,
) -> Result<(), EnvelopeBufferError> {
let received_at = envelope
.as_ref()
.map_or(Instant::now(), |e| e.meta().start_time());

let mut stack = self.stack_provider.create_stack(project_key_pair);
let mut stack = self
.stack_provider
.create_stack(stack_creation_type, project_key_pair);
if let Some(envelope) = envelope {
stack.push(envelope).await?;
}
Expand Down Expand Up @@ -411,7 +420,7 @@ where
/// Creates all the [`EnvelopeStack`]s with no data given a set of [`ProjectKeyPair`].
async fn load_stacks(&mut self, project_key_pairs: HashSet<ProjectKeyPair>) {
for project_key_pair in project_key_pairs {
self.push_stack(project_key_pair, None)
self.push_stack(StackCreationType::Initialization, project_key_pair, None)
.await
.expect("Pushing an empty stack raised an error");
}
Expand Down
9 changes: 8 additions & 1 deletion relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl SqliteEnvelopeStack {
max_batches: usize,
own_key: ProjectKey,
sampling_key: ProjectKey,
check_disk: bool,
) -> Self {
Self {
envelope_store,
Expand All @@ -64,7 +65,7 @@ impl SqliteEnvelopeStack {
sampling_key,
batches_buffer: VecDeque::with_capacity(max_batches),
batches_buffer_size: 0,
check_disk: true,
check_disk,
}
}

Expand Down Expand Up @@ -247,6 +248,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelope = mock_envelope(Instant::now());
Expand All @@ -263,6 +265,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(4);
Expand Down Expand Up @@ -315,6 +318,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

// We pop with an invalid db.
Expand All @@ -334,6 +338,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

// We pop with no elements.
Expand All @@ -351,6 +356,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(5);
Expand Down Expand Up @@ -388,6 +394,7 @@ mod tests {
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(15);
Expand Down
6 changes: 4 additions & 2 deletions relay-server/src/services/buffer/stack_provider/memory.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_stack::memory::MemoryEnvelopeStack;
use crate::services::buffer::stack_provider::{InitializationState, StackProvider};
use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::utils::MemoryChecker;

#[derive(Debug)]
Expand All @@ -23,7 +25,7 @@ impl StackProvider for MemoryStackProvider {
InitializationState::empty()
}

fn create_stack(&self, _: ProjectKeyPair) -> Self::Stack {
fn create_stack(&self, _: StackCreationType, _: ProjectKeyPair) -> Self::Stack {
MemoryEnvelopeStack::new()
}

Expand Down
14 changes: 13 additions & 1 deletion relay-server/src/services/buffer/stack_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ impl InitializationState {
}
}

/// The creation type for the [`EnvelopeStack`].
pub enum StackCreationType {
/// An [`EnvelopeStack`] that is created during initialization.
Initialization,
/// An [`EnvelopeStack`] that is created when an envelope is received.
New,
}

/// A provider of [`EnvelopeStack`] instances that is responsible for creating them.
pub trait StackProvider: std::fmt::Debug {
/// The implementation of [`EnvelopeStack`] that this manager creates.
Expand All @@ -37,7 +45,11 @@ pub trait StackProvider: std::fmt::Debug {
fn initialize(&self) -> impl Future<Output = InitializationState>;

/// Creates an [`EnvelopeStack`].
fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack;
fn create_stack(
&self,
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack;

/// Returns `true` if the store used by this [`StackProvider`] has space to add new
/// stacks or items to the stacks.
Expand Down
16 changes: 14 additions & 2 deletions relay-server/src/services/buffer/stack_provider/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_store::sqlite::{
SqliteEnvelopeStore, SqliteEnvelopeStoreError,
};
use crate::services::buffer::stack_provider::{InitializationState, StackProvider};
use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::SqliteEnvelopeStack;

#[derive(Debug)]
Expand Down Expand Up @@ -46,13 +48,23 @@ impl StackProvider for SqliteStackProvider {
}
}

fn create_stack(&self, project_key_pair: ProjectKeyPair) -> Self::Stack {
fn create_stack(
&self,
stack_creation_type: StackCreationType,
project_key_pair: ProjectKeyPair,
) -> Self::Stack {
SqliteEnvelopeStack::new(
self.envelope_store.clone(),
self.disk_batch_size,
self.max_batches,
project_key_pair.own_key,
project_key_pair.sampling_key,
// We want to check the disk by default if we are creating the stack for the first time,
// since we might have some data on disk.
// On the other hand, if we are recreating a stack, it means that we popped it because
// it was empty, or we never had data on disk for that stack, so we assume by default
// that there is no need to check disk until some data is spooled.
matches!(stack_creation_type, StackCreationType::Initialization),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we move this to a helper function assume_data_on_disk(stack_creation_type) to clarify intent? The code comment can then be a doc comment on that function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, will do!

)
}

Expand Down