diff --git a/.env.example b/.env.example index ffedda5..4bbab9d 100644 --- a/.env.example +++ b/.env.example @@ -22,6 +22,12 @@ DIRECTORY_ENCRYPTION_KEY=change-me-to-a-long-random-32-char-secret # re-encrypted (npm run reencrypt:directory). Remove it once rotation is done. # DIRECTORY_ENCRYPTION_KEY_PREVIOUS= +# Optional. Secret for the scheduler tick endpoint (POST /api/cron). Set it, +# then have an external cron call the endpoint on an interval with header +# `authorization: Bearer <CRON_SECRET>`. Without it, /api/cron returns 503. +# Generate with: openssl rand -base64 32 +CRON_SECRET= + # Optional. Enables Have I Been Pwned breach lookups. HIBP_API_KEY= diff --git a/docs/scheduler.md b/docs/scheduler.md new file mode 100644 index 0000000..121e0c9 --- /dev/null +++ b/docs/scheduler.md @@ -0,0 +1,52 @@ +# Scheduling (auto scan + sync) + +DataShield does periodic work, exposure scans and directory syncs, without a +dedicated worker process. The model is "internal cron + table": schedule state +lives in the database, and a single tick endpoint advances it. An external +scheduler calls the endpoint on a fixed interval. + +## Configuration + +- **Directory sync** (per connection): `DirectoryConnection.autoSyncIntervalMinutes` + (null = disabled). Set it via `PATCH /api/directory/:id` + `{ "autoSyncIntervalMinutes": 60 }`. SCIM connections are push-based and + cannot be put on a pull schedule. +- **Exposure scan** (per company): `Company.scanIntervalMinutes` (null = + disabled). Set it via `PATCH /api/company` `{ "scanIntervalMinutes": 1440 }`. + +Both intervals must be `null` or an integer >= 5 minutes. + +## The tick endpoint + +`POST /api/cron`, authenticated with `CRON_SECRET`: + +``` +curl -X POST -H "authorization: Bearer $CRON_SECRET" https://your-host/api/cron +``` + +Without `CRON_SECRET` set, the endpoint returns 503. Each tick: + +1. Enqueues a `SyncJob` for every connection whose `autoSyncIntervalMinutes` + has elapsed since `lastSyncAt`. +2. 
Starts an exposure scan for every company whose `scanIntervalMinutes` has + elapsed since `lastScanAt` (stamped up front so a slow scan is not + re-triggered). +3. Drains the sync queue (`processSyncJobs`), which runs syncs with retry and + backoff (see #54). + +## Driving the tick + +Run the call on whatever cron you have, e.g. every 5 minutes: + +- **System cron**: `*/5 * * * * curl -fsS -X POST -H "authorization: Bearer $CRON_SECRET" https://your-host/api/cron` +- **Vercel Cron / similar**: schedule a POST to `/api/cron` and inject the + header. + +The tick is idempotent: nothing runs before its interval elapses, and sync +concurrency is guarded by the job claim, so overlapping ticks are safe. + +## Notes + +- Directory sync is durable (queued, retried). Auto-scan currently runs + detached (fire-and-forget) and is not yet retried; a failed scheduled scan is + retried on the next due tick. diff --git a/prisma/migrations/20260619110000_add_schedule_config/migration.sql b/prisma/migrations/20260619110000_add_schedule_config/migration.sql new file mode 100644 index 0000000..50a171e --- /dev/null +++ b/prisma/migrations/20260619110000_add_schedule_config/migration.sql @@ -0,0 +1,6 @@ +-- AlterTable +ALTER TABLE "Company" ADD COLUMN "scanIntervalMinutes" INTEGER, +ADD COLUMN "lastScanAt" TIMESTAMP(3); + +-- AlterTable +ALTER TABLE "DirectoryConnection" ADD COLUMN "autoSyncIntervalMinutes" INTEGER; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b654276..d8964d0 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -11,6 +11,10 @@ model Company { name String domain String @unique logoUrl String? + // Auto-scan cadence in minutes (null = disabled); lastScanAt tracks the last + // scheduler-triggered scan so the next one is only due after the interval. + scanIntervalMinutes Int? + lastScanAt DateTime? createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -33,6 +37,9 @@ model DirectoryConnection { lastSyncAt DateTime? 
lastSyncCount Int? errorMessage String? + // Auto-sync cadence in minutes (null = disabled). The scheduler enqueues a + // SyncJob once lastSyncAt is older than this interval. + autoSyncIntervalMinutes Int? createdAt DateTime @default(now()) updatedAt DateTime @updatedAt diff --git a/src/app/api/company/route.test.ts b/src/app/api/company/route.test.ts new file mode 100644 index 0000000..f855b1b --- /dev/null +++ b/src/app/api/company/route.test.ts @@ -0,0 +1,49 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" + +const requireAdmin = vi.fn() +const update = vi.fn() + +vi.mock("@/lib/apiAuth", () => ({ requireAdmin: () => requireAdmin() })) +vi.mock("@/lib/prisma", () => ({ + prisma: { company: { update: (a: unknown) => update(a) } }, +})) + +import { PATCH } from "./route" + +function patch(body: unknown): Request { + return new Request("http://localhost/api/company", { + method: "PATCH", + body: JSON.stringify(body), + }) +} + +beforeEach(() => { + vi.clearAllMocks() + requireAdmin.mockResolvedValue({ session: { user: { companyId: "co1" } }, error: null }) +}) + +describe("PATCH /api/company", () => { + it("rejects a non-integer interval", async () => { + const res = await PATCH(patch({ scanIntervalMinutes: 2.5 })) + expect(res.status).toBe(400) + expect(update).not.toHaveBeenCalled() + }) + + it("sets the scan interval scoped to the caller's company", async () => { + const res = await PATCH(patch({ scanIntervalMinutes: 1440 })) + expect(res.status).toBe(200) + expect(update).toHaveBeenCalledWith({ + where: { id: "co1" }, + data: { scanIntervalMinutes: 1440 }, + }) + }) + + it("disables scheduled scans with null", async () => { + const res = await PATCH(patch({ scanIntervalMinutes: null })) + expect(res.status).toBe(200) + expect(update).toHaveBeenCalledWith({ + where: { id: "co1" }, + data: { scanIntervalMinutes: null }, + }) + }) +}) diff --git a/src/app/api/company/route.ts b/src/app/api/company/route.ts new file mode 100644 index 
0000000..7677424 --- /dev/null +++ b/src/app/api/company/route.ts @@ -0,0 +1,32 @@ +import { NextResponse } from "next/server" +import { requireAdmin } from "@/lib/apiAuth" +import { prisma } from "@/lib/prisma" + +const MIN_INTERVAL_MINUTES = 5 + +function parseInterval(value: unknown): number | null | undefined { + if (value === null) return null + if (typeof value === "number" && Number.isInteger(value) && value >= MIN_INTERVAL_MINUTES) + return value + return undefined +} + +// Set the company-wide auto-scan cadence. null disables scheduled scans; a malformed or non-object JSON body is rejected with 400. +export async function PATCH(req: Request) { + const { session, error } = await requireAdmin() + if (error) return error + + const body = (await req.json().catch(() => null)) as { scanIntervalMinutes?: unknown } | null + const interval = parseInterval(body?.scanIntervalMinutes) + if (interval === undefined) + return NextResponse.json( + { error: `scanIntervalMinutes must be null or an integer >= ${MIN_INTERVAL_MINUTES}` }, + { status: 400 } + ) + + await prisma.company.update({ + where: { id: session.user.companyId }, + data: { scanIntervalMinutes: interval }, + }) + return NextResponse.json({ scanIntervalMinutes: interval }) +} diff --git a/src/app/api/cron/route.test.ts b/src/app/api/cron/route.test.ts new file mode 100644 index 0000000..a98db05 --- /dev/null +++ b/src/app/api/cron/route.test.ts @@ -0,0 +1,43 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" + +const runDueSchedules = vi.fn() +vi.mock("@/lib/scheduler", () => ({ runDueSchedules: () => runDueSchedules() })) + +import { POST } from "./route" + +function req(auth?: string): Request { + const headers = new Headers() + if (auth !== undefined) headers.set("authorization", auth) + return new Request("http://localhost/api/cron", { method: "POST", headers }) +} + +beforeEach(() => { + vi.clearAllMocks() + runDueSchedules.mockResolvedValue({ syncsEnqueued: 1, scansStarted: 0, jobsProcessed: 1 }) +}) + +describe("POST /api/cron", () => { + it("returns 503 when 
CRON_SECRET is unset", async () => { + delete process.env.CRON_SECRET + const res = await POST(req("Bearer x")) + expect(res.status).toBe(503) + expect(runDueSchedules).not.toHaveBeenCalled() + }) + + it("returns 401 on a missing or wrong secret", async () => { + process.env.CRON_SECRET = "right" + expect((await POST(req())).status).toBe(401) + expect((await POST(req("Bearer wrong"))).status).toBe(401) + expect(runDueSchedules).not.toHaveBeenCalled() + delete process.env.CRON_SECRET + }) + + it("runs the scheduler on a valid secret", async () => { + process.env.CRON_SECRET = "right" + const res = await POST(req("Bearer right")) + expect(res.status).toBe(200) + expect(await res.json()).toEqual({ syncsEnqueued: 1, scansStarted: 0, jobsProcessed: 1 }) + expect(runDueSchedules).toHaveBeenCalled() + delete process.env.CRON_SECRET + }) +}) diff --git a/src/app/api/cron/route.ts b/src/app/api/cron/route.ts new file mode 100644 index 0000000..4a775d3 --- /dev/null +++ b/src/app/api/cron/route.ts @@ -0,0 +1,26 @@ +import { timingSafeEqual } from "crypto" +import { NextResponse } from "next/server" +import { runDueSchedules } from "@/lib/scheduler" + +function safeEqual(a: string, b: string): boolean { + const bufA = Buffer.from(a) + const bufB = Buffer.from(b) + if (bufA.length !== bufB.length) return false + return timingSafeEqual(bufA, bufB) +} + +// Scheduler tick. Meant to be called by an external cron (system cron, Vercel +// Cron, uptime pinger) on a fixed interval, authenticated with CRON_SECRET: +// curl -X POST -H "authorization: Bearer $CRON_SECRET" https://host/api/cron +export async function POST(req: Request) { + const secret = process.env.CRON_SECRET + if (!secret) + return NextResponse.json({ error: "Scheduler not configured" }, { status: 503 }) + + const provided = (req.headers.get("authorization") ?? 
"").match(/^Bearer\s+(\S.*)$/i)?.[1] + if (!provided || !safeEqual(provided, secret)) + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }) + + const result = await runDueSchedules() + return NextResponse.json(result) +} diff --git a/src/app/api/directory/[id]/route.test.ts b/src/app/api/directory/[id]/route.test.ts new file mode 100644 index 0000000..6148342 --- /dev/null +++ b/src/app/api/directory/[id]/route.test.ts @@ -0,0 +1,69 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" + +const requireAdmin = vi.fn() +const findFirst = vi.fn() +const update = vi.fn() + +vi.mock("@/lib/apiAuth", () => ({ requireAdmin: () => requireAdmin() })) +vi.mock("@/lib/prisma", () => ({ + prisma: { + directoryConnection: { + findFirst: (a: unknown) => findFirst(a), + update: (a: unknown) => update(a), + }, + }, +})) + +import { PATCH } from "./route" + +const params = Promise.resolve({ id: "c1" }) +function patch(body: unknown): Request { + return new Request("http://localhost/api/directory/c1", { + method: "PATCH", + body: JSON.stringify(body), + }) +} + +beforeEach(() => { + vi.clearAllMocks() + requireAdmin.mockResolvedValue({ session: { user: { companyId: "co1" } }, error: null }) + findFirst.mockResolvedValue({ type: "OKTA" }) +}) + +describe("PATCH /api/directory/[id]", () => { + it("rejects an interval below the minimum", async () => { + const res = await PATCH(patch({ autoSyncIntervalMinutes: 1 }), { params }) + expect(res.status).toBe(400) + expect(update).not.toHaveBeenCalled() + }) + + it("returns 404 for a connection outside the company", async () => { + findFirst.mockResolvedValue(null) + const res = await PATCH(patch({ autoSyncIntervalMinutes: 60 }), { params }) + expect(res.status).toBe(404) + }) + + it("rejects a pull schedule on a SCIM connection", async () => { + findFirst.mockResolvedValue({ type: "SCIM" }) + const res = await PATCH(patch({ autoSyncIntervalMinutes: 60 }), { params }) + expect(res.status).toBe(400) + 
expect(update).not.toHaveBeenCalled() + }) + + it("sets the interval", async () => { + const res = await PATCH(patch({ autoSyncIntervalMinutes: 60 }), { params }) + expect(res.status).toBe(200) + expect(update).toHaveBeenCalledWith( + expect.objectContaining({ where: { id: "c1" }, data: { autoSyncIntervalMinutes: 60 } }) + ) + }) + + it("disables auto-sync with null (allowed for SCIM too)", async () => { + findFirst.mockResolvedValue({ type: "SCIM" }) + const res = await PATCH(patch({ autoSyncIntervalMinutes: null }), { params }) + expect(res.status).toBe(200) + expect(update).toHaveBeenCalledWith( + expect.objectContaining({ data: { autoSyncIntervalMinutes: null } }) + ) + }) +}) diff --git a/src/app/api/directory/[id]/route.ts b/src/app/api/directory/[id]/route.ts index 24cfbb2..daa3899 100644 --- a/src/app/api/directory/[id]/route.ts +++ b/src/app/api/directory/[id]/route.ts @@ -2,6 +2,52 @@ import { NextResponse } from "next/server" import { requireAdmin } from "@/lib/apiAuth" import { prisma } from "@/lib/prisma" +const MIN_INTERVAL_MINUTES = 5 + +// Validate an interval field: null disables, otherwise an integer >= the floor. +function parseInterval(value: unknown): number | null | undefined { + if (value === null) return null + if (typeof value === "number" && Number.isInteger(value) && value >= MIN_INTERVAL_MINUTES) + return value + return undefined // invalid +} + +// Set the auto-sync cadence for a connection. SCIM is push-based, so it cannot +// be put on a pull schedule. 
+export async function PATCH( + req: Request, + { params }: { params: Promise<{ id: string }> } +) { + const { session, error } = await requireAdmin() + if (error) return error + + const { id } = await params + const body = (await req.json().catch(() => null)) as { autoSyncIntervalMinutes?: unknown } | null // malformed or non-object JSON falls through to the 400 below + const interval = parseInterval(body?.autoSyncIntervalMinutes) + if (interval === undefined) + return NextResponse.json( + { error: `autoSyncIntervalMinutes must be null or an integer >= ${MIN_INTERVAL_MINUTES}` }, + { status: 400 } + ) + + const connection = await prisma.directoryConnection.findFirst({ + where: { id, companyId: session.user.companyId }, + select: { type: true }, + }) + if (!connection) return NextResponse.json({ error: "Not found" }, { status: 404 }) + if (connection.type === "SCIM" && interval !== null) + return NextResponse.json( + { error: "SCIM connections are push-based and cannot be auto-synced" }, + { status: 400 } + ) + + await prisma.directoryConnection.update({ + where: { id }, + data: { autoSyncIntervalMinutes: interval }, + }) + return NextResponse.json({ id, autoSyncIntervalMinutes: interval }) +} + export async function DELETE( _req: Request, { params }: { params: Promise<{ id: string }> } diff --git a/src/lib/scheduler.test.ts b/src/lib/scheduler.test.ts new file mode 100644 index 0000000..810e920 --- /dev/null +++ b/src/lib/scheduler.test.ts @@ -0,0 +1,88 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" + +const connFindMany = vi.fn() +const companyFindMany = vi.fn() +const companyUpdate = vi.fn() +const enqueueSyncJob = vi.fn() +const processSyncJobs = vi.fn() +const loadActiveProviders = vi.fn() +const runScan = vi.fn() + +vi.mock("@/lib/prisma", () => ({ + prisma: { + directoryConnection: { findMany: (a: unknown) => connFindMany(a) }, + company: { + findMany: (a: unknown) => companyFindMany(a), + update: (a: unknown) => companyUpdate(a), + }, + }, +})) +vi.mock("./directory/jobs", () => ({ + enqueueSyncJob: (id: string) => 
enqueueSyncJob(id), + processSyncJobs: () => processSyncJobs(), +})) +vi.mock("./scan/runner", () => ({ + loadActiveProviders: (id: string) => loadActiveProviders(id), + runScan: (id: string, p: unknown) => runScan(id, p), +})) + +import { isDue, runDueSchedules } from "./scheduler" + +const NOW = new Date("2026-06-19T12:00:00Z") + +describe("isDue", () => { + it("is due when never run", () => { + expect(isDue(null, 60, NOW)).toBe(true) + }) + it("is due once the interval has elapsed", () => { + expect(isDue(new Date("2026-06-19T11:00:00Z"), 60, NOW)).toBe(true) + }) + it("is not due before the interval", () => { + expect(isDue(new Date("2026-06-19T11:30:00Z"), 60, NOW)).toBe(false) + }) +}) + +describe("runDueSchedules", () => { + beforeEach(() => { + vi.clearAllMocks() + connFindMany.mockResolvedValue([]) + companyFindMany.mockResolvedValue([]) + processSyncJobs.mockResolvedValue({ processed: 0 }) + runScan.mockResolvedValue({ scanned: 0, newRecords: 0, newAlerts: 0 }) + }) + + it("enqueues only connections whose interval has elapsed", async () => { + connFindMany.mockResolvedValue([ + { id: "due", lastSyncAt: new Date("2026-06-19T10:00:00Z"), autoSyncIntervalMinutes: 60 }, + { id: "fresh", lastSyncAt: new Date("2026-06-19T11:59:00Z"), autoSyncIntervalMinutes: 60 }, + ]) + const res = await runDueSchedules(NOW) + expect(enqueueSyncJob).toHaveBeenCalledTimes(1) + expect(enqueueSyncJob).toHaveBeenCalledWith("due") + expect(res.syncsEnqueued).toBe(1) + }) + + it("starts a due scan only when providers are configured", async () => { + companyFindMany.mockResolvedValue([ + { id: "co-go", lastScanAt: null, scanIntervalMinutes: 60 }, + { id: "co-nokey", lastScanAt: null, scanIntervalMinutes: 60 }, + ]) + loadActiveProviders.mockImplementation((id: string) => + id === "co-go" ? 
Promise.resolve([{ provider: {}, key: "k" }]) : Promise.resolve([]) + ) + const res = await runDueSchedules(NOW) + + expect(companyUpdate).toHaveBeenCalledWith( + expect.objectContaining({ where: { id: "co-go" }, data: { lastScanAt: NOW } }) + ) + expect(runScan).toHaveBeenCalledTimes(1) + expect(res.scansStarted).toBe(1) + }) + + it("drains the sync queue and reports the count", async () => { + processSyncJobs.mockResolvedValue({ processed: 4 }) + const res = await runDueSchedules(NOW) + expect(processSyncJobs).toHaveBeenCalled() + expect(res.jobsProcessed).toBe(4) + }) +}) diff --git a/src/lib/scheduler.ts b/src/lib/scheduler.ts new file mode 100644 index 0000000..e2ef2c4 --- /dev/null +++ b/src/lib/scheduler.ts @@ -0,0 +1,52 @@ +import { prisma } from "@/lib/prisma" +import { enqueueSyncJob, processSyncJobs } from "./directory/jobs" +import { loadActiveProviders, runScan } from "./scan/runner" + +// A task is due when it has never run, or the interval has elapsed since. +export function isDue(last: Date | null, intervalMinutes: number, now: Date): boolean { + if (!last) return true + return now.getTime() - last.getTime() >= intervalMinutes * 60_000 +} + +export type SchedulerResult = { + syncsEnqueued: number + scansStarted: number + jobsProcessed: number +} + +// Enqueue due directory syncs and start due company scans, then drain the sync +// queue. Idempotent per tick: nothing runs before its interval elapses. +// Designed to be called by an external cron hitting /api/cron. +export async function runDueSchedules(now: Date = new Date()): Promise<SchedulerResult> { + // Directory syncs. SCIM is push-based (the IdP drives it), so it is excluded. 
+ const connections = await prisma.directoryConnection.findMany({ + where: { autoSyncIntervalMinutes: { not: null }, type: { not: "SCIM" } }, + select: { id: true, lastSyncAt: true, autoSyncIntervalMinutes: true }, + }) + let syncsEnqueued = 0 + for (const c of connections) { + if (isDue(c.lastSyncAt, c.autoSyncIntervalMinutes!, now)) { + await enqueueSyncJob(c.id) + syncsEnqueued++ + } + } + + // Company exposure scans. Stamp lastScanAt up front so a slow scan is not + // re-triggered on the next tick; the scan runs detached and failures are only logged. + const companies = await prisma.company.findMany({ + where: { scanIntervalMinutes: { not: null } }, + select: { id: true, lastScanAt: true, scanIntervalMinutes: true }, + }) + let scansStarted = 0 + for (const co of companies) { + if (!isDue(co.lastScanAt, co.scanIntervalMinutes!, now)) continue + const providers = await loadActiveProviders(co.id) + if (!providers.length) continue + await prisma.company.update({ where: { id: co.id }, data: { lastScanAt: now } }) + void runScan(co.id, providers).catch((err: unknown) => console.error(`Scheduled scan failed for company ${co.id}`, err)) + scansStarted++ + } + + const { processed } = await processSyncJobs() + return { syncsEnqueued, scansStarted, jobsProcessed: processed } +}