Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
- Optimize Composite Aggregations by removing unnecessary object allocations ([#18531](https://github.com/opensearch-project/OpenSearch/pull/18531))
- [Star-Tree] Add search support for ip field type ([#18671](https://github.com/opensearch-project/OpenSearch/pull/18671))
- [Derived Source] Add integration of derived source feature across various paths like get/search/recovery ([#18565](https://github.com/opensearch-project/OpenSearch/pull/18565))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@

package org.opensearch.index.reindex;

import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.search.SearchHit;
import org.opensearch.search.sort.SortOrder;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -41,7 +48,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -177,4 +186,301 @@ public void testMissingSources() {
assertThat(response, matcher().created(0).slices(hasSize(0)));
}

    /**
     * Exercises reindex when BOTH source and destination indices have
     * {@code index.derived_source.enabled=true}, i.e. documents' {@code _source} is not
     * stored verbatim but reconstructed at read time.
     * <p>
     * Covers five scenarios against the same source data: a plain copy, a query-filtered
     * copy, a sliced copy, a {@code maxDocs}-limited copy, and a copy from multiple
     * source indices.
     *
     * @throws Exception on any cluster/test-infrastructure failure
     */
    public void testReindexWithDerivedSource() throws Exception {
        // Create source index with derived source setting enabled.
        // NOTE(review): both fields are marked "store": true — presumably required so the
        // derived _source can be rebuilt from stored fields; confirm against the
        // derived-source feature docs.
        String sourceIndexMapping = """
            {
              "settings": {
                "index": {
                  "number_of_shards": 1,
                  "number_of_replicas": 0,
                  "derived_source": {
                    "enabled": true
                  }
                }
              },
              "mappings": {
                "_doc": {
                  "properties": {
                    "foo": {
                      "type": "keyword",
                      "store": true
                    },
                    "bar": {
                      "type": "integer",
                      "store": true
                    }
                  }
                }
              }
            }""";

        // Create indices (source and destination share the same settings/mapping)
        assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON));
        assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON));
        ensureGreen();

        // Index some documents; ids are "0".."numDocs-1", bar holds the numeric id
        int numDocs = randomIntBetween(5, 20);
        List<IndexRequestBuilder> docs = new ArrayList<>();
        for (int i = 0; i < numDocs; i++) {
            docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i));
        }
        indexRandom(true, docs);

        // Test 1: Basic reindex — every document must be created in dest_index
        ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true);

        BulkByScrollResponse response = copy.get();
        assertThat(response, matcher().created(numDocs));
        long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
        assertEquals(numDocs, expectedCount);

        // Test 2: Reindex with query filter — only docs matching bar == 1 are copied;
        // the expected count is read back from the source index rather than hard-coded
        String destIndexFiltered = "dest_index_filtered";
        assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON));

        copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true);

        response = copy.get();
        expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value();
        assertThat(response, matcher().created(expectedCount));

        // Test 3: Reindex with slices — the reported slice statuses must match what the
        // test helper predicts for this slice count / index layout
        String destIndexSliced = "dest_index_sliced";
        assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON));

        int slices = randomSlices();
        int expectedSlices = expectedSliceStatuses(slices, "source_index");

        copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true);

        response = copy.get();
        assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));

        // Test 4: Reindex with maxDocs — copy stops after exactly maxDocs creations
        String destIndexMaxDocs = "dest_index_maxdocs";
        assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON));

        int maxDocs = numDocs / 2;
        copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true);

        response = copy.get();
        assertThat(response, matcher().created(maxDocs));
        expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
        assertEquals(maxDocs, expectedCount);

        // Test 5: Multiple source indices — second index uses ids offset by numDocs so
        // the two document sets never collide in the shared destination
        String sourceIndex2 = "source_index_2";
        assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON));

        int numDocs2 = randomIntBetween(5, 20);
        List<IndexRequestBuilder> docs2 = new ArrayList<>();
        for (int i = 0; i < numDocs2; i++) {
            docs2.add(
                client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs)
            );
        }
        indexRandom(true, docs2);

        String destIndexMulti = "dest_index_multi";
        assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON));

        copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true);

        response = copy.get();
        assertThat(response, matcher().created(numDocs + numDocs2));
        expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
        assertEquals(numDocs + numDocs2, expectedCount);
    }

    /**
     * Exercises reindex FROM an index with {@code index.derived_source.enabled=true}
     * INTO ordinary (stored-source) indices, verifying the reconstructed {@code _source}
     * survives the copy intact.
     * <p>
     * Covers: a plain copy, a query-filtered copy, a sliced copy, and a copy followed by
     * a bulk field-renaming transformation; content is verified field-by-field via
     * {@link #verifyReindexedContent} / {@link #verifyTransformedContent}.
     *
     * @throws Exception on any cluster/test-infrastructure failure
     */
    public void testReindexFromDerivedSourceToNormalIndex() throws Exception {
        // Create source index with derived source enabled.
        // NOTE(review): fields here mix "store": true and doc_values — presumably each
        // field must be recoverable one way or the other for _source derivation; confirm.
        String sourceMapping = """
            {
              "properties": {
                "text_field": {
                  "type": "text",
                  "store": true
                },
                "keyword_field": {
                  "type": "keyword"
                },
                "numeric_field": {
                  "type": "long",
                  "doc_values": true
                },
                "date_field": {
                  "type": "date",
                  "store": true
                }
              }
            }""";

        // Create destination index with normal settings (no stored/doc_values overrides)
        String destMapping = """
            {
              "properties": {
                "text_field": {
                  "type": "text"
                },
                "keyword_field": {
                  "type": "keyword"
                },
                "numeric_field": {
                  "type": "long"
                },
                "date_field": {
                  "type": "date"
                }
              }
            }""";

        // Create source index (2 shards, derived source on)
        assertAcked(
            prepareCreate("source_index").setSettings(
                Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true)
            ).setMapping(sourceMapping)
        );

        // Create destination index
        assertAcked(prepareCreate("dest_index").setMapping(destMapping));

        // Index test documents; every field value is derived from the numeric id so the
        // verify helpers can recompute the expected content per document
        int numDocs = randomIntBetween(100, 200);
        final List<IndexRequestBuilder> docs = new ArrayList<>();
        for (int i = 0; i < numDocs; i++) {
            docs.add(
                client().prepareIndex("source_index")
                    .setId(Integer.toString(i))
                    .setSource(
                        "text_field",
                        "text value " + i,
                        "keyword_field",
                        "key_" + i,
                        "numeric_field",
                        i,
                        "date_field",
                        System.currentTimeMillis()
                    )
            );
        }
        indexRandom(true, docs);
        refresh("source_index");

        // Test 1: Basic reindex without slices
        ReindexRequestBuilder reindex = reindex().source("source_index").destination("dest_index").refresh(true);
        BulkByScrollResponse response = reindex.get();
        assertThat(response, matcher().created(numDocs));
        verifyReindexedContent("dest_index", numDocs);

        // Test 2: Reindex with query filter — "key_1" is an exact keyword term, so it
        // matches only doc id 1 (ids >= 10 produce distinct terms like "key_10")
        String destFilteredIndex = "dest_filtered_index";
        assertAcked(prepareCreate(destFilteredIndex).setMapping(destMapping));
        reindex = reindex().source("source_index").destination(destFilteredIndex).filter(termQuery("keyword_field", "key_1")).refresh(true);
        response = reindex.get();
        assertThat(response, matcher().created(1));
        verifyReindexedContent(destFilteredIndex, 1);

        // Test 3: Reindex with slices
        String destSlicedIndex = "dest_sliced_index";
        assertAcked(prepareCreate(destSlicedIndex).setMapping(destMapping));
        int slices = randomSlices();
        int expectedSlices = expectedSliceStatuses(slices, "source_index");

        reindex = reindex().source("source_index").destination(destSlicedIndex).setSlices(slices).refresh(true);
        response = reindex.get();
        assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));
        verifyReindexedContent(destSlicedIndex, numDocs);

        // Test 4: Reindex with field transformation — reindex verbatim first, then
        // rewrite each document via a bulk overwrite into the renamed-field mapping
        String destTransformedIndex = "dest_transformed_index";
        String transformedMapping = """
            {
              "properties": {
                "new_text_field": {
                  "type": "text"
                },
                "new_keyword_field": {
                  "type": "keyword"
                },
                "modified_numeric": {
                  "type": "long"
                },
                "date_field": {
                  "type": "date"
                }
              }
            }""";
        assertAcked(prepareCreate(destTransformedIndex).setMapping(transformedMapping));

        // First reindex the documents
        reindex = reindex().source("source_index").destination(destTransformedIndex).refresh(true);
        response = reindex.get();
        assertThat(response, matcher().created(numDocs));

        // Then transform using bulk update (re-index each hit under the same id with
        // renamed fields and numeric_field shifted by +1000)
        BulkRequestBuilder bulkRequest = client().prepareBulk();
        SearchResponse searchResponse = client().prepareSearch(destTransformedIndex).setQuery(matchAllQuery()).setSize(numDocs).get();

        for (SearchHit hit : searchResponse.getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            Map<String, Object> newSource = new HashMap<>();

            // Transform fields
            newSource.put("new_text_field", source.get("text_field"));
            newSource.put("new_keyword_field", source.get("keyword_field"));
            newSource.put("modified_numeric", ((Number) source.get("numeric_field")).longValue() + 1000);
            newSource.put("date_field", source.get("date_field"));

            bulkRequest.add(client().prepareIndex(destTransformedIndex).setId(hit.getId()).setSource(newSource));
        }

        BulkResponse bulkResponse = bulkRequest.get();
        assertFalse(bulkResponse.hasFailures());
        refresh(destTransformedIndex);
        verifyTransformedContent(destTransformedIndex, numDocs);
    }

