Skip to content

Conversation

@ashwanthgoli
Copy link
Contributor

@ashwanthgoli ashwanthgoli commented Nov 11, 2025

What this PR does / why we need it:

Introduces xcap pkg to capture statistics from query execution.

  • [Region] tracks individual events similar to OpenTelemetry span. A region is created per execution node.
  • [Capture] is a collection of regions, each task starts a new capture.

Captures can be serialised which allows merging captures from all the tasks and summarising it in the scheduler.
Following is a physical plan enriched with observations from the capture

VectorAggregation operation=sum group_by=(ambiguous.level) id=VectorAggregation-01K9RWCQAJ1NZ0Y3XFKNEDRXED
	│   ├── @metrics exec_duration_ns=464604459 read_calls=2 rows_out=5760
	└── RangeAggregation operation=sum start=2024-01-01T00:00:00Z end=2024-01-02T00:00:00Z step=1m0s range=1m0s partition_by=(ambiguous.level) id=RangeAggregation-01K9RWCQAJ1NZ0Y3XFKR0NEZAG
	    │   ├── @metrics read_calls=2 rows_out=5760 exec_duration_ns=463113417
	    └── Parallelize id=Parallelize-01K9RWCQAWJWWJQG3QZB2DYKDF
	        └── Projection all=true expand=(CAST_FLOAT(ambiguous.rows_affected)) id=Projection-01K9RWCQAX1G78WH3MVN3Z0E7K
	            │   ├── @metrics rows_out=5162 exec_duration_ns=26294462 read_calls=47
	            └── Compat src=parsed dst=parsed collision=label id=Compat-01K9RWCQAX1G78WH3MVHBWVGN7
	                │   ├── @metrics read_calls=47 rows_out=5162 exec_duration_ns=25813832
	                └── Projection all=true expand=(PARSE_JSON(builtin.message, [level, rows_affected], false, false)) id=Projection-01K9RWCQAX1G78WH3MVDZVJC36
	                    │   ├── @metrics read_calls=47 rows_out=5162 exec_duration_ns=25740249
	                    └── Compat src=metadata dst=metadata collision=label id=Compat-01K9RWCQAWJWWJQG3R4M473TYP
	                        │   ├── @metrics read_calls=47 rows_out=5162 exec_duration_ns=19526086
	                        └── ScanSet num_targets=88 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=ScanSet-01K9RWCQAWJWWJQG3QZDZ3N5N3
	                            ├── DataObjScan location=objects/fe/01076f7ddb641efd7674346f0617ba8a669050e255dcb865955900 streams=20 section_id=15 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=DataObjScan-01K9RWCQAWJWWJQG3QZEBE3042
	                            │       ├── @max_time_range start=2024-01-01T05:30:35.992978093+05:30 end=2024-01-02T05:29:59.772881419+05:30
	                            │       └── @metrics page_download_time_nanos=2755666 dataset_secondary_columns=2 secondary_column_uncompressed_bytes=4195958 secondary_rows_read=5162 exec_duration_ns=18259542 primary_row_bytes_read=222528 secondary_column_bytes=802746 dataset_secondary_column_pages=142 primary_column_uncompressed_bytes=156299 pages_scanned=92 dataset_primary_columns=2 primary_rows_read=22654 secondary_row_bytes_read=959883 dataset_max_rows=70654 rows_to_read_after_pruning=22654 pages_found_in_cache=90 primary_column_bytes=156299 dataset_primary_column_pages=142 batch_download_requests=2 rows_out=5162 read_calls=47
	                            ├── DataObjScan location=objects/8e/1acacdc2ecad024367a9b15ced2d089d58cb0c125db77647d28e49 streams=20 section_id=11 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=DataObjScan-01K9RWCQAWJWWJQG3QZJ6YS0BP
	                            │       ├── @max_time_range start=2024-01-01T05:31:02.935080752+05:30 end=2024-01-02T05:29:59.350342697+05:30
	                            │       └── @metrics read_calls=55 secondary_row_bytes_read=1109866 secondary_column_bytes=983075 dataset_secondary_column_pages=142 rows_to_read_after_pruning=27000 dataset_primary_column_pages=142 primary_row_bytes_read=264040 rows_out=6005 secondary_column_uncompressed_bytes=5188747 pages_found_in_cache=106 pages_scanned=108 page_download_time_nanos=6820333 primary_column_bytes=186249 exec_duration_ns=23277167 dataset_max_rows=70581 primary_column_uncompressed_bytes=186249 dataset_primary_columns=2 dataset_secondary_columns=2 primary_rows_read=27000 secondary_rows_read=6005 batch_download_requests=2
	                            ├── DataObjScan location=objects/8e/1acacdc2ecad024367a9b15ced2d089d58cb0c125db77647d28e49 streams=20 section_id=4 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=DataObjScan-01K9RWCQAWJWWJQG3QZK73GY4X
	                            │       ├── @max_time_range start=2024-01-01T05:30:00.757267024+05:30 end=2024-01-02T05:29:59.332067063+05:30
	                            │       └── @metrics batch_download_requests=2 dataset_primary_columns=2 pages_scanned=100 exec_duration_ns=23185124 dataset_primary_column_pages=144 rows_to_read_after_pruning=25000 secondary_rows_read=5878 secondary_column_bytes=915635 rows_out=5878 primary_rows_read=25000 dataset_max_rows=71163 page_download_time_nanos=5819418 primary_column_bytes=172402 secondary_column_uncompressed_bytes=4827583 dataset_secondary_columns=2 primary_row_bytes_read=247024 primary_column_uncompressed_bytes=172402 read_calls=51 dataset_secondary_column_pages=144 secondary_row_bytes_read=1087245 pages_found_in_cache=98
	                            ├── DataObjScan location=objects/8e/1acacdc2ecad024367a9b15ced2d089d58cb0c125db77647d28e49 streams=20 section_id=1 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=DataObjScan-01K9RWCQAWJWWJQG3QZP5957RQ
	                            │       ├── @max_time_range start=2024-01-01T05:30:12.809327202+05:30 end=2024-01-02T05:29:59.07595663+05:30
	                            │       └── @metrics secondary_column_uncompressed_bytes=4508078 rows_out=5413 dataset_max_rows=70657 pages_found_in_cache=94 batch_download_requests=2 primary_column_bytes=165438 exec_duration_ns=19765712 primary_rows_read=24000 rows_to_read_after_pruning=24000 secondary_rows_read=5413 primary_row_bytes_read=235304 page_download_time_nanos=4227041 dataset_secondary_columns=2 dataset_primary_column_pages=142 pages_scanned=96 read_calls=49 dataset_secondary_column_pages=142 primary_column_uncompressed_bytes=165438 secondary_column_bytes=853772 dataset_primary_columns=2 secondary_row_bytes_read=1001884
	                            ├── DataObjScan location=objects/fe/01076f7ddb641efd7674346f0617ba8a669050e255dcb865955900 streams=20 section_id=6 projections=(ambiguous.level, builtin.message, ambiguous.rows_affected, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2023-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2024-01-02T00:00:00Z) id=DataObjScan-01K9RWCQAWJWWJQG3QZR7M73E3
	                            │       ├── @max_time_range start=2024-01-01T05:30:30.287565305+05:30 end=2024-01-02T05:29:58.355501108+05:30
	                            │       └── @metrics batch_download_requests=2 pages_found_in_cache=94 rows_to_read_after_pruning=23886 primary_row_bytes_read=233704 primary_rows_read=23886 secondary_rows_read=5327 primary_column_bytes=164807 secondary_column_bytes=858729 primary_column_uncompressed_bytes=164807 dataset_primary_columns=2 dataset_max_rows=70886 secondary_column_uncompressed_bytes=4551490 exec_duration_ns=21907459 read_calls=49 dataset_primary_column_pages=142 secondary_row_bytes_read=985936 pages_scanned=96 dataset_secondary_column_pages=142 page_download_time_nanos=7202750 rows_out=5327 dataset_secondary_columns=2

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

