Conversation
Pull request overview
Adds a CSV-backed “identity provider” path to keep the address table in sync with a watched CSV file, integrating it into the service startup flow.
Changes:
- Introduces CSV parsing + DB sync (upsert + disable-missing) and a filesystem watcher to re-sync on changes.
- Refactors DB usage to a single global `pgxpool.Pool` initialized once at startup.
- Updates documentation, Go module deps, schema defaults, and CI workflow to support the new feature.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 8 comments.
Summary per file:

| File | Description |
|---|---|
| `src/sql.go` | Adds SQL for address upsert and disabling addresses absent from CSV. |
| `src/fmsgid.go` | Initializes a shared DB pool and optionally starts the CSV watcher via `FMSGID_CSV_FILE`. |
| `src/csv.go` | Implements CSV parsing, transactional sync, and fsnotify-based watch loop with debounce. |
| `src/csv_test.go` | Adds unit tests for CSV parsing behavior and defaults. |
| `src/go.mod` | Adds fsnotify dependency and bumps x/sys. |
| `src/go.sum` | Records new module checksums for added/updated dependencies. |
| `README.md` | Documents `FMSGID_CSV_FILE` and CSV sync behavior/format. |
| `dd.sql` | Changes default quota limits in the address table. |
| `addresses.example.csv` | Provides an example CSV with available columns. |
| `.github/workflows/go.yml` | Updates CI to build/test from `src` using `go-version-file`. |
```go
if len(addressLowers) > 0 {
	_, err = tx.Exec(ctx, sqlDisableAbsentAddresses, addressLowers)
	if err != nil {
		return fmt.Errorf("disabling absent addresses: %w", err)
	}
}
```
`syncCSV` skips `sqlDisableAbsentAddresses` when the CSV yields zero addresses (`len(addressLowers) == 0`). That means a header-only (or fully skipped) CSV leaves previously-known DB addresses still `accepting_new=true`, which contradicts the function comment/README behavior ("not in CSV => `accepting_new=false`"). Consider executing the update even for an empty list (and, if needed, casting `$1` to `text[]` so an empty slice is handled consistently).
Suggested change:

```diff
-if len(addressLowers) > 0 {
-	_, err = tx.Exec(ctx, sqlDisableAbsentAddresses, addressLowers)
-	if err != nil {
-		return fmt.Errorf("disabling absent addresses: %w", err)
-	}
+_, err = tx.Exec(ctx, sqlDisableAbsentAddresses, addressLowers)
+if err != nil {
+	return fmt.Errorf("disabling absent addresses: %w", err)
+}
```
```go
if err := watcher.Add(filePath); err != nil {
	log.Printf("ERROR: Watching CSV file %s: %s", filePath, err)
	return
}

log.Printf("INFO: Watching CSV file %s for changes", filePath)

var debounce *time.Timer

for {
	select {
	case <-ctx.Done():
		return
	case event, ok := <-watcher.Events:
		if !ok {
			return
		}
		if event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
			// Debounce: wait 500ms after last event before syncing
			if debounce != nil {
				debounce.Stop()
			}
			debounce = time.AfterFunc(500*time.Millisecond, doSync)
		}
	}
}
```
The watcher only reacts to `Write|Create` events on the file path. Many editors update files via atomic rename (triggering `Rename`/`Remove` events), which this loop ignores; after such a replace you may miss updates and/or stop receiving events. Consider watching the parent directory and filtering on the filename, or handling `Rename`/`Remove` by re-adding the watch and triggering a re-sync.
Fixed in 39fe331. The watcher now watches the parent directory and filters events by filename, which correctly handles atomic renames (`Rename`/`Remove`/`Create` sequences used by editors like vim). Also added a `sync.Mutex` inside `doSync` to prevent overlapping concurrent sync transactions from the debounce timer.
`time.AfterFunc(..., doSync)` runs `doSync` in its own goroutine. If a sync takes longer than the debounce interval, another file event can schedule a second `doSync` concurrently, leading to overlapping transactions and potentially thrashing the address table. Consider serializing syncs (e.g., a single worker goroutine reading from a channel, or guarding `doSync` with a mutex/atomic in-flight flag).
```
github.com/bytedance/sonic v1.8.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
```
`github.com/fsnotify/fsnotify` is imported directly in `src/csv.go`, but it's listed as `// indirect` in go.mod. Running `go mod tidy` (or updating the entry) should make it a direct dependency to avoid confusion and keep module metadata accurate.
To get started, copy the example file and edit it with your addresses:

