Skip to content

Commit b4006a2

Browse files
committed
Add CSM for batch write flow control
1 parent 6998e3b commit b4006a2

File tree

6 files changed

+134
-17
lines changed

6 files changed

+134
-17
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/RateLimitingServerStreamingCallable.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.atomic.AtomicReference;
3838
import java.util.logging.Logger;
3939
import javax.annotation.Nonnull;
40+
import javax.annotation.Nullable;
4041

4142
class RateLimitingServerStreamingCallable
4243
extends ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> {
@@ -69,6 +70,8 @@ class RateLimitingServerStreamingCallable
6970

7071
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable;
7172

73+
private BigtableTracer bigtableTracer;
74+
7275
RateLimitingServerStreamingCallable(
7376
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
7477
this.limiter = new ConditionalRateLimiter(DEFAULT_QPS);
@@ -84,8 +87,8 @@ public void call(
8487
limiter.acquire();
8588
stopwatch.stop();
8689
if (context.getTracer() instanceof BigtableTracer) {
87-
((BigtableTracer) context.getTracer())
88-
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
90+
bigtableTracer = (BigtableTracer) context.getTracer();
91+
bigtableTracer.batchRequestThrottled(stopwatch.elapsed(TimeUnit.NANOSECONDS));
8992
}
9093
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
9194
innerCallable.call(request, innerObserver, context);
@@ -155,22 +158,26 @@ public double getRate() {
155158
* Sets the rate and the next rate update time based on period, if the current time exceeds the
156159
* next rate update time. Otherwise, no-op.
157160
*
158-
* @param rate The new rate of the rate limiter.
159-
* @param period The period during which rate should not be updated again and the rate limiter
160-
* should not be disabled.
161+
* @param rate The new rate of the rate limiter.
162+
* @param period The period during which rate should not be updated again and the rate limiter
163+
* should not be disabled.
164+
* @param bigtableTracer The tracer for exporting client-side metrics.
161165
*/
162-
public void trySetRate(double rate, Duration period) {
166+
public void trySetRate(double rate, Duration period, BigtableTracer bigtableTracer, double cappedFactor,
167+
@Nullable Throwable status) {
163168
Instant nextTime = nextRateUpdateTime.get();
164169
Instant now = Instant.now();
165170

166171
if (now.isBefore(nextTime)) {
172+
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false);
167173
return;
168174
}
169175

170176
Instant newNextTime = now.plusSeconds(period.getSeconds());
171177

172178
if (!nextRateUpdateTime.compareAndSet(nextTime, newNextTime)) {
173179
// Someone else updated it already.
180+
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, false);
174181
return;
175182
}
176183
final double oldRate = limiter.getRate();
@@ -183,6 +190,8 @@ public void trySetRate(double rate, Duration period) {
183190
+ " with period "
184191
+ period.getSeconds()
185192
+ " seconds.");
193+
bigtableTracer.setBatchWriteFlowControlTargetQps(rate);
194+
bigtableTracer.addBatchWriteFlowControlFactor(cappedFactor, status, true);
186195
}
187196

188197
@VisibleForTesting
@@ -236,7 +245,7 @@ protected void onResponseImpl(MutateRowsResponse response) {
236245
RateLimitInfo info = response.getRateLimitInfo();
237246
updateQps(
238247
info.getFactor(),
239-
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
248+
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())), null);
240249
} else {
241250
limiter.tryDisable();
242251
}
@@ -250,7 +259,7 @@ protected void onErrorImpl(Throwable t) {
250259
if (t instanceof DeadlineExceededException
251260
|| t instanceof UnavailableException
252261
|| t instanceof ResourceExhaustedException) {
253-
updateQps(MIN_FACTOR, DEFAULT_PERIOD);
262+
updateQps(MIN_FACTOR, DEFAULT_PERIOD, t);
254263
}
255264
outerObserver.onError(t);
256265
}
@@ -260,11 +269,11 @@ protected void onCompleteImpl() {
260269
outerObserver.onComplete();
261270
}
262271

263-
private void updateQps(double factor, Duration period) {
272+
private void updateQps(double factor, Duration period, @Nullable Throwable t) {
264273
double cappedFactor = Math.min(Math.max(factor, MIN_FACTOR), MAX_FACTOR);
265274
double currentRate = limiter.getRate();
266275
double cappedRate = Math.min(Math.max(currentRate * cappedFactor, MIN_QPS), MAX_QPS);
267-
limiter.trySetRate(cappedRate, period);
276+
limiter.trySetRate(cappedRate, period, bigtableTracer, cappedFactor, t);
268277
}
269278
}
270279

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ public void onRequest(int requestCount) {
4141
// noop
4242
}
4343

44-
/**
45-
* annotate when automatic flow control is disabled. This will be called in BuiltinMetricsTracer.
46-
*/
44+
/** annotate when automatic flow control is disabled. This will be called in BuiltinMetricsTracer. */
4745
public void disableFlowControl() {
4846
// noop
4947
}
@@ -92,7 +90,8 @@ public void setLocations(String zone, String cluster) {
9290
}
9391

9492
/** Set the underlying transport used to process the attempt */
95-
public void setTransportAttrs(BuiltinMetricsTracer.TransportAttrs attrs) {}
93+
public void setTransportAttrs(BuiltinMetricsTracer.TransportAttrs attrs) {
94+
}
9695

9796
@Deprecated
9897
/**
@@ -115,4 +114,23 @@ public void grpcMessageSent() {
115114
public void setTotalTimeoutDuration(Duration totalTimeoutDuration) {
116115
// noop
117116
}
117+
118+
/**
119+
* Record the target QPS for batch write flow control.
120+
*
121+
* @param targetQps The new target QPS for the client.
122+
*/
123+
public void setBatchWriteFlowControlTargetQps(double targetQps) {
124+
}
125+
126+
/**
127+
* Record the factors received from server-side for batch write flow control. The factors are capped by min and max
128+
* allowed factor values. Status and whether the factor was actually applied are also recorded.
129+
*
130+
* @param factor Capped factor from server-side. For non-OK response, min factor is used.
131+
* @param status Status of the request.
132+
* @param applied Whether the factor was actually applied.
133+
*/
134+
public void addBatchWriteFlowControlFactor(double factor, @Nullable Throwable status, boolean applied) {
135+
}
118136
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class BuiltinMetricsConstants {
4949
static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("method");
5050
static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("status");
5151
static final AttributeKey<String> CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
52+
static final AttributeKey<Boolean> APPLIED_KEY = AttributeKey.booleanKey("applied");
5253

5354
static final AttributeKey<String> TRANSPORT_TYPE = AttributeKey.stringKey("transport_type");
5455
static final AttributeKey<String> TRANSPORT_REGION = AttributeKey.stringKey("transport_region");
@@ -70,6 +71,8 @@ public class BuiltinMetricsConstants {
7071
static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
7172
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
7273
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
74+
static final String BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME = "batch_write_flow_control_target_qps";
75+
static final String BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME = "batch_write_flow_control_factor";
7376

7477
// Start allow list of metrics that will be exported as internal
7578
public static final Map<String, Set<String>> GRPC_METRICS =
@@ -140,6 +143,11 @@ public class BuiltinMetricsConstants {
140143
500_000.0,
141144
1_000_000.0));
142145

146+
private static final Aggregation AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM =
147+
Aggregation.explicitBucketHistogram(
148+
ImmutableList.of(
149+
0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3));
150+
143151
static final Set<AttributeKey> COMMON_ATTRIBUTES =
144152
ImmutableSet.of(
145153
BIGTABLE_PROJECT_ID_KEY,
@@ -286,7 +294,23 @@ public static Map<InstrumentSelector, View> getAllViews() {
286294
.addAll(COMMON_ATTRIBUTES)
287295
.add(STREAMING_KEY, STATUS_KEY)
288296
.build());
289-
297+
defineView(
298+
views,
299+
BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME,
300+
Aggregation.sum(),
301+
InstrumentType.GAUGE,
302+
"1",
303+
ImmutableSet.<AttributeKey>builder().addAll(COMMON_ATTRIBUTES).add(STATUS_KEY).build());
304+
defineView(
305+
views,
306+
BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME,
307+
AGGREGATION_BATCH_WRITE_FLOW_CONTROL_FACTOR_HISTOGRAM,
308+
InstrumentType.HISTOGRAM,
309+
"1",
310+
ImmutableSet.<AttributeKey>builder()
311+
.addAll(COMMON_ATTRIBUTES)
312+
.add(STREAMING_KEY, STATUS_KEY)
313+
.build());
290314
return views.build();
291315
}
292316
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_TYPE;
2929
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.TRANSPORT_ZONE;
3030
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.ZONE_ID_KEY;
31+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.APPLIED_KEY;
3132

3233
import com.google.api.core.ObsoleteApi;
3334
import com.google.api.gax.retrying.ServerStreamingAttemptException;
@@ -41,6 +42,7 @@
4142
import com.google.gson.reflect.TypeToken;
4243
import io.grpc.Deadline;
4344
import io.opentelemetry.api.common.Attributes;
45+
import io.opentelemetry.api.metrics.DoubleGauge;
4446
import io.opentelemetry.api.metrics.DoubleHistogram;
4547
import io.opentelemetry.api.metrics.LongCounter;
4648
import java.time.Duration;
@@ -136,6 +138,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
136138
private final DoubleHistogram remainingDeadlineHistogram;
137139
private final LongCounter connectivityErrorCounter;
138140
private final LongCounter retryCounter;
141+
private final DoubleGauge batchWriteFlowControlTargetQps;
142+
private final DoubleHistogram batchWriteFlowControlFactorHistogram;
139143

140144
BuiltinMetricsTracer(
141145
OperationType operationType,
@@ -150,7 +154,9 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
150154
DoubleHistogram applicationBlockingLatenciesHistogram,
151155
DoubleHistogram deadlineHistogram,
152156
LongCounter connectivityErrorCounter,
153-
LongCounter retryCounter) {
157+
LongCounter retryCounter,
158+
DoubleGauge batchWriteFlowControlTargetQps,
159+
DoubleHistogram batchWriteFlowControlFactorHistogram) {
154160
this.operationType = operationType;
155161
this.spanName = spanName;
156162
this.baseAttributes = attributes;
@@ -165,6 +171,8 @@ static TransportAttrs create(@Nullable String locality, @Nullable String backend
165171
this.remainingDeadlineHistogram = deadlineHistogram;
166172
this.connectivityErrorCounter = connectivityErrorCounter;
167173
this.retryCounter = retryCounter;
174+
this.batchWriteFlowControlTargetQps = batchWriteFlowControlTargetQps;
175+
this.batchWriteFlowControlFactorHistogram = batchWriteFlowControlFactorHistogram;
168176
}
169177

170178
@Override
@@ -496,4 +504,28 @@ private static double convertToMs(long nanoSeconds) {
496504
double toMs = 1e-6;
497505
return nanoSeconds * toMs;
498506
}
507+
508+
@Override
509+
public void setBatchWriteFlowControlTargetQps(double targetQps) {
510+
Attributes attributes =
511+
baseAttributes.toBuilder()
512+
.put(METHOD_KEY, spanName.toString())
513+
.build();
514+
515+
batchWriteFlowControlTargetQps.set(targetQps, attributes);
516+
}
517+
518+
@Override
519+
public void addBatchWriteFlowControlFactor(double factor, @Nullable Throwable status, boolean applied) {
520+
String statusStr = status == null ? "OK" : Util.extractStatus(status);
521+
522+
Attributes attributes =
523+
baseAttributes.toBuilder()
524+
.put(METHOD_KEY, spanName.toString())
525+
.put(STATUS_KEY, statusStr)
526+
.put(APPLIED_KEY, applied)
527+
.build();
528+
529+
batchWriteFlowControlFactorHistogram.record(factor, attributes);
530+
}
499531
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerFactory.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.REMAINING_DEADLINE_NAME;
2727
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.RETRY_COUNT_NAME;
2828
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.SERVER_LATENCIES_NAME;
29+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME;
30+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME;
2931

3032
import com.google.api.core.InternalApi;
3133
import com.google.api.gax.tracing.ApiTracer;
@@ -34,9 +36,11 @@
3436
import com.google.api.gax.tracing.SpanName;
3537
import io.opentelemetry.api.OpenTelemetry;
3638
import io.opentelemetry.api.common.Attributes;
39+
import io.opentelemetry.api.metrics.DoubleGauge;
3740
import io.opentelemetry.api.metrics.DoubleHistogram;
3841
import io.opentelemetry.api.metrics.LongCounter;
3942
import io.opentelemetry.api.metrics.Meter;
43+
4044
import java.io.IOException;
4145

4246
/**
@@ -61,6 +65,8 @@ public class BuiltinMetricsTracerFactory extends BaseApiTracerFactory {
6165
private final DoubleHistogram remainingDeadlineHistogram;
6266
private final LongCounter connectivityErrorCounter;
6367
private final LongCounter retryCounter;
68+
private final DoubleGauge batchWriteFlowControlTargetQps;
69+
private final DoubleHistogram batchWriteFlowControlFactorHistogram;
6470

6571
public static BuiltinMetricsTracerFactory create(
6672
OpenTelemetry openTelemetry, Attributes attributes) throws IOException {
@@ -147,6 +153,18 @@ public static BuiltinMetricsTracerFactory create(
147153
.setDescription("The number of additional RPCs sent after the initial attempt.")
148154
.setUnit(COUNT)
149155
.build();
156+
batchWriteFlowControlTargetQps =
157+
meter
158+
.gaugeBuilder(BATCH_WRITE_FLOW_CONTROL_TARGET_QPS_NAME)
159+
.setDescription("The current target QPS of the client under batch write flow control.")
160+
.setUnit("1").build();
161+
batchWriteFlowControlFactorHistogram =
162+
meter
163+
.histogramBuilder(BATCH_WRITE_FLOW_CONTROL_FACTOR_NAME)
164+
.setDescription(
165+
"The distribution of batch write flow control factors received from the server.")
166+
.setUnit("1")
167+
.build();
150168
}
151169

152170
@Override
@@ -164,6 +182,8 @@ public ApiTracer newTracer(ApiTracer parent, SpanName spanName, OperationType op
164182
applicationBlockingLatenciesHistogram,
165183
remainingDeadlineHistogram,
166184
connectivityErrorCounter,
167-
retryCounter);
185+
retryCounter,
186+
batchWriteFlowControlTargetQps,
187+
batchWriteFlowControlFactorHistogram);
168188
}
169189
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,18 @@ public void setTotalTimeoutDuration(java.time.Duration totalTimeoutDuration) {
266266
tracer.setTotalTimeoutDuration(totalTimeoutDuration);
267267
}
268268
}
269+
270+
@Override
271+
public void setBatchWriteFlowControlTargetQps(double targetQps) {
272+
for (BigtableTracer tracer : bigtableTracers) {
273+
tracer.setBatchWriteFlowControlTargetQps(targetQps);
274+
}
275+
}
276+
277+
@Override
278+
public void addBatchWriteFlowControlFactor(double factor, @Nullable Throwable t, boolean applied) {
279+
for (BigtableTracer tracer : bigtableTracers) {
280+
tracer.addBatchWriteFlowControlFactor(factor, t, applied);
281+
}
282+
}
269283
}

0 commit comments

Comments
 (0)