diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 93c372392b269..9ae090cbd6e4c 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -131,6 +131,25 @@ public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm } // needed for deserialization + protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException { + super(in); + this.shardId = shardId; + if (in.getVersion().before(Version.V_8_0_0)) { + String type = in.readString(); + assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected [_doc] but received [" + type + "]"; + } + id = in.readString(); + version = in.readZLong(); + seqNo = in.readZLong(); + primaryTerm = in.readVLong(); + forcedRefresh = in.readBoolean(); + result = Result.readFrom(in); + } + + /** + * Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC + * deserialization path + */ protected DocWriteResponse(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); @@ -258,10 +277,19 @@ public String getLocation(@Nullable String routing) { return location.toString(); } + public void writeThin(StreamOutput out) throws IOException { + super.writeTo(out); + writeWithoutShardId(out); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + writeWithoutShardId(out); + } + + private void writeWithoutShardId(StreamOutput out) throws IOException { if (out.getVersion().before(Version.V_8_0_0)) { out.writeString(MapperService.SINGLE_MAPPING_NAME); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index bdf7abaa13d77..5707e4b2b9a5b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -359,6 +360,26 @@ public String toString() { BulkItemResponse() {} + BulkItemResponse(ShardId shardId, StreamInput in) throws IOException { + id = in.readVInt(); + opType = OpType.fromId(in.readByte()); + + byte type = in.readByte(); + if (type == 0) { + response = new IndexResponse(shardId, in); + } else if (type == 1) { + response = new DeleteResponse(shardId, in); + } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses' + response = new UpdateResponse(shardId, in); + } else if (type != 2) { + throw new IllegalArgumentException("Unexpected type [" + type + "]"); + } + + if (in.readBoolean()) { + failure = new Failure(in); + } + } + BulkItemResponse(StreamInput in) throws IOException { id = in.readVInt(); opType = OpType.fromId(in.readByte()); @@ -370,6 +391,8 @@ public String toString() { response = new DeleteResponse(in); } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses' response = new UpdateResponse(in); + } else if (type != 2) { + throw new IllegalArgumentException("Unexpected type [" + type + "]"); } if (in.readBoolean()) { @@ -473,13 +496,7 @@ public void writeTo(StreamOutput out) throws IOException { if (response == null) { out.writeByte((byte) 2); } else { - if (response instanceof IndexResponse) { - out.writeByte((byte) 0); - } else if (response instanceof DeleteResponse) { - out.writeByte((byte) 1); - } else if (response instanceof UpdateResponse) { - out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses' - } + writeResponseType(out); response.writeTo(out); } if (failure == null) { @@ -489,4 +506,34 @@ public void writeTo(StreamOutput out) throws IOException { failure.writeTo(out); } } + + public void writeThin(StreamOutput out) throws IOException { + out.writeVInt(id); + out.writeByte(opType.getId()); + + if (response == null) { + out.writeByte((byte) 2); + } else { + writeResponseType(out); + response.writeThin(out); + } + if (failure == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + failure.writeTo(out); + } + } + + private void writeResponseType(StreamOutput out) throws IOException { + if (response instanceof IndexResponse) { + out.writeByte((byte) 0); + } else if (response instanceof DeleteResponse) { + out.writeByte((byte) 1); + } else if (response instanceof UpdateResponse) { + out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses' + } else { + throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]"); + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index 63ab78547d651..a795863eb3667 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -30,6 +31,8 @@ public class BulkShardResponse extends ReplicationResponse implements WriteResponse { + private static final Version COMPACT_SHARD_ID_VERSION = Version.V_8_0_0; + private final ShardId shardId; private final BulkItemResponse[] responses; @@ -37,8 +40,14 @@ public class BulkShardResponse extends ReplicationResponse implements WriteRespo super(in); shardId = new ShardId(in); responses = new BulkItemResponse[in.readVInt()]; - for (int i = 0; i < responses.length; i++) { - responses[i] = new BulkItemResponse(in); + if (in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) { + for (int i = 0; i < responses.length; i++) { + responses[i] = new BulkItemResponse(shardId, in); + } + } else { + for (int i = 0; i < responses.length; i++) { + responses[i] = new BulkItemResponse(in); + } } } @@ -75,8 +84,14 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); out.writeVInt(responses.length); - for (BulkItemResponse response : responses) { - response.writeTo(out); + if (out.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) { + for (BulkItemResponse response : responses) { + response.writeThin(out); + } + } else { + for (BulkItemResponse response : responses) { + response.writeTo(out); + } } } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index f60b755c84617..5e45e3118da41 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -37,6 +37,10 @@ */ public class DeleteResponse extends DocWriteResponse { + public DeleteResponse(ShardId shardId, StreamInput in) throws IOException { + super(shardId, in); + } + public DeleteResponse(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 592c6a97f835f..1696d01224979 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -38,6 +38,10 @@ */ public class IndexResponse extends DocWriteResponse { + public IndexResponse(ShardId shardId, StreamInput in) throws IOException { + super(shardId, in); + } + public IndexResponse(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index 5eba54544f9c4..50c71de72efa8 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -38,6 +38,13 @@ public class UpdateResponse extends DocWriteResponse { private GetResult getResult; + public UpdateResponse(ShardId shardId, StreamInput in) throws IOException { + super(shardId, in); + if (in.readBoolean()) { + getResult = new GetResult(in); + } + } + public UpdateResponse(StreamInput in) throws IOException { super(in); if (in.readBoolean()) { @@ -72,9 +79,19 @@ public RestStatus status() { return this.result == Result.CREATED ? RestStatus.CREATED : super.status(); } + @Override + public void writeThin(StreamOutput out) throws IOException { + super.writeThin(out); + writeGetResult(out); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + writeGetResult(out); + } + + private void writeGetResult(StreamOutput out) throws IOException { if (getResult == null) { out.writeBoolean(false); } else {