Skip to content

Commit 35eb165

Browse files
yichyayuhan6665
authored andcommitted
feat: metrics including pprof, expvars
1 parent 91ffb76 commit 35eb165

File tree

9 files changed

+530
-1
lines changed

9 files changed

+530
-1
lines changed

app/metrics/config.pb.go

Lines changed: 148 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/metrics/config.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
syntax = "proto3";
2+
3+
package xray.app.metrics;
4+
option csharp_namespace = "Xray.App.Metrics";
5+
option go_package = "github.com/xtls/xray-core/app/metrics";
6+
option java_package = "com.xray.app.metrics";
7+
option java_multiple_files = true;
8+
9+
// Config is the settings for metrics.
10+
message Config {
11+
// Tag of the outbound handler that handles metrics http connections.
12+
string tag = 1;
13+
}

app/metrics/errors.generated.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package metrics
2+
3+
import "github.com/xtls/xray-core/common/errors"
4+
5+
type errPathObjHolder struct{}
6+
7+
func newError(values ...interface{}) *errors.Error {
8+
return errors.New(values...).WithPathObj(errPathObjHolder{})
9+
}

app/metrics/metrics.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"expvar"
6+
"net/http"
7+
_ "net/http/pprof"
8+
"strings"
9+
10+
"github.com/xtls/xray-core/app/observatory"
11+
"github.com/xtls/xray-core/app/stats"
12+
"github.com/xtls/xray-core/common"
13+
"github.com/xtls/xray-core/common/net"
14+
"github.com/xtls/xray-core/common/signal/done"
15+
"github.com/xtls/xray-core/core"
16+
"github.com/xtls/xray-core/features/extension"
17+
"github.com/xtls/xray-core/features/outbound"
18+
feature_stats "github.com/xtls/xray-core/features/stats"
19+
)
20+
21+
type MetricsHandler struct {
22+
ohm outbound.Manager
23+
statsManager feature_stats.Manager
24+
observatory extension.Observatory
25+
tag string
26+
}
27+
28+
// NewMetricsHandler creates a new MetricsHandler based on the given config.
29+
func NewMetricsHandler(ctx context.Context, config *Config) (*MetricsHandler, error) {
30+
c := &MetricsHandler{
31+
tag: config.Tag,
32+
}
33+
common.Must(core.RequireFeatures(ctx, func(om outbound.Manager, sm feature_stats.Manager) {
34+
c.statsManager = sm
35+
c.ohm = om
36+
}))
37+
expvar.Publish("stats", expvar.Func(func() interface{} {
38+
manager, ok := c.statsManager.(*stats.Manager)
39+
if !ok {
40+
return nil
41+
}
42+
var resp = map[string]map[string]map[string]int64{
43+
"inbound": {},
44+
"outbound": {},
45+
"user": {},
46+
}
47+
manager.VisitCounters(func(name string, counter feature_stats.Counter) bool {
48+
nameSplit := strings.Split(name, ">>>")
49+
typeName, tagOrUser, direction := nameSplit[0], nameSplit[1], nameSplit[3]
50+
if item, found := resp[typeName][tagOrUser]; found {
51+
item[direction] = counter.Value()
52+
} else {
53+
resp[typeName][tagOrUser] = map[string]int64{
54+
direction: counter.Value(),
55+
}
56+
}
57+
return true
58+
})
59+
return resp
60+
}))
61+
expvar.Publish("observatory", expvar.Func(func() interface{} {
62+
if c.observatory == nil {
63+
common.Must(core.RequireFeatures(ctx, func(observatory extension.Observatory) error {
64+
c.observatory = observatory
65+
return nil
66+
}))
67+
}
68+
var resp = map[string]*observatory.OutboundStatus{}
69+
if o, err := c.observatory.GetObservation(context.Background()); err != nil {
70+
return err
71+
} else {
72+
for _, x := range o.(*observatory.ObservationResult).GetStatus() {
73+
resp[x.OutboundTag] = x
74+
}
75+
}
76+
return resp
77+
}))
78+
return c, nil
79+
}
80+
81+
func (p *MetricsHandler) Type() interface{} {
82+
return (*MetricsHandler)(nil)
83+
}
84+
85+
func (p *MetricsHandler) Start() error {
86+
listener := &OutboundListener{
87+
buffer: make(chan net.Conn, 4),
88+
done: done.New(),
89+
}
90+
91+
go func() {
92+
if err := http.Serve(listener, http.DefaultServeMux); err != nil {
93+
newError("failed to start metrics server").Base(err).AtError().WriteToLog()
94+
}
95+
}()
96+
97+
if err := p.ohm.RemoveHandler(context.Background(), p.tag); err != nil {
98+
newError("failed to remove existing handler").WriteToLog()
99+
}
100+
101+
return p.ohm.AddHandler(context.Background(), &Outbound{
102+
tag: p.tag,
103+
listener: listener,
104+
})
105+
}
106+
107+
func (p *MetricsHandler) Close() error {
108+
return nil
109+
}
110+
111+
func init() {
112+
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, cfg interface{}) (interface{}, error) {
113+
return NewMetricsHandler(ctx, cfg.(*Config))
114+
}))
115+
}

