diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..8f63db8 --- /dev/null +++ b/.env.example @@ -0,0 +1,29 @@ +# fmsgd Environment Variables + +# Required +FMSG_DATA_DIR=/opt/fmsg/data +FMSG_DOMAIN=example.com +FMSG_ID_URL=http://127.0.0.1:8080 + +# Optional +FMSG_MAX_MSG_SIZE=10240 +FMSG_PORT=4930 +FMSG_MAX_PAST_TIME_DELTA=604800 +FMSG_MAX_FUTURE_TIME_DELTA=300 +FMSG_MIN_DOWNLOAD_RATE=5000 +FMSG_MIN_UPLOAD_RATE=5000 +FMSG_READ_BUFFER_SIZE=1600 +FMSG_RETRY_INTERVAL=20 +FMSG_RETRY_MAX_AGE=86400 +FMSG_POLL_INTERVAL=10 +FMSG_MAX_CONCURRENT_SEND=1024 +FMSG_SKIP_DOMAIN_IP_CHECK=false +FMSG_SKIP_AUTHORISED_IPS=false # WARNING: skipping authorised IP check is a security risk and should only be used for testing + +# PostgreSQL connection variables (see https://www.postgresql.org/docs/current/libpq-envars.html) +PGHOST= +PGPORT= +PGUSER= +PGPASSWORD= +PGDATABASE= +PGSSLMODE=disable \ No newline at end of file diff --git a/TODO2.md b/TODO2.md index 54fd073..990d720 100644 --- a/TODO2.md +++ b/TODO2.md @@ -10,51 +10,15 @@ These are foundational: the header hash (used for challenge verification and pid references) and message hash (used for duplicate detection, challenge response, and pid) are both wrong until Encode() is complete. -### ~~1. Encode() must include size and attachment headers~~ DONE -**File:** `defs.go` `Encode()` -Spec defines "message header" as all fields through the attachment headers. -Encode() currently stops after the type field — it omits: - - size (uint32 LE) - - attachment count (uint8) + attachment headers (flags, type, filename, size) - The header hash (SHA-256 of the encoded header) will be incorrect without these fields, which breaks challenge verification and pid references. -### ~~2. Encode() must omit topic when pid is set~~ DONE -**File:** `defs.go` `Encode()` -Spec: "When pid exists the entire topic field MUST NOT be included on the wire." -Encode() always writes the topic. It must only write topic when pid is absent. - -### 3. Encode() must support common-type encoding for type field -**File:** `defs.go` `Encode()` -When common-type flag (bit 2) is set, type on the wire is a single uint8 index, -not a length-prefixed string. Encode() always writes length-prefixed. - -### ~~4. Add Attachments field to FMsgHeader~~ DONE -**File:** `defs.go` struct -Add `Attachments []FMsgAttachmentHeader` to store parsed attachment headers. -Required before Encode() can include attachment headers, before attachment -parsing/validation/download, and before hash computation is correct. - -### ~~5. Complete FMsgAttachmentHeader struct~~ DONE -**File:** `defs.go` struct -Currently only has Filename, Size, Filepath. Missing per-spec fields: - - Flags (uint8) — including per-attachment common-type (bit 0) and deflate (bit 1) - - Type (string) — the attachment's media type - ### 6. Add ChallengeCompleted flag to FMsgHeader **File:** `defs.go` struct Add `ChallengeCompleted bool` to distinguish "challenge was completed and ChallengeHash is valid" from "challenge was not performed." Without this, the hash check in downloadMessage erroneously fails when the challenge was skipped. -### ~~7. GetMessageHash() must include attachment data~~ DONE -**File:** `defs.go` `GetMessageHash()` -Spec: message hash is SHA-256 of the entire message — header + data + -attachment data. Currently attachment data (sequential byte sequences following -the message body) is not included. - ---- ## P1 — Receiving: Header Exchange (host.go readHeader) @@ -72,18 +36,7 @@ returns an error without sending any code. ### 10. Validate at least one "to" recipient **File:** `host.go` `readHeader()` Spec 1.4.i.a: If to count is 0, reject code 1 (invalid). Currently no check. - -### ~~11. Make topic conditional on pid absence~~ DONE -**File:** `host.go` `readHeader()` -Spec: topic field is only present when pid is NOT set. Currently topic is -read unconditionally regardless of pid. - -### 12. Handle common-type flag for type field -**File:** `host.go` `readHeader()` -When common-type flag (bit 2) is set, type is a single uint8 index into the -Common Media Types table. If the value has no mapping → reject code 1. -Currently always reads type as a length-prefixed string. - +\ ### 13. Parse and validate attachment headers **File:** `host.go` `readHeader()` Currently rejects any non-zero attachment count. Must: @@ -124,11 +77,6 @@ referenced by pid." Not currently checked. ## P2 — Receiving: Challenge (host.go challenge) -### ~~18. Connection 2 must target same IP as Connection 1~~ DONE -**File:** `host.go` `challenge()` -Spec 2.1: Dial conn.RemoteAddr() IP, not h.From.Domain. Dialling the domain -may resolve to a different IP. - ### 19. Make challenge mode configurable **File:** `host.go` `challenge()` / `handleConn()` Spec defines challenge modes (NEVER, ALWAYS, HAS_NOT_PARTICIPATED, diff --git a/dd.sql b/dd.sql index 8e05dc9..5a4dc7a 100644 --- a/dd.sql +++ b/dd.sql @@ -16,7 +16,8 @@ create table if not exists msg ( time_sent double precision, -- time sending host recieved message for sending, message timestamp field, NULL means message not ready for sending i.e. draft from_addr varchar(255) not null, topic varchar(255) not null, - type varchar(255) not null, + type varchar(255), -- NULL when common_type is set (full string not needed on wire) + common_type smallint, -- common media type number when common type flag was set on received message, NULL otherwise; needed to faithfully reconstruct wire bytes for hash computation sha256 bytea unique, psha256 bytea, size int not null, -- spec allows uint32 but we don't enforced by FMSG_MAX_MSG_SIZE @@ -35,14 +36,21 @@ create table if not exists msg_to ( ); create index on msg_to ((lower(addr))); -create table if not exists msg_add_to ( +create table if not exists msg_add_to_batch ( id bigserial primary key, msg_id bigint not null references msg (id), + batch_no int not null, + sha256 bytea not null, -- hash of message bytes with this batch's add_to recipients + unique (msg_id, batch_no) +); + +create table if not exists msg_add_to ( + id bigserial primary key, + batch_id bigint not null references msg_add_to_batch (id), addr varchar(255) not null, time_delivered double precision, -- if sending, time sending host recieved delivery confirmation, if receiving, time successfully received message time_last_attempt double precision, -- only used when sending, time of last delivery attempt if failed; otherwise null - response_code smallint, -- only used when sending, response code of last delivery attempt if failed; otherwise null - unique (msg_id, addr) + response_code smallint -- only used when sending, response code of last delivery attempt if failed; otherwise null ); create index on msg_add_to ((lower(addr))); @@ -71,9 +79,40 @@ create trigger trg_msg_to_insert after insert on msg_to for each row execute function notify_msg_to_insert(); +-- notify when a new msg_to row is inserted with time_delivered set so that +-- listeners can be notified a new message has arrived. +create or replace function notify_msg_to_received() returns trigger as $$ +begin + if NEW.time_delivered is not null then + perform pg_notify('new_msg_from', NEW.msg_id::text || ',' || NEW.addr); + end if; + return NEW; +end; +$$ language plpgsql; + +drop trigger if exists trg_msg_to_received on msg_to; +create trigger trg_msg_to_received + after insert on msg_to + for each row execute function notify_msg_to_received(); + -- notify when a new msg_add_to row is inserted with null time_delivered so the -- sender can pick it up immediately instead of waiting for the next poll. +-- Must resolve msg_id through the batch table since msg_add_to references +-- msg_add_to_batch, not msg directly. +create or replace function notify_msg_add_to_insert() returns trigger as $$ +declare + v_msg_id bigint; +begin + if NEW.time_delivered is null then + select msg_id into v_msg_id from msg_add_to_batch where id = NEW.batch_id; + perform pg_notify('new_msg_to', v_msg_id::text || ',' || NEW.addr) + from msg where id = v_msg_id and time_sent is not null; + end if; + return NEW; +end; +$$ language plpgsql; + drop trigger if exists trg_msg_add_to_insert on msg_add_to; create trigger trg_msg_add_to_insert after insert on msg_add_to - for each row execute function notify_msg_to_insert(); + for each row execute function notify_msg_add_to_insert(); diff --git a/src/defs.go b/src/defs.go index 87e2245..f3a650d 100644 --- a/src/defs.go +++ b/src/defs.go @@ -93,8 +93,18 @@ func (h *FMsgHeader) Encode() []byte { b.WriteByte(byte(len(h.Topic))) b.WriteString(h.Topic) } - b.WriteByte(byte(len(h.Type))) - b.WriteString(h.Type) + // type: when common-type flag is set, write a single uint8 index; + // otherwise write uint8 length + ASCII string. + if h.Flags&FlagCommonType != 0 { + num, ok := mediaTypeToNumber[h.Type] + if !ok { + panic(fmt.Sprintf("common type flag set but %q has no mapping", h.Type)) + } + b.WriteByte(num) + } else { + b.WriteByte(byte(len(h.Type))) + b.WriteString(h.Type) + } // size (uint32 LE) if err := binary.Write(&b, binary.LittleEndian, h.Size); err != nil { panic(err) @@ -103,8 +113,17 @@ func (h *FMsgHeader) Encode() []byte { b.WriteByte(byte(len(h.Attachments))) for _, att := range h.Attachments { b.WriteByte(att.Flags) - b.WriteByte(byte(len(att.Type))) - b.WriteString(att.Type) + // attachment type: common-type flag is bit 0 of attachment flags + if att.Flags&AttachmentFlagCommonType != 0 { + num, ok := mediaTypeToNumber[att.Type] + if !ok { + panic(fmt.Sprintf("attachment common type flag set but %q has no mapping", att.Type)) + } + b.WriteByte(num) + } else { + b.WriteByte(byte(len(att.Type))) + b.WriteString(att.Type) + } b.WriteByte(byte(len(att.Filename))) b.WriteString(att.Filename) if err := binary.Write(&b, binary.LittleEndian, att.Size); err != nil { diff --git a/src/defs_test.go b/src/defs_test.go index 4dca209..e52a295 100644 --- a/src/defs_test.go +++ b/src/defs_test.go @@ -387,7 +387,7 @@ func TestEncodeWithAttachments(t *testing.T) { Size: 100, Attachments: []FMsgAttachmentHeader{ {Flags: 0, Type: "image/png", Filename: "pic.png", Size: 2048}, - {Flags: 1, Type: "a", Filename: "doc.txt", Size: 512}, + {Flags: AttachmentFlagCommonType, Type: "application/pdf", Filename: "doc.txt", Size: 512}, }, } b := h.Encode() @@ -449,16 +449,14 @@ func TestEncodeWithAttachments(t *testing.T) { t.Fatalf("att[0] size = %d, want 2048", att0Size) } - // attachment 1 + // attachment 1 (common type: application/pdf = 6) att1Flags, _ := r.ReadByte() - if att1Flags != 1 { - t.Fatalf("att[1] flags = %d, want 1", att1Flags) + if att1Flags != AttachmentFlagCommonType { + t.Fatalf("att[1] flags = %d, want %d", att1Flags, AttachmentFlagCommonType) } - att1TypeLen, _ := r.ReadByte() - att1Type := make([]byte, att1TypeLen) - r.Read(att1Type) - if string(att1Type) != "a" { - t.Fatalf("att[1] type = %q, want %q", string(att1Type), "a") + att1TypeNum, _ := r.ReadByte() + if att1TypeNum != 6 { + t.Fatalf("att[1] common type number = %d, want 6 (application/pdf)", att1TypeNum) } att1FnLen, _ := r.ReadByte() att1Fn := make([]byte, att1FnLen) @@ -476,3 +474,70 @@ func TestEncodeWithAttachments(t *testing.T) { t.Fatalf("unexpected %d trailing bytes", r.Len()) } } + +func TestEncodeCommonType(t *testing.T) { + // When FlagCommonType is set, the type field should be a single uint8 + // index (56 = "text/plain;charset=UTF-8") instead of length+string. + h := &FMsgHeader{ + Version: 1, + Flags: FlagCommonType, + From: FMsgAddress{User: "a", Domain: "b.com"}, + To: []FMsgAddress{{User: "c", Domain: "d.com"}}, + Timestamp: 0, + Topic: "", + Type: "text/plain;charset=UTF-8", + } + b := h.Encode() + r := bytes.NewReader(b) + + r.ReadByte() // version + r.ReadByte() // flags + + // skip from + fLen, _ := r.ReadByte() + r.Read(make([]byte, fLen)) + // skip to count + to[0] + r.ReadByte() + tLen, _ := r.ReadByte() + r.Read(make([]byte, tLen)) + // skip timestamp + var ts float64 + binary.Read(r, binary.LittleEndian, &ts) + // skip topic (present because no pid) + topicLen, _ := r.ReadByte() + r.Read(make([]byte, topicLen)) + + // type: should be single byte = 56 + typeNum, _ := r.ReadByte() + if typeNum != 56 { + t.Fatalf("common type number = %d, want 56 (text/plain;charset=UTF-8)", typeNum) + } + + // size + attachment count + var size uint32 + binary.Read(r, binary.LittleEndian, &size) + attachCount, _ := r.ReadByte() + if attachCount != 0 { + t.Fatalf("attach count = %d, want 0", attachCount) + } + + if r.Len() != 0 { + t.Fatalf("unexpected %d trailing bytes", r.Len()) + } + + // Encoding with common type should be shorter than without + h2 := &FMsgHeader{ + Version: 1, + Flags: 0, + From: FMsgAddress{User: "a", Domain: "b.com"}, + To: []FMsgAddress{{User: "c", Domain: "d.com"}}, + Timestamp: 0, + Topic: "", + Type: "text/plain;charset=UTF-8", + } + b2 := h2.Encode() + // common type: 1 byte vs length-prefixed: 1 + 24 = 25 bytes → 24 bytes shorter + if len(b) >= len(b2) { + t.Fatalf("common type encoding (%d bytes) should be shorter than length-prefixed (%d bytes)", len(b), len(b2)) + } +} diff --git a/src/host.go b/src/host.go index f08a4af..82fcdbe 100644 --- a/src/host.go +++ b/src/host.go @@ -39,6 +39,11 @@ const ( FlagNoReply uint8 = 1 << 4 FlagDeflate uint8 = 1 << 5 + // Attachment flag bit assignments per SPEC.md: + // bit 0 = common type, bit 1 = deflate, bits 2-7 = reserved. + AttachmentFlagCommonType uint8 = 1 + AttachmentFlagDeflate uint8 = 1 << 1 + RejectCodeInvalid uint8 = 1 RejectCodeUnsupportedVersion uint8 = 2 RejectCodeUndisclosed uint8 = 3 @@ -174,7 +179,6 @@ func setIDURL() { if err != nil { log.Panicf("ERROR: FMSG_ID_URL lookup failed, %s: %s", url, err) } - // TODO ping URL to verify its up and responding in a timely manner IDURI = rawUrl log.Printf("INFO: ID URL: %s", IDURI) } @@ -448,18 +452,29 @@ func readHeader(c net.Conn) (*FMsgHeader, *bufio.Reader, error) { h.Topic = string(topic) } - // TODO [Spec 1.4.i.b / flag bit 2]: When the "common type" flag (bit 2) is - // set, the type field is a single uint8 that maps to a predefined Media Type - // string per the Common Media Types table. If the value has no mapping, reject - // with code 1 (invalid). Currently the code always reads type as uint8-prefixed - // string, ignoring the common type flag entirely. - - // read type - mime, err := ReadUInt8Slice(r) - if err != nil { - return h, r, err + // read type: when common-type flag is set, the field is a single uint8 + // index into the Common Media Types table; otherwise uint8 length + ASCII string. + if flags&FlagCommonType != 0 { + typeNum, err := r.ReadByte() + if err != nil { + return h, r, err + } + typStr, ok := numberToMediaType[typeNum] + if !ok { + codes := []byte{RejectCodeInvalid} + if err := rejectAccept(c, codes); err != nil { + return h, r, err + } + return h, r, fmt.Errorf("unknown common type number: %d", typeNum) + } + h.Type = typStr + } else { + mime, err := ReadUInt8Slice(r) + if err != nil { + return h, r, err + } + h.Type = string(mime) } - h.Type = string(mime) // read message size if err := binary.Read(r, binary.LittleEndian, &h.Size); err != nil { @@ -531,7 +546,7 @@ func readHeader(c net.Conn) (*FMsgHeader, *bufio.Reader, error) { } // record add-to recipients for future hash reconstruction, respond code 11 - if err := storeMsgHeaderOnly(h); err != nil { + if err := storeAddToBatch(parentID, h); err != nil { codes := []byte{RejectCodeUndisclosed} if err2 := rejectAccept(c, codes); err2 != nil { return h, r, err2 diff --git a/src/mediatypes.go b/src/mediatypes.go new file mode 100644 index 0000000..cbf4447 --- /dev/null +++ b/src/mediatypes.go @@ -0,0 +1,84 @@ +package main + +// Common Media Types mapping defined in the fmsg SPECIFICATION. +// When the common-type flag is set, the type field on the wire is a single +// uint8 index into this table instead of a length-prefixed ASCII string. + +// numberToMediaType maps a common type uint8 index to its full Media Type string +// exactly as listed in the fmsg specification Common Media Types table. +var numberToMediaType = map[uint8]string{ + 1: "application/epub+zip", + 2: "application/gzip", + 3: "application/json", + 4: "application/msword", + 5: "application/octet-stream", + 6: "application/pdf", + 7: "application/rtf", + 8: "application/vnd.amazon.ebook", + 9: "application/vnd.ms-excel", + 10: "application/vnd.ms-powerpoint", + 11: "application/vnd.oasis.opendocument.presentation", + 12: "application/vnd.oasis.opendocument.spreadsheet", + 13: "application/vnd.oasis.opendocument.text", + 14: "application/vnd.openxmlformats-officedocument.presentationml.presentation", + 15: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + 16: "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + 17: "application/x-tar", + 18: "application/xhtml+xml", + 19: "application/xml", + 20: "application/zip", + 21: "audio/aac", + 22: "audio/midi", + 23: "audio/mpeg", + 24: "audio/ogg", + 25: "audio/opus", + 26: "audio/vnd.wave", + 27: "audio/webm", + 28: "font/otf", + 29: "font/ttf", + 30: "font/woff", + 31: "font/woff2", + 32: "image/apng", + 33: "image/avif", + 34: "image/bmp", + 35: "image/gif", + 36: "image/heic", + 37: "image/jpeg", + 38: "image/png", + 39: "image/svg+xml", + 40: "image/tiff", + 41: "image/webp", + 42: "model/3mf", + 43: "model/gltf-binary", + 44: "model/obj", + 45: "model/step", + 46: "model/stl", + 47: "model/vnd.usdz+zip", + 48: "text/calendar", + 49: "text/css", + 50: "text/csv", + 51: "text/html", + 52: "text/javascript", + 53: "text/markdown", + 54: "text/plain;charset=US-ASCII", + 55: "text/plain;charset=UTF-16", + 56: "text/plain;charset=UTF-8", + 57: "text/vcard", + 58: "video/H264", + 59: "video/H265", + 60: "video/H266", + 61: "video/ogg", + 62: "video/VP8", + 63: "video/VP9", + 64: "video/webm", +} + +// mediaTypeToNumber is the reverse mapping built at init. +var mediaTypeToNumber map[string]uint8 + +func init() { + mediaTypeToNumber = make(map[string]uint8, len(numberToMediaType)) + for num, s := range numberToMediaType { + mediaTypeToNumber[s] = num + } +} diff --git a/src/sender.go b/src/sender.go index 6f36137..c2493b3 100644 --- a/src/sender.go +++ b/src/sender.go @@ -28,10 +28,14 @@ func loadSenderEnvConfig() { MaxConcurrentSend = env.GetIntDefault("FMSG_MAX_CONCURRENT_SEND", 1024) } -// pendingTarget identifies a (message, domain) pair that needs delivery. +// pendingTarget identifies a (message, batch, domain) tuple that needs delivery. +// BatchID and BatchNo are both 0 for original message delivery. For add-to +// batch delivery they identify the specific msg_add_to_batch row. type pendingTarget struct { - MsgID int64 - Domain string + MsgID int64 + BatchID int64 + BatchNo int + Domain string } // findPendingTargets discovers (msg_id, domain) pairs with undelivered, @@ -54,7 +58,7 @@ func findPendingTargets() ([]pendingTarget, error) { now := timeutil.TimestampNow().Float64() - // query both msg_to and msg_add_to for pending targets + // Original message recipients (batch_id=0, batch_no=0). rows, err := db.Query(` SELECT mt.msg_id, mt.addr FROM msg_to mt @@ -64,15 +68,6 @@ func findPendingTargets() ([]pendingTarget, error) { AND (mt.response_code IS NULL OR mt.response_code IN (3, 5)) AND (mt.time_last_attempt IS NULL OR ($1 - mt.time_last_attempt) > $2) AND ($1 - m.time_sent) < $3 - UNION ALL - SELECT mat.msg_id, mat.addr - FROM msg_add_to mat - INNER JOIN msg m ON m.id = mat.msg_id - WHERE mat.time_delivered IS NULL - AND m.time_sent IS NOT NULL - AND (mat.response_code IS NULL OR mat.response_code IN (3, 5)) - AND (mat.time_last_attempt IS NULL OR ($1 - mat.time_last_attempt) > $2) - AND ($1 - m.time_sent) < $3 `, now, RetryInterval, RetryMaxAge) if err != nil { return nil, err @@ -80,8 +75,9 @@ func findPendingTargets() ([]pendingTarget, error) { defer rows.Close() type key struct { - msgID int64 - domain string + msgID int64 + batchNo int + domain string } seen := make(map[key]bool) var targets []pendingTarget @@ -98,36 +94,96 @@ func findPendingTargets() ([]pendingTarget, error) { } domain := addr[lastAt+1:] if strings.EqualFold(domain, Domain) { - continue // local domain — no remote delivery needed + continue } - k := key{msgID, domain} + k := key{msgID, 0, domain} if !seen[k] { seen[k] = true targets = append(targets, pendingTarget{MsgID: msgID, Domain: domain}) } } - return targets, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + + // Add-to batch recipients. Join through msg_add_to_batch. + // m.sha256 IS NOT NULL ensures original was delivered (hash populated) + // before we attempt add-to delivery. + addToRows, err := db.Query(` + SELECT b.msg_id, b.id, b.batch_no, mat.addr + FROM msg_add_to mat + INNER JOIN msg_add_to_batch b ON b.id = mat.batch_id + INNER JOIN msg m ON m.id = b.msg_id + WHERE mat.time_delivered IS NULL + AND m.time_sent IS NOT NULL + AND m.sha256 IS NOT NULL + AND (mat.response_code IS NULL OR mat.response_code IN (3, 5)) + AND (mat.time_last_attempt IS NULL OR ($1 - mat.time_last_attempt) > $2) + AND ($1 - m.time_sent) < $3 + `, now, RetryInterval, RetryMaxAge) + if err != nil { + return nil, err + } + defer addToRows.Close() + + for addToRows.Next() { + var msgID, batchID int64 + var batchNo int + var addr string + if err := addToRows.Scan(&msgID, &batchID, &batchNo, &addr); err != nil { + return nil, err + } + lastAt := strings.LastIndex(addr, "@") + if lastAt == -1 { + continue + } + domain := addr[lastAt+1:] + if strings.EqualFold(domain, Domain) { + continue + } + k := key{msgID, batchNo, domain} + if !seen[k] { + seen[k] = true + targets = append(targets, pendingTarget{MsgID: msgID, BatchID: batchID, BatchNo: batchNo, Domain: domain}) + } + } + return targets, addToRows.Err() } // deliverMessage handles delivery of a single message to a single remote domain. // -// It manages its own database transaction with the following lifecycle: -// - Locks the pending msg_to rows for this (message, domain) via FOR UPDATE SKIP LOCKED. -// - Loads the full message including ALL recipients (for the original wire header). -// - Sends the complete original message to the remote host. -// - On success: updates time_delivered + response_code, commits. -// - On rejection (got response code): updates response_code + time_last_attempt, commits. -// - On error (no response): logs error, rolls back — delivery retried in future. +// For original messages (target.BatchNo == 0): locks pending msg_to rows, +// loads the original message, sends the complete message, handles per-recipient +// response codes for to recipients on the target domain. +// +// For add-to batches (target.BatchNo > 0): locks pending msg_add_to rows for +// the specific batch, loads the message with that batch's add-to recipients +// (pid = original hash), sends the complete message, handles per-recipient +// response codes for both to and add-to recipients on the target domain. func deliverMessage(target pendingTarget) { if strings.EqualFold(target.Domain, Domain) { - // local domain — mark as delivered rather than sending remotely - db, err := sql.Open("postgres", "") - if err != nil { - log.Printf("ERROR: sender: db open for local delivery: %s", err) - return - } - defer db.Close() - now := timeutil.TimestampNow().Float64() + deliverLocal(target) + return + } + + if target.BatchNo == 0 { + deliverOriginal(target) + } else { + deliverAddToBatch(target) + } +} + +// deliverLocal marks local-domain recipients as delivered without network I/O. +func deliverLocal(target pendingTarget) { + db, err := sql.Open("postgres", "") + if err != nil { + log.Printf("ERROR: sender: db open for local delivery: %s", err) + return + } + defer db.Close() + now := timeutil.TimestampNow().Float64() + + if target.BatchNo == 0 { if _, err := db.Exec(` UPDATE msg_to SET time_delivered = $1, response_code = 200 WHERE msg_id = $2 AND time_delivered IS NULL @@ -135,9 +191,27 @@ func deliverMessage(target pendingTarget) { `, now, target.MsgID, target.Domain); err != nil { log.Printf("ERROR: sender: marking local recipients delivered for msg %d: %s", target.MsgID, err) } - return + } else { + if _, err := db.Exec(` + UPDATE msg_add_to SET time_delivered = $1, response_code = 200 + WHERE batch_id = $2 AND time_delivered IS NULL + AND lower(split_part(addr, '@', 3)) = lower($3) + `, now, target.BatchID, target.Domain); err != nil { + log.Printf("ERROR: sender: marking local add-to recipients delivered for msg %d batch %d: %s", + target.MsgID, target.BatchNo, err) + } } +} + +// lockedRow holds a locked recipient row's database ID and address. +type lockedRow struct { + ID int64 + Addr string +} +// deliverOriginal handles delivery of the original message (no add-to) to a +// remote domain. Locks msg_to rows, sends the full message, processes response. +func deliverOriginal(target pendingTarget) { db, err := sql.Open("postgres", "") if err != nil { log.Printf("ERROR: sender: db open: %s", err) @@ -159,10 +233,9 @@ func deliverMessage(target pendingTarget) { now := timeutil.TimestampNow().Float64() - // Lock pending (undelivered, retryable) msg_to rows for this message - // on the target domain. SKIP LOCKED avoids blocking concurrent senders. + // Lock pending msg_to rows for this message on the target domain. lockRows, err := tx.Query(` - SELECT mt.addr + SELECT mt.id, mt.addr FROM msg_to mt INNER JOIN msg m ON m.id = mt.msg_id WHERE mt.msg_id = $1 @@ -178,17 +251,17 @@ func deliverMessage(target pendingTarget) { return } - var lockedAddrs []string + var locked []lockedRow for lockRows.Next() { - var addr string - if err := lockRows.Scan(&addr); err != nil { + var r lockedRow + if err := lockRows.Scan(&r.ID, &r.Addr); err != nil { lockRows.Close() - log.Printf("ERROR: sender: scan locked addr: %s", err) + log.Printf("ERROR: sender: scan locked row: %s", err) return } - lastAt := strings.LastIndex(addr, "@") - if lastAt != -1 && strings.EqualFold(addr[lastAt+1:], target.Domain) { - lockedAddrs = append(lockedAddrs, addr) + lastAt := strings.LastIndex(r.Addr, "@") + if lastAt != -1 && strings.EqualFold(r.Addr[lastAt+1:], target.Domain) { + locked = append(locked, r) } } lockRows.Close() @@ -196,57 +269,18 @@ func deliverMessage(target pendingTarget) { log.Printf("ERROR: sender: lock rows err for msg %d: %s", target.MsgID, err) return } - - // Also lock pending msg_add_to rows for this message on the target domain. - lockAddToRows, err := tx.Query(` - SELECT mat.addr - FROM msg_add_to mat - INNER JOIN msg m ON m.id = mat.msg_id - WHERE mat.msg_id = $1 - AND mat.time_delivered IS NULL - AND m.time_sent IS NOT NULL - AND (mat.response_code IS NULL OR mat.response_code IN (3, 5)) - AND (mat.time_last_attempt IS NULL OR ($2 - mat.time_last_attempt) > $3) - AND ($2 - m.time_sent) < $4 - FOR UPDATE OF mat SKIP LOCKED - `, target.MsgID, now, RetryInterval, RetryMaxAge) - if err != nil { - log.Printf("ERROR: sender: lock add-to rows for msg %d: %s", target.MsgID, err) - return - } - - var lockedAddToAddrs []string - for lockAddToRows.Next() { - var addr string - if err := lockAddToRows.Scan(&addr); err != nil { - lockAddToRows.Close() - log.Printf("ERROR: sender: scan locked add-to addr: %s", err) - return - } - lastAt := strings.LastIndex(addr, "@") - if lastAt != -1 && strings.EqualFold(addr[lastAt+1:], target.Domain) { - lockedAddToAddrs = append(lockedAddToAddrs, addr) - } - } - lockAddToRows.Close() - if err := lockAddToRows.Err(); err != nil { - log.Printf("ERROR: sender: lock add-to rows err for msg %d: %s", target.MsgID, err) + if len(locked) == 0 { return } - if len(lockedAddrs) == 0 && len(lockedAddToAddrs) == 0 { - return // already locked by another sender or no longer eligible - } - - // Load the full message from msg table - h, err := loadMsg(tx, target.MsgID) + // Load the original message (batchNo=0, no add-to). + h, err := loadMsg(tx, target.MsgID, 0) if err != nil { log.Printf("ERROR: sender: %s", err) return } - // Ensure sha256 is populated for outgoing messages so future pid lookups - // (e.g. add-to notifications referencing this message) can find it. + // Ensure sha256 is populated so future add-to batches can reference it. msgHash, err := h.GetMessageHash() if err != nil { log.Printf("ERROR: sender: computing message hash for msg %d: %s", target.MsgID, err) @@ -258,62 +292,41 @@ func deliverMessage(target pendingTarget) { return } - // Register in outgoing map so challenge handler can look up this message + // Register in outgoing map for challenge handler. hash := h.GetHeaderHash() hashArr := *(*[32]byte)(hash) log.Printf("INFO: sender: registering outgoing message %s", hex.EncodeToString(hash[:])) registerOutgoing(hashArr, h) defer deleteOutgoing(hashArr) - // Build the list of recipients on the target domain (in order) and - // note which ones we locked (i.e. are pending delivery this round). Per spec, - // response codes arrive per-recipient in to then add-to order excluding other domains. - // TODO check retry logic when some recipients on this domain are not locked - // (e.g. already delivered or locked by another sender) — do we still get - // per-recipient codes for the locked ones? - lockedSet := make(map[string]bool) - for _, a := range lockedAddrs { - lockedSet[strings.ToLower(a)] = true - } - for _, a := range lockedAddToAddrs { - lockedSet[strings.ToLower(a)] = true + // Build the list of to-recipients on target domain, noting which we locked. + lockedSet := make(map[string]int64) // lowercase addr → row ID + for _, r := range locked { + lockedSet[strings.ToLower(r.Addr)] = r.ID } type domainRecip struct { + rowID int64 addr string isLocked bool - isAddTo bool } var domainRecips []domainRecip for _, addr := range h.To { if strings.EqualFold(addr.Domain, target.Domain) { s := addr.ToString() + rid, ok := lockedSet[strings.ToLower(s)] domainRecips = append(domainRecips, domainRecip{ + rowID: rid, addr: s, - isLocked: lockedSet[strings.ToLower(s)], - isAddTo: false, - }) - } - } - for _, addr := range h.AddTo { - if strings.EqualFold(addr.Domain, target.Domain) { - s := addr.ToString() - domainRecips = append(domainRecips, domainRecip{ - addr: s, - isLocked: lockedSet[strings.ToLower(s)], - isAddTo: true, + isLocked: ok, }) } } // --- network delivery --- - - // TODO [Spec]: DNSSEC validation SHOULD be performed on the DNS lookup. - // If DNSSEC validation fails the connection MUST terminate (no retry). - // lookupAuthorisedIPs does not currently perform or report DNSSEC validation. targetIPs, err := lookupAuthorisedIPs(target.Domain) if err != nil { log.Printf("ERROR: sender: DNS lookup for _fmsg.%s failed: %s", target.Domain, err) - return // rollback, retry later + return } var conn net.Conn @@ -327,179 +340,379 @@ func deliverMessage(target pendingTarget) { } if conn == nil { log.Printf("ERROR: sender: could not connect to any IP for _fmsg.%s", target.Domain) - return // rollback, retry later + return } defer conn.Close() - // Send header — Encode() writes all fields through attachment headers per spec - headerBytes := h.Encode() - if _, err := conn.Write(headerBytes); err != nil { - log.Printf("ERROR: sender: writing header: %s", err) + // Send header + message data + attachments. + if err := sendMessageData(conn, h); err != nil { + log.Printf("ERROR: sender: msg %d: %s", target.MsgID, err) return } - // message data - fd, err := os.Open(h.Filepath) + // --- read response --- + conn.SetReadDeadline(time.Now().Add(30 * time.Second)) + firstByte := make([]byte, 1) + if _, err := io.ReadFull(conn, firstByte); err != nil { + log.Printf("ERROR: sender: reading response: %s", err) + return + } + + code := firstByte[0] + now = timeutil.TimestampNow().Float64() + + if code < 100 { + // Global code — update all locked rows. + if code == AcceptCodeAddTo { + log.Printf("INFO: sender: msg %d accepted (code 11) by %s", target.MsgID, target.Domain) + } else { + log.Printf("WARN: sender: msg %d rejected by %s: %s (%d)", + target.MsgID, target.Domain, responseCodeName(code), code) + } + for _, r := range locked { + if code == AcceptCodeAddTo || code == RejectCodeAccept { + if _, err := tx.Exec(`UPDATE msg_to SET time_delivered = $1, response_code = $2 WHERE id = $3`, + now, int(code), r.ID); err != nil { + log.Printf("ERROR: sender: update delivered for %s: %s", r.Addr, err) + } + } else { + if _, err := tx.Exec(`UPDATE msg_to SET time_last_attempt = $1, response_code = $2 WHERE id = $3`, + now, int(code), r.ID); err != nil { + log.Printf("ERROR: sender: update last attempt for %s: %s", r.Addr, err) + } + } + } + if err := tx.Commit(); err != nil { + log.Printf("ERROR: sender: commit tx for msg %d: %s", target.MsgID, err) + } else { + committed = true + } + return + } + + // Per-recipient codes. + codes := make([]byte, len(domainRecips)) + codes[0] = code + if len(domainRecips) > 1 { + rest := make([]byte, len(domainRecips)-1) + if _, err := io.ReadFull(conn, rest); err != nil { + log.Printf("ERROR: sender: reading remaining response codes: %s", err) + return + } + copy(codes[1:], rest) + } + + for i, dr := range domainRecips { + if !dr.isLocked { + continue + } + c := codes[i] + if c == RejectCodeAccept { + log.Printf("INFO: sender: delivered msg %d to %s", target.MsgID, dr.addr) + if _, err := tx.Exec(`UPDATE msg_to SET time_delivered = $1, response_code = 200 WHERE id = $2`, + now, dr.rowID); err != nil { + log.Printf("ERROR: sender: update delivered for %s: %s", dr.addr, err) + } + } else { + log.Printf("WARN: sender: msg %d to %s rejected: %s (%d)", + target.MsgID, dr.addr, responseCodeName(c), c) + if _, err := tx.Exec(`UPDATE msg_to SET time_last_attempt = $1, response_code = $2 WHERE id = $3`, + now, int(c), dr.rowID); err != nil { + log.Printf("ERROR: sender: update last attempt for %s: %s", dr.addr, err) + } + } + } + + if err := tx.Commit(); err != nil { + log.Printf("ERROR: sender: commit tx for msg %d: %s", target.MsgID, err) + } else { + committed = true + } +} + +// deliverAddToBatch handles delivery of an add-to batch message to a remote +// domain. Locks pending msg_add_to rows for the batch, loads the message with +// that batch's recipients, sends the complete message, and processes response. +func deliverAddToBatch(target pendingTarget) { + db, err := sql.Open("postgres", "") if err != nil { - log.Printf("ERROR: sender: opening file %s: %s", h.Filepath, err) + log.Printf("ERROR: sender: db open: %s", err) return } - defer fd.Close() + defer db.Close() - conn.SetWriteDeadline(time.Now().Add(calcNetIODuration(int(h.Size), MinUploadRate))) - n, err := io.CopyN(conn, fd, int64(h.Size)) - if n != int64(h.Size) { - log.Printf("ERROR: sender: file size mismatch for msg %d: expected %d, got %d", target.MsgID, h.Size, n) + tx, err := db.Begin() + if err != nil { + log.Printf("ERROR: sender: begin tx: %s", err) return } + committed := false + defer func() { + if !committed { + tx.Rollback() + } + }() + + now := timeutil.TimestampNow().Float64() + + // Lock pending msg_add_to rows for this batch on the target domain. + lockRows, err := tx.Query(` + SELECT mat.id, mat.addr + FROM msg_add_to mat + INNER JOIN msg_add_to_batch b ON b.id = mat.batch_id + INNER JOIN msg m ON m.id = b.msg_id + WHERE mat.batch_id = $1 + AND mat.time_delivered IS NULL + AND m.time_sent IS NOT NULL + AND m.sha256 IS NOT NULL + AND (mat.response_code IS NULL OR mat.response_code IN (3, 5)) + AND (mat.time_last_attempt IS NULL OR ($2 - mat.time_last_attempt) > $3) + AND ($2 - m.time_sent) < $4 + FOR UPDATE OF mat SKIP LOCKED + `, target.BatchID, now, RetryInterval, RetryMaxAge) if err != nil { - log.Printf("ERROR: sender: sending data (%d/%d bytes): %s", n, h.Size, err) + log.Printf("ERROR: sender: lock add-to rows for msg %d batch %d: %s", target.MsgID, target.BatchNo, err) return } - // attachment data — sequential byte sequences bounded by header sizes - for _, att := range h.Attachments { - af, err := os.Open(att.Filepath) - if err != nil { - log.Printf("ERROR: sender: opening attachment %s: %s", att.Filename, err) + var lockedAddTo []lockedRow + for lockRows.Next() { + var r lockedRow + if err := lockRows.Scan(&r.ID, &r.Addr); err != nil { + lockRows.Close() + log.Printf("ERROR: sender: scan locked add-to row: %s", err) return } - an, err := io.CopyN(conn, af, int64(att.Size)) - af.Close() - if an != int64(att.Size) { - log.Printf("ERROR: sender: attachment %s size mismatch: expected %d, got %d", att.Filename, att.Size, an) - return + lastAt := strings.LastIndex(r.Addr, "@") + if lastAt != -1 && strings.EqualFold(r.Addr[lastAt+1:], target.Domain) { + lockedAddTo = append(lockedAddTo, r) } - if err != nil { - log.Printf("ERROR: sender: sending attachment %s: %s", att.Filename, err) - return + } + lockRows.Close() + if err := lockRows.Err(); err != nil { + log.Printf("ERROR: sender: lock add-to rows err for msg %d batch %d: %s", target.MsgID, target.BatchNo, err) + return + } + if len(lockedAddTo) == 0 { + return + } + + // Load the message with this batch's add-to recipients. + // pid will be the original message's hash, add-to will be this batch only. + h, err := loadMsg(tx, target.MsgID, target.BatchNo) + if err != nil { + log.Printf("ERROR: sender: %s", err) + return + } + + // Register in outgoing map for challenge handler. + hash := h.GetHeaderHash() + hashArr := *(*[32]byte)(hash) + log.Printf("INFO: sender: registering outgoing add-to batch %d for msg %d: %s", + target.BatchNo, target.MsgID, hex.EncodeToString(hash[:])) + registerOutgoing(hashArr, h) + defer deleteOutgoing(hashArr) + + // Build domain recipients: to + add-to on target domain. + // Per spec, response codes arrive per-recipient in to then add-to order + // excluding other domains. + lockedSet := make(map[string]int64) // lowercase addr → row ID + for _, r := range lockedAddTo { + lockedSet[strings.ToLower(r.Addr)] = r.ID + } + type domainRecip struct { + rowID int64 + addr string + isLocked bool + isAddTo bool + } + var domainRecips []domainRecip + for _, addr := range h.To { + if strings.EqualFold(addr.Domain, target.Domain) { + domainRecips = append(domainRecips, domainRecip{ + addr: addr.ToString(), + isAddTo: false, + }) + } + } + for _, addr := range h.AddTo { + if strings.EqualFold(addr.Domain, target.Domain) { + s := addr.ToString() + rid, ok := lockedSet[strings.ToLower(s)] + domainRecips = append(domainRecips, domainRecip{ + rowID: rid, + addr: s, + isLocked: ok, + isAddTo: true, + }) } } + // --- network delivery --- + targetIPs, err := lookupAuthorisedIPs(target.Domain) + if err != nil { + log.Printf("ERROR: sender: DNS lookup for _fmsg.%s failed: %s", target.Domain, err) + return + } + + var conn net.Conn + for _, ip := range targetIPs { + addr := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", RemotePort)) + conn, err = net.DialTimeout("tcp", addr, 10*time.Second) + if err == nil { + break + } + log.Printf("WARN: sender: connect to %s failed: %s", addr, err) + } + if conn == nil { + log.Printf("ERROR: sender: could not connect to any IP for _fmsg.%s", target.Domain) + return + } + defer conn.Close() + + // Send header + message data + attachments. The receiver may already + // have the body and respond with code 11 before we finish writing — + // TCP is full duplex, so a write error is non-fatal if we got a response. + sendErr := sendMessageData(conn, h) + // --- read response --- - // A code < 100 is a global rejection (single byte for all recipients). - // Otherwise one code per recipient on this domain, in To-field order. conn.SetReadDeadline(time.Now().Add(30 * time.Second)) - firstByte := make([]byte, 1) if _, err := io.ReadFull(conn, firstByte); err != nil { - log.Printf("ERROR: sender: reading response: %s", err) - return // rollback, retry later + if sendErr != nil { + log.Printf("ERROR: sender: msg %d batch %d: send: %s; read response: %s", + target.MsgID, target.BatchNo, sendErr, err) + } else { + log.Printf("ERROR: sender: msg %d batch %d: reading response: %s", + target.MsgID, target.BatchNo, err) + } + return + } + if sendErr != nil { + log.Printf("INFO: sender: msg %d batch %d: receiver responded before send completed", + target.MsgID, target.BatchNo) } code := firstByte[0] now = timeutil.TimestampNow().Float64() if code < 100 { - // Code 11 (accept add to) — additional recipients received. if code == AcceptCodeAddTo { - log.Printf("INFO: sender: msg %d additional recipients received by %s (code 11)", target.MsgID, target.Domain) - for _, a := range lockedAddrs { - if _, err := tx.Exec(` - UPDATE msg_to SET time_delivered = $1, response_code = $2 - WHERE msg_id = $3 AND addr = $4 - `, now, int(code), target.MsgID, a); err != nil { - log.Printf("ERROR: sender: update delivered for %s: %s", a, err) + log.Printf("INFO: sender: msg %d batch %d add-to accepted by %s", + target.MsgID, target.BatchNo, target.Domain) + for _, r := range lockedAddTo { + if _, err := tx.Exec(`UPDATE msg_add_to SET time_delivered = $1, response_code = $2 WHERE id = $3`, + now, int(code), r.ID); err != nil { + log.Printf("ERROR: sender: update delivered add-to for %s: %s", r.Addr, err) } } - for _, a := range lockedAddToAddrs { - if _, err := tx.Exec(` - UPDATE msg_add_to SET time_delivered = $1, response_code = $2 - WHERE msg_id = $3 AND addr = $4 - `, now, int(code), target.MsgID, a); err != nil { - log.Printf("ERROR: sender: update delivered add-to for %s: %s", a, err) + } else { + log.Printf("WARN: sender: msg %d batch %d rejected by %s: %s (%d)", + target.MsgID, target.BatchNo, target.Domain, responseCodeName(code), code) + for _, r := range lockedAddTo { + if _, err := tx.Exec(`UPDATE msg_add_to SET time_last_attempt = $1, response_code = $2 WHERE id = $3`, + now, int(code), r.ID); err != nil { + log.Printf("ERROR: sender: update last attempt add-to for %s: %s", r.Addr, err) } } - if err := tx.Commit(); err != nil { - log.Printf("ERROR: sender: commit tx for msg %d: %s", target.MsgID, err) - } else { - committed = true - } - return - } - - // global rejection — update all locked recipients - // - // TODO [Spec]: Permanent failures (1 invalid, 2 unsupported version, - // 4 too big, 10 duplicate) should NOT be retried. Currently all global - // codes are stored identically; findPendingTargets only retries codes - // 3 and 5, which is correct for global codes. But ensure code 10 - // (duplicate) is explicitly recognised as permanent and not retried. - log.Printf("WARN: sender: msg %d rejected by %s: %s (%d)", - target.MsgID, target.Domain, responseCodeName(code), code) - for _, a := range lockedAddrs { - if _, err := tx.Exec(` - UPDATE msg_to SET time_last_attempt = $1, response_code = $2 - WHERE msg_id = $3 AND addr = $4 - `, now, int(code), target.MsgID, a); err != nil { - log.Printf("ERROR: sender: update last attempt for %s: %s", a, err) - } - } - for _, a := range lockedAddToAddrs { - if _, err := tx.Exec(` - UPDATE msg_add_to SET time_last_attempt = $1, response_code = $2 - WHERE msg_id = $3 AND addr = $4 - `, now, int(code), target.MsgID, a); err != nil { - log.Printf("ERROR: sender: update last attempt add-to for %s: %s", a, err) - } } if err := tx.Commit(); err != nil { - log.Printf("ERROR: sender: commit tx for msg %d: %s", target.MsgID, err) + log.Printf("ERROR: sender: commit tx for msg %d batch %d: %s", target.MsgID, target.BatchNo, err) } else { committed = true } return } - // per-recipient codes + // Per-recipient codes (to order then add-to order, this domain only). codes := make([]byte, len(domainRecips)) codes[0] = code if len(domainRecips) > 1 { rest := make([]byte, len(domainRecips)-1) if _, err := io.ReadFull(conn, rest); err != nil { log.Printf("ERROR: sender: reading remaining response codes: %s", err) - return // rollback, retry later + return } copy(codes[1:], rest) } for i, dr := range domainRecips { - if !dr.isLocked { - continue // not our responsibility this round - // TODO well receiving host still attempted delivery to this recipient — do we update response code for it? Spec is not clear on this scenario. - } c := codes[i] - table := "msg_to" - if dr.isAddTo { - table = "msg_add_to" + if !dr.isAddTo { + // to-recipient codes during add-to delivery — log but don't update msg_to. + if c != RejectCodeAccept { + log.Printf("WARN: sender: msg %d batch %d to-recip %s code: %s (%d)", + target.MsgID, target.BatchNo, dr.addr, responseCodeName(c), c) + } + continue + } + if !dr.isLocked { + continue } if c == RejectCodeAccept { - log.Printf("INFO: sender: delivered msg %d to %s", target.MsgID, dr.addr) - if _, err := tx.Exec(fmt.Sprintf(` - UPDATE %s SET time_delivered = $1, response_code = 200 - WHERE msg_id = $2 AND addr = $3 - `, table), now, target.MsgID, dr.addr); err != nil { - log.Printf("ERROR: sender: update delivered for %s: %s", dr.addr, err) + log.Printf("INFO: sender: delivered msg %d batch %d to %s", + target.MsgID, target.BatchNo, dr.addr) + if _, err := tx.Exec(`UPDATE msg_add_to SET time_delivered = $1, response_code = 200 WHERE id = $2`, + now, dr.rowID); err != nil { + log.Printf("ERROR: sender: update delivered add-to for %s: %s", dr.addr, err) } } else { - log.Printf("WARN: sender: msg %d to %s rejected: %s (%d)", - target.MsgID, dr.addr, responseCodeName(c), c) - if _, err := tx.Exec(fmt.Sprintf(` - UPDATE %s SET time_last_attempt = $1, response_code = $2 - WHERE msg_id = $3 AND addr = $4 - `, table), now, int(c), target.MsgID, dr.addr); err != nil { - log.Printf("ERROR: sender: update last attempt for %s: %s", dr.addr, err) + log.Printf("WARN: sender: msg %d batch %d to %s rejected: %s (%d)", + target.MsgID, target.BatchNo, dr.addr, responseCodeName(c), c) + if _, err := tx.Exec(`UPDATE msg_add_to SET time_last_attempt = $1, response_code = $2 WHERE id = $3`, + now, int(c), dr.rowID); err != nil { + log.Printf("ERROR: sender: update last attempt add-to for %s: %s", dr.addr, err) } } } if err := tx.Commit(); err != nil { - log.Printf("ERROR: sender: commit tx for msg %d: %s", target.MsgID, err) + log.Printf("ERROR: sender: commit tx for msg %d batch %d: %s", target.MsgID, target.BatchNo, err) } else { committed = true } } +// sendMessageData writes the encoded header, message body, and attachment data +// to conn. Returns an error on any I/O failure. +func sendMessageData(conn net.Conn, h *FMsgHeader) error { + headerBytes := h.Encode() + if _, err := conn.Write(headerBytes); err != nil { + return fmt.Errorf("writing header: %w", err) + } + + fd, err := os.Open(h.Filepath) + if err != nil { + return fmt.Errorf("opening file %s: %w", h.Filepath, err) + } + defer fd.Close() + + conn.SetWriteDeadline(time.Now().Add(calcNetIODuration(int(h.Size), MinUploadRate))) + n, err := io.CopyN(conn, fd, int64(h.Size)) + if n != int64(h.Size) { + return fmt.Errorf("file size mismatch: expected %d, got %d", h.Size, n) + } + if err != nil { + return fmt.Errorf("sending data (%d/%d bytes): %w", n, h.Size, err) + } + + for _, att := range h.Attachments { + af, err := os.Open(att.Filepath) + if err != nil { + return fmt.Errorf("opening attachment %s: %w", att.Filename, err) + } + an, err := io.CopyN(conn, af, int64(att.Size)) + af.Close() + if an != int64(att.Size) { + return fmt.Errorf("attachment %s size mismatch: expected %d, got %d", att.Filename, att.Size, an) + } + if err != nil { + return fmt.Errorf("sending attachment %s: %w", att.Filename, err) + } + } + return nil +} + // processPendingMessages finds messages needing delivery and dispatches a // goroutine per (message, domain) pair, bounded by the semaphore. func processPendingMessages(sem chan struct{}) { diff --git a/src/store.go b/src/store.go index bbc65d5..4286301 100644 --- a/src/store.go +++ b/src/store.go @@ -9,6 +9,28 @@ import ( _ "github.com/lib/pq" ) +// commonTypeParam returns the common media type number (as *int16) if the +// FlagCommonType flag is set on msg, or nil otherwise. Used as the SQL +// parameter for the nullable common_type column. +func commonTypeParam(msg *FMsgHeader) interface{} { + if msg.Flags&FlagCommonType != 0 { + if num, ok := mediaTypeToNumber[msg.Type]; ok { + v := int16(num) + return &v + } + } + return nil +} + +// typeParam returns the type string when common type flag is NOT set, or nil +// when common type flag IS set (the number in common_type is sufficient). +func typeParam(msg *FMsgHeader) interface{} { + if msg.Flags&FlagCommonType != 0 { + return nil + } + return msg.Type +} + func testDb() error { db, err := sql.Open("postgres", "") if err != nil { @@ -28,7 +50,7 @@ func testDb() error { log.Printf("INFO: Database connected: %s@%s:%s/%s", user, host, port, dbName) // verify required tables exist - for _, table := range []string{"msg", "msg_to", "msg_add_to", "msg_attachment"} { + for _, table := range []string{"msg", "msg_to", "msg_add_to_batch", "msg_add_to", "msg_attachment"} { var exists bool err = db.QueryRow(`SELECT EXISTS ( SELECT FROM information_schema.tables @@ -45,7 +67,9 @@ func testDb() error { } // lookupMsgIdByHash returns the msg id for a message with the given SHA256 hash, -// or 0 if no such message exists. +// or 0 if no such message exists. Checks both msg.sha256 and +// msg_add_to_batch.sha256 (add-to batches produce distinct hashes that can be +// referenced as pid by future messages). func lookupMsgIdByHash(hash []byte) (int64, error) { db, err := sql.Open("postgres", "") if err != nil { @@ -55,6 +79,15 @@ func lookupMsgIdByHash(hash []byte) (int64, error) { var id int64 err = db.QueryRow("SELECT id FROM msg WHERE sha256 = $1", hash).Scan(&id) + if err == nil { + return id, nil + } + if err != sql.ErrNoRows { + return 0, err + } + + // check add-to batch hashes + err = db.QueryRow("SELECT msg_id FROM msg_add_to_batch WHERE sha256 = $1", hash).Scan(&id) if err == sql.ErrNoRows { return 0, nil } @@ -89,11 +122,12 @@ func storeMsgDetail(msg *FMsgHeader) error { , from_addr , topic , type + , common_type , sha256 , psha256 , size , filepath) -values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) +values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) returning id`, msg.Version, msg.Flags&FlagNoReply != 0, @@ -102,7 +136,8 @@ returning id`, msg.Timestamp, msg.From.ToString(), msg.Topic, - msg.Type, + typeParam(msg), + commonTypeParam(msg), msgHash, msg.Pid, int(msg.Size), @@ -130,10 +165,28 @@ values ($1, $2, $3)`) } } - // insert add-to recipients into msg_add_to + // insert add-to batch and recipients if len(msg.AddTo) > 0 { - addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr, time_delivered) -values ($1, $2, $3)`) + addToHash, err := msg.GetMessageHash() + if err != nil { + return err + } + + // determine next batch_no for this msg + var batchNo int + err = tx.QueryRow(`SELECT coalesce(max(batch_no), 0) + 1 FROM msg_add_to_batch WHERE msg_id = $1`, msgID).Scan(&batchNo) + if err != nil { + return err + } + + var batchID int64 + err = tx.QueryRow(`INSERT INTO msg_add_to_batch (msg_id, batch_no, sha256) VALUES ($1, $2, $3) RETURNING id`, + msgID, batchNo, addToHash).Scan(&batchID) + if err != nil { + return err + } + + addToStmt, err := tx.Prepare(`INSERT INTO msg_add_to (batch_id, addr, time_delivered) VALUES ($1, $2, $3)`) if err != nil { return err } @@ -144,19 +197,26 @@ values ($1, $2, $3)`) if addr.Domain == Domain { delivered = now } - if _, err := addToStmt.Exec(msgID, addr.ToString(), delivered); err != nil { + if _, err := addToStmt.Exec(batchID, addr.ToString(), delivered); err != nil { return err } } } - // resolve pid from psha256 (parent message hash) + // resolve pid from psha256 (parent message hash) — check both msg.sha256 + // and msg_add_to_batch.sha256 since a reply can reference an add-to batch if len(msg.Pid) > 0 { var parentID sql.NullInt64 err = tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msg.Pid).Scan(&parentID) if err != nil && err != sql.ErrNoRows { return err } + if !parentID.Valid { + err = tx.QueryRow("SELECT msg_id FROM msg_add_to_batch WHERE sha256 = $1", msg.Pid).Scan(&parentID) + if err != nil && err != sql.ErrNoRows { + return err + } + } if parentID.Valid { if _, err = tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID.Int64, msgID); err != nil { return err @@ -168,10 +228,11 @@ values ($1, $2, $3)`) } -// storeMsgHeaderOnly stores just the message header for add-to notifications -// (spec code 11). Only the header is recorded so the header hash can be -// faithfully computed for subsequent messages referencing this one via pid. -func storeMsgHeaderOnly(msg *FMsgHeader) error { +// storeAddToBatch stores a batch of add-to recipients for an existing message. +// Called when we receive an add-to notification (code 11) — the parent message +// is already stored, we just need to record the new add-to recipients and the +// header hash so future messages can reference this version via pid. +func storeAddToBatch(msgID int64, msg *FMsgHeader) error { db, err := sql.Open("postgres", "") if err != nil { return err @@ -186,101 +247,71 @@ func storeMsgHeaderOnly(msg *FMsgHeader) error { headerHash := msg.GetHeaderHash() - var msgID int64 - err = tx.QueryRow(`insert into msg (version - , no_reply - , is_important - , is_deflate - , time_sent - , from_addr - , topic - , type - , sha256 - , psha256 - , size - , filepath) -values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) -returning id`, - msg.Version, - msg.Flags&FlagNoReply != 0, - msg.Flags&FlagImportant != 0, - msg.Flags&FlagDeflate != 0, - msg.Timestamp, - msg.From.ToString(), - msg.Topic, - msg.Type, - headerHash, - msg.Pid, - int(msg.Size), - "").Scan(&msgID) + // determine next batch_no for this msg + var batchNo int + err = tx.QueryRow(`SELECT coalesce(max(batch_no), 0) + 1 FROM msg_add_to_batch WHERE msg_id = $1`, msgID).Scan(&batchNo) if err != nil { return err } - // insert to recipients (for record keeping) - toStmt, err := tx.Prepare(`insert into msg_to (msg_id, addr) values ($1, $2)`) + var batchID int64 + err = tx.QueryRow(`INSERT INTO msg_add_to_batch (msg_id, batch_no, sha256) VALUES ($1, $2, $3) RETURNING id`, + msgID, batchNo, headerHash).Scan(&batchID) if err != nil { return err } - defer toStmt.Close() - for _, addr := range msg.To { - if _, err := toStmt.Exec(msgID, addr.ToString()); err != nil { - return err - } - } - // insert add-to recipients - if len(msg.AddTo) > 0 { - addToStmt, err := tx.Prepare(`insert into msg_add_to (msg_id, addr) values ($1, $2)`) - if err != nil { - return err - } - defer addToStmt.Close() - for _, addr := range msg.AddTo { - if _, err := addToStmt.Exec(msgID, addr.ToString()); err != nil { - return err - } - } + addToStmt, err := tx.Prepare(`INSERT INTO msg_add_to (batch_id, addr) VALUES ($1, $2)`) + if err != nil { + return err } - - // resolve pid from psha256 - if len(msg.Pid) > 0 { - var parentID sql.NullInt64 - err = tx.QueryRow("SELECT id FROM msg WHERE sha256 = $1", msg.Pid).Scan(&parentID) - if err != nil && err != sql.ErrNoRows { + defer addToStmt.Close() + for _, addr := range msg.AddTo { + if _, err := addToStmt.Exec(batchID, addr.ToString()); err != nil { return err } - if parentID.Valid { - if _, err = tx.Exec("UPDATE msg SET pid = $1 WHERE id = $2", parentID.Int64, msgID); err != nil { - return err - } - } } return tx.Commit() } -// loadMsg loads a message and all its recipients from the database within the +// loadMsg loads a message and its recipients from the database within the // given transaction and returns a fully populated FMsgHeader. // +// batchNo controls add-to recipient loading: +// - 0: load the original message without any add-to recipients +// - >0: load add-to recipients from the specified batch only +// +// When batchNo > 0, pid is overridden with the msg row's own sha256 +// (the original message hash) since add-to messages reference the original. // TODO [Spec]: Attachment headers are not loaded. Once the msg_attachment table // stores attachment metadata (flags, type, filename, size, filepath), loadMsg // should populate FMsgHeader.Attachments so the sender can write attachment // headers and data on the wire and compute a correct header/message hash. -func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { +func loadMsg(tx *sql.Tx, msgID int64, batchNo int) (*FMsgHeader, error) { var version, size int var noReply, isImportant, isDeflate bool var pid, msgHash []byte - var fromAddr, topic, typ, filepath string + var fromAddr, topic, filepath string + var typ sql.NullString + var commonType sql.NullInt16 var timeSent float64 err := tx.QueryRow(` - SELECT version, no_reply, is_important, is_deflate, psha256, sha256, from_addr, topic, type, time_sent, size, filepath + SELECT version, no_reply, is_important, is_deflate, psha256, sha256, from_addr, topic, type, common_type, time_sent, size, filepath FROM msg WHERE id = $1 - `, msgID).Scan(&version, &noReply, &isImportant, &isDeflate, &pid, &msgHash, &fromAddr, &topic, &typ, &timeSent, &size, &filepath) + `, msgID).Scan(&version, &noReply, &isImportant, &isDeflate, &pid, &msgHash, &fromAddr, &topic, &typ, &commonType, &timeSent, &size, &filepath) if err != nil { return nil, fmt.Errorf("load msg %d: %w", msgID, err) } + // Resolve the type string: prefer the stored string; fall back to common type number. + var resolvedType string + if typ.Valid { + resolvedType = typ.String + } else if commonType.Valid { + resolvedType = numberToMediaType[uint8(commonType.Int16)] + } + recipRows, err := tx.Query(`SELECT addr FROM msg_to WHERE msg_id = $1 ORDER BY id`, msgID) if err != nil { return nil, fmt.Errorf("load recipients for msg %d: %w", msgID, err) @@ -312,41 +343,42 @@ func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { allTo = append(allTo, *addr) } - // load add-to recipients from msg_add_to - addToRows, err := tx.Query(`SELECT addr FROM msg_add_to WHERE msg_id = $1 ORDER BY id`, msgID) - if err != nil { - return nil, fmt.Errorf("load add-to recipients for msg %d: %w", msgID, err) - } + // load add-to recipients up to the specified batch number var allAddTo []FMsgAddress - for addToRows.Next() { - var a string - if err := addToRows.Scan(&a); err != nil { - addToRows.Close() - return nil, fmt.Errorf("scan add-to addr: %w", err) - } - addr, err := parseAddress([]byte(a)) + if batchNo > 0 { + addToRows, err := tx.Query(`SELECT a.addr FROM msg_add_to a + JOIN msg_add_to_batch b ON a.batch_id = b.id + WHERE b.msg_id = $1 AND b.batch_no = $2 + ORDER BY a.id`, msgID, batchNo) if err != nil { - addToRows.Close() - return nil, fmt.Errorf("invalid add-to address %s: %w", a, err) + return nil, fmt.Errorf("load add-to recipients for msg %d batch %d: %w", msgID, batchNo, err) + } + for addToRows.Next() { + var a string + if err := addToRows.Scan(&a); err != nil { + addToRows.Close() + return nil, fmt.Errorf("scan add-to addr: %w", err) + } + addr, err := parseAddress([]byte(a)) + if err != nil { + addToRows.Close() + return nil, fmt.Errorf("invalid add-to address %s: %w", a, err) + } + allAddTo = append(allAddTo, *addr) + } + addToRows.Close() + if err := addToRows.Err(); err != nil { + return nil, fmt.Errorf("add-to recipients query err for msg %d: %w", msgID, err) } - allAddTo = append(allAddTo, *addr) - } - addToRows.Close() - if err := addToRows.Err(); err != nil { - return nil, fmt.Errorf("add-to recipients query err for msg %d: %w", msgID, err) } - // Compute flags bitfield from stored booleans and loaded data. - // has_pid and has_add_to are derived from actual data rather than stored, - // so add-to recipients added after the original message are included. - // - // When add-to recipients exist on a root message (no pid), set pid to the - // message's own hash so the wire format is valid: spec requires pid when - // add-to is present. This turns the outgoing message into an add-to - // notification referencing the original message. - if len(allAddTo) > 0 && len(pid) == 0 { + // When loading an add-to batch, pid is the original message's own hash + // (not its parent), since the add-to message references the original. + if batchNo > 0 { pid = msgHash } + + // Compute flags bitfield from stored booleans and loaded data. var flags uint8 if len(pid) > 0 { flags |= FlagHasPid @@ -363,6 +395,9 @@ func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { if isDeflate { flags |= FlagDeflate } + if commonType.Valid { + flags |= FlagCommonType + } return &FMsgHeader{ Version: uint8(version), @@ -373,7 +408,7 @@ func loadMsg(tx *sql.Tx, msgID int64) (*FMsgHeader, error) { AddTo: allAddTo, Timestamp: timeSent, Topic: topic, - Type: typ, + Type: resolvedType, Size: uint32(size), Filepath: filepath, }, nil