diff --git a/apps/server/src/environment/Layers/ServerEnvironmentLabel.test.ts b/apps/server/src/environment/Layers/ServerEnvironmentLabel.test.ts index 3a4dce1627c..14580369a78 100644 --- a/apps/server/src/environment/Layers/ServerEnvironmentLabel.test.ts +++ b/apps/server/src/environment/Layers/ServerEnvironmentLabel.test.ts @@ -5,15 +5,15 @@ import * as Layer from "effect/Layer"; import { HostProcessHostname, HostProcessPlatform } from "@t3tools/shared/hostProcess"; import { vi } from "vite-plus/test"; -import { ProcessRunner, ProcessSpawnError, type ProcessRunnerShape } from "../../processRunner.ts"; +import * as ProcessRunner from "../../processRunner.ts"; import { resolveServerEnvironmentLabel } from "./ServerEnvironmentLabel.ts"; import { ChildProcessSpawner } from "effect/unstable/process"; -const runMock = vi.fn(); +const runMock = vi.fn(); const ProcessRunnerTest = Layer.succeed( - ProcessRunner, - ProcessRunner.of({ + ProcessRunner.ProcessRunner, + ProcessRunner.ProcessRunner.of({ run: (input) => runMock(input), }), ); @@ -136,7 +136,7 @@ describe("resolveServerEnvironmentLabel", () => { Effect.gen(function* () { runMock.mockReturnValueOnce( Effect.fail( - new ProcessSpawnError({ + new ProcessRunner.ProcessSpawnError({ command: "scutil", args: ["--get", "ComputerName"], cause: new Error("spawn scutil ENOENT"), diff --git a/apps/server/src/mcp/PreviewAutomationBroker.test.ts b/apps/server/src/mcp/PreviewAutomationBroker.test.ts index 06b18259833..5631b3bef57 100644 --- a/apps/server/src/mcp/PreviewAutomationBroker.test.ts +++ b/apps/server/src/mcp/PreviewAutomationBroker.test.ts @@ -37,7 +37,7 @@ const makeOwner = (overrides: Partial = {}): PreviewAuto it.effect("atomically registers a connected owner and correlates its response", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const requests = yield* broker.connect(makeOwner()); yield* Stream.runForEach(requests, (request) => broker.respond({ @@ -61,7 +61,7 @@ it.effect("atomically registers a connected owner and correlates its response", it.effect("rejects calls when no focused owner exists", () => Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const error = yield* broker .invoke({ scope, operation: "status", input: {} }) .pipe(Effect.flip); @@ -72,7 +72,7 @@ it.effect("rejects calls when no focused owner exists", () => it.effect("routes interactive commands to a hidden durable browser host", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const requests = yield* broker.connect( makeOwner({ clientId: "client-hidden", tabId: "tab-hidden" }), ); @@ -89,7 +89,7 @@ it.effect("routes interactive commands to a hidden durable browser host", () => it.effect("lets the browser host resolve an active tab that has not been reported yet", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const requests = yield* broker.connect(makeOwner({ tabId: null })); let routedTabId: string | undefined; yield* Stream.runForEach(requests, (request) => { @@ -108,7 +108,7 @@ it.effect("lets the browser host resolve an active tab that has not been reporte it.effect("preserves current owner metadata when its request stream reconnects", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const firstRequests = yield* broker.connect(makeOwner()); yield* Stream.runDrain(firstRequests).pipe(Effect.forkScoped); yield* broker.reportOwner(makeOwner({ tabId: "tab-current", visible: true })); @@ -131,7 +131,7 @@ it.effect("preserves current owner metadata when its request stream reconnects", it.effect("ignores stale owner cleanup after the client moves to another thread", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const requests = yield* broker.connect(makeOwner()); yield* Stream.runForEach(requests, (request) => broker.respond({ requestId: request.requestId, ok: true }), @@ -152,7 +152,7 @@ it.effect("ignores stale owner cleanup after the client moves to another thread" it.effect("fails requests assigned to a browser stream when that stream reconnects", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const _requests = yield* broker.connect(makeOwner()); const pending = yield* broker .invoke({ scope, operation: "status", input: {} }) @@ -170,7 +170,7 @@ it.effect("fails requests assigned to a browser stream when that stream reconnec it.effect("falls back to an older connected owner when a newer report is not connected", () => Effect.scoped( Effect.gen(function* () { - const broker = yield* PreviewAutomationBroker.__testing.make; + const broker = yield* PreviewAutomationBroker.make; const requests = yield* broker.connect(makeOwner({ clientId: "client-connected" })); yield* Stream.runForEach(requests, (request) => broker.respond({ requestId: request.requestId, ok: true, result: "connected" }), diff --git a/apps/server/src/mcp/PreviewAutomationBroker.ts b/apps/server/src/mcp/PreviewAutomationBroker.ts index 3cd7563bd9d..ee9d5bdbd0d 100644 --- a/apps/server/src/mcp/PreviewAutomationBroker.ts +++ b/apps/server/src/mcp/PreviewAutomationBroker.ts @@ -35,34 +35,28 @@ export interface PreviewAutomationInvokeInput { readonly timeoutMs?: number; } -export interface PreviewAutomationBrokerShape { - readonly connect: ( - owner: PreviewAutomationOwner, - ) => Effect.Effect>; - readonly reportOwner: ( - owner: PreviewAutomationOwner, - ) => Effect.Effect; - readonly clearOwner: (owner: PreviewAutomationOwnerIdentity) => Effect.Effect; - readonly respond: ( - response: PreviewAutomationResponse, - ) => Effect.Effect; - readonly invoke: ( - request: PreviewAutomationInvokeInput, - ) => Effect.Effect; -} - export class PreviewAutomationBroker extends Context.Service< PreviewAutomationBroker, - PreviewAutomationBrokerShape + { + readonly connect: ( + owner: PreviewAutomationOwner, + ) => Effect.Effect>; + readonly reportOwner: ( + owner: PreviewAutomationOwner, + ) => Effect.Effect; + readonly clearOwner: (owner: PreviewAutomationOwnerIdentity) => Effect.Effect; + readonly respond: ( + response: PreviewAutomationResponse, + ) => Effect.Effect; + readonly invoke: ( + request: PreviewAutomationInvokeInput, + ) => Effect.Effect; + } >()("t3/mcp/PreviewAutomationBroker") {} interface ClientConnection { readonly clientId: string; - readonly queue: Queue.Queue< - Parameters[0] extends never - ? never - : import("@t3tools/contracts").PreviewAutomationRequest - >; + readonly queue: Queue.Queue; } interface PendingRequest { @@ -123,7 +117,7 @@ const makeResponseError = ( } }; -const make = Effect.gen(function* PreviewAutomationBrokerMake() { +export const make = Effect.gen(function* PreviewAutomationBrokerMake() { const state = yield* SynchronizedRef.make({ clients: new Map(), owners: new Map(), @@ -166,7 +160,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { yield* Queue.shutdown(queue); }); - const connect: PreviewAutomationBrokerShape["connect"] = Effect.fn( + const connect: PreviewAutomationBroker["Service"]["connect"] = Effect.fn( "PreviewAutomationBroker.connect", )(function* (owner) { const clientId = owner.clientId; @@ -189,7 +183,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { return Stream.fromQueue(queue).pipe(Stream.ensuring(disconnect(clientId, queue))); }); - const reportOwner: PreviewAutomationBrokerShape["reportOwner"] = Effect.fn( + const reportOwner: PreviewAutomationBroker["Service"]["reportOwner"] = Effect.fn( "PreviewAutomationBroker.reportOwner", )(function* (owner) { yield* SynchronizedRef.update(state, (current) => { @@ -199,7 +193,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { }); }); - const clearOwner: PreviewAutomationBrokerShape["clearOwner"] = Effect.fn( + const clearOwner: PreviewAutomationBroker["Service"]["clearOwner"] = Effect.fn( "PreviewAutomationBroker.clearOwner", )(function* (owner) { yield* SynchronizedRef.update(state, (current) => { @@ -217,7 +211,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { }); }); - const respond: PreviewAutomationBrokerShape["respond"] = Effect.fn( + const respond: PreviewAutomationBroker["Service"]["respond"] = Effect.fn( "PreviewAutomationBroker.respond", )(function* (response) { const pending = yield* SynchronizedRef.modify(state, (current) => { @@ -243,7 +237,7 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { }); const invoke = Effect.fn("PreviewAutomationBroker.invoke")(function* ( - input: Parameters[0], + input: Parameters[0], ): Effect.fn.Return { const current = yield* SynchronizedRef.get(state); const candidates = Array.from(current.owners.values()) @@ -317,8 +311,3 @@ const make = Effect.gen(function* PreviewAutomationBrokerMake() { }).pipe(Effect.withSpan("PreviewAutomationBroker.make")); export const layer = Layer.effect(PreviewAutomationBroker, make); - -/** Exposed for tests. */ -export const __testing = { - make, -}; diff --git a/apps/server/src/preview/Manager.ts b/apps/server/src/preview/Manager.ts index 8fa3a3668bf..159932c4bdc 100644 --- a/apps/server/src/preview/Manager.ts +++ b/apps/server/src/preview/Manager.ts @@ -28,33 +28,30 @@ import { normalizePreviewUrl, PreviewUrlNormalizationError, } from "@t3tools/shared/preview"; -import { - Context, - DateTime, - Effect, - Layer, - PubSub, - type Scope, - Stream, - SynchronizedRef, -} from "effect"; - -export interface PreviewManagerShape { - readonly open: (input: PreviewOpenInput) => Effect.Effect; - readonly navigate: ( - input: PreviewNavigateInput, - ) => Effect.Effect; - readonly reportStatus: (input: PreviewReportStatusInput) => Effect.Effect; - readonly refresh: (input: PreviewRefreshInput) => Effect.Effect; - readonly close: (input: PreviewCloseInput) => Effect.Effect; - readonly list: (input: PreviewListInput) => Effect.Effect; - readonly events: Stream.Stream; - readonly subscribeEvents: Effect.Effect, never, Scope.Scope>; -} +import * as Context from "effect/Context"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as PubSub from "effect/PubSub"; +import * as Scope from "effect/Scope"; +import * as Stream from "effect/Stream"; +import * as SynchronizedRef from "effect/SynchronizedRef"; -export class PreviewManager extends Context.Service()( - "t3/preview/Manager/PreviewManager", -) {} +export class PreviewManager extends Context.Service< + PreviewManager, + { + readonly open: (input: PreviewOpenInput) => Effect.Effect; + readonly navigate: ( + input: PreviewNavigateInput, + ) => Effect.Effect; + readonly reportStatus: (input: PreviewReportStatusInput) => Effect.Effect; + readonly refresh: (input: PreviewRefreshInput) => Effect.Effect; + readonly close: (input: PreviewCloseInput) => Effect.Effect; + readonly list: (input: PreviewListInput) => Effect.Effect; + readonly events: Stream.Stream; + readonly subscribeEvents: Effect.Effect, never, Scope.Scope>; + } +>()("t3/preview/Manager/PreviewManager") {} interface PreviewSessionState { readonly threadId: string; @@ -127,7 +124,7 @@ const buildIdleSnapshot = (input: { updatedAt: input.updatedAt, }); -const make = Effect.gen(function* PreviewManagerMake() { +export const make = Effect.gen(function* PreviewManagerMake() { const stateRef = yield* SynchronizedRef.make(initialState); // Unbounded PubSub is fine here — events are tiny and we don't want to // block publishers if a subscriber is slow. WS clients backpressure on @@ -184,38 +181,40 @@ const make = Effect.gen(function* PreviewManagerMake() { ); }; - const open: PreviewManagerShape["open"] = Effect.fn("PreviewManager.open")(function* (input) { - const tabId = newPreviewTabId(); - const updatedAt = yield* currentIsoTimestamp; - const snapshot = input.url - ? buildLoadingSnapshot({ + const open: PreviewManager["Service"]["open"] = Effect.fn("PreviewManager.open")( + function* (input) { + const tabId = newPreviewTabId(); + const updatedAt = yield* currentIsoTimestamp; + const snapshot = input.url + ? buildLoadingSnapshot({ + threadId: input.threadId, + tabId, + url: yield* normalizeUrl(input.url), + title: "", + updatedAt, + }) + : buildIdleSnapshot({ threadId: input.threadId, tabId, updatedAt }); + yield* SynchronizedRef.update(stateRef, (state) => { + const sessions = new Map(state.sessions); + sessions.set(compositeKey(input.threadId, tabId), { threadId: input.threadId, tabId, - url: yield* normalizeUrl(input.url), - title: "", - updatedAt, - }) - : buildIdleSnapshot({ threadId: input.threadId, tabId, updatedAt }); - yield* SynchronizedRef.update(stateRef, (state) => { - const sessions = new Map(state.sessions); - sessions.set(compositeKey(input.threadId, tabId), { + snapshot, + }); + return { sessions }; + }); + yield* PubSub.publish(eventsPubSub, { + type: "opened", threadId: input.threadId, tabId, + createdAt: snapshot.updatedAt, snapshot, }); - return { sessions }; - }); - yield* PubSub.publish(eventsPubSub, { - type: "opened", - threadId: input.threadId, - tabId, - createdAt: snapshot.updatedAt, - snapshot, - }); - return snapshot; - }); + return snapshot; + }, + ); - const navigate: PreviewManagerShape["navigate"] = Effect.fn("PreviewManager.navigate")( + const navigate: PreviewManager["Service"]["navigate"] = Effect.fn("PreviewManager.navigate")( function* (input) { const url = yield* normalizeUrl(input.url); return yield* mutateExistingSession( @@ -250,7 +249,7 @@ const make = Effect.gen(function* PreviewManagerMake() { }, ); - const reportStatus: PreviewManagerShape["reportStatus"] = Effect.fn( + const reportStatus: PreviewManager["Service"]["reportStatus"] = Effect.fn( "PreviewManager.reportStatus", )(function* (input) { yield* mutateExistingSession( @@ -294,7 +293,7 @@ const make = Effect.gen(function* PreviewManagerMake() { ); }); - const refresh: PreviewManagerShape["refresh"] = Effect.fn("PreviewManager.refresh")( + const refresh: PreviewManager["Service"]["refresh"] = Effect.fn("PreviewManager.refresh")( function* (input) { // Verify the session exists; the desktop bridge handles the actual reload // and will report progress back via `reportStatus`. No event emitted. @@ -304,50 +303,54 @@ const make = Effect.gen(function* PreviewManagerMake() { }, ); - const close: PreviewManagerShape["close"] = Effect.fn("PreviewManager.close")(function* (input) { - const createdAt = yield* currentIsoTimestamp; - const events = yield* SynchronizedRef.modify(stateRef, (state) => { - const eventsToEmit: PreviewEvent[] = []; - const sessions = new Map(state.sessions); - const targets = input.tabId - ? [state.sessions.get(compositeKey(input.threadId, input.tabId))].filter( - (entry): entry is PreviewSessionState => entry !== undefined, - ) - : sessionsForThread(state, input.threadId); - for (const target of targets) { - sessions.delete(compositeKey(target.threadId, target.tabId)); - eventsToEmit.push({ - type: "closed", - threadId: target.threadId, - tabId: target.tabId, - createdAt, + const close: PreviewManager["Service"]["close"] = Effect.fn("PreviewManager.close")( + function* (input) { + const createdAt = yield* currentIsoTimestamp; + const events = yield* SynchronizedRef.modify(stateRef, (state) => { + const eventsToEmit: PreviewEvent[] = []; + const sessions = new Map(state.sessions); + const targets = input.tabId + ? [state.sessions.get(compositeKey(input.threadId, input.tabId))].filter( + (entry): entry is PreviewSessionState => entry !== undefined, + ) + : sessionsForThread(state, input.threadId); + for (const target of targets) { + sessions.delete(compositeKey(target.threadId, target.tabId)); + eventsToEmit.push({ + type: "closed", + threadId: target.threadId, + tabId: target.tabId, + createdAt, + }); + } + if (eventsToEmit.length === 0) { + return [eventsToEmit, state] as const; + } + return [eventsToEmit, { sessions }] as const; + }); + if (events.length > 0) { + yield* Effect.forEach(events, (event) => PubSub.publish(eventsPubSub, event), { + discard: true, }); } - if (eventsToEmit.length === 0) { - return [eventsToEmit, state] as const; - } - return [eventsToEmit, { sessions }] as const; - }); - if (events.length > 0) { - yield* Effect.forEach(events, (event) => PubSub.publish(eventsPubSub, event), { - discard: true, - }); - } - }); + }, + ); - const list: PreviewManagerShape["list"] = Effect.fn("PreviewManager.list")(function* (input) { - return yield* SynchronizedRef.get(stateRef).pipe( - Effect.map( - (state): PreviewListResult => ({ - sessions: sessionsForThread(state, input.threadId) - .map((s) => s.snapshot) - .toSorted((a, b) => a.updatedAt.localeCompare(b.updatedAt)), - }), - ), - ); - }); + const list: PreviewManager["Service"]["list"] = Effect.fn("PreviewManager.list")( + function* (input) { + return yield* SynchronizedRef.get(stateRef).pipe( + Effect.map( + (state): PreviewListResult => ({ + sessions: sessionsForThread(state, input.threadId) + .map((s) => s.snapshot) + .toSorted((a, b) => a.updatedAt.localeCompare(b.updatedAt)), + }), + ), + ); + }, + ); - return { + return PreviewManager.of({ open, navigate, reportStatus, @@ -356,7 +359,7 @@ const make = Effect.gen(function* PreviewManagerMake() { list, events, subscribeEvents: PubSub.subscribe(eventsPubSub), - } satisfies PreviewManagerShape; + }); }).pipe(Effect.withSpan("PreviewManager.make")); export const layer = Layer.effect(PreviewManager, make); diff --git a/apps/server/src/preview/PortScanner.ts b/apps/server/src/preview/PortScanner.ts index 183d5d4f009..16ff0fed58f 100644 --- a/apps/server/src/preview/PortScanner.ts +++ b/apps/server/src/preview/PortScanner.ts @@ -15,30 +15,36 @@ import { ThreadId, type DiscoveredLocalServer } from "@t3tools/contracts"; import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; import * as Net from "@t3tools/shared/Net"; import { LSOF_LOCAL_HOST_TOKENS } from "@t3tools/shared/preview"; -import { Cause, Context, Duration, Effect, Layer, Ref, Schedule, Scope } from "effect"; +import * as Cause from "effect/Cause"; +import * as Context from "effect/Context"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Ref from "effect/Ref"; +import * as Schedule from "effect/Schedule"; +import * as Scope from "effect/Scope"; -import { ProcessRunner } from "../processRunner.ts"; +import * as ProcessRunner from "../processRunner.ts"; -export interface PortDiscoveryShape { - readonly scan: () => Effect.Effect>; - readonly subscribe: ( - listener: (servers: ReadonlyArray) => Effect.Effect, - ) => Effect.Effect; - readonly retain: Effect.Effect; - readonly registerTerminalProcesses: (input: { - readonly threadId: string; - readonly terminalId: string; - readonly processIds: ReadonlyArray; - }) => Effect.Effect; - readonly unregisterTerminal: (input: { - readonly threadId: string; - readonly terminalId: string; - }) => Effect.Effect; -} - -export class PortDiscovery extends Context.Service()( - "t3/preview/PortScanner/PortDiscovery", -) {} +export class PortDiscovery extends Context.Service< + PortDiscovery, + { + readonly scan: () => Effect.Effect>; + readonly subscribe: ( + listener: (servers: ReadonlyArray) => Effect.Effect, + ) => Effect.Effect; + readonly retain: Effect.Effect; + readonly registerTerminalProcesses: (input: { + readonly threadId: string; + readonly terminalId: string; + readonly processIds: ReadonlyArray; + }) => Effect.Effect; + readonly unregisterTerminal: (input: { + readonly threadId: string; + readonly terminalId: string; + }) => Effect.Effect; + } +>()("t3/preview/PortScanner/PortDiscovery") {} export const COMMON_DEV_PORTS: ReadonlyArray = Object.freeze([ 3000, 3001, 3333, 4173, 4200, 4321, 5000, 5173, 5174, 5175, 5500, 8000, 8080, 8081, 8888, 9000, @@ -180,9 +186,9 @@ const serversEqual = ( return true; }; -const make = Effect.gen(function* PortDiscoveryMake() { +export const make = Effect.gen(function* PortDiscoveryMake() { const net = yield* Net.NetService; - const processRunner = yield* ProcessRunner; + const processRunner = yield* ProcessRunner.ProcessRunner; const hostPlatform = yield* HostProcessPlatform; const stateRef = yield* Ref.make({ lastSnapshot: [], @@ -296,14 +302,14 @@ const make = Effect.gen(function* PortDiscoveryMake() { } }); - const retain: PortDiscoveryShape["retain"] = Effect.acquireRelease(acquireRetention(), () => + const retain: PortDiscovery["Service"]["retain"] = Effect.acquireRelease(acquireRetention(), () => Ref.update(stateRef, (state) => ({ ...state, retainCount: Math.max(0, state.retainCount - 1), })), ); - const subscribe: PortDiscoveryShape["subscribe"] = Effect.fn("PortDiscovery.subscribe")( + const subscribe: PortDiscovery["Service"]["subscribe"] = Effect.fn("PortDiscovery.subscribe")( (listener) => Effect.acquireRelease( Ref.update(stateRef, (state) => ({ @@ -319,29 +325,28 @@ const make = Effect.gen(function* PortDiscoveryMake() { ), ); - const registerTerminalProcesses: PortDiscoveryShape["registerTerminalProcesses"] = Effect.fn( - "PortDiscovery.registerTerminalProcesses", - )(function* (input) { - const owner = { - threadId: ThreadId.make(input.threadId), - terminalId: input.terminalId, - }; - const processIds = new Set( - input.processIds.filter((processId) => Number.isInteger(processId) && processId > 0), - ); - yield* Ref.update(stateRef, (state) => { - const terminalProcesses = new Map(state.terminalProcesses); - const key = terminalOwnerKey(owner); - if (processIds.size === 0) { - terminalProcesses.delete(key); - } else { - terminalProcesses.set(key, { owner, processIds }); - } - return { ...state, terminalProcesses }; + const registerTerminalProcesses: PortDiscovery["Service"]["registerTerminalProcesses"] = + Effect.fn("PortDiscovery.registerTerminalProcesses")(function* (input) { + const owner = { + threadId: ThreadId.make(input.threadId), + terminalId: input.terminalId, + }; + const processIds = new Set( + input.processIds.filter((processId) => Number.isInteger(processId) && processId > 0), + ); + yield* Ref.update(stateRef, (state) => { + const terminalProcesses = new Map(state.terminalProcesses); + const key = terminalOwnerKey(owner); + if (processIds.size === 0) { + terminalProcesses.delete(key); + } else { + terminalProcesses.set(key, { owner, processIds }); + } + return { ...state, terminalProcesses }; + }); }); - }); - const unregisterTerminal: PortDiscoveryShape["unregisterTerminal"] = Effect.fn( + const unregisterTerminal: PortDiscovery["Service"]["unregisterTerminal"] = Effect.fn( "PortDiscovery.unregisterTerminal", )(function* (input) { yield* Ref.update(stateRef, (state) => { @@ -351,13 +356,13 @@ const make = Effect.gen(function* PortDiscoveryMake() { }); }); - return { + return PortDiscovery.of({ scan: scanOnce, subscribe, retain, registerTerminalProcesses, unregisterTerminal, - } satisfies PortDiscoveryShape; + }); }).pipe(Effect.withSpan("PortDiscovery.make")); export const layer = Layer.effect(PortDiscovery, make); diff --git a/apps/server/src/process/externalLauncher.ts b/apps/server/src/process/externalLauncher.ts index 0b40acef5c0..e8cfce0e96a 100644 --- a/apps/server/src/process/externalLauncher.ts +++ b/apps/server/src/process/externalLauncher.ts @@ -22,7 +22,8 @@ import * as FileSystem from "effect/FileSystem"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; -import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; +import * as ChildProcess from "effect/unstable/process/ChildProcess"; +import * as ChildProcessSpawner from "effect/unstable/process/ChildProcessSpawner"; // ============================== // Definitions @@ -282,30 +283,23 @@ const resolveAvailableEditors = Effect.fn("externalLauncher.resolveAvailableEdit return yield* buildAvailableEditors(platform, env); }); -/** - * ExternalLauncherShape - Service API for browser and editor launch actions. - */ -export interface ExternalLauncherShape { - readonly resolveAvailableEditors: () => Effect.Effect>; - /** - * Launch a URL target in the default browser. - */ - readonly launchBrowser: (target: string) => Effect.Effect; - - /** - * Launch a workspace path in a selected editor integration. - * - * Launches the editor as a detached process so server startup is not blocked. - */ - readonly launchEditor: (input: LaunchEditorInput) => Effect.Effect; -} - /** * ExternalLauncher - Service tag for browser/editor launch operations. */ -export class ExternalLauncher extends Context.Service()( - "t3/process/externalLauncher", -) {} +export class ExternalLauncher extends Context.Service< + ExternalLauncher, + { + readonly resolveAvailableEditors: () => Effect.Effect>; + /** Launch a URL target in the default browser. */ + readonly launchBrowser: (target: string) => Effect.Effect; + /** + * Launch a workspace path in a selected editor integration. + * + * Launches the editor as a detached process so server startup is not blocked. + */ + readonly launchEditor: (input: LaunchEditorInput) => Effect.Effect; + } +>()("t3/process/externalLauncher") {} // ============================== // Implementations @@ -397,7 +391,7 @@ const launchEditorProcess = Effect.fn("externalLauncher.launchEditorProcess")(fu ); }); -const make = Effect.gen(function* () { +export const make = Effect.gen(function* () { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; const fileSystem = yield* FileSystem.FileSystem; const path = yield* Path.Path; @@ -410,7 +404,7 @@ const make = Effect.gen(function* () { Effect.provideService(Path.Path, path), ); - return { + return ExternalLauncher.of({ resolveAvailableEditors: () => provideCommandResolutionServices(resolveAvailableEditors()), launchBrowser: (target) => launchBrowser(target).pipe( @@ -424,7 +418,7 @@ const make = Effect.gen(function* () { ), ), ), - } satisfies ExternalLauncherShape; + }); }); export const layer = Layer.effect(ExternalLauncher, make); diff --git a/apps/server/src/processRunner.test.ts b/apps/server/src/processRunner.test.ts index f914c667a1c..2c9d9f95038 100644 --- a/apps/server/src/processRunner.test.ts +++ b/apps/server/src/processRunner.test.ts @@ -11,14 +11,7 @@ import { ChildProcessSpawner } from "effect/unstable/process"; import { HostProcessEnvironment, HostProcessPlatform } from "@t3tools/shared/hostProcess"; import { SpawnExecutableResolution } from "@t3tools/shared/shell"; -import { - isWindowsCommandNotFound, - ProcessOutputLimitError, - ProcessRunner, - ProcessTimeoutError, - layer as ProcessRunnerLive, - type ProcessRunInput, -} from "./processRunner.ts"; +import * as ProcessRunner from "./processRunner.ts"; type ChildProcessCommand = { readonly command: string; @@ -68,15 +61,16 @@ function makeSpawner( } const runWith = - (spawner: ChildProcessSpawner.ChildProcessSpawner["Service"]) => (input: ProcessRunInput) => - Effect.service(ProcessRunner).pipe( + (spawner: ChildProcessSpawner.ChildProcessSpawner["Service"]) => + (input: ProcessRunner.ProcessRunInput) => + Effect.service(ProcessRunner.ProcessRunner).pipe( Effect.flatMap((runner) => runner.run({ ...input, }), ), Effect.provide( - ProcessRunnerLive.pipe( + ProcessRunner.layer.pipe( Layer.provide(Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner)), ), ), @@ -112,12 +106,12 @@ describe("runProcess", () => { return makeHandle({ stdout: "service ok" }); }), ); - const layer = ProcessRunnerLive.pipe( + const layer = ProcessRunner.layer.pipe( Layer.provide(Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner)), ); return Effect.gen(function* () { - const runner = yield* ProcessRunner; + const runner = yield* ProcessRunner.ProcessRunner; const result = yield* runner.run({ command: "fake", args: ["--service"], @@ -175,7 +169,7 @@ describe("runProcess", () => { maxOutputBytes: 128, }).pipe(Effect.flip); - expect(error).toBeInstanceOf(ProcessOutputLimitError); + expect(error).toBeInstanceOf(ProcessRunner.ProcessOutputLimitError); }), ); @@ -200,7 +194,7 @@ describe("runProcess", () => { timeout: "2 seconds", }).pipe(Effect.flip); - expect(error).toBeInstanceOf(ProcessOutputLimitError); + expect(error).toBeInstanceOf(ProcessRunner.ProcessOutputLimitError); }), ); @@ -285,7 +279,7 @@ describe("runProcess", () => { yield* TestClock.adjust(Duration.millis(50)); const error = yield* Fiber.join(errorFiber); - expect(error).toBeInstanceOf(ProcessTimeoutError); + expect(error).toBeInstanceOf(ProcessRunner.ProcessTimeoutError); }), ); @@ -324,7 +318,7 @@ describe("runProcess", () => { describe("isWindowsCommandNotFound", () => { it.effect("matches the localized German cmd.exe error text", () => Effect.gen(function* () { - const isCommandNotFound = yield* isWindowsCommandNotFound( + const isCommandNotFound = yield* ProcessRunner.isWindowsCommandNotFound( 1, "wird nicht als interner oder externer Befehl, betriebsfahiges Programm oder Batch-Datei erkannt", ).pipe(Effect.provideService(HostProcessPlatform, "win32")); diff --git a/apps/server/src/processRunner.ts b/apps/server/src/processRunner.ts index 4cfb764c557..5f01fcc344b 100644 --- a/apps/server/src/processRunner.ts +++ b/apps/server/src/processRunner.ts @@ -1,13 +1,14 @@ -import * as Data from "effect/Data"; import * as Context from "effect/Context"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as PlatformError from "effect/PlatformError"; +import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; -import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; +import * as ChildProcess from "effect/unstable/process/ChildProcess"; +import * as ChildProcessSpawner from "effect/unstable/process/ChildProcessSpawner"; import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; import { resolveSpawnCommand } from "@t3tools/shared/shell"; import { @@ -42,42 +43,82 @@ export interface ProcessRunOutput { readonly stderrTruncated: boolean; } -export class ProcessSpawnError extends Data.TaggedError("ProcessSpawnError")<{ - readonly command: string; - readonly args: ReadonlyArray; - readonly cwd?: string | undefined; - readonly cause: unknown; -}> {} +const ProcessInvocationFields = { + command: Schema.String, + args: Schema.Array(Schema.String), + cwd: Schema.optional(Schema.String), +}; -export class ProcessStdinError extends Data.TaggedError("ProcessStdinError")<{ +const formatProcessInvocation = (input: { readonly command: string; readonly args: ReadonlyArray; readonly cwd?: string | undefined; - readonly cause: unknown; -}> {} +}): string => { + const command = [input.command, ...input.args].join(" "); + return input.cwd === undefined ? `'${command}'` : `'${command}' in '${input.cwd}'`; +}; -export class ProcessOutputLimitError extends Data.TaggedError("ProcessOutputLimitError")<{ - readonly command: string; - readonly args: ReadonlyArray; - readonly cwd?: string | undefined; - readonly stream: "stdout" | "stderr"; - readonly maxBytes: number; -}> {} +export class ProcessSpawnError extends Schema.TaggedErrorClass()( + "ProcessSpawnError", + { + ...ProcessInvocationFields, + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to spawn process ${formatProcessInvocation(this)}`; + } +} -export class ProcessReadError extends Data.TaggedError("ProcessReadError")<{ - readonly command: string; - readonly args: ReadonlyArray; - readonly cwd?: string | undefined; - readonly stream: "stdout" | "stderr" | "exitCode"; - readonly cause: unknown; -}> {} +export class ProcessStdinError extends Schema.TaggedErrorClass()( + "ProcessStdinError", + { + ...ProcessInvocationFields, + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to write stdin for process ${formatProcessInvocation(this)}`; + } +} -export class ProcessTimeoutError extends Data.TaggedError("ProcessTimeoutError")<{ - readonly command: string; - readonly args: ReadonlyArray; - readonly cwd?: string | undefined; - readonly timeoutMs: number; -}> {} +export class ProcessOutputLimitError extends Schema.TaggedErrorClass()( + "ProcessOutputLimitError", + { + ...ProcessInvocationFields, + stream: Schema.Literals(["stdout", "stderr"]), + maxBytes: Schema.Number, + }, +) { + override get message(): string { + return `Process ${formatProcessInvocation(this)} ${this.stream} exceeded ${this.maxBytes} bytes`; + } +} + +export class ProcessReadError extends Schema.TaggedErrorClass()( + "ProcessReadError", + { + ...ProcessInvocationFields, + stream: Schema.Literals(["stdout", "stderr", "exitCode"]), + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to read ${this.stream} for process ${formatProcessInvocation(this)}`; + } +} + +export class ProcessTimeoutError extends Schema.TaggedErrorClass()( + "ProcessTimeoutError", + { + ...ProcessInvocationFields, + timeoutMs: Schema.Number, + }, +) { + override get message(): string { + return `Process ${formatProcessInvocation(this)} timed out after ${this.timeoutMs}ms`; + } +} export type ProcessRunError = | ProcessSpawnError @@ -86,13 +127,12 @@ export type ProcessRunError = | ProcessReadError | ProcessTimeoutError; -export interface ProcessRunnerShape { - readonly run: (input: ProcessRunInput) => Effect.Effect; -} - -export class ProcessRunner extends Context.Service()( - "t3/processRunner", -) {} +export class ProcessRunner extends Context.Service< + ProcessRunner, + { + readonly run: (input: ProcessRunInput) => Effect.Effect; + } +>()("t3/processRunner") {} const DEFAULT_TIMEOUT = "60 seconds"; const DEFAULT_MAX_OUTPUT_BYTES = 8 * 1024 * 1024; @@ -332,10 +372,10 @@ const runProcessCore = Effect.fn("processRunner.runProcessCore")(function* ( } satisfies ProcessRunOutput; }); -export const make = Effect.fn("makeProcessRunner")(function* () { +export const make = Effect.fn("ProcessRunner.make")(function* () { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; - const run: ProcessRunnerShape["run"] = (input) => + const run: ProcessRunner["Service"]["run"] = (input) => finalizeRunProcess(runProcessCore(spawner, input), input); return ProcessRunner.of({