Skip to content

Commit cae6fe0

Browse files
HyunSangHanPeter Alfonsi
authored andcommitted
Replace Version with RemoteVersion in reindex module (opensearch-project#18457)
- Add RemoteVersion class with backward compatibility for Elasticsearch versions - Support version parsing from string with distribution parameter - Include version comparison methods (before, after, onOrAfter, onOrBefore) - Support both Elasticsearch and OpenSearch version constants --------- Signed-off-by: Hyunsang Han <[email protected]>
1 parent c56f46d commit cae6fe0

File tree

7 files changed

+425
-73
lines changed

7 files changed

+425
-73
lines changed

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteRequestBuilders.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.hc.core5.http.ContentType;
3636
import org.apache.hc.core5.http.io.entity.StringEntity;
3737
import org.opensearch.OpenSearchException;
38-
import org.opensearch.Version;
3938
import org.opensearch.action.search.SearchRequest;
4039
import org.opensearch.client.Request;
4140
import org.opensearch.common.logging.DeprecationLogger;
@@ -74,7 +73,7 @@ final class RemoteRequestBuilders {
7473

7574
private RemoteRequestBuilders() {}
7675

77-
static Request initialSearch(SearchRequest searchRequest, BytesReference query, Version remoteVersion) {
76+
static Request initialSearch(SearchRequest searchRequest, BytesReference query, RemoteVersion remoteVersion) {
7877
// It is nasty to build paths with StringBuilder but we'll be careful....
7978
StringBuilder path = new StringBuilder("/");
8079
addIndices(path, searchRequest.indices());
@@ -84,7 +83,7 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
8483
if (searchRequest.scroll() != null) {
8584
TimeValue keepAlive = searchRequest.scroll().keepAlive();
8685
// V_5_0_0
87-
if (remoteVersion.before(Version.fromId(5000099))) {
86+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_5_0_0)) {
8887
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
8988
* so we toss out that resolution, rounding up because more scroll
9089
* timeout seems safer than less. */
@@ -103,7 +102,7 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
103102
if (searchRequest.source().sorts() != null) {
104103
boolean useScan = false;
105104
// Detect if we should use search_type=scan rather than a sort
106-
if (remoteVersion.before(Version.fromId(2010099))) {
105+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_1_0)) {
107106
for (SortBuilder<?> sort : searchRequest.source().sorts()) {
108107
if (sort instanceof FieldSortBuilder) {
109108
FieldSortBuilder f = (FieldSortBuilder) sort;
@@ -124,10 +123,10 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
124123
request.addParameter("sort", sorts.toString());
125124
}
126125
}
127-
if (remoteVersion.before(Version.fromId(2000099))) {
126+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_0_0)) {
128127
// Versions before 2.0.0 need prompting to return interesting fields. Note that timestamp isn't available at all....
129128
searchRequest.source().storedField("_parent").storedField("_routing").storedField("_ttl");
130-
if (remoteVersion.before(Version.fromId(1000099))) {
129+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_1_0_0)) {
131130
// Versions before 1.0.0 don't support `"_source": true` so we have to ask for the _source in a funny way.
132131
if (false == searchRequest.source().storedFields().fieldNames().contains("_source")) {
133132
searchRequest.source().storedField("_source");
@@ -140,11 +139,11 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
140139
fields.append(',').append(searchRequest.source().storedFields().fieldNames().get(i));
141140
}
142141
// V_5_0_0
143-
String storedFieldsParamName = remoteVersion.before(Version.fromId(5000099)) ? "fields" : "stored_fields";
142+
String storedFieldsParamName = remoteVersion.before(RemoteVersion.ELASTICSEARCH_5_0_0) ? "fields" : "stored_fields";
144143
request.addParameter(storedFieldsParamName, fields.toString());
145144
}
146145

147-
if (remoteVersion.onOrAfter(Version.fromId(6030099))) {
146+
if (remoteVersion.onOrAfter(RemoteVersion.ELASTICSEARCH_6_3_0)) {
148147
// allow_partial_results introduced in 6.3, running remote reindex against earlier versions still silently discards RED shards.
149148
request.addParameter("allow_partial_search_results", "false");
150149
}
@@ -173,7 +172,7 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
173172
if (searchRequest.source().fetchSource() != null) {
174173
entity.field("_source", searchRequest.source().fetchSource());
175174
} else {
176-
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
175+
if (remoteVersion.onOrAfter(RemoteVersion.ELASTICSEARCH_1_0_0)) {
177176
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
178177
entity.field("_source", true);
179178
}
@@ -225,19 +224,18 @@ private static String sortToUri(SortBuilder<?> sort) {
225224
throw new IllegalArgumentException("Unsupported sort [" + sort + "]");
226225
}
227226

228-
static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion) {
227+
static Request scroll(String scroll, TimeValue keepAlive, RemoteVersion remoteVersion) {
229228
Request request = new Request("POST", "/_search/scroll");
230229

231-
// V_5_0_0
232-
if (remoteVersion.before(Version.fromId(5000099))) {
230+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_5_0_0)) {
233231
/* Versions of Elasticsearch before 5.0 couldn't parse nanos or micros
234232
* so we toss out that resolution, rounding up so we shouldn't end up
235233
* with 0s. */
236234
keepAlive = timeValueMillis((long) Math.ceil(keepAlive.millisFrac()));
237235
}
238236
request.addParameter("scroll", keepAlive.getStringRep());
239237

240-
if (remoteVersion.before(Version.fromId(2000099))) {
238+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_0_0)) {
241239
// Versions before 2.0.0 extract the plain scroll_id from the body
242240
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
243241
return request;
@@ -252,10 +250,10 @@ static Request scroll(String scroll, TimeValue keepAlive, Version remoteVersion)
252250
return request;
253251
}
254252

255-
static Request clearScroll(String scroll, Version remoteVersion) {
253+
static Request clearScroll(String scroll, RemoteVersion remoteVersion) {
256254
Request request = new Request("DELETE", "/_search/scroll");
257255

258-
if (remoteVersion.before(Version.fromId(2000099))) {
256+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_0_0)) {
259257
// Versions before 2.0.0 extract the plain scroll_id from the body
260258
request.setEntity(new StringEntity(scroll, ContentType.TEXT_PLAIN));
261259
return request;

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteResponseParsers.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
package org.opensearch.index.reindex.remote;
3434

3535
import org.apache.lucene.search.TotalHits;
36-
import org.opensearch.LegacyESVersion;
37-
import org.opensearch.Version;
3836
import org.opensearch.common.collect.Tuple;
3937
import org.opensearch.core.ParseField;
4038
import org.opensearch.core.common.ParsingException;
@@ -294,20 +292,21 @@ public void setCausedBy(Throwable causedBy) {
294292
}
295293

296294
/**
297-
* Parses the main action to return just the {@linkplain Version} that it returns. We throw everything else out.
295+
* Parses the main action to return just the {@linkplain RemoteVersion} that it returns. We throw everything else out.
298296
*/
299-
public static final ConstructingObjectParser<Version, MediaType> MAIN_ACTION_PARSER = new ConstructingObjectParser<>(
297+
public static final ConstructingObjectParser<RemoteVersion, MediaType> MAIN_ACTION_PARSER = new ConstructingObjectParser<>(
300298
"/",
301299
true,
302-
a -> (Version) a[0]
300+
a -> (RemoteVersion) a[0]
303301
);
304302
static {
305-
ConstructingObjectParser<Version, MediaType> versionParser = new ConstructingObjectParser<>(
303+
ConstructingObjectParser<RemoteVersion, MediaType> versionParser = new ConstructingObjectParser<>(
306304
"version",
307305
true,
308-
a -> a[0] == null
309-
? LegacyESVersion.fromString(((String) a[1]).replace("-SNAPSHOT", "").replaceFirst("-(alpha\\d+|beta\\d+|rc\\d+)", ""))
310-
: Version.fromString(((String) a[1]).replace("-SNAPSHOT", "").replaceFirst("-(alpha\\d+|beta\\d+|rc\\d+)", ""))
306+
a -> RemoteVersion.fromString(
307+
((String) a[1]).replace("-SNAPSHOT", "").replaceFirst("-(alpha\\d+|beta\\d+|rc\\d+)", ""),
308+
a[0] != null
309+
)
311310
);
312311
versionParser.declareStringOrNull(optionalConstructorArg(), new ParseField("distribution"));
313312
versionParser.declareString(constructorArg(), new ParseField("number"));

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.logging.log4j.util.Supplier;
4343
import org.opensearch.OpenSearchException;
4444
import org.opensearch.OpenSearchStatusException;
45-
import org.opensearch.Version;
4645
import org.opensearch.action.bulk.BackoffPolicy;
4746
import org.opensearch.action.search.SearchRequest;
4847
import org.opensearch.client.Request;
@@ -81,7 +80,7 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
8180
private final RestClient client;
8281
private final BytesReference query;
8382
private final SearchRequest searchRequest;
84-
Version remoteVersion;
83+
RemoteVersion remoteVersion;
8584

8685
public RemoteScrollableHitSource(
8786
Logger logger,
@@ -116,7 +115,7 @@ protected void doStart(RejectAwareActionListener<Response> searchListener) {
116115
}, searchListener::onFailure, searchListener::onFailure));
117116
}
118117

119-
void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
118+
void lookupRemoteVersion(RejectAwareActionListener<RemoteVersion> listener) {
120119
logger.trace("Checking version for remote domain");
121120
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
122121
// instead of retrying for longer duration.
@@ -159,7 +158,8 @@ public void onFailure(Exception e) {
159158
private void logFailure(Exception e) {
160159
if (e instanceof ResponseException) {
161160
ResponseException re = (ResponseException) e;
162-
if (remoteVersion.before(Version.fromId(2000099)) && re.getResponse().getStatusLine().getStatusCode() == 404) {
161+
if (remoteVersion.before(RemoteVersion.ELASTICSEARCH_2_0_0)
162+
&& re.getResponse().getStatusLine().getStatusCode() == 404) {
163163
logger.debug(
164164
(Supplier<?>) () -> new ParameterizedMessage(
165165
"Failed to clear scroll [{}] from pre-2.0 OpenSearch. This is normal if the request terminated "
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.reindex.remote;
10+
11+
import java.util.Objects;
12+
13+
/**
14+
* Represents a version of a remote Elasticsearch or OpenSearch cluster for reindexing purposes.
15+
* This class provides backward compatibility support for communicating with older Elasticsearch versions
16+
* without relying on the global Version class.
17+
*/
18+
public final class RemoteVersion implements Comparable<RemoteVersion> {
19+
20+
private final int major;
21+
private final int minor;
22+
private final int revision;
23+
final boolean isOpenSearch;
24+
25+
// Common version constants for backward compatibility
26+
public static final RemoteVersion ELASTICSEARCH_0_20_5 = new RemoteVersion(0, 20, 5, false);
27+
public static final RemoteVersion ELASTICSEARCH_0_90_13 = new RemoteVersion(0, 90, 13, false);
28+
public static final RemoteVersion ELASTICSEARCH_1_0_0 = new RemoteVersion(1, 0, 0, false);
29+
public static final RemoteVersion ELASTICSEARCH_1_7_5 = new RemoteVersion(1, 7, 5, false);
30+
public static final RemoteVersion ELASTICSEARCH_2_0_0 = new RemoteVersion(2, 0, 0, false);
31+
public static final RemoteVersion ELASTICSEARCH_2_1_0 = new RemoteVersion(2, 1, 0, false);
32+
public static final RemoteVersion ELASTICSEARCH_2_3_3 = new RemoteVersion(2, 3, 3, false);
33+
public static final RemoteVersion ELASTICSEARCH_5_0_0 = new RemoteVersion(5, 0, 0, false);
34+
public static final RemoteVersion ELASTICSEARCH_6_0_0 = new RemoteVersion(6, 0, 0, false);
35+
public static final RemoteVersion ELASTICSEARCH_6_3_0 = new RemoteVersion(6, 3, 0, false);
36+
public static final RemoteVersion ELASTICSEARCH_7_0_0 = new RemoteVersion(7, 0, 0, false);
37+
38+
// OpenSearch versions
39+
public static final RemoteVersion OPENSEARCH_1_0_0 = new RemoteVersion(1, 0, 0, true);
40+
public static final RemoteVersion OPENSEARCH_2_0_0 = new RemoteVersion(2, 0, 0, true);
41+
public static final RemoteVersion OPENSEARCH_3_1_0 = new RemoteVersion(3, 1, 0, true);
42+
43+
public RemoteVersion(int major, int minor, int revision, boolean isOpenSearch) {
44+
this.major = major;
45+
this.minor = minor;
46+
this.revision = revision;
47+
this.isOpenSearch = isOpenSearch;
48+
}
49+
50+
/**
51+
* Parse version string like "7.10.2" or "1.0.0"
52+
*/
53+
public static RemoteVersion fromString(String version, boolean isOpenSearch) {
54+
if (version == null || version.trim().isEmpty()) {
55+
throw new IllegalArgumentException("Version string cannot be null or empty");
56+
}
57+
58+
// Remove snapshot and pre-release qualifiers
59+
String cleanVersion = version.replace("-SNAPSHOT", "").replaceFirst("-(alpha\\d+|beta\\d+|rc\\d+)", "");
60+
61+
String[] parts = cleanVersion.split("\\.");
62+
if (parts.length < 2) {
63+
throw new IllegalArgumentException("Invalid version format: " + version);
64+
}
65+
66+
try {
67+
int major = Integer.parseInt(parts[0]);
68+
int minor = Integer.parseInt(parts[1]);
69+
int revision = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
70+
71+
return new RemoteVersion(major, minor, revision, isOpenSearch);
72+
} catch (NumberFormatException e) {
73+
throw new IllegalArgumentException("Invalid version format: " + version, e);
74+
}
75+
}
76+
77+
public boolean before(RemoteVersion other) {
78+
return this.compareTo(other) < 0;
79+
}
80+
81+
public boolean onOrAfter(RemoteVersion other) {
82+
return this.compareTo(other) >= 0;
83+
}
84+
85+
public boolean onOrBefore(RemoteVersion other) {
86+
return this.compareTo(other) <= 0;
87+
}
88+
89+
public boolean after(RemoteVersion other) {
90+
return this.compareTo(other) > 0;
91+
}
92+
93+
@Override
94+
public int compareTo(RemoteVersion other) {
95+
if (other == null) {
96+
return 1;
97+
}
98+
99+
if (this.isOpenSearch != other.isOpenSearch) {
100+
return this.isOpenSearch ? 1 : -1;
101+
}
102+
103+
int result = Integer.compare(this.major, other.major);
104+
if (result != 0) {
105+
return result;
106+
}
107+
108+
result = Integer.compare(this.minor, other.minor);
109+
if (result != 0) {
110+
return result;
111+
}
112+
113+
return Integer.compare(this.revision, other.revision);
114+
}
115+
116+
@Override
117+
public boolean equals(Object obj) {
118+
if (this == obj) {
119+
return true;
120+
}
121+
if (obj == null || getClass() != obj.getClass()) {
122+
return false;
123+
}
124+
RemoteVersion that = (RemoteVersion) obj;
125+
return major == that.major && minor == that.minor && revision == that.revision && Objects.equals(isOpenSearch, that.isOpenSearch);
126+
}
127+
128+
@Override
129+
public int hashCode() {
130+
return Objects.hash(major, minor, revision, isOpenSearch);
131+
}
132+
133+
@Override
134+
public String toString() {
135+
return major + "." + minor + "." + revision;
136+
}
137+
138+
public int getMajor() {
139+
return major;
140+
}
141+
142+
public int getMinor() {
143+
return minor;
144+
}
145+
146+
public int getRevision() {
147+
return revision;
148+
}
149+
}

0 commit comments

Comments
 (0)