Skip to content

Commit 7dc64ff

Browse files
committed
Add traceparent/traceId support
Signed-off-by: Sagar Upadhyaya <[email protected]>
1 parent 8dd1286 commit 7dc64ff

File tree

15 files changed

+204
-15
lines changed

15 files changed

+204
-15
lines changed

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ protected void addDefaultAttributes(Span span) {
103103

104104
@Override
105105
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
106+
if (spanCreationContext.getParent() != null) {
107+
startSpan(spanCreationContext); // If already contains a parentSpan, we don't need to create a new one below.
108+
}
106109
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
107110
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
108111
}

plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,24 @@
1111
import org.opensearch.common.settings.Settings;
1212
import org.opensearch.common.unit.TimeValue;
1313
import org.opensearch.plugins.Plugin;
14+
import org.opensearch.tasks.Task;
1415
import org.opensearch.telemetry.IntegrationTestOTelTelemetryPlugin;
1516
import org.opensearch.telemetry.OTelTelemetrySettings;
1617
import org.opensearch.telemetry.TelemetrySettings;
1718
import org.opensearch.telemetry.tracing.attributes.Attributes;
1819
import org.opensearch.test.OpenSearchIntegTestCase;
1920
import org.opensearch.test.telemetry.tracing.TelemetryValidators;
2021
import org.opensearch.test.telemetry.tracing.validators.AllSpansAreEndedProperly;
22+
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveCorrectTraceId;
2123
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveUniqueId;
2224
import org.opensearch.test.telemetry.tracing.validators.NumberOfTraceIDsEqualToRequests;
2325
import org.opensearch.test.telemetry.tracing.validators.TotalRootSpansEqualToRequests;
2426
import org.opensearch.transport.client.Client;
2527

2628
import java.util.Arrays;
2729
import java.util.Collection;
30+
import java.util.HashMap;
31+
import java.util.Map;
2832

2933
import static org.opensearch.index.query.QueryBuilders.queryStringQuery;
3034

@@ -74,9 +78,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
7478
ensureGreen();
7579
refresh();
7680

81+
Map<String, String> headers = new HashMap<>();
82+
headers.put(Task.TRACE_PARENT, "00-19d538d7c42d09240be001d1e4ff6203-0651eba1347dceea-01");
83+
7784
// Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs.
78-
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get();
79-
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get();
85+
client.filterWithHeader(headers)
86+
.prepareSearch()
87+
.setSearchType("dfs_query_then_fetch")
88+
.setPreFilterShardSize(2)
89+
.setQuery(queryStringQuery("fox"))
90+
.get();
91+
client.filterWithHeader(headers)
92+
.prepareSearch()
93+
.setSearchType("dfs_query_then_fetch")
94+
.setPreFilterShardSize(2)
95+
.setQuery(queryStringQuery("jumps"))
96+
.get();
8097

8198
// Sleep for about 3s to wait for traces are published, delay is (the delay is 1s).
8299
Thread.sleep(3000);
@@ -86,7 +103,11 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
86103
new AllSpansAreEndedProperly(),
87104
new AllSpansHaveUniqueId(),
88105
new NumberOfTraceIDsEqualToRequests(Attributes.create().addAttribute("action", "indices:data/read/search[phase/query]")),
89-
new TotalRootSpansEqualToRequests()
106+
new TotalRootSpansEqualToRequests(),
107+
new AllSpansHaveCorrectTraceId(
108+
"19d538d7c42d09240be001d1e4ff6203",
109+
Attributes.create().addAttribute("action", "indices:data/read/search[phase/query]")
110+
)
90111
)
91112
);
92113

