diff --git a/apps/backend/drizzle/0007_device_revocation.sql b/apps/backend/drizzle/0007_device_revocation.sql new file mode 100644 index 0000000..36bb90e --- /dev/null +++ b/apps/backend/drizzle/0007_device_revocation.sql @@ -0,0 +1,25 @@ +CREATE TABLE "devices" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "user_id" uuid NOT NULL, + "name" text, + "public_key" text NOT NULL, + "revoked_at" timestamp, + "last_seen_at" timestamp, + "created_at" timestamp DEFAULT now() NOT NULL +); +--> statement-breakpoint +CREATE TABLE "device_prekeys" ( + "id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL, + "device_id" uuid NOT NULL, + "key_id" integer NOT NULL, + "public_key" text NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL +); +--> statement-breakpoint +ALTER TABLE "devices" ADD CONSTRAINT "devices_user_id_users_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."users"("id") ON DELETE cascade ON UPDATE no action; +--> statement-breakpoint +ALTER TABLE "device_prekeys" ADD CONSTRAINT "device_prekeys_device_id_devices_id_fk" FOREIGN KEY ("device_id") REFERENCES "public"."devices"("id") ON DELETE cascade ON UPDATE no action; +--> statement-breakpoint +CREATE INDEX "devices_user_id_idx" ON "devices" USING btree ("user_id"); +--> statement-breakpoint +CREATE INDEX "device_prekeys_device_id_idx" ON "device_prekeys" USING btree ("device_id"); diff --git a/apps/backend/drizzle/meta/_journal.json b/apps/backend/drizzle/meta/_journal.json index 8fea85e..125a063 100644 --- a/apps/backend/drizzle/meta/_journal.json +++ b/apps/backend/drizzle/meta/_journal.json @@ -50,6 +50,13 @@ "when": 1780560000000, "tag": "0006_add_conversation_avatar_url", "breakpoints": true + }, + { + "idx": 7, + "version": "7", + "when": 1780646400000, + "tag": "0007_device_revocation", + "breakpoints": true } ] } \ No newline at end of file diff --git a/apps/backend/src/__tests__/deviceRevocation.test.ts b/apps/backend/src/__tests__/deviceRevocation.test.ts new file mode 100644 index 0000000..8570c73 --- /dev/null +++ b/apps/backend/src/__tests__/deviceRevocation.test.ts @@ -0,0 +1,122 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +const mockFindDevice = vi.fn(); +const mockFindMembers = vi.fn(); +const mockCount = vi.fn(); +const mockUpdateReturning = vi.fn(); +const mockDeleteWhere = vi.fn(); + +const tx = { + update: vi.fn(() => ({ + set: vi.fn(() => ({ where: vi.fn(() => ({ returning: mockUpdateReturning })) })), + })), + delete: vi.fn(() => ({ where: mockDeleteWhere })), +}; + +const mockTransaction = vi.fn( + (cb: (t: typeof tx) => Promise) => cb(tx) as Promise, +); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + devices: { findFirst: mockFindDevice }, + conversationMembers: { findMany: mockFindMembers }, + }, + $count: mockCount, + transaction: mockTransaction, + }, +})); + +vi.mock('../db/schema.js', () => ({ + devices: { id: 'id', userId: 'userId', revokedAt: 'revokedAt' }, + devicePrekeys: { deviceId: 'deviceId' }, + conversationMembers: { userId: 'userId', conversationId: 'conversationId' }, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args.filter(Boolean)), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + isNull: vi.fn((col: unknown) => ({ col, op: 'isNull' })), + sql: vi.fn(), +})); + +const { revokeDevice } = await import('../services/deviceRevocation.js'); + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe('revokeDevice', () => { + it('returns 404 when the device is missing', async () => { + mockFindDevice.mockResolvedValue(undefined); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toEqual({ ok: false, status: 404, error: 'Device not found' }); + expect(mockTransaction).not.toHaveBeenCalled(); + }); + + it('returns 403 when the device belongs to someone else', async () => { + mockFindDevice.mockResolvedValue({ id: 'dev-1', userId: 'other', revokedAt: null }); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toMatchObject({ ok: false, status: 403 }); + }); + + it('returns 409 when the device is already revoked', async () => { + mockFindDevice.mockResolvedValue({ + id: 'dev-1', + userId: 'user-1', + revokedAt: new Date(), + }); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toMatchObject({ ok: false, status: 409, error: 'Device is already revoked' }); + }); + + it('returns 409 when it is the last active device', async () => { + mockFindDevice.mockResolvedValue({ id: 'dev-1', userId: 'user-1', revokedAt: null }); + mockCount.mockResolvedValue(1); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toMatchObject({ + ok: false, + status: 409, + error: 'Cannot revoke the last active device', + }); + expect(mockTransaction).not.toHaveBeenCalled(); + }); + + it('revokes, deletes prekeys, and returns shared conversations', async () => { + const revoked = { id: 'dev-1', userId: 'user-1', revokedAt: new Date() }; + mockFindDevice.mockResolvedValue({ id: 'dev-1', userId: 'user-1', revokedAt: null }); + mockCount.mockResolvedValue(2); + mockUpdateReturning.mockResolvedValue([revoked]); + mockFindMembers.mockResolvedValue([{ conversationId: 'conv-1' }, { conversationId: 'conv-2' }]); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toEqual({ + ok: true, + device: revoked, + conversationIds: ['conv-1', 'conv-2'], + }); + expect(tx.delete).toHaveBeenCalled(); + expect(mockDeleteWhere).toHaveBeenCalled(); + }); + + it('returns 409 when the atomic revoke loses a race', async () => { + mockFindDevice.mockResolvedValue({ id: 'dev-1', userId: 'user-1', revokedAt: null }); + mockCount.mockResolvedValue(2); + mockUpdateReturning.mockResolvedValue([]); + + const result = await revokeDevice('user-1', 'dev-1'); + + expect(result).toMatchObject({ ok: false, status: 409 }); + expect(mockFindMembers).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/backend/src/__tests__/devices.routes.test.ts b/apps/backend/src/__tests__/devices.routes.test.ts new file mode 100644 index 0000000..823c579 --- /dev/null +++ b/apps/backend/src/__tests__/devices.routes.test.ts @@ -0,0 +1,169 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import request from 'supertest'; +import express from 'express'; + +const mockRevokeDevice = vi.fn(); +const mockFindManyDevices = vi.fn(); + +const mockEmit = vi.fn(); +const mockTo = vi.fn(() => ({ emit: mockEmit })); +const mockDisconnectSockets = vi.fn(); +const mockIn = vi.fn(() => ({ disconnectSockets: mockDisconnectSockets })); +const mockPublish = vi.fn(); + +vi.mock('../services/deviceRevocation.js', () => ({ + revokeDevice: mockRevokeDevice, +})); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + devices: { findMany: mockFindManyDevices }, + }, + }, +})); + +vi.mock('../db/schema.js', () => ({ + devices: { id: 'id', userId: 'userId', createdAt: 'createdAt' }, +})); + +vi.mock('drizzle-orm', () => ({ + asc: vi.fn(), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), +})); + +vi.mock('../lib/socket.js', () => ({ + getSocketServer: () => ({ to: mockTo, in: mockIn }), +})); + +vi.mock('../lib/redis.js', () => ({ + get redis() { + return { publish: mockPublish }; + }, +})); + +vi.mock('../lib/deviceBus.js', () => ({ + deviceRoom: (id: string) => `device:${id}`, + publishDeviceRevoked: (_redis: unknown, event: unknown) => mockPublish(event), +})); + +vi.mock('../middleware/auth.js', () => ({ + requireAuth: (req: express.Request, _res: express.Response, next: express.NextFunction) => { + (req as express.Request & { auth: { userId: string } }).auth = { userId: 'user-1' }; + next(); + }, +})); + +const { devicesRouter } = await import('../routes/devices.js'); + +function makeApp() { + const app = express(); + app.use(express.json()); + app.use('/devices', devicesRouter); + return app; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe('GET /devices', () => { + it("lists the caller's devices", async () => { + mockFindManyDevices.mockResolvedValue([ + { + id: 'dev-1', + name: 'Laptop', + publicKey: 'pk', + revokedAt: null, + lastSeenAt: null, + createdAt: new Date('2026-01-01'), + }, + ]); + + const res = await request(makeApp()).get('/devices'); + + expect(res.status).toBe(200); + expect(res.body.devices).toHaveLength(1); + expect(res.body.devices[0]).toMatchObject({ id: 'dev-1', name: 'Laptop' }); + }); +}); + +describe('DELETE /devices/:id', () => { + it('returns 404 when the device does not exist', async () => { + mockRevokeDevice.mockResolvedValue({ ok: false, status: 404, error: 'Device not found' }); + + const res = await request(makeApp()).delete('/devices/missing'); + + expect(res.status).toBe(404); + expect(mockDisconnectSockets).not.toHaveBeenCalled(); + expect(mockPublish).not.toHaveBeenCalled(); + }); + + it('returns 403 when the device belongs to another user', async () => { + mockRevokeDevice.mockResolvedValue({ + ok: false, + status: 403, + error: 'You do not own this device', + }); + + const res = await request(makeApp()).delete('/devices/dev-9'); + + expect(res.status).toBe(403); + }); + + it('returns 409 when revoking the last active device', async () => { + mockRevokeDevice.mockResolvedValue({ + ok: false, + status: 409, + error: 'Cannot revoke the last active device', + }); + + const res = await request(makeApp()).delete('/devices/dev-1'); + + expect(res.status).toBe(409); + expect(res.body.error).toBe('Cannot revoke the last active device'); + expect(mockDisconnectSockets).not.toHaveBeenCalled(); + }); + + it('revokes the device, disconnects sockets, notifies peers, and publishes on the bus', async () => { + const revokedAt = new Date('2026-06-25T00:00:00.000Z'); + mockRevokeDevice.mockResolvedValue({ + ok: true, + device: { + id: 'dev-1', + name: 'Laptop', + publicKey: 'pk', + revokedAt, + lastSeenAt: null, + createdAt: new Date('2026-01-01'), + }, + conversationIds: ['conv-1', 'conv-2'], + }); + + const res = await request(makeApp()).delete('/devices/dev-1'); + + expect(res.status).toBe(200); + expect(res.body).toMatchObject({ id: 'dev-1' }); + + // Live sockets bound to the device are disconnected. + expect(mockIn).toHaveBeenCalledWith('device:dev-1'); + expect(mockDisconnectSockets).toHaveBeenCalledWith(true); + + // Peers in each shared conversation receive a key-change notice. + expect(mockTo).toHaveBeenCalledWith('conv-1'); + expect(mockTo).toHaveBeenCalledWith('conv-2'); + expect(mockEmit).toHaveBeenCalledWith( + 'key_change', + expect.objectContaining({ userId: 'user-1', deviceId: 'dev-1' }), + ); + + // Revocation is published on the Redis bus for cross-instance fan-out. + expect(mockPublish).toHaveBeenCalledWith( + expect.objectContaining({ + deviceId: 'dev-1', + userId: 'user-1', + conversationIds: ['conv-1', 'conv-2'], + }), + ); + }); +}); diff --git a/apps/backend/src/app.ts b/apps/backend/src/app.ts index ede35cf..70bb0f5 100644 --- a/apps/backend/src/app.ts +++ b/apps/backend/src/app.ts @@ -9,6 +9,7 @@ import { authRouter } from './routes/auth.js'; import { conversationsRouter } from './routes/conversations.js'; import { messagesRouter } from './routes/messages.js'; import { usersRouter } from './routes/users.js'; +import { devicesRouter } from './routes/devices.js'; import { requireAuth, type AuthRequest } from './middleware/auth.js'; const packageJson = JSON.parse( @@ -47,6 +48,7 @@ app.use('/auth', authRouter); app.use('/conversations', conversationsRouter); app.use('/messages', messagesRouter); app.use('/users', usersRouter); +app.use('/devices', devicesRouter); app.get('/me', requireAuth, (req, res) => { res.json({ user: (req as AuthRequest).auth }); diff --git a/apps/backend/src/db/schema.ts b/apps/backend/src/db/schema.ts index 419c59f..fe4105b 100644 --- a/apps/backend/src/db/schema.ts +++ b/apps/backend/src/db/schema.ts @@ -1,4 +1,13 @@ -import { pgTable, text, timestamp, uuid, boolean, pgEnum, index } from 'drizzle-orm/pg-core'; +import { + pgTable, + text, + timestamp, + uuid, + boolean, + integer, + pgEnum, + index, +} from 'drizzle-orm/pg-core'; import { relations, sql } from 'drizzle-orm'; export const users = pgTable('users', { @@ -19,6 +28,45 @@ export const wallets = pgTable('wallets', { createdAt: timestamp('created_at').notNull().defaultNow(), }); +// ─── Devices & E2E key material (#157) ───────────────────────────────────────── +// +// Each user authenticates one or more devices, every device advertising a +// long-term identity `publicKey` plus a pool of one-time `device_prekeys` peers +// consume when starting an encrypted session. Revoking a device is a *soft* +// delete: `revokedAt` is stamped (the row is kept for audit/history), its +// prekeys are dropped, live sockets are disconnected, and the device is excluded +// from future fan-out. Active devices are those with `revoked_at IS NULL`. + +export const devices = pgTable( + 'devices', + { + id: uuid('id').primaryKey().defaultRandom(), + userId: uuid('user_id') + .notNull() + .references(() => users.id, { onDelete: 'cascade' }), + name: text('name'), + publicKey: text('public_key').notNull(), + revokedAt: timestamp('revoked_at'), + lastSeenAt: timestamp('last_seen_at'), + createdAt: timestamp('created_at').notNull().defaultNow(), + }, + (table) => [index('devices_user_id_idx').on(table.userId)], +); + +export const devicePrekeys = pgTable( + 'device_prekeys', + { + id: uuid('id').primaryKey().defaultRandom(), + deviceId: uuid('device_id') + .notNull() + .references(() => devices.id, { onDelete: 'cascade' }), + keyId: integer('key_id').notNull(), + publicKey: text('public_key').notNull(), + createdAt: timestamp('created_at').notNull().defaultNow(), + }, + (table) => [index('device_prekeys_device_id_idx').on(table.deviceId)], +); + // ─── Conversations ──────────────────────────────────────────────────────────── export const conversationTypeEnum = pgEnum('conversation_type', ['dm', 'group']); @@ -98,12 +146,22 @@ export const usersRelations = relations(users, ({ many }) => ({ memberships: many(conversationMembers), messages: many(messages), transfers: many(tokenTransfers), + devices: many(devices), })); export const walletsRelations = relations(wallets, ({ one }) => ({ user: one(users, { fields: [wallets.userId], references: [users.id] }), })); +export const devicesRelations = relations(devices, ({ one, many }) => ({ + user: one(users, { fields: [devices.userId], references: [users.id] }), + prekeys: many(devicePrekeys), +})); + +export const devicePrekeysRelations = relations(devicePrekeys, ({ one }) => ({ + device: one(devices, { fields: [devicePrekeys.deviceId], references: [devices.id] }), +})); + export const conversationsRelations = relations(conversations, ({ many }) => ({ members: many(conversationMembers), messages: many(messages), @@ -150,3 +208,7 @@ export type Message = typeof messages.$inferSelect; export type NewMessage = typeof messages.$inferInsert; export type TokenTransfer = typeof tokenTransfers.$inferSelect; export type NewTokenTransfer = typeof tokenTransfers.$inferInsert; +export type Device = typeof devices.$inferSelect; +export type NewDevice = typeof devices.$inferInsert; +export type DevicePrekey = typeof devicePrekeys.$inferSelect; +export type NewDevicePrekey = typeof devicePrekeys.$inferInsert; diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index 63ac88b..a4a32cd 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -5,12 +5,14 @@ import { createClient } from 'redis'; import dotenv from 'dotenv'; import { eq } from 'drizzle-orm'; import { db } from './db/index.js'; -import { conversationMembers } from './db/schema.js'; +import { conversationMembers, devices } from './db/schema.js'; import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js'; import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; +import { deviceRoom } from './lib/deviceBus.js'; +import { registerDeviceBusSubscriber } from './services/deviceBusSubscriber.js'; import { setOnline, setOffline, refreshPresence } from './services/presence.js'; import { buildRpcFetcher, runForever as runStellarListener } from './services/stellarListener.js'; import { loadEnv } from './config.js'; @@ -34,6 +36,25 @@ io.on('connection', async (socket: AuthSocket) => { const userId = socket.auth!.userId; console.log('User connected:', userId, socket.id); + // Bind the socket to its device (#157). A revoked or foreign device is + // refused so it can never rejoin rooms or receive fan-out; an active device + // joins its room so it can be torn down on revocation. + const deviceId = socket.handshake.auth['deviceId'] as string | undefined; + if (deviceId) { + const device = await db.query.devices.findFirst({ + where: eq(devices.id, deviceId), + columns: { id: true, userId: true, revokedAt: true }, + }); + + if (!device || device.userId !== userId || device.revokedAt) { + socket.emit('device_revoked', { deviceId }); + socket.disconnect(true); + return; + } + + await socket.join(deviceRoom(deviceId)); + } + // Auto-join all conversation rooms so the socket receives new_message events // for every conversation the user belongs to (needed for unread badge tracking). const memberships = await db.query.conversationMembers.findMany({ @@ -119,6 +140,12 @@ httpServer.listen(PORT, () => { // Redis is unreachable; on failure we fall back to the in-process adapter. void attachRedisAdapter(); +// #157 — react to device revocations published on the Redis bus by tearing +// down the revoked device's sockets on this instance. +if (appRedis) { + registerDeviceBusSubscriber(io, appRedis); +} + // #46 — Stellar transfer event listener. Only spin up when the contract // id is configured so local-dev and unit-test runs don't try to talk to // Soroban RPC. The listener never throws out of runForever, so a failed diff --git a/apps/backend/src/lib/deviceBus.ts b/apps/backend/src/lib/deviceBus.ts new file mode 100644 index 0000000..ffb0949 --- /dev/null +++ b/apps/backend/src/lib/deviceBus.ts @@ -0,0 +1,33 @@ +/** + * Device revocation bus (#157). + * + * When a device is revoked we publish a `device_revoked` event on a dedicated + * Redis pub/sub channel so every backend instance — and any out-of-process + * fan-out worker — can drop the device from delivery and tear down its live + * sockets. This is the cross-instance signal that complements the Socket.IO + * Redis adapter (which already mirrors `disconnectSockets` across the cluster). + */ +import type { Redis } from 'ioredis'; + +export const DEVICE_REVOKED_CHANNEL = 'bus:device_revoked'; + +export interface DeviceRevokedEvent { + deviceId: string; + userId: string; + revokedAt: string; + conversationIds: string[]; +} + +/** Socket.IO room a device's sockets join so they can be addressed as a group. */ +export function deviceRoom(deviceId: string): string { + return `device:${deviceId}`; +} + +/** Publish a revocation on the bus. Best-effort: the DB state is the source of truth. */ +export async function publishDeviceRevoked(redis: Redis, event: DeviceRevokedEvent): Promise { + try { + await redis.publish(DEVICE_REVOKED_CHANNEL, JSON.stringify(event)); + } catch { + // Bus delivery is best-effort — local disconnect + persisted revocation already applied. + } +} diff --git a/apps/backend/src/routes/devices.ts b/apps/backend/src/routes/devices.ts new file mode 100644 index 0000000..6e16ed9 --- /dev/null +++ b/apps/backend/src/routes/devices.ts @@ -0,0 +1,94 @@ +import { Router } from 'express'; +import type { IRouter } from 'express'; +import { asc, eq } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { devices } from '../db/schema.js'; +import type { Device } from '../db/schema.js'; +import { requireAuth, type AuthRequest } from '../middleware/auth.js'; +import { redis } from '../lib/redis.js'; +import { getSocketServer } from '../lib/socket.js'; +import { revokeDevice } from '../services/deviceRevocation.js'; +import { deviceRoom, publishDeviceRevoked } from '../lib/deviceBus.js'; + +export const devicesRouter: IRouter = Router(); + +devicesRouter.use(requireAuth); + +function serializeDevice(device: Device) { + return { + id: device.id, + name: device.name, + publicKey: device.publicKey, + revokedAt: device.revokedAt, + lastSeenAt: device.lastSeenAt, + createdAt: device.createdAt, + }; +} + +// GET /devices — list the caller's devices (active first, newest first). +devicesRouter.get('/', async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + + const rows = await db.query.devices.findMany({ + where: eq(devices.userId, userId), + orderBy: asc(devices.createdAt), + }); + + res.json({ devices: rows.map(serializeDevice) }); +}); + +// DELETE /devices/:id — revoke (unlink) a device. +// +// Soft-revokes the device (sets revokedAt, deletes its prekeys), disconnects its +// live sockets, publishes a `device_revoked` event on the Redis bus so the +// device is dropped from future fan-out, and emits a key-change notice to peers +// in shared conversations. Revoking the only active device is rejected with 409. +devicesRouter.delete('/:id', async (req: AuthRequest, res) => { + const userId = req.auth!.userId; + const deviceId = req.params['id'] as string | undefined; + + if (!deviceId) { + res.status(400).json({ error: 'Device id is required' }); + return; + } + + const result = await revokeDevice(userId, deviceId); + + if (!result.ok) { + res.status(result.status).json({ error: result.error }); + return; + } + + const { device, conversationIds } = result; + const revokedAt = device.revokedAt ?? new Date(); + + const io = getSocketServer(); + if (io) { + // Disconnect every socket bound to this device. With the Socket.IO Redis + // adapter attached this fans out across all instances, severing the + // revoked device's live bindings everywhere. + io.in(deviceRoom(device.id)).disconnectSockets(true); + + // Notify peers in shared conversations so clients refresh key material. + for (const conversationId of conversationIds) { + io.to(conversationId).emit('key_change', { + conversationId, + userId, + deviceId: device.id, + revokedAt, + }); + } + } + + // Cross-instance / out-of-process signal: drop the device from fan-out. + if (redis) { + await publishDeviceRevoked(redis, { + deviceId: device.id, + userId, + revokedAt: revokedAt.toISOString(), + conversationIds, + }); + } + + res.json(serializeDevice(device)); +}); diff --git a/apps/backend/src/services/deviceBusSubscriber.ts b/apps/backend/src/services/deviceBusSubscriber.ts new file mode 100644 index 0000000..d07e29b --- /dev/null +++ b/apps/backend/src/services/deviceBusSubscriber.ts @@ -0,0 +1,38 @@ +/** + * Device revocation bus subscriber (#157). + * + * Subscribes to the `device_revoked` channel and tears down any sockets bound + * to a revoked device on the local instance. The publishing instance already + * disconnects them cluster-wide via the Socket.IO Redis adapter; this consumer + * makes revocation robust even for deployments running without that adapter and + * gives other subsystems a single place to react to revocations. + */ +import type { Server } from 'socket.io'; +import type { Redis } from 'ioredis'; +import { DEVICE_REVOKED_CHANNEL, deviceRoom, type DeviceRevokedEvent } from '../lib/deviceBus.js'; + +export function registerDeviceBusSubscriber(io: Server, redis: Redis): void { + // A connection in subscriber mode cannot issue other commands, so duplicate. + const subscriber = redis.duplicate(); + + subscriber.on('error', () => { + // Degrade silently — revocation is still persisted and applied on the publisher. + }); + + subscriber.on('message', (channel, raw) => { + if (channel !== DEVICE_REVOKED_CHANNEL) return; + + let event: DeviceRevokedEvent; + try { + event = JSON.parse(raw) as DeviceRevokedEvent; + } catch { + return; + } + + io.in(deviceRoom(event.deviceId)).disconnectSockets(true); + }); + + void subscriber.subscribe(DEVICE_REVOKED_CHANNEL).catch(() => { + // Subscription failed (Redis down) — connection-level error handler logs it. + }); +} diff --git a/apps/backend/src/services/deviceRevocation.ts b/apps/backend/src/services/deviceRevocation.ts new file mode 100644 index 0000000..cb0070a --- /dev/null +++ b/apps/backend/src/services/deviceRevocation.ts @@ -0,0 +1,76 @@ +/** + * Device revocation core logic (#157). + * + * Soft-revokes a device: stamps `revokedAt`, deletes its one-time prekeys, and + * reports the conversations whose peers must be notified of the key change. All + * state changes run in a single transaction, and the "cannot revoke the last + * active device" rule is enforced atomically inside the UPDATE so two concurrent + * revokes can never strip a user of their final device. + * + * Socket teardown and bus/peer notifications are the caller's responsibility + * (see routes/devices.ts) — this module owns persistence only, which keeps it + * unit-testable without a Socket.IO server or Redis. + */ +import { and, eq, isNull, sql } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { devices, devicePrekeys, conversationMembers } from '../db/schema.js'; +import type { Device } from '../db/schema.js'; + +export type RevokeDeviceResult = + | { ok: true; device: Device; conversationIds: string[] } + | { ok: false; status: 403 | 404 | 409; error: string }; + +/** Count of a user's devices that are still active (not revoked). */ +export async function countActiveDevices(userId: string): Promise { + return db.$count(devices, and(eq(devices.userId, userId), isNull(devices.revokedAt))); +} + +export async function revokeDevice(userId: string, deviceId: string): Promise { + const device = await db.query.devices.findFirst({ where: eq(devices.id, deviceId) }); + + if (!device) { + return { ok: false, status: 404, error: 'Device not found' }; + } + if (device.userId !== userId) { + return { ok: false, status: 403, error: 'You do not own this device' }; + } + if (device.revokedAt) { + return { ok: false, status: 409, error: 'Device is already revoked' }; + } + if ((await countActiveDevices(userId)) <= 1) { + return { ok: false, status: 409, error: 'Cannot revoke the last active device' }; + } + + // Revoke + drop prekeys atomically. The correlated count guard re-checks the + // last-device rule under the row lock, closing the check-then-act race above. + const revoked = await db.transaction(async (tx) => { + const [updated] = await tx + .update(devices) + .set({ revokedAt: new Date() }) + .where( + and( + eq(devices.id, deviceId), + isNull(devices.revokedAt), + sql`(SELECT count(*) FROM ${devices} d2 WHERE d2.user_id = ${userId} AND d2.revoked_at IS NULL) > 1`, + ), + ) + .returning(); + + if (!updated) return null; + + await tx.delete(devicePrekeys).where(eq(devicePrekeys.deviceId, deviceId)); + return updated; + }); + + if (!revoked) { + // Lost a race with a concurrent revoke of this or the user's other devices. + return { ok: false, status: 409, error: 'Device is already revoked' }; + } + + const memberships = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.userId, userId), + columns: { conversationId: true }, + }); + + return { ok: true, device: revoked, conversationIds: memberships.map((m) => m.conversationId) }; +}