Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2dcd409
Better behaviour in the presence of backing off
j-baker Aug 2, 2018
c583b33
Merge branch 'develop' into jbaker/better_429_behaviour
j-baker Aug 2, 2018
d51d3ac
more docs, easier to read
j-baker Aug 2, 2018
19a8173
Better concurrency limiters
j-baker Aug 3, 2018
d7164de
fixes
j-baker Aug 3, 2018
39388b7
simplify
j-baker Aug 3, 2018
f2927d0
Checkstyle
j-baker Aug 13, 2018
8f57de6
checkstyle
j-baker Aug 13, 2018
8f02b8b
PR comments
j-baker Sep 2, 2018
fbbcc41
Tweak the concurrency limiters lib
j-baker Sep 2, 2018
27a2153
reset flow control test
j-baker Sep 2, 2018
e548996
changes
j-baker Sep 2, 2018
a4c9e68
Changes
j-baker Sep 2, 2018
c68bc89
Passes the build
j-baker Sep 2, 2018
91dd6d2
update lockfiles
j-baker Sep 2, 2018
2229a81
Merge remote-tracking branch 'origin/develop' into jbaker/better_429_…
j-baker Sep 11, 2018
ee8e539
New attempt using interceptor
j-baker Sep 11, 2018
036d45b
more comments
j-baker Sep 11, 2018
1e18435
some bullshit
j-baker Sep 12, 2018
fbdeab1
Perfect
j-baker Sep 12, 2018
951dfdd
cleanup
j-baker Sep 12, 2018
ed18c48
Ready to go?
j-baker Sep 13, 2018
fda4d64
docs
j-baker Sep 13, 2018
8a088d7
Checkstyle
j-baker Sep 13, 2018
a9721e4
chekcstyle
j-baker Sep 13, 2018
1ce7b82
Metric
j-baker Sep 13, 2018
bc38b76
Javadoc
iamdanfox Sep 13, 2018
addbdca
README describes new flow control
iamdanfox Sep 13, 2018
c97925a
Move docs -> class level javadoc
iamdanfox Sep 13, 2018
baaa142
Rename ConcurrencyLimiters#limiter -> acquireLimiter
iamdanfox Sep 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static ClientConfiguration of(
.enableGcmCipherSuites(DEFAULT_ENABLE_GCM_CIPHERS)
.proxy(ProxySelector.getDefault())
.proxyCredentials(Optional.empty())
.maxNumRetries(uris.size())
.maxNumRetries(DEFAULT_MAX_NUM_RETRIES)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uhm, I thought we had merged such a change already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but we did it in only one of the two places (see above in this class)

.backoffSlotSize(DEFAULT_BACKOFF_SLOT_SIZE)
.nodeSelectionStrategy(DEFAULT_NODE_SELECTION_STRATEGY)
.failedUrlCooldown(DEFAULT_FAILED_URL_COOLDOWN)
Expand Down
1 change: 1 addition & 0 deletions okhttp-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
compile project(':http-clients')
compile project(':tracing-okhttp3')
compile 'com.google.guava:guava'
compile 'com.netflix.concurrency-limits:concurrency-limits-core'
compile 'com.palantir.safe-logging:preconditions'
compile 'com.palantir.tritium:tritium-registry'
compile 'com.squareup.okhttp3:logging-interceptor'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.remoting3.okhttp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iamdanfox what's the deal with remoting-vs-conjure in PRs?


import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.netflix.concurrency.limits.limit.TracingLimitDecorator;
import com.netflix.concurrency.limits.limiter.DefaultLimiter;
import com.netflix.concurrency.limits.strategy.SimpleStrategy;
import com.palantir.remoting3.tracing.okhttp3.OkhttpTraceInterceptor;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Remoting calls may observe 429 or 503 responses from the server, at which point they back off in order to
* reduce excess load. Unfortunately this state on backing off is stored per-call, so 429s or 503s in one call do not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just describe how it works rather lamenting ("Unfortunately") the decomp?

Flow control in Conjure is a collaborative effort between servers and clients: Servers advertise an overloaded state via 429/503 responses, and clients throttle the number of requests they are sending. The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP: First, clients use the frequency of 429/503 responses to determine an estimate for the number of permissible concurrent requests. Second, each such request gets scheduled according to an exponential backoff algorithm. This class provides an asynchronous implementation of Netflix's concurrency-limits library for determining the above-mentioned concurrency estimates. [...]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks

* cause any request rate slowdown in subsequent calls. This class affects this by adjusting the number of requests
* that might be dispatched to a given endpoint.
* <p>
* This is based on Netflix's <a href="https://github.com/Netflix/concurrency-limits/">Concurrency Limits</a> library,
* which provides a number of primitives for this.
* <p>
* In order to use this class, one should get a Limiter for their request, which returns a future. once the Future is
* done, the caller can assume that the request is schedulable. After the request completes, the caller <b>must</b>
* call one of the methods on {@link Limiter.Listener} in order to provide feedback about the request's success.
* If this is not done, throughput will be negatively affected. We attempt to eventually recover to avoid a total
* deadlock, but this is not guaranteed.
*/
final class ConcurrencyLimiters {
private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiters.class);
private static final String FALLBACK = "";

// If a request is never marked as complete and is thrown away, recover on the next GC instead of deadlocking
private static final Map<Limiter.Listener, Runnable> activeListeners = CacheBuilder.newBuilder()
.weakKeys()
.<Limiter.Listener, Runnable>removalListener(notification -> {
if (notification.getCause().equals(RemovalCause.COLLECTED)) {
log.warn("Concurrency limiter was leaked."
+ " This implies a remoting bug or classpath issue, and may cause degraded performance");
notification.getValue().run();
}
})
.build()
.asMap();

private final ConcurrentMap<String, ConcurrencyLimiter> limiters = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this a size limited cache?


private static Limiter<Void> newLimiter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline in ConcurrencyLimiter constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return DefaultLimiter.newBuilder()
.limit(TracingLimitDecorator.wrap(AIMDLimit.newBuilder().initialLimit(1).build()))
.build(new SimpleStrategy<>());
}

