diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index a404f10d3dacb..60af8f380a017 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -194,15 +194,13 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return CompletableFuture.completedFuture(null); } - if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) - || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) + || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) { log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, remoteCluster, getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); - return closeProducerAsync(); } - STATE_UPDATER.set(this, State.Stopped); - return CompletableFuture.completedFuture(null); + return closeProducerAsync(); } public CompletableFuture remove() {