@@ -318,8 +318,8 @@ func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlar
318
318
return q
319
319
}
320
320
321
- func (b * singleBlockQuerier ) Profiles () parquet. Rows {
322
- return parquet . NewReader ( b .profiles .file . File , schemav1 . ProfilesSchema )
321
+ func (b * singleBlockQuerier ) Profiles () ProfileReader {
322
+ return b .profiles .file
323
323
}
324
324
325
325
func (b * singleBlockQuerier ) Index () IndexReader {
@@ -531,11 +531,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) {
531
531
return b .meta .MinTime , b .meta .MaxTime
532
532
}
533
533
534
- type labelsInfo struct {
535
- fp model.Fingerprint
536
- lbs phlaremodel.Labels
537
- }
538
-
539
534
type Profile interface {
540
535
StacktracePartition () uint64
541
536
Timestamp () model.Time
@@ -548,8 +543,10 @@ type Querier interface {
548
543
549
544
SelectMatchingProfiles (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (iter.Iterator [Profile ], error )
550
545
MergeByStacktraces (ctx context.Context , rows iter.Iterator [Profile ]) (* phlaremodel.Tree , error )
546
+ SelectMergeByStacktraces (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (* phlaremodel.Tree , error )
551
547
MergeBySpans (ctx context.Context , rows iter.Iterator [Profile ], spans phlaremodel.SpanSelector ) (* phlaremodel.Tree , error )
552
548
MergeByLabels (ctx context.Context , rows iter.Iterator [Profile ], by ... string ) ([]* typesv1.Series , error )
549
+ SelectMergeByLabels (ctx context.Context , params * ingestv1.SelectProfilesRequest , by ... string ) ([]* typesv1.Series , error )
553
550
MergePprof (ctx context.Context , rows iter.Iterator [Profile ]) (* profile.Profile , error )
554
551
Series (ctx context.Context , params * ingestv1.SeriesRequest ) ([]* typesv1.Labels , error )
555
552
ProfileTypes (context.Context , * connect.Request [ingestv1.ProfileTypesRequest ]) (* connect.Response [ingestv1.ProfileTypesResponse ], error )
@@ -814,25 +811,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
814
811
for _ , querier := range queriers {
815
812
querier := querier
816
813
g .Go (util .RecoverPanic (func () error {
817
- iters , err := querier .SelectMatchingProfiles (ctx , request )
818
- if err != nil {
819
- return err
820
- }
821
- defer func () {
822
- iters .Close ()
823
- }()
824
-
825
- profiles , err := iter .Slice (iters )
826
- if err != nil {
827
- return err
828
- }
829
-
830
- if len (profiles ) == 0 {
831
- return nil
832
- }
833
-
834
814
// TODO(simonswine): Split profiles per row group and run the MergeByStacktraces in parallel.
835
- merge , err := querier .MergeByStacktraces (ctx , iter . NewSliceIterator ( querier . Sort ( profiles )) )
815
+ merge , err := querier .SelectMergeByStacktraces (ctx , request )
836
816
if err != nil {
837
817
return err
838
818
}
@@ -1071,24 +1051,7 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
1071
1051
for _ , querier := range queriers {
1072
1052
querier := querier
1073
1053
g .Go (util .RecoverPanic (func () error {
1074
- iters , err := querier .SelectMatchingProfiles (ctx , request )
1075
- if err != nil {
1076
- return err
1077
- }
1078
- defer func () {
1079
- iters .Close ()
1080
- }()
1081
-
1082
- profiles , err := iter .Slice (iters )
1083
- if err != nil {
1084
- return err
1085
- }
1086
-
1087
- if len (profiles ) == 0 {
1088
- return nil
1089
- }
1090
-
1091
- merge , err := querier .MergeByLabels (ctx , iter .NewSliceIterator (querier .Sort (profiles )), by ... )
1054
+ merge , err := querier .SelectMergeByLabels (ctx , request , by ... )
1092
1055
if err != nil {
1093
1056
return err
1094
1057
}
@@ -1511,6 +1474,11 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
1511
1474
return uint64 (0 )
1512
1475
}
1513
1476
1477
+ type labelsInfo struct {
1478
+ fp model.Fingerprint
1479
+ lbs phlaremodel.Labels
1480
+ }
1481
+
1514
1482
func (b * singleBlockQuerier ) SelectMatchingProfiles (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (iter.Iterator [Profile ], error ) {
1515
1483
sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMatchingProfiles - Block" )
1516
1484
defer sp .Finish ()
@@ -1608,6 +1576,169 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
1608
1576
return iter .NewMergeIterator (maxBlockProfile , false , iters ... ), nil
1609
1577
}
1610
1578
1579
+ func (b * singleBlockQuerier ) SelectMergeByLabels (ctx context.Context , params * ingestv1.SelectProfilesRequest , by ... string ) ([]* typesv1.Series , error ) {
1580
+ sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMergeByLabels - Block" )
1581
+ defer sp .Finish ()
1582
+ sp .SetTag ("block ULID" , b .meta .ULID .String ())
1583
+
1584
+ if err := b .Open (ctx ); err != nil {
1585
+ return nil , err
1586
+ }
1587
+ matchers , err := parser .ParseMetricSelector (params .LabelSelector )
1588
+ if err != nil {
1589
+ return nil , status .Error (codes .InvalidArgument , "failed to parse label selectors: " + err .Error ())
1590
+ }
1591
+ if params .Type == nil {
1592
+ return nil , errors .New ("no profileType given" )
1593
+ }
1594
+ matchers = append (matchers , phlaremodel .SelectorFromProfileType (params .Type ))
1595
+
1596
+ postings , err := PostingsForMatchers (b .index , nil , matchers ... )
1597
+ if err != nil {
1598
+ return nil , err
1599
+ }
1600
+ var (
1601
+ chks = make ([]index.ChunkMeta , 1 )
1602
+ lblsPerRef = make (map [int64 ]labelsInfo )
1603
+ lbls = make (phlaremodel.Labels , 0 , 6 )
1604
+ )
1605
+ // get all relevant labels/fingerprints
1606
+ for postings .Next () {
1607
+ fp , err := b .index .SeriesBy (postings .At (), & lbls , & chks , by ... )
1608
+ if err != nil {
1609
+ return nil , err
1610
+ }
1611
+
1612
+ _ , ok := lblsPerRef [int64 (chks [0 ].SeriesIndex )]
1613
+ if ! ok {
1614
+ info := labelsInfo {
1615
+ fp : model .Fingerprint (fp ),
1616
+ lbs : make (phlaremodel.Labels , len (lbls )),
1617
+ }
1618
+ copy (info .lbs , lbls )
1619
+ lblsPerRef [int64 (chks [0 ].SeriesIndex )] = info
1620
+ }
1621
+ }
1622
+ it := query .NewBinaryJoinIterator (
1623
+ 0 ,
1624
+ b .profiles .columnIter (ctx , "SeriesIndex" , query .NewMapPredicate (lblsPerRef ), "SeriesIndex" ),
1625
+ b .profiles .columnIter (ctx , "TimeNanos" , query .NewIntBetweenPredicate (model .Time (params .Start ).UnixNano (), model .Time (params .End ).UnixNano ()), "TimeNanos" ),
1626
+ )
1627
+
1628
+ currSeriesIndex := int64 (- 1 )
1629
+ currSeriesInfo := labelsInfo {}
1630
+ buf := make ([][]parquet.Value , 2 )
1631
+
1632
+ // todo: we should stream profile to merge instead of loading all in memory.
1633
+ // This is a temporary solution for now since there's a memory corruption happening.
1634
+ rows , err := iter.Slice [Profile ](
1635
+ & RowsIterator [Profile ]{
1636
+ rows : it ,
1637
+ at : func (ir * query.IteratorResult ) Profile {
1638
+ buf = ir .Columns (buf , "SeriesIndex" , "TimeNanos" )
1639
+ seriesIndex := buf [0 ][0 ].Int64 ()
1640
+ if seriesIndex != currSeriesIndex {
1641
+ currSeriesIndex = seriesIndex
1642
+ currSeriesInfo = lblsPerRef [seriesIndex ]
1643
+ }
1644
+ return BlockProfile {
1645
+ labels : currSeriesInfo .lbs ,
1646
+ fp : currSeriesInfo .fp ,
1647
+ ts : model .TimeFromUnixNano (buf [1 ][0 ].Int64 ()),
1648
+ RowNum : ir .RowNumber [0 ],
1649
+ }
1650
+ },
1651
+ })
1652
+ if err != nil {
1653
+ return nil , err
1654
+ }
1655
+
1656
+ columnName := "TotalValue"
1657
+ if b .meta .Version == 1 {
1658
+ columnName = "Samples.list.element.Value"
1659
+ }
1660
+ return mergeByLabels [Profile ](ctx , b .profiles .file , columnName , iter .NewSliceIterator (rows ), by ... )
1661
+ }
1662
+
1663
+ func (b * singleBlockQuerier ) SelectMergeByStacktraces (ctx context.Context , params * ingestv1.SelectProfilesRequest ) (tree * phlaremodel.Tree , err error ) {
1664
+ sp , ctx := opentracing .StartSpanFromContext (ctx , "SelectMergeByStacktraces - Block" )
1665
+ defer sp .Finish ()
1666
+ sp .SetTag ("block ULID" , b .meta .ULID .String ())
1667
+
1668
+ if err := b .Open (ctx ); err != nil {
1669
+ return nil , err
1670
+ }
1671
+ matchers , err := parser .ParseMetricSelector (params .LabelSelector )
1672
+ if err != nil {
1673
+ return nil , status .Error (codes .InvalidArgument , "failed to parse label selectors: " + err .Error ())
1674
+ }
1675
+ if params .Type == nil {
1676
+ return nil , errors .New ("no profileType given" )
1677
+ }
1678
+ matchers = append (matchers , phlaremodel .SelectorFromProfileType (params .Type ))
1679
+
1680
+ postings , err := PostingsForMatchers (b .index , nil , matchers ... )
1681
+ if err != nil {
1682
+ return nil , err
1683
+ }
1684
+
1685
+ var (
1686
+ chks = make ([]index.ChunkMeta , 1 )
1687
+ lblsPerRef = make (map [int64 ]struct {})
1688
+ )
1689
+
1690
+ // get all relevant labels/fingerprints
1691
+ for postings .Next () {
1692
+ _ , err := b .index .Series (postings .At (), nil , & chks )
1693
+ if err != nil {
1694
+ return nil , err
1695
+ }
1696
+ lblsPerRef [int64 (chks [0 ].SeriesIndex )] = struct {}{}
1697
+ }
1698
+ r := symdb .NewResolver (ctx , b .symbols )
1699
+ defer r .Release ()
1700
+
1701
+ it := query .NewBinaryJoinIterator (
1702
+ 0 ,
1703
+ b .profiles .columnIter (ctx , "SeriesIndex" , query .NewMapPredicate (lblsPerRef ), "" ),
1704
+ b .profiles .columnIter (ctx , "TimeNanos" , query .NewIntBetweenPredicate (model .Time (params .Start ).UnixNano (), model .Time (params .End ).UnixNano ()), "" ),
1705
+ )
1706
+
1707
+ if b .meta .Version >= 2 {
1708
+ it = query .NewBinaryJoinIterator (0 ,
1709
+ it ,
1710
+ b .profiles .columnIter (ctx , "StacktracePartition" , nil , "StacktracePartition" ),
1711
+ )
1712
+ }
1713
+ buf := make ([][]parquet.Value , 1 )
1714
+
1715
+ // todo: we should stream profile to merge instead of loading all in memory.
1716
+ // This is a temporary solution for now since there's a memory corruption happening.
1717
+ rows , err := iter.Slice [rowProfile ](
1718
+ & RowsIterator [rowProfile ]{
1719
+ rows : it ,
1720
+ at : func (ir * query.IteratorResult ) rowProfile {
1721
+ buf = ir .Columns (buf , "StacktracePartition" )
1722
+ if len (buf [0 ]) == 0 {
1723
+ return rowProfile {
1724
+ rowNum : ir .RowNumber [0 ],
1725
+ }
1726
+ }
1727
+ return rowProfile {
1728
+ rowNum : ir .RowNumber [0 ],
1729
+ partition : buf [0 ][0 ].Uint64 (),
1730
+ }
1731
+ },
1732
+ })
1733
+ if err != nil {
1734
+ return nil , err
1735
+ }
1736
+ if err := mergeByStacktraces [rowProfile ](ctx , b .profiles .file , iter .NewSliceIterator (rows ), r ); err != nil {
1737
+ return nil , err
1738
+ }
1739
+ return r .Tree ()
1740
+ }
1741
+
1611
1742
// Series selects the series labels from this block.
1612
1743
//
1613
1744
// Note: It will select ALL the labels in the block, not necessarily just the
@@ -1837,7 +1968,7 @@ func (r *parquetReader[M, P]) relPath() string {
1837
1968
}
1838
1969
1839
1970
func (r * parquetReader [M , P ]) columnIter (ctx context.Context , columnName string , predicate query.Predicate , alias string ) query.Iterator {
1840
- index , _ := query .GetColumnIndexByPath (r .file .File , columnName )
1971
+ index , _ := query .GetColumnIndexByPath (r .file .File . Root () , columnName )
1841
1972
if index == - 1 {
1842
1973
return query .NewErrIterator (fmt .Errorf ("column '%s' not found in parquet file '%s'" , columnName , r .relPath ()))
1843
1974
}
0 commit comments