Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { SearchModule } from './search/search.module';
import { AnalyticsModule } from './analytics/analytics.module';

import { MessagingModule } from './messaging/messaging.module';

Check warning on line 11 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / Quality Gates (lint)

'MessagingModule' is defined but never used. Allowed unused vars must match /^_/u

Check warning on line 11 in src/app.module.ts

View workflow job for this annotation

GitHub Actions / ESLint

'MessagingModule' is defined but never used. Allowed unused vars must match /^_/u
import { IndexOptimizationModule } from './database/index-optimization/index-optimization.module';
import { RateLimitingModule } from './rate-limiting/rate-limiting.module';
import { QuotaGuard } from './rate-limiting/guards/quota.guard';
Expand All @@ -21,6 +21,8 @@
import { IncidentManagementModule } from './incident-management/incident-management.module';
import { MonitoringModule } from './monitoring/monitoring.module';
import { RequestTimeoutInterceptor } from './common/interceptors/request-timeout.interceptor';
import { IdempotencyModule } from './common/modules/idempotency.module';
import { IdempotencyInterceptor } from './common/interceptors/idempotency.interceptor';
import { DeepLinkModule } from './deep-link/deep-link.module';
import { InvoicesModule } from './payments/invoices/invoices.module';
import { ReportingModule } from './payments/reporting/reporting.module';
Expand Down Expand Up @@ -48,6 +50,7 @@
CanaryModule,
IncidentManagementModule,
MonitoringModule,
IdempotencyModule,
DeepLinkModule,
InvoicesModule,
ReportingModule,
Expand All @@ -66,6 +69,7 @@
providers: [
...(featureFlags.ENABLE_RATE_LIMITING ? [{ provide: APP_GUARD, useClass: QuotaGuard }] : []),
{ provide: APP_INTERCEPTOR, useClass: RequestTimeoutInterceptor },
{ provide: APP_INTERCEPTOR, useClass: IdempotencyInterceptor },
],
})
export class AppModule {}
export class AppModule {}
9 changes: 9 additions & 0 deletions src/common/constants/idempotency.constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export const IDEMPOTENCY_REDIS_CLIENT = 'IDEMPOTENCY_REDIS_CLIENT';

export const IDEMPOTENCY_METADATA_KEY = 'idempotency:options';
export const IDEMPOTENCY_DEFAULT_HEADER_NAME = 'Idempotency-Key';
export const IDEMPOTENCY_DEFAULT_TTL_SECONDS = 86400;
export const IDEMPOTENCY_DEFAULT_LOCK_TTL_MS = 5000;
export const IDEMPOTENCY_DEFAULT_POLL_INTERVAL_MS = 50;
export const IDEMPOTENCY_DEFAULT_WAIT_TIMEOUT_MS = 5000;

15 changes: 11 additions & 4 deletions src/common/decorators/idempotency.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import { SetMetadata } from '@nestjs/common';

export const IDEMPOTENCY_KEY_METADATA = 'idempotency:ttl';
import { IDEMPOTENCY_METADATA_KEY, IDEMPOTENCY_DEFAULT_HEADER_NAME, IDEMPOTENCY_DEFAULT_TTL_SECONDS } from '../constants/idempotency.constants';

export interface IdempotencyOptions {
ttl?: number; // Time-to-live in seconds
headerName?: string; // Custom header name for idempotency key
lockTtlMs?: number;
pollIntervalMs?: number;
waitTimeoutMs?: number;
}

