Skip to content

Commit 44ab0d4

Browse files
committed
fix(http): Use ScopedValue for per-request state, not HttpExchange attributes
HttpExchange.setAttribute writes to a context-shared map; concurrent requests overwrote each other's body bytes/operation-id/path-parameters, which manifested in k6 as ~30% empty 200 response bodies under modest keep-alive load. The original master code avoided this via a private attribute map on its BodyHandler.RequestBodyWrapper, which the refactor inadvertently dropped. Replaces the four exchange attributes with a single immutable RequestContext bound to ScopedValue.CONTEXT for the duration of the request. Request.bytes/parsed/operationId/pathParams are now no-arg accessors on the bound value; they throw NoSuchElementException if read outside a request scope. Verified with the curl-equivalent k6 profile: 30 sustained VUs / 45 s / 1,436,394 requests / 2,393,990 checks → 100% pass, 0% HTTP failures. README updated to retract the keep-alive caveat (it was a misdiagnosis) and document the ScopedValue ⇒ off-thread capture rule for handlers.
1 parent f01387c commit 44ab0d4

12 files changed

Lines changed: 171 additions & 85 deletions

File tree

.claude/settings.local.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(mvn *)",
5+
"Bash(git add *)",
6+
"Bash(git commit *)",
7+
"Bash(git push *)",
8+
"Bash(k6 run *)"
9+
]
10+
}
11+
}

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,13 @@ Schemas are located under test resources folder.
124124

125125
The library wraps the JDK's bundled `com.sun.net.httpserver.HttpServer` and uses a virtual-thread-per-request executor. On a developer laptop (Apple Silicon, single instance, default JVM flags) it sustains roughly:
126126

127-
- **20k–35k requests/second** for small JSON GETs and POSTs (~300 byte bodies), measured both via parallelised `curl` (no connection reuse) and via `k6` at moderate VU counts.
128-
- The HTTP layer reports **0% request failures** even when sustained at 30 concurrent virtual users for 45 seconds (~1.5M requests).
127+
- **~32k requests/second** for small JSON GETs and POSTs (~300 byte bodies), measured via `k6` at 30 sustained VUs over 45 seconds (1.4M requests, **100% of checks passing**, 0% HTTP failures).
129128

130-
That said, the underlying JDK HttpServer is documented as a low-throughput / dev-test server, and a few caveats are worth knowing:
129+
A few things to know:
131130

132-
- **Keep-alive reuse at high concurrency.** k6 and other clients that aggressively reuse HTTP/1.1 connections will occasionally observe empty response bodies (with the correct `Content-Length` header) when running tens of VUs against this server. Curl-style clients that open a fresh connection per request do not see this. If you need consistent throughput above a few thousand RPS with keep-alive clients, consider deploying behind a real HTTP server (Jetty, Helidon Níma, Netty) — this library's filter/validator stack is independent of the underlying server and could be ported.
133131
- **Single-process model.** No horizontal scaling primitives are bundled; run multiple instances behind a load balancer for production scale.
134-
- **`HttpExchange.sendResponseHeaders(rCode, length)` semantics.** When a handler has no response body, pass `-1` (no body, `Content-Length: 0`); passing `0` produces a chunked response with zero chunks, which is technically non-conformant.
132+
- **JDK HttpServer is the throughput ceiling.** It's documented as a low-throughput / dev-test server. If you need to go materially above the rates above, deploy the same filter/validator/router stack on Jetty, Helidon Níma, or Netty — the spec and validation code is server-agnostic.
133+
- **Per-request state uses `ScopedValue`** (Java 25, JEP 506), not `HttpExchange.setAttribute`. This matters if a handler offloads work to an executor that's not a `StructuredTaskScope`-managed child thread: the `ScopedValue` is not visible there, so the handler must capture the values it needs (e.g. `byte[] body = Request.bytes();`) before submitting.
134+
- **`HttpExchange.sendResponseHeaders(rCode, length)` gotcha.** When a handler has no response body, pass `-1` (`Content-Length: 0`, no body); passing `0` produces a chunked response with zero chunks, which is technically non-conformant.
135135

