diff --git a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java index c3e26dc4e2..ddc42f907c 100644 --- a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java +++ b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java @@ -30,6 +30,7 @@ package com.google.api.gax.grpc; import com.google.api.core.InternalApi; +import com.google.api.gax.tracing.ClientMetricsTracer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -80,11 +81,27 @@ class ChannelPool extends ManagedChannel { private final AtomicInteger indexTicker = new AtomicInteger(); private final String authority; + private ClientMetricsTracer clientMetricsTracer; + static ChannelPool create(ChannelPoolSettings settings, ChannelFactory channelFactory) throws IOException { return new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor()); } + static ChannelPool create( + ChannelPoolSettings settings, + ChannelFactory channelFactory, + ClientMetricsTracer clientMetricsTracer) + throws IOException { + ChannelPool channelPool = + new ChannelPool(settings, channelFactory, Executors.newSingleThreadScheduledExecutor()); + channelPool.clientMetricsTracer = clientMetricsTracer; + if (channelPool.clientMetricsTracer != null) { + channelPool.clientMetricsTracer.recordCurrentChannelSize(settings.getInitialChannelCount()); + } + return channelPool; + } + /** * Initializes the channel pool. Assumes that all channels have the same authority. * @@ -302,6 +319,7 @@ void resize() { shrink(dampenedTarget); } + clientMetricsTracer.recordCurrentChannelSize(entries.get().size()); } /** Not threadsafe, must be called under the entryWriteLock monitor */ diff --git a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java index 5f33c36448..321177987a 100644 --- a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java +++ b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java @@ -66,7 +66,7 @@ */ @BetaApi("Reference ApiCallContext instead - this class is likely to experience breaking changes") public final class GrpcCallContext implements ApiCallContext { - static final CallOptions.Key TRACER_KEY = CallOptions.Key.create("gax.tracer"); + public static final CallOptions.Key TRACER_KEY = CallOptions.Key.create("gax.tracer"); private final Channel channel; private final CallOptions callOptions; @@ -504,7 +504,10 @@ public ApiTracer getTracer() { @Override public GrpcCallContext withTracer(@Nonnull ApiTracer tracer) { Preconditions.checkNotNull(tracer); - return withCallOptions(callOptions.withOption(TRACER_KEY, tracer)); + return withCallOptions( + callOptions + .withOption(TRACER_KEY, tracer) + .withStreamTracerFactory(new GrpcStreamTracer.Factory(tracer))); } /** {@inheritDoc} */ diff --git a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java index 5b6a5f1bad..eaec58a1aa 100644 --- a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java +++ b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcDirectCallable.java @@ -30,13 +30,20 @@ package com.google.api.gax.grpc; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.core.ListenableFutureToApiFuture; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; import io.grpc.ClientCall; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.stub.ClientCalls; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * {@code GrpcDirectCallable} creates gRPC calls. @@ -56,18 +63,71 @@ class GrpcDirectCallable extends UnaryCallable futureCall(RequestT request, ApiCallContext inputContext) { Preconditions.checkNotNull(request); Preconditions.checkNotNull(inputContext); - - ClientCall clientCall = GrpcClientCalls.newCall(descriptor, inputContext); - + final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata(); + GrpcCallContext grpcCallContext = responseMetadata.addHandlers(inputContext); + ClientCall clientCall = + GrpcClientCalls.newCall(descriptor, grpcCallContext); + GfeUnaryCallback callback = + new GfeUnaryCallback(inputContext.getTracer(), responseMetadata); + ApiFuture future; if (awaitTrailers) { - return new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request)); + future = new ListenableFutureToApiFuture<>(ClientCalls.futureUnaryCall(clientCall, request)); } else { - return GrpcClientCalls.eagerFutureUnaryCall(clientCall, request); + future = GrpcClientCalls.eagerFutureUnaryCall(clientCall, request); } + ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor()); + return future; } @Override public String toString() { return String.format("direct(%s)", descriptor); } + + private static final Metadata.Key SERVER_TIMING_HEADER_KEY = + Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); + + private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?\\d+)"); + + static class GfeUnaryCallback implements ApiFutureCallback { + + private final ApiTracer tracer; + private final GrpcResponseMetadata responseMetadata; + + GfeUnaryCallback(ApiTracer tracer, GrpcResponseMetadata responseMetadata) { + this.tracer = tracer; + this.responseMetadata = responseMetadata; + } + + @Override + public void onFailure(Throwable throwable) { + // Util.recordMetricsFromMetadata(responseMetadata, tracer, throwable); + } + + @Override + public void onSuccess(ResponseT response) { + Metadata metadata = responseMetadata.getMetadata(); + if (metadata == null) { + return; + } + String allKeys = metadata.keys().stream().reduce((a, b) -> a + ", " + b).get(); + // System.out.println( + // "************************ metadata size: " + // + metadata.keys().size() + // + ", all keys: " + // + allKeys); + if (metadata.get(SERVER_TIMING_HEADER_KEY) == null) { + return; + } + + String durMetadata = metadata.get(SERVER_TIMING_HEADER_KEY); + Matcher matcher = SERVER_TIMING_HEADER_PATTERN.matcher(durMetadata); + // this should always be true + if (matcher.find()) { + long latency = Long.valueOf(matcher.group("dur")); + tracer.recordGfeMetadata(latency); + } + // System.out.println("GFE metadata: " + metadata.get(SERVER_TIMING_HEADER_KEY)); + } + } } diff --git a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcStreamTracer.java b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcStreamTracer.java new file mode 100644 index 0000000000..e38e967f27 --- /dev/null +++ b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcStreamTracer.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.api.gax.grpc; + +import com.google.api.gax.tracing.ApiTracer; +import com.google.common.base.Stopwatch; +import io.grpc.Attributes; +import io.grpc.ClientStreamTracer; +import io.grpc.Metadata; +import java.util.concurrent.TimeUnit; + +/** + * Records the time a request is enqueued in a grpc channel queue. Its primary purpose is to measure + * the transition time between asking gRPC to start an RPC and gRPC actually serializing that RPC. + */ +class GrpcStreamTracer extends ClientStreamTracer { + + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + private final ApiTracer tracer; + + public GrpcStreamTracer(ApiTracer tracer) { + this.tracer = tracer; + stopwatch.start(); + } + + @Override + public void createPendingStream() { + tracer.grpcTargetResolutionDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + stopwatch.reset(); + stopwatch.start(); + } + + @Override + public void streamCreated(Attributes transportAttrs, Metadata headers) { + tracer.grpcChannelReadinessDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + stopwatch.reset(); + stopwatch.start(); + } + + @Override + public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) { + tracer.grpcCallSendDelay(stopwatch.elapsed(TimeUnit.NANOSECONDS)); + } + + static class Factory extends ClientStreamTracer.Factory { + + private final ApiTracer tracer; + + Factory(ApiTracer tracer) { + this.tracer = tracer; + } + + @Override + public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { + return new GrpcStreamTracer(tracer); + } + } +} diff --git a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java index 2703d13486..8779fe8b49 100644 --- a/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java +++ b/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/InstantiatingGrpcChannelProvider.java @@ -40,6 +40,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.internal.EnvironmentProvider; import com.google.api.gax.rpc.mtls.MtlsProvider; +import com.google.api.gax.tracing.ClientMetricsTracer; import com.google.auth.Credentials; import com.google.auth.oauth2.ComputeEngineCredentials; import com.google.common.annotations.VisibleForTesting; @@ -117,6 +118,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP @Nullable private final Boolean allowNonDefaultServiceAccount; @VisibleForTesting final ImmutableMap directPathServiceConfig; @Nullable private final MtlsProvider mtlsProvider; + @Nullable private ClientMetricsTracer clientMetricsTracer; @Nullable private final ApiFunction channelConfigurator; @@ -184,6 +186,11 @@ public String getTransportName() { return GrpcTransportChannel.getGrpcTransportName(); } + @Override + public void setClientMetricsTracer(ClientMetricsTracer clientMetricsTracer) { + this.clientMetricsTracer = clientMetricsTracer; + } + @Override public boolean needsEndpoint() { return endpoint == null; @@ -241,7 +248,9 @@ public TransportChannel getTransportChannel() throws IOException { private TransportChannel createChannel() throws IOException { return GrpcTransportChannel.create( ChannelPool.create( - channelPoolSettings, InstantiatingGrpcChannelProvider.this::createSingleChannel)); + channelPoolSettings, + InstantiatingGrpcChannelProvider.this::createSingleChannel, + clientMetricsTracer)); } private boolean isDirectPathEnabled() { diff --git a/gax-java/gax/pom.xml b/gax-java/gax/pom.xml index 01b565be60..7f779cd7d0 100644 --- a/gax-java/gax/pom.xml +++ b/gax-java/gax/pom.xml @@ -57,6 +57,18 @@ graal-sdk provided + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-otlp + diff --git a/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java b/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java index de7b5b5acb..1a46409ab0 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java @@ -199,7 +199,8 @@ void handleAttempt(Throwable throwable, ResponseT response) { } super.setException(throwable); } else { - tracer.attemptSucceeded(); + tracer.attemptSucceeded(response); + tracer.retryCount(attemptSettings.getAttemptCount()); super.set(response); } } catch (CancellationException e) { diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java index e7fac9d0c6..46f7818281 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ClientContext.java @@ -39,6 +39,7 @@ import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials; import com.google.api.gax.tracing.ApiTracerFactory; import com.google.api.gax.tracing.BaseApiTracerFactory; +import com.google.api.gax.tracing.ClientMetricsTracer; import com.google.auth.Credentials; import com.google.auth.oauth2.GdchCredentials; import com.google.auto.value.AutoValue; @@ -223,6 +224,9 @@ public static ClientContext create(StubSettings settings) throws IOException { if (transportChannelProvider.needsEndpoint()) { transportChannelProvider = transportChannelProvider.withEndpoint(endpoint); } + ClientMetricsTracer clientMetricsTracer = settings.getTracerFactory().newClientMetricsTracer(); + transportChannelProvider.setClientMetricsTracer(clientMetricsTracer); + TransportChannel transportChannel = transportChannelProvider.getTransportChannel(); ApiCallContext defaultCallContext = diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java index 21f3c31f63..343ccd56e4 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/TransportChannelProvider.java @@ -31,6 +31,7 @@ import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.tracing.ClientMetricsTracer; import com.google.auth.Credentials; import java.io.IOException; import java.util.Map; @@ -143,6 +144,8 @@ public interface TransportChannelProvider { */ String getTransportName(); + default void setClientMetricsTracer(ClientMetricsTracer clientMetricsTracer) {}; + /** * User set custom endpoint for the Transport Channel Provider * diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index 3176be4b92..e18ed11e0e 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -57,6 +57,8 @@ public interface ApiTracer { */ void operationSucceeded(); + default void operationSucceeded(Object response) {}; + /** * Signals that the operation was cancelled by the user. The tracer is now considered closed and * should no longer be used. @@ -76,7 +78,7 @@ public interface ApiTracer { * * @param id the local connection identifier of the selected connection. */ - void connectionSelected(String id); + default void connectionSelected(String id) {}; /** * Adds an annotation that an attempt is about to start. In general this should occur at the very @@ -96,10 +98,17 @@ public interface ApiTracer { * @param attemptNumber the zero based sequential attempt number. * @param request request of this attempt. */ - void attemptStarted(Object request, int attemptNumber); + default void attemptStarted(Object request, int attemptNumber) {}; /** Adds an annotation that the attempt succeeded. */ - void attemptSucceeded(); + default void attemptSucceeded() {}; + + default void attemptSucceeded(Object response) {}; + + // This is for libraries to override to intended name + default String attemptLatencyName() { + return "attempt_latency"; + }; /** Add an annotation that the attempt was cancelled by the user. */ void attemptCancelled(); @@ -128,24 +137,25 @@ public interface ApiTracer { */ void attemptPermanentFailure(Throwable error); + default void retryCount(int count) {}; /** * Signals that the initial RPC for the long running operation failed. * * @param error the error that caused the long running operation fail. */ - void lroStartFailed(Throwable error); + default void lroStartFailed(Throwable error) {}; /** * Signals that the initial RPC successfully started the long running operation. The long running * operation will now be polled for completion. */ - void lroStartSucceeded(); + default void lroStartSucceeded() {}; /** Adds an annotation that a streaming response has been received. */ - void responseReceived(); + default void responseReceived() {}; /** Adds an annotation that a streaming request has been sent. */ - void requestSent(); + default void requestSent() {}; /** * Adds an annotation that a batch of writes has been flushed. @@ -153,8 +163,17 @@ public interface ApiTracer { * @param elementCount the number of elements in the batch. * @param requestSize the size of the batch in bytes. */ - void batchRequestSent(long elementCount, long requestSize); + default void batchRequestSent(long elementCount, long requestSize) {}; + + default void grpcTargetResolutionDelay(long elapsed) {}; + + default void grpcChannelReadinessDelay(long elapsed) {}; + + default void grpcCallSendDelay(long elapsed) {}; + + default void recordGfeMetadata(long latency) {}; + default void addAdditionalAttributes(String key, String value) {}; /** * A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a * {@link Scope} removes any context that the underlying implementation might've set in {@link diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracerFactory.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracerFactory.java index bb8345b88c..d8cbef51bf 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracerFactory.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ApiTracerFactory.java @@ -61,4 +61,9 @@ enum OperationType { * @param operationType the type of operation that the tracer will trace */ ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType); + + // This probably needs to be moved to a new factory + default ClientMetricsTracer newClientMetricsTracer() { + return null; + }; } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/ClientMetricsTracer.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ClientMetricsTracer.java new file mode 100644 index 0000000000..a67b951808 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/ClientMetricsTracer.java @@ -0,0 +1,12 @@ +package com.google.api.gax.tracing; + +public interface ClientMetricsTracer { + + void recordCurrentChannelSize(int channelSize); + + default void recordGaxThread(int threadCount) {}; + + default String channelSizeName() { + return "channel_size"; + }; +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsRecorder.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsRecorder.java new file mode 100644 index 0000000000..39b2b3ddff --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsRecorder.java @@ -0,0 +1,111 @@ +package com.google.api.gax.tracing; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import java.util.Map; + +public class MetricsRecorder { + protected Meter meter; + + protected DoubleHistogram attemptLatencyRecorder; + + protected DoubleHistogram operationLatencyRecorder; + protected LongHistogram retryCountRecorder; + protected LongHistogram gfeLatencyRecorder; + + protected DoubleHistogram targetResolutionDelayRecorder; + protected DoubleHistogram channelReadinessDelayRecorder; + protected DoubleHistogram callSendDelayRecorder; + protected LongCounter operationCountRecorder; + + protected LongCounter attemptCountRecorder; + + protected Attributes attributes; + + public MetricsRecorder(Meter meter) { + this.meter = meter; + this.attemptLatencyRecorder = + meter + .histogramBuilder("attempt_latency") + .setDescription("Duration of an individual attempt") + .setUnit("ms") + .build(); + this.operationLatencyRecorder = + meter + .histogramBuilder("operation_latency") + .setDescription( + "Total time until final operation success or failure, including retries and backoff.") + .setUnit("ms") + .build(); + this.retryCountRecorder = + meter + .histogramBuilder("retry_count") + .setDescription("Number of additional attempts per operation after initial attempt") + .setUnit("1") + .ofLongs() + .build(); + this.gfeLatencyRecorder = + meter + .histogramBuilder("gfe_latency") + .setDescription("GFE latency") + .setUnit("1") + .ofLongs() + .build(); + this.targetResolutionDelayRecorder = + meter + .histogramBuilder("target_resolution_delay") + .setDescription("Delay caused by name resolution") + .setUnit("ns") + .build(); + this.channelReadinessDelayRecorder = + meter + .histogramBuilder("channel_readiness_delay") + .setDescription("Delay caused by establishing connection") + .setUnit("ns") + .build(); + this.callSendDelayRecorder = + meter + .histogramBuilder("call_send_delay") + .setDescription("Call send delay. (after the connection is ready)") + .setUnit("ns") + .build(); + this.operationCountRecorder = + meter + .counterBuilder("operation_count") + .setDescription("Count of Operations") + .setUnit("1") + .build(); + this.attemptCountRecorder = + meter + .counterBuilder("attempt_count") + .setDescription("Count of Attempts") + .setUnit("1") + .build(); + } + + public void recordAttemptLatency(double attemptLatency, Map attributes) { + attemptLatencyRecorder.record(attemptLatency, toOtelAttributes(attributes)); + } + + public void recordAttemptCount(long count, Map attributes) { + attemptCountRecorder.add(count, toOtelAttributes(attributes)); + } + + public void recordOperationLatency(double operationLatency, Map attributes) { + operationLatencyRecorder.record(operationLatency, toOtelAttributes(attributes)); + } + + public void recordOperationCount(long count, Map attributes) { + operationCountRecorder.add(count, toOtelAttributes(attributes)); + } + + private Attributes toOtelAttributes(Map attributes) { + AttributesBuilder attributesBuilder = Attributes.builder(); + attributes.forEach(attributesBuilder::put); + return attributesBuilder.build(); + } +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsTracer.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsTracer.java new file mode 100644 index 0000000000..9c02a50a78 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/MetricsTracer.java @@ -0,0 +1,141 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.tracing; + +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.StatusCode; +import com.google.common.base.Stopwatch; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +public class MetricsTracer implements ApiTracer { + public static final String STATUS_ATTRIBUTE = "status"; + + private Stopwatch attemptTimer; + + private final Stopwatch operationTimer = Stopwatch.createStarted(); + + private final Map attributes = new HashMap<>(); + + protected MetricsRecorder metricsRecorder; + + public MetricsTracer( + SpanName spanName, MetricsRecorder metricsRecorder) { + this.attributes.put("method_name", spanName.toString()); + this.metricsRecorder = metricsRecorder; + } + + @Override + public Scope inScope() { + return () -> {}; + } + + @Override + public void operationSucceeded() {} + + @Override + public void operationSucceeded(Object response) { + attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString()); + metricsRecorder.recordOperationLatency(operationTimer.elapsed(TimeUnit.MILLISECONDS), + attributes); + metricsRecorder.recordOperationCount(1, attributes); + } + + @Override + public void operationCancelled() {} + + @Override + public void operationFailed(Throwable error) { + attributes.put(STATUS_ATTRIBUTE, extractStatus(error)); + metricsRecorder.recordOperationLatency(operationTimer.elapsed(TimeUnit.MILLISECONDS), + attributes); + metricsRecorder.recordOperationCount(1, attributes); + } + + @Override + public void attemptStarted(int attemptNumber) {} + + @Override + public void attemptStarted(Object request, int attemptNumber) { + attemptTimer = Stopwatch.createStarted(); + } + + @Override + public void attemptSucceeded() {} + + @Override + public void attemptSucceeded(Object response) { + attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString()); + metricsRecorder.recordAttemptLatency( + attemptTimer.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptCount(1, attributes); + } + + @Override + public void attemptCancelled() {} + + @Override + public void attemptFailed(Throwable error, Duration delay) { + attributes.put(STATUS_ATTRIBUTE, extractStatus(error)); + metricsRecorder.recordAttemptLatency(attemptTimer.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptCount(1, attributes); + } + + @Override + public void attemptFailedRetriesExhausted(Throwable error) {} + + @Override + public void attemptPermanentFailure(Throwable error) {} + + static String extractStatus(@Nullable Throwable error) { + final String statusString; + + if (error == null) { + return StatusCode.Code.OK.toString(); + } else if (error instanceof CancellationException) { + statusString = StatusCode.Code.CANCELLED.toString(); + } else if (error instanceof ApiException) { + statusString = ((ApiException) error).getStatusCode().getCode().toString(); + } else { + statusString = StatusCode.Code.UNKNOWN.toString(); + } + + return statusString; + } + + public void addAdditionalAttributes(String key, String value) { + attributes.put(key, value); + } +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryClientMetricsTracer.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryClientMetricsTracer.java new file mode 100644 index 0000000000..1537bde102 --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryClientMetricsTracer.java @@ -0,0 +1,35 @@ +package com.google.api.gax.tracing; + +import io.opentelemetry.api.metrics.LongGaugeBuilder; +import io.opentelemetry.api.metrics.Meter; + +public class OpenTelemetryClientMetricsTracer implements ClientMetricsTracer { + + private final LongGaugeBuilder channelSizeRecorder; + private final LongGaugeBuilder threadCountRecorder; + private Meter meter; + + public OpenTelemetryClientMetricsTracer(Meter meter) { + this.meter = meter; + channelSizeRecorder = + meter.gaugeBuilder(channelSizeName()).setDescription("Channel Size").setUnit("1").ofLongs(); + threadCountRecorder = + meter + .gaugeBuilder("client_thread_count") + .setDescription("Current Thread Count created by Client") + .setUnit("1") + .ofLongs(); + } + + @Override + public void recordCurrentChannelSize(int channelSize) { + channelSizeRecorder.buildWithCallback( + observableLongMeasurement -> observableLongMeasurement.record(channelSize)); + } + + @Override + public void recordGaxThread(int threadCount) { + threadCountRecorder.buildWithCallback( + observableLongMeasurement -> observableLongMeasurement.record(threadCount)); + } +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryMetricsFactory.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryMetricsFactory.java new file mode 100644 index 0000000000..2eef2c347f --- /dev/null +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/OpenTelemetryMetricsFactory.java @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.tracing; + +import com.google.api.core.InternalApi; +import com.google.api.gax.core.GaxProperties; +import io.opencensus.trace.Tracer; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.metrics.Meter; + +/** + * A {@link ApiTracerFactory} to build instances of {@link OpencensusTracer}. + * + *

