-
Notifications
You must be signed in to change notification settings - Fork 2.3k
[Remote Routing Table] Implement write and read flow for shard diff file. #14684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Bukhtawar
merged 20 commits into
opensearch-project:main
from
shailendra0811:singhlhs-rrt-shard-diff
Jul 23, 2024
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
011dead
Implement write and read flow to upload/download shard diff file.
66ddb7f
Create RoutingTableIncrementalDiff non-remote entity.
740e94b
Address comments
8258729
Add removed UTs.
2c9bf50
Address comments.
cef920f
Add UTs for write
dce3b88
Add tests for upload/download shard diff file.
4817099
Remove unwanted UT.
46b89ec
Add UTs for Manifest diff file.
ba79c44
Add UTs for SerDe RemoteIndexRoutingTableDiff
8351a4c
Remove unwanted changes
4773bbb
Add DiffManifest SerDe Test
1d52191
Delegate responsibility to IndexShardRouting
5d3f31d
RoutingTableIncrementalDiff implements Diff<RoutingTable>
88beb03
Merge 'main' into singhlhs-rrt-shard-diff
ba0bf37
Address comments
ae019cc
Merge branch 'main' into singhlhs-rrt-shard-diff
183c8bc
Update ITs for routing table service
a1506ec
Avoid reading shard diff in case of full state apply.
91151ad
Delete stale indices routing diff file.
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
168 changes: 168 additions & 0 deletions
168
server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.cluster.routing; | ||
|
|
||
| import org.opensearch.cluster.Diff; | ||
| import org.opensearch.core.common.io.stream.StreamInput; | ||
| import org.opensearch.core.common.io.stream.StreamOutput; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| /** | ||
| * Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized. | ||
| */ | ||
| public class RoutingTableIncrementalDiff implements Diff<RoutingTable> { | ||
|
|
||
| private final Map<String, Diff<IndexRoutingTable>> diffs; | ||
|
|
||
| /** | ||
| * Constructs a new RoutingTableIncrementalDiff with the given differences. | ||
| * | ||
| * @param diffs a map containing the differences of {@link IndexRoutingTable}. | ||
| */ | ||
| public RoutingTableIncrementalDiff(Map<String, Diff<IndexRoutingTable>> diffs) { | ||
| this.diffs = diffs; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the map of differences of {@link IndexRoutingTable}. | ||
| * | ||
| * @return a map containing the differences. | ||
| */ | ||
| public Map<String, Diff<IndexRoutingTable>> getDiffs() { | ||
| return diffs; | ||
| } | ||
|
|
||
| /** | ||
| * Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}. | ||
| * | ||
| * @param in the input stream to read from. | ||
| * @return the deserialized RoutingTableIncrementalDiff. | ||
| * @throws IOException if an I/O exception occurs while reading from the stream. | ||
| */ | ||
| public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException { | ||
| int size = in.readVInt(); | ||
| Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>(); | ||
|
|
||
| for (int i = 0; i < size; i++) { | ||
| String key = in.readString(); | ||
| Diff<IndexRoutingTable> diff = IndexRoutingTableIncrementalDiff.readFrom(in); | ||
| diffs.put(key, diff); | ||
| } | ||
| return new RoutingTableIncrementalDiff(diffs); | ||
| } | ||
|
|
||
| /** | ||
| * Applies the differences to the provided {@link RoutingTable}. | ||
| * | ||
| * @param part the original RoutingTable to which the differences will be applied. | ||
| * @return the updated RoutingTable with the applied differences. | ||
| */ | ||
| @Override | ||
| public RoutingTable apply(RoutingTable part) { | ||
| RoutingTable.Builder builder = new RoutingTable.Builder(); | ||
| for (IndexRoutingTable indexRoutingTable : part) { | ||
| builder.add(indexRoutingTable); // Add existing index routing tables to builder | ||
| } | ||
|
|
||
| // Apply the diffs | ||
| for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) { | ||
| builder.add(entry.getValue().apply(part.index(entry.getKey()))); | ||
| } | ||
|
|
||
| return builder.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Writes the differences to the given {@link StreamOutput}. | ||
| * | ||
| * @param out the output stream to write to. | ||
| * @throws IOException if an I/O exception occurs while writing to the stream. | ||
| */ | ||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeVInt(diffs.size()); | ||
| for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) { | ||
| out.writeString(entry.getKey()); | ||
| entry.getValue().writeTo(out); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized. | ||
| */ | ||
| public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutingTable> { | ||
shailendra0811 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| private final List<IndexShardRoutingTable> indexShardRoutingTables; | ||
|
|
||
| /** | ||
| * Constructs a new IndexShardRoutingTableDiff with the given shard routing tables. | ||
| * | ||
| * @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences. | ||
| */ | ||
| public IndexRoutingTableIncrementalDiff(List<IndexShardRoutingTable> indexShardRoutingTables) { | ||
| this.indexShardRoutingTables = indexShardRoutingTables; | ||
| } | ||
|
|
||
| /** | ||
| * Applies the differences to the provided {@link IndexRoutingTable}. | ||
| * | ||
| * @param part the original IndexRoutingTable to which the differences will be applied. | ||
| * @return the updated IndexRoutingTable with the applied differences. | ||
| */ | ||
| @Override | ||
| public IndexRoutingTable apply(IndexRoutingTable part) { | ||
| IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex()); | ||
| for (IndexShardRoutingTable shardRoutingTable : part) { | ||
| builder.addIndexShard(shardRoutingTable); // Add existing shards to builder | ||
| } | ||
|
|
||
| // Apply the diff: update or add the new shard routing tables | ||
| for (IndexShardRoutingTable diffShard : indexShardRoutingTables) { | ||
| builder.addIndexShard(diffShard); | ||
| } | ||
| return builder.build(); | ||
| } | ||
|
|
||
| /** | ||
| * Writes the differences to the given {@link StreamOutput}. | ||
| * | ||
| * @param out the output stream to write to. | ||
| * @throws IOException if an I/O exception occurs while writing to the stream. | ||
| */ | ||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeVInt(indexShardRoutingTables.size()); | ||
| for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) { | ||
| IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}. | ||
| * | ||
| * @param in the input stream to read from. | ||
| * @return the deserialized IndexShardRoutingTableDiff. | ||
| * @throws IOException if an I/O exception occurs while reading from the stream. | ||
| */ | ||
| public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException { | ||
| int size = in.readVInt(); | ||
| List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size); | ||
| for (int i = 0; i < size; i++) { | ||
| IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); | ||
| indexShardRoutingTables.add(shardRoutingTable); | ||
| } | ||
| return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables); | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.