Skip to content

Conversation

@varunbharadwaj
Copy link
Contributor

@varunbharadwaj varunbharadwaj commented Apr 3, 2025

Description

  1. This PR adds multi-threaded writer support in the pull-based ingestion flow. The incoming message will be hashed by ID and written to one of the blocking queue partitions. A processor thread will be started to consume and process updates from each blocking queue partition. This thread will handoff the updates to the engine to update the index. Number of processor threads can be defined at the time of index creation by setting ingestion_source.num_processor_threads. If not set, a default value of 1 will be used.
  2. This PR also enhances the ingestion engine to support updates and deletes. Updates/deletes work in a multi-threaded environment since updates/deletes to the same document are sequentially processed.

Note that this PR adds the framework for multi-threaded writes, but does not allow users to turn on the feature. We'll allow users to use this feature once the feature is complete and we can support atleast/exactly once processing guarantees.

Related Issues

Builds on top of #16929

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@varunbharadwaj varunbharadwaj changed the title [Pull-based ingestion] Support multiple processor threads in pull-based ingestion [Pull-based ingestion] Support multiple-threaded writes in pull-based ingestion Apr 3, 2025
@varunbharadwaj varunbharadwaj changed the title [Pull-based ingestion] Support multiple-threaded writes in pull-based ingestion [Pull-based ingestion] Support multi-threaded writes in pull-based ingestion Apr 3, 2025
@github-actions
Copy link
Contributor

github-actions bot commented Apr 3, 2025

❌ Gradle check result for 5fccdaa: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

github-actions bot commented Apr 4, 2025

❌ Gradle check result for 95970b5: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@varunbharadwaj varunbharadwaj changed the title [Pull-based ingestion] Support multi-threaded ingestion along with upserts and deletes [WIP][Pull-based ingestion] Support multi-threaded ingestion along with upserts and deletes Apr 4, 2025
@varunbharadwaj varunbharadwaj changed the title [WIP][Pull-based ingestion] Support multi-threaded ingestion along with upserts and deletes [Pull-based ingestion] Support multi-threaded ingestion along with upserts and deletes Apr 4, 2025
@github-actions
Copy link
Contributor

github-actions bot commented Apr 4, 2025

❌ Gradle check result for 69cc527: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

github-actions bot commented Apr 4, 2025

❕ Gradle check result for 6330594: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@codecov
Copy link

codecov bot commented Apr 4, 2025

Codecov Report

Attention: Patch coverage is 77.08333% with 33 lines in your changes missing coverage. Please review.

Project coverage is 72.48%. Comparing base (2d42bd0) to head (a68cdf7).
Report is 9 commits behind head on main.

Files with missing lines Patch % Lines
...a/org/opensearch/index/engine/IngestionEngine.java 21.05% 13 Missing and 2 partials ⚠️
...ndices/pollingingest/MessageProcessorRunnable.java 54.16% 8 Missing and 3 partials ⚠️
...rch/indices/pollingingest/DefaultStreamPoller.java 81.25% 2 Missing and 1 partial ⚠️
...g/opensearch/cluster/metadata/IngestionSource.java 60.00% 0 Missing and 2 partials ⚠️
...llingingest/PartitionedBlockingQueueContainer.java 97.18% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #17771      +/-   ##
============================================
+ Coverage     72.39%   72.48%   +0.09%     
- Complexity    66066    66306     +240     
============================================
  Files          5358     5387      +29     
  Lines        306500   307248     +748     
  Branches      44409    44563     +154     
============================================
+ Hits         221888   222708     +820     
+ Misses        66474    66332     -142     
- Partials      18138    18208      +70     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Contributor

github-actions bot commented Apr 5, 2025

❌ Gradle check result for f726ea0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

github-actions bot commented Apr 5, 2025

❌ Gradle check result for c23f437: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

github-actions bot commented Apr 5, 2025

❕ Gradle check result for a68cdf7: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

}

encounteredError = false;
if (results.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder in the corner case that the last processed message in the last batch runs into err after line 215, and also there's no new message after this message, then will this logic result in rewinding to the previous batch start pointer?

because this condition results.isEmpty() becomes true, and in the next iteration, encounteredError becomes false, then it will assume the lastProcessedPointer did not run ito error?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants