Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions prisma/migrations/20260619100000_add_sync_jobs/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- CreateEnum
CREATE TYPE "SyncJobStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED');

-- CreateTable
CREATE TABLE "SyncJob" (
"id" TEXT NOT NULL,
"connectionId" TEXT NOT NULL,
"status" "SyncJobStatus" NOT NULL DEFAULT 'PENDING',
"attempts" INTEGER NOT NULL DEFAULT 0,
"maxAttempts" INTEGER NOT NULL DEFAULT 3,
"lastError" TEXT,
"runAfter" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"startedAt" TIMESTAMP(3),
"finishedAt" TIMESTAMP(3),
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3) NOT NULL,

CONSTRAINT "SyncJob_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "SyncJob_status_runAfter_idx" ON "SyncJob"("status", "runAfter");

-- CreateIndex
CREATE INDEX "SyncJob_connectionId_idx" ON "SyncJob"("connectionId");

-- AddForeignKey
ALTER TABLE "SyncJob" ADD CONSTRAINT "SyncJob_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "DirectoryConnection"("id") ON DELETE CASCADE ON UPDATE CASCADE;
29 changes: 28 additions & 1 deletion prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,38 @@ model DirectoryConnection {
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

company Company @relation(fields: [companyId], references: [id], onDelete: Cascade)
company Company @relation(fields: [companyId], references: [id], onDelete: Cascade)
syncJobs SyncJob[]

@@index([companyId])
}

model SyncJob {
id String @id @default(cuid())
connectionId String
status SyncJobStatus @default(PENDING)
attempts Int @default(0)
maxAttempts Int @default(3)
lastError String?
runAfter DateTime @default(now())
startedAt DateTime?
finishedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

connection DirectoryConnection @relation(fields: [connectionId], references: [id], onDelete: Cascade)

@@index([status, runAfter])
@@index([connectionId])
}

enum SyncJobStatus {
PENDING
RUNNING
SUCCEEDED
FAILED
}

