@@ -116,7 +116,7 @@ func (sh *shard) flushSegment(ctx context.Context, wg *sync.WaitGroup) {
116
116
if s .debuginfo .movedHeads > 0 {
117
117
_ = level .Debug (s .logger ).Log ("msg" ,
118
118
"writing segment block done" ,
119
- "heads-count" , len (s .datasets ),
119
+ "heads-count" , len (s .heads ),
120
120
"heads-moved-count" , s .debuginfo .movedHeads ,
121
121
"inflight-duration" , s .debuginfo .waitInflight ,
122
122
"flush-heads-duration" , s .debuginfo .flushHeadsDuration ,
@@ -203,7 +203,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
203
203
s := & segment {
204
204
logger : log .With (sl , "segment-id" , id .String ()),
205
205
ulid : id ,
206
- datasets : make (map [datasetKey ]* dataset ),
206
+ heads : make (map [datasetKey ]dataset ),
207
207
sw : sw ,
208
208
sh : sh ,
209
209
shard : sk ,
@@ -216,7 +216,7 @@ func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *seg
216
216
func (s * segment ) flush (ctx context.Context ) (err error ) {
217
217
span , ctx := opentracing .StartSpanFromContext (ctx , "segment.flush" , opentracing.Tags {
218
218
"block_id" : s .ulid .String (),
219
- "datasets" : len (s .datasets ),
219
+ "datasets" : len (s .heads ),
220
220
"shard" : s .shard ,
221
221
})
222
222
defer span .Finish ()
@@ -340,10 +340,6 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
340
340
lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , model .LabelNameProfileType , profileType )
341
341
}
342
342
343
- if f .flushed .HasUnsymbolizedProfiles {
344
- lb .WithLabelSet (model .LabelNameServiceName , f .head .key .service , metadata .LabelNameUnsymbolized , "true" )
345
- }
346
-
347
343
// Other optional labels:
348
344
// lb.WithLabelSet("label_name", "label_value", ...)
349
345
ds .Labels = lb .Build ()
@@ -352,8 +348,8 @@ func concatSegmentHead(f *headFlush, w *writerOffset, s *metadata.StringTable) *
352
348
}
353
349
354
350
func (s * segment ) flushHeads (ctx context.Context ) flushStream {
355
- heads := maps .Values (s .datasets )
356
- slices .SortFunc (heads , func (a , b * dataset ) int {
351
+ heads := maps .Values (s .heads )
352
+ slices .SortFunc (heads , func (a , b dataset ) int {
357
353
return a .key .compare (b .key )
358
354
})
359
355
@@ -368,15 +364,15 @@ func (s *segment) flushHeads(ctx context.Context) flushStream {
368
364
defer close (f .done )
369
365
flushed , err := s .flushHead (ctx , f .head )
370
366
if err != nil {
371
- level .Error (s .logger ).Log ("msg" , "failed to flush dataset " , "err" , err )
367
+ level .Error (s .logger ).Log ("msg" , "failed to flush head " , "err" , err )
372
368
return
373
369
}
374
370
if flushed == nil {
375
- level .Debug (s .logger ).Log ("msg" , "skipping nil dataset " )
371
+ level .Debug (s .logger ).Log ("msg" , "skipping nil head " )
376
372
return
377
373
}
378
374
if flushed .Meta .NumSamples == 0 {
379
- level .Debug (s .logger ).Log ("msg" , "skipping empty dataset " )
375
+ level .Debug (s .logger ).Log ("msg" , "skipping empty head " )
380
376
return
381
377
}
382
378
f .flushed = flushed
@@ -407,24 +403,24 @@ func (s *flushStream) Next() bool {
407
403
return false
408
404
}
409
405
410
- func (s * segment ) flushHead (ctx context.Context , e * dataset ) (* memdb.FlushedHead , error ) {
406
+ func (s * segment ) flushHead (ctx context.Context , e dataset ) (* memdb.FlushedHead , error ) {
411
407
th := time .Now ()
412
408
flushed , err := e .head .Flush (ctx )
413
409
if err != nil {
414
410
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
415
411
s .sw .metrics .flushServiceHeadError .WithLabelValues (s .sshard , e .key .tenant ).Inc ()
416
- return nil , fmt .Errorf ("failed to flush dataset : %w" , err )
412
+ return nil , fmt .Errorf ("failed to flush head : %w" , err )
417
413
}
418
414
s .sw .metrics .flushServiceHeadDuration .WithLabelValues (s .sshard , e .key .tenant ).Observe (time .Since (th ).Seconds ())
419
415
level .Debug (s .logger ).Log (
420
- "msg" , "flushed dataset " ,
416
+ "msg" , "flushed head " ,
421
417
"tenant" , e .key .tenant ,
422
418
"service" , e .key .service ,
423
419
"profiles" , flushed .Meta .NumProfiles ,
424
420
"profiletypes" , fmt .Sprintf ("%v" , flushed .Meta .ProfileTypeNames ),
425
421
"mintime" , flushed .Meta .MinTimeNanos ,
426
422
"maxtime" , flushed .Meta .MaxTimeNanos ,
427
- "dataset -flush-duration" , time .Since (th ).String (),
423
+ "head -flush-duration" , time .Since (th ).String (),
428
424
)
429
425
return flushed , nil
430
426
}
@@ -447,7 +443,7 @@ type dataset struct {
447
443
}
448
444
449
445
type headFlush struct {
450
- head * dataset
446
+ head dataset
451
447
flushed * memdb.FlushedHead
452
448
// protects head
453
449
done chan struct {}
@@ -458,12 +454,10 @@ type segment struct {
458
454
shard shardKey
459
455
sshard string
460
456
inFlightProfiles sync.WaitGroup
461
-
462
- mu sync.RWMutex
463
- datasets map [datasetKey ]* dataset
464
-
465
- logger log.Logger
466
- sw * segmentsWriter
457
+ heads map [datasetKey ]dataset
458
+ headsLock sync.RWMutex
459
+ logger log.Logger
460
+ sw * segmentsWriter
467
461
468
462
// TODO(kolesnikovae): Revisit.
469
463
doneChan chan struct {}
@@ -507,12 +501,11 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
507
501
tenant : tenantID ,
508
502
service : model .Labels (labels ).Get (model .LabelNameServiceName ),
509
503
}
510
- ds := s .datasetForIngest (k )
511
504
size := p .SizeVT ()
512
505
rules := s .sw .limits .IngestionRelabelingRules (tenantID )
513
506
usage := s .sw .limits .DistributorUsageGroups (tenantID ).GetUsageGroups (tenantID , labels )
514
507
appender := & sampleAppender {
515
- dataset : ds ,
508
+ head : s . headForIngest ( k ) ,
516
509
profile : p ,
517
510
id : id ,
518
511
annotations : annotations ,
@@ -526,7 +519,7 @@ func (s *segment) ingest(tenantID string, p *profilev1.Profile, id uuid.UUID, la
526
519
527
520
type sampleAppender struct {
528
521
id uuid.UUID
529
- dataset * dataset
522
+ head * memdb. Head
530
523
profile * profilev1.Profile
531
524
exporter * pprofmodel.SampleExporter
532
525
annotations []* typesv1.ProfileAnnotation
@@ -536,7 +529,7 @@ type sampleAppender struct {
536
529
}
537
530
538
531
func (v * sampleAppender ) VisitProfile (labels []* typesv1.LabelPair ) {
539
- v .dataset . head .Ingest (v .profile , v .id , labels , v .annotations )
532
+ v .head .Ingest (v .profile , v .id , labels , v .annotations )
540
533
}
541
534
542
535
func (v * sampleAppender ) VisitSampleSeries (labels []* typesv1.LabelPair , samples []* profilev1.Sample ) {
@@ -545,36 +538,37 @@ func (v *sampleAppender) VisitSampleSeries(labels []*typesv1.LabelPair, samples
545
538
}
546
539
var n profilev1.Profile
547
540
v .exporter .ExportSamples (& n , samples )
548
- v .dataset . head .Ingest (v . profile , v .id , labels , v .annotations )
541
+ v .head .Ingest (& n , v .id , labels , v .annotations )
549
542
}
550
543
551
544
func (v * sampleAppender ) Discarded (profiles , bytes int ) {
552
545
v .discardedProfiles += profiles
553
546
v .discardedBytes += bytes
554
547
}
555
548
556
- func (s * segment ) datasetForIngest (k datasetKey ) * dataset {
557
- s .mu .RLock ()
558
- ds , ok := s .datasets [k ]
559
- s .mu .RUnlock ()
549
+ func (s * segment ) headForIngest (k datasetKey ) * memdb. Head {
550
+ s .headsLock .RLock ()
551
+ h , ok := s .heads [k ]
552
+ s .headsLock .RUnlock ()
560
553
if ok {
561
- return ds
554
+ return h . head
562
555
}
563
556
564
- s .mu .Lock ()
565
- defer s .mu .Unlock ()
566
- if ds , ok = s .datasets [k ]; ok {
567
- return ds
557
+ s .headsLock .Lock ()
558
+ defer s .headsLock .Unlock ()
559
+ h , ok = s .heads [k ]
560
+ if ok {
561
+ return h .head
568
562
}
569
563
570
- h := memdb .NewHead (s .sw .headMetrics )
571
- ds = & dataset {
564
+ nh := memdb .NewHead (s .sw .headMetrics )
565
+
566
+ s .heads [k ] = dataset {
572
567
key : k ,
573
- head : h ,
568
+ head : nh ,
574
569
}
575
570
576
- s .datasets [k ] = ds
577
- return ds
571
+ return nh
578
572
}
579
573
580
574
func (sw * segmentsWriter ) uploadBlock (ctx context.Context , blockData []byte , meta * metastorev1.BlockMeta , s * segment ) error {
0 commit comments