136136
## Known limitations or missing features
Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,39 @@
11
package com.retailsvc.http;
22

3-
import com.sun.net.httpserver.HttpExchange;
3+
import com.retailsvc.http.internal.RequestContext;
44
import java.util.Map;
55

6+
/**
7+
* Static accessors for per-request state populated by the request-preparation filter.
8+
*
9+
* <p>The state is bound to a {@link ScopedValue} for the duration of the request rather than stored
10+
* on the {@code HttpExchange}, because {@code HttpExchange.setAttribute} writes to a context-shared
11+
* map and would race across concurrent requests.
12+
*
13+
* <p>If a handler dispatches work to a non-structured executor (i.e. not a {@code
14+
* StructuredTaskScope}-managed thread), it must capture the values it needs before submitting — the
15+
* {@link ScopedValue} is not visible from arbitrary worker threads.
16+
*/
617
public final class Request {
7-
public static final String BODY = "body";
8-
public static final String PARSED_BODY = "parsed-body";
9-
public static final String OPERATION_ID = "operation-id";
10-
public static final String PATH_PARAMETERS = "path-parameters";
18+
19+
/** Bound by {@code RequestPreparationFilter} for the duration of each request. */
20+
public static final ScopedValue<RequestContext> CONTEXT = ScopedValue.newInstance();
1121

1222
private Request() {}
1323

14-
public static byte[] bytes(HttpExchange e) {
15-
return (byte[]) e.getAttribute(BODY);
24+
public static byte[] bytes() {
25+
return CONTEXT.get().body();
1626
}
1727

18-
public static Object parsed(HttpExchange e) {
19-
return e.getAttribute(PARSED_BODY);
28+
public static Object parsed() {
29+
return CONTEXT.get().parsedBody();
2030
}
2131

22-
public static String operationId(HttpExchange e) {
23-
return (String) e.getAttribute(OPERATION_ID);
32+
public static String operationId() {
33+
return CONTEXT.get().operationId();
2434
}
2535

26-
@SuppressWarnings("unchecked")
27-
public static Map<String, String> pathParams(HttpExchange e) {
28-
return (Map<String, String>) e.getAttribute(PATH_PARAMETERS);
36+
public static Map<String, String> pathParams() {
37+
return CONTEXT.get().pathParameters();
2938
}
3039
}

src/main/java/com/retailsvc/http/internal/DispatchHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public DispatchHandler(Map<String, HttpHandler> handlers) {
1616

1717
@Override
1818
public void handle(HttpExchange exchange) throws IOException {
19-
String opId = Request.operationId(exchange);
19+
String opId = Request.operationId();
2020
HttpHandler h = handlers.get(opId);
2121
if (h == null) {
2222
throw new MissingOperationHandlerException(opId);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.retailsvc.http.internal;
2+
3+
import java.util.Map;
4+
5+
/**
6+
* Immutable per-request data populated by {@link RequestPreparationFilter} and read by handlers via
7+
* {@link com.retailsvc.http.Request}. Bound to a {@link ScopedValue} for the duration of a single
8+
* request — never written to the {@code HttpExchange}'s context-shared attribute map.
9+
*/
10+
public record RequestContext(
11+
byte[] body, Object parsedBody, String operationId, Map<String, String> pathParameters) {}

src/main/java/com/retailsvc/http/internal/RequestPreparationFilter.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
11
package com.retailsvc.http.internal;
22

3-
import static com.retailsvc.http.Request.BODY;
4-
import static com.retailsvc.http.Request.OPERATION_ID;
5-
import static com.retailsvc.http.Request.PARSED_BODY;
6-
import static com.retailsvc.http.Request.PATH_PARAMETERS;
7-
83
import com.retailsvc.http.JsonMapper;
94
import com.retailsvc.http.MethodNotAllowedException;
105
import com.retailsvc.http.NotFoundException;
6+
import com.retailsvc.http.Request;
117
import com.retailsvc.http.ValidationException;
128
import com.retailsvc.http.spec.HttpMethod;
139
import com.retailsvc.http.spec.MediaType;
@@ -48,7 +44,6 @@ public String description() {
4844
@Override
4945
public void doFilter(HttpExchange exchange, Chain chain) throws IOException {
5046
byte[] body = exchange.getRequestBody().readAllBytes();
51-
exchange.setAttribute(BODY, body);
5247

5348
HttpMethod method = HttpMethod.parse(exchange.getRequestMethod());
5449
String path = stripBasePath(exchange.getRequestURI().getPath());
@@ -64,13 +59,35 @@ public void doFilter(HttpExchange exchange, Chain chain) throws IOException {
6459
Router.Match match = matchOpt.get();
6560

6661
Operation op = match.operation();
67-
exchange.setAttribute(OPERATION_ID, op.operationId());
68-
exchange.setAttribute(PATH_PARAMETERS, match.pathParameters());
69-
7062
validateParameters(exchange, op, match.pathParameters());
71-
validateBody(exchange, op, body);
63+
Object parsedBody = validateAndParseBody(exchange, op, body);
64+
65+
RequestContext ctx =
66+
new RequestContext(body, parsedBody, op.operationId(), match.pathParameters());
67+
68+
runWithRequestContext(ctx, () -> chain.doFilter(exchange));
69+
}
70+
71+
private static void runWithRequestContext(RequestContext ctx, IORunnable work)
72+
throws IOException {
73+
try {
74+
ScopedValue.where(Request.CONTEXT, ctx)
75+
.call(
76+
() -> {
77+
work.run();
78+
return null;
79+
});
80+
} catch (IOException | RuntimeException e) {
81+
throw e;
82+
} catch (Exception e) {
83+
// Callable.call() throws Exception; nothing else can actually be thrown by the chain.
84+
throw new IOException(e);
85+
}
86+
}
7287

73-
chain.doFilter(exchange);
88+
@FunctionalInterface
89+
private interface IORunnable {
90+
void run() throws IOException;
7491
}
7592

7693
private String stripBasePath(String path) {
@@ -108,17 +125,17 @@ private void validateParameters(
108125
}
109126
}
110127

111-
private void validateBody(HttpExchange exchange, Operation op, byte[] body) {
128+
private Object validateAndParseBody(HttpExchange exchange, Operation op, byte[] body) {
112129
Optional<RequestBody> rb = op.requestBody();
113130
if (rb.isEmpty()) {
114-
return;
131+
return null;
115132
}
116133
if (body.length == 0) {
117134
if (rb.get().required()) {
118135
throw new ValidationException(
119136
new ValidationError("/body", "required", "request body is required", null));
120137
}
121-
return;
138+
return null;
122139
}
123140
String contentType = exchange.getRequestHeaders().getFirst("Content-Type");
124141
if (contentType == null) {
@@ -132,8 +149,8 @@ private void validateBody(HttpExchange exchange, Operation op, byte[] body) {
132149
"/body", "content-type", "unsupported content type: " + contentType, null));
133150
}
134151
Object parsed = jsonMapper.mapFrom(body);
135-
exchange.setAttribute(PARSED_BODY, parsed);
136152
validator.validate(parsed, mt.schema(), "");
153+
return parsed;
137154
}
138155

139156
private static Map<String, String> parseQuery(String query) {
Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,44 @@
11
package com.retailsvc.http;
22

33
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.junit.jupiter.api.Assertions.assertThrows;
45

5-
import com.sun.net.httpserver.HttpExchange;
6+
import com.retailsvc.http.internal.RequestContext;
67
import java.util.Map;
8+
import java.util.NoSuchElementException;
9+
import java.util.concurrent.atomic.AtomicReference;
710
import org.junit.jupiter.api.Test;
8-
import org.mockito.Mockito;
911

1012
class RequestTest {
13+
14+
@Test
15+
void readsBoundContext() throws Exception {
16+
RequestContext ctx =
17+
new RequestContext(new byte[] {1, 2, 3}, Map.of("k", "v"), "get-x", Map.of("id", "42"));
18+
19+
AtomicReference<byte[]> seenBytes = new AtomicReference<>();
20+
AtomicReference<Object> seenParsed = new AtomicReference<>();
21+
AtomicReference<String> seenOpId = new AtomicReference<>();
22+
AtomicReference<Map<String, String>> seenPathParams = new AtomicReference<>();
23+
24+
ScopedValue.where(Request.CONTEXT, ctx)
25+
.call(
26+
() -> {
27+
seenBytes.set(Request.bytes());
28+
seenParsed.set(Request.parsed());
29+
seenOpId.set(Request.operationId());
30+
seenPathParams.set(Request.pathParams());
31+
return null;
32+
});
33+
34+
assertThat(seenBytes.get()).containsExactly(1, 2, 3);
35+
assertThat(seenParsed.get()).isEqualTo(Map.of("k", "v"));
36+
assertThat(seenOpId.get()).isEqualTo("get-x");
37+
assertThat(seenPathParams.get()).containsEntry("id", "42");
38+
}
39+
1140
@Test
12-
void readsAttributes() {
13-
HttpExchange ex = Mockito.mock(HttpExchange.class);
14-
Mockito.when(ex.getAttribute("body")).thenReturn(new byte[] {1, 2, 3});
15-
Mockito.when(ex.getAttribute("parsed-body")).thenReturn(Map.of("k", "v"));
16-
Mockito.when(ex.getAttribute("operation-id")).thenReturn("get-x");
17-
Mockito.when(ex.getAttribute("path-parameters")).thenReturn(Map.of("id", "42"));
18-
19-
assertThat(Request.bytes(ex)).containsExactly(1, 2, 3);
20-
assertThat(Request.parsed(ex)).isEqualTo(Map.of("k", "v"));
21-
assertThat(Request.operationId(ex)).isEqualTo("get-x");
22-
assertThat(Request.pathParams(ex)).containsEntry("id", "42");
41+
void readingOutsideScopeThrows() {
42+
assertThrows(NoSuchElementException.class, () -> Request.bytes());
2343
}
2444
}

src/test/java/com/retailsvc/http/internal/DispatchHandlerTest.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,42 @@
1111
import org.mockito.Mockito;
1212

1313
class DispatchHandlerTest {
14-
private HttpExchange exchange(String operationId) {
15-
HttpExchange ex = Mockito.mock(HttpExchange.class);
16-
Mockito.when(ex.getAttribute(Request.OPERATION_ID)).thenReturn(operationId);
17-
return ex;
14+
15+
private static void withOperationId(
16+
String operationId, ScopedValue.CallableOp<Void, Exception> body) throws Exception {
17+
RequestContext ctx = new RequestContext(new byte[0], null, operationId, Map.of());
18+
ScopedValue.where(Request.CONTEXT, ctx).call(body);
1819
}
1920

2021
@Test
2122
void invokesRegisteredHandler() throws Exception {
2223
HttpHandler handler = Mockito.mock(HttpHandler.class);
23-
new DispatchHandler(Map.of("get-x", handler)).handle(exchange("get-x"));
24+
HttpExchange ex = Mockito.mock(HttpExchange.class);
25+
26+
withOperationId(
27+
"get-x",
28+
() -> {
29+
new DispatchHandler(Map.of("get-x", handler)).handle(ex);
30+
return null;
31+
});
32+
// bound op-id is "get-x"; DispatchHandler should look up the registered HttpHandler.
33+
2434
Mockito.verify(handler).handle(Mockito.any());
2535
}
2636

2737
@Test
2838
void throwsWhenHandlerMissing() {
2939
DispatchHandler d = new DispatchHandler(Map.of());
30-
HttpExchange ex = exchange("ghost");
31-
assertThatThrownBy(() -> d.handle(ex)).isInstanceOf(MissingOperationHandlerException.class);
40+
HttpExchange ex = Mockito.mock(HttpExchange.class);
41+
42+
assertThatThrownBy(
43+
() ->
44+
withOperationId(
45+
"ghost",
46+
() -> {
47+
d.handle(ex);
48+
return null;
49+
}))
50+
.isInstanceOf(MissingOperationHandlerException.class);
3251
}
3352
}

0 commit comments

Comments
 (0)