Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/com/rabbitmq/stream/Environment.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ static EnvironmentBuilder builder() {
*/
StreamStats queryStreamStats(String stream);

/**
* Return whether a stream exists or not.
*
* @param stream
* @return true if stream exists, false if it does not exist
* @throws StreamException if response code is different from {@link Constants#RESPONSE_CODE_OK}
* or {@link Constants#RESPONSE_CODE_STREAM_DOES_NOT_EXIST}
*/
boolean streamExists(String stream);

/**
* Create a {@link ProducerBuilder} to configure and create a {@link Producer}.
*
Expand Down
41 changes: 26 additions & 15 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,11 @@
import static com.rabbitmq.stream.impl.Utils.exceptionMessage;
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.AddressResolver;
import com.rabbitmq.stream.BackOffDelayPolicy;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConsumerBuilder;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.MessageHandler;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.MessageHandler.Context;
import com.rabbitmq.stream.NoOffsetException;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.ProducerBuilder;
import com.rabbitmq.stream.StreamCreator;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.StreamStats;
import com.rabbitmq.stream.SubscriptionListener;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.Client.ShutdownListener;
Expand Down Expand Up @@ -515,12 +503,35 @@ public StreamStats queryStreamStats(String stream) {
response.getResponseCode(),
stream,
() ->
"Error while querying stream info: "
"Error while querying stream stats: "
+ formatConstant(response.getResponseCode())
+ ".");
}
}

@Override
public boolean streamExists(String stream) {
checkNotClosed();
this.maybeInitializeLocator();
StreamStatsResponse response =
locatorOperation(
Utils.namedFunction(
client -> client.streamStats(stream), "Query stream stats on stream '%s'", stream));
if (response.isOk()) {
return true;
} else if (response.getResponseCode() == Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST) {
return false;
} else {
throw convertCodeToException(
response.getResponseCode(),
stream,
() ->
format(
"Unexpected result when checking if stream '%s' exists: %s.",
stream, formatConstant(response.getResponseCode())));
}
}

private static class DefaultStreamStats implements StreamStats {

private final LongSupplier firstOffsetSupplier, committedOffsetSupplier;
Expand Down
17 changes: 14 additions & 3 deletions src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ void queryStreamStatsShouldReturnFirstOffsetAndCommittedOffset(boolean lazyInit)
throws Exception {
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
StreamStats stats = env.queryStreamStats(stream);
assertThatThrownBy(() -> stats.firstOffset()).isInstanceOf(NoOffsetException.class);
assertThatThrownBy(() -> stats.committedChunkId()).isInstanceOf(NoOffsetException.class);
assertThatThrownBy(stats::firstOffset).isInstanceOf(NoOffsetException.class);
assertThatThrownBy(stats::committedChunkId).isInstanceOf(NoOffsetException.class);

int publishCount = 20_000;
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
Expand Down Expand Up @@ -595,6 +595,16 @@ void queryStreamStatsShouldThrowExceptionWhenStreamDoesNotExist() {
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
void streamExists(boolean lazyInit) {
try (Environment env = environmentBuilder.lazyInitialization(lazyInit).build()) {
assertThat(env.streamExists(stream)).isTrue();
assertThat(env.streamExists(UUID.randomUUID().toString())).isFalse();
}
}

@Test
void methodsShouldThrowExceptionWhenEnvironmentIsClosed() {
Environment env = environmentBuilder.build();
Expand All @@ -605,7 +615,8 @@ void methodsShouldThrowExceptionWhenEnvironmentIsClosed() {
() -> env.producerBuilder(),
() -> env.consumerBuilder(),
() -> env.deleteStream("does not matter"),
() -> env.queryStreamStats("does not matter")
() -> env.queryStreamStats("does not matter"),
() -> env.streamExists("does not matter")
};
Arrays.stream(calls)
.forEach(call -> assertThatThrownBy(call).isInstanceOf(IllegalStateException.class));
Expand Down