Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,9 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.core.Ordered;
import org.springframework.util.Assert;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StringUtils;

/**
* Base class for post processors that compress the message body. The content encoding is
Expand All @@ -53,6 +55,8 @@ public abstract class AbstractCompressingPostProcessor implements MessagePostPro

private boolean copyProperties = false;

private String encodingDelimiter = ", ";

/**
* Construct a post processor that will include the
* {@link MessageProperties#SPRING_AUTO_DECOMPRESS} header set to 'true'.
Expand Down Expand Up @@ -85,6 +89,19 @@ public void setCopyProperties(boolean copyProperties) {
this.copyProperties = copyProperties;
}

/**
* Set a delimiter to be added between the compression type and the original encoding,
* if any. Defaults to {@code ", "} (since 2.3); for compatibility with consumers
* using versions of spring-amqp earlier than 2.2.12, set it to {@code ":"} (no
* trailing space).
* @param encodingDelimiter the delimiter.
* @since 2.2.12
*/
public void setEncodingDelimiter(String encodingDelimiter) {
Assert.notNull(encodingDelimiter, "'encodingDelimiter' cannot be null");
this.encodingDelimiter = encodingDelimiter;
}

@Override
public Message postProcessMessage(Message message) throws AmqpException {
try {
Expand All @@ -109,9 +126,9 @@ public Message postProcessMessage(Message message) throws AmqpException {

MessageProperties messageProperties =
messagePropertiesBuilder.setContentEncoding(getEncoding() +
(originalProperties.getContentEncoding() == null
(!StringUtils.hasText(originalProperties.getContentEncoding())
? ""
: ":" + originalProperties.getContentEncoding()))
: this.encodingDelimiter + originalProperties.getContentEncoding()))
.build();

return new Message(compressed, messageProperties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,17 +91,22 @@ public Message postProcessMessage(Message message) throws AmqpException {
FileCopyUtils.copy(unzipper, out);
MessageProperties messageProperties = message.getMessageProperties();
String encoding = messageProperties.getContentEncoding();
int colonAt = encoding.indexOf(':');
if (colonAt > 0) {
encoding = encoding.substring(0, colonAt);
int delimAt = encoding.indexOf(':');
if (delimAt < 0) {
delimAt = encoding.indexOf(',');
}
if (delimAt > 0) {
encoding = encoding.substring(0, delimAt);
}
Assert.state(getEncoding().equals(encoding), "Content encoding must be:" + getEncoding() + ", was:"
+ encoding);
if (colonAt < 0) {
if (delimAt < 0) {
messageProperties.setContentEncoding(null);
}
else {
messageProperties.setContentEncoding(messageProperties.getContentEncoding().substring(colonAt + 1));
messageProperties.setContentEncoding(messageProperties.getContentEncoding()
.substring(delimAt + 1)
.trim());
}
messageProperties.getHeaders().remove(MessageProperties.SPRING_AUTO_DECOMPRESS);
return new Message(out.toByteArray(), messageProperties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,9 +94,12 @@ public Message postProcessMessage(Message message) throws AmqpException {
return message;
}
else {
int colonAt = encoding.indexOf(':');
if (colonAt > 0) {
encoding = encoding.substring(0, colonAt);
int delimAt = encoding.indexOf(':');
if (delimAt < 0) {
delimAt = encoding.indexOf(',');
}
if (delimAt > 0) {
encoding = encoding.substring(0, delimAt);
}
MessagePostProcessor decompressor = this.decompressors.get(encoding);
if (decompressor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ public void testSimpleBatchGZipped() throws Exception {
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
template.setConnectionFactory(this.connectionFactory);
GZipPostProcessor gZipPostProcessor = new GZipPostProcessor();
gZipPostProcessor.setEncodingDelimiter(":"); // unzip messages from older versions
assertThat(getStreamLevel(gZipPostProcessor)).isEqualTo(Deflater.BEST_SPEED);
template.setBeforePublishPostProcessors(gZipPostProcessor);
MessageProperties props = new MessageProperties();
Expand Down Expand Up @@ -489,7 +490,7 @@ public void testSimpleBatchGZippedWithEncoding() throws Exception {
message = new Message("bar".getBytes(), props);
template.send("", ROUTE, message);
message = receive(template);
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("gzip:foo");
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("gzip, foo");
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
message = unzipper.postProcessMessage(message);
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
Expand All @@ -500,10 +501,14 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(2, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate template = new BatchingRabbitTemplate(batchingStrategy, this.scheduler);
template.setConnectionFactory(this.connectionFactory);
template.setBeforePublishPostProcessors(new GZipPostProcessor());
AtomicReference<String> encoding = new AtomicReference<>();
template.setBeforePublishPostProcessors(new GZipPostProcessor(), msg -> {
encoding.set(msg.getMessageProperties().getContentEncoding());
return msg;
});
template.setAfterReceivePostProcessors(new DelegatingDecompressingPostProcessor());
MessageProperties props = new MessageProperties();
props.setContentEncoding("foo");
props.setContentEncoding("");
Message message = new Message("foo".getBytes(), props);
template.send("", ROUTE, message);
message = new Message("bar".getBytes(), props);
Expand All @@ -512,6 +517,7 @@ public void testSimpleBatchGZippedWithEncodingInflated() throws Exception {
byte[] out = (byte[]) template.receiveAndConvert(ROUTE);
assertThat(out).isNotNull();
assertThat(new String(out)).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
assertThat(encoding.get()).isEqualTo("gzip");
}

@Test
Expand Down Expand Up @@ -550,7 +556,7 @@ public void testSimpleBatchZippedWithEncoding() throws Exception {
message = new Message("bar".getBytes(), props);
template.send("", ROUTE, message);
message = receive(template);
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip:foo");
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("zip, foo");
UnzipPostProcessor unzipper = new UnzipPostProcessor();
message = unzipper.postProcessMessage(message);
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
Expand Down Expand Up @@ -612,7 +618,7 @@ public void testSimpleBatchDeflaterWithEncoding() throws Exception {
message = new Message("bar".getBytes(), props);
template.send("", ROUTE, message);
message = receive(template);
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate:foo");
assertThat(message.getMessageProperties().getContentEncoding()).isEqualTo("deflate, foo");
InflaterPostProcessor inflater = new InflaterPostProcessor();
message = inflater.postProcessMessage(message);
assertThat(new String(message.getBody())).isEqualTo("\u0000\u0000\u0000\u0003foo\u0000\u0000\u0000\u0003bar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ public String handleMessage(String message) {
Message message = new Message("foo".getBytes(), props);
Message reply = template.sendAndReceive("", ROUTE, message);
assertThat(reply).isNotNull();
assertThat(reply.getMessageProperties().getContentEncoding()).isEqualTo("gzip:UTF-8");
assertThat(reply.getMessageProperties().getContentEncoding()).isEqualTo("gzip, UTF-8");
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
reply = unzipper.postProcessMessage(reply);
assertThat(new String(reply.getBody())).isEqualTo("FOO");
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3946,6 +3946,12 @@ By default, these properties are reused for performance reasons, and modified wi
If you retain a reference to the original outbound message, its properties will change as well.
So, if your application retains a copy of an outbound message with these message post processors, consider turning the `copyProperties` option on.

IMPORTANT: Starting with version 2.2.12, you can configure the delimiter that the compressing post processors use between content encoding elements.
With versions 2.2.11 and before, this was hard-coded as `:`, it is now set to `, ` by default.
The decompressors will work with both delimiters.
However, if you publish messages with 2.3 or later and consume with 2.2.11 or earlier, you MUST set the `encodingDelimiter` property on the compressor(s) to `:`.
When your consumers are upgraded to 2.2.11 or later, you can revert to the default of `, `.

Similarly, the `SimpleMessageListenerContainer` also has a `setAfterReceivePostProcessors()` method, letting the decompression be performed after messages are received by the container.

Starting with version 2.1.4, `addBeforePublishPostProcessors()` and `addAfterReceivePostProcessors()` have been added to the `RabbitTemplate` to allow appending new post processors to the list of before publish and after receive post processors respectively.
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ See <<template-confirms>> for more information.

A new listener container property `consumeDelay` is now available; it is helpful when using the https://github.com/rabbitmq/rabbitmq-sharding[RabbitMQ Sharding Plugin].
See <<containerAttributes>> for more information.

==== MessagePostProcessor Changes

The compressing `MessagePostProcessor` s now use a comma to separate multiple content encodings instead of a colon.
The decompressors can handle both formats but, if you produce messages with this version that are consumed by versions earlier than 2.2.12, you should configure the compressor to use the old delimiter.
See the IMPORTANT note in <<post-processing>> for more information.