Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ public void apply(Settings value, Settings current, Settings previous) {
HttpTransportSettings.SETTING_HTTP_BIND_HOST,
HttpTransportSettings.SETTING_HTTP_PORT,
HttpTransportSettings.SETTING_HTTP_PUBLISH_PORT,
HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN,
HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS,
HttpTransportSettings.SETTING_HTTP_COMPRESSION,
HttpTransportSettings.SETTING_HTTP_COMPRESSION_LEVEL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.network.CloseableChannel;
import org.opensearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -76,10 +77,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_GRACEFUL_SHUTDOWN;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PORT;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST;
Expand Down Expand Up @@ -240,6 +243,41 @@ protected void doStop() {
}
}

long gracefulShutdownMillis = SETTING_HTTP_GRACEFUL_SHUTDOWN.get(settings).getMillis();
if (gracefulShutdownMillis > 0 && httpChannels.size() > 0) {

logger.info(
"There are {} open client connections, try to close gracefully within {}ms.",
httpChannels.size(),
gracefulShutdownMillis
);

// Set all httpchannels to close gracefully
for (HttpChannel httpChannel : httpChannels) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HttpChannel is CloseableChannel,I believe it should be sufficient to call HttpChannel::close() to gracefully disconnect HTTP clients?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I testet it and yes, i can replace ((DefaultRestChannel) httpChannel).gracefulCloseConnection(); with httpChannel.close(); and it works.
I will remove the gracefulCloseConnection() stuff

Copy link
Contributor

@reta reta Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what we do already on line 282:

 CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);

It seems like the current impl does block indefinitely, so we could use the newly introduced SETTING_HTTP_GRACEFUL_SHUTDOWN to time bound it

Copy link
Author

@tmanninger tmanninger Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);

This close the TCP socket immediately without waiting. This must be the last step of the "shutdown" procedure.
We must close the connection gracefully in DefaultRestChannel::sendResponse() to ensure that the connection will be closed after the client received the response.

if (httpChannel instanceof DefaultRestChannel) {
((DefaultRestChannel) httpChannel).gracefulCloseConnection();
}
}
final long startTimeMillis = System.currentTimeMillis();
while (System.currentTimeMillis() - startTimeMillis < gracefulShutdownMillis) {
if (httpChannels.isEmpty()) {
break;
}
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
}

if (!httpChannels.isEmpty()) {
logger.info("Timeout reached, {} connections not closed gracefully.", httpChannels.size());
} else {
logger.info("Closed all connections gracefully.");
}
}

// Close all channels that are not yet closed
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class DefaultRestChannel extends AbstractRestChannel implements RestChannel {
private final HttpChannel httpChannel;
private final CorsHandler corsHandler;
private final Map<String, List<String>> SERVER_VERSION_HEADER = Map.of(SERVER_VERSION, List.of(SERVER_VERSION_VALUE));
private boolean gracefulCloseConnection = false;

@Nullable
private final HttpTracer tracerLog;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void sendResponse(RestResponse restResponse) {
Releasables.closeWhileHandlingException(httpRequest::release);

final ArrayList<Releasable> toClose = new ArrayList<>(3);
if (HttpUtils.shouldCloseConnection(httpRequest)) {
if (this.gracefulCloseConnection || HttpUtils.shouldCloseConnection(httpRequest)) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}

Expand Down Expand Up @@ -213,4 +214,8 @@ private void addCookies(HttpResponse response) {
}
}
}

public void gracefulCloseConnection() {
this.gracefulCloseConnection = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public final class HttpTransportSettings {
Property.NodeScope
);
public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT = intSetting("http.publish_port", -1, -1, Property.NodeScope);

public static final Setting<TimeValue> SETTING_HTTP_GRACEFUL_SHUTDOWN = Setting.timeSetting(
"http.graceful_shutdown",
new TimeValue(0),
new TimeValue(0),
Property.NodeScope
);

public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED = Setting.boolSetting(
"http.detailed_errors.enabled",
true,
Expand Down