Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl Parseable {
}

// Gets write privileges only for creating the stream when it doesn't already exist.
self.streams.create(
self.streams.get_or_create(
self.options.clone(),
stream_name.to_owned(),
LogStreamMetadata::default(),
Expand Down Expand Up @@ -342,7 +342,7 @@ impl Parseable {
schema_version,
log_source,
);
self.streams.create(
self.streams.get_or_create(
self.options.clone(),
stream_name.to_string(),
metadata,
Expand Down Expand Up @@ -652,7 +652,7 @@ impl Parseable {
SchemaVersion::V1, // New stream
log_source,
);
self.streams.create(
self.streams.get_or_create(
self.options.clone(),
stream_name.to_string(),
metadata,
Expand Down
13 changes: 9 additions & 4 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,17 +737,22 @@ pub struct Streams(RwLock<HashMap<String, StreamRef>>);
// 4. When first event is sent to stream (update the schema)
// 5. When set alert API is called (update the alert)
impl Streams {
pub fn create(
/// Checks after getting an excluse lock whether already stream exists, else creates it.
/// NOTE: This is done to ensure we don't have contention among threads.
pub fn get_or_create(
&self,
options: Arc<Options>,
stream_name: String,
metadata: LogStreamMetadata,
ingestor_id: Option<String>,
) -> StreamRef {
let mut guard = self.write().expect(LOCK_EXPECT);
if let Some(stream) = guard.get(&stream_name) {
return stream.clone();
}

let stream = Stream::new(options, &stream_name, metadata, ingestor_id);
self.write()
.expect(LOCK_EXPECT)
.insert(stream_name, stream.clone());
guard.insert(stream_name, stream.clone());

stream
}
Expand Down
Loading