Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ public DocWriteResponse(ShardId shardId, String id, long seqNo, long primaryTerm
}

// needed for deserialization
protected DocWriteResponse(ShardId shardId, StreamInput in) throws IOException {
super(in);
this.shardId = shardId;
if (in.getVersion().before(Version.V_8_0_0)) {
String type = in.readString();
assert MapperService.SINGLE_MAPPING_NAME.equals(type) : "Expected [_doc] but received [" + type + "]";
}
id = in.readString();
version = in.readZLong();
seqNo = in.readZLong();
primaryTerm = in.readVLong();
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}

/**
* Needed for deserialization of single item requests in {@link org.elasticsearch.action.index.IndexAction} and BwC
* deserialization path
*/
protected DocWriteResponse(StreamInput in) throws IOException {
super(in);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert anything about in.getVersion() here? Not looked in detail, but I hope that this constructor will become obsolete after this change is complete.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite I think. We still have TransportIndexAction which still uses and logically needs the full deserialization on an IndexResponse as far as I can tell and won't go away in 8 right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we get rid of that yet? It was deprecated over 3 years ago.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right in 8 we can ... opening a PR for that sec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urgh nevermind, technically we can remove this now but it's a far from trivial change to do so (we're still using that action all over the place in tests). I don't think I can do that in the short-term.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, can we mention IndexAction in this method's Javadoc so we keep track of the dependency.

shardId = new ShardId(in);
Expand Down Expand Up @@ -258,10 +277,19 @@ public String getLocation(@Nullable String routing) {
return location.toString();
}

public void writeThin(StreamOutput out) throws IOException {
super.writeTo(out);
writeWithoutShardId(out);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
writeWithoutShardId(out);
}

private void writeWithoutShardId(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_8_0_0)) {
out.writeString(MapperService.SINGLE_MAPPING_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
Expand Down Expand Up @@ -359,6 +360,26 @@ public String toString() {

BulkItemResponse() {}

BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());

byte type = in.readByte();
if (type == 0) {
response = new IndexResponse(shardId, in);
} else if (type == 1) {
response = new DeleteResponse(shardId, in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(shardId, in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
failure = new Failure(in);
}
}

BulkItemResponse(StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());
Expand All @@ -370,6 +391,8 @@ public String toString() {
response = new DeleteResponse(in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
Expand Down Expand Up @@ -473,13 +496,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (response == null) {
out.writeByte((byte) 2);
} else {
if (response instanceof IndexResponse) {
out.writeByte((byte) 0);
} else if (response instanceof DeleteResponse) {
out.writeByte((byte) 1);
} else if (response instanceof UpdateResponse) {
out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
}
writeResponseType(out);
response.writeTo(out);
}
if (failure == null) {
Expand All @@ -489,4 +506,34 @@ public void writeTo(StreamOutput out) throws IOException {
failure.writeTo(out);
}
}

public void writeThin(StreamOutput out) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to leave BulkItemResponse#writeTo() as dead code that mostly duplicates this. Does that mean that we don't need BulkItemResponse implements Writeable? If so, can you remove that and rename these methods more appropriately?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're still using it in org.elasticsearch.action.bulk.BulkItemRequest#writeTo (though now that I looked over that, we might not have to ever write the ShardId for that one as well logically because it's always part of a BulkShardRequest which never has null for the ShardId).
To me it seems easier to leave it Writable for now so we have the symmetry with the constructor and rename (and not make it a Writable) after working out how to not write the shard id in that last spot as well. (that's easier to do after we clean up the request side of things in #56092 )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

out.writeVInt(id);
out.writeByte(opType.getId());

if (response == null) {
out.writeByte((byte) 2);
} else {
writeResponseType(out);
response.writeThin(out);
}
if (failure == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
failure.writeTo(out);
}
}

private void writeResponseType(StreamOutput out) throws IOException {
if (response instanceof IndexResponse) {
out.writeByte((byte) 0);
} else if (response instanceof DeleteResponse) {
out.writeByte((byte) 1);
} else if (response instanceof UpdateResponse) {
out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
} else {
throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand All @@ -30,15 +31,23 @@

public class BulkShardResponse extends ReplicationResponse implements WriteResponse {

private static final Version COMPACT_SHARD_ID_VERSION = Version.V_8_0_0;

private final ShardId shardId;
private final BulkItemResponse[] responses;

BulkShardResponse(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = new BulkItemResponse(in);
if (in.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
for (int i = 0; i < responses.length; i++) {
responses[i] = new BulkItemResponse(shardId, in);
}
} else {
for (int i = 0; i < responses.length; i++) {
responses[i] = new BulkItemResponse(in);
}
}
}

Expand Down Expand Up @@ -75,8 +84,14 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeVInt(responses.length);
for (BulkItemResponse response : responses) {
response.writeTo(out);
if (out.getVersion().onOrAfter(COMPACT_SHARD_ID_VERSION)) {
for (BulkItemResponse response : responses) {
response.writeThin(out);
}
} else {
for (BulkItemResponse response : responses) {
response.writeTo(out);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
*/
public class DeleteResponse extends DocWriteResponse {

public DeleteResponse(ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
}

public DeleteResponse(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
*/
public class IndexResponse extends DocWriteResponse {

public IndexResponse(ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
}

public IndexResponse(StreamInput in) throws IOException {
super(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public class UpdateResponse extends DocWriteResponse {

private GetResult getResult;

public UpdateResponse(ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
if (in.readBoolean()) {
getResult = new GetResult(in);
}
}

public UpdateResponse(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
Expand Down Expand Up @@ -72,9 +79,19 @@ public RestStatus status() {
return this.result == Result.CREATED ? RestStatus.CREATED : super.status();
}

@Override
public void writeThin(StreamOutput out) throws IOException {
super.writeThin(out);
writeGetResult(out);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeGetResult(out);
}

private void writeGetResult(StreamOutput out) throws IOException {
if (getResult == null) {
out.writeBoolean(false);
} else {
Expand Down