diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b9403ad..127fb0e 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,28 +1,24 @@ -name: Go 1.19 +name: Build & Test on: - push: - branches: [ "main" ] + workflow_dispatch: pull_request: - branches: [ "main" ] + branches: [main] jobs: - - build: + build-and-test: runs-on: ubuntu-latest - defaults: - run: - working-directory: ./src steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v3 - with: - go-version: 1.19 + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version-file: src/go.mod - - name: Build - run: go build -v . + - name: Build + working-directory: src + run: go build ./... - - name: Test - run: go test -v . + - name: Test + working-directory: src + run: go test ./... diff --git a/.gitignore b/.gitignore index ead0741..465de6a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,4 @@ .env .vscode/ -.claude/ \ No newline at end of file +.claude/fmsgid diff --git a/README.md b/README.md index 19c5758..b52fb8e 100644 --- a/README.md +++ b/README.md @@ -15,8 +15,15 @@ See .env.example for a list of environment variables which can be copied to a `. ``` GIN_MODE=release FMSGID_PORT=8080 +FMSGID_CSV_FILE=/path/to/addresses.csv ``` +| Variable | Description | Default | +|----------|-------------|---------| +| `GIN_MODE` | Gin framework mode (`release`, `debug`, `test`) | `debug` | +| `FMSGID_PORT` | Port to listen on | `8080` | +| `FMSGID_CSV_FILE` | Path to a CSV file to sync addresses from. When set, fmsgid watches the file for changes and automatically syncs the `address` table. When unset, CSV sync is disabled. | _(unset)_ | + ## Build From the `src` directory: @@ -34,6 +41,41 @@ PostgreSQL database with tables created from `dd.sql` is required. 
The database ./fmsgid ``` +## CSV Identity Provider + +When `FMSGID_CSV_FILE` is set, fmsgid reads the CSV file at startup and watches it for changes using filesystem notifications. On each change the `address` table is synced: + +- Addresses in the CSV are **upserted** (created or updated). +- Addresses in the database but **not** in the CSV have `accepting_new` set to `false` (they are not deleted). + +The CSV must have a header row. Column names correspond to the `address` table columns. Only the `address` column is required; all others are optional and use the same defaults as the table. + +To get started, copy the example file and edit it with your addresses: + +``` +cp addresses.example.csv addresses.csv +``` + +Then set `FMSGID_CSV_FILE=addresses.csv` in your `.env` file (or environment). + +Available columns: + +| Column | Required | Default | Description | +|--------|----------|---------|-------------| +| `address` | yes | | fmsg address (e.g. `@alice@example.com`) | +| `display_name` | no | _(empty)_ | Display name | +| `accepting_new` | no | `true` | Whether the address accepts new messages | +| `limit_recv_size_total` | no | `102400000` | Total received size limit (bytes) | +| `limit_recv_size_per_msg` | no | `10240` | Max size per received message | +| `limit_recv_size_per_1d` | no | `102400` | Received size limit per day | +| `limit_recv_count_per_1d` | no | `1000` | Received message count limit per day | +| `limit_send_size_total` | no | `102400000` | Total sent size limit | +| `limit_send_size_per_msg` | no | `10240` | Max size per sent message | +| `limit_send_size_per_1d` | no | `102400` | Sent size limit per day | +| `limit_send_count_per_1d` | no | `1000` | Sent message count limit per day | + +See `addresses.example.csv` for a complete example with all columns. + ## API Routes All routes are served over HTTPS under the `/fmsgid` path. 
diff --git a/addresses.example.csv b/addresses.example.csv new file mode 100644 index 0000000..5f06967 --- /dev/null +++ b/addresses.example.csv @@ -0,0 +1,3 @@ +address,display_name,accepting_new,limit_recv_size_total,limit_recv_size_per_msg,limit_recv_size_per_1d,limit_recv_count_per_1d,limit_send_size_total,limit_send_size_per_msg,limit_send_size_per_1d,limit_send_count_per_1d +@alice@example.com,Alice,true,102400000,10240,102400,1000,102400000,10240,102400,1000 +@bob@example.com,Bob,true,102400000,10240,102400,1000,102400000,10240,102400,1000 diff --git a/dd.sql b/dd.sql index 42f1e83..dcfc926 100644 --- a/dd.sql +++ b/dd.sql @@ -11,14 +11,14 @@ create table if not exists address ( address text not null, display_name text, accepting_new bool not null default true, - limit_recv_size_total bigint not null default -1, - limit_recv_size_per_msg bigint not null default -1, - limit_recv_size_per_1d bigint not null default -1, - limit_recv_count_per_1d bigint not null default -1, - limit_send_size_total bigint not null default -1, - limit_send_size_per_msg bigint not null default -1, - limit_send_size_per_1d bigint not null default -1, - limit_send_count_per_1d bigint not null default -1 + limit_recv_size_total bigint not null default 102400000, + limit_recv_size_per_msg bigint not null default 10240, + limit_recv_size_per_1d bigint not null default 102400, + limit_recv_count_per_1d bigint not null default 1000, + limit_send_size_total bigint not null default 102400000, + limit_send_size_per_msg bigint not null default 10240, + limit_send_size_per_1d bigint not null default 102400, + limit_send_count_per_1d bigint not null default 1000 ); create table if not exists address_tx ( diff --git a/src/csv.go b/src/csv.go new file mode 100644 index 0000000..ccd2de1 --- /dev/null +++ b/src/csv.go @@ -0,0 +1,266 @@ +package main + +import ( + "context" + "encoding/csv" + "fmt" + "log" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + 
"github.com/fsnotify/fsnotify" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "golang.org/x/text/cases" +) + +type addressRow struct { + AddressLower string + Address string + DisplayName string + AcceptingNew bool + LimitRecvSizeTotal int64 + LimitRecvSizePerMsg int64 + LimitRecvSizePer1d int64 + LimitRecvCountPer1d int64 + LimitSendSizeTotal int64 + LimitSendSizePerMsg int64 + LimitSendSizePer1d int64 + LimitSendCountPer1d int64 +} + +// parseCSV reads a CSV file and returns address rows. The CSV must have a header +// row with column names matching the address table columns. Only the "address" +// column is required; others use defaults (accepting_new=true, table limit defaults). +// Malformed rows are logged and skipped. +func parseCSV(filePath string) ([]addressRow, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("opening CSV file: %w", err) + } + defer f.Close() + + reader := csv.NewReader(f) + reader.TrimLeadingSpace = true + + records, err := reader.ReadAll() + if err != nil { + return nil, fmt.Errorf("reading CSV: %w", err) + } + + if len(records) < 1 { + return nil, fmt.Errorf("CSV file is empty (no header row)") + } + + // Build column index from header + header := records[0] + colIdx := make(map[string]int, len(header)) + for i, name := range header { + colIdx[strings.TrimSpace(strings.ToLower(name))] = i + } + + addrCol, ok := colIdx["address"] + if !ok { + return nil, fmt.Errorf("CSV missing required 'address' column in header") + } + + fold := cases.Fold() + var rows []addressRow + + for lineNum, record := range records[1:] { + csvLine := lineNum + 2 // 1-indexed, skip header + + if len(record) <= addrCol { + log.Printf("WARN: CSV line %d: not enough columns, skipping", csvLine) + continue + } + + addr := strings.TrimSpace(record[addrCol]) + if addr == "" { + log.Printf("WARN: CSV line %d: empty address, skipping", csvLine) + continue + } + + row := addressRow{ + Address: addr, + AddressLower: 
fold.String(addr), + AcceptingNew: true, + LimitRecvSizeTotal: 102400000, + LimitRecvSizePerMsg: 10240, + LimitRecvSizePer1d: 102400, + LimitRecvCountPer1d: 1000, + LimitSendSizeTotal: 102400000, + LimitSendSizePerMsg: 10240, + LimitSendSizePer1d: 102400, + LimitSendCountPer1d: 1000, + } + + if idx, ok := colIdx["display_name"]; ok && idx < len(record) { + row.DisplayName = strings.TrimSpace(record[idx]) + } + if idx, ok := colIdx["accepting_new"]; ok && idx < len(record) { + val := strings.TrimSpace(strings.ToLower(record[idx])) + if val != "" { + b, err := strconv.ParseBool(val) + if err != nil { + log.Printf("WARN: CSV line %d: invalid accepting_new %q, skipping row", csvLine, val) + continue + } + row.AcceptingNew = b + } + } + + parseOK := true + parseInt64Col := func(colName string, dest *int64) { + if idx, ok := colIdx[colName]; ok && idx < len(record) { + val := strings.TrimSpace(record[idx]) + if val != "" { + n, err := strconv.ParseInt(val, 10, 64) + if err != nil { + log.Printf("WARN: CSV line %d: invalid %s %q, skipping row", csvLine, colName, val) + parseOK = false + return + } + *dest = n + } + } + } + + parseInt64Col("limit_recv_size_total", &row.LimitRecvSizeTotal) + parseInt64Col("limit_recv_size_per_msg", &row.LimitRecvSizePerMsg) + parseInt64Col("limit_recv_size_per_1d", &row.LimitRecvSizePer1d) + parseInt64Col("limit_recv_count_per_1d", &row.LimitRecvCountPer1d) + parseInt64Col("limit_send_size_total", &row.LimitSendSizeTotal) + parseInt64Col("limit_send_size_per_msg", &row.LimitSendSizePerMsg) + parseInt64Col("limit_send_size_per_1d", &row.LimitSendSizePer1d) + parseInt64Col("limit_send_count_per_1d", &row.LimitSendCountPer1d) + + if !parseOK { + continue + } + + rows = append(rows, row) + } + + return rows, nil +} + +// syncCSV upserts the given addresses into the database and sets accepting_new=false +// for any addresses in the DB that are not present in the given slice. 
+func syncCSV(ctx context.Context, pool *pgxpool.Pool, addresses []addressRow) error { + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("beginning transaction: %w", err) + } + defer tx.Rollback(ctx) + + batch := &pgx.Batch{} + addressLowers := make([]string, 0, len(addresses)) + + for _, a := range addresses { + batch.Queue(sqlUpsertAddress, + a.AddressLower, a.Address, a.DisplayName, a.AcceptingNew, + a.LimitRecvSizeTotal, a.LimitRecvSizePerMsg, a.LimitRecvSizePer1d, a.LimitRecvCountPer1d, + a.LimitSendSizeTotal, a.LimitSendSizePerMsg, a.LimitSendSizePer1d, a.LimitSendCountPer1d, + ) + addressLowers = append(addressLowers, a.AddressLower) + } + + br := tx.SendBatch(ctx, batch) + for range addresses { + if _, err := br.Exec(); err != nil { + br.Close() + return fmt.Errorf("upserting address: %w", err) + } + } + br.Close() + + // Disable addresses not present in the CSV + if len(addressLowers) > 0 { + _, err = tx.Exec(ctx, sqlDisableAbsentAddresses, addressLowers) + if err != nil { + return fmt.Errorf("disabling absent addresses: %w", err) + } + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("committing transaction: %w", err) + } + + return nil +} + +// startCSVWatcher performs an initial CSV sync and then watches the file for changes, +// re-syncing on each write or replace. It blocks until the context is cancelled. +func startCSVWatcher(ctx context.Context, pool *pgxpool.Pool, filePath string) { + // mu serializes concurrent sync calls that may be triggered by the debounce timer. 
+ var mu sync.Mutex + doSync := func() { + mu.Lock() + defer mu.Unlock() + addresses, err := parseCSV(filePath) + if err != nil { + log.Printf("ERROR: CSV parse: %s", err) + return + } + if err := syncCSV(ctx, pool, addresses); err != nil { + log.Printf("ERROR: CSV sync: %s", err) + return + } + log.Printf("INFO: CSV sync complete, %d addresses processed", len(addresses)) + } + + doSync() + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Printf("ERROR: Creating CSV file watcher: %s", err) + return + } + defer watcher.Close() + + // Watch the parent directory so that atomic renames (used by many editors + // and tools) trigger events even when the original inode is replaced. + dir := filepath.Dir(filePath) + base := filepath.Base(filePath) + + if err := watcher.Add(dir); err != nil { + log.Printf("ERROR: Watching directory %s: %s", dir, err) + return + } + + log.Printf("INFO: Watching CSV file %s for changes", filePath) + + var debounce *time.Timer + + for { + select { + case <-ctx.Done(): + return + case event, ok := <-watcher.Events: + if !ok { + return + } + // Filter to events on our target file only. 
+ if filepath.Base(event.Name) != base { + continue + } + if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename|fsnotify.Remove) != 0 { + // Debounce: wait 500ms after last event before syncing + if debounce != nil { + debounce.Stop() + } + debounce = time.AfterFunc(500*time.Millisecond, doSync) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Printf("ERROR: CSV file watcher: %s", err) + } + } +} diff --git a/src/csv_test.go b/src/csv_test.go new file mode 100644 index 0000000..de7ea67 --- /dev/null +++ b/src/csv_test.go @@ -0,0 +1,260 @@ +package main + +import ( + "os" + "path/filepath" + "testing" +) + +func writeTestCSV(t *testing.T, content string) string { + t.Helper() + path := filepath.Join(t.TempDir(), "test.csv") + err := os.WriteFile(path, []byte(content), 0600) + if err != nil { + t.Fatal(err) + } + return path +} + +func TestParseCSV_AllColumns(t *testing.T) { + csv := `address,display_name,accepting_new,limit_recv_size_total,limit_recv_size_per_msg,limit_recv_size_per_1d,limit_recv_count_per_1d,limit_send_size_total,limit_send_size_per_msg,limit_send_size_per_1d,limit_send_count_per_1d +@alice@example.com,Alice,true,100,200,300,400,500,600,700,800 +` + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + r := rows[0] + if r.Address != "@alice@example.com" { + t.Errorf("Address = %q", r.Address) + } + if r.DisplayName != "Alice" { + t.Errorf("DisplayName = %q", r.DisplayName) + } + if r.AcceptingNew != true { + t.Errorf("AcceptingNew = %v", r.AcceptingNew) + } + if r.LimitRecvSizeTotal != 100 { + t.Errorf("LimitRecvSizeTotal = %d", r.LimitRecvSizeTotal) + } + if r.LimitRecvSizePerMsg != 200 { + t.Errorf("LimitRecvSizePerMsg = %d", r.LimitRecvSizePerMsg) + } + if r.LimitRecvSizePer1d != 300 { + t.Errorf("LimitRecvSizePer1d = %d", r.LimitRecvSizePer1d) + } + if r.LimitRecvCountPer1d != 400 { + t.Errorf("LimitRecvCountPer1d = 
%d", r.LimitRecvCountPer1d) + } + if r.LimitSendSizeTotal != 500 { + t.Errorf("LimitSendSizeTotal = %d", r.LimitSendSizeTotal) + } + if r.LimitSendSizePerMsg != 600 { + t.Errorf("LimitSendSizePerMsg = %d", r.LimitSendSizePerMsg) + } + if r.LimitSendSizePer1d != 700 { + t.Errorf("LimitSendSizePer1d = %d", r.LimitSendSizePer1d) + } + if r.LimitSendCountPer1d != 800 { + t.Errorf("LimitSendCountPer1d = %d", r.LimitSendCountPer1d) + } +} + +func TestParseCSV_AddressOnly(t *testing.T) { + csv := "address\n@bob@example.com\n" + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + r := rows[0] + if r.Address != "@bob@example.com" { + t.Errorf("Address = %q", r.Address) + } + if r.AcceptingNew != true { + t.Errorf("AcceptingNew should default to true, got %v", r.AcceptingNew) + } + if r.LimitRecvSizeTotal != 102400000 { + t.Errorf("LimitRecvSizeTotal should default to 102400000, got %d", r.LimitRecvSizeTotal) + } + if r.LimitSendSizePerMsg != 10240 { + t.Errorf("LimitSendSizePerMsg should default to 10240, got %d", r.LimitSendSizePerMsg) + } +} + +func TestParseCSV_CaseFolding(t *testing.T) { + csv := "address\n@Alice@Example.COM\n" + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if rows[0].Address != "@Alice@Example.COM" { + t.Errorf("Address should preserve original case, got %q", rows[0].Address) + } + if rows[0].AddressLower != "@alice@example.com" { + t.Errorf("AddressLower should be case-folded, got %q", rows[0].AddressLower) + } +} + +func TestParseCSV_Defaults(t *testing.T) { + csv := "address,display_name\n@user@example.com,\n" + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + r := rows[0] + if r.DisplayName != "" { + t.Errorf("DisplayName should 
be empty, got %q", r.DisplayName) + } + if r.AcceptingNew != true { + t.Errorf("AcceptingNew should default to true") + } + expectedDefaults := []int64{ + 102400000, 10240, 102400, 1000, + 102400000, 10240, 102400, 1000, + } + actuals := []int64{ + r.LimitRecvSizeTotal, r.LimitRecvSizePerMsg, r.LimitRecvSizePer1d, r.LimitRecvCountPer1d, + r.LimitSendSizeTotal, r.LimitSendSizePerMsg, r.LimitSendSizePer1d, r.LimitSendCountPer1d, + } + for i, v := range actuals { + if v != expectedDefaults[i] { + t.Errorf("limit[%d] should default to %d, got %d", i, expectedDefaults[i], v) + } + } +} + +func TestParseCSV_SkipsMalformedRows(t *testing.T) { + csv := `address,accepting_new,limit_recv_size_total +@good@example.com,true,100 +@bad-bool@example.com,notabool,100 +@bad-limit@example.com,true,notanumber +@also-good@example.com,false,200 +` + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 2 { + t.Fatalf("expected 2 valid rows, got %d", len(rows)) + } + if rows[0].Address != "@good@example.com" { + t.Errorf("first row Address = %q", rows[0].Address) + } + if rows[1].Address != "@also-good@example.com" { + t.Errorf("second row Address = %q", rows[1].Address) + } + if rows[1].AcceptingNew != false { + t.Errorf("second row AcceptingNew should be false") + } +} + +func TestParseCSV_SkipsEmptyAddress(t *testing.T) { + csv := "address,display_name\n,Alice\n@real@example.com,Bob\n" + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if rows[0].Address != "@real@example.com" { + t.Errorf("Address = %q", rows[0].Address) + } +} + +func TestParseCSV_EmptyFile(t *testing.T) { + path := writeTestCSV(t, "") + _, err := parseCSV(path) + if err == nil { + t.Fatal("expected error for empty file") + } +} + +func TestParseCSV_MissingAddressColumn(t *testing.T) { + csv := "display_name,accepting_new\nAlice,true\n" + _, err := 
parseCSV(writeTestCSV(t, csv)) + if err == nil { + t.Fatal("expected error for missing address column") + } +} + +func TestParseCSV_FileNotFound(t *testing.T) { + _, err := parseCSV("/nonexistent/path/file.csv") + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestParseCSV_ColumnOrderIndependent(t *testing.T) { + csv := `display_name,limit_send_size_per_msg,address,accepting_new +Carol,4096,@carol@example.com,false +` + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + r := rows[0] + if r.Address != "@carol@example.com" { + t.Errorf("Address = %q", r.Address) + } + if r.DisplayName != "Carol" { + t.Errorf("DisplayName = %q", r.DisplayName) + } + if r.AcceptingNew != false { + t.Errorf("AcceptingNew = %v", r.AcceptingNew) + } + if r.LimitSendSizePerMsg != 4096 { + t.Errorf("LimitSendSizePerMsg = %d", r.LimitSendSizePerMsg) + } + // Unspecified limits should use defaults + if r.LimitRecvSizeTotal != 102400000 { + t.Errorf("LimitRecvSizeTotal should be 102400000, got %d", r.LimitRecvSizeTotal) + } +} + +func TestParseCSV_MultipleRows(t *testing.T) { + csv := `address,display_name +@a@example.com,Alpha +@b@example.com,Bravo +@c@example.com,Charlie +` + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 3 { + t.Fatalf("expected 3 rows, got %d", len(rows)) + } +} + +func TestParseCSV_HeaderCaseInsensitive(t *testing.T) { + csv := "ADDRESS,Display_Name,ACCEPTING_NEW\n@user@example.com,Test,false\n" + rows, err := parseCSV(writeTestCSV(t, csv)) + if err != nil { + t.Fatal(err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if rows[0].DisplayName != "Test" { + t.Errorf("DisplayName = %q", rows[0].DisplayName) + } + if rows[0].AcceptingNew != false { + t.Errorf("AcceptingNew should be false") + } +} diff --git a/src/fmsgid.go b/src/fmsgid.go index 
3836b7f..5d44523 100644 --- a/src/fmsgid.go +++ b/src/fmsgid.go @@ -8,12 +8,13 @@ import ( "strings" "github.com/gin-gonic/gin" - "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" "github.com/joho/godotenv" "golang.org/x/text/cases" ) +var pool *pgxpool.Pool + func init() { // Load .env file if present (ignore error if not found) _ = godotenv.Load() @@ -56,30 +57,18 @@ type AddressDetail struct { Tags []string `json:"tags"` } -func testDb() error { - ctx := context.Background() - db, err := pgx.Connect(ctx, "") +func initPool() error { + var err error + pool, err = pgxpool.Connect(context.Background(), "") if err != nil { return err } - defer db.Close(ctx) - err = db.Ping(ctx) - if err != nil { - return err - } - // TODO check at least tables exist - return nil + return pool.Ping(context.Background()) } func getAddressDetail(c *gin.Context) { ctx := c.Request.Context() - pool, err := pgxpool.Connect(ctx, "") - if err != nil { - c.AbortWithError(500, err) - } - defer pool.Close() - // TODO move data to body to hide addr, hasAddr := c.Params.Get("address") if !hasAddr { @@ -165,12 +154,6 @@ func postAddressTx(c *gin.Context, typ int) { return } - pool, err := pgxpool.Connect(ctx, "") - if err != nil { - c.AbortWithError(500, err) - } - defer pool.Close() - _, err = pool.Exec(ctx, sqlInsertTx, tx.Address, tx.Timestamp, typ, tx.Size) if err != nil { c.AbortWithError(500, err) @@ -181,11 +164,21 @@ func postAddressTx(c *gin.Context, typ int) { func main() { log.SetPrefix("fmsgid: ") - err := testDb() + err := initPool() if err != nil { - log.Fatalf("ERROR: Failed to initDb: %s", err) + log.Fatalf("ERROR: Failed to connect to database: %s", err) } - log.Println("INFO: Database initalized") + defer pool.Close() + log.Println("INFO: Database initialized") + + // Start CSV sync if configured + csvFile := os.Getenv("FMSGID_CSV_FILE") + if csvFile != "" { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go startCSVWatcher(ctx, pool, 
csvFile) + } + port := os.Getenv("FMSGID_PORT") if port == "" { port = "8080" diff --git a/src/go.mod b/src/go.mod index 8f5483e..ececcc2 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,6 +3,7 @@ module github.com/markmnl/fmsgid go 1.25.0 require ( + github.com/fsnotify/fsnotify v1.9.0 github.com/gin-gonic/gin v1.9.0 github.com/jackc/pgx/v4 v4.18.1 github.com/joho/godotenv v1.5.1 @@ -37,7 +38,7 @@ require ( golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/net v0.7.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/sys v0.13.0 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/src/go.sum b/src/go.sum index 55ea3ba..e804c8e 100644 --- a/src/go.sum +++ b/src/go.sum @@ -14,6 +14,8 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8= @@ -216,8 +218,9 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/src/sql.go b/src/sql.go index fa7f62a..c277720 100644 --- a/src/sql.go +++ b/src/sql.go @@ -19,6 +19,27 @@ where const sqlInsertTx string = `insert into address_tx (address_lower, ts, type, size) VALUES ($1, to_timestamp($2), $3, $4);` +const sqlUpsertAddress string = `insert into address ( + address_lower, address, display_name, accepting_new, + limit_recv_size_total, limit_recv_size_per_msg, limit_recv_size_per_1d, limit_recv_count_per_1d, + limit_send_size_total, limit_send_size_per_msg, limit_send_size_per_1d, limit_send_count_per_1d +) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +on conflict (address_lower) do update set + address = excluded.address, + display_name = excluded.display_name, + accepting_new = excluded.accepting_new, + limit_recv_size_total = excluded.limit_recv_size_total, + limit_recv_size_per_msg = excluded.limit_recv_size_per_msg, + limit_recv_size_per_1d = excluded.limit_recv_size_per_1d, + limit_recv_count_per_1d = excluded.limit_recv_count_per_1d, + limit_send_size_total = excluded.limit_send_size_total, + limit_send_size_per_msg = excluded.limit_send_size_per_msg, + limit_send_size_per_1d = excluded.limit_send_size_per_1d, + limit_send_count_per_1d = excluded.limit_send_count_per_1d;` + +// sqlDisableAbsentAddresses disables addresses not 
present in the provided parameter array. +const sqlDisableAbsentAddresses string = `update address set accepting_new = false where address_lower != ALL($1);` + const sqlActuals string = `select coalesce(sum(size) filter (where type = 2), 0) as sent_size_total , coalesce(sum(size) filter (where type = 2 and ts > now() - interval '1 day'), 0) as sent_size_1d