Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Read and follow AGENTS.md before making any changes to this repository.
29 changes: 29 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Agent Instructions

## General

- This is a Go HTTP API backed by PostgreSQL. Source lives under `src/`.
- Build: `cd src && go build ./...`
- Test: `cd src && go test ./...`
- Always run tests after making code changes to verify nothing is broken.

## README

**Keep `README.md` up to date whenever you:**

- Add, remove, or rename a route.
- Change a route's query parameters, request body, or response shape.
- Add or remove environment variables.
- Change build or run instructions.

The API routes table and each route's section must reflect the live code in
`src/main.go` and `src/handlers/`.

## Database

- Schema source of truth: `https://github.com/markmnl/fmsgd/blob/main/dd.sql`
- Ensure all SQL in Go source files aligns with that schema.
- When adding recipients via the `add-to` route, update `msg.add_to_from`
in the same transaction as the `msg_add_to` inserts.
- The `new_msg_to` LISTEN/NOTIFY channel is used by the `/wait` long-poll
endpoint. Do not rename or remove the trigger without updating the handler.
3 changes: 3 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Claude Instructions

Read and follow `AGENTS.md` before making any changes to this repository.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer
| Method | Path | Description |
| -------- | ------------------------------------------- | ------------------------ |
| `GET` | `/fmsg` | List messages for user |
| `GET` | `/fmsg/wait` | Long-poll for new messages |
| `POST` | `/fmsg` | Create a draft message |
| `GET` | `/fmsg/:id` | Retrieve a message |
| `PUT` | `/fmsg/:id` | Update a draft message |
Expand All @@ -72,6 +73,44 @@ All routes are prefixed with `/fmsg` and require a valid `Authorization: Bearer
| `GET` | `/fmsg/:id/attach/:filename`| Download an attachment |
| `DELETE` | `/fmsg/:id/attach/:filename`| Delete an attachment |

### GET `/fmsg/wait`

Long-polls until a new message arrives for the authenticated user, then returns immediately. Intended for CLI and daemon clients that want near-instant delivery notification without polling.

Uses PostgreSQL `LISTEN` on the `new_msg_to` channel — woken directly by the database trigger on new recipient rows.

**Query parameters:**

| Parameter | Default | Description |
| ---------- | ------- | ----------- |
| `since_id` | `0` | Only messages with `id` greater than this value are considered new |
| `timeout` | `25` | Maximum seconds to wait before returning (1–60) |

**Response:**

| Status | Meaning |
| ------ | ------- |
| `200` | New message available. Body: `{"has_new": true, "latest_id": <id>}`. Use `latest_id` with `GET /fmsg/<id>` to fetch the message. |
| `204` | Timeout elapsed — no new messages. Client should immediately re-issue the request. |

**Errors:**

| Status | Condition |
| ------ | --------- |
| `400` | Invalid `since_id` or `timeout` |
| `401` | Missing or invalid JWT |

**Client loop example:**
```
latestID = 0
loop:
response = GET /fmsg/wait?since_id=<latestID>
if response.status == 200:
latestID = response.body.latest_id
fetch and display GET /fmsg/<latestID>
# on 204 or transient error: loop immediately (with brief back-off on error)
```

### GET `/fmsg`

Returns messages where the authenticated user is a recipient (listed in `msg_to` or `msg_add_to`), ordered by message ID descending.
Expand Down
177 changes: 170 additions & 7 deletions src/handlers/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,141 @@ type messageInput struct {
Data string `json:"data"`
}

// Wait handles GET /fmsg/wait — blocks until new messages are available for the authenticated user.
// Query parameters:
// - since_id (default 0): only messages with id > since_id are considered new.
// - timeout (default 25, max 60): long-poll wait timeout in seconds.
func (h *MessageHandler) Wait(c *gin.Context) {
identity := middleware.GetIdentity(c)
if identity == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}

sinceID := int64(0)
if s := c.Query("since_id"); s != "" {
parsed, err := strconv.ParseInt(s, 10, 64)
if err != nil || parsed < 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid since_id"})
return
}
sinceID = parsed
}

timeoutSec := 25
if t := c.Query("timeout"); t != "" {
parsed, err := strconv.Atoi(t)
if err != nil || parsed < 1 {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timeout"})
return
}
if parsed > 60 {
parsed = 60
}
timeoutSec = parsed
}

ctx := c.Request.Context()

// Fast path: return immediately when messages already exist.
hasNew, latestID, err := h.hasNewMessages(ctx, identity, sinceID)
if err != nil {
log.Printf("wait messages: initial check: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check messages"})
return
}
if hasNew {
c.JSON(http.StatusOK, gin.H{"has_new": true, "latest_id": latestID})
return
}

waitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
defer cancel()

conn, err := h.DB.Pool.Acquire(waitCtx)
if err != nil {
if errors.Is(waitCtx.Err(), context.DeadlineExceeded) {
c.Status(http.StatusNoContent)
return
}
if errors.Is(waitCtx.Err(), context.Canceled) {
return
}
log.Printf("wait messages: acquire conn: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to wait for messages"})
return
}
defer conn.Release()

if _, err = conn.Exec(waitCtx, "LISTEN new_msg_to"); err != nil {
log.Printf("wait messages: listen: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to wait for messages"})
return
}
defer func() {
_, _ = conn.Exec(context.Background(), "UNLISTEN new_msg_to")
}()

for {
notif, err := conn.Conn().WaitForNotification(waitCtx)
if err != nil {
if errors.Is(waitCtx.Err(), context.DeadlineExceeded) {
c.Status(http.StatusNoContent)
return
}
if errors.Is(waitCtx.Err(), context.Canceled) {
return
}
log.Printf("wait messages: notification wait: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed while waiting for messages"})
return
}

// Payload is "<msg_id>,<addr>" — check whether this notification is
// for the authenticated user and that the msg_id exceeds since_id.
msgID, addr, ok := parseNotifyPayload(notif.Payload)
if ok && strings.EqualFold(addr, identity) && msgID > sinceID {
c.JSON(http.StatusOK, gin.H{"has_new": true, "latest_id": msgID})
return
}
}
}

// parseNotifyPayload splits the "msg_id,addr" payload emitted by the
// notify_msg_to_insert trigger. Returns ok=false if the format is unexpected.
func parseNotifyPayload(payload string) (msgID int64, addr string, ok bool) {
idx := strings.Index(payload, ",")
if idx < 1 {
return 0, "", false
}
id, err := strconv.ParseInt(payload[:idx], 10, 64)
if err != nil {
return 0, "", false
}
return id, payload[idx+1:], true
}

func (h *MessageHandler) hasNewMessages(ctx context.Context, identity string, sinceID int64) (bool, int64, error) {
var latestID *int64
err := h.DB.Pool.QueryRow(ctx,
`SELECT MAX(m.id)
FROM msg m
WHERE m.id > $2
AND (
EXISTS (SELECT 1 FROM msg_to mt WHERE mt.msg_id = m.id AND lower(mt.addr) = lower($1))
OR EXISTS (SELECT 1 FROM msg_add_to mat WHERE mat.msg_id = m.id AND lower(mat.addr) = lower($1))
)`,
identity, sinceID,
).Scan(&latestID)
if err != nil {
return false, 0, err
}
if latestID == nil {
return false, 0, nil
}
return true, *latestID, nil
}

// List handles GET /api/v1/messages — lists messages where the authenticated user is a recipient.
func (h *MessageHandler) List(c *gin.Context) {
identity := middleware.GetIdentity(c)
Expand Down Expand Up @@ -615,16 +750,42 @@ func (h *MessageHandler) AddRecipients(c *gin.Context) {
return
}

// Keep add_to_from aligned with spec: this participant initiated the add_to batch.
tx, err := h.DB.Pool.Begin(ctx)
if err != nil {
log.Printf("add recipients: begin tx: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"})
return
}
defer tx.Rollback(ctx)

if _, err = tx.Exec(ctx,
"UPDATE msg SET add_to_from = $1 WHERE id = $2",
identity, msgID,
); err != nil {
log.Printf("add recipients: update add_to_from: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"})
return
}

// Insert the new add_to recipients.
for _, addr := range input.AddTo {
if _, err = h.DB.Pool.Exec(ctx,
if _, err = tx.Exec(ctx,
"INSERT INTO msg_add_to (msg_id, addr) VALUES ($1, $2) ON CONFLICT DO NOTHING",
msgID, addr,
); err != nil {
log.Printf("add recipients: insert %s: %v", addr, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"})
return
}
}

if err = tx.Commit(ctx); err != nil {
log.Printf("add recipients: commit tx: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add recipients"})
return
}

c.JSON(http.StatusOK, gin.H{"id": msgID, "added": len(input.AddTo)})
}

Expand Down Expand Up @@ -768,23 +929,25 @@ func isZip(data []byte) bool {
return len(data) >= 4 && data[0] == 0x50 && data[1] == 0x4b && data[2] == 0x03 && data[3] == 0x04
}

// checkDistinctRecipients returns an error if any address in to or addTo
// appears more than once (case-insensitive).
// checkDistinctRecipients returns an error if any address appears more than
// once within to, or more than once within addTo. Overlap between the two
// lists is allowed.
func checkDistinctRecipients(to, addTo []string) error {
seen := make(map[string]struct{}, len(to)+len(addTo))
seen := make(map[string]struct{}, len(to))
for _, addr := range to {
key := strings.ToLower(addr)
if _, dup := seen[key]; dup {
return fmt.Errorf("duplicate recipient: %s", addr)
}
seen[key] = struct{}{}
}
addToSeen := make(map[string]struct{}, len(addTo))
for _, addr := range addTo {
key := strings.ToLower(addr)
if _, dup := seen[key]; dup {
return fmt.Errorf("duplicate recipient: %s", addr)
if _, dup := addToSeen[key]; dup {
return fmt.Errorf("duplicate add_to recipient: %s", addr)
}
seen[key] = struct{}{}
addToSeen[key] = struct{}{}
}
return nil
}
6 changes: 3 additions & 3 deletions src/handlers/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestCheckDistinctRecipients(t *testing.T) {
t.Error("expected duplicate error for case-insensitive match in to")
}

// Duplicate across to and addTo.
if err := checkDistinctRecipients([]string{"@a@b.com"}, []string{"@A@B.COM"}); err == nil {
t.Error("expected duplicate error across to and addTo")
// Same address in both to and addTo is allowed.
if err := checkDistinctRecipients([]string{"@a@b.com"}, []string{"@A@B.COM"}); err != nil {
t.Errorf("expected no error for address in both to and addTo, got %v", err)
}

// Duplicate within addTo only.
Expand Down
1 change: 1 addition & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func main() {
fmsg := router.Group("/fmsg")
fmsg.Use(jwtMiddleware.MiddlewareFunc())
{
fmsg.GET("/wait", msgHandler.Wait)
fmsg.GET("", msgHandler.List)
fmsg.POST("", msgHandler.Create)
fmsg.GET("/:id", msgHandler.Get)
Expand Down
Loading