2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
@@ -46,6 +46,8 @@ API Changes

* GITHUB#15187: Restrict visibility of PerFieldKnnVectorsFormat.FieldsReader (Simon Cooper)

* GITHUB#15428: Add support for ExecutorServices to be passed into DirectoryReader.open() API (Bryce Kane)

New Features
---------------------
* GITHUB#14097: Binary partitioning merge policy over float-valued vector field. (Mike Sokolov)
80 changes: 77 additions & 3 deletions lucene/core/src/java/org/apache/lucene/index/DirectoryReader.java
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.lucene.search.SearcherManager; // javadocs
import org.apache.lucene.store.Directory;

@@ -60,6 +61,18 @@ public static DirectoryReader open(final Directory directory) throws IOException
return StandardDirectoryReader.open(directory, null, null);
}

/**
* Returns an IndexReader reading the index in the given Directory
*
* @param directory the index directory
* @param executorService an executor service used to parallelize the creation of segment readers
* @throws IOException if there is a low-level IO error
*/
public static DirectoryReader open(final Directory directory, ExecutorService executorService)
throws IOException {
return StandardDirectoryReader.open(directory, null, null, executorService);
}

/**
* Returns a IndexReader for the index in the given Directory
*
@@ -74,7 +87,27 @@ public static DirectoryReader open(final Directory directory) throws IOException
*/
public static DirectoryReader open(final Directory directory, Comparator<LeafReader> leafSorter)
throws IOException {
return StandardDirectoryReader.open(directory, null, leafSorter);
return StandardDirectoryReader.open(directory, null, leafSorter, null);
}

/**
* Returns an IndexReader for the index in the given Directory
*
* @param directory the index directory
* @param leafSorter a comparator for sorting leaf readers. Providing leafSorter is useful for
* indices on which it is expected to run many queries with particular sort criteria (e.g. for
* time-based indices this is usually a descending sort on timestamp). In this case {@code
* leafSorter} should sort leaves according to this sort criteria. Providing leafSorter allows
* to speed up this particular type of sort queries by early terminating while iterating
* through segments and segments' documents.
* @param executorService provides an executor service that will be used to initialize segment
* readers. Providing an executor is useful to parallelize the opening of segment readers
* across threads.
* @throws IOException if there is a low-level IO error
*/
public static DirectoryReader open(
final Directory directory, Comparator<LeafReader> leafSorter, ExecutorService executorService)
throws IOException {
return StandardDirectoryReader.open(directory, null, leafSorter, executorService);
}

/**
@@ -119,7 +152,19 @@ public static DirectoryReader open(
* @throws IOException if there is a low-level IO error
*/
public static DirectoryReader open(final IndexCommit commit) throws IOException {
return StandardDirectoryReader.open(commit.getDirectory(), commit, null);
return StandardDirectoryReader.open(commit.getDirectory(), commit, null, null);
}

/**
* Expert: returns an IndexReader reading the index in the given {@link IndexCommit}.
*
* @param commit the commit point to open
* @param executorService an executor service used to parallelize the creation of segment readers
* @throws IOException if there is a low-level IO error
*/
public static DirectoryReader open(final IndexCommit commit, ExecutorService executorService)
throws IOException {
return StandardDirectoryReader.open(commit.getDirectory(), commit, null, executorService);
}

/**
@@ -144,7 +189,36 @@ public static DirectoryReader open(
final IndexCommit commit, int minSupportedMajorVersion, Comparator<LeafReader> leafSorter)
throws IOException {
return StandardDirectoryReader.open(
commit.getDirectory(), minSupportedMajorVersion, commit, leafSorter);
commit.getDirectory(), minSupportedMajorVersion, commit, leafSorter, null);
}

/**
* Expert: returns an IndexReader reading the index in the given {@link IndexCommit}. This method
* allows to open indices that were created with a Lucene version older than N-1 provided that all
* codecs for this index are available in the classpath and the segment file format used was
* created with Lucene 7 or newer. Users of this API must be aware that Lucene doesn't guarantee
* semantic compatibility for indices created with versions older than N-1. All backwards
* compatibility aside from the file format is optional and applied on a best effort basis.
*
* @param commit the commit point to open
* @param minSupportedMajorVersion the minimum supported major index version
* @param leafSorter a comparator for sorting leaf readers. Providing leafSorter is useful for
* indices on which it is expected to run many queries with particular sort criteria (e.g. for
* time-based indices, this is usually a descending sort on timestamp). In this case {@code
* leafSorter} should sort leaves according to this sort criteria. Providing leafSorter allows
* to speed up this particular type of sort queries by early terminating while iterating
* through segments and segments' documents
* @param executorService an executor service used to parallelize the creation of segment readers
* @throws IOException if there is a low-level IO error
*/
public static DirectoryReader open(
final IndexCommit commit,
int minSupportedMajorVersion,
Comparator<LeafReader> leafSorter,
ExecutorService executorService)
throws IOException {
return StandardDirectoryReader.open(
commit.getDirectory(), minSupportedMajorVersion, commit, leafSorter, executorService);
}

/**
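For illustration, a minimal usage sketch of the new overload. The index path, pool size, and class name here are illustrative assumptions, not part of the change; note that open() does not manage the executor, so the caller must shut it down:

import java.io.IOException;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class ParallelOpenExample {
  public static void main(String[] args) throws IOException {
    // Pool used only to parallelize per-segment reader initialization.
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try (Directory dir = FSDirectory.open(Paths.get("/path/to/index"));
        DirectoryReader reader = DirectoryReader.open(dir, executor)) {
      System.out.println("maxDoc=" + reader.maxDoc());
    } finally {
      // The reader is closed by try-with-resources; the executor is ours to stop.
      executor.shutdown();
    }
  }
}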
lucene/core/src/java/org/apache/lucene/index/StandardDirectoryReader.java
@@ -27,6 +27,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
@@ -62,17 +65,21 @@ public final class StandardDirectoryReader extends DirectoryReader {
}

static DirectoryReader open(
final Directory directory, final IndexCommit commit, Comparator<LeafReader> leafSorter)
final Directory directory,
final IndexCommit commit,
Comparator<LeafReader> leafSorter,
ExecutorService executor)
throws IOException {
return open(directory, Version.MIN_SUPPORTED_MAJOR, commit, leafSorter);
return open(directory, Version.MIN_SUPPORTED_MAJOR, commit, leafSorter, executor);
}

/** called from DirectoryReader.open(...) methods */
static DirectoryReader open(
final Directory directory,
int minSupportedMajorVersion,
final IndexCommit commit,
Comparator<LeafReader> leafSorter)
Comparator<LeafReader> leafSorter,
ExecutorService executor)
throws IOException {
return new SegmentInfos.FindSegmentsFile<DirectoryReader>(directory) {
@Override
@@ -88,10 +95,37 @@ protected DirectoryReader doBody(String segmentFileName) throws IOException {
SegmentInfos.readCommit(directory, segmentFileName, minSupportedMajorVersion);
final SegmentReader[] readers = new SegmentReader[sis.size()];
try {
for (int i = sis.size() - 1; i >= 0; i--) {
readers[i] =
new SegmentReader(
sis.info(i), sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT);
if (executor != null) {
List<Future<SegmentReader>> futures = new ArrayList<>();
for (int i = sis.size() - 1; i >= 0; i--) {
final int index = i;
// parallelize segment reader initialization
futures.add(
(executor)
.submit(
[Inline review comment] Have you considered using the existing TaskExecutor pattern and instead implementing this as such:

TaskExecutor taskExecutor = new TaskExecutor(executorService);
List<Callable<SegmentReader>> tasks = new ArrayList<>(sis.size());

for (int i = 0; i < sis.size(); i++) {
    final int index = i;
    tasks.add(() -> new SegmentReader(
        sis.info(index), sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT));
}
// then, for example: List<SegmentReader> opened = taskExecutor.invokeAll(tasks);

Based on the comments in that class, there are some optimizations we could inherit here:

// try to execute as many tasks as possible on the current thread to minimize context
// switching in case of long running concurrent
// tasks as well as dead-locking if the current thread is part of #executor for executors that
// have limited or no parallelism

() ->
new SegmentReader(
sis.info(index),
sis.getIndexCreatedVersionMajor(),
IOContext.DEFAULT)));
}
RuntimeException firstException = null;
for (int i = 0; i < futures.size(); i++) {
try {
readers[sis.size() - 1 - i] = futures.get(i).get();
} catch (ExecutionException | InterruptedException e) {
// If there is an exception creating a reader we still process the
// rest of the completed futures so any readers already created can be closed
if (e instanceof InterruptedException) {
// restore the thread's interrupt status before wrapping the cause
Thread.currentThread().interrupt();
}
if (firstException == null) {
firstException = new RuntimeException(e);
}
}
}
if (firstException != null) {
throw firstException;
}
} else {
for (int i = sis.size() - 1; i >= 0; i--) {
readers[i] =
new SegmentReader(
sis.info(i), sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT);
}
}
// This may throw CorruptIndexException if there are too many docs, so
// it must be inside try clause so we close readers in that case:
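For comparison with the loop above, a sketch of the reviewer's TaskExecutor suggestion applied to the same doBody context. This is a sketch under the assumption that TaskExecutor#invokeAll waits for all tasks and rethrows task failures; sis, readers, and executor are the variables from the surrounding method, and the fragment would additionally need imports for java.util.concurrent.Callable and org.apache.lucene.search.TaskExecutor:

// Sketch: the same per-segment opening expressed via TaskExecutor.
// Per the comment quoted above, TaskExecutor also runs tasks on the calling
// thread, mitigating deadlock when the caller is a worker of the executor.
TaskExecutor taskExecutor = new TaskExecutor(executor);
List<Callable<SegmentReader>> tasks = new ArrayList<>(sis.size());
for (int i = 0; i < sis.size(); i++) {
  final int index = i;
  tasks.add(
      () ->
          new SegmentReader(
              sis.info(index), sis.getIndexCreatedVersionMajor(), IOContext.DEFAULT));
}
// invokeAll waits for every task and rethrows task failures.
List<SegmentReader> opened = taskExecutor.invokeAll(tasks);
for (int i = 0; i < opened.size(); i++) {
  readers[i] = opened.get(i);
}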
125 changes: 125 additions & 0 deletions lucene/core/src/test/org/apache/lucene/index/TestDirectoryReader.java
@@ -24,12 +24,15 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
@@ -49,6 +52,7 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.Version;
import org.junit.Assume;
@@ -1121,4 +1125,125 @@ public void testOpenWithInvalidMinCompatVersion() throws IOException {
DirectoryReader.open(commit, random().nextInt(Version.LATEST.major + 1), null).close();
}
}

public void testOpenWithExecutorService() throws IOException {
Directory dir = newDirectory();
createMultiSegmentIndex(dir, 5);

ExecutorService executor =
Executors.newFixedThreadPool(1, new NamedThreadFactory("TestDirectoryReader"));
[Inline review comment from a contributor] Any reason to only test with 1 thread in these tests? Can we add more threads to test for edge cases and races, like only one thread sees an exception while opening a segment reader? (A hypothetical multi-threaded variant is sketched after this test method.)

try {
DirectoryReader reader = DirectoryReader.open(dir, executor);
assertNotNull(reader);
assertTrue(reader.numDocs() > 0);
reader.close();
} finally {
executor.shutdown();
dir.close();
}
}
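A hypothetical variant along the lines of that review question, with more than one thread so several segment readers are opened concurrently. The test name, segment count, and pool size are assumptions for illustration:

public void testOpenWithMultiThreadedExecutor() throws IOException {
  Directory dir = newDirectory();
  createMultiSegmentIndex(dir, 10);

  // Several worker threads so segment readers are initialized in parallel.
  ExecutorService executor =
      Executors.newFixedThreadPool(4, new NamedThreadFactory("TestDirectoryReader"));
  try {
    DirectoryReader reader = DirectoryReader.open(dir, executor);
    assertNotNull(reader);
    // createMultiSegmentIndex adds one document per segment.
    assertEquals(10, reader.numDocs());
    reader.close();
  } finally {
    executor.shutdown();
    dir.close();
  }
}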

public void testOpenWithLeafSorterAndExecutor() throws IOException {
Directory dir = newDirectory();
createMultiSegmentIndex(dir, 3);

ExecutorService executor =
Executors.newFixedThreadPool(1, new NamedThreadFactory("TestDirectoryReader"));
Comparator<LeafReader> sorter = (r1, r2) -> Integer.compare(r1.numDocs(), r2.numDocs());

try {
DirectoryReader reader = DirectoryReader.open(dir, sorter, executor);
assertNotNull(reader);
reader.close();
} finally {
executor.shutdown();
dir.close();
}
}

public void testOpenCommitWithExecutor() throws IOException {
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
Document doc = new Document();
doc.add(newStringField("id", "1", Field.Store.YES));
writer.addDocument(doc);
writer.commit();
writer.close();

IndexCommit commit = DirectoryReader.listCommits(dir).get(0);
ExecutorService executor =
Executors.newFixedThreadPool(1, new NamedThreadFactory("TestDirectoryReader"));
try {
DirectoryReader reader = DirectoryReader.open(commit, executor);
assertNotNull(reader);
assertEquals(1, reader.numDocs());
reader.close();
} finally {
executor.shutdown();
dir.close();
}
}

public void testOpenCommitWithMinVersionAndExecutor() throws IOException {
Directory dir = newDirectory();
createMultiSegmentIndex(dir, 3);

ExecutorService executor =
Executors.newFixedThreadPool(1, new NamedThreadFactory("TestDirectoryReader"));
Comparator<LeafReader> sorter = (r1, r2) -> Integer.compare(r1.numDocs(), r2.numDocs());

try (DirectoryReader baseReader = DirectoryReader.open(dir)) {
IndexCommit commit = baseReader.getIndexCommit();
DirectoryReader reader =
DirectoryReader.open(commit, Version.MIN_SUPPORTED_MAJOR, sorter, executor);
assertNotNull(reader);
assertEquals(baseReader.numDocs(), reader.numDocs());
reader.close();
} finally {
executor.shutdown();
dir.close();
}
}

public void testNullExecutorService() throws IOException {
Directory dir = newDirectory();
createMultiSegmentIndex(dir, 3);

// Should work fine with null executor (fallback to sequential)
DirectoryReader reader = DirectoryReader.open(dir, (ExecutorService) null);
assertNotNull(reader);
assertTrue(reader.numDocs() > 0);
reader.close();
dir.close();
}

public void testExecutorServiceExceptionHandling() throws IOException {
Directory dir = newDirectory();
createMultiSegmentIndex(dir, 3);

// Test with shutdown executor
ExecutorService executor =
Executors.newFixedThreadPool(1, new NamedThreadFactory("TestDirectoryReader"));
executor.shutdown();

expectThrows(
RuntimeException.class,
() -> {
DirectoryReader.open(dir, executor);
});

dir.close();
}

private void createMultiSegmentIndex(Directory dir, int numSegments) throws IOException {
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig().setMaxBufferedDocs(2));
for (int i = 0; i < numSegments; i++) {
Document doc = new Document();
doc.add(newStringField("id", String.valueOf(i), Field.Store.YES));
doc.add(newTextField("content", "segment " + i + " content", Field.Store.YES));
writer.addDocument(doc);
writer.commit(); // Force new segment
}
writer.close();
}
}