server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@
125125
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
126126
public class TasksIT extends AbstractTasksIT {
127127

128+
private static final String SAMPLE_TRACE_PARENT_HEADER = "00-19d538d7c42d09240be001d1e4ff6203-0651eba1347dceea-01";
129+
private static final String SAMPLE_TRACE_ID_HEADER = "19d538d7c42d09240be001d1e4ff6203";
130+
128131
protected final TaskInfo taskInfo = new TaskInfo(
129132
new TaskId("fake", 1),
130133
"test_type",
@@ -379,6 +382,11 @@ public void testSearchTaskDescriptions() {
379382

380383
Map<String, String> headers = new HashMap<>();
381384
headers.put(Task.X_OPAQUE_ID, "my_id");
385+
// Add trace parent request header
386+
headers.put(Task.TRACE_PARENT, SAMPLE_TRACE_PARENT_HEADER);
387+
// Add traceId. This is explictly not allowed to be passed from outside, but for this test we add this to verify its correct
388+
// propagation.
389+
headers.put(Task.TRACE_ID, SAMPLE_TRACE_ID_HEADER);
382390
headers.put("Foo-Header", "bar");
383391
headers.put("Custom-Task-Header", "my_value");
384392
assertSearchResponse(client().filterWithHeader(headers).prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).get());
@@ -427,6 +435,7 @@ public void testSearchTaskHeaderLimit() {
427435

428436
Map<String, String> headers = new HashMap<>();
429437
headers.put(Task.X_OPAQUE_ID, "my_id");
438+
headers.put(Task.TRACE_PARENT, SAMPLE_TRACE_PARENT_HEADER);
430439
headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
431440
IllegalArgumentException ex = expectThrows(
432441
IllegalArgumentException.class,
@@ -436,9 +445,15 @@ public void testSearchTaskHeaderLimit() {
436445
}
437446

438447
private void assertTaskHeaders(TaskInfo taskInfo) {
439-
assertThat(taskInfo.getHeaders().keySet(), hasSize(2));
448+
assertThat(taskInfo.getHeaders().keySet(), hasSize(4));
440449
assertEquals("my_id", taskInfo.getHeaders().get(Task.X_OPAQUE_ID));
450+
assertEquals(SAMPLE_TRACE_ID_HEADER, taskInfo.getHeaders().get(Task.TRACE_ID));
441451
assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header"));
452+
if (taskInfo.getParentTaskId() != null) {
453+
assertTrue(taskInfo.getHeaders().get(Task.TRACE_PARENT).contains(SAMPLE_TRACE_ID_HEADER));
454+
} else {
455+
assertEquals(SAMPLE_TRACE_PARENT_HEADER, taskInfo.getHeaders().get(Task.TRACE_PARENT));
456+
}
442457
}
443458

444459
/**

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,6 +590,7 @@ public ActionModule(
590590
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
591591
Stream.of(
592592
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
593+
new RestHeaderDefinition(Task.TRACE_PARENT, false),
593594
new RestHeaderDefinition(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, false)
594595
)
595596
).collect(Collectors.toSet());

server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private static Map<String, Object> prepareMap(
196196
}
197197

198198
messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
199+
messageFields.put("trace-id", context.getTask().getHeader(Task.TRACE_ID));
199200
return messageFields;
200201
}
201202

@@ -222,6 +223,11 @@ private static String message(SearchPhaseContext context, long tookInNanos, Sear
222223
} else {
223224
sb.append("id[]");
224225
}
226+
if (context.getTask().getHeader(Task.TRACE_ID) != null) {
227+
sb.append("trace-id[").append(context.getTask().getHeader(Task.TRACE_ID)).append("]");
228+
} else {
229+
sb.append("trace-id[]");
230+
}
225231
return sb.toString();
226232
}
227233

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
/**
12+
* Util class to extract trace id from traceparent header
13+
*/
14+
public class TraceUtil {
15+
16+
/**
17+
* Extract trace id from traceparent header
18+
* @param traceparent traceparent header value
19+
* @return trace id
20+
*/
21+
public static String extractTraceId(String traceparent) {
22+
if (traceparent == null || traceparent.length() < 55) {
23+
throw new IllegalArgumentException("invalid traceparent");
24+
}
25+
return traceparent.split("-")[1];
26+
}
27+
}

server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,13 @@ public StoredContext stashContext() {
154154

155155
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putPersistent(context.persistentHeaders);
156156

157-
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
158-
threadContextStruct = threadContextStruct.putHeaders(
159-
MapBuilder.<String, String>newMapBuilder()
160-
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
161-
.immutableMap()
162-
);
157+
MapBuilder<String, String> builder = MapBuilder.newMapBuilder();
158+
for (String requestHeader : Task.REQUEST_HEADERS) {
159+
if (context.requestHeaders.containsKey(requestHeader)) {
160+
builder.put(requestHeader, context.requestHeaders.get(requestHeader));
161+
}
163162
}
163+
threadContextStruct = threadContextStruct.putHeaders(builder.immutableMap());
164164

165165
final Map<String, Object> transientHeaders = propagateTransients(context.transientHeaders, context.isSystemContext);
166166
if (!transientHeaders.isEmpty()) {
@@ -697,6 +697,7 @@ private ThreadContextStruct(
697697
this.persistentHeaders = persistentHeaders;
698698
this.isSystemContext = isSystemContext;
699699
this.warningHeadersSize = warningHeadersSize;
700+
700701
}
701702

702703
/**

server/src/main/java/org/opensearch/http/DefaultRestChannel.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.util.List;
5555
import java.util.Map;
5656

57+
import static org.opensearch.tasks.Task.TRACE_PARENT;
5758
import static org.opensearch.tasks.Task.X_OPAQUE_ID;
5859

5960
/**
@@ -150,6 +151,9 @@ public void sendResponse(RestResponse restResponse) {
150151
if (opaque != null) {
151152
setHeaderField(httpResponse, X_OPAQUE_ID, opaque);
152153
}
154+
if (request.header(TRACE_PARENT) != null) {
155+
setHeaderField(httpResponse, TRACE_PARENT, request.header(TRACE_PARENT));
156+
}
153157

154158
// Add all custom headers
155159
addCustomHeaders(httpResponse, restResponse.getHeaders());

server/src/main/java/org/opensearch/index/SearchSlowLog.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ private static Map<String, Object> prepareMap(SearchContext context, long tookIn
247247
}
248248

249249
messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
250+
messageFields.put("trace-id", context.getTask().getHeader(Task.TRACE_ID));
250251
return messageFields;
251252
}
252253

@@ -290,6 +291,11 @@ private static String message(SearchContext context, long tookInNanos) {
290291
} else {
291292
sb.append("id[], ");
292293
}
294+
if (context.getTask().getHeader(Task.TRACE_ID) != null) {
295+
sb.append("trace-id[").append(context.getTask().getHeader(Task.TRACE_ID)).append("]");
296+
} else {
297+
sb.append("trace-id[], ");
298+
}
293299
return sb.toString();
294300
}
295301

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1271,7 +1271,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
12711271

12721272
Set<String> taskHeaders = Stream.concat(
12731273
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
1274-
Stream.of(Task.X_OPAQUE_ID)
1274+
Task.REQUEST_HEADERS.stream()
12751275
).collect(Collectors.toSet());
12761276

12771277
final TransportService transportService = newTransportService(

0 commit comments

Comments
 (0)