@@ -16,6 +16,7 @@ import {
1616 Prisma ,
1717 PrismaClient ,
1818 PrismaClientOrTransaction ,
19+ PrismaReplicaClient ,
1920 TaskRun ,
2021 TaskRunExecutionSnapshot ,
2122 Waitpoint ,
@@ -50,6 +51,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5051import { WaitpointSystem } from "./systems/waitpointSystem.js" ;
5152import { EngineWorker , HeartbeatTimeouts , RunEngineOptions , TriggerParams } from "./types.js" ;
5253import { workerCatalog } from "./workerCatalog.js" ;
54+ import { getFinalRunStatuses , isFinalRunStatus } from "./statuses.js" ;
5355
5456export class RunEngine {
5557 private runLockRedis : Redis ;
@@ -61,6 +63,7 @@ export class RunEngine {
6163 private heartbeatTimeouts : HeartbeatTimeouts ;
6264
6365 prisma : PrismaClient ;
66+ readOnlyPrisma : PrismaReplicaClient ;
6467 runQueue : RunQueue ;
6568 eventBus : EventBus = new EventEmitter < EventBusEvents > ( ) ;
6669 executionSnapshotSystem : ExecutionSnapshotSystem ;
@@ -79,6 +82,7 @@ export class RunEngine {
7982 constructor ( private readonly options : RunEngineOptions ) {
8083 this . logger = options . logger ?? new Logger ( "RunEngine" , this . options . logLevel ?? "info" ) ;
8184 this . prisma = options . prisma ;
85+ this . readOnlyPrisma = options . readOnlyPrisma ?? this . prisma ;
8286 this . runLockRedis = createRedisClient (
8387 {
8488 ...options . runLock . redis ,
@@ -123,7 +127,7 @@ export class RunEngine {
123127 defaultEnvConcurrencyLimit : options . queue ?. defaultEnvConcurrency ?? 10 ,
124128 } ) ,
125129 defaultEnvConcurrency : options . queue ?. defaultEnvConcurrency ?? 10 ,
126- logger : new Logger ( "RunQueue" , this . options . logLevel ?? "info" ) ,
130+ logger : new Logger ( "RunQueue" , options . queue ? .logLevel ?? "info" ) ,
127131 redis : { ...options . queue . redis , keyPrefix : `${ options . queue . redis . keyPrefix } runqueue:` } ,
128132 retryOptions : options . queue ?. retryOptions ,
129133 workerOptions : {
@@ -133,6 +137,13 @@ export class RunEngine {
133137 immediatePollIntervalMs : options . worker . immediatePollIntervalMs ,
134138 shutdownTimeoutMs : options . worker . shutdownTimeoutMs ,
135139 } ,
140+ concurrencySweeper : {
141+ scanSchedule : options . queue ?. concurrencySweeper ?. scanSchedule ,
142+ processMarkedSchedule : options . queue ?. concurrencySweeper ?. processMarkedSchedule ,
143+ scanJitterInMs : options . queue ?. concurrencySweeper ?. scanJitterInMs ,
144+ processMarkedJitterInMs : options . queue ?. concurrencySweeper ?. processMarkedJitterInMs ,
145+ callback : this . #concurrencySweeperCallback. bind ( this ) ,
146+ } ,
136147 shardCount : options . queue ?. shardCount ,
137148 masterQueueConsumersDisabled : options . queue ?. masterQueueConsumersDisabled ,
138149 masterQueueConsumersIntervalMs : options . queue ?. masterQueueConsumersIntervalMs ,
@@ -1329,4 +1340,44 @@ export class RunEngine {
13291340 }
13301341 } ) ;
13311342 }
1343+
1344+ async #concurrencySweeperCallback(
1345+ runIds : string [ ]
1346+ ) : Promise < Array < { id : string ; orgId : string } > > {
1347+ const runs = await this . readOnlyPrisma . taskRun . findMany ( {
1348+ where : {
1349+ id : { in : runIds } ,
1350+ completedAt : {
1351+ lte : new Date ( Date . now ( ) - 1000 * 60 * 10 ) , // This only finds runs that were completed more than 10 minutes ago
1352+ } ,
1353+ organizationId : {
1354+ not : null ,
1355+ } ,
1356+ status : {
1357+ in : getFinalRunStatuses ( ) ,
1358+ } ,
1359+ } ,
1360+ select : {
1361+ id : true ,
1362+ status : true ,
1363+ organizationId : true ,
1364+ } ,
1365+ } ) ;
1366+
1367+ // Log the finished runs
1368+ for ( const run of runs ) {
1369+ this . logger . info ( "Concurrency sweeper callback found finished run" , {
1370+ runId : run . id ,
1371+ orgId : run . organizationId ,
1372+ status : run . status ,
1373+ } ) ;
1374+ }
1375+
1376+ return runs
1377+ . filter ( ( run ) => ! ! run . organizationId )
1378+ . map ( ( run ) => ( {
1379+ id : run . id ,
1380+ orgId : run . organizationId ! ,
1381+ } ) ) ;
1382+ }
13321383}
0 commit comments