Skip to content

Commit 2344a4c

Browse files
committed
Make counting of active streams more robust
1 parent 550ef03 commit 2344a4c

File tree

4 files changed

+31
-11
lines changed

4 files changed

+31
-11
lines changed

java/org/apache/coyote/http2/Http2AsyncUpgradeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ void sendStreamReset(StreamStateMachine state, StreamException se) throws IOExce
157157
boolean active = state.isActive();
158158
state.sendReset();
159159
if (active) {
160-
decrementActiveRemoteStreamCount();
160+
decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
161161
}
162162
}
163163

java/org/apache/coyote/http2/Http2UpgradeHandler.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,8 +290,8 @@ protected void processStreamOnContainerThread(Stream stream) {
290290
}
291291

292292

293-
protected void decrementActiveRemoteStreamCount() {
294-
setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet());
293+
protected void decrementActiveRemoteStreamCount(Stream stream) {
294+
setConnectionTimeoutForStreamCount(stream.decrementAndGetActiveRemoteStreamCount());
295295
}
296296

297297

@@ -598,7 +598,7 @@ void sendStreamReset(StreamStateMachine state, StreamException se) throws IOExce
598598
boolean active = state.isActive();
599599
state.sendReset();
600600
if (active) {
601-
decrementActiveRemoteStreamCount();
601+
decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
602602
}
603603
}
604604
socketWrapper.write(true, rstFrame, 0, rstFrame.length);
@@ -825,7 +825,7 @@ void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws
825825
protected void sentEndOfStream(Stream stream) {
826826
stream.sentEndOfStream();
827827
if (!stream.isActive()) {
828-
decrementActiveRemoteStreamCount();
828+
decrementActiveRemoteStreamCount(stream);
829829
}
830830
}
831831

@@ -1208,7 +1208,7 @@ private int allocate(AbstractStream stream, int allocation) {
12081208
}
12091209

12101210

1211-
private Stream getStream(int streamId) {
1211+
Stream getStream(int streamId) {
12121212
Integer key = Integer.valueOf(streamId);
12131213
AbstractStream result = streams.get(key);
12141214
if (result instanceof Stream) {
@@ -1536,6 +1536,7 @@ public HeaderEmitter headersStart(int streamId, boolean headersEndStream) throws
15361536
Stream stream = getStream(streamId, false);
15371537
if (stream == null) {
15381538
stream = createRemoteStream(streamId);
1539+
activeRemoteStreamCount.incrementAndGet();
15391540
}
15401541
if (streamId < maxActiveRemoteStreamId) {
15411542
throw new ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId),
@@ -1597,9 +1598,8 @@ public void headersEnd(int streamId, boolean endOfStream) throws Http2Exception
15971598
Stream stream = (Stream) abstractNonZeroStream;
15981599
if (stream.isActive()) {
15991600
if (stream.receivedEndOfHeaders()) {
1600-
1601-
if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) {
1602-
decrementActiveRemoteStreamCount();
1601+
if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.get()) {
1602+
decrementActiveRemoteStreamCount(stream);
16031603
// Ignoring maxConcurrentStreams increases the overhead count
16041604
increaseOverheadCount(FrameType.HEADERS);
16051605
throw new StreamException(
@@ -1643,7 +1643,7 @@ public void receivedEndOfStream(int streamId) throws ConnectionException {
16431643
private void receivedEndOfStream(Stream stream) throws ConnectionException {
16441644
stream.receivedEndOfStream();
16451645
if (!stream.isActive()) {
1646-
decrementActiveRemoteStreamCount();
1646+
decrementActiveRemoteStreamCount(stream);
16471647
}
16481648
}
16491649

@@ -1669,7 +1669,7 @@ public void reset(int streamId, long errorCode) throws Http2Exception {
16691669
boolean active = stream.isActive();
16701670
stream.receiveReset(errorCode);
16711671
if (active) {
1672-
decrementActiveRemoteStreamCount();
1672+
decrementActiveRemoteStreamCount(stream);
16731673
}
16741674
}
16751675
}

java/org/apache/coyote/http2/Stream.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Locale;
2626
import java.util.Map;
2727
import java.util.Set;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.locks.Lock;
2930
import java.util.concurrent.locks.ReentrantLock;
3031
import java.util.function.Supplier;
@@ -88,6 +89,7 @@ class Stream extends AbstractNonZeroStream implements HeaderEmitter {
8889
private final StreamInputBuffer inputBuffer;
8990
private final StreamOutputBuffer streamOutputBuffer = new StreamOutputBuffer();
9091
private final Http2OutputBuffer http2OutputBuffer = new Http2OutputBuffer(coyoteResponse, streamOutputBuffer);
92+
private final AtomicBoolean removedFromActiveCount = new AtomicBoolean(false);
9193

9294
// State machine would be too much overhead
9395
private int headerState = HEADER_STATE_START;
@@ -838,6 +840,20 @@ public void setIncremental(boolean incremental) {
838840
}
839841

840842

843+
int decrementAndGetActiveRemoteStreamCount() {
844+
/*
845+
* Protect against mis-counting of active streams. This method should only be called once per stream but since
846+
* the count of active streams is used to enforce the maximum concurrent streams limit, make sure each stream is
847+
* only removed from the active count exactly once.
848+
*/
849+
if (removedFromActiveCount.compareAndSet(false, true)) {
850+
return handler.activeRemoteStreamCount.decrementAndGet();
851+
} else {
852+
return handler.activeRemoteStreamCount.get();
853+
}
854+
}
855+
856+
841857
class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
842858

843859
private final Lock writeLock = new ReentrantLock();

webapps/docs/changelog.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@
161161
<code>Connector</code> element, similar to the <code>Executor</code>
162162
element, for consistency. (remm)
163163
</update>
164+
<fix>
165+
Make counting of active HTTP/2 streams per connection more robust.
166+
(markt)
167+
</fix>
164168
</changelog>
165169
</subsection>
166170
<subsection name="Jasper">

0 commit comments

Comments
 (0)