diff --git a/prisma/migrations/20260619100000_add_sync_jobs/migration.sql b/prisma/migrations/20260619100000_add_sync_jobs/migration.sql new file mode 100644 index 0000000..58c898e --- /dev/null +++ b/prisma/migrations/20260619100000_add_sync_jobs/migration.sql @@ -0,0 +1,28 @@ +-- CreateEnum +CREATE TYPE "SyncJobStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED'); + +-- CreateTable +CREATE TABLE "SyncJob" ( + "id" TEXT NOT NULL, + "connectionId" TEXT NOT NULL, + "status" "SyncJobStatus" NOT NULL DEFAULT 'PENDING', + "attempts" INTEGER NOT NULL DEFAULT 0, + "maxAttempts" INTEGER NOT NULL DEFAULT 3, + "lastError" TEXT, + "runAfter" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "startedAt" TIMESTAMP(3), + "finishedAt" TIMESTAMP(3), + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "SyncJob_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "SyncJob_status_runAfter_idx" ON "SyncJob"("status", "runAfter"); + +-- CreateIndex +CREATE INDEX "SyncJob_connectionId_idx" ON "SyncJob"("connectionId"); + +-- AddForeignKey +ALTER TABLE "SyncJob" ADD CONSTRAINT "SyncJob_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "DirectoryConnection"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 88a806b..b654276 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -36,11 +36,38 @@ model DirectoryConnection { createdAt DateTime @default(now()) updatedAt DateTime @updatedAt - company Company @relation(fields: [companyId], references: [id], onDelete: Cascade) + company Company @relation(fields: [companyId], references: [id], onDelete: Cascade) + syncJobs SyncJob[] @@index([companyId]) } +model SyncJob { + id String @id @default(cuid()) + connectionId String + status SyncJobStatus @default(PENDING) + attempts Int @default(0) + maxAttempts Int @default(3) + lastError String? + runAfter DateTime @default(now()) + startedAt DateTime? + finishedAt DateTime? + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + connection DirectoryConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade) + + @@index([status, runAfter]) + @@index([connectionId]) +} + +enum SyncJobStatus { + PENDING + RUNNING + SUCCEEDED + FAILED +} + enum DirectoryType { AZURE_AD GOOGLE_WORKSPACE diff --git a/src/app/api/directory/[id]/sync/route.test.ts b/src/app/api/directory/[id]/sync/route.test.ts index 9dc2ada..83610ac 100644 --- a/src/app/api/directory/[id]/sync/route.test.ts +++ b/src/app/api/directory/[id]/sync/route.test.ts @@ -1,15 +1,17 @@ import { describe, it, expect, vi, beforeEach } from "vitest" const requireAdmin = vi.fn() -const queryRaw = vi.fn() -const transaction = vi.fn() -const syncDirectoryConnection = vi.fn() +const findFirst = vi.fn() +const enqueueSyncJob = vi.fn() +const processSyncJobs = vi.fn() vi.mock("@/lib/apiAuth", () => ({ requireAdmin: () => requireAdmin() })) -vi.mock("@/lib/prisma", () => ({ prisma: { $transaction: (cb: unknown) => transaction(cb) } })) -vi.mock("@/lib/directory/sync", () => ({ - syncDirectoryConnection: (id: string, companyId: string) => - syncDirectoryConnection(id, companyId), +vi.mock("@/lib/prisma", () => ({ + prisma: { directoryConnection: { findFirst: (a: unknown) => findFirst(a) } }, +})) +vi.mock("@/lib/directory/jobs", () => ({ + enqueueSyncJob: (id: string) => enqueueSyncJob(id), + processSyncJobs: () => processSyncJobs(), })) import { POST } from "./route" @@ -17,62 +19,46 @@ import { POST } from "./route" const params = Promise.resolve({ id: "conn-1" }) const req = new Request("http://localhost/api/directory/conn-1/sync", { method: "POST" }) -// $transaction runs the callback with a tx whose $queryRaw yields the lock result. -function withLock(locked: boolean) { - queryRaw.mockResolvedValue([{ locked }]) - transaction.mockImplementation((cb: (tx: unknown) => unknown) => - cb({ $queryRaw: () => queryRaw() }) - ) -} - beforeEach(() => { vi.clearAllMocks() requireAdmin.mockResolvedValue({ session: { user: { companyId: "co-1" } }, error: null }) + findFirst.mockResolvedValue({ id: "conn-1" }) + enqueueSyncJob.mockResolvedValue({ id: "job-1", status: "PENDING" }) + processSyncJobs.mockResolvedValue({ processed: 1 }) }) describe("POST /api/directory/[id]/sync", () => { - it("returns 401-style error from requireAdmin without touching the lock", async () => { - const error = NextResponseLike(401) + it("propagates the auth error without enqueueing", async () => { + const error = { status: 401 } as unknown requireAdmin.mockResolvedValue({ session: null, error }) const res = await POST(req, { params }) expect(res).toBe(error) - expect(transaction).not.toHaveBeenCalled() + expect(enqueueSyncJob).not.toHaveBeenCalled() }) - it("runs the sync when the advisory lock is acquired", async () => { - withLock(true) - syncDirectoryConnection.mockResolvedValue({ synced: 42 }) + it("returns 404 when the connection is not in the caller's company", async () => { + findFirst.mockResolvedValue(null) const res = await POST(req, { params }) - expect(syncDirectoryConnection).toHaveBeenCalledWith("conn-1", "co-1") - expect(res.status).toBe(200) - expect(await res.json()).toEqual({ synced: 42 }) + expect(res.status).toBe(404) + expect(enqueueSyncJob).not.toHaveBeenCalled() }) - it("returns 409 and skips the sync when the lock is held elsewhere", async () => { - withLock(false) - + it("enqueues a job and returns 202 with the job id", async () => { const res = await POST(req, { params }) - expect(syncDirectoryConnection).not.toHaveBeenCalled() - expect(res.status).toBe(409) - expect(await res.json()).toEqual({ error: "Sync already running" }) + expect(enqueueSyncJob).toHaveBeenCalledWith("conn-1") + expect(res.status).toBe(202) + expect(await res.json()).toEqual({ jobId: "job-1", status: "PENDING" }) }) - it("returns 500 when the sync throws", async () => { - withLock(true) - syncDirectoryConnection.mockRejectedValue(new Error("boom")) - - const res = await POST(req, { params }) - - expect(res.status).toBe(500) - expect(await res.json()).toEqual({ error: "boom" }) + it("scopes the lookup to the caller's company", async () => { + await POST(req, { params }) + expect(findFirst).toHaveBeenCalledWith( + expect.objectContaining({ where: { id: "conn-1", companyId: "co-1" } }) + ) }) }) - -function NextResponseLike(status: number) { - return { status, body: "auth" } as unknown -} diff --git a/src/app/api/directory/[id]/sync/route.ts b/src/app/api/directory/[id]/sync/route.ts index e7536cd..4d50993 100644 --- a/src/app/api/directory/[id]/sync/route.ts +++ b/src/app/api/directory/[id]/sync/route.ts @@ -1,7 +1,7 @@ import { NextResponse } from "next/server" import { requireAdmin } from "@/lib/apiAuth" import { prisma } from "@/lib/prisma" -import { syncDirectoryConnection } from "@/lib/directory/sync" +import { enqueueSyncJob, processSyncJobs } from "@/lib/directory/jobs" export async function POST( _req: Request, @@ -12,23 +12,21 @@ export async function POST( const { id } = await params - // Shared Postgres advisory lock: survives multi-replica, unlike an - // in-memory Set. xact_lock auto-releases on commit/rollback, so no stale - // lock is left behind if the process crashes during the sync. - try { - const result = await prisma.$transaction(async (tx) => { - const [{ locked }] = await tx.$queryRaw<{ locked: boolean }[]>` - SELECT pg_try_advisory_xact_lock(hashtextextended(${id}, 0)) AS locked - ` - if (!locked) return { conflict: true as const } - const synced = await syncDirectoryConnection(id, session.user.companyId) - return { conflict: false as const, synced } - }) + // Scope the connection to the caller's company before enqueueing. + const connection = await prisma.directoryConnection.findFirst({ + where: { id, companyId: session.user.companyId }, + select: { id: true }, + }) + if (!connection) + return NextResponse.json({ error: "Connection not found" }, { status: 404 }) - if (result.conflict) - return NextResponse.json({ error: "Sync already running" }, { status: 409 }) - return NextResponse.json(result.synced) - } catch (e: unknown) { - return NextResponse.json({ error: (e as Error).message }, { status: 500 }) - } + const job = await enqueueSyncJob(id) + + // Kick the queue without blocking the response. The sync runs outside the + // request cycle (no timeout risk on large directories); a scheduler also + // drains the queue, so a dropped fire-and-forget here is only a delay, not a + // lost job. Concurrency is handled by the job claim, not this call. + void processSyncJobs().catch(() => {}) + + return NextResponse.json({ jobId: job.id, status: job.status }, { status: 202 }) } diff --git a/src/lib/directory/jobs.test.ts b/src/lib/directory/jobs.test.ts new file mode 100644 index 0000000..7c2d4f7 --- /dev/null +++ b/src/lib/directory/jobs.test.ts @@ -0,0 +1,115 @@ +import { describe, it, expect, vi, beforeEach } from "vitest" + +const syncJobFindFirst = vi.fn() +const syncJobCreate = vi.fn() +const syncJobUpdate = vi.fn() +const connFindUnique = vi.fn() +const queryRaw = vi.fn() +const syncDirectoryConnection = vi.fn() + +vi.mock("@/lib/prisma", () => ({ + prisma: { + syncJob: { + findFirst: (a: unknown) => syncJobFindFirst(a), + create: (a: unknown) => syncJobCreate(a), + update: (a: unknown) => syncJobUpdate(a), + }, + directoryConnection: { findUnique: (a: unknown) => connFindUnique(a) }, + $queryRaw: () => queryRaw(), + }, +})) +vi.mock("./sync", () => ({ + syncDirectoryConnection: (id: string, companyId: string) => + syncDirectoryConnection(id, companyId), +})) + +import { backoffMs, enqueueSyncJob, processSyncJobs } from "./jobs" + +beforeEach(() => { + vi.clearAllMocks() + connFindUnique.mockResolvedValue({ companyId: "co-1" }) +}) + +describe("backoffMs", () => { + it("grows exponentially from 30s", () => { + expect(backoffMs(1)).toBe(30_000) + expect(backoffMs(2)).toBe(60_000) + expect(backoffMs(3)).toBe(120_000) + }) +}) + +describe("enqueueSyncJob", () => { + it("reuses an existing pending/running job", async () => { + syncJobFindFirst.mockResolvedValue({ id: "job-1", status: "PENDING" }) + const res = await enqueueSyncJob("c1") + expect(res).toEqual({ id: "job-1", status: "PENDING" }) + expect(syncJobCreate).not.toHaveBeenCalled() + }) + + it("creates a job when none is active", async () => { + syncJobFindFirst.mockResolvedValue(null) + syncJobCreate.mockResolvedValue({ id: "job-2", status: "PENDING" }) + const res = await enqueueSyncJob("c1") + expect(res).toEqual({ id: "job-2", status: "PENDING" }) + expect(syncJobCreate).toHaveBeenCalledWith( + expect.objectContaining({ data: { connectionId: "c1" } }) + ) + }) +}) + +describe("processSyncJobs", () => { + it("stops when there is no due job", async () => { + queryRaw.mockResolvedValue([]) + const res = await processSyncJobs() + expect(res).toEqual({ processed: 0 }) + expect(syncDirectoryConnection).not.toHaveBeenCalled() + }) + + it("marks a job SUCCEEDED on a successful sync", async () => { + queryRaw + .mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 0, maxAttempts: 3 }]) + .mockResolvedValue([]) + syncDirectoryConnection.mockResolvedValue({ synced: 3 }) + + const res = await processSyncJobs() + + expect(res.processed).toBe(1) + expect(syncDirectoryConnection).toHaveBeenCalledWith("c1", "co-1") + expect(syncJobUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + where: { id: "j1" }, + data: expect.objectContaining({ status: "SUCCEEDED", attempts: 1, lastError: null }), + }) + ) + }) + + it("re-queues with backoff when attempts remain", async () => { + queryRaw + .mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 0, maxAttempts: 3 }]) + .mockResolvedValue([]) + syncDirectoryConnection.mockRejectedValue(new Error("boom")) + + await processSyncJobs() + + const arg = syncJobUpdate.mock.calls[0][0] + expect(arg.data.status).toBe("PENDING") + expect(arg.data.attempts).toBe(1) + expect(arg.data.lastError).toBe("boom") + expect(arg.data.runAfter).toBeInstanceOf(Date) + }) + + it("marks FAILED once attempts are exhausted", async () => { + queryRaw + .mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 2, maxAttempts: 3 }]) + .mockResolvedValue([]) + syncDirectoryConnection.mockRejectedValue(new Error("still down")) + + await processSyncJobs() + + expect(syncJobUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + data: expect.objectContaining({ status: "FAILED", attempts: 3, lastError: "still down" }), + }) + ) + }) +}) diff --git a/src/lib/directory/jobs.ts b/src/lib/directory/jobs.ts new file mode 100644 index 0000000..e1b6d73 --- /dev/null +++ b/src/lib/directory/jobs.ts @@ -0,0 +1,97 @@ +import { prisma } from "@/lib/prisma" +import { syncDirectoryConnection } from "./sync" +import type { SyncJobStatus } from "@prisma/client" + +const BASE_BACKOFF_MS = 30_000 + +// Exponential backoff between retries: 30s, 60s, 120s, ... +export function backoffMs(attempts: number): number { + return BASE_BACKOFF_MS * 2 ** Math.max(0, attempts - 1) +} + +// Enqueue a sync for a connection, unless one is already pending or running. +// The find-then-create has a small race window; a duplicate at worst causes a +// redundant re-sync, which is harmless (upserts are idempotent). +export async function enqueueSyncJob( + connectionId: string +): Promise<{ id: string; status: SyncJobStatus }> { + const existing = await prisma.syncJob.findFirst({ + where: { connectionId, status: { in: ["PENDING", "RUNNING"] } }, + select: { id: true, status: true }, + }) + if (existing) return existing + return prisma.syncJob.create({ + data: { connectionId }, + select: { id: true, status: true }, + }) +} + +type ClaimedJob = { + id: string + connectionId: string + attempts: number + maxAttempts: number +} + +// Atomically claim the next due job and mark it RUNNING. FOR UPDATE SKIP LOCKED +// lets multiple instances drain the queue without processing the same job twice. +async function claimNextJob(): Promise { + const rows = await prisma.$queryRaw` + UPDATE "SyncJob" + SET status = 'RUNNING', "startedAt" = now(), "updatedAt" = now() + WHERE id = ( + SELECT id FROM "SyncJob" + WHERE status = 'PENDING' AND "runAfter" <= now() + ORDER BY "runAfter" ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING id, "connectionId", attempts, "maxAttempts" + ` + return rows[0] ?? null +} + +async function runJob(job: ClaimedJob): Promise { + const attempts = job.attempts + 1 + try { + const conn = await prisma.directoryConnection.findUnique({ + where: { id: job.connectionId }, + select: { companyId: true }, + }) + if (!conn) throw new Error("Connection not found") + + await syncDirectoryConnection(job.connectionId, conn.companyId) + await prisma.syncJob.update({ + where: { id: job.id }, + data: { status: "SUCCEEDED", attempts, lastError: null, finishedAt: new Date() }, + }) + } catch (e: unknown) { + const message = (e as Error)?.message ?? "Unknown error" + const exhausted = attempts >= job.maxAttempts + await prisma.syncJob.update({ + where: { id: job.id }, + data: exhausted + ? { status: "FAILED", attempts, lastError: message, finishedAt: new Date() } + : { + status: "PENDING", + attempts, + lastError: message, + runAfter: new Date(Date.now() + backoffMs(attempts)), + }, + }) + } +} + +// Drain up to `limit` due jobs. Returns how many were processed (run to a +// terminal or re-queued state). Safe to call from a request, a scheduler, or +// concurrently across instances. +export async function processSyncJobs(limit = 10): Promise<{ processed: number }> { + let processed = 0 + for (let i = 0; i < limit; i++) { + const job = await claimNextJob() + if (!job) break + await runJob(job) + processed++ + } + return { processed } +}