future = connectionFuture;
connectionFuture = null;
if (future == null) {
return CompletableFuture.completedFuture(null);
}
return future.thenAccept(connection -> {
try {
connection.rpc.close();
} catch (Exception e) {
LOG.log(Level.FINE, "Error closing RPC", e);
}
if (connection.process != null) {
try {
if (connection.process.isAlive()) {
connection.process.destroyForcibly();
}
} catch (Exception e) {
LOG.log(Level.FINE, "Error killing process", e);
}
}
}).exceptionally(ex -> null);
}
/**
* Creates a new Copilot session with the specified configuration.
*
* The session maintains conversation state and can be used to send messages and
* receive responses. Remember to close the session when done.
*
* @param config
* configuration for the session (model, tools, etc.)
* @return a future that resolves with the created CopilotSession
* @see #createSession()
* @see SessionConfig
*/
public CompletableFuture createSession(SessionConfig config) {
return ensureConnected().thenCompose(connection -> {
CreateSessionRequest request = new CreateSessionRequest();
if (config != null) {
request.setModel(config.getModel());
request.setSessionId(config.getSessionId());
request.setTools(config.getTools() != null
? config.getTools().stream()
.map(t -> new ToolDef(t.getName(), t.getDescription(), t.getParameters()))
.collect(Collectors.toList())
: null);
request.setSystemMessage(config.getSystemMessage());
request.setAvailableTools(config.getAvailableTools());
request.setExcludedTools(config.getExcludedTools());
request.setProvider(config.getProvider());
request.setRequestPermission(config.getOnPermissionRequest() != null ? true : null);
request.setStreaming(config.isStreaming() ? true : null);
request.setMcpServers(config.getMcpServers());
request.setCustomAgents(config.getCustomAgents());
request.setInfiniteSessions(config.getInfiniteSessions());
}
return connection.rpc.invoke("session.create", request, CreateSessionResponse.class).thenApply(response -> {
CopilotSession session = new CopilotSession(response.getSessionId(), connection.rpc,
response.getWorkspacePath());
if (config != null && config.getTools() != null) {
session.registerTools(config.getTools());
}
if (config != null && config.getOnPermissionRequest() != null) {
session.registerPermissionHandler(config.getOnPermissionRequest());
}
sessions.put(response.getSessionId(), session);
return session;
});
});
}
/**
* Creates a new Copilot session with default configuration.
*
* @return a future that resolves with the created CopilotSession
* @see #createSession(SessionConfig)
*/
public CompletableFuture createSession() {
return createSession(null);
}
/**
* Resumes an existing Copilot session.
*
* This restores a previously saved session, allowing you to continue a
* conversation. The session's history is preserved.
*
* @param sessionId
* the ID of the session to resume
* @param config
* configuration for the resumed session
* @return a future that resolves with the resumed CopilotSession
* @see #resumeSession(String)
* @see #listSessions()
* @see #getLastSessionId()
*/
public CompletableFuture resumeSession(String sessionId, ResumeSessionConfig config) {
return ensureConnected().thenCompose(connection -> {
ResumeSessionRequest request = new ResumeSessionRequest();
request.setSessionId(sessionId);
if (config != null) {
request.setTools(config.getTools() != null
? config.getTools().stream()
.map(t -> new ToolDef(t.getName(), t.getDescription(), t.getParameters()))
.collect(Collectors.toList())
: null);
request.setProvider(config.getProvider());
request.setRequestPermission(config.getOnPermissionRequest() != null ? true : null);
request.setStreaming(config.isStreaming() ? true : null);
request.setMcpServers(config.getMcpServers());
request.setCustomAgents(config.getCustomAgents());
}
return connection.rpc.invoke("session.resume", request, ResumeSessionResponse.class).thenApply(response -> {
CopilotSession session = new CopilotSession(response.getSessionId(), connection.rpc,
response.getWorkspacePath());
if (config != null && config.getTools() != null) {
session.registerTools(config.getTools());
}
if (config != null && config.getOnPermissionRequest() != null) {
session.registerPermissionHandler(config.getOnPermissionRequest());
}
sessions.put(response.getSessionId(), session);
return session;
});
});
}
/**
* Resumes an existing session with default configuration.
*
* @param sessionId
* the ID of the session to resume
* @return a future that resolves with the resumed CopilotSession
* @see #resumeSession(String, ResumeSessionConfig)
*/
public CompletableFuture resumeSession(String sessionId) {
return resumeSession(sessionId, null);
}
/**
* Gets the current connection state.
*
* @return the current connection state
* @see ConnectionState
*/
public ConnectionState getState() {
if (connectionFuture == null)
return ConnectionState.DISCONNECTED;
if (connectionFuture.isCompletedExceptionally())
return ConnectionState.ERROR;
if (!connectionFuture.isDone())
return ConnectionState.CONNECTING;
return ConnectionState.CONNECTED;
}
/**
* Pings the server to check connectivity.
*
* This can be used to verify that the server is responsive and to check the
* protocol version.
*
* @param message
* an optional message to echo back
* @return a future that resolves with the ping response
* @see PingResponse
*/
public CompletableFuture ping(String message) {
return ensureConnected().thenCompose(connection -> connection.rpc.invoke("ping",
Map.of("message", message != null ? message : ""), PingResponse.class));
}
/**
* Gets CLI status including version and protocol information.
*
* @return a future that resolves with the status response containing version
* and protocol version
* @see GetStatusResponse
*/
public CompletableFuture getStatus() {
return ensureConnected()
.thenCompose(connection -> connection.rpc.invoke("status.get", Map.of(), GetStatusResponse.class));
}
/**
* Gets current authentication status.
*
* @return a future that resolves with the authentication status
* @see GetAuthStatusResponse
*/
public CompletableFuture getAuthStatus() {
return ensureConnected().thenCompose(
connection -> connection.rpc.invoke("auth.getStatus", Map.of(), GetAuthStatusResponse.class));
}
/**
* Lists available models with their metadata.
*
* @return a future that resolves with a list of available models
* @see ModelInfo
*/
public CompletableFuture> listModels() {
return ensureConnected().thenCompose(connection -> connection.rpc
.invoke("models.list", Map.of(), GetModelsResponse.class).thenApply(GetModelsResponse::getModels));
}
/**
* Gets the ID of the most recently used session.
*
* This is useful for resuming the last conversation without needing to list all
* sessions.
*
* @return a future that resolves with the last session ID, or {@code null} if
* no sessions exist
* @see #resumeSession(String)
*/
public CompletableFuture getLastSessionId() {
return ensureConnected().thenCompose(
connection -> connection.rpc.invoke("session.getLastId", Map.of(), GetLastSessionIdResponse.class)
.thenApply(GetLastSessionIdResponse::getSessionId));
}
/**
* Deletes a session by ID.
*
* This permanently removes the session and its conversation history.
*
* @param sessionId
* the ID of the session to delete
* @return a future that completes when the session is deleted
* @throws RuntimeException
* if the deletion fails
*/
public CompletableFuture deleteSession(String sessionId) {
return ensureConnected().thenCompose(connection -> connection.rpc
.invoke("session.delete", Map.of("sessionId", sessionId), DeleteSessionResponse.class)
.thenAccept(response -> {
if (!response.isSuccess()) {
throw new RuntimeException(
"Failed to delete session " + sessionId + ": " + response.getError());
}
sessions.remove(sessionId);
}));
}
/**
* Lists all available sessions.
*
* Returns metadata about all sessions that can be resumed, including their IDs,
* start times, and summaries.
*
* @return a future that resolves with a list of session metadata
* @see SessionMetadata
* @see #resumeSession(String)
*/
public CompletableFuture> listSessions() {
return ensureConnected()
.thenCompose(connection -> connection.rpc.invoke("session.list", Map.of(), ListSessionsResponse.class)
.thenApply(ListSessionsResponse::getSessions));
}
private CompletableFuture ensureConnected() {
if (connectionFuture == null && !options.isAutoStart()) {
throw new IllegalStateException("Client not connected. Call start() first.");
}
start();
return connectionFuture;
}
private ProcessInfo startCliServer() throws IOException, InterruptedException {
String cliPath = options.getCliPath() != null ? options.getCliPath() : "copilot";
List args = new ArrayList<>();
if (options.getCliArgs() != null) {
args.addAll(Arrays.asList(options.getCliArgs()));
}
args.add("--server");
args.add("--log-level");
args.add(options.getLogLevel());
if (options.isUseStdio()) {
args.add("--stdio");
} else if (options.getPort() > 0) {
args.add("--port");
args.add(String.valueOf(options.getPort()));
}
List command = resolveCliCommand(cliPath, args);
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(false);
if (options.getCwd() != null) {
pb.directory(new File(options.getCwd()));
}
if (options.getEnvironment() != null) {
pb.environment().clear();
pb.environment().putAll(options.getEnvironment());
}
pb.environment().remove("NODE_DEBUG");
Process process = pb.start();
// Forward stderr to logger in background
Thread stderrThread = new Thread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
String line;
while ((line = reader.readLine()) != null) {
LOG.fine("[CLI] " + line);
}
} catch (IOException e) {
LOG.log(Level.FINE, "Error reading stderr", e);
}
}, "cli-stderr-reader");
stderrThread.setDaemon(true);
stderrThread.start();
Integer detectedPort = null;
if (!options.isUseStdio()) {
// Wait for port announcement
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
Pattern portPattern = Pattern.compile("listening on port (\\d+)", Pattern.CASE_INSENSITIVE);
long deadline = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < deadline) {
String line = reader.readLine();
if (line == null) {
throw new IOException("CLI process exited unexpectedly");
}
Matcher matcher = portPattern.matcher(line);
if (matcher.find()) {
detectedPort = Integer.parseInt(matcher.group(1));
break;
}
}
if (detectedPort == null) {
process.destroyForcibly();
throw new IOException("Timeout waiting for CLI to announce port");
}
}
return new ProcessInfo(process, detectedPort);
}
private List resolveCliCommand(String cliPath, List args) {
boolean isJsFile = cliPath.toLowerCase().endsWith(".js");
if (isJsFile) {
List result = new ArrayList<>();
result.add("node");
result.add(cliPath);
result.addAll(args);
return result;
}
// On Windows, use cmd /c to resolve the executable
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win") && !new File(cliPath).isAbsolute()) {
List result = new ArrayList<>();
result.add("cmd");
result.add("/c");
result.add(cliPath);
result.addAll(args);
return result;
}
List result = new ArrayList<>();
result.add(cliPath);
result.addAll(args);
return result;
}
private Connection connectToServer(Process process, String tcpHost, Integer tcpPort) throws IOException {
JsonRpcClient rpc;
if (options.isUseStdio()) {
if (process == null) {
throw new IllegalStateException("CLI process not started");
}
rpc = JsonRpcClient.fromProcess(process);
} else {
if (tcpHost == null || tcpPort == null) {
throw new IllegalStateException("Cannot connect because TCP host or port are not available");
}
Socket socket = new Socket(tcpHost, tcpPort);
rpc = JsonRpcClient.fromSocket(socket);
}
return new Connection(rpc, process);
}
@Override
public void close() {
if (disposed)
return;
disposed = true;
try {
forceStop().get(5, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.log(Level.FINE, "Error during close", e);
}
}
private static class ProcessInfo {
final Process process;
final Integer port;
ProcessInfo(Process process, Integer port) {
this.process = process;
this.port = port;
}
}
private static class Connection {
final JsonRpcClient rpc;
final Process process;
Connection(JsonRpcClient rpc, Process process) {
this.rpc = rpc;
this.process = process;
}
}
}