```
cp addresses.csv.example addresses.csv
```

Then set `FMSGID_CSV_FILE=addresses.csv` in your `.env` file (or environment).
The README references an example file named `addresses.csv.example`, but the repository adds `addresses.example.csv`. Update the README commands/links to match the actual filename (or rename the example file) so new users can follow the instructions successfully.
The CSV must have a header row. Column names correspond to the `address` table columns. Only the `address` column is required; all others are optional and use the same defaults as the table (`accepting_new` defaults to `true`, limit columns default to `-1`).

Available columns:

| Column | Required | Default | Description |
|--------|----------|---------|-------------|
| `address` | yes | | fmsg address (e.g. `@alice@example.com`) |
| `display_name` | no | _(empty)_ | Display name |
| `accepting_new` | no | `true` | Whether the address accepts new messages |
| `limit_recv_size_total` | no | `-1` | Total received size limit (bytes, `-1` = unlimited) |
| `limit_recv_size_per_msg` | no | `-1` | Max size per received message |
| `limit_recv_size_per_1d` | no | `-1` | Received size limit per day |
| `limit_recv_count_per_1d` | no | `-1` | Received message count limit per day |
| `limit_send_size_total` | no | `-1` | Total sent size limit |
| `limit_send_size_per_msg` | no | `-1` | Max size per sent message |
| `limit_send_size_per_1d` | no | `-1` | Sent size limit per day |
| `limit_send_count_per_1d` | no | `-1` | Sent message count limit per day |
The README states that limit columns default to `-1` (unlimited), "the same defaults as the table", but dd.sql now sets defaults other than `-1` for those columns. Either revert/adjust the schema defaults or update this documentation (and the CSV/example expectations) so the described defaults match actual behavior.
```diff
 accepting_new bool not null default true,
-limit_recv_size_total bigint not null default -1,
-limit_recv_size_per_msg bigint not null default -1,
-limit_recv_size_per_1d bigint not null default -1,
-limit_recv_count_per_1d bigint not null default -1,
-limit_send_size_total bigint not null default -1,
-limit_send_size_per_msg bigint not null default -1,
-limit_send_size_per_1d bigint not null default -1,
-limit_send_count_per_1d bigint not null default -1
+limit_recv_size_total bigint not null default 102400000,
+limit_recv_size_per_msg bigint not null default 10240,
+limit_recv_size_per_1d bigint not null default 102400,
+limit_recv_count_per_1d bigint not null default 1000,
+limit_send_size_total bigint not null default 102400000,
+limit_send_size_per_msg bigint not null default 10240,
+limit_send_size_per_1d bigint not null default 102400,
+limit_send_count_per_1d bigint not null default 1000
```
dd.sql changes the default quota limits from -1 to fixed positive values, but the new CSV sync path (parseCSV) still defaults unspecified limits to -1 and the README/example file still document -1 defaults. Please align schema defaults, CSV parsing defaults, and documentation so a missing column in the CSV results in the intended quota values.
```go
	limit_send_size_per_1d = excluded.limit_send_size_per_1d,
	limit_send_count_per_1d = excluded.limit_send_count_per_1d;`

// sqlDisableAbsentAddresses is used with a dynamically built WHERE clause
```
The comment says this query is used with a dynamically built WHERE clause, but the SQL uses a single parameterized array (ALL($1)) and is not dynamically built. Please update/remove the comment to avoid misleading future maintainers.
Suggested change:

```diff
-// sqlDisableAbsentAddresses is used with a dynamically built WHERE clause
+// sqlDisableAbsentAddresses disables addresses not present in the provided parameter array.
```
Agent-Logs-Url: https://github.com/markmnl/fmsgid/sessions/24fe73f2-272e-4eb7-9d31-9309ad42cd63 Co-authored-by: markmnl <2630321+markmnl@users.noreply.github.com>
No description provided.