Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2249,22 +2249,12 @@ async fn recreate_consumer_stream(
sequence = sequence
);
let _span_handle = span.enter();
debug!("recreating consumer");
let config = config.to_owned();
trace!("get stream");
let stream = tokio::time::timeout(Duration::from_secs(5), context.get_stream(stream_name))
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err)
})?
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
trace!("delete old consumer before creating new one");

tokio::time::timeout(
Duration::from_secs(5),
stream.delete_consumer(consumer_name),
context.delete_consumer_from_stream(consumer_name, stream_name),
)
.await
.ok();
Expand All @@ -2281,10 +2271,13 @@ async fn recreate_consumer_stream(
trace!("create the new ordered consumer for sequence {}", sequence);
let consumer = tokio::time::timeout(
Duration::from_secs(5),
stream.create_consumer(jetstream::consumer::pull::OrderedConfig {
deliver_policy,
..config.clone()
}),
context.create_consumer_on_stream(
jetstream::consumer::pull::OrderedConfig {
deliver_policy,
..config.clone()
},
stream_name,
),
)
.await
.map_err(|err| ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::TimedOut, err))?
Expand Down
119 changes: 117 additions & 2 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ use std::time::Duration;
use tokio::sync::oneshot;
use tracing::debug;

use super::consumer::{Consumer, FromConsumer, IntoConsumerConfig};
use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
use super::errors::ErrorCode;
use super::kv::{Store, MAX_HISTORY};
use super::object_store::{is_valid_bucket_name, ObjectStore};
use super::stream::{self, Config, DeleteStatus, DiscardPolicy, External, Info, Stream};
use super::stream::{
self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
Stream,
};

/// A context which can perform jetstream scoped requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -784,6 +787,118 @@ impl Context {
))
}

/// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a [Stream] first.
///
/// It has one less interaction with the server when binding to only one
/// [crate::jetstream::consumer::Consumer].
///
/// # Examples:
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer::PullConsumer;
///
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// jetstream
/// .delete_consumer_from_stream("consumer", "stream")
/// .await?;
///
/// # Ok(())
/// # }
/// ```
pub async fn delete_consumer_from_stream<C: AsRef<str>, S: AsRef<str>>(
&self,
consumer: C,
stream: S,
) -> Result<DeleteStatus, ConsumerError> {
let subject = format!("CONSUMER.DELETE.{}.{}", stream.as_ref(), consumer.as_ref());

match self.request(subject, &json!({})).await? {
Response::Ok(delete_status) => Ok(delete_status),
Response::Err { error } => Err(error.into()),
}
}

/// Create a new `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
/// returns the info from the server about created [Consumer] without binding to a [Stream] first.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::jetstream::consumer;
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let consumer: consumer::PullConsumer = jetstream
/// .create_consumer_on_stream(
/// consumer::pull::Config {
/// durable_name: Some("pull".to_string()),
/// ..Default::default()
/// },
/// "stream",
/// )
/// .await?;
/// # Ok(())
/// # }
/// ```
pub async fn create_consumer_on_stream<C: IntoConsumerConfig + FromConsumer, S: AsRef<str>>(
&self,
config: C,
stream: S,
) -> Result<Consumer<C>, ConsumerError> {
let config = config.into_consumer_config();

let subject = {
if self.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
} else if config.name.is_some() {
return Err(ConsumerError::with_source(
ConsumerErrorKind::Other,
"can't use consumer name with server < 2.9.0",
));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
stream.as_ref(),
durable_name
)
} else {
format!("CONSUMER.CREATE.{}", stream.as_ref())
}
};

match self
.request(
subject,
&json!({"stream_name": stream.as_ref(), "config": config}),
)
.await?
{
Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
FromConsumer::try_from_consumer_config(info.clone().config)
.map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
info,
self.clone(),
)),
}
}

/// Send a request to the jetstream JSON API.
///
/// This is a low level API used mostly internally, that should be used only in
Expand Down
54 changes: 3 additions & 51 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,57 +705,9 @@ impl Stream {
&self,
config: C,
) -> Result<Consumer<C>, ConsumerError> {
let config = config.into_consumer_config();

let subject = {
if self.context.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| {
format!(
"CONSUMER.CREATE.{}.{}{}",
self.info.config.name, name, filter
)
})
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", self.info.config.name))
} else if config.name.is_some() {
return Err(ConsumerError::with_source(
ConsumerErrorKind::Other,
"can't use consumer name with server < 2.9.0",
));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)
}
};

match self
.context
.request(
subject,
&json!({"stream_name": self.info.config.name.clone(), "config": config}),
)
.await?
{
Response::Err { error } => Err(ConsumerError::new(ConsumerErrorKind::JetStream(error))),
Response::Ok::<consumer::Info>(info) => Ok(Consumer::new(
FromConsumer::try_from_consumer_config(info.clone().config)
.map_err(|err| ConsumerError::with_source(ConsumerErrorKind::Other, err))?,
info,
self.context.clone(),
)),
}
self.context
.create_consumer_on_stream(config, self.info.config.name.clone())
.await
}

/// Retrieve [Info] about [Consumer] from the server.
Expand Down
61 changes: 61 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,67 @@ mod jetstream {
.unwrap();
}

#[tokio::test]
async fn delete_consumer_from_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream("stream").await.unwrap();
stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".into()),
..Default::default()
})
.await
.unwrap();
stream
.create_consumer(consumer::push::Config {
durable_name: Some("push".into()),
deliver_subject: "subject".into(),
..Default::default()
})
.await
.unwrap();

let _consumer = context
.delete_consumer_from_stream("pull", "stream")
.await
.unwrap();
assert!(stream
.get_consumer::<consumer::Config>("pull")
.await
.is_err());
}

#[tokio::test]
async fn create_consumer_on_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream("stream").await.unwrap();
stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".into()),
..Default::default()
})
.await
.unwrap();
let consumer = context
.create_consumer_on_stream(
consumer::push::Config {
durable_name: Some("push".into()),
deliver_subject: "subject".into(),
..Default::default()
},
"stream",
)
.await
.unwrap();
assert_eq!(consumer.cached_info().name, "push");
}

#[tokio::test]
async fn get_or_create_consumer() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down