Skip to content

Commit dd135cc

Browse files
committed
PIP-132
1 parent 61f99cb commit dd135cc

File tree

4 files changed

+194
-11
lines changed

4 files changed

+194
-11
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import com.google.common.collect.Sets;
22-
2322
import java.util.Optional;
2423
import java.util.concurrent.TimeUnit;
25-
2624
import lombok.Cleanup;
25+
import lombok.extern.slf4j.Slf4j;
2726
import org.apache.bookkeeper.conf.ServerConfiguration;
2827
import org.apache.pulsar.broker.PulsarService;
2928
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -32,6 +31,7 @@
3231
import org.apache.pulsar.client.api.Producer;
3332
import org.apache.pulsar.client.api.PulsarClient;
3433
import org.apache.pulsar.client.api.PulsarClientException;
34+
import org.apache.pulsar.client.impl.MessageImpl;
3535
import org.apache.pulsar.common.policies.data.ClusterData;
3636
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
3737
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -41,6 +41,7 @@
4141
import org.testng.annotations.Test;
4242

4343
@Test(groups = "broker")
44+
@Slf4j
4445
public class MaxMessageSizeTest {
4546

4647
PulsarService pulsar;
@@ -55,7 +56,7 @@ void setup() {
5556
try {
5657
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
5758
ServerConfiguration conf = new ServerConfiguration();
58-
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024);
59+
conf.setNettyMaxFrameSizeBytes(10 * 1024 * 1024 + 10 * 1024);
5960
bkEnsemble.startStandalone(conf, false);
6061

6162
configuration = new ServiceConfiguration();
@@ -78,7 +79,8 @@ void setup() {
7879
admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
7980
admin.clusters().createCluster("max_message_test", ClusterData.builder().serviceUrl(url).build());
8081
admin.tenants()
81-
.createTenant("test", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
82+
.createTenant("test",
83+
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("max_message_test")));
8284
admin.namespaces().createNamespace("test/message", Sets.newHashSet("max_message_test"));
8385
} catch (Exception e) {
8486
e.printStackTrace();
@@ -101,8 +103,8 @@ public void testMaxMessageSetting() throws PulsarClientException {
101103
@Cleanup
102104
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
103105
String topicName = "persistent://test/message/topic1";
104-
Producer producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
105-
Consumer consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
106+
Producer<byte[]> producer = client.newProducer().topic(topicName).sendTimeout(60, TimeUnit.SECONDS).create();
107+
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
106108

107109
// less than 5MB message
108110

@@ -139,6 +141,14 @@ public void testMaxMessageSetting() throws PulsarClientException {
139141
byte[] consumerNewNormalMsg = consumer.receive().getData();
140142
Assert.assertEquals(newNormalMsg, consumerNewNormalMsg);
141143

144+
// 2MB metadata and 8 MB payload
145+
try {
146+
producer.newMessage().keyBytes(new byte[2 * 1024 * 1024]).value(newNormalMsg).send();
147+
Assert.fail("Shouldn't send out this message");
148+
} catch (PulsarClientException e) {
149+
//no-op
150+
}
151+
142152
// equals 10MB message
143153
byte[] newLimitMsg = new byte[10 * 1024 * 1024];
144154
try {
@@ -151,6 +161,79 @@ public void testMaxMessageSetting() throws PulsarClientException {
151161
consumer.unsubscribe();
152162
consumer.close();
153163
producer.close();
164+
}
154165

166+
@Test
167+
public void testNonBatchingMaxMessageSize() throws Exception {
168+
@Cleanup
169+
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
170+
String topicName = "persistent://test/message/testNonBatchingMaxMessageSize";
171+
@Cleanup
172+
Producer<byte[]> producer = client.newProducer()
173+
.topic(topicName)
174+
.enableBatching(false)
175+
.sendTimeout(30, TimeUnit.SECONDS).create();
176+
@Cleanup
177+
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
178+
179+
byte[] data = new byte[8 * 1024 * 1024];
180+
try {
181+
producer.newMessage().value(data).send();
182+
} catch (PulsarClientException e) {
183+
Assert.fail("Shouldn't have exception at here", e);
184+
}
185+
Assert.assertEquals(consumer.receive().getData(), data);
186+
187+
// 1MB metadata and 8 MB payload
188+
try {
189+
producer.newMessage().property("P", new String(new byte[1024 * 1024])).value(data).send();
190+
} catch (PulsarClientException e) {
191+
Assert.fail("Shouldn't have exception at here", e);
192+
}
193+
Assert.assertEquals(consumer.receive().getData(), data);
194+
195+
// 2MB metadata and 8 MB payload, should fail.
196+
try {
197+
producer.newMessage().property("P", new String(new byte[2 * 1024 * 1024])).value(data).send();
198+
Assert.fail("Shouldn't send out this message");
199+
} catch (PulsarClientException e) {
200+
//no-op
201+
}
202+
}
203+
204+
@Test
205+
public void testChunkingMaxMessageSize() throws Exception {
206+
@Cleanup
207+
PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
208+
String topicName = "persistent://test/message/testChunkingMaxMessageSize";
209+
@Cleanup
210+
Producer<byte[]> producer = client.newProducer()
211+
.topic(topicName)
212+
.enableBatching(false)
213+
.enableChunking(true)
214+
.sendTimeout(30, TimeUnit.SECONDS).create();
215+
@Cleanup
216+
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("test1").subscribe();
217+
218+
// 12 MB payload, there should be 2 chunks
219+
byte[] data = new byte[12 * 1024 * 1024];
220+
try {
221+
producer.newMessage().value(data).send();
222+
} catch (PulsarClientException e) {
223+
Assert.fail("Shouldn't have exception at here", e);
224+
}
225+
MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive();
226+
Assert.assertEquals(msg.getData(), data);
227+
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 2);
228+
229+
// 5MB metadata and 12 MB payload, there should be 3 chunks
230+
try {
231+
producer.newMessage().property("P", new String(new byte[5 * 1024 * 1024])).value(data).send();
232+
} catch (PulsarClientException e) {
233+
Assert.fail("Shouldn't have exception at here", e);
234+
}
235+
msg = (MessageImpl<byte[]>) consumer.receive();
236+
Assert.assertEquals(msg.getData(), data);
237+
Assert.assertEquals(msg.getMessageBuilder().getNumChunksFromMsg(), 3);
155238
}
156239
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
7878
import org.apache.pulsar.client.impl.schema.JSONSchema;
7979
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
80+
import org.apache.pulsar.client.util.MathUtils;
8081
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
8182
import org.apache.pulsar.common.api.proto.MessageMetadata;
8283
import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -470,9 +471,20 @@ public void sendAsync(Message<?> message, SendCallback callback) {
470471
}
471472

472473
// send in chunks
473-
int totalChunks = canAddToBatch(msg) ? 1
474-
: Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
475-
+ (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
474+
int totalChunks;
475+
int payloadChunkSize;
476+
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
477+
totalChunks = 1;
478+
payloadChunkSize = ClientCnx.getMaxMessageSize();
479+
} else {
480+
// Reserve current metadata size for chunk size to avoid message size overflow.
481+
// NOTE: this is not strictly bounded, as metadata will be updated after chunking.
482+
// So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
483+
// But it won't cause produce failure as broker have 10 KB padding space for these cases.
484+
payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize();
485+
totalChunks = MathUtils.ceilDiv(Math.max(1, compressedPayload.readableBytes()), payloadChunkSize);
486+
}
487+
476488
// chunked message also sent individually so, try to acquire send-permits
477489
for (int i = 0; i < (totalChunks - 1); i++) {
478490
if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) {
@@ -512,9 +524,9 @@ public void sendAsync(Message<?> message, SendCallback callback) {
512524
}
513525
}
514526
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
515-
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
527+
readStartIndex, payloadChunkSize, compressedPayload, compressed,
516528
compressedPayload.readableBytes(), uncompressedSize, callback, chunkedMessageCtx);
517-
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
529+
readStartIndex = ((chunkId + 1) * payloadChunkSize);
518530
}
519531
}
520532
} catch (PulsarClientException e) {
@@ -1400,6 +1412,19 @@ void setMessageId(ChunkMessageIdImpl chunkMessageId) {
14001412
}
14011413
}
14021414

1415+
public int getMessageHeaderAndPayloadSize() {
1416+
if (cmd == null) {
1417+
return 0;
1418+
}
1419+
ByteBuf cmdHeader = cmd.getFirst();
1420+
cmdHeader.markReaderIndex();
1421+
int totalSize = cmdHeader.readInt();
1422+
int cmdSize = cmdHeader.readInt();
1423+
int msgHeadersAndPayloadSize = totalSize - cmdSize - 4;
1424+
cmdHeader.resetReaderIndex();
1425+
return msgHeadersAndPayloadSize;
1426+
}
1427+
14031428
private OpSendMsg(Handle<OpSendMsg> recyclerHandle) {
14041429
this.recyclerHandle = recyclerHandle;
14051430
}
@@ -1982,6 +2007,9 @@ protected void processOpSendMsg(OpSendMsg op) {
19822007
if (op.msg != null && isBatchMessagingEnabled()) {
19832008
batchMessageAndSend();
19842009
}
2010+
if (checkMaxMessageSize(op)) {
2011+
return;
2012+
}
19852013
pendingMessages.add(op);
19862014
if (op.msg != null) {
19872015
LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this,
@@ -2049,6 +2077,9 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
20492077
if (op.cmd == null) {
20502078
checkState(op.rePopulate != null);
20512079
op.rePopulate.run();
2080+
if (checkMaxMessageSize(op)) {
2081+
continue;
2082+
}
20522083
}
20532084
if (stripChecksum) {
20542085
stripChecksum(op);
@@ -2074,6 +2105,24 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
20742105
}
20752106
}
20762107

2108+
/**
2109+
* Check final message size for non-batch and non-chunked messages only
2110+
*/
2111+
public boolean checkMaxMessageSize(OpSendMsg op) {
2112+
if (op.msg != null && op.totalChunks <= 1) {
2113+
int messageSize = op.getMessageHeaderAndPayloadSize();
2114+
if (messageSize > ClientCnx.getMaxMessageSize()) {
2115+
releaseSemaphoreForSendOp(op);
2116+
op.sendComplete(new PulsarClientException.InvalidMessageException(
2117+
format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes",
2118+
producerName, topic, messageSize, ClientCnx.getMaxMessageSize()),
2119+
op.sequenceId));
2120+
return true;
2121+
}
2122+
}
2123+
return false;
2124+
}
2125+
20772126
public long getDelayInMillis() {
20782127
OpSendMsg firstMsg = pendingMessages.peek();
20792128
if (firstMsg != null) {

pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,15 @@ public static int signSafeMod(long dividend, int divisor) {
3838

3939
return mod;
4040
}
41+
42+
/**
43+
* Ceil version of Math.floorDiv().
44+
* @param x the dividend
45+
* @param y the divisor
46+
* @return the smallest value that is larger than or equal to the algebraic quotient.
47+
*
48+
*/
49+
public static int ceilDiv(int x, int y) {
50+
return -Math.floorDiv(-x, y);
51+
}
4152
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pulsar.client.util;
21+
22+
import static org.testng.Assert.*;
23+
import org.testng.annotations.Test;
24+
25+
public class MathUtilsTest {
26+
27+
@Test
28+
public void testCeilDiv() {
29+
assertEquals(MathUtils.ceilDiv(0, 1024), 0);
30+
assertEquals(MathUtils.ceilDiv(1, 1024), 1);
31+
assertEquals(MathUtils.ceilDiv(1023, 1024), 1);
32+
assertEquals(MathUtils.ceilDiv(1024, 1024), 1);
33+
assertEquals(MathUtils.ceilDiv(1025, 1024), 2);
34+
35+
assertEquals(MathUtils.ceilDiv(0, Integer.MAX_VALUE), 0);
36+
assertEquals(MathUtils.ceilDiv(1, Integer.MAX_VALUE), 1);
37+
assertEquals(MathUtils.ceilDiv(Integer.MAX_VALUE - 1, Integer.MAX_VALUE), 1);
38+
assertEquals(MathUtils.ceilDiv(Integer.MAX_VALUE, Integer.MAX_VALUE), 1);
39+
}
40+
}

0 commit comments

Comments
 (0)