Skip to content

Commit 179a131

Browse files
committed
add track_worker_function() SQL function for worker registration
1 parent b0378fa commit 179a131

File tree

5 files changed

+85
-1
lines changed

5 files changed

+85
-1
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-- Track Worker Function
2+
-- Registers an edge function for monitoring by ensure_workers() cron
3+
4+
drop function if exists pgflow.track_worker_function(text);
5+
6+
create or replace function pgflow.track_worker_function(
7+
function_name text
8+
) returns void
9+
language sql
10+
as $$
11+
insert into pgflow.worker_functions (function_name, updated_at)
12+
values (track_worker_function.function_name, clock_timestamp())
13+
on conflict (function_name)
14+
do update set updated_at = clock_timestamp();
15+
$$;
16+
17+
comment on function pgflow.track_worker_function(text) is
18+
'Registers an edge function for monitoring. Called by workers on startup.';

pkgs/core/src/database-types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,10 @@ export type Database = {
578578
isSetofReturn: true
579579
}
580580
}
581+
track_worker_function: {
582+
Args: { function_name: string }
583+
Returns: undefined
584+
}
581585
}
582586
Enums: {
583587
[_ in never]: never
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Create "track_worker_function" function
2+
CREATE FUNCTION "pgflow"."track_worker_function" ("function_name" text) RETURNS void LANGUAGE sql AS $$
3+
insert into pgflow.worker_functions (function_name, updated_at)
4+
values (track_worker_function.function_name, clock_timestamp())
5+
on conflict (function_name)
6+
do update set updated_at = clock_timestamp();
7+
$$;
8+
-- Set comment to function: "track_worker_function"
9+
COMMENT ON FUNCTION "pgflow"."track_worker_function" IS 'Registers an edge function for monitoring. Called by workers on startup.';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
1+
h1:LucdPvVTEPGtKKeNWWuGA+n9/VVFaU4c7nzMZNZQdt0=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -19,3 +19,4 @@ h1:IMyCv/DI0r48ffoJN/Lx1SsZLaotPIwNGXbIqw9iB7A=
1919
20251204115929_pgflow_temp_is_local.sql h1:pjOFO6k8FCmbxp6S7U3fPImsqW81WwdLwq/UZK74BG4=
2020
20251204142050_pgflow_temp_ensure_flow_compiled_auto_detect.sql h1:VwqZiOcVaCahb6BZ918ioFLgwQcF/sy1TR9a4lSnVvs=
2121
20251204145037_pgflow_temp_worker_functions_schema.sql h1:5DJJEP0jcg7yapTe7t6FX2ypZIj92lGvJ12AX8g5fz4=
22+
20251204145342_pgflow_temp_track_worker_function.sql h1:ABzIoNXnxx2T08Dd/JmSk+DdrXdAi2j+TVxMsuXkOIU=
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
begin;
2+
select plan(6);
3+
select pgflow_tests.reset_db();
4+
5+
-- TEST: Inserts new worker function when it does not exist
6+
select pgflow.track_worker_function('my-edge-function');
7+
select is(
8+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
9+
1::bigint,
10+
'track_worker_function() inserts new worker function'
11+
);
12+
13+
-- TEST: New worker function has enabled=true by default
14+
select is(
15+
(select enabled from pgflow.worker_functions where function_name = 'my-edge-function'),
16+
true,
17+
'New worker function has enabled=true by default'
18+
);
19+
20+
-- TEST: New worker function has default heartbeat_timeout_seconds
21+
select is(
22+
(select heartbeat_timeout_seconds from pgflow.worker_functions where function_name = 'my-edge-function'),
23+
6,
24+
'New worker function has heartbeat_timeout_seconds=6 by default'
25+
);
26+
27+
-- TEST: Upsert updates updated_at on conflict
28+
-- First, get the original updated_at timestamp
29+
select pg_sleep(0.01); -- Small delay to ensure timestamp difference
30+
select pgflow.track_worker_function('my-edge-function');
31+
select ok(
32+
(select updated_at > created_at from pgflow.worker_functions where function_name = 'my-edge-function'),
33+
'Upsert updates updated_at timestamp on conflict'
34+
);
35+
36+
-- TEST: Upsert does not duplicate rows
37+
select is(
38+
(select count(*) from pgflow.worker_functions where function_name = 'my-edge-function'),
39+
1::bigint,
40+
'Upsert does not create duplicate rows'
41+
);
42+
43+
-- TEST: Can track multiple different functions
44+
select pgflow.track_worker_function('another-function');
45+
select is(
46+
(select count(*) from pgflow.worker_functions),
47+
2::bigint,
48+
'Can track multiple different worker functions'
49+
);
50+
51+
select finish();
52+
rollback;

0 commit comments

Comments
 (0)