Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import * as Layer from "effect/Layer";
import { HostProcessHostname, HostProcessPlatform } from "@t3tools/shared/hostProcess";
import { vi } from "vite-plus/test";

import { ProcessRunner, ProcessSpawnError, type ProcessRunnerShape } from "../../processRunner.ts";
import * as ProcessRunner from "../../processRunner.ts";
import { resolveServerEnvironmentLabel } from "./ServerEnvironmentLabel.ts";
import { ChildProcessSpawner } from "effect/unstable/process";

const runMock = vi.fn<ProcessRunnerShape["run"]>();
const runMock = vi.fn<ProcessRunner.ProcessRunner["Service"]["run"]>();

const ProcessRunnerTest = Layer.succeed(
ProcessRunner,
ProcessRunner.of({
ProcessRunner.ProcessRunner,
ProcessRunner.ProcessRunner.of({
run: (input) => runMock(input),
}),
);
Expand Down Expand Up @@ -136,7 +136,7 @@ describe("resolveServerEnvironmentLabel", () => {
Effect.gen(function* () {
runMock.mockReturnValueOnce(
Effect.fail(
new ProcessSpawnError({
new ProcessRunner.ProcessSpawnError({
command: "scutil",
args: ["--get", "ComputerName"],
cause: new Error("spawn scutil ENOENT"),
Expand Down
16 changes: 8 additions & 8 deletions apps/server/src/mcp/PreviewAutomationBroker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const makeOwner = (overrides: Partial<PreviewAutomationOwner> = {}): PreviewAuto
it.effect("atomically registers a connected owner and correlates its response", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const requests = yield* broker.connect(makeOwner());
yield* Stream.runForEach(requests, (request) =>
broker.respond({
Expand All @@ -61,7 +61,7 @@ it.effect("atomically registers a connected owner and correlates its response",

it.effect("rejects calls when no focused owner exists", () =>
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const error = yield* broker
.invoke<void>({ scope, operation: "status", input: {} })
.pipe(Effect.flip);
Expand All @@ -72,7 +72,7 @@ it.effect("rejects calls when no focused owner exists", () =>
it.effect("routes interactive commands to a hidden durable browser host", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const requests = yield* broker.connect(
makeOwner({ clientId: "client-hidden", tabId: "tab-hidden" }),
);
Expand All @@ -89,7 +89,7 @@ it.effect("routes interactive commands to a hidden durable browser host", () =>
it.effect("lets the browser host resolve an active tab that has not been reported yet", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const requests = yield* broker.connect(makeOwner({ tabId: null }));
let routedTabId: string | undefined;
yield* Stream.runForEach(requests, (request) => {
Expand All @@ -108,7 +108,7 @@ it.effect("lets the browser host resolve an active tab that has not been reporte
it.effect("preserves current owner metadata when its request stream reconnects", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const firstRequests = yield* broker.connect(makeOwner());
yield* Stream.runDrain(firstRequests).pipe(Effect.forkScoped);
yield* broker.reportOwner(makeOwner({ tabId: "tab-current", visible: true }));
Expand All @@ -131,7 +131,7 @@ it.effect("preserves current owner metadata when its request stream reconnects",
it.effect("ignores stale owner cleanup after the client moves to another thread", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const requests = yield* broker.connect(makeOwner());
yield* Stream.runForEach(requests, (request) =>
broker.respond({ requestId: request.requestId, ok: true }),
Expand All @@ -152,7 +152,7 @@ it.effect("ignores stale owner cleanup after the client moves to another thread"
it.effect("fails requests assigned to a browser stream when that stream reconnects", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const _requests = yield* broker.connect(makeOwner());
const pending = yield* broker
.invoke<void>({ scope, operation: "status", input: {} })
Expand All @@ -170,7 +170,7 @@ it.effect("fails requests assigned to a browser stream when that stream reconnec
it.effect("falls back to an older connected owner when a newer report is not connected", () =>
Effect.scoped(
Effect.gen(function* () {
const broker = yield* PreviewAutomationBroker.__testing.make;
const broker = yield* PreviewAutomationBroker.make;
const requests = yield* broker.connect(makeOwner({ clientId: "client-connected" }));
yield* Stream.runForEach(requests, (request) =>
broker.respond({ requestId: request.requestId, ok: true, result: "connected" }),
Expand Down
55 changes: 22 additions & 33 deletions apps/server/src/mcp/PreviewAutomationBroker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,28 @@ export interface PreviewAutomationInvokeInput {
readonly timeoutMs?: number;
}

export interface PreviewAutomationBrokerShape {
readonly connect: (
owner: PreviewAutomationOwner,
) => Effect.Effect<Stream.Stream<PreviewAutomationRequest>>;
readonly reportOwner: (
owner: PreviewAutomationOwner,
) => Effect.Effect<void, PreviewAutomationError>;
readonly clearOwner: (owner: PreviewAutomationOwnerIdentity) => Effect.Effect<void>;
readonly respond: (
response: PreviewAutomationResponse,
) => Effect.Effect<void, PreviewAutomationError>;
readonly invoke: <A = unknown>(
request: PreviewAutomationInvokeInput,
) => Effect.Effect<A, PreviewAutomationError>;
}

export class PreviewAutomationBroker extends Context.Service<
PreviewAutomationBroker,
PreviewAutomationBrokerShape
{
readonly connect: (
owner: PreviewAutomationOwner,
) => Effect.Effect<Stream.Stream<PreviewAutomationRequest>>;
readonly reportOwner: (
owner: PreviewAutomationOwner,
) => Effect.Effect<void, PreviewAutomationError>;
readonly clearOwner: (owner: PreviewAutomationOwnerIdentity) => Effect.Effect<void>;
readonly respond: (
response: PreviewAutomationResponse,
) => Effect.Effect<void, PreviewAutomationError>;
readonly invoke: <A = unknown>(
request: PreviewAutomationInvokeInput,
) => Effect.Effect<A, PreviewAutomationError>;
}
>()("t3/mcp/PreviewAutomationBroker") {}

interface ClientConnection {
readonly clientId: string;
readonly queue: Queue.Queue<
Parameters<PreviewAutomationBrokerShape["respond"]>[0] extends never
? never
: import("@t3tools/contracts").PreviewAutomationRequest
>;
readonly queue: Queue.Queue<PreviewAutomationRequest>;
}

interface PendingRequest {
Expand Down Expand Up @@ -123,7 +117,7 @@ const makeResponseError = (
}
};

const make = Effect.gen(function* PreviewAutomationBrokerMake() {
export const make = Effect.gen(function* PreviewAutomationBrokerMake() {
const state = yield* SynchronizedRef.make<BrokerState>({
clients: new Map(),
owners: new Map(),
Expand Down Expand Up @@ -166,7 +160,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
yield* Queue.shutdown(queue);
});

const connect: PreviewAutomationBrokerShape["connect"] = Effect.fn(
const connect: PreviewAutomationBroker["Service"]["connect"] = Effect.fn(
"PreviewAutomationBroker.connect",
)(function* (owner) {
const clientId = owner.clientId;
Expand All @@ -189,7 +183,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
return Stream.fromQueue(queue).pipe(Stream.ensuring(disconnect(clientId, queue)));
});

const reportOwner: PreviewAutomationBrokerShape["reportOwner"] = Effect.fn(
const reportOwner: PreviewAutomationBroker["Service"]["reportOwner"] = Effect.fn(
"PreviewAutomationBroker.reportOwner",
)(function* (owner) {
yield* SynchronizedRef.update(state, (current) => {
Expand All @@ -199,7 +193,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
});
});

const clearOwner: PreviewAutomationBrokerShape["clearOwner"] = Effect.fn(
const clearOwner: PreviewAutomationBroker["Service"]["clearOwner"] = Effect.fn(
"PreviewAutomationBroker.clearOwner",
)(function* (owner) {
yield* SynchronizedRef.update(state, (current) => {
Expand All @@ -217,7 +211,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
});
});

const respond: PreviewAutomationBrokerShape["respond"] = Effect.fn(
const respond: PreviewAutomationBroker["Service"]["respond"] = Effect.fn(
"PreviewAutomationBroker.respond",
)(function* (response) {
const pending = yield* SynchronizedRef.modify(state, (current) => {
Expand All @@ -243,7 +237,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
});

const invoke = Effect.fn("PreviewAutomationBroker.invoke")(function* <A = unknown>(
input: Parameters<PreviewAutomationBrokerShape["invoke"]>[0],
input: Parameters<PreviewAutomationBroker["Service"]["invoke"]>[0],
): Effect.fn.Return<A, PreviewAutomationError> {
const current = yield* SynchronizedRef.get(state);
const candidates = Array.from(current.owners.values())
Expand Down Expand Up @@ -317,8 +311,3 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() {
}).pipe(Effect.withSpan("PreviewAutomationBroker.make"));

export const layer = Layer.effect(PreviewAutomationBroker, make);

/** Exposed for tests. */
export const __testing = {
make,
};
Loading
Loading