// in the returned record are ordered by timestamp in the direction specified
// by opts.Direction.
func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger) *dataobjScan {
func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger, region *xcap.Region) *dataobjScan {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

region is created once per node. The node owns the region, it has to end the region on Close()
alternatively we could also create it on the first Read() call

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably be pulling the current region out of the context so we don't have to pass it around everywhere, maybe an xcap.CurrentRegion(ctx)? (If there isn't a current region, it can make one that does nothing by checking to see if the capture is nil or not)

@ashwanthgoli ashwanthgoli marked this pull request as ready for review November 12, 2025 11:07
@ashwanthgoli ashwanthgoli requested a review from a team as a code owner November 12, 2025 11:07
Copy link
Member

@rfratto rfratto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

starting out with a round of comments for things we don't need to address right away, but stood out to me while looking through the code

Comment on lines 69 to 81
// NewScope creates a new Scope with the given data.
// This function is mainly used for unmarshaling from protobuf.
func NewScope(name string, startTime, endTime time.Time, observations map[StatisticKey]AggregatedObservation, ended bool, capture *Capture) *Scope {
return &Scope{
capture: capture,
name: name,
attributes: nil, // Attributes not available during unmarshaling
startTime: startTime,
endTime: endTime,
observations: observations,
ended: ended,
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the future: feels weird to expose this just for protobuf, maybe we'll eventually want to have the protobuf conversion as internal logic in the xcap package 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i didn't like adding this either, i'll make that change in a follow-up

}

// Create OpenTelemetry span for this scope.
ctx, span := xcapTracer.Start(ctx, name, trace.WithAttributes(cfg.attributes...))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the future: eventually this will need to be moved to an export step; as it is now any span we create will get considered for exporting even if we want to bubble regions up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah 👍 I left this in intentionally so we have connected spans for a task.

package xcap

// AggregatedObservation holds an aggregated value for a statistic within a scope.
type AggregatedObservation struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the future: I don't think AggregatedObservation should be exposed as part of the xcap API; this feels more like a detail of how we bubble up scopes.

Comment on lines +47 to +50
Name() string
DataType() DataType
Aggregation() AggregationType
Key() StatisticKey
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the future: I think we should unexport all methods except for Name here; that would also allow us to unexport DataType, AggregationType, and StatisticKey since we probably don't need xcap users to know about those.

(This will be more possible once we do the other change to move protobuf conversion as unexported logic in xcap)

Copy link
Member

@rfratto rfratto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice job! i'm really excited to have this, especially for some of the stats I wanted to make that would have non-sum aggregations (like for start/end times)

this round of comments is stuff I'd like to discuss or see addressed before we merge.

Comment on lines 27 to 33
// ScopeProvider is an optional interface that pipelines can implement
// to expose their associated xcap scope for statistics collection.
type ScopeProvider interface {
// Scope returns the xcap scope associated with this pipeline node, if any.
// Returns nil if no scope is associated with this pipeline.
Scope() *xcap.Scope
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a hard time telling where the result of Scope() gets called for any reason other than implementing Scope().

Is it possible to handle this with context.Context instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is being used by observedPipeline to capture common stats for all nodes

func newObservedPipeline(inner Pipeline) *observedPipeline {
p := &observedPipeline{
inner: inner,
}
if provider, ok := inner.(ScopeProvider); ok {
p.scope = provider.Scope()
}
return p
}
// Read implements Pipeline.
func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
start := time.Now()
if p.scope != nil {
p.scope.Record(statReadCalls.Observe(1))
}
rec, err := p.inner.Read(ctx)
if p.scope != nil {
if rec != nil {
p.scope.Record(statRowsOut.Observe(rec.NumRows()))
}
p.scope.Record(statReadDuration.Observe(time.Since(start).Nanoseconds()))
}

an alternate could be to store the scope of child node in observedPipeline. On every read call it attaches the scope to the context before passing it to the child node


// newObservedPipeline wraps a pipeline to automatically collect common statistics.
// If the pipeline has a region, statistics will be recorded to it.
func newObservedPipeline(inner Pipeline) *observedPipeline {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only called when executing ScanSet, so we'd only get these stats for the basic engine where the scheduler isn't being used; is that intentional?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants