Skip to content

Commit 8696889

Browse files
committed
Add optional Executor to CopilotClientOptions; wire all internal *Async calls through it; shared timeout scheduler.
src/main/java/com/github/copilot/sdk/json/CopilotClientOptions.java - Added `Executor` field, `getExecutor()`, fluent `setExecutor(Executor)` with pending-null guard, and clone support. src/main/java/com/github/copilot/sdk/CopilotClient.java - Extracted `startCoreBody()` from `startCore()` lambda; `supplyAsync` uses provided executor when non-null. - `stop()` routes session-close `runAsync` through provided executor when non-null. - Passes executor to `RpcHandlerDispatcher` constructor. - Sets executor on new sessions via `session.setExecutor()` in `createSession` and `resumeSession`. src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java - Added `Executor` field and 3-arg constructor. - All 5 `CompletableFuture.runAsync()` calls now go through private `runAsync(Runnable)` helper that uses executor when non-null. src/main/java/com/github/copilot/sdk/CopilotSession.java - Added `Executor` field and package-private `setExecutor()`. - Replaced per-call `ScheduledExecutorService` with shared `timeoutScheduler` (daemon thread, shut down in `close()`). - `executeToolAndRespondAsync` and `executePermissionAndRespondAsync` use executor when non-null. src/test/java/com/github/copilot/sdk/ExecutorWiringTest.java (new) - 6 E2E tests using `TrackingExecutor` decorator to verify all `*Async` paths route through the provided executor: client start, tool call, permission, user input, hooks, and client stop. src/test/java/com/github/copilot/sdk/RpcHandlerDispatcherTest.java - Updated constructor call to pass `null` for new executor parameter.
1 parent a36d145 commit 8696889

File tree

6 files changed

+497
-44
lines changed

6 files changed

+497
-44
lines changed

src/main/java/com/github/copilot/sdk/CopilotClient.java

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.CompletionException;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
1617
import java.util.concurrent.TimeUnit;
1718
import java.util.logging.Level;
1819
import java.util.logging.Logger;
@@ -150,42 +151,48 @@ public CompletableFuture<Void> start() {
150151
private CompletableFuture<Connection> startCore() {
151152
LOG.fine("Starting Copilot client");
152153

153-
return CompletableFuture.supplyAsync(() -> {
154-
try {
155-
JsonRpcClient rpc;
156-
Process process = null;
157-
158-
if (optionsHost != null && optionsPort != null) {
159-
// External server (TCP)
160-
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
161-
} else {
162-
// Child process (stdio or TCP)
163-
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
164-
process = processInfo.process();
165-
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
166-
processInfo.port());
167-
}
154+
Executor exec = options.getExecutor();
155+
return exec != null
156+
? CompletableFuture.supplyAsync(this::startCoreBody, exec)
157+
: CompletableFuture.supplyAsync(this::startCoreBody);
158+
}
168159

169-
Connection connection = new Connection(rpc, process);
160+
private Connection startCoreBody() {
161+
try {
162+
JsonRpcClient rpc;
163+
Process process = null;
164+
165+
if (optionsHost != null && optionsPort != null) {
166+
// External server (TCP)
167+
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
168+
} else {
169+
// Child process (stdio or TCP)
170+
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
171+
process = processInfo.process();
172+
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
173+
processInfo.port());
174+
}
170175

171-
// Register handlers for server-to-client calls
172-
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch);
173-
dispatcher.registerHandlers(rpc);
176+
Connection connection = new Connection(rpc, process);
174177

175-
// Verify protocol version
176-
verifyProtocolVersion(connection);
178+
// Register handlers for server-to-client calls
179+
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch,
180+
options.getExecutor());
181+
dispatcher.registerHandlers(rpc);
177182

178-
LOG.info("Copilot client connected");
179-
return connection;
180-
} catch (Exception e) {
181-
String stderr = serverManager.getStderrOutput();
182-
if (!stderr.isEmpty()) {
183-
throw new CompletionException(
184-
new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
185-
}
186-
throw new CompletionException(e);
183+
// Verify protocol version
184+
verifyProtocolVersion(connection);
185+
186+
LOG.info("Copilot client connected");
187+
return connection;
188+
} catch (Exception e) {
189+
String stderr = serverManager.getStderrOutput();
190+
if (!stderr.isEmpty()) {
191+
throw new CompletionException(
192+
new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
187193
}
188-
});
194+
throw new CompletionException(e);
195+
}
189196
}
190197

