Skip to content

Commit d73e6ae

Browse files
committed
Fix Compressed contentEncoding Delimiter
Resolves #1251 Delimiter should be a comma and whitespace trimmeed. Handle both delimiters in decompressors and add property for backwards compatibility. **I will backport to 2.2.x with default `:`** * Do not add a delimiter if the original encoding is an empty String.
1 parent b14fce2 commit d73e6ae

File tree

6 files changed

+60
-17
lines changed

6 files changed

+60
-17
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractCompressingPostProcessor.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,9 @@
3131
import org.springframework.amqp.core.MessageProperties;
3232
import org.springframework.amqp.core.MessagePropertiesBuilder;
3333
import org.springframework.core.Ordered;
34+
import org.springframework.util.Assert;
3435
import org.springframework.util.FileCopyUtils;
36+
import org.springframework.util.StringUtils;
3537

3638
/**
3739
* Base class for post processors that compress the message body. The content encoding is
@@ -53,6 +55,8 @@ public abstract class AbstractCompressingPostProcessor implements MessagePostPro
5355

5456
private boolean copyProperties = false;
5557

58+
private String encodingDelimiter = ":";
59+
5660
/**
5761
* Construct a post processor that will include the
5862
* {@link MessageProperties#SPRING_AUTO_DECOMPRESS} header set to 'true'.
@@ -85,6 +89,18 @@ public void setCopyProperties(boolean copyProperties) {
8589
this.copyProperties = copyProperties;
8690
}
8791

92+
/**
93+
* Set a delimiter to be added between the compression type and the original encoding,
94+
* if any. Defaults to {@code ":"} for compatibility with consumers using versions of
95+
* spring-amqp earlier than 2.2.12. In version 2.3, it will default to {@code ", "}.
96+
* @param encodingDelimiter the delimiter.
97+
* @since 2.2.12
98+
*/
99+
public void setEncodingDelimiter(String encodingDelimiter) {
100+
Assert.notNull(encodingDelimiter, "'encodingDelimiter' cannot be null");
101+
this.encodingDelimiter = encodingDelimiter;
102+
}
103+
88104
@Override
89105
public Message postProcessMessage(Message message) throws AmqpException {
90106
try {
@@ -109,9 +125,9 @@ public Message postProcessMessage(Message message) throws AmqpException {
109125

110126
MessageProperties messageProperties =
111127
messagePropertiesBuilder.setContentEncoding(getEncoding() +
112-
(originalProperties.getContentEncoding() == null
128+
(!StringUtils.hasText(originalProperties.getContentEncoding())
113129
? ""
114-
: ":" + originalProperties.getContentEncoding()))
130+
: this.encodingDelimiter + originalProperties.getContentEncoding()))
115131
.build();
116132

117133
return new Message(compressed, messageProperties);

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/AbstractDecompressingPostProcessor.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,17 +91,22 @@ public Message postProcessMessage(Message message) throws AmqpException {
9191
FileCopyUtils.copy(unzipper, out);
9292
MessageProperties messageProperties = message.getMessageProperties();
9393
String encoding = messageProperties.getContentEncoding();
94-
int colonAt = encoding.indexOf(':');
95-
if (colonAt > 0) {
96-
encoding = encoding.substring(0, colonAt);
94+
int delimAt = encoding.indexOf(':');
95+
if (delimAt < 0) {
96+
delimAt = encoding.indexOf(',');
97+
}
98+
if (delimAt > 0) {
99+
encoding = encoding.substring(0, delimAt);
97100
}
98101
Assert.state(getEncoding().equals(encoding), "Content encoding must be:" + getEncoding() + ", was:"
99102
+ encoding);
100-
if (colonAt < 0) {
103+
if (delimAt < 0) {
101104
messageProperties.setContentEncoding(null);
102105
}
103106
else {
104-
messageProperties.setContentEncoding(messageProperties.getContentEncoding().substring(colonAt + 1));
107+
messageProperties.setContentEncoding(messageProperties.getContentEncoding()
108+
.substring(delimAt + 1)
109+
.trim());
105110
}
106111
messageProperties.getHeaders().remove(MessageProperties.SPRING_AUTO_DECOMPRESS);
107112
return new Message(out.toByteArray(), messageProperties);

spring-amqp/src/main/java/org/springframework/amqp/support/postprocessor/DelegatingDecompressingPostProcessor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,9 +94,12 @@ public Message postProcessMessage(Message message) throws AmqpException {
9494
return message;
9595
}
9696
else {
97-
int colonAt = encoding.indexOf(':');
98-
if (colonAt > 0) {
99-
encoding = encoding.substring(0, colonAt);
97+
int delimAt = encoding.indexOf(':');
98+
if (delimAt < 0) {
99+
delimAt = encoding.indexOf(',');
100+
}
101+
if (delimAt > 0) {
102+
encoding = encoding.substring(0, delimAt);
100103
}
101104
MessagePostProcessor decompressor = this.decompressors.get(encoding);
102105
if (decompressor != null) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,10 +494,14 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
494494
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
495495
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
496496
template.setConnectionFactory(this.connectionFactory);
497-
template.setBeforePublishPostProcessors(new GZipPostProcessor());
497+
AtomicReference<String> encoding = new AtomicReference<>();
498+
template.setBeforePublishPostProcessors(new GZipPostProcessor(), msg -> {
499+
encoding.set(msg.getMessageProperties().getContentEncoding());
500+
return msg;
501+
});
498502
template.setAfterReceivePostProcessors(new DelegatingDecompressingPostProcessor());
499503
MessageProperties props = new MessageProperties();
500-
props.setContentEncoding("foo");
504+
props.setContentEncoding("");
501505
Message message = new Message("foo".getBytes(), props);
502506
template.send("", ROUTE, message);
503507
message = new Message("bar".getBytes(), props);
@@ -506,6 +510,7 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
506510
byte[] out = (byte[]) template.receiveAndConvert(ROUTE);
507511
assertThat(out).isNotNull();
508512
assertThat(new String(out)).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
513+
assertThat(encoding.get()).isEqualTo("gzip");
509514
}
510515

511516
@Test
@@ -535,6 +540,7 @@ public void testSimpleBatchZippedWithEncoding() throws Exception {
535540
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
536541
template.setConnectionFactory(this.connectionFactory);
537542
ZipPostProcessor zipPostProcessor = new ZipPostProcessor();
543+
zipPostProcessor.setEncodingDelimiter(", ");
538544
assertThat(getStreamLevel(zipPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
539545
template.setBeforePublishPostProcessors(zipPostProcessor);
540546
MessageProperties props = new MessageProperties();
@@ -544,7 +550,7 @@ public void testSimpleBatchZippedWithEncoding() throws Exception {
544550
message = new Message("bar".getBytes(), props);
545551
template.send("", ROUTE, message);
546552
message = receive(template);
547-
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip:foo");
553+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip, foo");
548554
UnzipPostProcessor unzipper = new UnzipPostProcessor();
549555
message = unzipper.postProcessMessage(message);
550556
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
@@ -597,6 +603,7 @@ public void testSimpleBatchDeflaterWithEncoding() throws Exception {
597603
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
598604
template.setConnectionFactory(this.connectionFactory);
599605
DeflaterPostProcessor deflaterPostProcessor = new DeflaterPostProcessor();
606+
deflaterPostProcessor.setEncodingDelimiter(", ");
600607
assertThat(getStreamLevel(deflaterPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
601608
template.setBeforePublishPostProcessors(deflaterPostProcessor);
602609
MessageProperties props = new MessageProperties();
@@ -606,7 +613,7 @@ public void testSimpleBatchDeflaterWithEncoding() throws Exception {
606613
message = new Message("bar".getBytes(), props);
607614
template.send("", ROUTE, message);
608615
message = receive(template);
609-
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate:foo");
616+
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate, foo");
610617
InflaterPostProcessor inflater = new InflaterPostProcessor();
611618
message = inflater.postProcessMessage(message);
612619
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");

src/reference/asciidoc/amqp.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3828,6 +3828,10 @@ By default, these properties are reused for performance reasons, and modified wi
38283828
If you retain a reference to the original outbound message, its properties will change as well.
38293829
So, if your application retains a copy of an outbound message with these message post processors, consider turning the `copyProperties` option on.
38303830

3831+
IMPORTANT: Starting with version 2.2.12, you can configure the delimiter that the compressing post processors use between content encoding elements.
3832+
With versions 2.2.11 and before, this was hard-coded as `:`; it is now set to `:` by default but can be overridden to a standard `,` by setting the `encodingDelimiter` property; this will be the default in 2.3 and above.
3833+
The decompressors will work with both delimiters.
3834+
38313835
Similarly, the `SimpleMessageListenerContainer` also has a `setAfterReceivePostProcessors()` method, letting the decompression be performed after messages are received by the container.
38323836

38333837
Starting with version 2.1.4, `addBeforePublishPostProcessors()` and `addAfterReceivePostProcessors()` have been added to the `RabbitTemplate` to allow appending new post processors to the list of before publish and after receive post processors respectively.

src/reference/asciidoc/whats-new.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,11 @@ See <<message-properties-converters>> for more information.
127127

128128
Recovery of failed producer-created batches is now supported.
129129
See <<batch-retry>> for more information.
130+
131+
A new listener container property `consumeDelay` is now available; it is helpful when using the https://github.com/rabbitmq/rabbitmq-sharding[RabbitMQ Sharding Plugin].
132+
See <<containerAttributes>> for more information.
133+
134+
==== MessagePostProcessor Changes
135+
136+
The compressing `MessagePostProcessor` s now can use a comma to separate multiple content encodings instead of a colon.
137+
See the IMPORTANT note in <<post-processing>> for more information.

0 commit comments

Comments
 (0)