- 
          
- 
                Notifications
    You must be signed in to change notification settings 
- Fork 816
feat: added filter aggregation #2711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…r` + Added tests
| 
 Should be fixed now (after rebase) | 
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      |  | ||
| // Filter aggregation benchmarks | ||
|  | ||
| fn filter_agg_all_query(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| fn filter_agg_all_query(index: &Index) { | |
| fn filter_agg_all_query_count_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_term_query(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| fn filter_agg_term_query(index: &Index) { | |
| fn filter_agg_term_query_count_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_all_query_with_sub_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| fn filter_agg_all_query_with_sub_agg(index: &Index) { | |
| fn filter_agg_all_query_with_sub_aggs(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_term_query_with_sub_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| fn filter_agg_term_query_with_sub_agg(index: &Index) { | |
| fn filter_agg_term_query_with_sub_aggs(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@PSeitz-dd Thanks for the comments. I also noticed there was a bug and fixed it in 94bdd5d
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      |  | ||
| // Filter aggregation benchmarks | ||
|  | ||
| fn filter_agg_all_query(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_term_query(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_all_query_with_sub_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                benches/agg_bench.rs
              
                Outdated
          
        
      | execute_agg(index, agg_req); | ||
| } | ||
|  | ||
| fn filter_agg_term_query_with_sub_agg(index: &Index) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
        
          
                src/aggregation/bucket/filter.rs
              
                Outdated
          
        
      | fn parse_query(&self, schema: &Schema) -> crate::Result<Box<dyn Query>> { | ||
| match &self.query { | ||
| FilterQuery::QueryString(query_str) => { | ||
| let tokenizer_manager = TokenizerManager::default(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default tokenizer manager will fail for any fields with custom tokenizers. We'll need a mechanism to pass the TokenizerManager in there.
Probably the same way we pass the aggregations limits, we could put them both in a AggContextParams struct or similar.
    fn for_segment(
        &self,
        segment_local_id: crate::SegmentOrdinal,
        reader: &crate::SegmentReader,
    ) -> crate::Result<Self::Child> {
        AggregationSegmentCollector::from_agg_req_and_reader(
            &self.agg,
            reader,
            segment_local_id,
            &self.limits,
        )
    }There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this. Used the default and forgot to pipe it through. Fixed it.
| } | ||
|  | ||
| #[test] | ||
| pub fn test_set_default_field_integer() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was removed by accident
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. Yeah. Fixed it.
        
          
                tests/filter_aggregation.rs
              
                Outdated
          
        
      | @@ -0,0 +1,1013 @@ | |||
| //! Test suite for Filter Aggregation | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move this to the filter aggregation implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
6fa68d6    to
    42c9935      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@PSeitz Thanks for the comments. Please take another look.
| } | ||
|  | ||
| #[test] | ||
| pub fn test_set_default_field_integer() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. Yeah. Fixed it.
        
          
                tests/filter_aggregation.rs
              
                Outdated
          
        
      | @@ -0,0 +1,1013 @@ | |||
| //! Test suite for Filter Aggregation | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
        
          
                src/aggregation/bucket/filter.rs
              
                Outdated
          
        
      | fn parse_query(&self, schema: &Schema) -> crate::Result<Box<dyn Query>> { | ||
| match &self.query { | ||
| FilterQuery::QueryString(query_str) => { | ||
| let tokenizer_manager = TokenizerManager::default(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this. Used the default and forgot to pipe it through. Fixed it.
| /// Get the fast field names used by this aggregation (none for filter aggregation) | ||
| pub fn get_fast_field_names(&self) -> Vec<&str> { | ||
| // Filter aggregation doesn't use fast fields directly | ||
| vec![] | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added further comments. IMO, it should be fixed with a broader change in a follow-up PR.
| /// - Extension query types | ||
| /// | ||
| /// Note: This variant cannot be serialized to JSON (only QueryString can be serialized) | ||
| CustomQuery(Box<dyn SerializableQuery>), | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use SerializableQuery when the query cannot be serialized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a query constructor would be more suitable here, than de/serializing runtime objects, which may carry state.
| // | ||
| // This limitation exists because: | ||
| // - Query::weight() is called during execution, not during planning | ||
| // - The fallback decision is made per-segment based on field configuration | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the decision depends on the schema, which is not segment specific
Ticket(s) Closed
What
Implements filter aggregation support in Tantivy, enabling multiple filtered aggregations in a single query.
Why
Currently, there's no way to compute aggregations on different filtered subsets of documents in a single query. Users must run separate queries for each filter, which is slow and inefficient. For example, computing "average price overall + average price for t-shirts + count of electronics" requires three separate queries.
Elasticsearch's filter aggregation solves this by creating a single bucket containing documents matching a query, with support for nested sub-aggregations. This is a common analytics pattern that Tantivy now supports!
How
Added a new
FilterAggregationbucket aggregation type that:QueryParser) and directQueryobjects for custom query typesDocumentQueryEvaluatorto evaluate filter queries per-document during aggregation collection, avoiding separate query executionsSegmentReaderreferences, enabling filter aggregations to create query weights and scorers per segmentSome Implementation Details:
FilterAggregationsupports two modes:FilterQuery::QueryString: Parsed using Tantivy's standardQueryParserFilterQuery::Direct: AcceptsBox<dyn Query>for custom query extensionsFilterSegmentCollectorevaluates the filter query on each document collected by the main querydoc_countand flattened sub-aggregation resultsTests
Test suite with 20 tests covering:
All tests use the
assert_agg_results!macro for clean, consistent result validation with floating-point tolerance.