Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_BIND_HOST,
HttpTransportSettings.SETTING_HTTP_PORT,
HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT,
HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN,
HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS,
HttpTransportSettings.SETTING_HTTP_COMPRESSION,
HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.network.CloseableChannel;
import org.opensearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -76,10 +77,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
Expand Down Expand Up @@ -240,6 +243,39 @@ protected void doStop() {
}
}

long gracefulShutdownMillis = SETTING_HTTP_GRACEFUL_SHUTDOWN.get(settings).getMillis();
if (gracefulShutdownMillis > 0 && httpChannels.size() > 0) {

logger.info(
"There are {} open client connections, try to close gracefully within {}ms.",
httpChannels.size(),
gracefulShutdownMillis
);

// Close connection after the client is getting a response.
handlingSettings.setForceCloseConnection();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @tmanninger , getting to it again just now. So I think this is what we want:

  • try to send the "in-flight" response to the clients within gracefulShutdownMillis timeframe
  • close the connection (forceably or not)

I believe we could make it part of the RestChannel (and leave AbstractHttpServerTransport out of it) by introducing RestChannel::closeGracefuly(long timeout) method and calling it in CloseableChannel.closeChannels (would need some adjustments).


// Wait until all connections are closed or the graceful timeout is reached.
final long startTimeMillis = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimeMillis < gracefulShutdownMillis) {
if (httpChannels.isEmpty()) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}

if (!httpChannels.isEmpty()) {
logger.info("Timeout reached, {} connections not closed gracefully.", httpChannels.size());
} else {
logger.info("Closed all connections gracefully.");
}
}

// Close all channels that are not yet closed
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void sendResponse(RestResponse restResponse) {
Releasables.closeWhileHandlingException(httpRequest::release);

final ArrayList<Releasable> toClose = new ArrayList<>(3);
if (HttpUtils.shouldCloseConnection(httpRequest)) {
if (settings.forceCloseConnection() || HttpUtils.shouldCloseConnection(httpRequest)) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}

Expand Down Expand Up @@ -151,6 +151,14 @@ public void sendResponse(RestResponse restResponse) {
setHeaderField(httpResponse, X_OPAQUE_ID, opaque);
}

if (settings.forceCloseConnection()) {
// Server is in shutdown mode. Send client to close the connection.
// -------------
// Only works with http1
// How to check request httpversion is 2 AND send goaway signal?
setHeaderField(httpResponse, CONNECTION, CLOSE);
}

// Add all custom headers
addCustomHeaders(httpResponse, restResponse.getHeaders());
addCustomHeaders(httpResponse, threadContext.getResponseHeaders());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class HttpHandlingSettings {
private final long readTimeoutMillis;
private boolean corsEnabled;

// Close the connection after response is sent to the client (even if keep-alive is set)
private boolean forceCloseConnection;

public HttpHandlingSettings(
int maxContentLength,
int maxChunkSize,
Expand All @@ -89,6 +92,7 @@ public HttpHandlingSettings(
this.pipeliningMaxEvents = pipeliningMaxEvents;
this.readTimeoutMillis = readTimeoutMillis;
this.corsEnabled = corsEnabled;
this.forceCloseConnection = false;
}

public static HttpHandlingSettings fromSettings(Settings settings) {
Expand Down Expand Up @@ -150,4 +154,12 @@ public long getReadTimeoutMillis() {
public boolean isCorsEnabled() {
return corsEnabled;
}

public void setForceCloseConnection() {
forceCloseConnection = true;
}

public boolean forceCloseConnection() {
return forceCloseConnection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public final class HttpTransportSettings {
Property.NodeScope
);
public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT = intSetting("http.publish_port", -1, -1, Property.NodeScope);

public static final Setting<TimeValue> SETTING_HTTP_GRACEFUL_SHUTDOWN = Setting.timeSetting(
"http.graceful_shutdown",
new TimeValue(0),
new TimeValue(0),
Property.NodeScope
);

public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED = Setting.boolSetting(
"http.detailed_errors.enabled",
true,
Expand Down
Loading