From 65dc632970bd2e3a75f05a156f48bd004efbbf7c Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 11 Apr 2026 15:46:06 +0800 Subject: [PATCH 1/3] long poll new messages --- .github/copilot-instructions.md | 1 + AGENTS.md | 29 ++++++ CLAUDE.md | 3 + README.md | 39 ++++++++ src/handlers/messages.go | 163 +++++++++++++++++++++++++++++++- src/main.go | 1 + 6 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 .github/copilot-instructions.md create mode 100644 AGENTS.md create mode 100644 CLAUDE.md diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..5cc6449 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1 @@ +Read and follow AGENTS.md before making any changes to this repository. diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..34844d7 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,29 @@ +# Agent Instructions + +## General + +- This is a Go HTTP API backed by PostgreSQL. Source lives under `src/`. +- Build: `cd src && go build ./...` +- Test: `cd src && go test ./...` +- Always run tests after making code changes to verify nothing is broken. + +## README + +**Keep `README.md` up to date whenever you:** + +- Add, remove, or rename a route. +- Change a route's query parameters, request body, or response shape. +- Add or remove environment variables. +- Change build or run instructions. + +The API routes table and each route's section must reflect the live code in +`src/main.go` and `src/handlers/`. + +## Database + +- Schema source of truth: `https://github.com/markmnl/fmsgd/blob/main/dd.sql` +- Ensure all SQL in Go source files aligns with that schema. +- When adding recipients via the `add-to` route, update `msg.add_to_from` + in the same transaction as the `msg_add_to` inserts. +- The `new_msg_to` LISTEN/NOTIFY channel is used by the `/wait` long-poll + endpoint. Do not rename or remove the trigger without updating the handler. diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..b80f07c --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,3 @@ +# Claude Instructions + +Read and follow `AGENTS.md` before making any changes to this repository. diff --git a/README.md b/README.md index cd9ed31..7211dd6 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer | Method | Path | Description | | -------- | ------------------------------------------- | ------------------------ | | `GET` | `/fmsg` | List messages for user | +| `GET` | `/fmsg/wait` | Long-poll for new messages | | `POST` | `/fmsg` | Create a draft message | | `GET` | `/fmsg/:id` | Retrieve a message | | `PUT` | `/fmsg/:id` | Update a draft message | @@ -72,6 +73,44 @@ All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer | `GET` | `/fmsg/:id/attach/:filename`| Download an attachment | | `DELETE` | `/fmsg/:id/attach/:filename`| Delete an attachment | +### GET `/fmsg/wait` + +Long-polls until a new message arrives for the authenticated user, then returns immediately. Intended for CLI and daemon clients that want near-instant delivery notification without polling. + +Uses PostgreSQL `LISTEN` on the `new_msg_to` channel — woken directly by the database trigger on new recipient rows. + +**Query parameters:** + +| Parameter | Default | Description | +| ---------- | ------- | ----------- | +| `since_id` | `0` | Only messages with `id` greater than this value are considered new | +| `timeout` | `25` | Maximum seconds to wait before returning (1–60) | + +**Response:** + +| Status | Meaning | +| ------ | ------- | +| `200` | New message available. Body: `{"has_new": true, "latest_id": }`. Use `latest_id` with `GET /fmsg/` to fetch the message. | +| `204` | Timeout elapsed — no new messages. Client should immediately re-issue the request. | + +**Errors:** + +| Status | Condition | +| ------ | --------- | +| `400` | Invalid `since_id` or `timeout` | +| `401` | Missing or invalid JWT | + +**Client loop example:** +``` +latestID = 0 +loop: + response = GET /fmsg/wait?since_id= + if response.status == 200: + latestID = response.body.latest_id + fetch and display GET /fmsg/ + # on 204 or transient error: loop immediately (with brief back-off on error) +``` + ### GET `/fmsg` Returns messages where the authenticated user is a recipient (listed in `msg_to` or `msg_add_to`), ordered by message ID descending. diff --git a/src/handlers/messages.go b/src/handlers/messages.go index e1881eb..522fa9a 100644 --- a/src/handlers/messages.go +++ b/src/handlers/messages.go @@ -60,6 +60,141 @@ type messageInput struct { Data string `json:"data"` } +// Wait handles GET /fmsg/wait — blocks until new messages are available for the authenticated user. +// Query parameters: +// - since_id (default 0): only messages with id > since_id are considered new. +// - timeout (default 25, max 60): long-poll wait timeout in seconds. +func (h *MessageHandler) Wait(c *gin.Context) { + identity := middleware.GetIdentity(c) + if identity == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + + sinceID := int64(0) + if s := c.Query("since_id"); s != "" { + parsed, err := strconv.ParseInt(s, 10, 64) + if err != nil || parsed < 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid since_id"}) + return + } + sinceID = parsed + } + + timeoutSec := 25 + if t := c.Query("timeout"); t != "" { + parsed, err := strconv.Atoi(t) + if err != nil || parsed < 1 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timeout"}) + return + } + if parsed > 60 { + parsed = 60 + } + timeoutSec = parsed + } + + ctx := c.Request.Context() + + // Fast path: return immediately when messages already exist. + hasNew, latestID, err := h.hasNewMessages(ctx, identity, sinceID) + if err != nil { + log.Printf("wait messages: initial check: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check messages"}) + return + } + if hasNew { + c.JSON(http.StatusOK, gin.H{"has_new": true, "latest_id": latestID}) + return + } + + waitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) + defer cancel() + + conn, err := h.DB.Pool.Acquire(waitCtx) + if err != nil { + if errors.Is(waitCtx.Err(), context.DeadlineExceeded) { + c.Status(http.StatusNoContent) + return + } + if errors.Is(waitCtx.Err(), context.Canceled) { + return + } + log.Printf("wait messages: acquire conn: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to wait for messages"}) + return + } + defer conn.Release() + + if _, err = conn.Exec(waitCtx, "LISTEN new_msg_to"); err != nil { + log.Printf("wait messages: listen: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to wait for messages"}) + return + } + defer func() { + _, _ = conn.Exec(context.Background(), "UNLISTEN new_msg_to") + }() + + for { + notif, err := conn.Conn().WaitForNotification(waitCtx) + if err != nil { + if errors.Is(waitCtx.Err(), context.DeadlineExceeded) { + c.Status(http.StatusNoContent) + return + } + if errors.Is(waitCtx.Err(), context.Canceled) { + return + } + log.Printf("wait messages: notification wait: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed while waiting for messages"}) + return + } + + // Payload is "," — check whether this notification is + // for the authenticated user and that the msg_id exceeds since_id. + msgID, addr, ok := parseNotifyPayload(notif.Payload) + if ok && strings.EqualFold(addr, identity) && msgID > sinceID { + c.JSON(http.StatusOK, gin.H{"has_new": true, "latest_id": msgID}) + return + } + } +} + +// parseNotifyPayload splits the "msg_id,addr" payload emitted by the +// notify_msg_to_insert trigger. Returns ok=false if the format is unexpected. +func parseNotifyPayload(payload string) (msgID int64, addr string, ok bool) { + idx := strings.Index(payload, ",") + if idx < 1 { + return 0, "", false + } + id, err := strconv.ParseInt(payload[:idx], 10, 64) + if err != nil { + return 0, "", false + } + return id, payload[idx+1:], true +} + +func (h *MessageHandler) hasNewMessages(ctx context.Context, identity string, sinceID int64) (bool, int64, error) { + var latestID *int64 + err := h.DB.Pool.QueryRow(ctx, + `SELECT MAX(m.id) + FROM msg m + WHERE m.id > $2 + AND ( + EXISTS (SELECT 1 FROM msg_to mt WHERE mt.msg_id = m.id AND lower(mt.addr) = lower($1)) + OR EXISTS (SELECT 1 FROM msg_add_to mat WHERE mat.msg_id = m.id AND lower(mat.addr) = lower($1)) + )`, + identity, sinceID, + ).Scan(&latestID) + if err != nil { + return false, 0, err + } + if latestID == nil { + return false, 0, nil + } + return true, *latestID, nil +} + // List handles GET /api/v1/messages — lists messages where the authenticated user is a recipient. func (h *MessageHandler) List(c *gin.Context) { identity := middleware.GetIdentity(c) @@ -615,16 +750,42 @@ func (h *MessageHandler) AddRecipients(c *gin.Context) { return } + // Keep add_to_from aligned with spec: this participant initiated the add_to batch. + tx, err := h.DB.Pool.Begin(ctx) + if err != nil { + log.Printf("add recipients: begin tx: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"}) + return + } + defer tx.Rollback(ctx) + + if _, err = tx.Exec(ctx, + "UPDATE msg SET add_to_from = $1 WHERE id = $2", + identity, msgID, + ); err != nil { + log.Printf("add recipients: update add_to_from: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"}) + return + } + // Insert the new add_to recipients. for _, addr := range input.AddTo { - if _, err = h.DB.Pool.Exec(ctx, + if _, err = tx.Exec(ctx, "INSERT INTO msg_add_to (msg_id, addr) VALUES ($1, $2) ON CONFLICT DO NOTHING", msgID, addr, ); err != nil { log.Printf("add recipients: insert %s: %v", addr, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"}) + return } } + if err = tx.Commit(ctx); err != nil { + log.Printf("add recipients: commit tx: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"}) + return + } + c.JSON(http.StatusOK, gin.H{"id": msgID, "added": len(input.AddTo)}) } diff --git a/src/main.go b/src/main.go index 0ea0c14..cf8230e 100644 --- a/src/main.go +++ b/src/main.go @@ -51,6 +51,7 @@ func main() { fmsg := router.Group("/fmsg") fmsg.Use(jwtMiddleware.MiddlewareFunc()) { + fmsg.GET("/wait", msgHandler.Wait) fmsg.GET("", msgHandler.List) fmsg.POST("", msgHandler.Create) fmsg.GET("/:id", msgHandler.Get) From f4be87098cf4abd0291d6d94713c48c36c967cf4 Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 11 Apr 2026 15:50:25 +0800 Subject: [PATCH 2/3] to x add to distinctness --- src/handlers/messages.go | 14 ++++++++------ src/handlers/messages_test.go | 6 +++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/handlers/messages.go b/src/handlers/messages.go index 522fa9a..562927e 100644 --- a/src/handlers/messages.go +++ b/src/handlers/messages.go @@ -929,10 +929,11 @@ func isZip(data []byte) bool { return len(data) >= 4 && data[0] == 0x50 && data[1] == 0x4b && data[2] == 0x03 && data[3] == 0x04 } -// checkDistinctRecipients returns an error if any address in to or addTo -// appears more than once (case-insensitive). +// checkDistinctRecipients returns an error if any address appears more than +// once within to, or more than once within addTo. Overlap between the two +// lists is allowed. func checkDistinctRecipients(to, addTo []string) error { - seen := make(map[string]struct{}, len(to)+len(addTo)) + seen := make(map[string]struct{}, len(to)) for _, addr := range to { key := strings.ToLower(addr) if _, dup := seen[key]; dup { @@ -940,12 +941,13 @@ func checkDistinctRecipients(to, addTo []string) error { } seen[key] = struct{}{} } + addToSeen := make(map[string]struct{}, len(addTo)) for _, addr := range addTo { key := strings.ToLower(addr) - if _, dup := seen[key]; dup { - return fmt.Errorf("duplicate recipient: %s", addr) + if _, dup := addToSeen[key]; dup { + return fmt.Errorf("duplicate add_to recipient: %s", addr) } - seen[key] = struct{}{} + addToSeen[key] = struct{}{} } return nil } diff --git a/src/handlers/messages_test.go b/src/handlers/messages_test.go index 502f06c..e058ede 100644 --- a/src/handlers/messages_test.go +++ b/src/handlers/messages_test.go @@ -100,9 +100,9 @@ func TestCheckDistinctRecipients(t *testing.T) { t.Error("expected duplicate error for case-insensitive match in to") } - // Duplicate across to and addTo. - if err := checkDistinctRecipients([]string{"@a@b.com"}, []string{"@A@B.COM"}); err == nil { - t.Error("expected duplicate error across to and addTo") + // Same address in both to and addTo is allowed. + if err := checkDistinctRecipients([]string{"@a@b.com"}, []string{"@A@B.COM"}); err != nil { + t.Errorf("expected no error for address in both to and addTo, got %v", err) } // Duplicate within addTo only. From 7a99235cf0e7750b4f150711ca3adf471f2a66fe Mon Sep 17 00:00:00 2001 From: Mark Mennell Date: Sat, 11 Apr 2026 15:57:43 +0800 Subject: [PATCH 3/3] use add_to_from --- src/handlers/messages.go | 31 ++++++++++++++++++++++++------- src/models/models.go | 1 + 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/handlers/messages.go b/src/handlers/messages.go index 562927e..2aebbc6 100644 --- a/src/handlers/messages.go +++ b/src/handlers/messages.go @@ -47,6 +47,7 @@ type messageListItem struct { From string `json:"from"` To []string `json:"to"` AddTo []string `json:"add_to"` + AddToFrom *string `json:"add_to_from"` Time *float64 `json:"time"` Topic string `json:"topic"` Type string `json:"type"` @@ -231,7 +232,7 @@ func (h *MessageHandler) List(c *gin.Context) { ctx := c.Request.Context() rows, err := h.DB.Pool.Query(ctx, - `SELECT m.id, m.version, m.pid, m.no_reply, m.is_important, m.is_deflate, m.time_sent, m.from_addr, m.topic, m.type, m.size + `SELECT m.id, m.version, m.pid, m.no_reply, m.is_important, m.is_deflate, m.time_sent, m.from_addr, m.add_to_from, m.topic, m.type, m.size FROM msg m WHERE EXISTS (SELECT 1 FROM msg_to mt WHERE mt.msg_id = m.id AND mt.addr = $1) OR EXISTS (SELECT 1 FROM msg_add_to mat WHERE mat.msg_id = m.id AND mat.addr = $1) @@ -250,7 +251,7 @@ func (h *MessageHandler) List(c *gin.Context) { var msgIDs []int64 for rows.Next() { var m messageListItem - if err := rows.Scan(&m.ID, &m.Version, &m.PID, &m.NoReply, &m.Important, &m.Deflate, &m.Time, &m.From, &m.Topic, &m.Type, &m.Size); err != nil { + if err := rows.Scan(&m.ID, &m.Version, &m.PID, &m.NoReply, &m.Important, &m.Deflate, &m.Time, &m.From, &m.AddToFrom, &m.Topic, &m.Type, &m.Size); err != nil { log.Printf("list messages scan: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list messages"}) return @@ -377,13 +378,19 @@ func (h *MessageHandler) Create(c *gin.Context) { // Parse extension from MIME type. ext := mimeToExt(msg.Type) + // add_to_from is the authenticated identity when add_to recipients are present. + var addToFrom interface{} + if len(msg.AddTo) > 0 { + addToFrom = identity + } + // Insert message row with empty filepath; update after we know the ID. var msgID int64 err := h.DB.Pool.QueryRow(ctx, - `INSERT INTO msg (version, pid, no_reply, is_important, is_deflate, from_addr, topic, type, size, filepath, time_sent) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, '', NULL) + `INSERT INTO msg (version, pid, no_reply, is_important, is_deflate, from_addr, add_to_from, topic, type, size, filepath, time_sent) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, '', NULL) RETURNING id`, - msg.Version, msg.PID, msg.NoReply, msg.Important, msg.Deflate, msg.From, msg.Topic, msg.Type, msg.Size, + msg.Version, msg.PID, msg.NoReply, msg.Important, msg.Deflate, msg.From, addToFrom, msg.Topic, msg.Type, msg.Size, ).Scan(&msgID) if err != nil { log.Printf("create message: insert: %v", err) @@ -416,6 +423,16 @@ func (h *MessageHandler) Create(c *gin.Context) { } } + // Insert add_to recipients. + for _, addr := range msg.AddTo { + if _, err = h.DB.Pool.Exec(ctx, + "INSERT INTO msg_add_to (msg_id, addr) VALUES ($1, $2) ON CONFLICT DO NOTHING", + msgID, addr, + ); err != nil { + log.Printf("create message: insert add_to recipient %s: %v", addr, err) + } + } + c.JSON(http.StatusCreated, gin.H{"id": msgID}) } @@ -792,14 +809,14 @@ func (h *MessageHandler) AddRecipients(c *gin.Context) { // fetchMessage loads a message with its recipients and attachments from the DB. func (h *MessageHandler) fetchMessage(ctx context.Context, msgID int64) (*models.Message, error) { row := h.DB.Pool.QueryRow(ctx, - `SELECT version, pid, no_reply, is_important, is_deflate, time_sent, from_addr, topic, type, size FROM msg WHERE id = $1`, + `SELECT version, pid, no_reply, is_important, is_deflate, time_sent, from_addr, add_to_from, topic, type, size FROM msg WHERE id = $1`, msgID, ) msg := &models.Message{} var pid *int64 var timeSent *float64 - if err := row.Scan(&msg.Version, &pid, &msg.NoReply, &msg.Important, &msg.Deflate, &timeSent, &msg.From, &msg.Topic, &msg.Type, &msg.Size); err != nil { + if err := row.Scan(&msg.Version, &pid, &msg.NoReply, &msg.Important, &msg.Deflate, &timeSent, &msg.From, &msg.AddToFrom, &msg.Topic, &msg.Type, &msg.Size); err != nil { return nil, err } msg.PID = pid diff --git a/src/models/models.go b/src/models/models.go index 5fb87e6..c19324e 100644 --- a/src/models/models.go +++ b/src/models/models.go @@ -18,6 +18,7 @@ type Message struct { From string `json:"from"` To []string `json:"to"` AddTo []string `json:"add_to"` + AddToFrom *string `json:"add_to_from"` Time *float64 `json:"time"` Topic string `json:"topic"` Type string `json:"type"`