Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ protected void addDefaultAttributes(Span span) {

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
if (spanCreationContext.getParent() != null) {
startSpan(spanCreationContext); // If already contains a parentSpan, we don't need to create a new one below.
}
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.IntegrationTestOTelTelemetryPlugin;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.telemetry.tracing.TelemetryValidators;
import org.opensearch.test.telemetry.tracing.validators.AllSpansAreEndedProperly;
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveCorrectTraceId;
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveUniqueId;
import org.opensearch.test.telemetry.tracing.validators.NumberOfTraceIDsEqualToRequests;
import org.opensearch.test.telemetry.tracing.validators.TotalRootSpansEqualToRequests;
import org.opensearch.transport.client.Client;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.index.query.QueryBuilders.queryStringQuery;

Expand Down Expand Up @@ -74,9 +78,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
ensureGreen();
refresh();

Map<String, String> headers = new HashMap<>();
headers.put(Task.TRACE_PARENT, "00-19d538d7c42d09240be001d1e4ff6203-0651eba1347dceea-01");

// Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs.
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get();
client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get();
client.filterWithHeader(headers)
.prepareSearch()
.setSearchType("dfs_query_then_fetch")
.setPreFilterShardSize(2)
.setQuery(queryStringQuery("fox"))
.get();
client.filterWithHeader(headers)
.prepareSearch()
.setSearchType("dfs_query_then_fetch")
.setPreFilterShardSize(2)
.setQuery(queryStringQuery("jumps"))
.get();

// Sleep for about 3s to wait for traces are published, delay is (the delay is 1s).
Thread.sleep(3000);
Expand All @@ -86,7 +103,11 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {
new AllSpansAreEndedProperly(),
new AllSpansHaveUniqueId(),
new NumberOfTraceIDsEqualToRequests(Attributes.create().addAttribute("action", "indices:data/read/search[phase/query]")),
new TotalRootSpansEqualToRequests()
new TotalRootSpansEqualToRequests(),
new AllSpansHaveCorrectTraceId(
"19d538d7c42d09240be001d1e4ff6203",
Attributes.create().addAttribute("action", "indices:data/read/search[phase/query]")
)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

private static final String SAMPLE_TRACE_PARENT_HEADER = "00-19d538d7c42d09240be001d1e4ff6203-0651eba1347dceea-01";
private static final String SAMPLE_TRACE_ID_HEADER = "19d538d7c42d09240be001d1e4ff6203";

protected final TaskInfo taskInfo = new TaskInfo(
new TaskId("fake", 1),
"test_type",
Expand Down Expand Up @@ -379,6 +382,11 @@ public void testSearchTaskDescriptions() {

Map<String, String> headers = new HashMap<>();
headers.put(Task.X_OPAQUE_ID, "my_id");
// Add trace parent request header
headers.put(Task.TRACE_PARENT, SAMPLE_TRACE_PARENT_HEADER);
// Add traceId. This is explictly not allowed to be passed from outside, but for this test we add this to verify its correct
// propagation.
headers.put(Task.TRACE_ID, SAMPLE_TRACE_ID_HEADER);
headers.put("Foo-Header", "bar");
headers.put("Custom-Task-Header", "my_value");
assertSearchResponse(client().filterWithHeader(headers).prepareSearch("test").setQuery(QueryBuilders.matchAllQuery()).get());
Expand Down Expand Up @@ -427,6 +435,7 @@ public void testSearchTaskHeaderLimit() {

Map<String, String> headers = new HashMap<>();
headers.put(Task.X_OPAQUE_ID, "my_id");
headers.put(Task.TRACE_PARENT, SAMPLE_TRACE_PARENT_HEADER);
headers.put("Custom-Task-Header", randomAlphaOfLengthBetween(maxSize, maxSize + 100));
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
Expand All @@ -436,9 +445,15 @@ public void testSearchTaskHeaderLimit() {
}

private void assertTaskHeaders(TaskInfo taskInfo) {
assertThat(taskInfo.getHeaders().keySet(), hasSize(2));
assertThat(taskInfo.getHeaders().keySet(), hasSize(4));
assertEquals("my_id", taskInfo.getHeaders().get(Task.X_OPAQUE_ID));
assertEquals(SAMPLE_TRACE_ID_HEADER, taskInfo.getHeaders().get(Task.TRACE_ID));
assertEquals("my_value", taskInfo.getHeaders().get("Custom-Task-Header"));
if (taskInfo.getParentTaskId() != null) {
assertTrue(taskInfo.getHeaders().get(Task.TRACE_PARENT).contains(SAMPLE_TRACE_ID_HEADER));
} else {
assertEquals(SAMPLE_TRACE_PARENT_HEADER, taskInfo.getHeaders().get(Task.TRACE_PARENT));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ public ActionModule(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(
new RestHeaderDefinition(Task.X_OPAQUE_ID, false),
new RestHeaderDefinition(Task.TRACE_PARENT, false),
new RestHeaderDefinition(WorkloadGroupTask.WORKLOAD_GROUP_ID_HEADER, false)
)
).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ private static Map<String, Object> prepareMap(
}

messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
messageFields.put("trace-id", context.getTask().getHeader(Task.TRACE_ID));
return messageFields;
}

Expand All @@ -222,6 +223,11 @@ private static String message(SearchPhaseContext context, long tookInNanos, Sear
} else {
sb.append("id[]");
}
if (context.getTask().getHeader(Task.TRACE_ID) != null) {
sb.append("trace-id[").append(context.getTask().getHeader(Task.TRACE_ID)).append("]");
} else {
sb.append("trace-id[]");
}
return sb.toString();
}

Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/common/util/TraceUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* Util class to extract trace id from traceparent header
*/
public class TraceUtil {

/**
* Extract trace id from traceparent header
* @param traceparent traceparent header value
* @return trace id
*/
public static String extractTraceId(String traceparent) {
if (traceparent == null || traceparent.length() < 55) {
throw new IllegalArgumentException("invalid traceparent");
}
return traceparent.split("-")[1];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ public StoredContext stashContext() {

ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putPersistent(context.persistentHeaders);

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
threadContextStruct = threadContextStruct.putHeaders(
MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap()
);
MapBuilder<String, String> builder = MapBuilder.newMapBuilder();
for (String requestHeader : Task.REQUEST_HEADERS) {
if (context.requestHeaders.containsKey(requestHeader)) {
builder.put(requestHeader, context.requestHeaders.get(requestHeader));
}
}
threadContextStruct = threadContextStruct.putHeaders(builder.immutableMap());

final Map<String, Object> transientHeaders = propagateTransients(context.transientHeaders, context.isSystemContext);
if (!transientHeaders.isEmpty()) {
Expand Down Expand Up @@ -697,6 +697,7 @@ private ThreadContextStruct(
this.persistentHeaders = persistentHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = warningHeadersSize;

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.tasks.Task.TRACE_PARENT;
import static org.opensearch.tasks.Task.X_OPAQUE_ID;

/**
Expand Down Expand Up @@ -150,6 +151,9 @@ public void sendResponse(RestResponse restResponse) {
if (opaque != null) {
setHeaderField(httpResponse, X_OPAQUE_ID, opaque);
}
if (request.header(TRACE_PARENT) != null) {
setHeaderField(httpResponse, TRACE_PARENT, request.header(TRACE_PARENT));
}

// Add all custom headers
addCustomHeaders(httpResponse, restResponse.getHeaders());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ private static Map<String, Object> prepareMap(SearchContext context, long tookIn
}

messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
messageFields.put("trace-id", context.getTask().getHeader(Task.TRACE_ID));
return messageFields;
}

Expand Down Expand Up @@ -290,6 +291,11 @@ private static String message(SearchContext context, long tookInNanos) {
} else {
sb.append("id[], ");
}
if (context.getTask().getHeader(Task.TRACE_ID) != null) {
sb.append("trace-id[").append(context.getTask().getHeader(Task.TRACE_ID)).append("]");
} else {
sb.append("trace-id[], ");
}
return sb.toString();
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas

Set<String> taskHeaders = Stream.concat(
pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
Task.REQUEST_HEADERS.stream()
).collect(Collectors.toSet());

final TransportService transportService = newTransportService(
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.path.PathTrie;
import org.opensearch.common.util.TraceUtil;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.Streams;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -54,6 +55,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.tasks.Task;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.usage.UsageService;

Expand Down Expand Up @@ -431,6 +433,15 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
return;
} else {
threadContext.putHeader(name, String.join(",", distinctHeaderValues));
if (Task.TRACE_PARENT.equals(restHeader.getName())) {
String traceId = TraceUtil.extractTraceId(distinctHeaderValues.getFirst());
if (traceId != null && !traceId.isBlank()) {
// Extract traceId from traceParent and add this to the header as well. So that it can be reused across
// different places
// like slowLog etc.
threadContext.putHeader(Task.TRACE_ID, traceId);
}
}
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -70,6 +71,32 @@ public class Task {
*/
public static final String X_OPAQUE_ID = "X-Opaque-Id";

/**
* The W3 'traceparent' header value for distributed tracing.
*
* This header uniquely identifies a request as it contains unique trace Id that ties together all spans in the trace, as well as a span ID
* identifying this particular span. By propagating this header across service, all desired components can record telemetry as part of the same end-to-end request.
*
* It follows the W3C Trace Context specification:
* traceparent: 00-{trace-id}-{span-id}-{flags}
*
* trace-id — 32-character lowercase hex trace identifier that uniquely defines the distributed trace.
* span-id — 16-character lowercase hex span identifier representing the current operation's position in the trace.
* flags — 2-character hex sampling flags (e.g. "01" if sampled).
*
* Example:
* traceparent: 00-19d538d7c42d09240be001d1e4ff6203-0651eba1347dceea-01
*/
public static final String TRACE_PARENT = "traceparent";

/**
* This uniquely identifies a request. This traceId is essentially extracted from traceparent above, so that we can use this to log it in places like slowlogs etc,
* and we are able to trace the request across OpenSearch.
*/
public static final String TRACE_ID = "trace-id";

public static final Set<String> REQUEST_HEADERS = Set.of(Task.X_OPAQUE_ID, Task.TRACE_PARENT, Task.TRACE_ID);

private static final String TOTAL = "total";

private static final String AVERAGE = "average";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* The main OpenSearch transport service
Expand Down Expand Up @@ -972,7 +973,15 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
// Use the traceparent header(if present) to start the span
final Span span = tracer.startSpan(
SpanBuilder.from(action, connection),
threadPool.getThreadContext()
.getHeaders()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> List.of(e.getValue())))
);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer);
sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
Expand Down Expand Up @@ -293,7 +294,7 @@ public void testSlowLogSearchContextPrinterToLog() throws IOException {
assertThat(p.getFormattedMessage(), startsWith("[foo][0]"));
// Makes sure that output doesn't contain any new lines
assertThat(p.getFormattedMessage(), not(containsString("\n")));
assertThat(p.getFormattedMessage(), endsWith("id[my_id], "));
assertThat(p.getFormattedMessage(), endsWith("trace-id[sample_trace_id]"));
}

public void testLevelSetting() {
Expand Down Expand Up @@ -610,7 +611,9 @@ private SearchContext searchContextWithSourceAndTask(IndexService index) {
SearchContext ctx = createSearchContext(index);
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
ctx.request().source(source);
ctx.setTask(new SearchShardTask(0, "n/a", "n/a", "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "my_id")));
ctx.setTask(
new SearchShardTask(0, "n/a", "n/a", "test", null, Map.of(Task.X_OPAQUE_ID, "my_id", Task.TRACE_ID, "sample_trace_id"))
);
return ctx;
}
}
Loading
Loading