export const Idempotent = (options: IdempotencyOptions = {}) => {
const ttl = options.ttl || 86400; // Default 24 hours
return SetMetadata(IDEMPOTENCY_KEY_METADATA, ttl);
return SetMetadata(IDEMPOTENCY_METADATA_KEY, {
ttl: options.ttl ?? IDEMPOTENCY_DEFAULT_TTL_SECONDS,
headerName: options.headerName ?? IDEMPOTENCY_DEFAULT_HEADER_NAME,
lockTtlMs: options.lockTtlMs,
pollIntervalMs: options.pollIntervalMs,
waitTimeoutMs: options.waitTimeoutMs,
});
};
162 changes: 126 additions & 36 deletions src/common/interceptors/idempotency.interceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@ import {
NestInterceptor,
ExecutionContext,
CallHandler,
HttpException,
HttpStatus,
ConflictException,
BadRequestException,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { Observable, of, throwError } from 'rxjs';
import { catchError, tap } from 'rxjs/operators';
import { Observable, of, from } from 'rxjs';
import { finalize, map, mergeMap } from 'rxjs/operators';
import { Request, Response } from 'express';
import { IdempotencyService } from '../services/idempotency.service';
import { IDEMPOTENCY_KEY_METADATA } from '../decorators/idempotency.decorator';
import {
IdempotencyRecord,
IdempotencyService,
} from '../services/idempotency.service';
import {
IDEMPOTENCY_DEFAULT_HEADER_NAME,
IDEMPOTENCY_DEFAULT_LOCK_TTL_MS,
IDEMPOTENCY_DEFAULT_POLL_INTERVAL_MS,
IDEMPOTENCY_DEFAULT_WAIT_TIMEOUT_MS,
IDEMPOTENCY_METADATA_KEY,
} from '../constants/idempotency.constants';
import { IdempotencyOptions } from '../decorators/idempotency.decorator';

@Injectable()
export class IdempotencyInterceptor implements NestInterceptor {
Expand All @@ -30,55 +41,134 @@ export class IdempotencyInterceptor implements NestInterceptor {
}

// Check if endpoint is marked as idempotent
const ttl = this.reflector.get<number>(IDEMPOTENCY_KEY_METADATA, context.getHandler());
if (!ttl) {
const options = this.reflector.getAllAndOverride<IdempotencyOptions>(IDEMPOTENCY_METADATA_KEY, [
context.getHandler(),
context.getClass(),
]);
if (!options) {
return next.handle();
}

// Get idempotency key from header
const idempotencyKey = request.headers['x-idempotency-key'] as string;
const headerName = (options.headerName ?? IDEMPOTENCY_DEFAULT_HEADER_NAME).toLowerCase();
const idempotencyKey = this.getHeaderValue(request, headerName);
if (!idempotencyKey) {
throw new HttpException(
'X-Idempotency-Key header is required for this operation',
HttpStatus.BAD_REQUEST,
throw new BadRequestException(
`${options.headerName ?? IDEMPOTENCY_DEFAULT_HEADER_NAME} header is required for this operation`,
);
}

const routePath = this.getRoutePath(request);
const scopeKey = this.idempotencyService.buildScopeKey({
method: request.method,
routePath,
idempotencyKey,
});
const fingerprint = this.idempotencyService.buildFingerprint({
method: request.method,
routePath,
body: request.body,
query: request.query,
params: request.params,
});

// Check if request already processed
const existingRecord = await this.idempotencyService.getRecord(idempotencyKey);
const existingRecord = await this.idempotencyService.getRecord(scopeKey);
if (existingRecord) {
response.status(existingRecord.statusCode);
this.assertFingerprintMatch(existingRecord, fingerprint);
this.applyCachedResponse(response, existingRecord);
return of(existingRecord.response);
}

// Try to acquire lock
const lockAcquired = await this.idempotencyService.acquireLock(idempotencyKey);
const lockAcquired = await this.idempotencyService.acquireLock(
scopeKey,
fingerprint,
options.lockTtlMs ?? IDEMPOTENCY_DEFAULT_LOCK_TTL_MS,
);
if (!lockAcquired) {
throw new HttpException('Request is being processed, please wait', HttpStatus.CONFLICT);
const lockRecord = await this.idempotencyService.getLockRecord(scopeKey);

if (lockRecord && lockRecord.fingerprint !== fingerprint) {
throw new ConflictException('Idempotency key already used for a different payload');
}

const cachedRecord = await this.idempotencyService.waitForRecord(
scopeKey,
options.waitTimeoutMs ?? IDEMPOTENCY_DEFAULT_WAIT_TIMEOUT_MS,
options.pollIntervalMs ?? IDEMPOTENCY_DEFAULT_POLL_INTERVAL_MS,
);

if (cachedRecord) {
this.assertFingerprintMatch(cachedRecord, fingerprint);
this.applyCachedResponse(response, cachedRecord);
return of(cachedRecord.response);
}

throw new ConflictException('Request is being processed, please retry');
}

try {
// Process the request
return next.handle().pipe(
tap(async (data) => {
// Save successful response
await this.idempotencyService.saveRecord(idempotencyKey, {
idempotencyKey,
return next.handle().pipe(
mergeMap((data) =>
from(
this.idempotencyService.saveRecord(scopeKey, {
idempotencyKey: scopeKey,
fingerprint,
statusCode: response.statusCode || HttpStatus.OK,
response: data,
timestamp: Date.now(),
ttl,
});
}),
catchError(async (error) => {
// Release lock on error
await this.idempotencyService.releaseLock(idempotencyKey);
return throwError(() => error);
}),
);
} catch (error) {
await this.idempotencyService.releaseLock(idempotencyKey);
throw error;
cachedAt: Date.now(),
ttlSeconds: options.ttl,
} as IdempotencyRecord),
).pipe(map(() => data)),
),
finalize(() => {
void this.idempotencyService.releaseLock(scopeKey);
}),
);
}

private applyCachedResponse(response: Response, record: IdempotencyRecord): void {
response.status(record.statusCode);
response.setHeader('X-Idempotent-Replayed', 'true');

if (record.responseHeaders) {
for (const [headerName, headerValue] of Object.entries(record.responseHeaders)) {
response.setHeader(headerName, headerValue);
}
}
}

private assertFingerprintMatch(record: IdempotencyRecord, fingerprint: string): void {
if (record.fingerprint !== fingerprint) {
throw new ConflictException('Idempotency key already used for a different payload');
}
}

private getRoutePath(request: Request): string {
return `${request.baseUrl || ''}${request.route?.path || request.path || ''}`;
}

private getHeaderValue(request: Request, headerName: string): string | undefined {
const normalizedHeaderName = headerName.toLowerCase();
const candidates = new Set([
normalizedHeaderName,
headerName,
normalizedHeaderName.startsWith('x-')
? normalizedHeaderName.replace(/^x-/, '')
: `x-${normalizedHeaderName}`,
]);

const headerValue = [...candidates].reduce<string | string[] | undefined>((value, candidate) => {
if (value !== undefined) {
return value;
}

return request.headers[candidate];
}, undefined);

if (Array.isArray(headerValue)) {
return headerValue[0];
}

return typeof headerValue === 'string' ? headerValue : undefined;
}
}
15 changes: 14 additions & 1 deletion src/common/modules/idempotency.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { getSharedRedisClient } from '../../config/cache.config';
import { IDEMPOTENCY_REDIS_CLIENT } from '../constants/idempotency.constants';
import { IdempotencyService } from '../services/idempotency.service';
import { IdempotencyInterceptor } from '../interceptors/idempotency.interceptor';

@Module({
providers: [IdempotencyService, IdempotencyInterceptor],
imports: [ConfigModule],
providers: [
{
provide: IDEMPOTENCY_REDIS_CLIENT,
inject: [ConfigService],
useFactory: (configService: ConfigService): ReturnType<typeof getSharedRedisClient> =>
getSharedRedisClient(configService),
},
IdempotencyService,
IdempotencyInterceptor,
],
exports: [IdempotencyService, IdempotencyInterceptor],
})
export class IdempotencyModule {}
Loading
Loading