diff --git a/pkg/adhoc/pull.go b/pkg/adhoc/pull.go index 67382a7f91..bd0d42b4df 100644 --- a/pkg/adhoc/pull.go +++ b/pkg/adhoc/pull.go @@ -50,7 +50,7 @@ func newPull(cfg *config.Adhoc, args []string, st *storage.Storage, logger *logr } p := parser.New(logger, st, e) - m := scrape.NewManager(logger, p, defaultMetricsRegistry) + m := scrape.NewManager(logger, p, defaultMetricsRegistry, true) scrapeCfg := &(*scrapeconfig.DefaultConfig()) scrapeCfg.JobName = "adhoc" scrapeCfg.EnabledProfiles = []string{"cpu", "mem"} diff --git a/pkg/adhoc/push.go b/pkg/adhoc/push.go index e33ef50601..7ea7479532 100644 --- a/pkg/adhoc/push.go +++ b/pkg/adhoc/push.go @@ -36,7 +36,7 @@ func newPush(_ *config.Adhoc, args []string, st *storage.Storage, logger *logrus p := parser.New(logger, st, e) return push{ args: args, - handler: server.NewIngestHandler(logger, p, func(*ingestion.IngestInput) {}, httputils.NewDefaultHelper(logger)), + handler: server.NewIngestHandler(logger, p, func(*ingestion.IngestInput) {}, httputils.NewDefaultHelper(logger), true), logger: logger, }, nil } diff --git a/pkg/cli/server.go b/pkg/cli/server.go index 2a836e286a..890fec1ad8 100644 --- a/pkg/cli/server.go +++ b/pkg/cli/server.go @@ -201,7 +201,8 @@ func newServerService(c *config.Server) (*serverService, error) { svc.scrapeManager = scrape.NewManager( svc.logger.WithField("component", "scrape-manager"), ingester, - defaultMetricsRegistry) + defaultMetricsRegistry, + !svc.config.RemoteWrite.Enabled) svc.controller, err = server.New(server.Config{ Configuration: svc.config, diff --git a/pkg/convert/pprof/profile.go b/pkg/convert/pprof/profile.go index 7be2d8a6db..25d9601ac7 100644 --- a/pkg/convert/pprof/profile.go +++ b/pkg/convert/pprof/profile.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "encoding/json" + "errors" + "github.com/pyroscope-io/pyroscope/pkg/util/cumulativepprof" "io" "mime/multipart" "sync" @@ -22,6 +24,8 @@ type RawProfile struct { // References the next profile in the sequence (cumulative 
type only). next *RawProfile + mergers *cumulativepprof.Mergers + m sync.Mutex // Initializes lazily on Bytes, if not present. RawData []byte // Represents raw request body as per ingestion API. @@ -50,28 +54,73 @@ func (p *RawProfile) ContentType() string { // two consecutive samples to calculate the diff. If parser is not // present due to a failure, or sequence violation, the profiles will // be re-parsed. -func (p *RawProfile) Push(profile []byte, cumulative bool) *RawProfile { +func (p *RawProfile) Push(profile []byte, cumulative, mergeCumulative bool) *RawProfile { p.m.Lock() p.Profile = profile p.RawData = nil - n := &RawProfile{ - SampleTypeConfig: p.SampleTypeConfig, - } if cumulative { + n := &RawProfile{ + SampleTypeConfig: p.SampleTypeConfig, + } // N.B the parser state is only propagated // after successful Parse call. n.PreviousProfile = p.Profile p.next = n + if mergeCumulative { + mergers := p.mergers + if mergers == nil { + mergers = cumulativepprof.NewMergers() + } + err := p.mergeCumulativeLocked(mergers) + if err == nil { + n.mergers = mergers + } + } } p.m.Unlock() return p.next } +func (p *RawProfile) MergeCumulative(ms *cumulativepprof.Mergers) error { + p.m.Lock() + defer p.m.Unlock() + return p.mergeCumulativeLocked(ms) +} + +func (p *RawProfile) mergeCumulativeLocked(ms *cumulativepprof.Mergers) error { + if p.Profile == nil && p.PreviousProfile == nil && p.RawData != nil && p.FormDataContentType != "" { + err := p.loadPprofFromForm() + if err != nil { + return err + } + } + if p.PreviousProfile == nil { + return ErrCumulativeMergeNoPreviousProfile + } + merged, stConfig, err := ms.Merge(p.PreviousProfile, p.Profile, p.SampleTypeConfig) + if err != nil { + return err + } + var mergedProfileBytes bytes.Buffer + err = merged.Write(&mergedProfileBytes) + if err != nil { + return err + } + p.Profile = mergedProfileBytes.Bytes() + p.PreviousProfile = nil + p.SampleTypeConfig = stConfig + p.RawData = nil + return nil +} + const ( 
formFieldProfile, formFileProfile = "profile", "profile.pprof" formFieldPreviousProfile, formFilePreviousProfile = "prev_profile", "profile.pprof" formFieldSampleTypeConfig, formFileSampleTypeConfig = "sample_type_config", "sample_type_config.json" ) +var ( + ErrCumulativeMergeNoPreviousProfile = errors.New("no previous profile for cumulative merge") +) func (p *RawProfile) Bytes() ([]byte, error) { p.m.Lock() diff --git a/pkg/scrape/config/config.go b/pkg/scrape/config/config.go index 393fe07d23..83b6aa6e2b 100644 --- a/pkg/scrape/config/config.go +++ b/pkg/scrape/config/config.go @@ -88,33 +88,33 @@ func DefaultConfig() *Config { }, "mutex": { Path: "/debug/pprof/mutex", - Params: nil, + Params: url.Values{ + "seconds": []string{"10"}, + }, SampleTypes: map[string]*profile.SampleTypeConfig{ "contentions": { DisplayName: "mutex_count", Units: metadata.LockSamplesUnits, - Cumulative: true, }, "delay": { DisplayName: "mutex_duration", Units: metadata.LockNanosecondsUnits, - Cumulative: true, }, }, }, "block": { Path: "/debug/pprof/block", - Params: nil, + Params: url.Values{ + "seconds": []string{"10"}, + }, SampleTypes: map[string]*profile.SampleTypeConfig{ "contentions": { DisplayName: "block_count", Units: metadata.LockSamplesUnits, - Cumulative: true, }, "delay": { DisplayName: "block_duration", Units: metadata.LockNanosecondsUnits, - Cumulative: true, }, }, }, diff --git a/pkg/scrape/manager.go b/pkg/scrape/manager.go index a41f9910af..961a97f7b7 100644 --- a/pkg/scrape/manager.go +++ b/pkg/scrape/manager.go @@ -45,19 +45,22 @@ type Manager struct { targetSets map[string][]*targetgroup.Group reloadC chan struct{} + + disableCumulativeMerge bool } // NewManager is the Manager constructor -func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer) *Manager { +func NewManager(logger logrus.FieldLogger, p ingestion.Ingester, r prometheus.Registerer, disableCumulativeMerge bool) *Manager { c := make(map[string]*config.Config) return 
&Manager{ - ingester: p, - logger: logger, - scrapeConfigs: c, - scrapePools: make(map[string]*scrapePool), - stop: make(chan struct{}), - reloadC: make(chan struct{}, 1), - metrics: newMetrics(r), + ingester: p, + logger: logger, + scrapeConfigs: c, + scrapePools: make(map[string]*scrapePool), + stop: make(chan struct{}), + reloadC: make(chan struct{}, 1), + metrics: newMetrics(r), + disableCumulativeMerge: disableCumulativeMerge, } } @@ -90,7 +93,7 @@ func (m *Manager) reload() { Errorf("reloading target set") continue } - sp, err := newScrapePool(scrapeConfig, m.ingester, m.logger, m.metrics) + sp, err := newScrapePool(scrapeConfig, m.ingester, m.logger, m.metrics, m.disableCumulativeMerge) if err != nil { m.logger.WithError(err). WithField("scrape_pool", setName). diff --git a/pkg/scrape/scrape.go b/pkg/scrape/scrape.go index 53d04d43fb..e727c909c1 100644 --- a/pkg/scrape/scrape.go +++ b/pkg/scrape/scrape.go @@ -65,9 +65,11 @@ type scrapePool struct { // set of hashes. activeTargets map[uint64]*Target droppedTargets []*Target + + disableCumulativeMerge bool } -func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.FieldLogger, m *metrics) (*scrapePool, error) { +func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.FieldLogger, m *metrics, disableCumulativeMerge bool) (*scrapePool, error) { m.pools.Inc() client, err := config.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName) if err != nil { @@ -88,6 +90,7 @@ func newScrapePool(cfg *config.Config, p ingestion.Ingester, logger logrus.Field metrics: m, poolMetrics: m.poolMetrics(cfg.JobName), + disableCumulativeMerge: disableCumulativeMerge, } return &sp, nil @@ -105,6 +108,8 @@ func (sp *scrapePool) newScrapeLoop(s *scraper, i, t time.Duration) *scrapeLoop delta: d, interval: i, timeout: t, + + disableCumulativeMerge: sp.disableCumulativeMerge, } x.ctx, x.cancel = context.WithCancel(sp.ctx) return &x @@ -324,6 +329,8 @@ type scrapeLoop struct { delta time.Duration 
interval time.Duration timeout time.Duration + + disableCumulativeMerge bool } func (sl *scrapeLoop) run() { @@ -421,7 +428,8 @@ func (sl *scrapeLoop) scrape(startTime, endTime time.Time) error { } profile := sl.scraper.profile - sl.scraper.profile = profile.Push(buf.Bytes(), sl.scraper.cumulative) + sl.scraper.profile = profile.Push(buf.Bytes(), sl.scraper.cumulative, !sl.disableCumulativeMerge) + return sl.scraper.ingester.Ingest(ctx, &ingestion.IngestInput{ Profile: profile, Metadata: ingestion.Metadata{ diff --git a/pkg/server/ingest.go b/pkg/server/ingest.go index 7cd2ad6b19..cc61718c95 100644 --- a/pkg/server/ingest.go +++ b/pkg/server/ingest.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/pyroscope-io/pyroscope/pkg/util/cumulativepprof" + "github.com/pyroscope-io/pyroscope/pkg/convert/speedscope" "github.com/sirupsen/logrus" @@ -28,6 +30,8 @@ type ingestHandler struct { ingester ingestion.Ingester onSuccess func(*ingestion.IngestInput) httpUtils httputils.Utils + + disableCumulativeMerge bool } func (ctrl *Controller) ingestHandler() http.Handler { @@ -35,15 +39,16 @@ func (ctrl *Controller) ingestHandler() http.Handler { ctrl.StatsInc("ingest") ctrl.StatsInc("ingest:" + pi.Metadata.SpyName) ctrl.appStats.Add(hashString(pi.Metadata.Key.AppName())) - }, ctrl.httpUtils) + }, ctrl.httpUtils, !ctrl.config.RemoteWrite.Enabled) } -func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils) http.Handler { +func NewIngestHandler(log *logrus.Logger, p ingestion.Ingester, onSuccess func(*ingestion.IngestInput), httpUtils httputils.Utils, disableCumulativeMerge bool) http.Handler { return ingestHandler{ - log: log, - ingester: p, - onSuccess: onSuccess, - httpUtils: httpUtils, + log: log, + ingester: p, + onSuccess: onSuccess, + httpUtils: httpUtils, + disableCumulativeMerge: disableCumulativeMerge, } } @@ -159,10 +164,14 @@ func (h ingestHandler) ingestInputFromRequest(r *http.Request) 
(*ingestion.Inges } case strings.Contains(contentType, "multipart/form-data"): - input.Profile = &pprof.RawProfile{ + p := &pprof.RawProfile{ FormDataContentType: contentType, RawData: b, } + if !h.disableCumulativeMerge { + if err := p.MergeCumulative(cumulativepprof.NewMergers()); err != nil { h.log.WithError(err).Debug("failed to merge cumulative profile; ingesting raw profile") } + } + input.Profile = p } if input.Profile == nil { diff --git a/pkg/util/cumulativepprof/cumulative.go b/pkg/util/cumulativepprof/cumulative.go new file mode 100644 index 0000000000..2f11c331dc --- /dev/null +++ b/pkg/util/cumulativepprof/cumulative.go @@ -0,0 +1,154 @@ +package cumulativepprof + +import ( + "fmt" + pprofile "github.com/google/pprof/profile" + "github.com/pyroscope-io/pyroscope/pkg/storage/tree" +) + +type Merger struct { + SampleTypes []string + MergeRatios []float64 + SampleTypeConfig map[string]*tree.SampleTypeConfig + Name string + + prev *pprofile.Profile +} + +type Mergers struct { + Heap *Merger + Block *Merger + Mutex *Merger +} + +func NewMergers() *Mergers { + return &Mergers{ + Block: &Merger{ + SampleTypes: []string{"contentions", "delay"}, + MergeRatios: []float64{-1, -1}, + SampleTypeConfig: map[string]*tree.SampleTypeConfig{ + "contentions": { + DisplayName: "block_count", + Units: "lock_samples", + }, + "delay": { + DisplayName: "block_duration", + Units: "lock_nanoseconds", + }, + }, + Name: "block", + }, + Mutex: &Merger{ + SampleTypes: []string{"contentions", "delay"}, + MergeRatios: []float64{-1, -1}, + SampleTypeConfig: map[string]*tree.SampleTypeConfig{ + "contentions": { + DisplayName: "mutex_count", + Units: "lock_samples", + }, + "delay": { + DisplayName: "mutex_duration", + Units: "lock_nanoseconds", + }, + }, + Name: "mutex", + }, + Heap: &Merger{ + SampleTypes: []string{"alloc_objects", "alloc_space", "inuse_objects", "inuse_space"}, + MergeRatios: []float64{-1, -1, 0, 0}, + SampleTypeConfig: map[string]*tree.SampleTypeConfig{ + "alloc_objects": { + Units: "objects", + }, + "alloc_space": { + Units: "bytes", + }, + "inuse_space": { + Units: "bytes", + 
Aggregation: "average", + }, + "inuse_objects": { + Units: "objects", + Aggregation: "average", + }, + }, + Name: "heap", + }, + } +} + +func (m *Mergers) Merge(prev, cur []byte, sampleTypeConfig map[string]*tree.SampleTypeConfig) (*pprofile.Profile, map[string]*tree.SampleTypeConfig, error) { + p, err := pprofile.ParseData(cur) + if err != nil { + return nil, nil, err + } + if len(p.SampleType) == 4 { + for i := 0; i < 4; i++ { + if p.SampleType[i].Type != m.Heap.SampleTypes[i] { + return nil, nil, fmt.Errorf("unknown sample type order %v", p.SampleType) + } + } + cfg, ok := sampleTypeConfig["alloc_objects"] + if cfg != nil && ok { + if !cfg.Cumulative { + return nil, nil, fmt.Errorf("alloc_objects profile is already not cumulative: %v", sampleTypeConfig) + } + } + return m.Heap.Merge(prev, p) + } + if len(p.SampleType) == 2 { + for i := 0; i < 2; i++ { + if p.SampleType[i].Type != m.Block.SampleTypes[i] { + return nil, nil, fmt.Errorf("unknown sample type order %v", p.SampleType) + } + } + cfg, ok := sampleTypeConfig["contentions"] + if cfg != nil && ok { + if !cfg.Cumulative { + return nil, nil, fmt.Errorf("contentions profile is already not cumulative: %v", sampleTypeConfig) + } + if cfg.DisplayName == "mutex_count" { + return m.Mutex.Merge(prev, p) + } + if cfg.DisplayName == "block_count" { + return m.Block.Merge(prev, p) + } + } + return nil, nil, fmt.Errorf("unknown profile: %v %v", p.SampleType, sampleTypeConfig) + } + return nil, nil, fmt.Errorf("unknown profile %v %v", p.SampleType, sampleTypeConfig) +} + +func (m *Merger) Merge(prev []byte, cur *pprofile.Profile) (*pprofile.Profile, map[string]*tree.SampleTypeConfig, error) { + var err error + p2 := cur + + p1 := m.prev + if p1 == nil { + p1, err = pprofile.ParseData(prev) + if err != nil { + return nil, nil, err + } + } + + err = p1.ScaleN(m.MergeRatios) + if err != nil { + return nil, nil, err + } + + p, err := pprofile.Merge([]*pprofile.Profile{p1, p2}) + if err != nil { + return nil, nil, err + } 
+ + for _, sample := range p.Sample { + if len(sample.Value) > 0 && sample.Value[0] < 0 { + for i := range sample.Value { + sample.Value[i] = 0 + } + } + } + + m.prev = p2 + return p, m.SampleTypeConfig, nil +}