Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 63 additions & 52 deletions metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
)

// DuplicateMetric is the error returned by Registry.Register when a metric
Expand All @@ -16,6 +17,13 @@ func (err DuplicateMetric) Error() string {
return fmt.Sprintf("duplicate metric: %s", string(err))
}

// UnknownMetric is the error returned by Registry.Register when a metric type is unknown.
type UnknownMetric string

func (err UnknownMetric) Error() string {
return fmt.Sprintf("unknown metric: %s", string(err))
}

// A Registry holds references to a set of metrics by name and can iterate
// over them, calling callback functions provided by the user.
//
Expand Down Expand Up @@ -45,21 +53,19 @@ type Registry interface {

// Unregister the metric with the given name.
Unregister(string)

// Unregister all metrics. (Mostly for testing.)
UnregisterAll()
}

// The standard implementation of a Registry is a mutex-protected map
// The standard implementation of a Registry uses sync.map
// of names to metrics.
type StandardRegistry struct {
metrics map[string]interface{}
mutex sync.Mutex
metrics sync.Map
// not representing 1:1 map size.
size atomic.Int32
}

// Create a new registry.
func NewRegistry() Registry {
return &StandardRegistry{metrics: make(map[string]interface{})}
return &StandardRegistry{}
}

// Call the given function for each registered metric.
Expand All @@ -71,45 +77,60 @@ func (r *StandardRegistry) Each(f func(string, interface{})) {

// Get the metric by the given name or nil if none is registered.
func (r *StandardRegistry) Get(name string) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.metrics[name]
item, _ := r.metrics.Load(name)
return item
}

// Gets an existing metric or creates and registers a new one. Threadsafe
// alternative to calling Get and Register on failure.
// The interface can be the metric to register if not found in registry,
// or a function returning the metric for lazy instantiation.
func (r *StandardRegistry) GetOrRegister(name string, i interface{}) interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
if metric, ok := r.metrics[name]; ok {
return metric
// fast path
cached, ok := r.metrics.Load(name)
if ok {
return cached
}
if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
r.register(name, i)
return i
if !r.isKnownType(i) {
return i
}
item, _ := r.register(name, i)
return item
}

// Register the given metric under the given name. Returns a DuplicateMetric
// if a metric by the given name is already registered.
func (r *StandardRegistry) Register(name string, i interface{}) error {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.register(name, i)
// fast path
_, ok := r.metrics.Load(name)
if ok {
return DuplicateMetric(name)
}

if v := reflect.ValueOf(i); v.Kind() == reflect.Func {
i = v.Call(nil)[0].Interface()
}
if !r.isKnownType(i) {
return UnknownMetric(name)
}
_, loaded := r.register(name, i)
if loaded {
return DuplicateMetric(name)
}
return nil
}

// Run all registered healthchecks.
func (r *StandardRegistry) RunHealthchecks() {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, i := range r.metrics {
if h, ok := i.(Healthcheck); ok {
r.metrics.Range(func(key, value any) bool {
if h, ok := value.(Healthcheck); ok {
h.Check()
}
}
return true
})
}

// GetAll metrics in the Registry
Expand Down Expand Up @@ -177,45 +198,40 @@ func (r *StandardRegistry) GetAll() map[string]map[string]interface{} {

// Unregister the metric with the given name.
func (r *StandardRegistry) Unregister(name string) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.stop(name)
delete(r.metrics, name)
if _, ok := r.metrics.LoadAndDelete(name); ok {
r.size.Add(-1)
}
}

// Unregister all metrics. (Mostly for testing.)
func (r *StandardRegistry) UnregisterAll() {
r.mutex.Lock()
defer r.mutex.Unlock()
for name := range r.metrics {
r.stop(name)
delete(r.metrics, name)
func (r *StandardRegistry) register(name string, i interface{}) (interface{}, bool) {
item, loaded := r.metrics.LoadOrStore(name, i)
if loaded {
return item, loaded
}
r.size.Add(1)
return item, loaded
}

func (r *StandardRegistry) register(name string, i interface{}) error {
if _, ok := r.metrics[name]; ok {
return DuplicateMetric(name)
}
func (r *StandardRegistry) isKnownType(i interface{}) bool {
switch i.(type) {
case Counter, CounterFloat64, Gauge, GaugeFloat64, Healthcheck, Histogram, Meter, Timer, ResettingTimer:
r.metrics[name] = i
return true
}
return nil
return false
}

func (r *StandardRegistry) registered() map[string]interface{} {
r.mutex.Lock()
defer r.mutex.Unlock()
metrics := make(map[string]interface{}, len(r.metrics))
for name, i := range r.metrics {
metrics[name] = i
}
metrics := make(map[string]interface{}, r.size.Load())
r.metrics.Range(func(key, value any) bool {
metrics[key.(string)] = value
return true
})
return metrics
}

func (r *StandardRegistry) stop(name string) {
if i, ok := r.metrics[name]; ok {
if i, ok := r.metrics.Load(name); ok {
if s, ok := i.(Stoppable); ok {
s.Stop()
}
Expand Down Expand Up @@ -308,11 +324,6 @@ func (r *PrefixedRegistry) Unregister(name string) {
r.underlying.Unregister(realName)
}

// Unregister all metrics. (Mostly for testing.)
func (r *PrefixedRegistry) UnregisterAll() {
r.underlying.UnregisterAll()
}

var (
DefaultRegistry = NewRegistry()
EphemeralRegistry = NewRegistry()
Expand Down
25 changes: 25 additions & 0 deletions metrics/registry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"sync"
"testing"
)

Expand All @@ -13,6 +14,30 @@ func BenchmarkRegistry(b *testing.B) {
}
}

func BenchmarkRegistryGetOrRegisterParallel_8(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 8)
}

func BenchmarkRegistryGetOrRegisterParallel_32(b *testing.B) {
benchmarkRegistryGetOrRegisterParallel(b, 32)
}

func benchmarkRegistryGetOrRegisterParallel(b *testing.B, amount int) {
r := NewRegistry()
b.ResetTimer()
var wg sync.WaitGroup
for i := 0; i < amount; i++ {
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
r.GetOrRegister("foo", NewMeter)
}
wg.Done()
}()
}
wg.Wait()
}

func TestRegistry(t *testing.T) {
r := NewRegistry()
r.Register("foo", NewCounter())
Expand Down