Skip to content

Feat: ingest align block ranges. #2521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 12, 2023
Merged
65 changes: 2 additions & 63 deletions go.work.sum

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (i *ingesterFlusherCompat) Flush() {
}

func New(phlarectx context.Context, cfg Config, dbConfig phlaredb.Config, storageBucket phlareobj.Bucket, limits Limits) (*Ingester, error) {

i := &Ingester{
cfg: cfg,
phlarectx: phlarectx,
Expand Down Expand Up @@ -268,7 +267,7 @@ func (i *Ingester) Flush(ctx context.Context, req *connect.Request[ingesterv1.Fl
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
for _, inst := range i.instances {
if err := inst.Flush(ctx); err != nil {
if err := inst.Flush(ctx, true, "api"); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "api" be a constant somewhere? Just like flushReasonMaxBlockBytes and flushReasonMaxDuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? It doesn't add much if it's used only once, but +1 on consistency.

return nil, err
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/ingester/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ func (i *Ingester) ProfileTypes(ctx context.Context, req *connect.Request[ingest
// Series returns labels series for the given set of matchers.
func (i *Ingester) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error) {
return forInstanceUnary(ctx, i, func(instance *instance) (*connect.Response[ingestv1.SeriesResponse], error) {
legacyRequest := req.Msg.Start == 0 || req.Msg.End == 0
if legacyRequest {
return instance.LegacySeries(ctx, req)
}
return instance.Series(ctx, req)
})
}
Expand Down
195 changes: 192 additions & 3 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,21 @@ func (b *singleBlockQuerier) Meta() block.Meta {
return *b.meta
}

// ProfileTypes is a placeholder: it ignores the request and always answers
// with an empty ProfileTypesResponse and a nil error.
func (b *singleBlockQuerier) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error) {
	// todo
	empty := &ingestv1.ProfileTypesResponse{}
	return connect.NewResponse(empty), nil
}

// LabelValues is a placeholder: it ignores the request and always answers
// with an empty LabelValuesResponse and a nil error.
func (b *singleBlockQuerier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
	// todo
	empty := &typesv1.LabelValuesResponse{}
	return connect.NewResponse(empty), nil
}

// LabelNames is a placeholder: it ignores the request and always answers
// with an empty LabelNamesResponse and a nil error.
func (b *singleBlockQuerier) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
	// todo
	empty := &typesv1.LabelNamesResponse{}
	return connect.NewResponse(empty), nil
}

func (b *singleBlockQuerier) Close() error {
b.openLock.Lock()
defer func() {
Expand Down Expand Up @@ -389,6 +404,9 @@ type Querier interface {
MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error)
MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error)
Series(ctx context.Context, params *ingestv1.SeriesRequest) ([]*typesv1.Labels, error)
ProfileTypes(context.Context, *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)
Open(ctx context.Context) error
// Sorts profiles for retrieval.
Sort([]Profile) []Profile
Expand All @@ -405,6 +423,19 @@ func InRange(q Querier, start, end model.Time) bool {
return true
}

type ReadAPI interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type is here strictly to enforce that Queriers implements these methods, correct? I don't see it used anywhere else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we don't really need it, but I think this is the start of a bigger refactoring; in fact, we should only implement this in Queriers.

LabelValues(context.Context, *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
LabelNames(context.Context, *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)
ProfileTypes(context.Context, *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
Series(context.Context, *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)
MergeProfilesStacktraces(context.Context, *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
MergeProfilesLabels(context.Context, *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
MergeProfilesPprof(context.Context, *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
MergeSpanProfile(context.Context, *connect.BidiStream[ingestv1.MergeSpanProfileRequest, ingestv1.MergeSpanProfileResponse]) error
}

// Compile-time check that Queriers implements ReadAPI. A typed nil slice is
// sufficient for the assertion, so nothing is allocated at package
// initialization.
var _ ReadAPI = Queriers(nil)

type Queriers []Querier

func (queriers Queriers) Open(ctx context.Context) error {
Expand All @@ -430,7 +461,167 @@ func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ing
return iter.NewMergeIterator(maxBlockProfile, true, iters...), nil
}

func (queriers Queriers) ForTimeRange(_ context.Context, start, end model.Time) (Queriers, error) {
// LabelValues forwards req to every querier through an errgroup and returns
// the de-duplicated, lexicographically sorted union of the names they report.
// If any querier fails, the error returned by the group is passed back and no
// response is produced.
func (queriers Queriers) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) {
	// todo: Add support start and end.
	// if req.Msg.Start != 0 && req.Msg.End != 0 {
	// 	var err error
	// 	queriers, err = queriers.forTimeRange(ctx, model.Time(req.Msg.Start), model.Time(req.Msg.End))
	// 	if err != nil {
	// 		return nil, err
	// 	}
	// }

	var (
		lock sync.Mutex // guards seen
		seen = make(map[string]struct{})
	)
	group, groupCtx := errgroup.WithContext(ctx)

	for _, querier := range queriers {
		querier := querier // per-iteration copy for the closure
		group.Go(func() error {
			resp, err := querier.LabelValues(groupCtx, req)
			if err != nil {
				return err
			}
			lock.Lock()
			defer lock.Unlock()
			if resp == nil {
				return nil
			}
			for _, value := range resp.Msg.Names {
				seen[value] = struct{}{}
			}
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		return nil, err
	}

	// Map iteration order is random, so sort for a stable response.
	sorted := lo.Keys(seen)
	sort.Strings(sorted)
	out := &typesv1.LabelValuesResponse{Names: sorted}
	return connect.NewResponse(out), nil
}

// LabelNames forwards req to every querier through an errgroup and returns
// the de-duplicated, lexicographically sorted union of the label names they
// report. If any querier fails, the error returned by the group is passed
// back and no response is produced.
func (queriers Queriers) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) {
	// todo: Add support start and end.
	// if req.Msg.Start != 0 && req.Msg.End != 0 {
	// 	var err error
	// 	queriers, err = queriers.forTimeRange(ctx, model.Time(req.Msg.Start), model.Time(req.Msg.End))
	// 	if err != nil {
	// 		return nil, err
	// 	}
	// }

	var (
		lock sync.Mutex // guards seen
		seen = make(map[string]struct{})
	)
	group, groupCtx := errgroup.WithContext(ctx)

	for _, querier := range queriers {
		querier := querier // per-iteration copy for the closure
		group.Go(func() error {
			resp, err := querier.LabelNames(groupCtx, req)
			if err != nil {
				return err
			}
			lock.Lock()
			defer lock.Unlock()
			if resp == nil {
				return nil
			}
			for _, labelName := range resp.Msg.Names {
				seen[labelName] = struct{}{}
			}
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		return nil, err
	}

	// Map iteration order is random, so sort for a stable response.
	sorted := lo.Keys(seen)
	sort.Strings(sorted)
	out := &typesv1.LabelNamesResponse{Names: sorted}
	return connect.NewResponse(out), nil
}

// ProfileTypes forwards req to every querier through an errgroup and returns
// one cloned ProfileType per distinct ID, ordered by ID. When several
// queriers report the same ID, whichever result is merged last is kept. If
// any querier fails, the error returned by the group is passed back and no
// response is produced.
func (queriers Queriers) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error) {
	// todo: Add support start and end.
	// if req.Msg.Start != 0 && req.Msg.End != 0 {
	// 	var err error
	// 	queriers, err = queriers.forTimeRange(ctx, model.Time(req.Msg.Start), model.Time(req.Msg.End))
	// 	if err != nil {
	// 		return nil, err
	// 	}
	// }

	var (
		lock sync.Mutex // guards byID
		byID = make(map[string]*typesv1.ProfileType)
	)
	group, groupCtx := errgroup.WithContext(ctx)

	for _, querier := range queriers {
		querier := querier // per-iteration copy for the closure
		group.Go(func() error {
			resp, err := querier.ProfileTypes(groupCtx, req)
			if err != nil {
				return err
			}
			lock.Lock()
			defer lock.Unlock()
			if resp == nil {
				return nil
			}
			for _, profileType := range resp.Msg.ProfileTypes {
				// Store a clone rather than the querier's own message.
				byID[profileType.ID] = profileType.CloneVT()
			}
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		return nil, err
	}

	// Map iteration order is random, so sort by ID for a stable response.
	merged := lo.Values(byID)
	sort.Slice(merged, func(a, b int) bool {
		return merged[a].ID < merged[b].ID
	})
	out := &ingestv1.ProfileTypesResponse{ProfileTypes: merged}
	return connect.NewResponse(out), nil
}

// Series delegates to the package-level Series function and wraps its result
// in a connect response. When the request carries both a start and an end,
// blocks are selected with forTimeRange; otherwise every querier in the set
// is handed over unfiltered.
func (queriers Queriers) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error) {
	// todo: verify empty timestamp request should return all series
	getter := queriers.forTimeRange
	hasRange := req.Msg.Start != 0 && req.Msg.End != 0
	if !hasRange {
		// Legacy Series queries without a range should return all series from all head blocks.
		getter = func(_ context.Context, _, _ model.Time) (Queriers, error) {
			return queriers, nil
		}
	}

	series, err := Series(ctx, req.Msg, getter)
	if err != nil {
		return nil, err
	}
	return connect.NewResponse(series), nil
}

// MergeProfilesStacktraces hands the stream to the package-level
// MergeProfilesStacktraces function, using forTimeRange as the block getter.
func (queriers Queriers) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error {
	blocksFor := queriers.forTimeRange
	return MergeProfilesStacktraces(ctx, stream, blocksFor)
}

// MergeProfilesLabels hands the stream to the package-level
// MergeProfilesLabels function, using forTimeRange as the block getter.
func (queriers Queriers) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error {
	blocksFor := queriers.forTimeRange
	return MergeProfilesLabels(ctx, stream, blocksFor)
}

// MergeProfilesPprof hands the stream to the package-level
// MergeProfilesPprof function, using forTimeRange as the block getter.
func (queriers Queriers) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error {
	blocksFor := queriers.forTimeRange
	return MergeProfilesPprof(ctx, stream, blocksFor)
}

// MergeSpanProfile hands the stream to the package-level MergeSpanProfile
// function, using forTimeRange as the block getter.
func (queriers Queriers) MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeSpanProfileRequest, ingestv1.MergeSpanProfileResponse]) error {
	blocksFor := queriers.forTimeRange
	return MergeSpanProfile(ctx, stream, blocksFor)
}

// BlockGetter is the signature of a function that, given a context and a
// start/end time, returns a set of Queriers or an error.
// Queriers.forTimeRange has this shape.
type BlockGetter func(ctx context.Context, start, end model.Time) (Queriers, error)

func (queriers Queriers) forTimeRange(_ context.Context, start, end model.Time) (Queriers, error) {
result := make(Queriers, 0, len(queriers))
for _, q := range queriers {
if InRange(q, start, end) {
Expand All @@ -440,8 +631,6 @@ func (queriers Queriers) ForTimeRange(_ context.Context, start, end model.Time)
return result, nil
}

type BlockGetter func(ctx context.Context, start, end model.Time) (Queriers, error)

// SelectMatchingProfiles returns a list iterator of profiles matching the given request.
func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfilesRequest, queriers Queriers) ([]iter.Iterator[Profile], error) {
g, ctx := errgroup.WithContext(ctx)
Expand Down
Loading