@VisibleForTesting
ConcurrencyLimiter limiter(String name) {
return limiters.computeIfAbsent(name, key -> new ConcurrencyLimiter(activeListeners, newLimiter()));
}

ConcurrencyLimiter limiter(Request request) {
final String limiterKey;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always pro moving out logic into a function when you have this conditional initialization. Could also just make this String limiterKey = limiter != null ? blah : FALLBACK;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit dodgy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still dodgy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see a way of avoiding this. It seems reasonable to do this by endpoint, and if you do that you end up with this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could also see this being something that uses a dynamic proxy which makes it much easier to limit per method or per some annotation. think the only sad thing about this is relying on the tracing header which is only every passed around internally (never sent across the wire)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. But then I'd rename the code bits so that they're no longer "trace"-specific. Probably also need to stop deleting the header in the trace-specific code path

if (pathTemplate == null) {
limiterKey = FALLBACK;
} else {
limiterKey = request.method() + " " + pathTemplate;
}
return limiter(limiterKey);
}

/**
* The Netflix library provides either a blocking approach or a non-blocking approach which might say
* you can't be scheduled at this time. All of our HTTP calls are asynchronous, so we really want to get
* a {@link ListenableFuture} that we can add a callback to. This class then is a translation of
* {@link com.netflix.concurrency.limits.limiter.BlockingLimiter} to be asynchronous, maintaining a queue
* of currently waiting requests.
* <p>
* Upon a request finishing, we check if there are any waiting requests, and if there are we attempt to trigger
* some more.
*/
static final class ConcurrencyLimiter {
private final Map<Limiter.Listener, Runnable> activeListeners;
private final Queue<SettableFuture<Limiter.Listener>> waitingRequests = new LinkedBlockingQueue<>();
private final Limiter<Void> limiter;

public ConcurrencyLimiter(
Map<Limiter.Listener, Runnable> activeListeners,
Limiter<Void> limiter) {
this.activeListeners = activeListeners;
this.limiter = limiter;
}

public ListenableFuture<Limiter.Listener> acquire() {
Optional<Limiter.Listener> maybeListener = limiter.acquire(null);
if (maybeListener.isPresent()) {
return Futures.immediateFuture(wrap(activeListeners, maybeListener.get()));
}
SettableFuture<Limiter.Listener> future = SettableFuture.create();
waitingRequests.add(future);
return future;
}

private void processQueue() {
while (!waitingRequests.isEmpty()) {
Optional<Limiter.Listener> maybeAcquired = limiter.acquire(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does null represent? would making it a named constant make this more readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to NO_CONTEXT

if (!maybeAcquired.isPresent()) {
return;
}
Limiter.Listener acquired = maybeAcquired.get();
SettableFuture<Limiter.Listener> head = waitingRequests.poll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would prefer to make the multi-threadedness here a little easier to understand by calling poll only

SettableFuture<> head;
// Note that different threads may be executing processQueue; this is safe because ...
while( (head = waitingRequests.poll()) != null ) {
  ..
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah but if you do that, you're not guaranteed to have any permits to give them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see. then add a comment, please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (head == null) {
acquired.onIgnore();
} else {
head.set(acquired);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this construction satisfy some basic fairness properties, i.e., every request will get scheduled/acquired eventually?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, via the FIFO queue

}
}
}

private Limiter.Listener wrap(
Map<Limiter.Listener, Runnable> activeListeners, Limiter.Listener listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove activeListeners param?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe even make ConcurrencyLimiter non-static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Limiter.Listener res = new Limiter.Listener() {
@Override
public void onSuccess() {
listener.onSuccess();
activeListeners.remove(this);
processQueue();
}

@Override
public void onIgnore() {
listener.onIgnore();
activeListeners.remove(this);
processQueue();
}

@Override
public void onDropped() {
listener.onDropped();
activeListeners.remove(this);
processQueue();
}
};
activeListeners.put(res, () -> {
listener.onIgnore();
processQueue();
});
return res;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import com.palantir.tritium.metrics.registry.MetricName;
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.ConnectionSpec;
Expand Down Expand Up @@ -221,7 +221,8 @@ private static RemotingOkHttpClient createInternal(

return new RemotingOkHttpClient(
client.build(),
() -> new ExponentialBackoff(config.maxNumRetries(), config.backoffSlotSize(), new Random()),
() -> new ExponentialBackoff(
config.maxNumRetries(), config.backoffSlotSize(), ThreadLocalRandom.current()),
urlSelector,
schedulingExecutor,
executionExecutor);
Expand Down
Loading