STATE = AtomicIntegerFieldUpdater.newUpdater(Entry.class, "state");
+
+ private final Runnable task;
+ private final Cancellable onCancel;
+ private volatile int state;
+
+ Entry(final Runnable task, final Cancellable onCancel) {
+ this.task = task;
+ this.onCancel = onCancel;
+ }
+
+ void run() {
+ task.run();
+ }
+
+ boolean tryStart() {
+ return STATE.compareAndSet(this, QUEUED, STARTED);
+ }
+
+ @Override
+ public boolean cancel() {
+ if (STATE.compareAndSet(this, QUEUED, CANCELLED)) {
+ onCancel.cancel();
+ return true;
+ }
+ return false;
+ }
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientQueueCap.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientQueueCap.java
deleted file mode 100644
index 9fbf72c2cb..0000000000
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientQueueCap.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation. For more
- * information on the Apache Software Foundation, please see
- * .
- *
- */
-package org.apache.hc.client5.http.examples;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
-import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
-import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
-import org.apache.hc.client5.http.config.TlsConfig;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
-import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
-import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.HttpHost;
-import org.apache.hc.core5.http.message.StatusLine;
-import org.apache.hc.core5.http2.HttpVersionPolicy;
-import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.io.CloseMode;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-
-/**
- * Demonstrates capping the number of queued / in-flight request executions within the internal
- * async execution pipeline using {@link org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder#setMaxQueuedRequests(int)}.
- *
- * When the cap is reached, submissions may fail fast with {@link java.util.concurrent.RejectedExecutionException}.
- */
-public class AsyncClientQueueCap {
-
- public static void main(final String[] args) throws Exception {
-
- final int maxQueuedRequests = 2;
-
- final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
- .setMaxConnTotal(1)
- .setMaxConnPerRoute(1)
- .setDefaultTlsConfig(TlsConfig.custom()
- .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
- .build())
- .setMessageMultiplexing(true)
- .build();
-
- final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
- .setMaxQueuedRequests(maxQueuedRequests)
- .setIOReactorConfig(IOReactorConfig.DEFAULT)
- .setConnectionManager(connectionManager)
- .setH2Config(H2Config.DEFAULT)
- .build();
-
- client.start();
-
- final HttpHost target = new HttpHost("https", "httpbingo.org");
-
- final SimpleHttpRequest warmup = SimpleRequestBuilder.get()
- .setHttpHost(target)
- .setPath("/get")
- .build();
-
- final CountDownLatch warmupLatch = new CountDownLatch(1);
-
- System.out.println("Executing warm-up request " + warmup);
- client.execute(
- SimpleRequestProducer.create(warmup),
- SimpleResponseConsumer.create(),
- new FutureCallback() {
-
- @Override
- public void completed(final SimpleHttpResponse response) {
- System.out.println(warmup + "->" + new StatusLine(response));
- warmupLatch.countDown();
- }
-
- @Override
- public void failed(final Exception ex) {
- System.out.println(warmup + "->" + ex);
- warmupLatch.countDown();
- }
-
- @Override
- public void cancelled() {
- System.out.println(warmup + " cancelled");
- warmupLatch.countDown();
- }
-
- });
-
- warmupLatch.await();
-
- final List batch1 = new ArrayList<>();
- batch1.add(SimpleRequestBuilder.get().setHttpHost(target)
- .setPath("/drip?numbytes=100&duration=8&delay=0&code=200&i=0").build());
- batch1.add(SimpleRequestBuilder.get().setHttpHost(target)
- .setPath("/drip?numbytes=100&duration=8&delay=0&code=200&i=1").build());
-
- final List batch2 = new ArrayList<>();
- batch2.add(SimpleRequestBuilder.get().setHttpHost(target)
- .setPath("/drip?numbytes=100&duration=8&delay=0&code=200&i=2").build());
- batch2.add(SimpleRequestBuilder.get().setHttpHost(target)
- .setPath("/drip?numbytes=100&duration=8&delay=0&code=200&i=3").build());
-
- final CountDownLatch latch1 = new CountDownLatch(batch1.size());
- final CountDownLatch latchAttempt2 = new CountDownLatch(batch2.size());
- final AtomicInteger rejected = new AtomicInteger(0);
-
- System.out.println("Submitting first batch (expected to execute)");
- for (final SimpleHttpRequest request : batch1) {
- System.out.println("Executing request " + request);
- client.execute(
- SimpleRequestProducer.create(request),
- SimpleResponseConsumer.create(),
- new FutureCallback() {
-
- @Override
- public void completed(final SimpleHttpResponse response) {
- System.out.println(request + "->" + new StatusLine(response));
- latch1.countDown();
- }
-
- @Override
- public void failed(final Exception ex) {
- System.out.println(request + "->" + ex);
- latch1.countDown();
- }
-
- @Override
- public void cancelled() {
- System.out.println(request + " cancelled");
- latch1.countDown();
- }
-
- });
- }
-
- System.out.println("Submitting second batch immediately (may reject)");
- for (final SimpleHttpRequest request : batch2) {
- System.out.println("Executing request " + request);
- client.execute(
- SimpleRequestProducer.create(request),
- SimpleResponseConsumer.create(),
- new FutureCallback() {
-
- @Override
- public void completed(final SimpleHttpResponse response) {
- System.out.println(request + "->" + new StatusLine(response));
- latchAttempt2.countDown();
- }
-
- @Override
- public void failed(final Exception ex) {
- if (ex instanceof RejectedExecutionException) {
- rejected.incrementAndGet();
- System.out.println(request + "-> rejected: " + ex.getMessage());
- } else {
- System.out.println(request + "->" + ex);
- }
- latchAttempt2.countDown();
- }
-
- @Override
- public void cancelled() {
- System.out.println(request + " cancelled");
- latchAttempt2.countDown();
- }
-
- });
- }
-
- System.out.println("Waiting for first batch to complete");
- latch1.await();
-
- System.out.println("Waiting for second batch completion");
- latchAttempt2.await();
-
- if (rejected.get() == batch2.size()) {
- System.out.println("Re-submitting second batch after completion (should execute now)");
- final CountDownLatch latch2 = new CountDownLatch(batch2.size());
- for (final SimpleHttpRequest request : batch2) {
- System.out.println("Executing request " + request);
- client.execute(
- SimpleRequestProducer.create(request),
- SimpleResponseConsumer.create(),
- new FutureCallback() {
-
- @Override
- public void completed(final SimpleHttpResponse response) {
- System.out.println(request + "->" + new StatusLine(response));
- latch2.countDown();
- }
-
- @Override
- public void failed(final Exception ex) {
- System.out.println(request + "->" + ex);
- latch2.countDown();
- }
-
- @Override
- public void cancelled() {
- System.out.println(request + " cancelled");
- latch2.countDown();
- }
-
- });
- }
- latch2.await();
- }
-
- System.out.println("Shutting down");
- client.close(CloseMode.GRACEFUL);
- }
-
-}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueExample.java
new file mode 100644
index 0000000000..2cd396cb47
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncSharedClientQueueExample.java
@@ -0,0 +1,82 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.io.CloseMode;
+
+/**
+ * Demonstrates client-level request throttling for the async client using a shared FIFO queue.
+ * The {@code setMaxQueuedRequests} setting caps the number of concurrently executing requests
+ * for the client instance; requests submitted beyond that limit are queued in FIFO order.
+ *
+ * @since 5.7
+ */
+public final class AsyncSharedClientQueueExample {
+
+ public static void main(final String[] args) throws Exception {
+
+ final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
+ .setMaxQueuedRequests(2)
+ .build();
+
+ client.start();
+
+ final HttpHost target = new HttpHost("https", "nghttp2.org");
+ final String[] paths = {"/httpbin", "/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
+
+ for (final String path : paths) {
+ final SimpleHttpRequest request = SimpleRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath(path)
+ .build();
+
+ final Future future = client.execute(
+ SimpleRequestProducer.create(request),
+ SimpleResponseConsumer.create(),
+ null);
+ final SimpleHttpResponse response = future.get();
+ System.out.println(request + " -> " + new StatusLine(response));
+ }
+
+ client.close(CloseMode.GRACEFUL);
+ }
+
+ private AsyncSharedClientQueueExample() {
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java
new file mode 100644
index 0000000000..bd0fe0d20f
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/H2SharedClientQueueLocalExample.java
@@ -0,0 +1,83 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.message.StatusLine;
+import org.apache.hc.core5.io.CloseMode;
+
+/**
+ * Demonstrates client-level request throttling for the HTTP/2 async client using a shared FIFO queue.
+ * The {@link H2AsyncClientBuilder#setMaxQueuedRequests(int)} setting caps the number of concurrently
+ * executing requests for the client instance; requests submitted beyond that limit are queued in FIFO
+ * order and dispatched as in-flight slots become available.
+ *
+ * @since 5.7
+ */
+public final class H2SharedClientQueueLocalExample {
+
+ public static void main(final String[] args) throws Exception {
+
+ final CloseableHttpAsyncClient client = H2AsyncClientBuilder.create()
+ .setMaxQueuedRequests(2)
+ .build();
+
+ client.start();
+
+ final HttpHost target = new HttpHost("https", "nghttp2.org");
+ final String[] paths = {"/httpbin", "/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
+
+ for (final String path : paths) {
+ final SimpleHttpRequest request = SimpleRequestBuilder.get()
+ .setHttpHost(target)
+ .setPath(path)
+ .build();
+
+ final Future future = client.execute(
+ SimpleRequestProducer.create(request),
+ SimpleResponseConsumer.create(),
+ null);
+ final SimpleHttpResponse response = future.get();
+ System.out.println(request + " -> " + new StatusLine(response));
+ }
+
+ client.close(CloseMode.GRACEFUL);
+ }
+
+ private H2SharedClientQueueLocalExample() {
+ }
+
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
index f37bd43c71..15848db9e4 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilderMaxQueuedRequestsTest.java
@@ -29,13 +29,14 @@
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.EndpointInfo;
@@ -120,8 +121,14 @@ public void cancelled() {
}
});
- assertTrue(latch.await(2, TimeUnit.SECONDS), "rejection should arrive quickly");
- assertInstanceOf(RejectedExecutionException.class, failure.get(), "Expected RejectedExecutionException, got: " + failure.get());
+ // With queueing semantics, r2 is queued and must not complete until r1 is finished.
+ assertTrue(!latch.await(200, TimeUnit.MILLISECONDS), "second request should be queued");
+
+ // Finish r1, release the slot -> r2 should now execute and fail (see BlockingEndpoint.execute()).
+ endpoint.failFirst(new IOException("release-slot"));
+
+ assertTrue(latch.await(2, TimeUnit.SECONDS), "second request should complete after slot released");
+ assertInstanceOf(IOException.class, failure.get(), "Expected IOException, got: " + failure.get());
}
}
@@ -179,11 +186,27 @@ public void close() {
private static final class BlockingEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
+ private final AtomicInteger execCount = new AtomicInteger(0);
+ private final AtomicReference first = new AtomicReference<>();
+
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory pushHandlerFactory,
final HttpContext context) {
+ final int n = execCount.incrementAndGet();
+ if (n == 1) {
+ first.set(handler); // keep slot occupied
+ } else {
+ handler.failed(new IOException("queued request executed"));
+ }
+ }
+
+ void failFirst(final Exception ex) {
+ final AsyncClientExchangeHandler h = first.getAndSet(null);
+ if (h != null) {
+ h.failed(ex);
+ }
}
@Override
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
index 0309d8710d..7a63e863f0 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntimeQueueCapTest.java
@@ -37,7 +37,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,16 +67,17 @@
public class InternalH2AsyncExecRuntimeQueueCapTest {
- private static InternalH2AsyncExecRuntime newRuntime(final int maxQueued) {
+ private static InternalH2AsyncExecRuntime newRuntime(final int maxConcurrent) {
final IOSession ioSession = newImmediateFailSession();
final FakeH2ConnPool connPool = new FakeH2ConnPool(ioSession);
- final AtomicInteger queued = maxQueued > 0 ? new AtomicInteger(0) : null;
+
+ final SharedRequestExecutionQueue queue = maxConcurrent > 0 ? new SharedRequestExecutionQueue(maxConcurrent) : null;
+
return new InternalH2AsyncExecRuntime(
LoggerFactory.getLogger("test"),
connPool,
new NoopPushFactory(),
- maxQueued,
- queued);
+ queue);
}
private static void acquireEndpoint(
@@ -109,19 +109,19 @@ public void cancelled() {
}
/**
- * With no cap (maxQueued <= 0) the recursive re-entry path should blow the stack.
- * This documents the pathological behaviour without queue protection.
+ * With no cap / no queue, the re-entrant execute path can recurse until SOE
+ * if failures are delivered synchronously.
*/
@Test
void testRecursiveReentryCausesSOEWithoutCap() throws Exception {
- final InternalH2AsyncExecRuntime runtime = newRuntime(-1);
+ final InternalH2AsyncExecRuntime runtime = newRuntime(0);
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
acquireEndpoint(runtime, ctx);
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, Integer.MAX_VALUE);
assertThrows(StackOverflowError.class, () -> {
runtime.execute("loop", loop, ctx);
@@ -129,8 +129,8 @@ void testRecursiveReentryCausesSOEWithoutCap() throws Exception {
}
/**
- * With a cap of 1, the second re-entrant execute call must be rejected and
- * the recursion broken.
+ * With cap=1 and QUEUE semantics, re-entrant submissions should be queued,
+ * not executed inline recursively. This test must NOT expect rejection.
*/
@Test
void testCapBreaksRecursiveReentry() throws Exception {
@@ -141,14 +141,13 @@ void testCapBreaksRecursiveReentry() throws Exception {
acquireEndpoint(runtime, ctx);
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+ final int maxAttempts = 200;
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx, maxAttempts);
runtime.execute("loop", loop, ctx);
- // immediate fail path runs synchronously; small wait is just defensive
- Thread.sleep(50);
- assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
- "Expected queue rejection to break recursion");
+ assertTrue(loop.done.await(2, TimeUnit.SECONDS), "expected bounded re-entry loop to terminate");
+ assertTrue(loop.attempts.get() >= maxAttempts, "expected at least " + maxAttempts + " attempts, got " + loop.attempts.get());
}
/**
@@ -168,8 +167,7 @@ public Future getSession(
final HttpRoute route,
final Timeout timeout,
final FutureCallback callback) {
- final CompletableFuture cf = new CompletableFuture<>();
- cf.complete(session);
+ final CompletableFuture cf = CompletableFuture.completedFuture(session);
if (callback != null) {
callback.completed(session);
}
@@ -228,19 +226,33 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle
private final InternalH2AsyncExecRuntime runtime;
private final HttpClientContext context;
- final AtomicReference lastException = new AtomicReference<>();
+ private final int maxAttempts;
+
+ final AtomicInteger attempts;
+ final AtomicReference lastException;
+ final CountDownLatch done;
- ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context) {
+ ReentrantHandler(final InternalH2AsyncExecRuntime runtime, final HttpClientContext context, final int maxAttempts) {
this.runtime = runtime;
this.context = context;
+ this.maxAttempts = maxAttempts;
+ this.attempts = new AtomicInteger(0);
+ this.lastException = new AtomicReference<>();
+ this.done = new CountDownLatch(1);
}
@Override
public void failed(final Exception cause) {
lastException.set(cause);
- if (!(cause instanceof RejectedExecutionException)) {
- runtime.execute("loop/reenter", this, context);
+
+ final int n = attempts.incrementAndGet();
+ if (n >= maxAttempts) {
+ done.countDown();
+ return;
}
+
+ // Re-enter. With QUEUE cap=1 this must not recurse inline until SOE.
+ runtime.execute("loop/reenter/" + n, this, context);
}
@Override
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
index 6aae8413af..0a00c17fbf 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntimeQueueCapTest.java
@@ -27,6 +27,7 @@
package org.apache.hc.client5.http.impl.async;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,7 +42,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -79,112 +79,163 @@
public class InternalHttpAsyncExecRuntimeQueueCapTest {
@Test
- void testFailFastWhenQueueFull() throws Exception {
+ void testRequestsQueuedWhenOverCap() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
+
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
- 2,
- new AtomicInteger()
- );
+ new SharedRequestExecutionQueue(2));
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final CountDownLatch rejected = new CountDownLatch(1);
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
final LatchingHandler h1 = new LatchingHandler();
final LatchingHandler h2 = new LatchingHandler();
+ final LatchingHandler h3 = new LatchingHandler();
+
runtime.execute("r1", h1, ctx);
runtime.execute("r2", h2, ctx);
-
- final LatchingHandler h3 = new LatchingHandler() {
- @Override
- public void failed(final Exception cause) {
- super.failed(cause);
- rejected.countDown();
- }
- };
runtime.execute("r3", h3, ctx);
- assertTrue(rejected.await(2, TimeUnit.SECONDS), "r3 should be failed fast");
- assertTrue(h3.failedException.get() instanceof RejectedExecutionException);
+ assertTrue(waitFor(() -> endpoint.executedCount() == 2, 2, TimeUnit.SECONDS), "r1 and r2 should start");
+ assertTrue(endpoint.executedIds.contains("r1"));
+ assertTrue(endpoint.executedIds.contains("r2"));
+ assertTrue(!endpoint.executedIds.contains("r3"), "r3 should be queued, not started yet");
+
+ endpoint.completeOne(); // releases a slot and should trigger queued r3
+
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r3"), 2, TimeUnit.SECONDS), "r3 should start after slot release");
+
assertNull(h1.failedException.get());
assertNull(h2.failedException.get());
+ assertNull(h3.failedException.get());
}
@Test
void testSlotReleasedOnTerminalSignalAllowsNext() throws Exception {
final FakeEndpoint endpoint = new FakeEndpoint();
final FakeManager manager = new FakeManager(endpoint);
+
final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
LoggerFactory.getLogger("test"),
manager,
new NoopInitiator(),
new NoopPushFactory(),
TlsConfig.DEFAULT,
- 1,
- new AtomicInteger()
- );
+ new SharedRequestExecutionQueue(1));
final HttpClientContext ctx = HttpClientContext.create();
ctx.setRequestConfig(RequestConfig.custom().build());
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
final LatchingHandler h1 = new LatchingHandler();
- runtime.execute("r1", h1, ctx);
-
final LatchingHandler h2 = new LatchingHandler();
+
+ runtime.execute("r1", h1, ctx);
runtime.execute("r2", h2, ctx);
- assertTrue(h2.awaitFailed(2, TimeUnit.SECONDS));
- assertTrue(h2.failedException.get() instanceof RejectedExecutionException);
- // free the slot via releaseResources(), not failed()
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r1"), 2, TimeUnit.SECONDS));
+ assertTrue(!endpoint.executedIds.contains("r2"), "r2 should be queued until r1 completes");
+
endpoint.completeOne();
- final LatchingHandler h3 = new LatchingHandler();
- runtime.execute("r3", h3, ctx);
- Thread.sleep(150);
- assertNull(h3.failedException.get(), "r3 should not be rejected after slot released");
- h3.cancel();
+ assertTrue(waitFor(() -> endpoint.executedIds.contains("r2"), 2, TimeUnit.SECONDS), "r2 should start after r1 completes");
+
+ assertNull(h1.failedException.get());
+ assertNull(h2.failedException.get());
}
+ @Test
+ void testRecursiveReentryCausesSOEWithoutCap() {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ // no queue => old synchronous recursion behaviour remains
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ null);
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
+
+ final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
+
+ assertThrows(StackOverflowError.class, () -> runtime.execute("loop", loop, ctx));
+ }
+
+ @Test
+ void testCapBreaksRecursiveReentry() throws Exception {
+ final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
+ final FakeManager manager = new FakeManager(endpoint);
+
+ // queue => no synchronous recursion -> no SOE
+ final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
+ LoggerFactory.getLogger("test"),
+ manager,
+ new NoopInitiator(),
+ new NoopPushFactory(),
+ TlsConfig.DEFAULT,
+ new SharedRequestExecutionQueue(1));
+
+ final HttpClientContext ctx = HttpClientContext.create();
+ ctx.setRequestConfig(RequestConfig.custom().build());
+
+ runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx, new NoopRuntimeCallback());
+
+ final CountDownLatch done = new CountDownLatch(1);
+ final BoundedReentrantHandler loop = new BoundedReentrantHandler(runtime, ctx, 50, done);
+
+ assertDoesNotThrow(() -> runtime.execute("loop", loop, ctx));
+ assertTrue(done.await(2, TimeUnit.SECONDS), "Expected bounded re-entry loop to complete without SOE");
+ assertTrue(loop.invocations.get() >= 1);
+ assertTrue(loop.lastException.get() instanceof IOException);
+ }
+
+ private static boolean waitFor(final Condition condition, final long time, final TimeUnit unit) throws InterruptedException {
+ final long deadline = System.nanoTime() + unit.toNanos(time);
+ while (System.nanoTime() < deadline) {
+ if (condition.get()) {
+ return true;
+ }
+ Thread.sleep(10);
+ }
+ return condition.get();
+ }
+
+ @FunctionalInterface
+ private interface Condition {
+ boolean get();
+ }
+
+ private static final class NoopRuntimeCallback implements FutureCallback {
+ @Override
+ public void completed(final AsyncExecRuntime result) {
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ }
private static final class NoopInitiator implements ConnectionInitiator {
@Override
@@ -267,36 +318,37 @@ public void close() {
private static final class FakeEndpoint extends AsyncConnectionEndpoint {
volatile boolean connected = true;
- private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>();
+
+ private static final class InFlightEntry {
+ final String id;
+ final AsyncClientExchangeHandler handler;
+
+ InFlightEntry(final String id, final AsyncClientExchangeHandler handler) {
+ this.id = id;
+ this.handler = handler;
+ }
+ }
+
+ final ConcurrentLinkedQueue executedIds = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue inFlight = new ConcurrentLinkedQueue<>();
@Override
public void execute(final String id,
final AsyncClientExchangeHandler handler,
final HandlerFactory pushHandlerFactory,
final HttpContext context) {
- // keep the guarded handler so tests can signal terminal events
- inFlight.add(handler);
- }
-
- // helpers for tests
- void failOne(final Exception ex) {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.failed(ex);
- }
+ executedIds.add(id);
+ inFlight.add(new InFlightEntry(id, handler));
}
- void cancelOne() {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.cancel();
- }
+ int executedCount() {
+ return executedIds.size();
}
void completeOne() {
- final AsyncClientExchangeHandler h = inFlight.poll();
- if (h != null) {
- h.releaseResources();
+ final InFlightEntry e = inFlight.poll();
+ if (e != null) {
+ e.handler.releaseResources();
}
}
@@ -320,8 +372,6 @@ public EndpointInfo getInfo() {
}
}
-
-
private static class LatchingHandler implements AsyncClientExchangeHandler {
final AtomicReference failedException = new AtomicReference<>();
final CountDownLatch failLatch = new CountDownLatch(1);
@@ -378,94 +428,6 @@ public void failed(final Exception cause) {
}
}
- @Test
- void testRecursiveReentryCausesSOEWithoutCap() {
- final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
- final FakeManager manager = new FakeManager(endpoint);
-
- final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
- LoggerFactory.getLogger("test"),
- manager,
- new NoopInitiator(),
- new NoopPushFactory(),
- TlsConfig.DEFAULT,
- -1,
- null // no cap, no counter
- );
-
- final HttpClientContext ctx = HttpClientContext.create();
- ctx.setRequestConfig(RequestConfig.custom().build());
-
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
-
- assertThrows(StackOverflowError.class, () -> {
- runtime.execute("loop", loop, ctx); // execute -> endpoint.execute -> failed() -> execute -> ...
- });
- }
-
- @Test
- void testCapBreaksRecursiveReentry() throws Exception {
- final ImmediateFailEndpoint endpoint = new ImmediateFailEndpoint();
- final FakeManager manager = new FakeManager(endpoint);
-
- final InternalHttpAsyncExecRuntime runtime = new InternalHttpAsyncExecRuntime(
- LoggerFactory.getLogger("test"),
- manager,
- new NoopInitiator(),
- new NoopPushFactory(),
- TlsConfig.DEFAULT,
- 1,
- new AtomicInteger()
- );
-
- final HttpClientContext ctx = HttpClientContext.create();
- ctx.setRequestConfig(RequestConfig.custom().build());
-
- runtime.acquireEndpoint("id", new HttpRoute(new HttpHost("localhost", 80)), null, ctx,
- new FutureCallback() {
- @Override
- public void completed(final AsyncExecRuntime result) {
- }
-
- @Override
- public void failed(final Exception ex) {
- fail(ex);
- }
-
- @Override
- public void cancelled() {
- fail("cancelled");
- }
- });
-
- final ReentrantHandler loop = new ReentrantHandler(runtime, ctx);
-
- // Should NOT blow the stack; the re-entrant call should be rejected.
- runtime.execute("loop", loop, ctx);
- // allow the immediate fail+re-submit path to run
- Thread.sleep(50);
-
- assertTrue(loop.lastException.get() instanceof RejectedExecutionException,
- "Expected rejection to break the recursion");
- }
-
/**
* Endpoint that synchronously fails any handler passed to execute().
*/
@@ -513,9 +475,78 @@ private static final class ReentrantHandler implements AsyncClientExchangeHandle
@Override
public void failed(final Exception cause) {
lastException.set(cause);
- // Re-enter only if this was NOT the cap rejecting us
- if (!(cause instanceof RejectedExecutionException)) {
+ runtime.execute("loop/reenter", this, ctx);
+ }
+
+ @Override
+ public void produceRequest(final RequestChannel channel, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) {
+ }
+
+ @Override
+ public void consumeInformation(final HttpResponse response, final HttpContext context) {
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public int available() {
+ return 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> trailers) {
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+ }
+
+ private static final class BoundedReentrantHandler implements AsyncClientExchangeHandler {
+ private final InternalHttpAsyncExecRuntime runtime;
+ private final HttpClientContext ctx;
+ private final AtomicInteger remaining;
+ private final CountDownLatch done;
+
+ final AtomicInteger invocations = new AtomicInteger(0);
+ final AtomicReference lastException = new AtomicReference<>();
+
+ BoundedReentrantHandler(final InternalHttpAsyncExecRuntime runtime,
+ final HttpClientContext ctx,
+ final int maxReentries,
+ final CountDownLatch done) {
+ this.runtime = runtime;
+ this.ctx = ctx;
+ this.remaining = new AtomicInteger(maxReentries);
+ this.done = done;
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ invocations.incrementAndGet();
+ lastException.set(cause);
+ if (remaining.getAndDecrement() > 0) {
runtime.execute("loop/reenter", this, ctx);
+ } else {
+ done.countDown();
}
}