Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions lib/core.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type {Knex} from 'knex';
import _ from 'lodash';
import {Model} from 'objection';
import {type Constructor, Model} from 'objection';

import {defaultDispatcherOptions, defaultExLogger, defaultKnexOptions} from './constants';
import {PGDispatcher} from './dispatcher';
Expand All @@ -13,16 +13,22 @@ export interface CoreDBDispatcherOptions {
beforeTerminate?: () => Promise<void>;
}

export type GetModelParams = {cancelOnTimeout?: boolean; useLimitInFirst?: boolean};

export interface CoreDBConstructorArgs {
connectionString: string;
dispatcherOptions?: CoreDBDispatcherOptions;
knexOptions?: Knex.Config;
logger?: ExLogger;
modelParams?: GetModelParams;
}

export function getModel(): typeof BaseModel {
export function getModel(params: GetModelParams = {}): typeof BaseModel {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this params to initDB also because we init model there const CoreBaseModel = getModel();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

let _db: PGDispatcher;

const cancelOnTimeout = Boolean(params.cancelOnTimeout);
const useLimitInFirst = Boolean(params.useLimitInFirst);

class CoreBaseModel extends Model {
static set db(value: PGDispatcher) {
if (!_db) {
Expand All @@ -45,6 +51,28 @@ export function getModel(): typeof BaseModel {
get replica() {
return _db.replica;
}

static query<M extends Model>(
this: Constructor<M>,
...args: Parameters<typeof Model.query<M>>
): ReturnType<typeof Model.query<M>> {
const query = super.query<M>(...args);

if (cancelOnTimeout) {
const originalTimeout = query.timeout;

query.timeout = (ms, options) => {
const optionsWithCancel = {cancel: true, ...(options ?? {})};
return originalTimeout.apply(query, [ms, optionsWithCancel]);
};
}

return query;
}

static get useLimitInFirst() {
return useLimitInFirst;
}
}

return CoreBaseModel;
Expand All @@ -55,6 +83,7 @@ export function initDB({
dispatcherOptions,
knexOptions = {},
logger = defaultExLogger,
modelParams,
}: CoreDBConstructorArgs) {
if (!connectionString) {
throw new Error('Empty connection string');
Expand Down Expand Up @@ -83,7 +112,7 @@ export function initDB({

process.on('SIGINT', terminate);

const CoreBaseModel = getModel();
const CoreBaseModel = getModel(modelParams);
CoreBaseModel.db = db;

const helpers = {
Expand Down