Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions apps/nestjs-backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
"@teable/v2-contract-http-openapi": "workspace:*",
"@teable/v2-core": "workspace:*",
"@teable/v2-di": "workspace:*",
"@teable/v2-dottea": "workspace:*",
"@teable/v2-import": "workspace:*",
"@valibot/to-json-schema": "1.3.0",
"ai": "6.0.169",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ describe('envValidationSchema', () => {
expect(value.PRISMA_DATABASE_URL).toContain('/teable');
});

it('accepts split meta/data env without the legacy alias', () => {
it('accepts split meta env without the legacy alias', () => {
const { error, value } = envValidationSchema.validate(
createEnv({
PRISMA_META_DATABASE_URL:
'postgresql://teable:teable@127.0.0.1:5432/teable-meta?schema=public',
PRISMA_DATA_DATABASE_URL:
'postgresql://teable:teable@127.0.0.1:5432/teable-data?schema=public',
})
);

expect(error).toBeUndefined();
expect(value.PRISMA_META_DATABASE_URL).toContain('/teable-meta');
expect(value.PRISMA_DATA_DATABASE_URL).toContain('/teable-data');
});

it('accepts DATABASE_URL as the last-resort meta fallback', () => {
Expand Down
1 change: 0 additions & 1 deletion apps/nestjs-backend/src/configs/env.validation.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export const envValidationSchema = Joi.object({
// database_url
PRISMA_DATABASE_URL: Joi.string(),
PRISMA_META_DATABASE_URL: Joi.string(),
PRISMA_DATA_DATABASE_URL: Joi.string(),
DATABASE_URL: Joi.string(),

STORAGE_PREFIX: Joi.string().uri().optional(),
Expand Down
2 changes: 1 addition & 1 deletion apps/nestjs-backend/src/configs/threshold.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export const thresholdConfig = registerAs('threshold', () => ({
jitter: Number(process.env.BACKEND_DB_DEADLOCK_JITTER ?? 1.0),
},
baseNodeMaxFolderDepth: Number(process.env.BASE_NODE_MAX_FOLDER_DEPTH ?? 2),
maxOwnedSpaceCount: Number(process.env.MAX_SPACE_OWNER_COUNT ?? 10),
maxFreeOwnedSpaceCount: Number(process.env.MAX_FREE_SPACE_OWNER_COUNT ?? 2),
changeEmailSendCodeMailRate: Number(process.env.BACKEND_CHANGE_EMAIL_SEND_CODE_MAIL_RATE ?? 30),
resetPasswordSendMailRate: Number(process.env.BACKEND_RESET_PASSWORD_SEND_MAIL_RATE ?? 30),
signupVerificationSendCodeMailRate: Number(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ export abstract class DuplicateTableQueryAbstract {
targetTable: string,
newColumns: string[],
oldColumns: string[],
crossBaseLinkDbFieldNames: { dbFieldName: string; isMultipleCellValue: boolean }[]
crossBaseLinkDbFieldNames: { dbFieldName: string; isMultipleCellValue: boolean }[],
range?: {
minAutoNumberExclusive?: number;
maxAutoNumberInclusive?: number;
}
): Knex.QueryBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ export class DuplicateTableQueryPostgres extends DuplicateTableQueryAbstract {
targetTable: string,
newColumns: string[],
oldColumns: string[],
crossBaseLinkDbFieldNames: { dbFieldName: string; isMultipleCellValue: boolean }[]
crossBaseLinkDbFieldNames: { dbFieldName: string; isMultipleCellValue: boolean }[],
range?: {
minAutoNumberExclusive?: number;
maxAutoNumberInclusive?: number;
}
) {
const newColumnList = newColumns.map((col) => `"${col}"`).join(', ');
const oldColumnList = oldColumns
Expand All @@ -37,9 +41,20 @@ export class DuplicateTableQueryPostgres extends DuplicateTableQueryAbstract {
return `"${col}"`;
})
.join(', ');
const whereClauses: string[] = [];
const whereBindings: unknown[] = [];
if (range?.minAutoNumberExclusive != null) {
whereClauses.push('"__auto_number" > ?');
whereBindings.push(range.minAutoNumberExclusive);
}
if (range?.maxAutoNumberInclusive != null) {
whereClauses.push('"__auto_number" <= ?');
whereBindings.push(range.maxAutoNumberInclusive);
}
const whereSql = whereClauses.length ? ` WHERE ${whereClauses.join(' AND ')}` : '';
return this.knex.raw(
`INSERT INTO ?? (${newColumnList}) SELECT ${oldColumnList} FROM ?? ORDER BY __auto_number`,
[targetTable, sourceTable]
`INSERT INTO ?? (${newColumnList}) SELECT ${oldColumnList} FROM ??${whereSql} ORDER BY "__auto_number"`,
[targetTable, sourceTable, ...whereBindings]
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { DistributedLockService } from './distributed-lock.service';

/**
* Provides {@link DistributedLockService}. Import it into any feature module
* that needs to guard startup seeding or other once-per-deployment work.
*/
@Module({
providers: [DistributedLockService],
exports: [DistributedLockService],
})
export class DistributedLockModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import type { ConfigService } from '@nestjs/config';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { CacheService } from '../cache/cache.service';
import { DistributedLockService } from './distributed-lock.service';

describe('DistributedLockService', () => {
const cache = { setnx: vi.fn(), get: vi.fn(), del: vi.fn() };
const config = { get: vi.fn() };
const newService = () =>
new DistributedLockService(
cache as unknown as CacheService,
config as unknown as ConfigService
);

afterEach(() => {
vi.clearAllMocks();
});

describe('with Redis', () => {
const useRedis = () => config.get.mockReturnValue({ provider: 'redis' });

it('runs the task when the lock is acquired', async () => {
useRedis();
cache.setnx.mockResolvedValue(true);
const task = vi.fn().mockResolvedValue(undefined);

const ran = await newService().runExclusive('seed', 60, task);

expect(ran).toBe(true);
expect(cache.setnx).toHaveBeenCalledWith('lock:seed', expect.any(String), 60);
expect(task).toHaveBeenCalledOnce();
});

it('skips the task when another instance holds the lock', async () => {
useRedis();
cache.setnx.mockResolvedValue(false);
const task = vi.fn();

const ran = await newService().runExclusive('seed', 60, task);

expect(ran).toBe(false);
expect(task).not.toHaveBeenCalled();
});

it('releases the lock it owns after the task', async () => {
useRedis();
cache.setnx.mockResolvedValue(true);
// Mirror Redis: `get` returns the value `setnx` stored.
cache.get.mockImplementation(async () => cache.setnx.mock.calls[0]?.[1]);

await newService().runExclusive('seed', 60, vi.fn().mockResolvedValue(undefined));

expect(cache.del).toHaveBeenCalledWith('lock:seed');
});

it('releases the lock even when the task throws', async () => {
useRedis();
cache.setnx.mockResolvedValue(true);
cache.get.mockImplementation(async () => cache.setnx.mock.calls[0]?.[1]);
const task = vi.fn().mockRejectedValue(new Error('boom'));

await expect(newService().runExclusive('seed', 60, task)).rejects.toThrow('boom');
expect(cache.del).toHaveBeenCalledWith('lock:seed');
});

it('does not release a lock owned by another instance', async () => {
useRedis();
cache.setnx.mockResolvedValue(true);
cache.get.mockResolvedValue('another-instance');

await newService().runExclusive('seed', 60, vi.fn().mockResolvedValue(undefined));

expect(cache.del).not.toHaveBeenCalled();
});

it('runs the task anyway when acquiring the lock errors', async () => {
useRedis();
cache.setnx.mockRejectedValue(new Error('redis down'));
const task = vi.fn().mockResolvedValue(undefined);

const ran = await newService().runExclusive('seed', 60, task);

expect(ran).toBe(true);
expect(task).toHaveBeenCalledOnce();
});
});

describe('without Redis', () => {
it('runs the task without acquiring a lock', async () => {
config.get.mockReturnValue({ provider: 'memory' });
const task = vi.fn().mockResolvedValue(undefined);

const ran = await newService().runExclusive('seed', 60, task);

expect(ran).toBe(true);
expect(cache.setnx).not.toHaveBeenCalled();
expect(task).toHaveBeenCalledOnce();
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { CacheService } from '../cache/cache.service';
import type { ICacheConfig } from '../configs/cache.config';

/**
* Best-effort distributed lock backed by Redis (`SET NX`).
*
* Lets a caller run a critical section on exactly one instance across a
* multi-pod deployment. Without Redis there is no shared store, so the lock
* degrades to a no-op and every instance proceeds — callers must therefore
* keep the guarded work idempotent.
*/
@Injectable()
export class DistributedLockService {
private readonly logger = new Logger(DistributedLockService.name);

/** Unique per process — identifies the locks this instance owns. */
private readonly owner = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2)}`;

constructor(
private readonly cacheService: CacheService,
private readonly configService: ConfigService
) {}

/**
* Run `task` while holding the lock named `name`, so only one instance runs
* it at a time. If another instance holds the lock, `task` is skipped. The
* lock is released afterwards and also auto-expires after `ttlSeconds`.
*
* @returns `true` if `task` ran, `false` if it was skipped.
*/
async runExclusive(
name: string,
ttlSeconds: number,
task: () => Promise<void>
): Promise<boolean> {
const key = `lock:${name}` as const;

if (!(await this.acquire(key, ttlSeconds))) {
this.logger.debug(`Lock "${name}" held by another instance, skipping`);
return false;
}

try {
await task();
} finally {
await this.release(key);
}
return true;
}

private get usesRedis(): boolean {
return this.configService.get<ICacheConfig>('cache')?.provider === 'redis';
}

private async acquire(key: `lock:${string}`, ttlSeconds: number): Promise<boolean> {
// No Redis — no shared store to lock against; let the caller proceed.
if (!this.usesRedis) {
return true;
}
try {
return await this.cacheService.setnx(key, this.owner, ttlSeconds);
} catch (error) {
this.logger.warn(`Failed to acquire lock "${key}", proceeding anyway`, error);
return true;
}
}

private async release(key: `lock:${string}`): Promise<void> {
if (!this.usesRedis) {
return;
}
try {
// Only release a lock this instance still owns.
if ((await this.cacheService.get(key)) === this.owner) {
await this.cacheService.del(key);
Comment on lines +76 to +77
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Release Redis lock atomically with owner check

The get + del sequence is not atomic, so a lock can be deleted after ownership changes: if this process reads its own owner value, then the key expires and another instance acquires it before del runs, the subsequent del removes the new owner’s lock. That breaks mutual exclusion and can allow overlapping critical sections when tasks run near TTL boundaries. Use an atomic compare-and-delete (e.g., a Lua script) instead of separate calls.

Useful? React with 👍 / 👎.

}
} catch (error) {
this.logger.warn(`Failed to release lock "${key}"`, error);
}
}
}
2 changes: 2 additions & 0 deletions apps/nestjs-backend/src/distributed-lock/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { DistributedLockModule } from './distributed-lock.module';
export { DistributedLockService } from './distributed-lock.service';
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import type { ISelectFieldOptions } from '@teable/core';
import { FieldType, generateRecordHistoryId } from '@teable/core';
import { DataPrismaService } from '@teable/db-data-prisma';
import type { Field } from '@teable/db-main-prisma';
import { Knex } from 'knex';
import { isEqual, isObject, isString } from 'lodash';
import { InjectModel } from 'nest-knexjs';
import { BaseConfig, IBaseConfig } from '../../configs/base.config';
import { DataLoaderService } from '../../features/data-loader/data-loader.service';
import { rawField2FieldObj } from '../../features/field/model/factory';
import { DATA_KNEX } from '../../global/knex/knex.module';
import { DatabaseRouter } from '../../global/database-router.service';
import { EventEmitterService } from '../event-emitter.service';
import { Events, RecordUpdateEvent } from '../events';

Expand All @@ -21,10 +18,9 @@ const SELECT_FIELD_TYPE_SET = new Set([FieldType.SingleSelect, FieldType.Multipl
@Injectable()
export class RecordHistoryListener {
constructor(
private readonly dataPrismaService: DataPrismaService,
private readonly databaseRouter: DatabaseRouter,
private readonly eventEmitterService: EventEmitterService,
@BaseConfig() private readonly baseConfig: IBaseConfig,
@InjectModel(DATA_KNEX) private readonly knex: Knex,
private readonly dataLoaderService: DataLoaderService
) {}

Expand Down Expand Up @@ -129,9 +125,16 @@ export class RecordHistoryListener {
});

if (recordHistoryList.length) {
const query = this.knex.insert(recordHistoryList).into('record_history').toQuery();

await this.dataPrismaService.txClient().$executeRawUnsafe(query);
const dataKnex = await this.databaseRouter.dataKnexForTable(tableId);
const dataDbUrl = await this.databaseRouter.getDataDatabaseUrlForTable(tableId);
const dataDbInternalSchema = new URL(dataDbUrl).searchParams.get('schema') || 'public';
const query = dataKnex
.withSchema(dataDbInternalSchema)
.insert(recordHistoryList)
.into('record_history')
.toQuery();

await this.databaseRouter.executeDataPrismaForTable(tableId, query);
}
}

Expand Down
Loading
Loading