From 6206cfd60ab43537fd2879c925df1798099b0ce6 Mon Sep 17 00:00:00 2001 From: Daksh Mhatre Date: Fri, 12 Jun 2026 08:59:52 +0000 Subject: [PATCH 1/3] feat(research): manual retry, resume, and proper lifecycle states - Add distinct 'cancelled' terminal state (was conflated with 'completed') - Add resumeSession() for cancelled/failed sessions - Add manual retryTask() with UI button (replaces automatic retry) - Add POST /tasks/:taskId/retry endpoint - Fix agent adapter type: 'claude' -> 'claude_local' with command config - Add migration 0095 for cancelled/cancelling enum values - Fix migration 0094 IF NOT EXISTS for idempotent installs --- .../db/src/migrations/0094_smart_orphan.sql | 173 ++++ .../0095_research_cancelled_status.sql | 12 + server/src/routes/research.ts | 283 +++++++ server/src/services/hire-hook.ts | 2 +- server/src/services/research-engine.ts | 786 ++++++++++++++++++ server/src/services/research.ts | 511 ++++++++++++ ui/src/api/research.ts | 150 ++++ ui/src/components/ResearchTaskList.tsx | 188 +++++ ui/src/pages/ResearchSessionDetail.tsx | 500 +++++++++++ ui/src/pages/ResearchSessions.tsx | 172 ++++ 10 files changed, 2776 insertions(+), 1 deletion(-) create mode 100644 packages/db/src/migrations/0094_smart_orphan.sql create mode 100644 packages/db/src/migrations/0095_research_cancelled_status.sql create mode 100644 server/src/routes/research.ts create mode 100644 server/src/services/research-engine.ts create mode 100644 server/src/services/research.ts create mode 100644 ui/src/api/research.ts create mode 100644 ui/src/components/ResearchTaskList.tsx create mode 100644 ui/src/pages/ResearchSessionDetail.tsx create mode 100644 ui/src/pages/ResearchSessions.tsx diff --git a/packages/db/src/migrations/0094_smart_orphan.sql b/packages/db/src/migrations/0094_smart_orphan.sql new file mode 100644 index 00000000000..1864ce89d30 --- /dev/null +++ b/packages/db/src/migrations/0094_smart_orphan.sql @@ -0,0 +1,173 @@ +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'research_depth') THEN CREATE TYPE "public"."research_depth" AS ENUM('shallow', 'medium', 'deep'); END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'research_finding_confidence') THEN CREATE TYPE "public"."research_finding_confidence" AS ENUM('high', 'medium', 'low'); END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'research_session_status') THEN CREATE TYPE "public"."research_session_status" AS ENUM('planning', 'running', 'cancelling', 'paused', 'completed', 'failed', 'cancelled'); END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'research_task_status') THEN CREATE TYPE "public"."research_task_status" AS ENUM('pending', 'running', 'completed', 'failed', 'skipped'); END IF; END $$;--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "cloud_upstream_connections" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "company_id" uuid NOT NULL, + "remote_url" text NOT NULL, + "source_instance_id" text NOT NULL, + "source_instance_fingerprint" text NOT NULL, + "source_public_key" text NOT NULL, + "private_key_pem" text NOT NULL, + "token_status" text NOT NULL, + "scopes" text[] DEFAULT '{}' NOT NULL, + "authorized_global_user_id" text, + "access_token" text, + "token_id" text, + "token_expires_at" timestamp with time zone, + "target_stack_id" text NOT NULL, + "target_stack_slug" text, + "target_stack_display_name" text, + "target_company_id" text NOT NULL, + "target_origin" text NOT NULL, + "target_primary_host" text NOT NULL, + "target_product" text NOT NULL, + "target_schema_major" integer NOT NULL, + "target_max_chunk_bytes" integer NOT NULL, + "pending_state" text, + "pending_code_verifier" text, + "pending_redirect_uri" text, + "pending_token_url" text, + "last_run_id" uuid, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "cloud_upstream_runs" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "connection_id" uuid NOT NULL, + "company_id" uuid NOT NULL, + "remote_run_id" text, + "status" text NOT NULL, + "active_step" text NOT NULL, + "progress_percent" integer DEFAULT 0 NOT NULL, + "dry_run" boolean DEFAULT false NOT NULL, + "retry_of_run_id" uuid, + "summary" jsonb DEFAULT '[]'::jsonb NOT NULL, + "warnings" jsonb DEFAULT '[]'::jsonb NOT NULL, + "conflicts" jsonb DEFAULT '[]'::jsonb NOT NULL, + "events" jsonb DEFAULT '[]'::jsonb NOT NULL, + "report" jsonb DEFAULT '{}'::jsonb NOT NULL, + "idempotency_key" text NOT NULL, + "manifest_hash" text NOT NULL, + "target_url" text, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL, + "completed_at" timestamp with time zone +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "research_findings" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "task_id" uuid NOT NULL, + "session_id" uuid NOT NULL, + "company_id" uuid NOT NULL, + "content" text NOT NULL, + "source_url" text, + "source_title" text, + "source_domain" text, + "confidence" "research_finding_confidence" DEFAULT 'medium', + "reliability_score" integer, + "category" text, + "is_duplicate" boolean DEFAULT false, + "duplicate_of_id" uuid, + "metadata" jsonb DEFAULT '{}'::jsonb, + "created_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "research_memory" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "company_id" uuid NOT NULL, + "key" text NOT NULL, + "value" jsonb NOT NULL, + "session_id" uuid, + "source_finding_id" uuid, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "research_sessions" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "company_id" uuid NOT NULL, + "title" text NOT NULL, + "query" text NOT NULL, + "status" "research_session_status" DEFAULT 'planning' NOT NULL, + "plan" jsonb, + "report" text, + "original_report" text, + "is_edited" boolean DEFAULT false NOT NULL, + "progress_percent" integer DEFAULT 0 NOT NULL, + "depth" "research_depth" DEFAULT 'medium' NOT NULL, + "max_subtopics" integer DEFAULT 5 NOT NULL, + "created_by" text NOT NULL, + "started_at" timestamp with time zone, + "completed_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "research_sources" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "session_id" uuid NOT NULL, + "company_id" uuid NOT NULL, + "url" text NOT NULL, + "title" text, + "domain" text, + "reliability_score" integer, + "access_count" integer DEFAULT 1 NOT NULL, + "last_accessed_at" timestamp with time zone, + "created_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE IF NOT EXISTS "research_tasks" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "session_id" uuid NOT NULL, + "company_id" uuid NOT NULL, + "title" text NOT NULL, + "status" "research_task_status" DEFAULT 'pending' NOT NULL, + "findings_summary" text, + "sources" jsonb DEFAULT '[]'::jsonb, + "reliability_score" integer, + "started_at" timestamp with time zone, + "completed_at" timestamp with time zone, + "sequence_order" integer DEFAULT 0 NOT NULL, + "created_at" timestamp with time zone DEFAULT now() NOT NULL, + "updated_at" timestamp with time zone DEFAULT now() NOT NULL +); +--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_connections_company_id_companies_id_fk') THEN ALTER TABLE "cloud_upstream_connections" ADD CONSTRAINT "cloud_upstream_connections_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_runs_connection_id_cloud_upstream_connections_id_fk') THEN ALTER TABLE "cloud_upstream_runs" ADD CONSTRAINT "cloud_upstream_runs_connection_id_cloud_upstream_connections_id_fk" FOREIGN KEY ("connection_id") REFERENCES "public"."cloud_upstream_connections"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'cloud_upstream_runs_company_id_companies_id_fk') THEN ALTER TABLE "cloud_upstream_runs" ADD CONSTRAINT "cloud_upstream_runs_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_findings_task_id_research_tasks_id_fk') THEN ALTER TABLE "research_findings" ADD CONSTRAINT "research_findings_task_id_research_tasks_id_fk" FOREIGN KEY ("task_id") REFERENCES "public"."research_tasks"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_findings_session_id_research_sessions_id_fk') THEN ALTER TABLE "research_findings" ADD CONSTRAINT "research_findings_session_id_research_sessions_id_fk" FOREIGN KEY ("session_id") REFERENCES "public"."research_sessions"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_findings_company_id_companies_id_fk') THEN ALTER TABLE "research_findings" ADD CONSTRAINT "research_findings_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_findings_duplicate_of_id_research_findings_id_fk') THEN ALTER TABLE "research_findings" ADD CONSTRAINT "research_findings_duplicate_of_id_research_findings_id_fk" FOREIGN KEY ("duplicate_of_id") REFERENCES "public"."research_findings"("id") ON DELETE no action ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_memory_company_id_companies_id_fk') THEN ALTER TABLE "research_memory" ADD CONSTRAINT "research_memory_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_memory_session_id_research_sessions_id_fk') THEN ALTER TABLE "research_memory" ADD CONSTRAINT "research_memory_session_id_research_sessions_id_fk" FOREIGN KEY ("session_id") REFERENCES "public"."research_sessions"("id") ON DELETE set null ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_memory_source_finding_id_research_findings_id_fk') THEN ALTER TABLE "research_memory" ADD CONSTRAINT "research_memory_source_finding_id_research_findings_id_fk" FOREIGN KEY ("source_finding_id") REFERENCES "public"."research_findings"("id") ON DELETE set null ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_sessions_company_id_companies_id_fk') THEN ALTER TABLE "research_sessions" ADD CONSTRAINT "research_sessions_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_sources_session_id_research_sessions_id_fk') THEN ALTER TABLE "research_sources" ADD CONSTRAINT "research_sources_session_id_research_sessions_id_fk" FOREIGN KEY ("session_id") REFERENCES "public"."research_sessions"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_sources_company_id_companies_id_fk') THEN ALTER TABLE "research_sources" ADD CONSTRAINT "research_sources_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_tasks_session_id_research_sessions_id_fk') THEN ALTER TABLE "research_tasks" ADD CONSTRAINT "research_tasks_session_id_research_sessions_id_fk" FOREIGN KEY ("session_id") REFERENCES "public"."research_sessions"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'research_tasks_company_id_companies_id_fk') THEN ALTER TABLE "research_tasks" ADD CONSTRAINT "research_tasks_company_id_companies_id_fk" FOREIGN KEY ("company_id") REFERENCES "public"."companies"("id") ON DELETE cascade ON UPDATE no action; END IF; END $$;--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_connections_company_idx" ON "cloud_upstream_connections" USING btree ("company_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_runs_company_created_idx" ON "cloud_upstream_runs" USING btree ("company_id","created_at");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "cloud_upstream_runs_connection_idx" ON "cloud_upstream_runs" USING btree ("connection_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_task_idx" ON "research_findings" USING btree ("task_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_session_idx" ON "research_findings" USING btree ("session_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_company_idx" ON "research_findings" USING btree ("company_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_duplicate_idx" ON "research_findings" USING btree ("is_duplicate","duplicate_of_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_category_idx" ON "research_findings" USING btree ("category");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_findings_company_session_created_idx" ON "research_findings" USING btree ("company_id","session_id","created_at");--> statement-breakpoint +CREATE UNIQUE INDEX IF NOT EXISTS "research_memory_company_key_idx" ON "research_memory" USING btree ("company_id","key");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_memory_session_idx" ON "research_memory" USING btree ("session_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sessions_company_idx" ON "research_sessions" USING btree ("company_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sessions_status_idx" ON "research_sessions" USING btree ("status");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sessions_created_idx" ON "research_sessions" USING btree ("company_id","created_at");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sources_session_idx" ON "research_sources" USING btree ("session_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sources_url_idx" ON "research_sources" USING btree ("url");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_sources_domain_idx" ON "research_sources" USING btree ("domain");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_tasks_session_idx" ON "research_tasks" USING btree ("session_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_tasks_company_idx" ON "research_tasks" USING btree ("company_id");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_tasks_status_idx" ON "research_tasks" USING btree ("status");--> statement-breakpoint +CREATE INDEX IF NOT EXISTS "research_tasks_session_order_idx" ON "research_tasks" USING btree ("session_id","sequence_order"); \ No newline at end of file diff --git a/packages/db/src/migrations/0095_research_cancelled_status.sql b/packages/db/src/migrations/0095_research_cancelled_status.sql new file mode 100644 index 00000000000..b3e2bf5e087 --- /dev/null +++ b/packages/db/src/migrations/0095_research_cancelled_status.sql @@ -0,0 +1,12 @@ +-- Add missing enum values for research session status +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'cancelling' AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'research_session_status')) THEN + ALTER TYPE "research_session_status" ADD VALUE 'cancelling'; + END IF; +END $$; + +DO $$ BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'cancelled' AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'research_session_status')) THEN + ALTER TYPE "research_session_status" ADD VALUE 'cancelled'; + END IF; +END $$; diff --git a/server/src/routes/research.ts b/server/src/routes/research.ts new file mode 100644 index 00000000000..42ecc3fd3c8 --- /dev/null +++ b/server/src/routes/research.ts @@ -0,0 +1,283 @@ +import { Router } from "express"; +import type { Db } from "@paperclipai/db"; +import { + createResearchSessionSchema, + updateResearchSessionSchema, + generateSubtopicsSchema, + createResearchTaskSchema, + updateResearchTaskSchema, + createResearchFindingSchema, + markDuplicateSchema, + createResearchMemorySchema, +} from "@paperclipai/shared"; +import { researchService } from "../services/research.js"; +import { assertCompanyAccess } from "./authz.js"; +import { badRequest } from "../errors.js"; +import type { Config } from "../config.js"; + +export function researchRoutes(db: Db, config?: Partial) { + const router = Router(); + const svc = researchService(db, config as Config | undefined); + + // ──────────────────────────────────────────────────────────────────────────── + // Sessions + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/sessions", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const status = req.query.status as string | undefined; + const limit = req.query.limit ? Number(req.query.limit) : undefined; + const offset = req.query.offset ? Number(req.query.offset) : undefined; + + const result = await svc.listSessions(companyId, { status, limit, offset }); + res.json(result); + }); + + router.post("/companies/:companyId/research/sessions", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const parsed = createResearchSessionSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const actorId = req.actor.type === "board" ? req.actor.userId ?? "unknown" : req.actor.agentId ?? "agent"; + const session = await svc.createSession(companyId, actorId, parsed.data); + res.status(201).json(session); + }); + + router.get("/companies/:companyId/research/sessions/:sessionId", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const session = await svc.getSession(companyId, sessionId); + res.json(session); + }); + + router.patch("/companies/:companyId/research/sessions/:sessionId", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const parsed = updateResearchSessionSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const session = await svc.updateSession(companyId, sessionId, parsed.data); + res.json(session); + }); + + router.delete("/companies/:companyId/research/sessions/:sessionId", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + await svc.deleteSession(companyId, sessionId); + res.status(204).send(); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Engine + // ──────────────────────────────────────────────────────────────────────────── + + router.post("/companies/:companyId/research/sessions/:sessionId/start", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const result = await svc.startSession(companyId, sessionId); + res.json(result); + }); + + router.post("/companies/:companyId/research/generate-subtopics", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const parsed = generateSubtopicsSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const result = await svc.generateSubtopics(parsed.data); + res.json(result); + }); + + router.post("/companies/:companyId/research/sessions/:sessionId/cancel", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const result = await svc.cancelSession(companyId, sessionId); + res.json(result); + }); + + router.post("/companies/:companyId/research/sessions/:sessionId/resume", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const result = await svc.resumeSession(companyId, sessionId); + res.json(result); + }); + + router.post("/companies/:companyId/research/sessions/:sessionId/tasks/:taskId/retry", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + const taskId = req.params.taskId as string; + assertCompanyAccess(req, companyId); + + const result = await svc.retryTask(companyId, sessionId, taskId); + res.json(result); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Tasks + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/sessions/:sessionId/tasks", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const tasks = await svc.listTasks(companyId, sessionId); + res.json(tasks); + }); + + router.post("/companies/:companyId/research/sessions/:sessionId/tasks", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const parsed = createResearchTaskSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const task = await svc.createTask(companyId, sessionId, parsed.data); + res.status(201).json(task); + }); + + router.get("/companies/:companyId/research/tasks/:taskId", async (req, res) => { + const companyId = req.params.companyId as string; + const taskId = req.params.taskId as string; + assertCompanyAccess(req, companyId); + + const task = await svc.getTask(companyId, taskId); + res.json(task); + }); + + router.patch("/companies/:companyId/research/tasks/:taskId", async (req, res) => { + const companyId = req.params.companyId as string; + const taskId = req.params.taskId as string; + assertCompanyAccess(req, companyId); + + const parsed = updateResearchTaskSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const task = await svc.updateTask(companyId, taskId, parsed.data); + res.json(task); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Findings + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/tasks/:taskId/findings", async (req, res) => { + const companyId = req.params.companyId as string; + const taskId = req.params.taskId as string; + assertCompanyAccess(req, companyId); + + const limit = req.query.limit ? Number(req.query.limit) : undefined; + const offset = req.query.offset ? Number(req.query.offset) : undefined; + + const result = await svc.listFindings(companyId, { taskId, limit, offset }); + res.json(result); + }); + + router.post("/companies/:companyId/research/findings", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const parsed = createResearchFindingSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const finding = await svc.createFinding(companyId, parsed.data); + res.status(201).json(finding); + }); + + router.post("/companies/:companyId/research/findings/:findingId/mark-duplicate", async (req, res) => { + const companyId = req.params.companyId as string; + const findingId = req.params.findingId as string; + assertCompanyAccess(req, companyId); + + const parsed = markDuplicateSchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const finding = await svc.markDuplicate(companyId, findingId, parsed.data.duplicateOfId); + res.json(finding); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Sources + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/sessions/:sessionId/sources", async (req, res) => { + const companyId = req.params.companyId as string; + const sessionId = req.params.sessionId as string; + assertCompanyAccess(req, companyId); + + const sources = await svc.getSources(companyId, sessionId); + res.json(sources); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Memory + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/memory", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const key = req.query.key as string | undefined; + const memory = await svc.getMemory(companyId, key); + res.json(memory); + }); + + router.post("/companies/:companyId/research/memory", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const parsed = createResearchMemorySchema.safeParse(req.body); + if (!parsed.success) { + throw badRequest("Invalid request body", parsed.error.format()); + } + + const memory = await svc.setMemory(companyId, parsed.data); + res.status(201).json(memory); + }); + + // ──────────────────────────────────────────────────────────────────────────── + // Dashboard + // ──────────────────────────────────────────────────────────────────────────── + + router.get("/companies/:companyId/research/dashboard", async (req, res) => { + const companyId = req.params.companyId as string; + assertCompanyAccess(req, companyId); + + const dashboard = await svc.getDashboard(companyId); + res.json(dashboard); + }); + + return router; +} diff --git a/server/src/services/hire-hook.ts b/server/src/services/hire-hook.ts index 79a38177ee1..27a5221e776 100644 --- a/server/src/services/hire-hook.ts +++ b/server/src/services/hire-hook.ts @@ -39,7 +39,7 @@ export async function notifyHireApproved( return; } - const adapterType = row.adapterType ?? "process"; + const adapterType = row.adapterType ?? "claude_local"; const adapter = findActiveServerAdapter(adapterType); const onHireApproved = adapter?.onHireApproved; if (!onHireApproved) { diff --git a/server/src/services/research-engine.ts b/server/src/services/research-engine.ts new file mode 100644 index 00000000000..dde33cfb009 --- /dev/null +++ b/server/src/services/research-engine.ts @@ -0,0 +1,786 @@ +/** + * Research Engine Service + * + * Core orchestrator for autonomous research execution. + * + * Workflow: + * planning → running → [cancelling] → completed/failed + * + * Each session runs sequentially (1 concurrent per company). + */ +import { eq, and } from "drizzle-orm"; +import type { Db } from "@paperclipai/db"; +import { + researchSessions, + researchTasks, + researchFindings, + researchSources, +} from "@paperclipai/db"; +import type { Config } from "../config.js"; +import { logger } from "../middleware/logger.js"; +import { generateResearchPlan, extractFindingsFromContent, generateResearchReport } from "./research-llm.js"; +import { createSearchProvider, type SearchProvider, filterSourcesByQuality, fetchPageContent } from "./research-search.js"; +import { researchProgressService } from "./research-progress.js"; + +// In-memory lock: one session per company at a time +const runningSessions = new Map(); // companyId -> sessionId +const cancelFlags = new Map(); // sessionId -> shouldCancel + +export interface ResearchEngineDeps { + db: Db; + config: Config; +} + +export function researchEngine(deps: ResearchEngineDeps) { + const { db, config } = deps; + const progress = researchProgressService(db); + + // Resolve search provider based on config + const providerType = config.researchSearchProvider; + const providerKey = + providerType === "serper" + ? config.serperApiKey + : providerType === "semantic-scholar" + ? config.semanticScholarApiKey + : undefined; + const searchProvider: SearchProvider = createSearchProvider(providerType, providerKey); + + return { + /** + * Start executing a research session. + */ + async executeSession(sessionId: string, companyId: string): Promise { + // Check concurrent session lock + const existing = runningSessions.get(companyId); + if (existing && existing !== sessionId) { + throw new Error(`Another research session is already running for this company: ${existing}`); + } + + // Set lock + runningSessions.set(companyId, sessionId); + cancelFlags.set(sessionId, false); + + try { + await runSession(sessionId, companyId); + } finally { + runningSessions.delete(companyId); + cancelFlags.delete(sessionId); + } + }, + + /** + * Request cancellation of a running session. + */ + async requestCancel(sessionId: string, companyId: string): Promise { + const running = runningSessions.get(companyId); + if (running !== sessionId) { + return false; // Not running or different session + } + + cancelFlags.set(sessionId, true); + + // Update status to cancelling + await db + .update(researchSessions) + .set({ status: "cancelling" as any, updatedAt: new Date() }) + .where( + and( + eq(researchSessions.id, sessionId), + eq(researchSessions.companyId, companyId) + ) + ); + + progress.publishSessionUpdate(companyId, sessionId, "cancelling", { + message: "Cancellation requested", + }); + + return true; + }, + + /** + * Check if a session is currently running. + */ + isRunning(sessionId: string, companyId: string): boolean { + return runningSessions.get(companyId) === sessionId; + }, + + /** + * Retry a single failed task. + * Resets the task to pending and re-executes it. + */ + async retryTask(taskId: string, sessionId: string, companyId: string): Promise { + // Check if session is already running + const existing = runningSessions.get(companyId); + if (existing && existing !== sessionId) { + throw new Error(`Another research session is already running for this company: ${existing}`); + } + + // Fetch the task + const [task] = await db + .select() + .from(researchTasks) + .where( + and( + eq(researchTasks.id, taskId), + eq(researchTasks.sessionId, sessionId), + eq(researchTasks.companyId, companyId) + ) + ) + .limit(1); + + if (!task) { + throw new Error("Research task not found"); + } + + if (task.status !== "failed") { + throw new Error(`Cannot retry task with status: ${task.status}. Only failed tasks can be retried.`); + } + + // Set lock + runningSessions.set(companyId, sessionId); + cancelFlags.set(sessionId, false); + + try { + // Reset task to pending + await db + .update(researchTasks) + .set({ + status: "pending" as any, + findingsSummary: null, + sources: null, + startedAt: null, + completedAt: null, + updatedAt: new Date(), + }) + .where(eq(researchTasks.id, taskId)); + + // Delete old findings for this task + await db + .delete(researchFindings) + .where(eq(researchFindings.taskId, taskId)); + + progress.publishTaskUpdate(companyId, sessionId, taskId, "pending", { + message: "Task queued for retry", + }); + + // Update session status back to running if it was failed + await db + .update(researchSessions) + .set({ status: "running" as any, updatedAt: new Date() }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "running", { + message: `Retrying task: ${task.title}`, + }); + + // Re-execute the task + await executeTask(taskId, sessionId, companyId, task.title); + await progress.updateProgress(sessionId, companyId); + + // Check if all tasks are now completed + const remainingTasks = await db + .select() + .from(researchTasks) + .where(eq(researchTasks.sessionId, sessionId)); + + const allCompleted = remainingTasks.every((t) => t.status === "completed"); + const anyFailed = remainingTasks.some((t) => t.status === "failed"); + + if (allCompleted) { + // Generate report + const [session] = await db + .select() + .from(researchSessions) + .where( + and( + eq(researchSessions.id, sessionId), + eq(researchSessions.companyId, companyId) + ) + ) + .limit(1); + + const allFindings = await db + .select() + .from(researchFindings) + .where(eq(researchFindings.sessionId, sessionId)) + .orderBy(researchFindings.createdAt); + + const reportFindings = allFindings.map((f) => ({ + content: f.content, + category: f.category || "General", + confidence: f.confidence || "medium", + sourceUrl: f.sourceUrl, + sourceTitle: f.sourceTitle, + sourceDomain: f.sourceDomain, + })); + + const generated = await generateResearchReport( + session.query, + reportFindings, + { model: config.researchLlmModel, apiKey: config.researchLlmApiKey } + ); + + await db + .update(researchSessions) + .set({ + status: "completed" as any, + report: generated.markdown, + progressPercent: 100, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "completed", { + message: "Research completed after retry", + findingsCount: allFindings.length, + }); + } else if (anyFailed) { + // Some tasks still failed — mark session as failed + await db + .update(researchSessions) + .set({ + status: "failed" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "failed", { + message: "Some tasks still failed after retry", + }); + } else { + // Still running (more tasks pending) + await db + .update(researchSessions) + .set({ + status: "running" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + } + } finally { + runningSessions.delete(companyId); + cancelFlags.delete(sessionId); + } + }, + + /** + * Resume a cancelled or failed research session. + * Picks up from the last completed task. + */ + async resumeSession(sessionId: string, companyId: string): Promise { + // Check concurrent session lock + const existing = runningSessions.get(companyId); + if (existing && existing !== sessionId) { + throw new Error(`Another research session is already running for this company: ${existing}`); + } + + // Set lock + runningSessions.set(companyId, sessionId); + cancelFlags.set(sessionId, false); + + try { + await resumeRunSession(sessionId, companyId); + } finally { + runningSessions.delete(companyId); + cancelFlags.delete(sessionId); + } + }, + }; + + // ────────────────────────────────────────────────────────────────────────── + // Main execution loop + // ────────────────────────────────────────────────────────────────────────── + + async function runSession(sessionId: string, companyId: string): Promise { + logger.info({ sessionId, companyId }, "Research session starting"); + + // 1. Fetch session + const [session] = await db + .select() + .from(researchSessions) + .where( + and( + eq(researchSessions.id, sessionId), + eq(researchSessions.companyId, companyId) + ) + ) + .limit(1); + + if (!session) { + throw new Error("Research session not found"); + } + + // 2. Transition to running + await db + .update(researchSessions) + .set({ + status: "running" as any, + startedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "running", { + message: "Research started", + }); + + try { + // 3. Generate plan (or reuse existing) + let plan = session.plan as { + strategy: string; + subtopics: Array<{ id: string; title: string; description: string; priority: number }>; + } | null; + + if (!plan || !plan.subtopics || plan.subtopics.length === 0) { + plan = await generateResearchPlan( + session.query, + session.maxSubtopics, + session.depth, + { model: config.researchLlmModel, apiKey: config.researchLlmApiKey } + ); + + // Save plan + await db + .update(researchSessions) + .set({ plan: plan as any, updatedAt: new Date() }) + .where(eq(researchSessions.id, sessionId)); + } + + // Check cancellation + if (isCancelled(sessionId)) { + await completeCancellation(sessionId, companyId); + return; + } + + // 4. Create tasks from plan + const subtopics = plan.subtopics.slice(0, session.maxSubtopics); + for (let i = 0; i < subtopics.length; i++) { + const subtopic = subtopics[i]; + await db.insert(researchTasks).values({ + sessionId, + companyId, + title: subtopic.title, + sequenceOrder: i, + status: "pending" as any, + }); + } + + logger.info( + { sessionId, taskCount: subtopics.length }, + "Research tasks created" + ); + + // 5. Execute each task + const tasks = await db + .select() + .from(researchTasks) + .where(eq(researchTasks.sessionId, sessionId)) + .orderBy(researchTasks.sequenceOrder); + + for (const task of tasks) { + if (isCancelled(sessionId)) { + await completeCancellation(sessionId, companyId); + return; + } + + await executeTask(task.id, sessionId, companyId, task.title); + await progress.updateProgress(sessionId, companyId); + } + + // 6. Generate report + const allFindings = await db + .select() + .from(researchFindings) + .where(eq(researchFindings.sessionId, sessionId)) + .orderBy(researchFindings.createdAt); + + const reportFindings = allFindings.map((f) => ({ + content: f.content, + category: f.category || "General", + confidence: f.confidence || "medium", + sourceUrl: f.sourceUrl, + sourceTitle: f.sourceTitle, + sourceDomain: f.sourceDomain, + })); + + const generated = await generateResearchReport( + session.query, + reportFindings, + { model: config.researchLlmModel, apiKey: config.researchLlmApiKey } + ); + + const report = generated.markdown; + + // 7. Mark completed + await db + .update(researchSessions) + .set({ + status: "completed" as any, + report, + progressPercent: 100, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "completed", { + message: "Research completed", + findingsCount: allFindings.length, + sourcesCount: generated.sources.length, + }); + + logger.info( + { sessionId, findingsCount: allFindings.length }, + "Research session completed" + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error({ sessionId, error: message }, "Research session failed"); + + await db + .update(researchSessions) + .set({ + status: "failed" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "failed", { + error: message, + }); + } + } + + // ────────────────────────────────────────────────────────────────────────── + // Resume session (pick up from where it was cancelled/failed) + // ────────────────────────────────────────────────────────────────────────── + + async function resumeRunSession(sessionId: string, companyId: string): Promise { + logger.info({ sessionId, companyId }, "Research session resuming"); + + // 1. Fetch session + const [session] = await db + .select() + .from(researchSessions) + .where( + and( + eq(researchSessions.id, sessionId), + eq(researchSessions.companyId, companyId) + ) + ) + .limit(1); + + if (!session) { + throw new Error("Research session not found"); + } + + // Only allow resuming cancelled or failed sessions + if (session.status !== "cancelled" && session.status !== "failed" && session.status !== "cancelling") { + throw new Error(`Cannot resume session with status: ${session.status}`); + } + + // 2. Transition to running + await db + .update(researchSessions) + .set({ + status: "running" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "running", { + message: "Research resumed", + }); + + try { + // 3. Find pending or failed tasks to resume + const tasks = await db + .select() + .from(researchTasks) + .where(eq(researchTasks.sessionId, sessionId)) + .orderBy(researchTasks.sequenceOrder); + + // Find the first task that is not completed + let resumeFromIndex = 0; + for (let i = 0; i < tasks.length; i++) { + if (tasks[i].status === "completed") { + resumeFromIndex = i + 1; + } else { + break; + } + } + + // Execute remaining tasks + for (let i = resumeFromIndex; i < tasks.length; i++) { + if (isCancelled(sessionId)) { + await completeCancellation(sessionId, companyId); + return; + } + + const task = tasks[i]; + await executeTask(task.id, sessionId, companyId, task.title); + await progress.updateProgress(sessionId, companyId); + } + + // 4. Generate report (reuse existing findings + new ones) + const allFindings = await db + .select() + .from(researchFindings) + .where(eq(researchFindings.sessionId, sessionId)) + .orderBy(researchFindings.createdAt); + + const reportFindings = allFindings.map((f) => ({ + content: f.content, + category: f.category || "General", + confidence: f.confidence || "medium", + sourceUrl: f.sourceUrl, + sourceTitle: f.sourceTitle, + sourceDomain: f.sourceDomain, + })); + + const generated = await generateResearchReport( + session.query, + reportFindings, + { model: config.researchLlmModel, apiKey: config.researchLlmApiKey } + ); + + const report = generated.markdown; + + // 5. Mark completed + await db + .update(researchSessions) + .set({ + status: "completed" as any, + report, + progressPercent: 100, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "completed", { + message: "Research completed (resumed)", + findingsCount: allFindings.length, + sourcesCount: generated.sources.length, + }); + + logger.info( + { sessionId, findingsCount: allFindings.length }, + "Research session completed after resume" + ); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error({ sessionId, error: message }, "Research session failed after resume"); + + await db + .update(researchSessions) + .set({ + status: "failed" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "failed", { + error: message, + }); + } + } + + // ────────────────────────────────────────────────────────────────────────── + // Task execution + // ────────────────────────────────────────────────────────────────────────── + + async function executeTask( + taskId: string, + sessionId: string, + companyId: string, + taskTitle: string + ): Promise { + logger.info({ taskId, taskTitle }, "Executing research task"); + + // Mark task as running + await db + .update(researchTasks) + .set({ status: "running" as any, startedAt: new Date(), updatedAt: new Date() }) + .where(eq(researchTasks.id, taskId)); + + progress.publishTaskUpdate(companyId, sessionId, taskId, "running", { + title: taskTitle, + }); + + try { + // Search for the task topic + const searchResults = await searchProvider.search( + taskTitle, + config.researchMaxSearchResults + ); + + // Filter by quality score + const qualityResults = filterSourcesByQuality(searchResults, 40); + const resultsToProcess = qualityResults.length > 0 ? qualityResults : searchResults; + + // Track unique sources for this task + const taskSources: Array<{ url: string; title: string; snippet: string; qualityScore?: number }> = []; + let findingsCount = 0; + + for (const result of resultsToProcess.slice(0, config.researchMaxFindingsPerTask)) { + if (isCancelled(sessionId)) break; + + // Publish source processing event + progress.publishSourceProcessing(companyId, sessionId, taskId, result.url, result.title); + + // Store source (skip if already exists for this session+url) + try { + await db + .insert(researchSources) + .values({ + sessionId, + companyId, + url: result.url, + title: result.title, + domain: result.domain, + reliabilityScore: result.qualityScore ?? confidenceToScore("medium"), + accessCount: 1, + lastAccessedAt: new Date(), + }); + } catch { + // Source already exists for this session+url, ignore + } + + taskSources.push({ + url: result.url, + title: result.title, + snippet: result.snippet, + qualityScore: result.qualityScore, + }); + + // Fetch real page content for richer findings + let contentToAnalyze = result.snippet; + const pageContent = await fetchPageContent(result.url); + if (pageContent) { + contentToAnalyze = pageContent; + } + + // Extract findings from content + const findings = await extractFindingsFromContent( + taskTitle, + result.title, + contentToAnalyze, + { model: config.researchLlmModel, apiKey: config.researchLlmApiKey } + ); + + for (const finding of findings) { + await db.insert(researchFindings).values({ + taskId, + sessionId, + companyId, + content: finding.content, + sourceUrl: result.url, + sourceTitle: result.title, + sourceDomain: result.domain, + confidence: finding.confidence as any, + category: finding.category, + reliabilityScore: result.qualityScore ?? confidenceToScore(finding.confidence), + }); + + findingsCount++; + progress.publishFindingCreated(companyId, sessionId, taskId, { + content: finding.content.slice(0, 100), + category: finding.category, + }); + } + + // Publish finding progress after each source + progress.publishFindingProgress( + companyId, + sessionId, + taskId, + findingsCount, + taskSources.length, + { currentSource: result.title } + ); + } + + // Update task with summary + await db + .update(researchTasks) + .set({ + status: "completed" as any, + findingsSummary: `Found ${findingsCount} finding(s) from ${taskSources.length} source(s)`, + sources: taskSources as any, + completedAt: new Date(), + updatedAt: new Date(), + }) + .where(eq(researchTasks.id, taskId)); + + progress.publishTaskUpdate(companyId, sessionId, taskId, "completed", { + findingsCount, + sourcesCount: taskSources.length, + }); + + logger.info({ taskId, findingsCount, sourcesCount: taskSources.length }, "Task completed"); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + logger.error({ taskId, error: message }, "Task failed"); + + await db + .update(researchTasks) + .set({ + status: "failed" as any, + findingsSummary: `Error: ${message}`, + updatedAt: new Date(), + }) + .where(eq(researchTasks.id, taskId)); + + progress.publishTaskUpdate(companyId, sessionId, taskId, "failed", { + error: message, + }); + + // Re-throw to fail the entire session + throw new Error(`Task failed: ${message}`); + } + } + + // ────────────────────────────────────────────────────────────────────────── + // Cancellation helpers + // ────────────────────────────────────────────────────────────────────────── + + function isCancelled(sessionId: string): boolean { + return cancelFlags.get(sessionId) === true; + } + + async function completeCancellation(sessionId: string, companyId: string): Promise { + logger.info({ sessionId }, "Research session cancelled"); + + await db + .update(researchSessions) + .set({ + status: "cancelled" as any, + updatedAt: new Date(), + }) + .where(eq(researchSessions.id, sessionId)); + + progress.publishSessionUpdate(companyId, sessionId, "cancelled", { + message: "Research cancelled by user", + cancelled: true, + }); + } +} + +// ────────────────────────────────────────────────────────────────────────── +// Helpers +// ────────────────────────────────────────────────────────────────────────── + +function confidenceToScore(confidence: string): number { + switch (confidence) { + case "high": + return 85; + case "medium": + return 60; + case "low": + return 35; + default: + return 50; + } +} diff --git a/server/src/services/research.ts b/server/src/services/research.ts new file mode 100644 index 00000000000..2cb435293e8 --- /dev/null +++ b/server/src/services/research.ts @@ -0,0 +1,511 @@ +import { and, eq, sql, desc, count } from "drizzle-orm"; +import type { Db } from "@paperclipai/db"; +import { + researchSessions, + researchTasks, + researchFindings, + researchSources, + researchMemory, +} from "@paperclipai/db"; +import { notFound } from "../errors.js"; +import { logger } from "../middleware/logger.js"; +import type { + CreateResearchSession, + UpdateResearchSession, + CreateResearchTask, + UpdateResearchTask, + CreateResearchFinding, + CreateResearchMemory, +} from "@paperclipai/shared"; +import { researchEngine } from "./research-engine.js"; +import { generateResearchPlan } from "./research-llm.js"; +import type { Config } from "../config.js"; + +export function researchService(db: Db, config?: Config) { + const engine = config ? researchEngine({ db, config }) : null; + + return { + // ────────────────────────────────────────────────────────────────────────── + // Engine + // ────────────────────────────────────────────────────────────────────────── + + startSession: async (companyId: string, sessionId: string) => { + if (!engine) { + throw new Error("Research engine is not configured"); + } + if (!config?.researchEngineEnabled) { + throw new Error("Research engine is disabled"); + } + + // Verify session exists and is in planning state + const session = await db + .select() + .from(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!session) throw notFound("Research session not found"); + if (session.status !== "planning") { + throw new Error(`Cannot start session from status: ${session.status}`); + } + + // Start execution in background (do not await) + void engine.executeSession(sessionId, companyId).catch((err) => { + logger.error({ sessionId, err }, "Research engine execution error"); + }); + + return { started: true, sessionId }; + }, + + cancelSession: async (companyId: string, sessionId: string) => { + if (!engine) { + throw new Error("Research engine is not configured"); + } + + const cancelled = await engine.requestCancel(sessionId, companyId); + return { cancelled, sessionId }; + }, + + resumeSession: async (companyId: string, sessionId: string) => { + if (!engine) { + throw new Error("Research engine is not configured"); + } + if (!config?.researchEngineEnabled) { + throw new Error("Research engine is disabled"); + } + + // Verify session exists and is in cancelled or failed state + const session = await db + .select() + .from(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!session) throw notFound("Research session not found"); + if (session.status !== "cancelled" && session.status !== "failed" && session.status !== "cancelling") { + throw new Error(`Cannot resume session from status: ${session.status}`); + } + + // Resume execution in background (do not await) + void engine.resumeSession(sessionId, companyId).catch((err) => { + logger.error({ sessionId, err }, "Research engine resume error"); + }); + + return { resumed: true, sessionId }; + }, + + retryTask: async (companyId: string, sessionId: string, taskId: string) => { + if (!engine) { + throw new Error("Research engine is not configured"); + } + if (!config?.researchEngineEnabled) { + throw new Error("Research engine is disabled"); + } + + // Verify session exists + const session = await db + .select() + .from(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!session) throw notFound("Research session not found"); + + // Retry task in background (do not await) + void engine.retryTask(taskId, sessionId, companyId).catch((err) => { + logger.error({ sessionId, taskId, err }, "Research engine retry task error"); + }); + + return { retried: true, taskId, sessionId }; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Subtopic Generation + // ────────────────────────────────────────────────────────────────────────── + + generateSubtopics: async (data: { query: string; depth?: string; maxSubtopics?: number }) => { + const plan = await generateResearchPlan( + data.query, + data.maxSubtopics ?? 5, + data.depth ?? "medium", + { model: config?.researchLlmModel, apiKey: config?.researchLlmApiKey } + ); + return plan; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Sessions + // ────────────────────────────────────────────────────────────────────────── + + createSession: async (companyId: string, actorId: string, data: CreateResearchSession) => { + const [session] = await db + .insert(researchSessions) + .values({ + companyId, + title: data.title, + query: data.query, + depth: data.depth ?? "medium", + maxSubtopics: data.maxSubtopics ?? 5, + createdBy: actorId, + plan: data.plan ? (data.plan as any) : null, + }) + .returning(); + return session; + }, + + listSessions: async (companyId: string, opts?: { status?: string; limit?: number; offset?: number }) => { + const limit = opts?.limit ?? 50; + const offset = opts?.offset ?? 0; + + const conditions = [eq(researchSessions.companyId, companyId)]; + if (opts?.status) { + conditions.push(eq(researchSessions.status, opts.status as any)); + } + + const [items, totalResult] = await Promise.all([ + db + .select() + .from(researchSessions) + .where(and(...conditions)) + .orderBy(desc(researchSessions.createdAt)) + .limit(limit) + .offset(offset), + db + .select({ count: count() }) + .from(researchSessions) + .where(and(...conditions)) + .then((rows) => Number(rows[0]?.count ?? 0)), + ]); + + return { items, total: totalResult, limit, offset }; + }, + + getSession: async (companyId: string, sessionId: string) => { + const session = await db + .select() + .from(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!session) throw notFound("Research session not found"); + + const [tasks, findings, sources] = await Promise.all([ + db + .select() + .from(researchTasks) + .where(eq(researchTasks.sessionId, sessionId)) + .orderBy(researchTasks.sequenceOrder), + db + .select() + .from(researchFindings) + .where(eq(researchFindings.sessionId, sessionId)) + .orderBy(desc(researchFindings.createdAt)), + db + .select() + .from(researchSources) + .where(eq(researchSources.sessionId, sessionId)) + .orderBy(desc(researchSources.accessCount)), + ]); + + return { ...session, tasks, findings, sources }; + }, + + updateSession: async (companyId: string, sessionId: string, data: UpdateResearchSession) => { + // Fetch current session to check if we need to save original report + const currentSession = await db + .select() + .from(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!currentSession) throw notFound("Research session not found"); + + const updateData: Record = { + updatedAt: new Date(), + }; + + if (data.title !== undefined) updateData.title = data.title; + if (data.query !== undefined) updateData.query = data.query; + if (data.status !== undefined) updateData.status = data.status; + if (data.depth !== undefined) updateData.depth = data.depth; + if (data.maxSubtopics !== undefined) updateData.maxSubtopics = data.maxSubtopics; + + // Handle report editing + if (data.report !== undefined) { + // If this is the first edit, save the original report + if (!currentSession.isEdited && currentSession.report && !currentSession.originalReport) { + updateData.originalReport = currentSession.report; + } + updateData.report = data.report; + updateData.isEdited = true; + } + + const [updated] = await db + .update(researchSessions) + .set(updateData) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .returning(); + + return updated; + }, + + deleteSession: async (companyId: string, sessionId: string) => { + const [deleted] = await db + .delete(researchSessions) + .where(and(eq(researchSessions.id, sessionId), eq(researchSessions.companyId, companyId))) + .returning(); + + if (!deleted) throw notFound("Research session not found"); + return deleted; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Tasks + // ────────────────────────────────────────────────────────────────────────── + + createTask: async (companyId: string, sessionId: string, data: CreateResearchTask) => { + const [task] = await db + .insert(researchTasks) + .values({ + companyId, + sessionId, + title: data.title, + sequenceOrder: data.sequenceOrder ?? 0, + }) + .returning(); + return task; + }, + + listTasks: async (companyId: string, sessionId: string) => { + return db + .select() + .from(researchTasks) + .where(and(eq(researchTasks.sessionId, sessionId), eq(researchTasks.companyId, companyId))) + .orderBy(researchTasks.sequenceOrder); + }, + + getTask: async (companyId: string, taskId: string) => { + const task = await db + .select() + .from(researchTasks) + .where(and(eq(researchTasks.id, taskId), eq(researchTasks.companyId, companyId))) + .then((rows) => rows[0] ?? null); + + if (!task) throw notFound("Research task not found"); + return task; + }, + + updateTask: async (companyId: string, taskId: string, data: UpdateResearchTask) => { + const [updated] = await db + .update(researchTasks) + .set({ + ...(data.title !== undefined && { title: data.title }), + ...(data.status !== undefined && { status: data.status as any }), + ...(data.findingsSummary !== undefined && { findingsSummary: data.findingsSummary }), + ...(data.sources !== undefined && { sources: data.sources }), + ...(data.reliabilityScore !== undefined && { reliabilityScore: data.reliabilityScore }), + updatedAt: new Date(), + }) + .where(and(eq(researchTasks.id, taskId), eq(researchTasks.companyId, companyId))) + .returning(); + + if (!updated) throw notFound("Research task not found"); + return updated; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Findings + // ────────────────────────────────────────────────────────────────────────── + + createFinding: async (companyId: string, data: CreateResearchFinding) => { + const [finding] = await db + .insert(researchFindings) + .values({ + companyId, + taskId: data.taskId, + sessionId: ( + await db + .select({ sessionId: researchTasks.sessionId }) + .from(researchTasks) + .where(eq(researchTasks.id, data.taskId)) + .then((rows) => rows[0]?.sessionId) + )!, + content: data.content, + sourceUrl: data.sourceUrl, + sourceTitle: data.sourceTitle, + sourceDomain: data.sourceDomain, + confidence: data.confidence as any, + reliabilityScore: data.reliabilityScore, + category: data.category, + metadata: data.metadata ?? {}, + }) + .returning(); + return finding; + }, + + listFindings: async (companyId: string, opts?: { sessionId?: string; taskId?: string; category?: string; limit?: number; offset?: number }) => { + const limit = Math.min(opts?.limit ?? 50, 100); + const offset = opts?.offset ?? 0; + const conditions = [eq(researchFindings.companyId, companyId)]; + if (opts?.sessionId) conditions.push(eq(researchFindings.sessionId, opts.sessionId)); + if (opts?.taskId) conditions.push(eq(researchFindings.taskId, opts.taskId)); + if (opts?.category) conditions.push(eq(researchFindings.category, opts.category)); + + const [items, totalResult] = await Promise.all([ + db + .select() + .from(researchFindings) + .where(and(...conditions)) + .orderBy(desc(researchFindings.createdAt)) + .limit(limit) + .offset(offset), + db + .select({ count: sql`count(*)` }) + .from(researchFindings) + .where(and(...conditions)) + .then((rows) => Number(rows[0]?.count ?? 0)), + ]); + + return { items, total: totalResult, limit, offset }; + }, + + markDuplicate: async (companyId: string, findingId: string, duplicateOfId: string) => { + const [updated] = await db + .update(researchFindings) + .set({ isDuplicate: true, duplicateOfId }) + .where(and(eq(researchFindings.id, findingId), eq(researchFindings.companyId, companyId))) + .returning(); + + if (!updated) throw notFound("Research finding not found"); + return updated; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Sources + // ────────────────────────────────────────────────────────────────────────── + + getSources: async (companyId: string, sessionId: string) => { + return db + .select() + .from(researchSources) + .where(and(eq(researchSources.sessionId, sessionId), eq(researchSources.companyId, companyId))) + .orderBy(desc(researchSources.accessCount)); + }, + + // ────────────────────────────────────────────────────────────────────────── + // Memory + // ────────────────────────────────────────────────────────────────────────── + + getMemory: async (companyId: string, key?: string) => { + if (key) { + return db + .select() + .from(researchMemory) + .where(and(eq(researchMemory.companyId, companyId), eq(researchMemory.key, key))) + .then((rows) => rows[0] ?? null); + } + return db + .select() + .from(researchMemory) + .where(eq(researchMemory.companyId, companyId)) + .orderBy(desc(researchMemory.updatedAt)); + }, + + setMemory: async (companyId: string, data: CreateResearchMemory) => { + const existing = await db + .select() + .from(researchMemory) + .where(and(eq(researchMemory.companyId, companyId), eq(researchMemory.key, data.key))) + .then((rows) => rows[0] ?? null); + + if (existing) { + const [updated] = await db + .update(researchMemory) + .set({ + value: data.value as any, + sessionId: data.sessionId ?? existing.sessionId, + sourceFindingId: data.sourceFindingId ?? existing.sourceFindingId, + updatedAt: new Date(), + }) + .where(eq(researchMemory.id, existing.id)) + .returning(); + return updated; + } + + const [created] = await db + .insert(researchMemory) + .values({ + companyId, + key: data.key, + value: data.value as any, + sessionId: data.sessionId, + sourceFindingId: data.sourceFindingId, + }) + .returning(); + return created; + }, + + // ────────────────────────────────────────────────────────────────────────── + // Dashboard + // ────────────────────────────────────────────────────────────────────────── + + getDashboard: async (companyId: string) => { + const [sessionStats, taskStats, findingStats, sourceStats] = await Promise.all([ + db + .select({ status: researchSessions.status, count: sql`count(*)` }) + .from(researchSessions) + .where(eq(researchSessions.companyId, companyId)) + .groupBy(researchSessions.status), + db + .select({ status: researchTasks.status, count: sql`count(*)` }) + .from(researchTasks) + .where(eq(researchTasks.companyId, companyId)) + .groupBy(researchTasks.status), + db + .select({ + total: sql`count(*)`, + duplicates: sql`count(*) filter (where ${researchFindings.isDuplicate} = true)`, + avgReliability: sql`avg(${researchFindings.reliabilityScore})`, + }) + .from(researchFindings) + .where(eq(researchFindings.companyId, companyId)), + db + .select({ count: sql`count(*)` }) + .from(researchSources) + .where(eq(researchSources.companyId, companyId)) + .then((rows) => Number(rows[0]?.count ?? 0)), + ]); + + const sessionCounts: Record = {}; + for (const row of sessionStats) { + sessionCounts[row.status] = Number(row.count); + } + + const taskCounts: Record = {}; + for (const row of taskStats) { + taskCounts[row.status] = Number(row.count); + } + + return { + sessions: { + total: Object.values(sessionCounts).reduce((a, b) => a + b, 0), + byStatus: sessionCounts, + }, + tasks: { + total: Object.values(taskCounts).reduce((a, b) => a + b, 0), + byStatus: taskCounts, + }, + findings: { + total: Number(findingStats[0]?.total ?? 0), + duplicates: Number(findingStats[0]?.duplicates ?? 0), + avgReliability: findingStats[0]?.avgReliability + ? Number(Number(findingStats[0].avgReliability).toFixed(2)) + : null, + }, + sources: { + total: sourceStats, + }, + }; + }, + }; +} diff --git a/ui/src/api/research.ts b/ui/src/api/research.ts new file mode 100644 index 00000000000..9d9bc86bdb1 --- /dev/null +++ b/ui/src/api/research.ts @@ -0,0 +1,150 @@ +import type { + ResearchSession, + ResearchSessionDetail, + ResearchTask, + ResearchFinding, + ResearchSource, + CreateResearchSessionRequest, + UpdateResearchSessionRequest, + CreateResearchTaskRequest, + UpdateResearchTaskRequest, + CreateResearchFindingRequest, + MarkDuplicateRequest, + CreateResearchMemoryRequest, + ResearchDashboardSummary, +} from "@paperclipai/shared"; +import { api } from "./client"; + +export interface ResearchDashboard { + sessions: { + total: number; + byStatus: Record; + }; + tasks: { + total: number; + byStatus: Record; + }; + findings: { + total: number; + duplicates: number; + avgReliability: number | null; + }; + sources: { + total: number; + }; +} + +export interface PaginatedFindings { + items: ResearchFinding[]; + total: number; + limit: number; + offset: number; +} + +export interface PaginatedSessions { + items: ResearchSession[]; + total: number; + limit: number; + offset: number; +} + +export interface PaginatedFindings { + items: ResearchFinding[]; + total: number; + limit: number; + offset: number; +} + +export const researchApi = { + // Dashboard + dashboard: (companyId: string) => + api.get(`/companies/${companyId}/research/dashboard`), + + // Sessions + listSessions: (companyId: string, params?: { status?: string; limit?: number; offset?: number }) => { + const searchParams = new URLSearchParams(); + if (params?.status) searchParams.set("status", params.status); + if (params?.limit) searchParams.set("limit", String(params.limit)); + if (params?.offset) searchParams.set("offset", String(params.offset)); + const qs = searchParams.toString(); + return api.get(`/companies/${companyId}/research/sessions${qs ? `?${qs}` : ""}`); + }, + + createSession: (companyId: string, data: CreateResearchSessionRequest) => + api.post(`/companies/${companyId}/research/sessions`, data), + + getSession: (companyId: string, sessionId: string) => + api.get( + `/companies/${companyId}/research/sessions/${sessionId}` + ), + + updateSession: (companyId: string, sessionId: string, data: UpdateResearchSessionRequest) => + api.patch(`/companies/${companyId}/research/sessions/${sessionId}`, data), + + deleteSession: (companyId: string, sessionId: string) => + api.delete(`/companies/${companyId}/research/sessions/${sessionId}`), + + startSession: (companyId: string, sessionId: string) => + api.post<{ started: boolean; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/start`), + + cancelSession: (companyId: string, sessionId: string) => + api.post<{ cancelled: boolean; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/cancel`), + + resumeSession: (companyId: string, sessionId: string) => + api.post<{ resumed: boolean; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/resume`), + + retryTask: (companyId: string, sessionId: string, taskId: string) => + api.post<{ retried: boolean; taskId: string; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/tasks/${taskId}/retry`), + + // Tasks + listTasks: (companyId: string, sessionId: string) => + api.get(`/companies/${companyId}/research/sessions/${sessionId}/tasks`), + + createTask: (companyId: string, sessionId: string, data: CreateResearchTaskRequest) => + api.post(`/companies/${companyId}/research/sessions/${sessionId}/tasks`, data), + + getTask: (companyId: string, taskId: string) => + api.get(`/companies/${companyId}/research/tasks/${taskId}`), + + updateTask: (companyId: string, taskId: string, data: UpdateResearchTaskRequest) => + api.patch(`/companies/${companyId}/research/tasks/${taskId}`, data), + + // Findings + listFindings: (companyId: string, taskId: string, params?: { limit?: number; offset?: number }) => { + const searchParams = new URLSearchParams(); + if (params?.limit) searchParams.set("limit", String(params.limit)); + if (params?.offset) searchParams.set("offset", String(params.offset)); + const qs = searchParams.toString(); + return api.get(`/companies/${companyId}/research/tasks/${taskId}/findings${qs ? `?${qs}` : ""}`); + }, + + createFinding: (companyId: string, data: CreateResearchFindingRequest) => + api.post(`/companies/${companyId}/research/findings`, data), + + markDuplicate: (companyId: string, findingId: string, data: MarkDuplicateRequest) => + api.post(`/companies/${companyId}/research/findings/${findingId}/mark-duplicate`, data), + + // Sources + listSources: (companyId: string, sessionId: string) => + api.get(`/companies/${companyId}/research/sessions/${sessionId}/sources`), + + // Engine + startSession: (companyId: string, sessionId: string) => + api.post<{ started: boolean; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/start`, {}), + + cancelSession: (companyId: string, sessionId: string) => + api.post<{ cancelled: boolean; sessionId: string }>(`/companies/${companyId}/research/sessions/${sessionId}/cancel`, {}), + + // Subtopic Generation + generateSubtopics: (companyId: string, data: { query: string; depth?: string; maxSubtopics?: number }) => + api.post<{ strategy: string; subtopics: Array<{ id: string; title: string; description: string; priority: number }> }>(`/companies/${companyId}/research/generate-subtopics`, data), + + // Memory + getMemory: (companyId: string, key?: string) => { + const qs = key ? `?key=${encodeURIComponent(key)}` : ""; + return api.get(`/companies/${companyId}/research/memory${qs}`); + }, + + setMemory: (companyId: string, data: CreateResearchMemoryRequest) => + api.post(`/companies/${companyId}/research/memory`, data), +}; diff --git a/ui/src/components/ResearchTaskList.tsx b/ui/src/components/ResearchTaskList.tsx new file mode 100644 index 00000000000..372e66e55af --- /dev/null +++ b/ui/src/components/ResearchTaskList.tsx @@ -0,0 +1,188 @@ +import { useState } from "react"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { queryKeys } from "../lib/queryKeys"; +import { researchApi } from "../api/research"; +import { PageSkeleton } from "./PageSkeleton"; +import { EmptyState } from "./EmptyState"; +import { Button } from "@/components/ui/button"; +import { + ClipboardList, + Clock, + CheckCircle, + AlertTriangle, + SkipForward, + PlayCircle, + ChevronDown, + ChevronRight, + RefreshCw, +} from "lucide-react"; +import type { ResearchTask } from "@paperclipai/shared"; + +function TaskStatusIcon({ status }: { status: string }) { + if (status === "completed") return ; + if (status === "running") return ; + if (status === "failed") return ; + if (status === "skipped") return ; + return ; +} + +function TaskStatusBadge({ status }: { status: string }) { + const styles: Record = { + pending: "bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-300", + running: "bg-blue-100 text-blue-700 dark:bg-blue-900/40 dark:text-blue-300", + completed: "bg-green-100 text-green-700 dark:bg-green-900/40 dark:text-green-300", + failed: "bg-red-100 text-red-700 dark:bg-red-900/40 dark:text-red-300", + skipped: "bg-slate-100 text-slate-700 dark:bg-slate-800 dark:text-slate-300", + }; + return ( + + + {status.charAt(0).toUpperCase() + status.slice(1)} + + ); +} + +interface ResearchTaskListProps { + companyId: string; + sessionId: string; +} + +export function ResearchTaskList({ companyId, sessionId }: ResearchTaskListProps) { + const [expandedTaskId, setExpandedTaskId] = useState(null); + const queryClient = useQueryClient(); + + const { data: tasks, isLoading, error } = useQuery({ + queryKey: queryKeys.research.tasks(companyId, sessionId), + queryFn: () => researchApi.listTasks(companyId, sessionId), + enabled: !!companyId && !!sessionId, + }); + + const retryMutation = useMutation({ + mutationFn: (taskId: string) => researchApi.retryTask(companyId, sessionId, taskId), + onSuccess: () => { + // Invalidate tasks and session queries to refresh UI + queryClient.invalidateQueries({ queryKey: queryKeys.research.tasks(companyId, sessionId) }); + queryClient.invalidateQueries({ queryKey: queryKeys.research.session(companyId, sessionId) }); + }, + }); + + if (isLoading) { + return ; + } + + if (error) { + return ( +
+

{error.message}

+
+ ); + } + + const taskList = tasks ?? []; + + if (taskList.length === 0) { + return ( + + ); + } + + return ( +
+ {taskList.map((task: ResearchTask) => ( +
+ + )} + {task.reliabilityScore != null && ( +
+ Reliability: {Math.round(task.reliabilityScore * 100)}% +
+ )} +
+ + + {expandedTaskId === task.id && ( +
+ {task.findingsSummary && ( +
+

+ Findings Summary +

+

{task.findingsSummary}

+
+ )} + + {task.sources && task.sources.length > 0 && ( +
+

+ Sources +

+
+ {task.sources.map((source, idx) => ( + + {source.title || source.url} + + ))} +
+
+ )} + +
+ {task.startedAt && ( + Started: {new Date(task.startedAt).toLocaleDateString()} + )} + {task.completedAt && ( + Completed: {new Date(task.completedAt).toLocaleDateString()} + )} +
+
+ )} +
+ ))} + + ); +} diff --git a/ui/src/pages/ResearchSessionDetail.tsx b/ui/src/pages/ResearchSessionDetail.tsx new file mode 100644 index 00000000000..f135368a74c --- /dev/null +++ b/ui/src/pages/ResearchSessionDetail.tsx @@ -0,0 +1,500 @@ +import { useEffect, useState } from "react"; +import { useParams, Link } from "@/lib/router"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { useCompany } from "../context/CompanyContext"; +import { useBreadcrumbs } from "../context/BreadcrumbContext"; +import { queryKeys } from "../lib/queryKeys"; +import { researchApi } from "../api/research"; +import { PageSkeleton } from "../components/PageSkeleton"; +import { EmptyState } from "../components/EmptyState"; +import { PageTabBar } from "../components/PageTabBar"; +import { Tabs, TabsContent } from "@/components/ui/tabs"; +import { Button } from "@/components/ui/button"; +import { + FlaskConical, + Clock, + CheckCircle, + AlertTriangle, + PauseCircle, + PlayCircle, + Calendar, + BarChart3, + Layers, + Loader2, + XCircle, + Edit3, + Save, + RotateCcw, + Eye, + Pencil, +} from "lucide-react"; +import type { ResearchSessionDetail as ResearchSessionDetailType } from "@paperclipai/shared"; +import { ResearchTaskList } from "../components/ResearchTaskList"; +import { ResearchFindingsList } from "../components/ResearchFindingsList"; +import { ResearchSourcesList } from "../components/ResearchSourcesList"; +import { ResearchMemoryView } from "../components/ResearchMemoryView"; + +function SessionStatusIcon({ status }: { status: string }) { + if (status === "completed") return ; + if (status === "cancelled") return ; + if (status === "running") return ; + if (status === "failed") return ; + if (status === "paused") return ; + return ; +} + +function SessionStatusBadge({ status }: { status: string }) { + const styles: Record = { + planning: "bg-slate-100 text-slate-700 dark:bg-slate-800 dark:text-slate-300", + running: "bg-blue-100 text-blue-700 dark:bg-blue-900/40 dark:text-blue-300", + paused: "bg-amber-100 text-amber-700 dark:bg-amber-900/40 dark:text-amber-300", + completed: "bg-green-100 text-green-700 dark:bg-green-900/40 dark:text-green-300", + failed: "bg-red-100 text-red-700 dark:bg-red-900/40 dark:text-red-300", + cancelled: "bg-orange-100 text-orange-700 dark:bg-orange-900/40 dark:text-orange-300", + }; + return ( + + + {status.charAt(0).toUpperCase() + status.slice(1)} + + ); +} + +function ProgressBar({ percent }: { percent: number }) { + return ( +
+
+
+ ); +} + +type ResearchTab = "overview" | "tasks" | "findings" | "sources" | "memory"; + +export function ResearchSessionDetail() { + const { sessionId } = useParams<{ sessionId: string }>(); + const { selectedCompanyId } = useCompany(); + const { setBreadcrumbs } = useBreadcrumbs(); + const queryClient = useQueryClient(); + const [activeTab, setActiveTab] = useState("overview"); + + const { + data: session, + isLoading, + error, + } = useQuery({ + queryKey: queryKeys.research.session(selectedCompanyId!, sessionId!), + queryFn: () => researchApi.getSession(selectedCompanyId!, sessionId!), + enabled: !!selectedCompanyId && !!sessionId, + refetchInterval: (query) => { + const data = query.state.data as ResearchSessionDetailType | undefined; + if (data?.status === "running" || data?.status === "cancelling") return 2000; + return false; + }, + }); + + const startMutation = useMutation({ + mutationFn: () => researchApi.startSession(selectedCompanyId!, sessionId!), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: queryKeys.research.session(selectedCompanyId!, sessionId!) }); + }, + }); + + const cancelMutation = useMutation({ + mutationFn: () => researchApi.cancelSession(selectedCompanyId!, sessionId!), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: queryKeys.research.session(selectedCompanyId!, sessionId!) }); + }, + }); + + const resumeMutation = useMutation({ + mutationFn: () => researchApi.resumeSession(selectedCompanyId!, sessionId!), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: queryKeys.research.session(selectedCompanyId!, sessionId!) }); + queryClient.invalidateQueries({ queryKey: queryKeys.research.sessions(selectedCompanyId!) }); + }, + }); + + useEffect(() => { + setBreadcrumbs([ + { label: "Research", href: "/research" }, + { label: "Sessions", href: "/research/sessions" }, + { label: session?.title ?? sessionId ?? "Session" }, + ]); + }, [setBreadcrumbs, session, sessionId]); + + if (!selectedCompanyId || !sessionId) { + return ( + + ); + } + + if (isLoading) { + return ; + } + + if (error) { + return ( +
+

{error.message}

+
+ ); + } + + if (!session) { + return ( + + ); + } + + const tabs = [ + { value: "overview", label: "Overview" }, + { value: "tasks", label: `Tasks (${session.tasks?.length ?? 0})` }, + { value: "findings", label: `Findings (${session.findings?.length ?? 0})` }, + { value: "sources", label: `Sources (${session.sources?.length ?? 0})` }, + { value: "memory", label: "Memory" }, + ]; + + return ( +
+ {/* Header */} +
+
+
+ + + {session.depth} depth + + {session.progressPercent > 0 && ( + + {session.progressPercent}% complete + + )} +
+
+ {session.status === "planning" && ( + + )} + {(session.status === "running" || session.status === "cancelling") && ( + + )} + {(session.status === "cancelled" || session.status === "failed") && ( + + )} +
+
+ +

{session.title}

+ +

{session.query}

+ + {session.progressPercent > 0 && ( +
+ +
+ )} + +
+ + + Created {new Date(session.createdAt).toLocaleDateString()} + + + + Updated {new Date(session.updatedAt).toLocaleDateString()} + + {session.startedAt && ( + + + Started {new Date(session.startedAt).toLocaleDateString()} + + )} +
+
+ + {/* Tabs */} + setActiveTab(v as ResearchTab)}> + setActiveTab(v as ResearchTab)} + /> + + + + + + + + + + + + + + + + + + + + + +
+ ); +} + +function OverviewTab({ session, companyId }: { session: ResearchSessionDetailType; companyId: string }) { + const queryClient = useQueryClient(); + const [isEditing, setIsEditing] = useState(false); + const [editContent, setEditContent] = useState(session.report || ""); + const [showOriginal, setShowOriginal] = useState(false); + + const updateMutation = useMutation({ + mutationFn: (report: string) => + researchApi.updateSession(companyId, session.id, { report }), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: queryKeys.research.session(companyId, session.id) }); + setIsEditing(false); + }, + }); + + const handleSave = () => { + updateMutation.mutate(editContent); + }; + + const handleCancel = () => { + setEditContent(session.report || ""); + setIsEditing(false); + setShowOriginal(false); + }; + + const handleEdit = () => { + setEditContent(session.report || ""); + setIsEditing(true); + setShowOriginal(false); + }; + + const displayReport = showOriginal && session.originalReport + ? session.originalReport + : session.report; + + return ( +
+ {/* Plan */} + {session.plan && session.plan.subtopics.length > 0 && ( +
+

+ + Research Plan +

+

{session.plan.strategy}

+
+ {session.plan.subtopics.map((subtopic, idx) => ( +
+ + {idx + 1} + +
+

{subtopic.title}

+

{subtopic.description}

+
+
+ ))} +
+
+ )} + + {/* Report */} + {session.report && ( +
+
+
+

+ + Research Report +

+ {session.isEdited && ( + + + User Modified + + )} +
+
+ {!isEditing && session.originalReport && ( + + )} + {!isEditing && ( + + )} +
+
+ + {showOriginal && session.originalReport && ( +
+

+ Viewing original generated report +

+
+ )} + + {isEditing ? ( +
+