Skip to content

Commit f49c503

Browse files
authored
feat: add track_worker_function for edge function monitoring (#505)
# Add track_worker_function for edge function monitoring This PR introduces a new SQL function `pgflow.track_worker_function()` that allows edge functions to register themselves for monitoring by the `ensure_workers()` cron job. The function: - Takes a function name as input - Inserts or updates a record in the `pgflow.worker_functions` table - Updates the `updated_at` timestamp on each call This enables worker functions to "check in" periodically, allowing the system to detect and restart any workers that have stopped responding. Added comprehensive tests to verify: - New worker functions are inserted correctly - Default values are set appropriately (enabled=true, heartbeat_timeout=6) - Timestamps are updated properly on subsequent calls - Multiple different functions can be tracked Also updated TypeScript database types to include the new function.
1 parent 1a75ef1 commit f49c503

File tree

5 files changed

+114
-1
lines changed

5 files changed

+114
-1
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Track Worker Function
2+
-- Registers an edge function for monitoring by ensure_workers() cron
3+
4+
drop function if exists pgflow.track_worker_function(text);
5+
6+
create or replace function pgflow.track_worker_function(
7+
function_name text
8+
) returns void
9+
language sql
10+
as $$
11+
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
12+
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
13+
on conflict (function_name)
14+
do update set
15+
updated_at = clock_timestamp(),
16+
last_invoked_at = clock_timestamp();
17+
$$;
18+
19+
comment on function pgflow.track_worker_function(text) is
20+
'Registers an edge function for monitoring. Called by workers on startup.
21+
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';

pkgs/core/src/database-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,10 @@ export type Database = {
578578
isSetofReturn: true
579579
}
580580
}
581+
track_worker_function: {
582+
Args: { function_name: string }
583+
Returns: undefined
584+
}
581585
}
582586
Enums: {
583587
[_ in never]: never
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
-- Create "track_worker_function" function
2+
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
3+
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
4+
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
5+
on conflict (function_name)
6+
do update set
7+
updated_at = clock_timestamp(),
8+
last_invoked_at = clock_timestamp();
9+
$$;
10+
-- Set comment to function: "track_worker_function"
11+
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.
12+
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:1+E3ra0bCEMcyTaOltxInks1QXaP9l45RgA3GSyY7To=
1+
h1:Uh2fD3kgXbd4Emzw0ZL3tDr0KKpzWSFzI0tkl0XnytM=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -15,3 +15,4 @@ h1:1+E3ra0bCEMcyTaOltxInks1QXaP9l45RgA3GSyY7To=
1515
20251204115929_pgflow_temp_is_local.sql h1:arP+PC2OYI9ktAFx0+G9/w8zsaq/AbFWjLgK6YDujvQ=
1616
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:9FsEd0iyaIv9X1alACBQCzyebt2/+m1rgL4ozXJWBcA=
1717
20251204145037_pgflow_temp_worker_functions_schema.sql h1:mrLKtkM8aWHBY+LIxQrsXxE7aKuPJaQJO2qd0NnrJ74=
18+
20251204164612_pgflow_temp_track_worker_function.sql h1:3Ht8wUx3saKPo98osuTG/nxD/tKR48qe8jHNVwuu2lY=
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
begin;
2+
select plan(9);
3+
select pgflow_tests.reset_db();
4+
5+
-- TEST: Inserts new worker function when it does not exist
6+
select pgflow.track_worker_function('my-edge-function');
7+
select is(
8+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
9+
1::bigint,
10+
'track_worker_function() inserts new worker function'
11+
);
12+
13+
-- TEST: New worker function has enabled=true by default
14+
select is(
15+
(select enabled from pgflow.worker_functions where function_name = 'my-edge-function'),
16+
true,
17+
'New worker function has enabled=true by default'
18+
);
19+
20+
-- TEST: New worker function has default heartbeat_timeout_seconds
21+
select is(
22+
(select heartbeat_timeout_seconds from pgflow.worker_functions where function_name = 'my-edge-function'),
23+
6,
24+
'New worker function has heartbeat_timeout_seconds=6 by default'
25+
);
26+
27+
-- TEST: New worker function has last_invoked_at set (debounce protection)
28+
select isnt(
29+
(select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'),
30+
null::timestamptz,
31+
'New worker function has last_invoked_at set on insert (debounce protection)'
32+
);
33+
34+
-- TEST: last_invoked_at is set to approximately now
35+
select ok(
36+
(select last_invoked_at >= now() - interval '1 second'
37+
from pgflow.worker_functions
38+
where function_name = 'my-edge-function'),
39+
'last_invoked_at is set to approximately now on insert'
40+
);
41+
42+
-- TEST: Upsert updates updated_at on conflict
43+
-- First, get the original updated_at timestamp
44+
select pg_sleep(0.01); -- Small delay to ensure timestamp difference
45+
select pgflow.track_worker_function('my-edge-function');
46+
select ok(
47+
(select updated_at > created_at from pgflow.worker_functions where function_name = 'my-edge-function'),
48+
'Upsert updates updated_at timestamp on conflict'
49+
);
50+
51+
-- TEST: Upsert updates last_invoked_at on conflict
52+
select ok(
53+
(select last_invoked_at >= now() - interval '1 second'
54+
from pgflow.worker_functions
55+
where function_name = 'my-edge-function'),
56+
'Upsert updates last_invoked_at on conflict (refreshes debounce)'
57+
);
58+
59+
-- TEST: Upsert does not duplicate rows
60+
select is(
61+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
62+
1::bigint,
63+
'Upsert does not create duplicate rows'
64+
);
65+
66+
-- TEST: Can track multiple different functions
67+
select pgflow.track_worker_function('another-function');
68+
select is(
69+
(select count(*) from pgflow.worker_functions),
70+
2::bigint,
71+
'Can track multiple different worker functions'
72+
);
73+
74+
select finish();
75+
rollback;

0 commit comments

Comments
 (0)