Skip to content

Commit f45bedf

Browse files
committed
Adding integration of derived source feature across diff paths
Signed-off-by: Tanik Pansuriya <[email protected]>
1 parent 2f13824 commit f45bedf

32 files changed

+2335
-48
lines changed

modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.index.reindex;
3434

3535
import org.opensearch.action.index.IndexRequestBuilder;
36+
import org.opensearch.common.xcontent.XContentType;
3637

3738
import java.util.ArrayList;
3839
import java.util.Collection;
@@ -41,7 +42,9 @@
4142
import java.util.Map;
4243
import java.util.stream.Collectors;
4344

45+
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
4446
import static org.opensearch.index.query.QueryBuilders.termQuery;
47+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
4548
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
4649
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4750
import static org.hamcrest.Matchers.hasSize;
@@ -177,4 +180,111 @@ public void testMissingSources() {
177180
assertThat(response, matcher().created(0).slices(hasSize(0)));
178181
}
179182

183+
public void testReindexWithDerivedSource() throws Exception {
184+
// Create source index with _source option set as derived
185+
String sourceIndexMapping = """
186+
{
187+
"settings": {
188+
"index": {
189+
"number_of_shards": 1,
190+
"number_of_replicas": 0
191+
}
192+
},
193+
"mappings": {
194+
"_doc": {
195+
"_source": {
196+
"enabled": "derived"
197+
},
198+
"properties": {
199+
"foo": {
200+
"type": "keyword",
201+
"store": true
202+
},
203+
"bar": {
204+
"type": "integer",
205+
"store": true
206+
}
207+
}
208+
}
209+
}
210+
}""";
211+
212+
// Create indices
213+
assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON));
214+
assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON));
215+
ensureGreen();
216+
217+
// Index some documents
218+
int numDocs = randomIntBetween(5, 20);
219+
List<IndexRequestBuilder> docs = new ArrayList<>();
220+
for (int i = 0; i < numDocs; i++) {
221+
docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i));
222+
}
223+
indexRandom(true, docs);
224+
225+
// Test 1: Basic reindex
226+
ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true);
227+
228+
BulkByScrollResponse response = copy.get();
229+
assertThat(response, matcher().created(numDocs));
230+
long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
231+
assertEquals(numDocs, expectedCount);
232+
233+
// Test 2: Reindex with query filter
234+
String destIndexFiltered = "dest_index_filtered";
235+
assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON));
236+
237+
copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true);
238+
239+
response = copy.get();
240+
expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value();
241+
assertThat(response, matcher().created(expectedCount));
242+
243+
// Test 3: Reindex with slices
244+
String destIndexSliced = "dest_index_sliced";
245+
assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON));
246+
247+
int slices = randomSlices();
248+
int expectedSlices = expectedSliceStatuses(slices, "source_index");
249+
250+
copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true);
251+
252+
response = copy.get();
253+
assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));
254+
255+
// Test 4: Reindex with maxDocs
256+
String destIndexMaxDocs = "dest_index_maxdocs";
257+
assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON));
258+
259+
int maxDocs = numDocs / 2;
260+
copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true);
261+
262+
response = copy.get();
263+
assertThat(response, matcher().created(maxDocs));
264+
expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
265+
assertEquals(maxDocs, expectedCount);
266+
267+
// Test 5: Multiple source indices
268+
String sourceIndex2 = "source_index_2";
269+
assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON));
270+
271+
int numDocs2 = randomIntBetween(5, 20);
272+
List<IndexRequestBuilder> docs2 = new ArrayList<>();
273+
for (int i = 0; i < numDocs2; i++) {
274+
docs2.add(
275+
client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs)
276+
);
277+
}
278+
indexRandom(true, docs2);
279+
280+
String destIndexMulti = "dest_index_multi";
281+
assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON));
282+
283+
copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true);
284+
285+
response = copy.get();
286+
assertThat(response, matcher().created(numDocs + numDocs2));
287+
expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
288+
assertEquals(numDocs + numDocs2, expectedCount);
289+
}
180290
}