private void verifyReindexedContent(String indexName, int expectedCount) {
refresh(indexName);
SearchResponse searchResponse = client().prepareSearch(indexName)
.setQuery(matchAllQuery())
.setSize(expectedCount)
.addSort("numeric_field", SortOrder.ASC)
.get();

assertHitCount(searchResponse, expectedCount);

for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
int id = Integer.parseInt(hit.getId());

assertEquals("text value " + id, source.get("text_field"));
assertEquals("key_" + id, source.get("keyword_field"));
assertEquals(id, ((Number) source.get("numeric_field")).intValue());
assertNotNull(source.get("date_field"));
}
}

private void verifyTransformedContent(String indexName, int expectedCount) {
refresh(indexName);
SearchResponse searchResponse = client().prepareSearch(indexName)
.setQuery(matchAllQuery())
.setSize(expectedCount)
.addSort("modified_numeric", SortOrder.ASC)
.get();

assertHitCount(searchResponse, expectedCount);

for (SearchHit hit : searchResponse.getHits()) {
Map<String, Object> source = hit.getSourceAsMap();
int id = Integer.parseInt(hit.getId());

assertEquals("text value " + id, source.get("new_text_field"));
assertEquals("key_" + id, source.get("new_keyword_field"));
assertEquals(id + 1000, ((Number) source.get("modified_numeric")).longValue());
assertNotNull(source.get("date_field"));
}
}
}
Loading
Loading