Better behaviour in the presence of 429s #786
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| /* | ||
| * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package com.palantir.remoting3.okhttp; | ||
|
Review comment (Contributor): @iamdanfox what's the deal with remoting-vs-conjure in PRs? |
||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.netflix.concurrency.limits.Limiter; | ||
| import com.netflix.concurrency.limits.limiter.BlockingLimiter; | ||
| import com.palantir.remoting3.tracing.okhttp3.OkhttpTraceInterceptor; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import okhttp3.Request; | ||
|
|
||
| /** | ||
| * Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state | ||
| * via 429/503 responses, and clients throttle the number of requests that they send concurrently as a response to this. | ||
| * The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP. | ||
| * <ol> | ||
| * <li> | ||
| * Clients use the frequency of 429/503 responses (as well as the request latency) to determine an estimate | ||
| * for the number of permissible concurrent requests | ||
| * </li> | ||
| * <li> | ||
| * Each such request gets scheduled according to an exponential backoff algorithm. | ||
| * </li> | ||
| * </ol> | ||
| * <p> | ||
| * This class provides an asynchronous implementation of Netflix's | ||
| * <a href="https://github.com/Netflix/concurrency-limits/">concurrency-limits</a> library for determining the | ||
| * above mentioned concurrency estimates. | ||
| * <p> | ||
| * In order to use this class, one should acquire a Limiter for their request, which returns a future. Once the Future | ||
| * is completed, the caller can assume that the request is schedulable. After the request completes, the caller | ||
| * <b>must</b> call one of the methods on {@link Limiter.Listener} in order to provide feedback about the request's | ||
| * success. If this is not done, a deadlock could result. | ||
| */ | ||
| final class ConcurrencyLimiters { | ||
| private static final Void NO_CONTEXT = null; | ||
| private static final String FALLBACK = ""; | ||
|
|
||
| private final ConcurrentMap<String, Limiter<Void>> limiters = new ConcurrentHashMap<>(); | ||
|
|
||
| @VisibleForTesting | ||
| Limiter.Listener limiter(String name) { | ||
| return limiters.computeIfAbsent(name, key -> | ||
| new IdempotentLimiter(new BlockingLimiter<>(RemotingConcurrencyLimiter.createDefault()))) | ||
| .acquire(NO_CONTEXT).get(); | ||
| } | ||
|
|
||
| Limiter.Listener limiter(Request request) { | ||
| return limiter(limiterKey(request)); | ||
| } | ||
|
|
||
| private static String limiterKey(Request request) { | ||
| String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER); | ||
|
Review comment (Contributor): this is a bit dodgy
Review comment (Contributor): this is still dodgy
Review comment (Contributor, Author): I don't really see a way of avoiding this. It seems reasonable to do this by endpoint, and if you do that you end up with this.
Review comment (Contributor): could also see this being something that uses a dynamic proxy, which makes it much easier to limit per method or per some annotation. think the only sad thing about this is relying on the tracing header, which is only ever passed around internally (never sent across the wire)
Review comment (Contributor): OK. But then I'd rename the code bits so that they're no longer "trace"-specific. Probably also need to stop deleting the header in the trace-specific code path |
||
| if (pathTemplate == null) { | ||
| return FALLBACK; | ||
| } else { | ||
| return request.method() + " " + pathTemplate; | ||
| } | ||
| } | ||
|
|
||
| private static final class IdempotentLimiter implements Limiter<Void> { | ||
| private final Limiter<Void> delegate; | ||
|
|
||
| private IdempotentLimiter(Limiter<Void> delegate) { | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public Optional<Listener> acquire(Void context) { | ||
| return delegate.acquire(context).map(IdempotentListener::new); | ||
| } | ||
| } | ||
|
|
||
| private static final class IdempotentListener implements Limiter.Listener { | ||
| private final Limiter.Listener delegate; | ||
| private boolean consumed = false; | ||
|
|
||
| private IdempotentListener(Limiter.Listener delegate) { | ||
| this.delegate = delegate; | ||
| } | ||
|
|
||
| @Override | ||
| public void onSuccess() { | ||
| if (!consumed) { | ||
| delegate.onSuccess(); | ||
| } | ||
| consumed = true; | ||
| } | ||
|
|
||
| @Override | ||
| public void onIgnore() { | ||
| if (!consumed) { | ||
| delegate.onIgnore(); | ||
| } | ||
| consumed = true; | ||
| } | ||
|
|
||
| @Override | ||
| public void onDropped() { | ||
| if (!consumed) { | ||
| delegate.onDropped(); | ||
| } | ||
| consumed = true; | ||
| } | ||
| } | ||
| } | ||
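
The contract described in the Javadoc above (one limiter per endpoint key, and a listener that must be signalled once the request finishes) can be illustrated with a minimal, JDK-only sketch. Everything here is an assumption made for illustration: `KeyedLimiterSketch` and `Permit` are hypothetical names, and a fixed-size `Semaphore` stands in for the adaptive Netflix limiter, so the sketch shows the keying and the idempotent release but not the limit estimation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for ConcurrencyLimiters; a fixed-size Semaphore replaces the adaptive limiter. */
final class KeyedLimiterSketch {
    /** Hypothetical, simplified analogue of Limiter.Listener with a single release method. */
    interface Permit {
        void release();
    }

    private static final String FALLBACK = "";

    private final ConcurrentMap<String, Semaphore> limiters = new ConcurrentHashMap<>();
    private final int permitsPerKey;

    KeyedLimiterSketch(int permitsPerKey) {
        this.permitsPerKey = permitsPerKey;
    }

    /** Same keying scheme as limiterKey in the diff: "METHOD pathTemplate", or a shared fallback bucket. */
    static String limiterKey(String method, String pathTemplate) {
        return pathTemplate == null ? FALLBACK : method + " " + pathTemplate;
    }

    /** Blocks until a permit for the key is free, then returns a handle that releases it at most once. */
    Permit acquire(String key) {
        Semaphore semaphore = limiters.computeIfAbsent(key, k -> new Semaphore(permitsPerKey));
        semaphore.acquireUninterruptibly();
        AtomicBoolean consumed = new AtomicBoolean(false);
        return () -> {
            // Only the first call returns the permit; later calls are no-ops.
            if (consumed.compareAndSet(false, true)) {
                semaphore.release();
            }
        };
    }

    /** Number of permits currently free for the key (all of them if the key has never been used). */
    int available(String key) {
        Semaphore semaphore = limiters.get(key);
        return semaphore == null ? permitsPerKey : semaphore.availablePermits();
    }

    public static void main(String[] args) {
        KeyedLimiterSketch limiters = new KeyedLimiterSketch(2);
        String key = limiterKey("GET", "/users/{id}");
        Permit permit = limiters.acquire(key);
        System.out.println(limiters.available(key));
        permit.release();
        permit.release(); // second call does nothing
        System.out.println(limiters.available(key));
    }
}
```

If a caller never releases its permit, later `acquire` calls for the same key eventually block forever, which is the deadlock the Javadoc warns about.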
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| package com.palantir.remoting3.okhttp; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkState; | ||
|
|
||
| import com.netflix.concurrency.limits.Limiter; | ||
| import java.io.IOException; | ||
| import java.util.Optional; | ||
| import java.util.concurrent.TimeUnit; | ||
| import okhttp3.Call; | ||
| import okhttp3.Callback; | ||
| import okhttp3.Interceptor; | ||
| import okhttp3.Request; | ||
| import okhttp3.Response; | ||
| import okhttp3.ResponseBody; | ||
| import okio.AsyncTimeout; | ||
| import okio.BufferedSource; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * WIP docs for benefit of reviewer. | ||
| * | ||
| * An interceptor for limiting the concurrency of requests to an endpoint. | ||
| * | ||
| * Requests must be tagged (before reaching this point) with a ConcurrencyLimitTag. At this point, we block on | ||
| * receiving a permit to run the request, and store the listener in the tag. | ||
|
||
| * | ||
| * When we see evidence of being dropped, we write this into the tag, and when the request retries again the permit | ||
| * will be returned to the pool before acquiring a new one. | ||
| * | ||
| * Users must also wrap the final callback they use. The wrapper does two things: on failure it clears the state, and | ||
| * on success it waits until the response is closed before handing back the permit. In other words, if you have a | ||
| * server with a concurrency limit (e.g. it is CPU bound), clients should respect the server's concurrency limit. | ||
| * | ||
| * This has a timeout of 1 minute (before an error is logged) in order to try to catch people who have leaked responses | ||
| * (which here will deadlock otherwise). It indicates an application bug every time, but might affect users poorly. | ||
| * I'm happy to remove it, but think there should probably be another solution? | ||
| */ | ||
| final class ConcurrencyLimitingInterceptor implements Interceptor { | ||
| private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimitingInterceptor.class); | ||
| private final ConcurrencyLimiters limiters = new ConcurrencyLimiters(); | ||
|
|
||
| @Override | ||
| public Response intercept(Chain chain) throws IOException { | ||
| ConcurrencyLimitTag tagState = chain.request().tag(ConcurrencyLimitTag.class); | ||
| tagState.invalidate(); | ||
| tagState.setListener(limiters.limiter(chain.request())); | ||
| return chain.proceed(chain.request()); | ||
| } | ||
|
|
||
| public static Callback wrapCallback(Callback callback) { | ||
|
||
| return new Callback() { | ||
| @Override | ||
| public void onFailure(Call call, IOException e) { | ||
| Optional.ofNullable(call.request().tag(ConcurrencyLimitTag.class)).ifPresent(ConcurrencyLimitTag::invalidate); | ||
| callback.onFailure(call, e); | ||
| } | ||
|
|
||
| @Override | ||
| public void onResponse(Call call, Response response) throws IOException { | ||
| Response newResponse = | ||
| Optional.ofNullable(call.request().tag(ConcurrencyLimitTag.class)) | ||
| .map(t -> wrapResponse(t, response)) | ||
| .orElse(response); | ||
| callback.onResponse(call, newResponse); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| public static Request wrapRequest(Request request) { | ||
| return request.newBuilder().tag(ConcurrencyLimitTag.class, new ConcurrencyLimitTag()).build(); | ||
| } | ||
|
|
||
| private static Response wrapResponse(ConcurrencyLimitTag tag, Response response) { | ||
| if (response.body() == null) { | ||
| return response; | ||
| } | ||
| ResponseBody currentBody = response.body(); | ||
| ResourceDeallocator deallocator = new ResourceDeallocator(tag); | ||
| ResponseBody newResponseBody = | ||
| ResponseBody.create(currentBody.contentType(), currentBody.contentLength(), | ||
| new ReleaseConcurrencyLimitBufferedSource(currentBody.source(), tag, deallocator)); | ||
| deallocator.timeout(1, TimeUnit.MINUTES); | ||
|
||
| deallocator.enter(); | ||
| return response.newBuilder() | ||
| .body(newResponseBody) | ||
| .build(); | ||
| } | ||
|
|
||
| static final class ConcurrencyLimitTag { | ||
| private Limiter.Listener listener; | ||
| private boolean wasDropped = false; | ||
|
|
||
| private void invalidate() { | ||
| if (listener == null) { | ||
| return; | ||
| } | ||
|
|
||
| if (wasDropped) { | ||
| listener.onDropped(); | ||
| } else { | ||
| listener.onIgnore(); | ||
| } | ||
| listener = null; | ||
| wasDropped = false; | ||
| } | ||
|
|
||
| private void setListener(Limiter.Listener listener) { | ||
| checkState(this.listener == null); // check the field, not the (shadowing) parameter | ||
| this.listener = listener; | ||
| } | ||
|
|
||
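| public void success() { | ||
| // The listener is null if the leak timeout has already returned the permit via invalidate(). | ||
| if (listener != null) { | ||
| listener.onSuccess(); | ||
| } | ||
| } | ||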
|
|
||
| public void wasDropped() { | ||
| wasDropped = true; | ||
| } | ||
| } | ||
|
|
||
| private static final class ResourceDeallocator extends AsyncTimeout { | ||
|
||
| private final ConcurrencyLimitTag tag; | ||
|
|
||
| private ResourceDeallocator(ConcurrencyLimitTag tag) { | ||
| this.tag = tag; | ||
| } | ||
|
|
||
| @Override | ||
| public void timedOut() { | ||
| log.warn("A call appears to have been leaked. We think this is an application bug caused by not properly " | ||
| + "cleaning up the response object. Make sure you close() it!"); | ||
| tag.invalidate(); | ||
| } | ||
| } | ||
|
|
||
| private static final class ReleaseConcurrencyLimitBufferedSource extends ForwardingBufferedSource { | ||
| private final BufferedSource delegate; | ||
| private final ConcurrencyLimitTag tag; | ||
| private final ResourceDeallocator deallocator; | ||
|
|
||
| private ReleaseConcurrencyLimitBufferedSource(BufferedSource delegate, | ||
| ConcurrencyLimitTag tag, | ||
| ResourceDeallocator deallocator) { | ||
| super(delegate); | ||
| this.delegate = delegate; | ||
| this.tag = tag; | ||
| this.deallocator = deallocator; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| if (deallocator.exit()) { | ||
| log.info("The timeout fired but we have now closed the source. This implies a very long lived " | ||
| + "call being used properly, which the Conjure devs do not expect."); | ||
| } | ||
| tag.success(); | ||
| delegate.close(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
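
The "hand the permit back only when the response body is closed" idea from this interceptor can be sketched without OkHttp or Okio. The following is a minimal illustration under stated assumptions: `ReleaseOnCloseSketch` and `PermitReleasingStream` are hypothetical names, a `java.io.FilterInputStream` stands in for the forwarding `BufferedSource`, a plain `Semaphore` stands in for the limiter, and the one-minute leak timeout is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical JDK-only analogue of wrapping a response body so that close() releases a permit. */
final class ReleaseOnCloseSketch {
    /** Wraps a body stream so that closing it hands the permit back exactly once. */
    static final class PermitReleasingStream extends FilterInputStream {
        private final Semaphore permits;
        private final AtomicBoolean released = new AtomicBoolean(false);

        PermitReleasingStream(InputStream delegate, Semaphore permits) {
            super(delegate);
            this.permits = permits;
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            } finally {
                // Release even if closing the delegate throws, and never more than once.
                if (released.compareAndSet(false, true)) {
                    permits.release();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Semaphore permits = new Semaphore(1);
        permits.acquireUninterruptibly(); // the "request" takes the only permit
        InputStream raw = new ByteArrayInputStream(new byte[] {1, 2, 3});
        try (InputStream body = new PermitReleasingStream(raw, permits)) {
            body.read();
            System.out.println(permits.availablePermits()); // permit still held while the body is open
        }
        System.out.println(permits.availablePermits()); // permit returned once the body is closed
    }
}
```

A caller that reads the body but never closes it keeps the permit forever, which is the leak the timeout in `ResourceDeallocator` is meant to detect.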
Review comment: @samrogerson fysa
Review comment: uhm, I thought we had merged such a change already?
Review comment: yes, but we did it in only one of the two places (see above in this class)