server/src/internalClusterTest/java/org/opensearch/get/GetActionIT.java

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import org.opensearch.core.common.bytes.BytesReference;
5252
import org.opensearch.core.rest.RestStatus;
5353
import org.opensearch.core.xcontent.MediaTypeRegistry;
54+
import org.opensearch.core.xcontent.XContentBuilder;
55+
import org.opensearch.geometry.utils.Geohash;
5456
import org.opensearch.index.engine.VersionConflictEngineException;
5557
import org.opensearch.plugins.Plugin;
5658
import org.opensearch.test.InternalSettingsPlugin;
@@ -60,11 +62,14 @@
6062
import java.util.Collection;
6163
import java.util.Collections;
6264
import java.util.HashSet;
65+
import java.util.List;
66+
import java.util.Map;
6367
import java.util.Set;
6468

6569
import static java.util.Collections.singleton;
6670
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
6771
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
72+
import static org.hamcrest.Matchers.containsInRelativeOrder;
6873
import static org.hamcrest.Matchers.endsWith;
6974
import static org.hamcrest.Matchers.equalTo;
7075
import static org.hamcrest.Matchers.hasKey;
@@ -784,6 +789,213 @@ public void testGeneratedStringFieldsStored() throws IOException {
784789
assertGetFieldsNull(indexOrAlias(), "_doc", "1", alwaysNotStoredFieldsList);
785790
}
786791