enum DirectoryType {
AZURE_AD
GOOGLE_WORKSPACE
Expand Down
70 changes: 28 additions & 42 deletions src/app/api/directory/[id]/sync/route.test.ts
Original file line number Diff line number Diff line change
@@ -1,78 +1,64 @@
import { describe, it, expect, vi, beforeEach } from "vitest"

const requireAdmin = vi.fn()
const queryRaw = vi.fn()
const transaction = vi.fn()
const syncDirectoryConnection = vi.fn()
const findFirst = vi.fn()
const enqueueSyncJob = vi.fn()
const processSyncJobs = vi.fn()

vi.mock("@/lib/apiAuth", () => ({ requireAdmin: () => requireAdmin() }))
vi.mock("@/lib/prisma", () => ({ prisma: { $transaction: (cb: unknown) => transaction(cb) } }))
vi.mock("@/lib/directory/sync", () => ({
syncDirectoryConnection: (id: string, companyId: string) =>
syncDirectoryConnection(id, companyId),
vi.mock("@/lib/prisma", () => ({
prisma: { directoryConnection: { findFirst: (a: unknown) => findFirst(a) } },
}))
vi.mock("@/lib/directory/jobs", () => ({
enqueueSyncJob: (id: string) => enqueueSyncJob(id),
processSyncJobs: () => processSyncJobs(),
}))

import { POST } from "./route"

const params = Promise.resolve({ id: "conn-1" })
const req = new Request("http://localhost/api/directory/conn-1/sync", { method: "POST" })

// $transaction runs the callback with a tx whose $queryRaw yields the lock result.
function withLock(locked: boolean) {
queryRaw.mockResolvedValue([{ locked }])
transaction.mockImplementation((cb: (tx: unknown) => unknown) =>
cb({ $queryRaw: () => queryRaw() })
)
}

beforeEach(() => {
vi.clearAllMocks()
requireAdmin.mockResolvedValue({ session: { user: { companyId: "co-1" } }, error: null })
findFirst.mockResolvedValue({ id: "conn-1" })
enqueueSyncJob.mockResolvedValue({ id: "job-1", status: "PENDING" })
processSyncJobs.mockResolvedValue({ processed: 1 })
})

describe("POST /api/directory/[id]/sync", () => {
it("returns 401-style error from requireAdmin without touching the lock", async () => {
const error = NextResponseLike(401)
it("propagates the auth error without enqueueing", async () => {
const error = { status: 401 } as unknown
requireAdmin.mockResolvedValue({ session: null, error })

const res = await POST(req, { params })

expect(res).toBe(error)
expect(transaction).not.toHaveBeenCalled()
expect(enqueueSyncJob).not.toHaveBeenCalled()
})

it("runs the sync when the advisory lock is acquired", async () => {
withLock(true)
syncDirectoryConnection.mockResolvedValue({ synced: 42 })
it("returns 404 when the connection is not in the caller's company", async () => {
findFirst.mockResolvedValue(null)

const res = await POST(req, { params })

expect(syncDirectoryConnection).toHaveBeenCalledWith("conn-1", "co-1")
expect(res.status).toBe(200)
expect(await res.json()).toEqual({ synced: 42 })
expect(res.status).toBe(404)
expect(enqueueSyncJob).not.toHaveBeenCalled()
})

it("returns 409 and skips the sync when the lock is held elsewhere", async () => {
withLock(false)

it("enqueues a job and returns 202 with the job id", async () => {
const res = await POST(req, { params })

expect(syncDirectoryConnection).not.toHaveBeenCalled()
expect(res.status).toBe(409)
expect(await res.json()).toEqual({ error: "Sync already running" })
expect(enqueueSyncJob).toHaveBeenCalledWith("conn-1")
expect(res.status).toBe(202)
expect(await res.json()).toEqual({ jobId: "job-1", status: "PENDING" })
})

it("returns 500 when the sync throws", async () => {
withLock(true)
syncDirectoryConnection.mockRejectedValue(new Error("boom"))

const res = await POST(req, { params })

expect(res.status).toBe(500)
expect(await res.json()).toEqual({ error: "boom" })
it("scopes the lookup to the caller's company", async () => {
await POST(req, { params })
expect(findFirst).toHaveBeenCalledWith(
expect.objectContaining({ where: { id: "conn-1", companyId: "co-1" } })
)
})
})

function NextResponseLike(status: number) {
return { status, body: "auth" } as unknown
}
36 changes: 17 additions & 19 deletions src/app/api/directory/[id]/sync/route.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NextResponse } from "next/server"
import { requireAdmin } from "@/lib/apiAuth"
import { prisma } from "@/lib/prisma"
import { syncDirectoryConnection } from "@/lib/directory/sync"
import { enqueueSyncJob, processSyncJobs } from "@/lib/directory/jobs"

export async function POST(
_req: Request,
Expand All @@ -12,23 +12,21 @@ export async function POST(

const { id } = await params

// Shared Postgres advisory lock: survives multi-replica, unlike an
// in-memory Set. xact_lock auto-releases on commit/rollback, so no stale
// lock is left behind if the process crashes during the sync.
try {
const result = await prisma.$transaction(async (tx) => {
const [{ locked }] = await tx.$queryRaw<{ locked: boolean }[]>`
SELECT pg_try_advisory_xact_lock(hashtextextended(${id}, 0)) AS locked
`
if (!locked) return { conflict: true as const }
const synced = await syncDirectoryConnection(id, session.user.companyId)
return { conflict: false as const, synced }
})
// Scope the connection to the caller's company before enqueueing.
const connection = await prisma.directoryConnection.findFirst({
where: { id, companyId: session.user.companyId },
select: { id: true },
})
if (!connection)
return NextResponse.json({ error: "Connection not found" }, { status: 404 })

if (result.conflict)
return NextResponse.json({ error: "Sync already running" }, { status: 409 })
return NextResponse.json(result.synced)
} catch (e: unknown) {
return NextResponse.json({ error: (e as Error).message }, { status: 500 })
}
const job = await enqueueSyncJob(id)

// Kick the queue without blocking the response. The sync runs outside the
// request cycle (no timeout risk on large directories); a scheduler also
// drains the queue, so a dropped fire-and-forget here is only a delay, not a
// lost job. Concurrency is handled by the job claim, not this call.
void processSyncJobs().catch(() => {})

return NextResponse.json({ jobId: job.id, status: job.status }, { status: 202 })
}
115 changes: 115 additions & 0 deletions src/lib/directory/jobs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { describe, it, expect, vi, beforeEach } from "vitest"

const syncJobFindFirst = vi.fn()
const syncJobCreate = vi.fn()
const syncJobUpdate = vi.fn()
const connFindUnique = vi.fn()
const queryRaw = vi.fn()
const syncDirectoryConnection = vi.fn()

vi.mock("@/lib/prisma", () => ({
prisma: {
syncJob: {
findFirst: (a: unknown) => syncJobFindFirst(a),
create: (a: unknown) => syncJobCreate(a),
update: (a: unknown) => syncJobUpdate(a),
},
directoryConnection: { findUnique: (a: unknown) => connFindUnique(a) },
$queryRaw: () => queryRaw(),
},
}))
vi.mock("./sync", () => ({
syncDirectoryConnection: (id: string, companyId: string) =>
syncDirectoryConnection(id, companyId),
}))

import { backoffMs, enqueueSyncJob, processSyncJobs } from "./jobs"

beforeEach(() => {
vi.clearAllMocks()
connFindUnique.mockResolvedValue({ companyId: "co-1" })
})

describe("backoffMs", () => {
it("grows exponentially from 30s", () => {
expect(backoffMs(1)).toBe(30_000)
expect(backoffMs(2)).toBe(60_000)
expect(backoffMs(3)).toBe(120_000)
})
})

describe("enqueueSyncJob", () => {
it("reuses an existing pending/running job", async () => {
syncJobFindFirst.mockResolvedValue({ id: "job-1", status: "PENDING" })
const res = await enqueueSyncJob("c1")
expect(res).toEqual({ id: "job-1", status: "PENDING" })
expect(syncJobCreate).not.toHaveBeenCalled()
})

it("creates a job when none is active", async () => {
syncJobFindFirst.mockResolvedValue(null)
syncJobCreate.mockResolvedValue({ id: "job-2", status: "PENDING" })
const res = await enqueueSyncJob("c1")
expect(res).toEqual({ id: "job-2", status: "PENDING" })
expect(syncJobCreate).toHaveBeenCalledWith(
expect.objectContaining({ data: { connectionId: "c1" } })
)
})
})

describe("processSyncJobs", () => {
it("stops when there is no due job", async () => {
queryRaw.mockResolvedValue([])
const res = await processSyncJobs()
expect(res).toEqual({ processed: 0 })
expect(syncDirectoryConnection).not.toHaveBeenCalled()
})

it("marks a job SUCCEEDED on a successful sync", async () => {
queryRaw
.mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 0, maxAttempts: 3 }])
.mockResolvedValue([])
syncDirectoryConnection.mockResolvedValue({ synced: 3 })

const res = await processSyncJobs()

expect(res.processed).toBe(1)
expect(syncDirectoryConnection).toHaveBeenCalledWith("c1", "co-1")
expect(syncJobUpdate).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: "j1" },
data: expect.objectContaining({ status: "SUCCEEDED", attempts: 1, lastError: null }),
})
)
})

it("re-queues with backoff when attempts remain", async () => {
queryRaw
.mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 0, maxAttempts: 3 }])
.mockResolvedValue([])
syncDirectoryConnection.mockRejectedValue(new Error("boom"))

await processSyncJobs()

const arg = syncJobUpdate.mock.calls[0][0]
expect(arg.data.status).toBe("PENDING")
expect(arg.data.attempts).toBe(1)
expect(arg.data.lastError).toBe("boom")
expect(arg.data.runAfter).toBeInstanceOf(Date)
})

it("marks FAILED once attempts are exhausted", async () => {
queryRaw
.mockResolvedValueOnce([{ id: "j1", connectionId: "c1", attempts: 2, maxAttempts: 3 }])
.mockResolvedValue([])
syncDirectoryConnection.mockRejectedValue(new Error("still down"))

await processSyncJobs()

expect(syncJobUpdate).toHaveBeenCalledWith(
expect.objectContaining({
data: expect.objectContaining({ status: "FAILED", attempts: 3, lastError: "still down" }),
})
)
})
})
Loading