Skip to content

Commit a6b7275

Browse files
committed
add track_worker_function() SQL function for worker registration
1 parent b0378fa commit a6b7275

File tree

5 files changed

+114
-1
lines changed

5 files changed

+114
-1
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- Track Worker Function
2+
-- Registers an edge function for monitoring by ensure_workers() cron
3+
4+
drop function if exists pgflow.track_worker_function(text);
5+
6+
create or replace function pgflow.track_worker_function(
7+
function_name text
8+
) returns void
9+
language sql
10+
as $$
11+
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
12+
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
13+
on conflict (function_name)
14+
do update set
15+
updated_at = clock_timestamp(),
16+
last_invoked_at = clock_timestamp();
17+
$$;
18+
19+
comment on function pgflow.track_worker_function(text) is
20+
'Registers an edge function for monitoring. Called by workers on startup.
21+
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';

pkgs/core/src/database-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,10 @@ export type Database = {
578578
isSetofReturn: true
579579
}
580580
}
581+
track_worker_function: {
582+
Args: { function_name: string }
583+
Returns: undefined
584+
}
581585
}
582586
Enums: {
583587
[_ in never]: never
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
-- Create "track_worker_function" function
2+
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
3+
insert into pgflow.worker_functions (function_name, updated_at, last_invoked_at)
4+
values (track_worker_function.function_name, clock_timestamp(), clock_timestamp())
5+
on conflict (function_name)
6+
do update set
7+
updated_at = clock_timestamp(),
8+
last_invoked_at = clock_timestamp();
9+
$$;
10+
-- Set comment to function: "track_worker_function"
11+
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.
12+
Sets last_invoked_at to prevent cron from pinging during startup (debounce).';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
1+
h1:eMTY2CZPFm2BibVAHjtIMhCWIFcmTcqQq9VRXfs1/1A=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -19,3 +19,4 @@ h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
1919
20251204115929_pgflow_temp_is_local.sql h1:pjOFO6k8FCmbxp6S7U3fPImsqW81WwdLwq/UZK74BG4=
2020
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs=
2121
20251204145037_pgflow_temp_worker_functions_schema.sql h1:5DJJEP0jcg7yapTe7t6FX2ypZIj92lGvJ12AX8g5fz4=
22+
20251204164612_pgflow_temp_track_worker_function.sql h1:SC8Z4Un37A2XVaIIvyZJ0eQ/7r0JENSi6RcBx4GaUCE=
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
begin;
2+
select plan(9);
3+
select pgflow_tests.reset_db();
4+
5+
-- TEST: Inserts new worker function when it does not exist
6+
select pgflow.track_worker_function('my-edge-function');
7+
select is(
8+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
9+
1::bigint,
10+
'track_worker_function() inserts new worker function'
11+
);
12+
13+
-- TEST: New worker function has enabled=true by default
14+
select is(
15+
(select enabled from pgflow.worker_functions where function_name = 'my-edge-function'),
16+
true,
17+
'New worker function has enabled=true by default'
18+
);
19+
20+
-- TEST: New worker function has default heartbeat_timeout_seconds
21+
select is(
22+
(select heartbeat_timeout_seconds from pgflow.worker_functions where function_name = 'my-edge-function'),
23+
6,
24+
'New worker function has heartbeat_timeout_seconds=6 by default'
25+
);
26+
27+
-- TEST: New worker function has last_invoked_at set (debounce protection)
28+
select isnt(
29+
(select last_invoked_at from pgflow.worker_functions where function_name = 'my-edge-function'),
30+
null::timestamptz,
31+
'New worker function has last_invoked_at set on insert (debounce protection)'
32+
);
33+
34+
-- TEST: last_invoked_at is set to approximately now
35+
select ok(
36+
(select last_invoked_at >= now() - interval '1 second'
37+
from pgflow.worker_functions
38+
where function_name = 'my-edge-function'),
39+
'last_invoked_at is set to approximately now on insert'
40+
);
41+
42+
-- TEST: Upsert updates updated_at on conflict
43+
-- First, get the original updated_at timestamp
44+
select pg_sleep(0.01); -- Small delay to ensure timestamp difference
45+
select pgflow.track_worker_function('my-edge-function');
46+
select ok(
47+
(select updated_at > created_at from pgflow.worker_functions where function_name = 'my-edge-function'),
48+
'Upsert updates updated_at timestamp on conflict'
49+
);
50+
51+
-- TEST: Upsert updates last_invoked_at on conflict
52+
select ok(
53+
(select last_invoked_at >= now() - interval '1 second'
54+
from pgflow.worker_functions
55+
where function_name = 'my-edge-function'),
56+
'Upsert updates last_invoked_at on conflict (refreshes debounce)'
57+
);
58+
59+
-- TEST: Upsert does not duplicate rows
60+
select is(
61+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
62+
1::bigint,
63+
'Upsert does not create duplicate rows'
64+
);
65+
66+
-- TEST: Can track multiple different functions
67+
select pgflow.track_worker_function('another-function');
68+
select is(
69+
(select count(*) from pgflow.worker_functions),
70+
2::bigint,
71+
'Can track multiple different worker functions'
72+
);
73+
74+
select finish();
75+
rollback;

0 commit comments

Comments
 (0)