Skip to content

Commit 6aef83f

Browse files
authored
Producer getting producer busy is removing existing producer from list (apache#11804)
### Motivation When a producer is getting error because of ProducerBusy (existing producer with the same name), it will trigger a producer close operation that will eventually lead to the existing producer getting removed from the topic map (even though that producer is still writing on the topic). The problem is the producer close is triggering a removal from the map: pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java Line 683 in 43ded59 if (producers.remove(producer.getProducerName(), producer)) { Even though we check for producer equality, the Producer.equals() is only comparing the producer name, so the old instance gets removed from the map. Instead, the equality of producer needs to be based on the connection id (local & remote addresses and unique id), plus the producer id within that connection. * Producer getting producer busy is removing existing producer from list * Fixed test
1 parent f32527f commit 6aef83f

File tree

3 files changed

+67
-1
lines changed

3 files changed

+67
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,10 @@ public int hashCode() {
147147
public boolean equals(Object obj) {
148148
if (obj instanceof Producer) {
149149
Producer other = (Producer) obj;
150-
return Objects.equals(producerName, other.producerName) && Objects.equals(topic, other.topic);
150+
return Objects.equals(producerName, other.producerName)
151+
&& Objects.equals(topic, other.topic)
152+
&& producerId == other.producerId
153+
&& Objects.equals(cnx, other.cnx);
151154
}
152155

153156
return false;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.IdentityHashMap;
4242
import java.util.Map;
4343
import java.util.NoSuchElementException;
44+
import java.util.Objects;
4445
import java.util.Optional;
4546
import java.util.Set;
4647
import java.util.concurrent.CompletableFuture;
@@ -1530,6 +1531,23 @@ protected void handleSeek(CommandSeek seek) {
15301531
}
15311532
}
15321533

1534+
@Override
1535+
public boolean equals(Object o) {
1536+
if (this == o) {
1537+
return true;
1538+
}
1539+
if (o == null || getClass() != o.getClass()) {
1540+
return false;
1541+
}
1542+
ServerCnx other = (ServerCnx) o;
1543+
return Objects.equals(ctx().channel().id(), other.ctx().channel().id());
1544+
}
1545+
1546+
@Override
1547+
public int hashCode() {
1548+
return Objects.hash(ctx().channel().id());
1549+
}
1550+
15331551
@Override
15341552
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
15351553
checkArgument(state == State.Connected);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1814,4 +1814,49 @@ public void testWithEventTime() throws Exception {
18141814
assertEquals(msg.getValue(), "test");
18151815
assertEquals(msg.getEventTime(), 5);
18161816
}
1817+
1818+
@Test
1819+
public void testProducerBusy() throws Exception {
1820+
final String topicName = "prop/ns-abc/producer-busy-" + System.nanoTime();
1821+
1822+
@Cleanup
1823+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
1824+
.topic(topicName)
1825+
.producerName("xxx")
1826+
.create();
1827+
1828+
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
1829+
1830+
for (int i =0; i < 5; i++) {
1831+
try {
1832+
pulsarClient.newProducer(Schema.STRING)
1833+
.topic(topicName)
1834+
.producerName("xxx")
1835+
.create();
1836+
fail("Should have failed");
1837+
} catch (ProducerBusyException e) {
1838+
// Expected
1839+
}
1840+
1841+
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
1842+
}
1843+
1844+
// Try from different connection
1845+
@Cleanup
1846+
PulsarClient client2 = PulsarClient.builder()
1847+
.serviceUrl(getPulsar().getBrokerServiceUrl())
1848+
.build();
1849+
1850+
try {
1851+
client2.newProducer(Schema.STRING)
1852+
.topic(topicName)
1853+
.producerName("xxx")
1854+
.create();
1855+
fail("Should have failed");
1856+
} catch (ProducerBusyException e) {
1857+
// Expected
1858+
}
1859+
1860+
assertEquals(admin.topics().getStats(topicName).getPublishers().size(), 1);
1861+
}
18171862
}

0 commit comments

Comments
 (0)