Skip to content

Commit e75a6f1

Browse files
Stop Redundantly Serializing ShardId in BulkShardResponse (#56094) (#56866)
When reading/writing the individual doc responses in the context of a bulk shard response there is no need to serialize the `ShardId` over and over. This can waste a lot of memory when handling large bulk requests.
1 parent c02850f commit e75a6f1

File tree

6 files changed

+123
-11
lines changed

6 files changed

+123
-11
lines changed

server/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,22 @@ public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, lon
133133
}
134134

135135
// needed for deserialization
136+
protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
137+
super(in);
138+
this.shardId = shardId;
139+
type = in.readString();
140+
id = in.readString();
141+
version = in.readZLong();
142+
seqNo = in.readZLong();
143+
primaryTerm = in.readVLong();
144+
forcedRefresh = in.readBoolean();
145+
result = Result.readFrom(in);
146+
}
147+
148+
/**
149+
* Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC
150+
* deserialization path
151+
*/
136152
protected DocWriteResponse(StreamInput in) throws IOException {
137153
super(in);
138154
shardId = new ShardId(in);
@@ -272,10 +288,19 @@ public String getLocation(@Nullable String routing) {
272288
return location.toString();
273289
}
274290

291+
public void writeThin(StreamOutput out) throws IOException {
292+
super.writeTo(out);
293+
writeWithoutShardId(out);
294+
}
295+
275296
@Override
276297
public void writeTo(StreamOutput out) throws IOException {
277298
super.writeTo(out);
278299
shardId.writeTo(out);
300+
writeWithoutShardId(out);
301+
}
302+
303+
private void writeWithoutShardId(StreamOutput out) throws IOException {
279304
out.writeString(type);
280305
out.writeString(id);
281306
out.writeZLong(version);

server/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.xcontent.XContentBuilder;
4040
import org.elasticsearch.common.xcontent.XContentParser;
4141
import org.elasticsearch.index.seqno.SequenceNumbers;
42+
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.rest.RestStatus;
4344

4445
import java.io.IOException;
@@ -366,6 +367,26 @@ public String toString() {
366367

367368
BulkItemResponse() {}
368369

370+
BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
371+
id = in.readVInt();
372+
opType = OpType.fromId(in.readByte());
373+
374+
byte type = in.readByte();
375+
if (type == 0) {
376+
response = new IndexResponse(shardId, in);
377+
} else if (type == 1) {
378+
response = new DeleteResponse(shardId, in);
379+
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
380+
response = new UpdateResponse(shardId, in);
381+
} else if (type != 2) {
382+
throw new IllegalArgumentException("Unexpected type [" + type + "]");
383+
}
384+
385+
if (in.readBoolean()) {
386+
failure = new Failure(in);
387+
}
388+
}
389+
369390
BulkItemResponse(StreamInput in) throws IOException {
370391
id = in.readVInt();
371392
opType = OpType.fromId(in.readByte());
@@ -377,6 +398,8 @@ public String toString() {
377398
response = new DeleteResponse(in);
378399
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
379400
response = new UpdateResponse(in);
401+
} else if (type != 2) {
402+
throw new IllegalArgumentException("Unexpected type [" + type + "]");
380403
}
381404

382405
if (in.readBoolean()) {
@@ -490,13 +513,7 @@ public void writeTo(StreamOutput out) throws IOException {
490513
if (response == null) {
491514
out.writeByte((byte) 2);
492515
} else {
493-
if (response instanceof IndexResponse) {
494-
out.writeByte((byte) 0);
495-
} else if (response instanceof DeleteResponse) {
496-
out.writeByte((byte) 1);
497-
} else if (response instanceof UpdateResponse) {
498-
out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
499-
}
516+
writeResponseType(out);
500517
response.writeTo(out);
501518
}
502519
if (failure == null) {
@@ -506,4 +523,34 @@ public void writeTo(StreamOutput out) throws IOException {
506523
failure.writeTo(out);
507524
}
508525
}
526+
527+
public void writeThin(StreamOutput out) throws IOException {
528+
out.writeVInt(id);
529+
out.writeByte(opType.getId());
530+
531+
if (response == null) {
532+
out.writeByte((byte) 2);
533+
} else {
534+
writeResponseType(out);
535+
response.writeThin(out);
536+
}
537+
if (failure == null) {
538+
out.writeBoolean(false);
539+
} else {
540+
out.writeBoolean(true);
541+
failure.writeTo(out);
542+
}
543+
}
544+
545+
private void writeResponseType(StreamOutput out) throws IOException {
546+
if (response instanceof IndexResponse) {
547+
out.writeByte((byte) 0);
548+
} else if (response instanceof DeleteResponse) {
549+
out.writeByte((byte) 1);
550+
} else if (response instanceof UpdateResponse) {
551+
out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
552+
} else {
553+
throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
554+
}
555+
}
509556
}

server/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.DocWriteResponse;
2324
import org.elasticsearch.action.support.WriteResponse;
2425
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -30,15 +31,23 @@
3031

3132
public class BulkShardResponse extends ReplicationResponse implements WriteResponse {
3233

34+
private static final Version COMPACT_SHARD_ID_VERSION = Version.V_7_9_0;
35+
3336
private final ShardId shardId;
3437
private final BulkItemResponse[] responses;
3538

3639
BulkShardResponse(StreamInput in) throws IOException {
3740
super(in);
3841
shardId = new ShardId(in);
3942
responses = new BulkItemResponse[in.readVInt()];
40-
for (int i = 0; i < responses.length; i++) {
41-
responses[i] = new BulkItemResponse(in);
43+
if (in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
44+
for (int i = 0; i < responses.length; i++) {
45+
responses[i] = new BulkItemResponse(shardId, in);
46+
}
47+
} else {
48+
for (int i = 0; i < responses.length; i++) {
49+
responses[i] = new BulkItemResponse(in);
50+
}
4251
}
4352
}
4453

@@ -75,8 +84,14 @@ public void writeTo(StreamOutput out) throws IOException {
7584
super.writeTo(out);
7685
shardId.writeTo(out);
7786
out.writeVInt(responses.length);
78-
for (BulkItemResponse response : responses) {
79-
response.writeTo(out);
87+
if (out.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
88+
for (BulkItemResponse response : responses) {
89+
response.writeThin(out);
90+
}
91+
} else {
92+
for (BulkItemResponse response : responses) {
93+
response.writeTo(out);
94+
}
8095
}
8196
}
8297
}

server/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
*/
3838
public class DeleteResponse extends DocWriteResponse {
3939

40+
public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
41+
super(shardId, in);
42+
}
43+
4044
public DeleteResponse(StreamInput in) throws IOException {
4145
super(in);
4246
}

server/src/main/java/org/elasticsearch/action/index/IndexResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
*/
3939
public class IndexResponse extends DocWriteResponse {
4040

41+
public IndexResponse(ShardId shardId, StreamInput in) throws IOException {
42+
super(shardId, in);
43+
}
44+
4145
public IndexResponse(StreamInput in) throws IOException {
4246
super(in);
4347
}

server/src/main/java/org/elasticsearch/action/update/UpdateResponse.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public class UpdateResponse extends DocWriteResponse {
3838

3939
private GetResult getResult;
4040

41+
public UpdateResponse(ShardId shardId, StreamInput in) throws IOException {
42+
super(shardId, in);
43+
if (in.readBoolean()) {
44+
getResult = new GetResult(in);
45+
}
46+
}
47+
4148
public UpdateResponse(StreamInput in) throws IOException {
4249
super(in);
4350
if (in.readBoolean()) {
@@ -72,9 +79,19 @@ public RestStatus status() {
7279
return this.result == Result.CREATED ? RestStatus.CREATED : super.status();
7380
}
7481

82+
@Override
83+
public void writeThin(StreamOutput out) throws IOException {
84+
super.writeThin(out);
85+
writeGetResult(out);
86+
}
87+
7588
@Override
7689
public void writeTo(StreamOutput out) throws IOException {
7790
super.writeTo(out);
91+
writeGetResult(out);
92+
}
93+
94+
private void writeGetResult(StreamOutput out) throws IOException {
7895
if (getResult == null) {
7996
out.writeBoolean(false);
8097
} else {

0 commit comments

Comments
 (0)