This class wraps the {@link Tracer} provided by Opencensus in {@code Tracing.getTracer()}. It + * will be used to create new spans and wrap them in {@link OpencensusTracer} defined in gax. + * + *

This class is thread safe. + */ +@InternalApi("For google-cloud-java client use only") +public class OpenTelemetryMetricsFactory implements ApiTracerFactory { + protected Meter meter; + + protected MetricsRecorder metricsRecorder; + + public OpenTelemetryMetricsFactory( + OpenTelemetry openTelemetry, String libraryName, String libraryVersion) { + meter = + openTelemetry + .meterBuilder("gax") + .setInstrumentationVersion(GaxProperties.getGaxVersion()) + .build(); + metricsRecorder = new MetricsRecorder(meter); + } + + @Override + public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType operationType) { + return new MetricsTracer(spanName, metricsRecorder); + } + + @Override + public ClientMetricsTracer newClientMetricsTracer() { + return new OpenTelemetryClientMetricsTracer(meter); + } +} diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/TraceFinisher.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/TraceFinisher.java index 292a827759..1a517dfc7a 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/tracing/TraceFinisher.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/TraceFinisher.java @@ -53,6 +53,6 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(T responseT) { - tracer.operationSucceeded(); + tracer.operationSucceeded(responseT); } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedResponseObserver.java b/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedResponseObserver.java index ba72d2f5b7..768de72f12 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedResponseObserver.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/tracing/TracedResponseObserver.java @@ -106,7 +106,7 @@ public void onError(Throwable t) { @Override public void onComplete() { - tracer.operationSucceeded(); + tracer.operationSucceeded(null); innerObserver.onComplete(); } } diff --git a/gax-java/pom.xml b/gax-java/pom.xml index a8cea24b91..89df61d674 100644 --- a/gax-java/pom.xml +++ b/gax-java/pom.xml @@ -158,6 +158,13 @@ pom import + + io.opentelemetry + opentelemetry-bom + 1.27.0 + pom + import + diff --git a/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java b/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java index 16879b8914..af63a18d52 100644 --- a/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java +++ b/java-core/google-cloud-core/src/main/java/com/google/cloud/ServiceOptions.java @@ -40,6 +40,7 @@ import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; +import com.google.api.gax.tracing.ApiTracerFactory; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.auth.oauth2.QuotaProjectIdProvider; @@ -110,6 +111,8 @@ public abstract class ServiceOptions< private transient ServiceT service; private transient ServiceRpc rpc; + private final ApiTracerFactory apiTracerFactory; + /** * Builder for {@code ServiceOptions}. * @@ -137,6 +140,8 @@ public abstract static class Builder< private String clientLibToken = ServiceOptions.getGoogApiClientLibName(); private String quotaProjectId; + private ApiTracerFactory apiTracerFactory; + @InternalApi("This class should only be extended within google-cloud-java") protected Builder() {} @@ -153,6 +158,7 @@ protected Builder(ServiceOptions options) { transportOptions = options.transportOptions; clientLibToken = options.clientLibToken; quotaProjectId = options.quotaProjectId; + apiTracerFactory = options.apiTracerFactory; } protected abstract ServiceOptions build(); @@ -306,6 +312,11 @@ public B setQuotaProjectId(String quotaProjectId) { return self(); } + public B setApiTracerFactory(ApiTracerFactory apiTracerFactory) { + this.apiTracerFactory = apiTracerFactory; + return self(); + } + protected Set getAllowedClientLibTokens() { return allowedClientLibTokens; } @@ -347,6 +358,7 @@ protected ServiceOptions( builder.quotaProjectId != null ? builder.quotaProjectId : getValueFromCredentialsFile(getCredentialsPath(), "quota_project_id"); + apiTracerFactory = builder.apiTracerFactory; } private static String getCredentialsPath() { @@ -692,6 +704,10 @@ public String getLibraryVersion() { return GaxProperties.getLibraryVersion(this.getClass()); } + public ApiTracerFactory getApiTracerFactory() { + return apiTracerFactory; + } + @InternalApi public final HeaderProvider getMergedHeaderProvider(HeaderProvider internalHeaderProvider) { Map mergedHeaders =