Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 33 additions & 34 deletions src/OpenTelemetry/Metrics/AggregatorStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class AggregatorStore
internal readonly bool OutputDeltaWithUnusedMetricPointReclaimEnabled;
internal readonly int CardinalityLimit;
internal readonly bool EmitOverflowAttribute;
internal readonly ConcurrentDictionary<Tags, LookupData>? TagsToMetricPointIndexDictionaryDelta;
internal long DroppedMeasurements = 0;

private static readonly string MetricPointCapHitFixMessage = "Consider opting in for the experimental SDK feature to emit all the throttled metrics under the overflow attribute by setting env variable OTEL_DOTNET_EXPERIMENTAL_METRICS_EMIT_OVERFLOW_ATTRIBUTE = true. You could also modify instrumentation to reduce the number of unique key/value pair combinations. Or use Views to drop unwanted tags. Or use MeterProviderBuilder.SetMaxMetricPointsPerMetricStream to set higher limit.";
Expand All @@ -32,8 +33,6 @@ internal sealed class AggregatorStore
private readonly ConcurrentDictionary<Tags, int> tagsToMetricPointIndexDictionary =
new();

private readonly ConcurrentDictionary<Tags, LookupData>? tagsToMetricPointIndexDictionaryDelta;

private readonly string name;
private readonly string metricPointCapHitMessage;
private readonly MetricPoint[] metricPoints;
Expand Down Expand Up @@ -110,7 +109,7 @@ internal AggregatorStore(
// There is no overload which only takes capacity as the parameter
// Using the DefaultConcurrencyLevel defined in the ConcurrentDictionary class: https://github.com/dotnet/runtime/blob/v7.0.5/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentDictionary.cs#L2020
// We expect at the most (maxMetricPoints - reservedMetricPointsCount) * 2 entries- one for sorted and one for unsorted input
this.tagsToMetricPointIndexDictionaryDelta =
this.TagsToMetricPointIndexDictionaryDelta =
new ConcurrentDictionary<Tags, LookupData>(concurrencyLevel: Environment.ProcessorCount, capacity: (cardinalityLimit - reservedMetricPointsCount) * 2);

// Add all the indices except for the reserved ones to the queue so that threads have
Expand Down Expand Up @@ -266,28 +265,28 @@ internal void SnapshotDeltaWithMetricPointReclaim()
// Snapshot method can use this to skip trying to reclaim indices which have already been reclaimed and added to the queue.
metricPoint.LookupData = null;

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

lock (this.tagsToMetricPointIndexDictionaryDelta!)
lock (this.TagsToMetricPointIndexDictionaryDelta!)
{
LookupData? dictionaryValue;
if (lookupData.SortedTags != Tags.EmptyTags)
{
// Check if no other thread added a new entry for the same Tags.
// If no, then remove the existing entries.
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue) &&
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue) &&
dictionaryValue == lookupData)
{
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out var _);
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
}
}
else
{
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue) &&
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue) &&
dictionaryValue == lookupData)
{
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out var _);
}
}

Expand Down Expand Up @@ -550,11 +549,11 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
int index;
var givenTags = new Tags(tagKeysAndValues);

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

bool newMetricPointCreated = false;

