Skip to content

Commit 0854d3f

Browse files
QardStephen Belanger
authored andcommitted
feat: add support for Distributed Tracing (#538)
1 parent bcdc458 commit 0854d3f

File tree

16 files changed

+835
-112
lines changed

16 files changed

+835
-112
lines changed

docs/agent-api.asciidoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1027,7 +1027,7 @@ NOTE: `apm.middleware.connect` _must_ be added to the middleware stack _before_
10271027

10281028
[source,js]
10291029
----
1030-
var transaction = apm.startTransaction([name][, type])
1030+
var transaction = apm.startTransaction([name][, type][, traceparent])
10311031
----
10321032

10331033
Start a new transaction.
@@ -1042,6 +1042,8 @@ Defaults to `unnamed`
10421042
You can always set this later via <<transaction-type,`transaction.type`>>.
10431043
Defaults to `custom`
10441044

1045+
* `traceparent` - The traceparent header received from a remote service (string).
1046+
10451047
Use this function to create a custom transaction.
10461048
Note that the agent will do this for you automatically when ever your application receives an incoming HTTP request.
10471049
You only need to use this function to create custom transactions.

lib/agent.js

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
5454
}
5555
})
5656

57+
Object.defineProperty(Agent.prototype, 'currentSpan', {
58+
get () {
59+
return this._instrumentation.currentSpan
60+
}
61+
})
62+
5763
Agent.prototype.destroy = function () {
5864
if (this._apmServer) this._apmServer.destroy()
5965
}
@@ -242,8 +248,9 @@ Agent.prototype.captureError = function (err, opts, cb) {
242248

243249
var agent = this
244250
var trans = this.currentTransaction
251+
var span = this.currentSpan
245252
var timestamp = Date.now() * 1000
246-
var id = crypto.randomBytes(128 / 8).toString('hex')
253+
var context = (span || trans || {}).context || {}
247254
var req = opts && opts.request instanceof IncomingMessage
248255
? opts.request
249256
: trans && trans.req
@@ -271,7 +278,9 @@ Agent.prototype.captureError = function (err, opts, cb) {
271278
}
272279

273280
function prepareError (error) {
274-
error.id = id
281+
error.id = crypto.randomBytes(16).toString('hex')
282+
error.parent_id = context.id
283+
error.trace_id = context.traceId
275284
error.timestamp = timestamp
276285
error.context = {
277286
user: Object.assign(
@@ -291,7 +300,8 @@ Agent.prototype.captureError = function (err, opts, cb) {
291300
opts && opts.custom
292301
)
293302
}
294-
if (trans) error.transaction = { id: trans.id }
303+
304+
if (trans) error.transaction_id = trans.id
295305

296306
if (error.exception) {
297307
error.exception.handled = !opts || opts.handled
@@ -345,21 +355,21 @@ Agent.prototype.captureError = function (err, opts, cb) {
345355
error = agent._errorFilters.process(error)
346356

347357
if (!error) {
348-
agent.logger.debug('error ignored by filter %o', { id: id })
349-
if (cb) cb(null, id)
358+
agent.logger.debug('error ignored by filter %o', { id: error.id })
359+
if (cb) cb(null, error.id)
350360
return
351361
}
352362

353363
if (agent._apmServer) {
354-
agent.logger.info(`Sending error ${id} to Elastic APM`)
364+
agent.logger.info(`Sending error to Elastic APM`, { id: error.id })
355365
agent._apmServer.sendError(error, function () {
356366
agent._apmServer.flush(function (err) {
357-
if (cb) cb(err, id)
367+
if (cb) cb(err, error.id)
358368
})
359369
})
360370
} else if (cb) {
361371
// TODO: Swallow this error just as it's done in agent.flush()?
362-
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'), id))
372+
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'), error.id))
363373
}
364374
}
365375
}

lib/instrumentation/async-hooks.js

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,29 @@
33
const asyncHooks = require('async_hooks')
44

55
module.exports = function (ins) {
6-
const asyncHook = asyncHooks.createHook({ init, destroy })
7-
const transactions = new Map()
6+
const asyncHook = asyncHooks.createHook({ init, before, destroy })
87

8+
const activeTransactions = new Map()
99
Object.defineProperty(ins, 'currentTransaction', {
1010
get () {
1111
const asyncId = asyncHooks.executionAsyncId()
12-
return transactions.has(asyncId) ? transactions.get(asyncId) : null
12+
return activeTransactions.has(asyncId) ? activeTransactions.get(asyncId) : null
1313
},
1414
set (trans) {
1515
const asyncId = asyncHooks.executionAsyncId()
16-
transactions.set(asyncId, trans)
16+
activeTransactions.set(asyncId, trans)
17+
}
18+
})
19+
20+
const activeSpans = new Map()
21+
Object.defineProperty(ins, 'activeSpan', {
22+
get () {
23+
const asyncId = asyncHooks.executionAsyncId()
24+
return activeSpans.has(asyncId) ? activeSpans.get(asyncId) : null
25+
},
26+
set (span) {
27+
const asyncId = asyncHooks.executionAsyncId()
28+
activeSpans.set(asyncId, span)
1729
}
1830
})
1931

@@ -25,11 +37,21 @@ module.exports = function (ins) {
2537
// type, which will init for each scheduled timer.
2638
if (type === 'TIMERWRAP') return
2739

28-
transactions.set(asyncId, ins.currentTransaction)
40+
activeTransactions.set(asyncId, ins.currentTransaction)
41+
activeSpans.set(asyncId, ins.bindingSpan || ins.activeSpan)
42+
}
43+
44+
function before (asyncId) {
45+
ins.bindingSpan = null
2946
}
3047

3148
function destroy (asyncId) {
32-
if (!transactions.has(asyncId)) return // in case type === TIMERWRAP
33-
transactions.delete(asyncId)
49+
// in case type === TIMERWRAP
50+
if (activeTransactions.has(asyncId)) {
51+
activeTransactions.delete(asyncId)
52+
}
53+
if (activeSpans.has(asyncId)) {
54+
activeSpans.delete(asyncId)
55+
}
3456
}
3557
}

lib/instrumentation/http-shared.js

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ exports.instrumentRequest = function (agent, moduleName) {
1818
// don't leak previous transaction
1919
agent._instrumentation.currentTransaction = null
2020
} else {
21-
var trans = agent.startTransaction()
21+
var traceparent = req.headers['elastic-apm-traceparent']
22+
var trans = agent.startTransaction(null, null, traceparent)
2223
trans.type = 'request'
2324
trans.req = req
2425
trans.res = res
@@ -78,19 +79,52 @@ function isRequestBlacklisted (agent, req) {
7879
return false
7980
}
8081

82+
// NOTE: This will also stringify and parse URL instances
83+
// to a format which can be mixed into the options object.
84+
function ensureUrl (v) {
85+
if (typeof v === 'string' || v instanceof url.URL) {
86+
return url.parse(String(v))
87+
} else {
88+
return v
89+
}
90+
}
91+
8192
exports.traceOutgoingRequest = function (agent, moduleName) {
8293
var spanType = 'ext.' + moduleName + '.http'
8394
var ins = agent._instrumentation
8495

8596
return function (orig) {
86-
return function () {
97+
return function (...args) {
8798
var span = agent.buildSpan()
8899
var id = span && span.transaction.id
89100

90101
agent.logger.debug('intercepted call to %s.request %o', moduleName, { id: id })
91102

92-
var req = orig.apply(this, arguments)
103+
var options = {}
104+
var newArgs = [ options ]
105+
for (let arg of args) {
106+
if (typeof arg === 'function') {
107+
newArgs.push(arg)
108+
} else {
109+
Object.assign(options, ensureUrl(arg))
110+
}
111+
}
112+
113+
if (!options.headers) options.headers = {}
114+
115+
// Attempt to use the span context as a traceparent header.
116+
// If the transaction is unsampled the span will not exist,
117+
// however a traceparent header must still be propagated
118+
// to indicate requested services should not be sampled.
119+
// Use the transaction context as the parent, in this case.
120+
var parent = span || agent.currentTransaction
121+
if (parent && parent.context) {
122+
options.headers['elastic-apm-traceparent'] = parent.context.toString()
123+
}
124+
125+
var req = orig.apply(this, newArgs)
93126
if (!span) return req
127+
94128
if (req._headers.host === agent._conf.serverHost) {
95129
agent.logger.debug('ignore %s request to intake API %o', moduleName, { id: id })
96130
return req

lib/instrumentation/index.js

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@ function Instrumentation (agent) {
4646
this._hook = null // this._hook is only exposed for testing purposes
4747
this._started = false
4848
this.currentTransaction = null
49+
50+
// Span for binding callbacks
51+
this.bindingSpan = null
52+
53+
// Span which is actively bound
54+
this.activeSpan = null
55+
56+
Object.defineProperty(this, 'currentSpan', {
57+
get () {
58+
return this.bindingSpan || this.activeSpan
59+
}
60+
})
4961
}
5062

5163
Instrumentation.modules = Object.freeze(MODULES)
@@ -130,8 +142,8 @@ Instrumentation.prototype.addEndedSpan = function (span) {
130142
}
131143
}
132144

133-
Instrumentation.prototype.startTransaction = function (name, type) {
134-
return new Transaction(this._agent, name, type)
145+
Instrumentation.prototype.startTransaction = function (name, type, traceparent) {
146+
return new Transaction(this._agent, name, type, traceparent)
135147
}
136148

137149
Instrumentation.prototype.endTransaction = function (result) {
@@ -174,17 +186,20 @@ Instrumentation.prototype.bindFunction = function (original) {
174186

175187
var ins = this
176188
var trans = this.currentTransaction
189+
var span = this.currentSpan
177190
if (trans && !trans.sampled) {
178191
return original
179192
}
180193

181194
return elasticAPMCallbackWrapper
182195

183196
function elasticAPMCallbackWrapper () {
184-
var prev = ins.currentTransaction
197+
var prevTrans = ins.currentTransaction
185198
ins.currentTransaction = trans
199+
ins.bindingSpan = null
200+
ins.activeSpan = span
186201
var result = original.apply(this, arguments)
187-
ins.currentTransaction = prev
202+
ins.currentTransaction = prevTrans
188203
return result
189204
}
190205
}

lib/instrumentation/span.js

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
'use strict'
22

3-
var crypto = require('crypto')
4-
53
var afterAll = require('after-all-results')
64
var Value = require('async-value-promise')
75

@@ -14,7 +12,6 @@ const TEST = process.env.ELASTIC_APM_TEST
1412
module.exports = Span
1513

1614
function Span (transaction) {
17-
this.id = crypto.randomBytes(64 / 8).toString('hex') // TODO: Replace with correct id once OT is ready
1815
this.transaction = transaction
1916
this.started = false
2017
this.ended = false
@@ -26,9 +23,18 @@ function Span (transaction) {
2623
this._stackObj = null
2724
this._agent = transaction._agent
2825

26+
var current = this._agent._instrumentation.activeSpan || transaction
27+
this.context = current.context.child()
28+
2929
this._agent.logger.debug('init span %o', { id: this.transaction.id })
3030
}
3131

32+
Object.defineProperty(Span.prototype, 'id', {
33+
get () {
34+
return this.context.id
35+
}
36+
})
37+
3238
Span.prototype.start = function (name, type) {
3339
if (this.started) {
3440
this._agent.logger.debug('tried to call span.start() on already started span %o', { id: this.transaction.id, name: this.name, type: this.type })
@@ -39,6 +45,8 @@ Span.prototype.start = function (name, type) {
3945
this.name = name || this.name || 'unnamed'
4046
this.type = type || this.type || 'custom'
4147

48+
this._agent._instrumentation.bindingSpan = this
49+
4250
if (this._agent._conf.captureSpanStackTraces && !this._stackObj) {
4351
this._recordStackTrace()
4452
}
@@ -140,10 +148,10 @@ Span.prototype._encode = function (cb) {
140148
}
141149

142150
var payload = {
143-
id: self.id,
151+
id: self.context.id,
144152
transaction_id: self.transaction.id,
145-
parent_id: self.transaction.id,
146-
trace_id: self.transaction.traceId,
153+
parent_id: self.context.parentId,
154+
trace_id: self.context.traceId,
147155
name: self.name,
148156
type: self.type,
149157
timestamp: self.timestamp,

0 commit comments

Comments
 (0)