792+
public void testDerivedSourceSimple() throws IOException {
793+
// Create index with _source option as derived source
794+
String createIndexSource = """
795+
{
796+
"settings": {
797+
"index": {
798+
"number_of_shards": 2,
799+
"number_of_replicas": 0
800+
}
801+
},
802+
"mappings": {
803+
"_doc": {
804+
"_source": {
805+
"enabled": "derived"
806+
},
807+
"properties": {
808+
"geopoint_field": {
809+
"type": "geo_point"
810+
},
811+
"keyword_field": {
812+
"type": "keyword"
813+
},
814+
"numeric_field": {
815+
"type": "long"
816+
},
817+
"date_field": {
818+
"type": "date"
819+
},
820+
"bool_field": {
821+
"type": "boolean"
822+
},
823+
"text_field": {
824+
"type": "text",
825+
"store": true
826+
},
827+
"ip_field": {
828+
"type": "ip"
829+
}
830+
}
831+
}
832+
}
833+
}""";
834+
835+
assertAcked(prepareCreate("test_derive").setSource(createIndexSource, MediaTypeRegistry.JSON));
836+
ensureGreen();
837+
838+
// Index a document with various field types
839+
client().prepareIndex("test_derive")
840+
.setId("1")
841+
.setSource(
842+
jsonBuilder().startObject()
843+
.field("geopoint_field", Geohash.stringEncode(40.33, 75.98))
844+
.field("keyword_field", "test_keyword")
845+
.field("numeric_field", 123)
846+
.field("date_field", "2023-01-01")
847+
.field("bool_field", true)
848+
.field("text_field", "test text")
849+
.field("ip_field", "1.2.3.4")
850+
.endObject()
851+
)
852+
.get();
853+
854+
// before refresh - document is only in translog
855+
GetResponse getResponse = client().prepareGet("test_derive", "1").get();
856+
assertTrue(getResponse.isExists());
857+
Map<String, Object> source = getResponse.getSourceAsMap();
858+
assertNotNull("Derived source should not be null", source);
859+
validateDeriveSource(source);
860+
861+
refresh();
862+
// after refresh - document is in translog and also indexed
863+
getResponse = client().prepareGet("test_derive", "1").get();
864+
assertTrue(getResponse.isExists());
865+
source = getResponse.getSourceAsMap();
866+
assertNotNull("Derived source should not be null", source);
867+
validateDeriveSource(source);
868+
869+
flush();
870+
// after flush - document is in not anymore translog - only indexed
871+
getResponse = client().prepareGet("test_derive", "1").get();
872+
assertTrue(getResponse.isExists());
873+
source = getResponse.getSourceAsMap();
874+
assertNotNull("Derived source should not be null", source);
875+
validateDeriveSource(source);
876+
877+
// Test get with selective field inclusion
878+
getResponse = client().prepareGet("test_derive", "1").setFetchSource(new String[] { "keyword_field", "numeric_field" }, null).get();
879+
assertTrue(getResponse.isExists());
880+
source = getResponse.getSourceAsMap();
881+
assertEquals(2, source.size());
882+
assertEquals("test_keyword", source.get("keyword_field"));
883+
assertEquals(123, source.get("numeric_field"));
884+
885+
// Test get with field exclusion
886+
getResponse = client().prepareGet("test_derive", "1").setFetchSource(null, new String[] { "text_field", "date_field" }).get();
887+
assertTrue(getResponse.isExists());
888+
source = getResponse.getSourceAsMap();
889+
assertEquals(5, source.size());
890+
assertFalse(source.containsKey("text_field"));
891+
assertFalse(source.containsKey("date_field"));
892+
}
893+
894+
public void testDerivedSource_MultiValuesAndComplexField() throws Exception {
895+
// Create mapping with properly closed objects
896+
String mapping = XContentFactory.jsonBuilder()
897+
.startObject()
898+
.startObject("_source")
899+
.field("enabled", "derived")
900+
.endObject()
901+
.startObject("properties")
902+
.startObject("level1")
903+
.startObject("properties")
904+
.startObject("level2")
905+
.startObject("properties")
906+
.startObject("level3")
907+
.startObject("properties")
908+
.startObject("num_field")
909+
.field("type", "integer")
910+
.endObject()
911+
.startObject("ip_field")
912+
.field("type", "ip")
913+
.endObject()
914+
.endObject()
915+
.endObject()
916+
.endObject()
917+
.endObject()
918+
.endObject()
919+
.endObject()
920+
.endObject()
921+
.endObject()
922+
.toString();
923+
924+
// Create index with settings and mapping
925+
assertAcked(
926+
prepareCreate("test_derive").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
927+
.setMapping(mapping)
928+
);
929+
ensureGreen();
930+
931+
// Create source document
932+
XContentBuilder sourceBuilder = jsonBuilder().startObject()
933+
.startArray("level1")
934+
.startObject()
935+
.startObject("level2")
936+
.startArray("level3")
937+
.startObject()
938+
.startArray("num_field")
939+
.value(2)
940+
.value(1)
941+
.value(1)
942+
.endArray()
943+
.endObject()
944+
.endArray()
945+
.endObject()
946+
.endObject()
947+
.startObject()
948+
.startObject("level2")
949+
.startArray("level3")
950+
.startObject()
951+
.startArray("ip_field")
952+
.value("1.2.3.4")
953+
.value("2.3.4.5")
954+
.value("1.2.3.4")
955+
.endArray()
956+
.endObject()
957+
.endArray()
958+
.endObject()
959+
.endObject()
960+
.endArray()
961+
.endObject();
962+
963+
// Index the document
964+
IndexResponse indexResponse = client().prepareIndex("test_derive").setId("1").setSource(sourceBuilder).get();
965+
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
966+
967+
refresh();
968+
969+
// Test numeric field retrieval
970+
GetResponse getResponse = client().prepareGet("test_derive", "1").get();
971+
assertThat(getResponse.isExists(), equalTo(true));
972+
Map<String, Object> source = getResponse.getSourceAsMap();
973+
Map<String, Object> level1 = (Map<String, Object>) source.get("level1");
974+
Map<String, Object> level2 = (Map<String, Object>) level1.get("level2");
975+
Map<String, Object> level3 = (Map<String, Object>) level2.get("level3");
976+
List<Object> numValues = (List<Object>) level3.get("num_field");
977+
assertThat(numValues.size(), equalTo(3));
978+
// Number field is stored as Sorted Numeric, so result should be in sorted order
979+
assertThat(numValues, containsInRelativeOrder(1, 1, 2));
980+
981+
List<Object> ipValues = (List<Object>) level3.get("ip_field");
982+
assertThat(ipValues.size(), equalTo(2));
983+
// Ip field is stored as Sorted Set, so duplicates should be removed and result should be in sorted order
984+
assertThat(ipValues, containsInRelativeOrder("1.2.3.4", "2.3.4.5"));
985+
}
986+
987+
void validateDeriveSource(Map<String, Object> source) {
988+
Map<String, Object> latLon = (Map<String, Object>) source.get("geopoint_field");
989+
assertEquals(75.98, (Double) latLon.get("lat"), 0.001);
990+
assertEquals(40.33, (Double) latLon.get("lon"), 0.001);
991+
assertEquals("test_keyword", source.get("keyword_field"));
992+
assertEquals(123, source.get("numeric_field"));
993+
assertEquals("2023-01-01T00:00:00.000Z", source.get("date_field"));
994+
assertEquals(true, source.get("bool_field"));
995+
assertEquals("test text", source.get("text_field"));
996+
assertEquals("1.2.3.4", source.get("ip_field"));
997+
}
998+
787999
void indexSingleDocumentWithStringFieldsGeneratedFromText(boolean stored, boolean sourceEnabled) {
7881000

7891001
String storedString = stored ? "true" : "false";

0 commit comments

Comments
 (0)