feat(metadata): extract stream range index by lazy load StreamSetObject #2710
base: main
Pull Request Overview
This pull request introduces lazy loading of StreamSetObject metadata with range indexing to optimize object retrieval performance. It adds a new indexing system that helps efficiently search for objects based on stream IDs and offsets.
- Adds StreamSetObjectRangeIndex for mapping (streamId, startOffset) → objectId relationships
- Implements StreamIdBloomFilter to optimize stream presence checks in stream set objects
- Introduces incremental loading of stream set objects (5 at a time) with preloading strategies
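As a rough illustration of the (streamId, startOffset) → objectId mapping listed above, here is a minimal sketch built on a sorted map per stream. `RangeIndexSketch` and its methods are hypothetical names for illustration only; this is not the PR's `StreamSetObjectRangeIndex`, and it leaves out the lazy loading, expiry, and locking that the real class deals with.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch, single-threaded on purpose; not the PR's implementation.
final class RangeIndexSketch {
    // streamId -> (startOffset -> objectId), kept sorted by startOffset
    private final Map<Long, TreeMap<Long, Long>> index = new HashMap<>();

    public void put(long streamId, long startOffset, long objectId) {
        index.computeIfAbsent(streamId, id -> new TreeMap<>()).put(startOffset, objectId);
    }

    // Returns the object whose startOffset is the greatest one <= offset, if any.
    public OptionalLong search(long streamId, long offset) {
        TreeMap<Long, Long> ranges = index.get(streamId);
        if (ranges == null) {
            return OptionalLong.empty();
        }
        Map.Entry<Long, Long> floor = ranges.floorEntry(offset);
        return floor == null ? OptionalLong.empty() : OptionalLong.of(floor.getValue());
    }
}
```

The floor lookup only says which object starts at or before the offset; a caller would still need to confirm that the object's range actually covers it.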
Reviewed Changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.
| File | Description | 
|---|---|
| StreamSetObjectRangeIndex.java | New index class providing lazy loading and caching of stream set object range mappings | 
| S3StreamsMetadataImage.java | Updated getObjects implementation with incremental loading, preloading, and comprehensive debug context | 
| S3StreamsMetadataImageTest.java | Updated test interface methods to match new RangeGetter signature | 
| S3StreamsMetadataImageTest.java (core) | New comprehensive test suite for validating getObjects behavior with generated metadata | 
| StreamMetadataManager.java | Added bloom filter implementation and updated range getter with indexing support | 
Comments suppressed due to low confidence (1)
metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java:445
- The calculation 'newStartOffset + r.nextLong(0, newStartOffset)' will result in endOffset being between newStartOffset and 2*newStartOffset, but this seems incorrect as endOffset should typically be greater than startOffset. The bound should likely be a fixed range or use a different calculation.
        if (objects.size() > ctx.limit) {
| public static final ExecutorService UPDATE_INDEX_THREAD_POOL = Executors.newSingleThreadExecutor( | ||
| ThreadUtils.createThreadFactory("StreamSetObjectRangeIndex", true)); | ||
|  | ||
| private static final Object DUMMAY_OBJECT = new Object(); | 
    
      
    
Copilot AI, Aug 8, 2025
The constant name has a typo: 'DUMMAY_OBJECT' should be 'DUMMY_OBJECT'.
| private static final Object DUMMAY_OBJECT = new Object(); | |
| private static final Object DUMMY_OBJECT = new Object(); | 
|  | ||
| public static StreamSetObjectRangeIndex getInstance() { | ||
| if (instance == null) { | ||
| synchronized (NodeRangeIndexCache.class) { | 
    
      
    
Copilot AI, Aug 8, 2025
Synchronizing on NodeRangeIndexCache.class is incorrect for StreamSetObjectRangeIndex singleton initialization. This should synchronize on StreamSetObjectRangeIndex.class instead.
| synchronized (NodeRangeIndexCache.class) { | |
| synchronized (StreamSetObjectRangeIndex.class) { | 
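For reference, a minimal sketch of double-checked locking that uses the class's own monitor, as the suggestion does. `SingletonSketch` is a hypothetical stand-in; whether the PR's `instance` field is declared `volatile` is not visible in the diff, so that part is an assumption about what a safe version looks like.

```java
// Hypothetical stand-in for the singleton under review.
final class SingletonSketch {
    // volatile so a fully constructed instance is safely published to other threads
    private static volatile SingletonSketch instance;

    private SingletonSketch() {
    }

    public static SingletonSketch getInstance() {
        SingletonSketch local = instance;
        if (local == null) {
            // Lock on this class, not on an unrelated one, so no other code shares the monitor.
            synchronized (SingletonSketch.class) {
                local = instance;
                if (local == null) {
                    local = new SingletonSketch();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```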
|  | ||
| public void touch(Long streamId) { | ||
| try { | ||
| expireCache.get(streamId, () -> DUMMAY_OBJECT); | 
    
      
    
Copilot AI, Aug 8, 2025
Reference to misspelled constant 'DUMMAY_OBJECT' should be 'DUMMY_OBJECT'.
| expireCache.get(streamId, () -> DUMMAY_OBJECT); | |
| expireCache.get(streamId, () -> DUMMY_OBJECT); | 
| } | ||
|  | ||
| withLock(streamId, () -> { | ||
| longLongTreeMap.remove(startOffset, objectId); | 
    
      
    
Copilot AI, Aug 8, 2025
The TreeMap.remove(key, value) method removes the mapping only if the key maps to the specified value. This should likely be just remove(startOffset) to remove the mapping for the key, regardless of the current value.
| longLongTreeMap.remove(startOffset, objectId); | |
| longLongTreeMap.remove(startOffset); | 
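The difference between the two calls can be checked in isolation. The sketch below assumes `longLongTreeMap` behaves like a `java.util.TreeMap<Long, Long>`, which the diff does not confirm; `RemoveSemanticsSketch` is a hypothetical helper. Note that the conditional form may be intentional if the goal is to avoid dropping a newer mapping for the same start offset.

```java
import java.util.TreeMap;

// Hypothetical helper contrasting the two removal forms.
final class RemoveSemanticsSketch {
    // Map.remove(key, value): removes only if the key currently maps to that value.
    public static boolean removeIfMatches(TreeMap<Long, Long> map, long startOffset, long objectId) {
        return map.remove(startOffset, objectId);
    }

    // Map.remove(key): removes whatever the key maps to and returns the old value (or null).
    public static Long removeAlways(TreeMap<Long, Long> map, long startOffset) {
        return map.remove(startOffset);
    }
}
```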
|  | ||
| // update streamBloomFilter | ||
| Set<Long> sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); | ||
| sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); | 
    
      
    
Copilot AI, Aug 8, 2025
[nitpick] The variable name 'sets' is misleading as it contains the difference between two sets (newly added objects). It should be called 'removedObjects' or similar to clarify that these are objects being removed from the bloom filter.
| sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); | |
| Set<Long> removedStreamSetObjectIds = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); | |
| removedStreamSetObjectIds.forEach(STREAM_ID_BLOOM_FILTER::removeObject); | 
| // retry all pending tasks | ||
| retryPendingTasks(); | ||
| this.indexCache.asyncPrune(this::getStreamSetObjectIds); | ||
| this.indexCache.asyncPrune(() -> streamSetObjectIds); | 
    
      
    
Copilot AI, Aug 8, 2025
The lambda captures the local variable 'streamSetObjectIds' which refers to the old set of objects. This should capture 'this.streamSetObjectIds' to use the updated set of stream set object IDs.
| this.indexCache.asyncPrune(() -> streamSetObjectIds); | |
| this.indexCache.asyncPrune(() -> this.streamSetObjectIds); | 
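The distinction in this comment, capturing a local snapshot versus reading the field when the supplier runs, can be sketched on its own. `CaptureSketch` and its method names are hypothetical; which variable holds the old or new set in the PR depends on surrounding code not shown here.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of snapshot capture versus reading the field lazily.
final class CaptureSketch {
    private volatile Set<Long> ids = Set.of(1L);

    // The lambda captures the local reference: later field updates are not seen.
    public Supplier<Set<Long>> snapshotSupplier() {
        Set<Long> local = ids;
        return () -> local;
    }

    // The lambda captures `this`: the field is read each time get() is called.
    public Supplier<Set<Long>> liveSupplier() {
        return () -> this.ids;
    }

    public void update(Set<Long> newIds) {
        this.ids = newIds;
    }
}
```

For an asynchronous prune, the live form sees whatever the field holds when the task eventually runs, while the snapshot form pins the set that existed when the task was scheduled.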
| this.metadataImage = newImage; | ||
| changedStreams = delta.getOrCreateStreamsMetadataDelta().changedStreams(); | ||
| } | ||
| this.streamSetObjectIds = Collections.unmodifiableSet(getStreamSetObjectIds()); | 
#getStreamSetObjectIds only returns the current node's SSOs. I think STREAM_ID_BLOOM_FILTER#removeObject is expected to remove every deleted SSO.
| } | ||
| } | ||
|  | ||
| public static class DefaultRangeGetter implements S3StreamsMetadataImage.RangeGetter { | 
The class could be extracted to a separate file.
| }); | ||
| } | ||
|  | ||
| public CompletableFuture<Long> searchObjectId(int nodeId, long streamId, long startOffset) { | 
The nodeId parameter is unused here.
| public CompletableFuture<InRangeObjects> getObjects(long streamId, long startOffset, long endOffset, int limit, | ||
| RangeGetter rangeGetter) { | ||
| return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, null); | ||
| return getObjects(streamId, startOffset, endOffset, limit, rangeGetter, LocalStreamRangeIndexCache.create()); | 
Do we still need LocalStreamRangeIndexCache? I think we could replace it with StreamSetObjectRangeIndex#putIndex when the current node has uploaded a new SSO.
| return ctx.cf; | ||
| } | ||
|  | ||
| private boolean readEndOffset(long streamId, long endOffset) { | 
- If it's a Kafka-level read, the endOffset will be the streamEndOffset.
- If it's a readahead read, the endOffset will be -1.
So in both cases readEndOffset == true.
- Add StreamIdBloomFilter to help search objectId -> streamId.
- Add StreamSetObjectRangeIndex to help search (streamId, startOffset) -> objectId.
- When reading an SSO from ObjectReader, update the StreamSetObjectRangeIndex and StreamIdBloomFilter.
- getObjects supports loading the stream set objects in steps (5 SSOs each time).
- Support preloading SSOs to help with the index-not-found case.
- Add S3StreamsMetadataImageTest, which generates the image and checks that getObjects returns the right result.
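The stepwise loading (5 SSOs each time) can be pictured with a simplified, synchronous sketch. Everything here is hypothetical: `BatchLoadSketch`, the `containsStream` predicate standing in for an index or bloom filter check, and the stop condition are assumptions for illustration, and the PR's getObjects works with CompletableFuture rather than a plain loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical, synchronous sketch of inspecting candidates in fixed-size steps.
final class BatchLoadSketch {
    static final int STEP = 5; // batch size taken from the PR description

    // Walks candidate object ids STEP at a time and stops requesting further
    // batches once `limit` matching objects have been collected.
    public static List<Long> collect(List<Long> candidates, LongPredicate containsStream, int limit) {
        List<Long> hits = new ArrayList<>();
        for (int from = 0; from < candidates.size() && hits.size() < limit; from += STEP) {
            int to = Math.min(from + STEP, candidates.size());
            for (long objectId : candidates.subList(from, to)) {
                if (containsStream.test(objectId)) {
                    hits.add(objectId);
                }
            }
        }
        // A batch may overshoot the limit, so trim the tail.
        return new ArrayList<>(hits.subList(0, Math.min(limit, hits.size())));
    }
}
```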