if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out var lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out var lookupData))
{
if (length > 1)
{
Expand All @@ -567,7 +566,7 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

var sortedTags = new Tags(tempSortedTagKeysAndValues);

if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Note: We are using storage from ThreadStatic (for up to MaxTagCacheSize tags) for both the input order of tags and the sorted order of tags,
// so we need to make a deep copy for Dictionary storage.
Expand All @@ -585,10 +584,10 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

lock (this.tagsToMetricPointIndexDictionaryDelta)
lock (this.TagsToMetricPointIndexDictionaryDelta)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -612,8 +611,8 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
}
Expand All @@ -631,10 +630,10 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object

Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

lock (this.tagsToMetricPointIndexDictionaryDelta)
lock (this.TagsToMetricPointIndexDictionaryDelta)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(givenTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(givenTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -658,7 +657,7 @@ private int LookupAggregatorStoreForDeltaWithReclaim(KeyValuePair<string, object
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
}
Expand Down Expand Up @@ -735,7 +734,7 @@ private bool TryGetAvailableMetricPointRare(
out LookupData? lookupData,
out bool newMetricPointCreated)
{
Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.availableMetricPoints != null, "this.availableMetricPoints was null");

int index;
Expand All @@ -744,8 +743,8 @@ private bool TryGetAvailableMetricPointRare(
if (length > 1)
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData) &&
!this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData) &&
!this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(sortedTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -769,14 +768,14 @@ private bool TryGetAvailableMetricPointRare(
// MetricPoint, if dictionary entry found.

// Add the sorted order along with the given order of tags
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(sortedTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}
else
{
// check again after acquiring lock.
if (!this.tagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData))
if (!this.TagsToMetricPointIndexDictionaryDelta!.TryGetValue(givenTags, out lookupData))
{
// Check for an available MetricPoint
if (this.availableMetricPoints!.Count > 0)
Expand All @@ -800,7 +799,7 @@ private bool TryGetAvailableMetricPointRare(
// MetricPoint, if dictionary entry found.

// givenTags will always be sorted when tags length == 1
this.tagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
this.TagsToMetricPointIndexDictionaryDelta.TryAdd(givenTags, lookupData);
}
}

Expand All @@ -823,23 +822,23 @@ private int RemoveStaleEntriesAndGetAvailableMetricPointRare(LookupData lookupDa
// If self-claimed, then add a fresh entry to the dictionary
// If an available MetricPoint is found, then only increment the ReferenceCount

Debug.Assert(this.tagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");
Debug.Assert(this.TagsToMetricPointIndexDictionaryDelta != null, "this.tagsToMetricPointIndexDictionaryDelta was null");

// Delete the entry for these Tags and get another MetricPoint.
lock (this.tagsToMetricPointIndexDictionaryDelta!)
lock (this.TagsToMetricPointIndexDictionaryDelta!)
{
LookupData? dictionaryValue;
if (lookupData.SortedTags != Tags.EmptyTags)
{
// Check if no other thread added a new entry for the same Tags in the meantime.
// If no, then remove the existing entries.
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue))
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.SortedTags, out dictionaryValue))
{
if (dictionaryValue == lookupData)
{
// No other thread added a new entry for the same Tags.
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out _);
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.SortedTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
}
else
{
Expand All @@ -851,12 +850,12 @@ private int RemoveStaleEntriesAndGetAvailableMetricPointRare(LookupData lookupDa
}
else
{
if (this.tagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue))
if (this.TagsToMetricPointIndexDictionaryDelta.TryGetValue(lookupData.GivenTags, out dictionaryValue))
{
if (dictionaryValue == lookupData)
{
// No other thread added a new entry for the same Tags.
this.tagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
this.TagsToMetricPointIndexDictionaryDelta.TryRemove(lookupData.GivenTags, out _);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
using System.Reflection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Tests;
Expand Down Expand Up @@ -286,23 +284,17 @@ private sealed class CustomExporter : BaseExporter<Metric>

private readonly bool assertNoDroppedMeasurements;

private readonly FieldInfo metricPointLookupDictionaryFieldInfo;

public CustomExporter(bool assertNoDroppedMeasurements)
{
this.assertNoDroppedMeasurements = assertNoDroppedMeasurements;

var aggregatorStoreFields = typeof(AggregatorStore).GetFields(BindingFlags.NonPublic | BindingFlags.Instance);
this.metricPointLookupDictionaryFieldInfo = aggregatorStoreFields!.FirstOrDefault(field => field.Name == "tagsToMetricPointIndexDictionaryDelta");
}

public override ExportResult Export(in Batch<Metric> batch)
{
foreach (var metric in batch)
{
var aggStore = metric.AggregatorStore;
var metricPointLookupDictionary = this.metricPointLookupDictionaryFieldInfo.GetValue(aggStore) as ConcurrentDictionary<Tags, LookupData>;

var metricPointLookupDictionary = aggStore.TagsToMetricPointIndexDictionaryDelta;
var droppedMeasurements = aggStore.DroppedMeasurements;

if (this.assertNoDroppedMeasurements)
Expand Down