Skip to content

Commit d784ae0

Browse files
authored
refactor: remove last_invoked_at from track_worker_function to fix debounce handling (#520)
# Improve worker function tracking and lifecycle management This PR refines the worker function tracking mechanism and improves worker lifecycle management: 1. Removed `last_invoked_at` from `track_worker_function()` to separate concerns: - Worker registration is now handled by `track_worker_function()` - Debounce protection is managed by `ensure_workers()` setting `last_invoked_at` 2. Enhanced worker lifecycle management: - Workers now only start when in the "Created" state - Added `isCreated` getter to lifecycle interfaces - Added comprehensive unit tests for `Worker.startOnlyOnce()` 3. Updated migration file with clearer extension creation comments 4. Fixed tests to reflect the new behavior: - Updated debounce tests to manually set `last_invoked_at` - Adjusted worker function tracking tests These changes improve separation of concerns and make the worker startup process more robust.
1 parent 90276ce commit d784ae0

File tree

10 files changed

+175
-44
lines changed

10 files changed

+175
-44
lines changed

pkgs/core/schemas/0057_function_track_worker_function.sql

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ create or replace function pgflow.track_worker_function(
66
) returns void
77
language sql
88
as $$
9-
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
10-
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
9+
insert into pgflow.worker_functions (function_name, updated_at)
10+
values (track_worker_function.function_name, clock_timestamp())
1111
on conflict (function_name)
1212
do update set
13-
updated_at = clock_timestamp(),
14-
last_invoked_at = clock_timestamp();
13+
updated_at = clock_timestamp();
1514
$$;
1615

1716
comment on function pgflow.track_worker_function(text) is
18-
'Registers an edge function for monitoring. Called by workers on startup.
19-
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';
17+
'Registers an edge function for monitoring. Called by workers on startup.';

pkgs/core/supabase/migrations/20251207185658_pgflow_worker_management.sql renamed to pkgs/core/supabase/migrations/20251208045933_pgflow_worker_management.sql

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
-- Create extension "pg_net"
1+
-- Create extension "pg_net" (if not exists)
22
CREATE EXTENSION IF NOT EXISTS "pg_net" WITH SCHEMA "public";
3-
-- Create extension "pg_cron"
3+
-- Create extension "pg_cron" (if not exists)
44
CREATE EXTENSION IF NOT EXISTS "pg_cron";
55
-- Modify "workers" table
66
ALTER TABLE "pgflow"."workers" ADD COLUMN "stopped_at" timestamptz NULL;
@@ -245,18 +245,15 @@ Replaces existing jobs if they exist (idempotent).
245245
Returns a confirmation message with job IDs.';
246246
-- Create "track_worker_function" function
247247
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
248-
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
249-
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
248+
insert into pgflow.worker_functions (function_name, updated_at)
249+
values (track_worker_function.function_name, clock_timestamp())
250250
on conflict (function_name)
251251
do update set
252-
updated_at = clock_timestamp(),
253-
last_invoked_at = clock_timestamp();
252+
updated_at = clock_timestamp();
254253
$$;
255254
-- Set comment to function: "track_worker_function"
256-
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.
257-
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';
255+
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.';
258256
-- Drop "ensure_flow_compiled" function
259257
DROP FUNCTION "pgflow"."ensure_flow_compiled" (text, jsonb, text);
260-
261258
-- Auto-install ensure_workers cron job (1 second interval)
262259
SELECT pgflow.setup_ensure_workers_cron('1 second');

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:GOF4slSDF67Y0jRat+hxKnNE1MIRJy/Z8pJog3iEuVk=
1+
h1:VgOpcmBfLwSXeC+q/2hT/Quc8P7YRzKa3x4cZ1z+ZRM=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -12,4 +12,4 @@ h1:GOF4slSDF67Y0jRat+hxKnNE1MIRJy/Z8pJog3iEuVk=
1212
20251103222045_pgflow_fix_broadcast_order_and_timestamp_handling.sql h1:K/XnZpOmxfelsaNoJbR5HxhBrs/oW4aYja222h5cps4=
1313
20251104080523_pgflow_upgrade_pgmq_1_5_1.sql h1:Fw7zpMWnjhAHQ0qBJAprAvGl7dJMd8ExNHg8aKvkzTg=
1414
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
15-
20251207185658_pgflow_worker_management.sql h1:nPudzivCuG1bwmxnNzSvvcpMjjINKXP6ypzBajzhWqY=
15+
20251208045933_pgflow_worker_management.sql h1:B9LcYfhZQ5hyoTmXLl6pd/kPik9EJOuPdGI/9THmrks=

pkgs/core/supabase/tests/ensure_workers/debounce.test.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ select pgflow_tests.reset_db();
77
select pgflow.track_worker_function('my-function');
88

99
-- TEST: Function with recent last_invoked_at is NOT returned (debounce active)
10-
-- Note: track_worker_function sets last_invoked_at to now()
10+
-- Manually set last_invoked_at to now() to simulate recent invocation
11+
update pgflow.worker_functions
12+
set last_invoked_at = now()
13+
where function_name = 'my-function';
14+
1115
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';
1216
select is(
1317
(select count(*) from pgflow.ensure_workers()),

pkgs/core/supabase/tests/track_worker_function/basic.test.sql

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
begin;
2-
select plan(9);
2+
select plan(8);
33
select pgflow_tests.reset_db();
44

55
-- TEST: Inserts new worker function when it does not exist
@@ -24,19 +24,11 @@ select is(
2424
'New worker function has heartbeat_timeout_seconds=6 by default'
2525
);
2626

27-
-- TEST: New worker function has last_invoked_at set (debounce protection)
28-
select isnt(
27+
-- TEST: Does NOT set last_invoked_at on insert (debounce handled by ensure_workers)
28+
select is(
2929
(select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'),
3030
null::timestamptz,
31-
'New worker function has last_invoked_at set on insert (debounce protection)'
32-
);
33-
34-
-- TEST: last_invoked_at is set to approximately now
35-
select ok(
36-
(select last_invoked_at >= now() - interval '1 second'
37-
from pgflow.worker_functions
38-
where function_name = 'my-edge-function'),
39-
'last_invoked_at is set to approximately now on insert'
31+
'track_worker_function does NOT set last_invoked_at (debounce handled by ensure_workers)'
4032
);
4133

4234
-- TEST: Upsert updates updated_at on conflict
@@ -48,12 +40,11 @@ select ok(
4840
'Upsert updates updated_at timestamp on conflict'
4941
);
5042

51-
-- TEST: Upsert updates last_invoked_at on conflict
52-
select ok(
53-
(select last_invoked_at >= now() - interval '1 second'
54-
from pgflow.worker_functions
55-
where function_name = 'my-edge-function'),
56-
'Upsert updates last_invoked_at on conflict (refreshes debounce)'
43+
-- TEST: Upsert does NOT set last_invoked_at on conflict (keeps it NULL)
44+
select is(
45+
(select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'),
46+
null::timestamptz,
47+
'Upsert does NOT set last_invoked_at on conflict (keeps it NULL)'
5748
);
5849

5950
-- TEST: Upsert does not duplicate rows

pkgs/edge-worker/src/core/Worker.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@ export class Worker {
2424
}
2525

2626
startOnlyOnce(workerBootstrap: WorkerBootstrap) {
27-
if (this.lifecycle.isRunning) {
28-
this.logger.debug('Worker already running, ignoring start request');
27+
if (!this.lifecycle.isCreated) {
28+
this.logger.debug('Worker not in Created state, ignoring start request');
2929
return;
3030
}
31-
32-
if (!this.mainLoopPromise) {
33-
this.mainLoopPromise = this.start(workerBootstrap);
34-
}
31+
this.mainLoopPromise = this.start(workerBootstrap);
3532
}
3633

3734
private async start(workerBootstrap: WorkerBootstrap) {

pkgs/edge-worker/src/core/WorkerLifecycle.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
3030
this.workerState.transitionTo(States.Starting);
3131

3232
// Register this edge function for monitoring by ensure_workers() cron.
33-
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
3433
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
3534

3635
this.logger.debug(`Ensuring queue '${this.queue.queueName}' exists...`);
@@ -95,6 +94,10 @@ export class WorkerLifecycle<IMessage extends Json> implements ILifecycle {
9594
}
9695
}
9796

97+
get isCreated() {
98+
return this.workerState.isCreated;
99+
}
100+
98101
get isRunning() {
99102
return this.workerState.isRunning;
100103
}

pkgs/edge-worker/src/core/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export interface ILifecycle {
2626

2727
get edgeFunctionName(): string | undefined;
2828
get queueName(): string;
29+
get isCreated(): boolean;
2930
get isRunning(): boolean;
3031
get isStopping(): boolean;
3132
get isStopped(): boolean;

pkgs/edge-worker/src/flow/FlowWorkerLifecycle.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
4242
this._workerId = workerBootstrap.workerId;
4343

4444
// Register this edge function for monitoring by ensure_workers() cron.
45-
// Must be called early to set last_invoked_at (debounce) before heartbeat timeout.
4645
await this.queries.trackWorkerFunction(workerBootstrap.edgeFunctionName);
4746

4847
// Compile/verify flow as part of Starting (before registering worker)
@@ -129,6 +128,10 @@ export class FlowWorkerLifecycle<TFlow extends AnyFlow> implements ILifecycle {
129128
}
130129
}
131130

131+
get isCreated() {
132+
return this.workerState.isCreated;
133+
}
134+
132135
get isRunning() {
133136
return this.workerState.isRunning;
134137
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { assertEquals } from '@std/assert';
2+
import { Worker } from '../../src/core/Worker.ts';
3+
import type {
4+
IBatchProcessor,
5+
ILifecycle,
6+
WorkerBootstrap,
7+
} from '../../src/core/types.ts';
8+
import { createLoggingFactory } from '../../src/platform/logging.ts';
9+
10+
const loggingFactory = createLoggingFactory();
11+
loggingFactory.setLogLevel('error'); // Suppress debug output during tests
12+
const logger = loggingFactory.createLogger('Worker.startOnlyOnce.test');
13+
14+
// Mock ILifecycle that tracks calls and allows controlling state
15+
function createMockLifecycle(
16+
state: 'created' | 'starting' | 'running' | 'stopping' | 'stopped'
17+
): ILifecycle & { acknowledgeStartCalled: boolean } {
18+
return {
19+
acknowledgeStartCalled: false,
20+
acknowledgeStart: function () {
21+
this.acknowledgeStartCalled = true;
22+
return Promise.resolve();
23+
},
24+
acknowledgeStop: () => {},
25+
sendHeartbeat: async () => {},
26+
edgeFunctionName: 'test-function',
27+
queueName: 'test-queue',
28+
isCreated: state === 'created',
29+
isRunning: state === 'running',
30+
isStopping: state === 'stopping',
31+
isStopped: state === 'stopped',
32+
transitionToStopping: () => {},
33+
};
34+
}
35+
36+
// Mock IBatchProcessor
37+
function createMockBatchProcessor(): IBatchProcessor {
38+
return {
39+
processBatch: async () => {},
40+
awaitCompletion: async () => {},
41+
};
42+
}
43+
44+
// Mock SQL connection
45+
const mockSql = {
46+
end: async () => {},
47+
} as never;
48+
49+
const workerBootstrap: WorkerBootstrap = {
50+
edgeFunctionName: 'test-function',
51+
workerId: 'test-worker-id',
52+
};
53+
54+
Deno.test('Worker.startOnlyOnce - starts worker when in Created state', async () => {
55+
const lifecycle = createMockLifecycle('created');
56+
const batchProcessor = createMockBatchProcessor();
57+
const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger);
58+
59+
worker.startOnlyOnce(workerBootstrap);
60+
61+
// Give the async start() a moment to call acknowledgeStart
62+
await new Promise((resolve) => setTimeout(resolve, 10));
63+
64+
assertEquals(
65+
lifecycle.acknowledgeStartCalled,
66+
true,
67+
'Worker should start when in Created state'
68+
);
69+
});
70+
71+
Deno.test('Worker.startOnlyOnce - ignores request when in Starting state', async () => {
72+
const lifecycle = createMockLifecycle('starting');
73+
const batchProcessor = createMockBatchProcessor();
74+
const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger);
75+
76+
worker.startOnlyOnce(workerBootstrap);
77+
78+
// Give any async operations a moment
79+
await new Promise((resolve) => setTimeout(resolve, 10));
80+
81+
assertEquals(
82+
lifecycle.acknowledgeStartCalled,
83+
false,
84+
'Worker should NOT start when in Starting state'
85+
);
86+
});
87+
88+
Deno.test('Worker.startOnlyOnce - ignores request when in Running state', async () => {
89+
const lifecycle = createMockLifecycle('running');
90+
const batchProcessor = createMockBatchProcessor();
91+
const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger);
92+
93+
worker.startOnlyOnce(workerBootstrap);
94+
95+
// Give any async operations a moment
96+
await new Promise((resolve) => setTimeout(resolve, 10));
97+
98+
assertEquals(
99+
lifecycle.acknowledgeStartCalled,
100+
false,
101+
'Worker should NOT start when in Running state'
102+
);
103+
});
104+
105+
Deno.test('Worker.startOnlyOnce - ignores request when in Stopping state', async () => {
106+
const lifecycle = createMockLifecycle('stopping');
107+
const batchProcessor = createMockBatchProcessor();
108+
const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger);
109+
110+
worker.startOnlyOnce(workerBootstrap);
111+
112+
// Give any async operations a moment
113+
await new Promise((resolve) => setTimeout(resolve, 10));
114+
115+
assertEquals(
116+
lifecycle.acknowledgeStartCalled,
117+
false,
118+
'Worker should NOT start when in Stopping state'
119+
);
120+
});
121+
122+
Deno.test('Worker.startOnlyOnce - ignores request when in Stopped state', async () => {
123+
const lifecycle = createMockLifecycle('stopped');
124+
const batchProcessor = createMockBatchProcessor();
125+
const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger);
126+
127+
worker.startOnlyOnce(workerBootstrap);
128+
129+
// Give any async operations a moment
130+
await new Promise((resolve) => setTimeout(resolve, 10));
131+
132+
assertEquals(
133+
lifecycle.acknowledgeStartCalled,
134+
false,
135+
'Worker should NOT start when in Stopped state'
136+
);
137+
});

0 commit comments

Comments
 (0)