app/metrics/outbound.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/xtls/xray-core/common"
8+
"github.com/xtls/xray-core/common/net"
9+
"github.com/xtls/xray-core/common/net/cnc"
10+
"github.com/xtls/xray-core/common/signal/done"
11+
"github.com/xtls/xray-core/transport"
12+
)
13+
14+
// OutboundListener is a net.Listener for listening pprof http connections.
15+
type OutboundListener struct {
16+
buffer chan net.Conn
17+
done *done.Instance
18+
}
19+
20+
func (l *OutboundListener) add(conn net.Conn) {
21+
select {
22+
case l.buffer <- conn:
23+
case <-l.done.Wait():
24+
conn.Close()
25+
default:
26+
conn.Close()
27+
}
28+
}
29+
30+
// Accept implements net.Listener.
31+
func (l *OutboundListener) Accept() (net.Conn, error) {
32+
select {
33+
case <-l.done.Wait():
34+
return nil, newError("listen closed")
35+
case c := <-l.buffer:
36+
return c, nil
37+
}
38+
}
39+
40+
// Close implement net.Listener.
41+
func (l *OutboundListener) Close() error {
42+
common.Must(l.done.Close())
43+
L:
44+
for {
45+
select {
46+
case c := <-l.buffer:
47+
c.Close()
48+
default:
49+
break L
50+
}
51+
}
52+
return nil
53+
}
54+
55+
// Addr implements net.Listener.
56+
func (l *OutboundListener) Addr() net.Addr {
57+
return &net.TCPAddr{
58+
IP: net.IP{0, 0, 0, 0},
59+
Port: 0,
60+
}
61+
}
62+
63+
// Outbound is a outbound.Handler that handles pprof http connections.
64+
type Outbound struct {
65+
tag string
66+
listener *OutboundListener
67+
access sync.RWMutex
68+
closed bool
69+
}
70+
71+
// Dispatch implements outbound.Handler.
72+
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
73+
co.access.RLock()
74+
75+
if co.closed {
76+
common.Interrupt(link.Reader)
77+
common.Interrupt(link.Writer)
78+
co.access.RUnlock()
79+
return
80+
}
81+
82+
closeSignal := done.New()
83+
c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal))
84+
co.listener.add(c)
85+
co.access.RUnlock()
86+
<-closeSignal.Wait()
87+
}
88+
89+
// Tag implements outbound.Handler.
90+
func (co *Outbound) Tag() string {
91+
return co.tag
92+
}
93+
94+
// Start implements common.Runnable.
95+
func (co *Outbound) Start() error {
96+
co.access.Lock()
97+
co.closed = false
98+
co.access.Unlock()
99+
return nil
100+
}
101+
102+
// Close implements common.Closable.
103+
func (co *Outbound) Close() error {
104+
co.access.Lock()
105+
defer co.access.Unlock()
106+
107+
co.closed = true
108+
return co.listener.Close()
109+
}

0 commit comments

Comments
 (0)