diff --git a/src/main/java/com/rabbitmq/stream/Environment.java b/src/main/java/com/rabbitmq/stream/Environment.java index 7b97dc4493..4e3631a2f0 100644 --- a/src/main/java/com/rabbitmq/stream/Environment.java +++ b/src/main/java/com/rabbitmq/stream/Environment.java @@ -72,6 +72,16 @@ static EnvironmentBuilder builder() { */ StreamStats queryStreamStats(String stream); + /** + * Return whether a stream exists or not. + * + * @param stream + * @return true if stream exists, false if it does not exist + * @throws StreamException if response code is different from {@link Constants#RESPONSE_CODE_OK} + * or {@link Constants#RESPONSE_CODE_STREAM_DOES_NOT_EXIST} + */ + boolean streamExists(String stream); + /** * Create a {@link ProducerBuilder} to configure and create a {@link Producer}. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 5ee21b723b..b2065f98dc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -17,23 +17,11 @@ import static com.rabbitmq.stream.impl.Utils.exceptionMessage; import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; +import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; -import com.rabbitmq.stream.Address; -import com.rabbitmq.stream.AddressResolver; -import com.rabbitmq.stream.BackOffDelayPolicy; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConsumerBuilder; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.MessageHandler; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.MessageHandler.Context; -import com.rabbitmq.stream.NoOffsetException; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.ProducerBuilder; -import com.rabbitmq.stream.StreamCreator; -import com.rabbitmq.stream.StreamException; -import com.rabbitmq.stream.StreamStats; -import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.compression.CompressionCodecFactory; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.Client.ShutdownListener; @@ -515,12 +503,35 @@ public StreamStats queryStreamStats(String stream) { response.getResponseCode(), stream, () -> - "Error while querying stream info: " + "Error while querying stream stats: " + formatConstant(response.getResponseCode()) + "."); } } + @Override + public boolean streamExists(String stream) { + checkNotClosed(); + this.maybeInitializeLocator(); + StreamStatsResponse response = + locatorOperation( + Utils.namedFunction( + client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream)); + if (response.isOk()) { + return true; + } else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) { + return false; + } else { + throw convertCodeToException( + response.getResponseCode(), + stream, + () -> + format( + "Unexpected result when checking if stream '%s' exists: %s.", + stream, formatConstant(response.getResponseCode()))); + } + } + private static class DefaultStreamStats implements StreamStats { private final LongSupplier firstOffsetSupplier, committedOffsetSupplier; diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index 1ac339839b..55f3ce4a38 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -559,8 +559,8 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit) throws Exception { try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) { StreamStats stats = env.queryStreamStats(stream); - assertThatThrownBy(() -> stats.firstOffset()).isInstanceOf(NoOffsetException.class); - assertThatThrownBy(() -> stats.committedChunkId()).isInstanceOf(NoOffsetException.class); + assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class); + assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class); int publishCount = 20_000; TestUtils.publishAndWaitForConfirms(cf, publishCount, stream); @@ -595,6 +595,16 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11) + void streamExists(boolean lazyInit) { + try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) { + assertThat(env.streamExists(stream)).isTrue(); + assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse(); + } + } + @Test void methodsShouldThrowExceptionWhenEnvironmentIsClosed() { Environment env = environmentBuilder.build(); @@ -605,7 +615,8 @@ void methodsShouldThrowExceptionWhenEnvironmentIsClosed() { () -> env.producerBuilder(), () -> env.consumerBuilder(), () -> env.deleteStream("does not matter"), - () -> env.queryStreamStats("does not matter") + () -> env.queryStreamStats("does not matter"), + () -> env.streamExists("does not matter") }; Arrays.stream(calls) .forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));