191198
private static final int MIN_PROTOCOL_VERSION = 2;
@@ -228,15 +235,19 @@ private void verifyProtocolVersion(Connection connection) throws Exception {
228235
*/
229236
public CompletableFuture<Void> stop() {
230237
var closeFutures = new ArrayList<CompletableFuture<Void>>();
238+
Executor exec = options.getExecutor();
231239

232240
for (CopilotSession session : new ArrayList<>(sessions.values())) {
233-
closeFutures.add(CompletableFuture.runAsync(() -> {
241+
Runnable closeTask = () -> {
234242
try {
235243
session.close();
236244
} catch (Exception e) {
237245
LOG.log(Level.WARNING, "Error closing session " + session.getSessionId(), e);
238246
}
239-
}));
247+
};
248+
closeFutures.add(exec != null
249+
? CompletableFuture.runAsync(closeTask, exec)
250+
: CompletableFuture.runAsync(closeTask));
240251
}
241252
sessions.clear();
242253

@@ -329,6 +340,9 @@ public CompletableFuture<CopilotSession> createSession(SessionConfig config) {
329340
: java.util.UUID.randomUUID().toString();
330341

331342
var session = new CopilotSession(sessionId, connection.rpc);
343+
if (options.getExecutor() != null) {
344+
session.setExecutor(options.getExecutor());
345+
}
332346
SessionRequestBuilder.configureSession(session, config);
333347
sessions.put(sessionId, session);
334348

@@ -399,6 +413,9 @@ public CompletableFuture<CopilotSession> resumeSession(String sessionId, ResumeS
399413
return ensureConnected().thenCompose(connection -> {
400414
// Register the session before the RPC call to avoid missing early events.
401415
var session = new CopilotSession(sessionId, connection.rpc);
416+
if (options.getExecutor() != null) {
417+
session.setExecutor(options.getExecutor());
418+
}
402419
SessionRequestBuilder.configureSession(session, config);
403420
sessions.put(sessionId, session);
404421

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Set;
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
1617
import java.util.concurrent.Executors;
1718
import java.util.concurrent.ScheduledExecutorService;
1819
import java.util.concurrent.TimeUnit;
@@ -123,6 +124,7 @@ public final class CopilotSession implements AutoCloseable {
123124
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
124125
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
125126
private final ScheduledExecutorService timeoutScheduler;
127+
private volatile Executor executor;
126128

127129
/** Tracks whether this session instance has been terminated via close(). */
128130
private volatile boolean isTerminated = false;
@@ -166,6 +168,14 @@ public final class CopilotSession implements AutoCloseable {
166168
});
167169
}
168170

171+
/**
172+
* Sets the executor for internal async operations. Package-private; called by
173+
* CopilotClient after construction.
174+
*/
175+
void setExecutor(Executor executor) {
176+
this.executor = executor;
177+
}
178+
169179
/**
170180
* Gets the unique identifier for this session.
171181
*
@@ -651,7 +661,7 @@ private void handleBroadcastEventAsync(AbstractSessionEvent event) {
651661
*/
652662
private void executeToolAndRespondAsync(String requestId, String toolName, String toolCallId, Object arguments,
653663
ToolDefinition tool) {
654-
CompletableFuture.runAsync(() -> {
664+
Runnable task = () -> {
655665
try {
656666
JsonNode argumentsNode = arguments instanceof JsonNode jn
657667
? jn
@@ -696,7 +706,12 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
696706
LOG.log(Level.WARNING, "Error sending tool error for requestId=" + requestId, sendEx);
697707
}
698708
}
699-
});
709+
};
710+
if (executor != null) {
711+
CompletableFuture.runAsync(task, executor);
712+
} else {
713+
CompletableFuture.runAsync(task);
714+
}
700715
}
701716

702717
/**
@@ -705,7 +720,7 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
705720
*/
706721
private void executePermissionAndRespondAsync(String requestId, PermissionRequest permissionRequest,
707722
PermissionHandler handler) {
708-
CompletableFuture.runAsync(() -> {
723+
Runnable task = () -> {
709724
try {
710725
var invocation = new PermissionInvocation();
711726
invocation.setSessionId(sessionId);
@@ -744,7 +759,12 @@ private void executePermissionAndRespondAsync(String requestId, PermissionReques
744759
LOG.log(Level.WARNING, "Error sending permission denied for requestId=" + requestId, sendEx);
745760
}
746761
}
747-
});
762+
};
763+
if (executor != null) {
764+
CompletableFuture.runAsync(task, executor);
765+
} else {
766+
CompletableFuture.runAsync(task);
767+
}
748768
}
749769

750770
/**

src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.Collections;
1010
import java.util.Map;
1111
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.Executor;
1213
import java.util.logging.Level;
1314
import java.util.logging.Logger;
1415

@@ -45,6 +46,7 @@ final class RpcHandlerDispatcher {
4546

4647
private final Map<String, CopilotSession> sessions;
4748
private final LifecycleEventDispatcher lifecycleDispatcher;
49+
private final Executor executor;
4850

4951
/**
5052
* Creates a dispatcher with session registry and lifecycle dispatcher.
@@ -53,10 +55,14 @@ final class RpcHandlerDispatcher {
5355
* the session registry to look up sessions by ID
5456
* @param lifecycleDispatcher
5557
* callback for dispatching lifecycle events
58+
* @param executor
59+
* the executor for async dispatch, or {@code null} for default
5660
*/
57-
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher) {
61+
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher,
62+
Executor executor) {
5863
this.sessions = sessions;
5964
this.lifecycleDispatcher = lifecycleDispatcher;
65+
this.executor = executor;
6066
}
6167

6268
/**
@@ -118,7 +124,7 @@ private void handleLifecycleEvent(JsonNode params) {
118124
}
119125

120126
private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params) {
121-
CompletableFuture.runAsync(() -> {
127+
runAsync(() -> {
122128
try {
123129
String sessionId = params.get("sessionId").asText();
124130
String toolCallId = params.get("toolCallId").asText();
@@ -178,7 +184,7 @@ private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params
178184
}
179185

180186
private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
181-
CompletableFuture.runAsync(() -> {
187+
runAsync(() -> {
182188
try {
183189
String sessionId = params.get("sessionId").asText();
184190
JsonNode permissionRequest = params.get("permissionRequest");
@@ -222,7 +228,7 @@ private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNo
222228

223229
private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
224230
LOG.fine("Received userInput.request: " + params);
225-
CompletableFuture.runAsync(() -> {
231+
runAsync(() -> {
226232
try {
227233
String sessionId = params.get("sessionId").asText();
228234
String question = params.get("question").asText();
@@ -278,7 +284,7 @@ private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNod
278284
}
279285

280286
private void handleHooksInvoke(JsonRpcClient rpc, String requestId, JsonNode params) {
281-
CompletableFuture.runAsync(() -> {
287+
runAsync(() -> {
282288
try {
283289
String sessionId = params.get("sessionId").asText();
284290
String hookType = params.get("hookType").asText();
@@ -321,7 +327,7 @@ interface LifecycleEventDispatcher {
321327
}
322328

323329
private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, JsonNode params) {
324-
CompletableFuture.runAsync(() -> {
330+
runAsync(() -> {
325331
try {
326332
final long requestIdLong;
327333
try {
@@ -359,4 +365,12 @@ private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, J
359365
}
360366
});
361367
}
368+
369+
private void runAsync(Runnable task) {
370+
if (executor != null) {
371+
CompletableFuture.runAsync(task, executor);
372+
} else {
373+
CompletableFuture.runAsync(task);
374+
}
375+
}
362376
}

src/main/java/com/github/copilot/sdk/json/CopilotClientOptions.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.List;
88
import java.util.Map;
99
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.Executor;
1011
import java.util.function.Supplier;
1112

1213
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -49,6 +50,7 @@ public class CopilotClientOptions {
4950
private Boolean useLoggedInUser;
5051
private Supplier<CompletableFuture<List<ModelInfo>>> onListModels;
5152
private TelemetryConfig telemetry;
53+
private Executor executor;
5254

5355
/**
5456
* Gets the path to the Copilot CLI executable.
@@ -412,6 +414,37 @@ public CopilotClientOptions setTelemetry(TelemetryConfig telemetry) {
412414
return this;
413415
}
414416

417+
/**
418+
* Gets the executor used for internal asynchronous operations.
419+
*
420+
* @return the executor, or {@code null} to use the default
421+
* {@code ForkJoinPool.commonPool()}
422+
*/
423+
public Executor getExecutor() {
424+
return executor;
425+
}
426+
427+
/**
428+
* Sets the executor used for internal asynchronous operations.
429+
* <p>
430+
* When provided, the SDK uses this executor for all internal
431+
* {@code CompletableFuture} combinators instead of the default
432+
* {@code ForkJoinPool.commonPool()}. This allows callers to isolate SDK
433+
* work onto a dedicated thread pool or integrate with container-managed
434+
* threading.
435+
*
436+
* @param executor
437+
* the executor to use, or {@code null} for the default
438+
* @return this options instance for fluent chaining
439+
*/
440+
public CopilotClientOptions setExecutor(Executor executor) {
441+
if (null == executor) {
442+
throw new IllegalArgumentException("PENDING(copilot): not implemented");
443+
}
444+
this.executor = executor;
445+
return this;
446+
}
447+
415448
/**
416449
* Creates a shallow clone of this {@code CopilotClientOptions} instance.
417450
* <p>
@@ -439,6 +472,7 @@ public CopilotClientOptions clone() {
439472
copy.useLoggedInUser = this.useLoggedInUser;
440473
copy.onListModels = this.onListModels;
441474
copy.telemetry = this.telemetry;
475+
copy.executor = this.executor;
442476
return copy;
443477
}
444478
}

0 commit comments

Comments
 (0)