diff --git a/docs/api/openapi.json b/docs/api/openapi.json index e828f0d6..e7b43333 100644 --- a/docs/api/openapi.json +++ b/docs/api/openapi.json @@ -23881,6 +23881,7 @@ "required": [ "ledgerId", "seriesId", + "seriesTitle", "sourceId", "pluginId", "language", @@ -23911,6 +23912,10 @@ "type": "string", "format": "uuid" }, + "seriesTitle": { + "type": "string", + "description": "Series display title (`series_metadata.title`, falling back to the\nseries directory name). Carried in the event so notifications can\nrender a clickable series link without a second round-trip." + }, "sourceId": { "type": "string", "format": "uuid" @@ -34859,6 +34864,18 @@ "$ref": "#/components/schemas/BoolOperator" } } + }, + { + "type": "object", + "description": "Filter by whether release tracking is enabled for the series.\n\n`IsTrue` returns only series whose `series_tracking.tracked` flag is\n`true`. `IsFalse` returns everything else, including series with no\n`series_tracking` row at all (the common case for a fresh library).", + "required": [ + "isTracked" + ], + "properties": { + "isTracked": { + "$ref": "#/components/schemas/BoolOperator" + } + } } ], "description": "Series-level search conditions\n\nConditions can be composed using `allOf` (AND) and `anyOf` (OR).\nUses untagged enum for cleaner JSON without explicit type field." diff --git a/plugins/release-mangaupdates/src/index.test.ts b/plugins/release-mangaupdates/src/index.test.ts index 0a066ec5..85105a23 100644 --- a/plugins/release-mangaupdates/src/index.test.ts +++ b/plugins/release-mangaupdates/src/index.test.ts @@ -1,6 +1,6 @@ import { HostRpcClient, type TrackedSeriesEntry } from "@ashdev/codex-plugin-sdk"; -import { describe, expect, it, vi } from "vitest"; -import { pollSeries } from "./index.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { _resetState, poll, pollSeries } from "./index.js"; import { EXTERNAL_ID_SOURCE_MANGAUPDATES } from "./manifest.js"; // ----------------------------------------------------------------------------- @@ -218,4 +218,213 @@ describe("pollSeries", () => { expect(out.parsed).toBe(3); expect(out.recorded).toBe(0); // every record returned deduped:true }); + + it("uses the channel-level as payloadUrl on the v1 RSS feed shape", async () => { + // The current MU v1 feed has no per-item . The plugin should + // fall through to the channel-level link (the series page) rather + // than emitting an opaque `urn:mu:` URN, which is useless for the + // user clicking through from the inbox. + const v1Feed = ` + + + https://www.mangaupdates.com/series/uu4rl66/series-slug + + Series v.13 c.116 + Galaxy Degen Scans + + + Series c.113a + Comikey + + + Series + OneshotGroup + + + `; + const { rpc, calls } = makeMockRpc(() => ({ ledgerId: "ld", deduped: false })); + const out = await pollSeries(rpc, "src-1", trackedEntry("series-1"), { + blockedGroups: [], + timeoutMs: 1000, + fetchImpl: mockFetchOk(v1Feed), + }); + // Title-only item (no chapter/volume) is dropped before reaching record(). + expect(out.parsed).toBe(2); + expect(out.recorded).toBe(2); + + const recordCalls = calls.filter((c) => c.method === "releases/record"); + expect(recordCalls).toHaveLength(2); + for (const call of recordCalls) { + const params = call.params as { candidate: { payloadUrl: string } }; + expect(params.candidate.payloadUrl).toBe( + "https://www.mangaupdates.com/series/uu4rl66/series-slug", + ); + } + }); + + it("emits distinct candidates for the same chapter from different groups", async () => { + // Three groups releasing the same chapter must surface as three + // ledger rows. The externalReleaseId hash includes the group so the + // host's `(source_id, external_release_id)` dedup doesn't collapse + // them into one. + const sameChapterDifferentGroups = ` + + + https://www.mangaupdates.com/series/abc/series + + Series c.200 + Asura + + + Series c.200 + FLAME-SCANS + + + Series c.200 + LeviatanScans + + + `; + const { rpc, calls } = makeMockRpc(() => ({ ledgerId: "ld", deduped: false })); + const out = await pollSeries(rpc, "src-1", trackedEntry("series-1"), { + blockedGroups: [], + timeoutMs: 1000, + fetchImpl: mockFetchOk(sameChapterDifferentGroups), + }); + expect(out.parsed).toBe(3); + expect(out.recorded).toBe(3); + + const ids = calls + .filter((c) => c.method === "releases/record") + .map( + (c) => + (c.params as { candidate: { externalReleaseId: string } }).candidate.externalReleaseId, + ); + expect(new Set(ids).size).toBe(3); + }); +}); + +// ----------------------------------------------------------------------------- +// poll (top-level): count_tracked + report_progress integration +// ----------------------------------------------------------------------------- + +describe("poll", () => { + beforeEach(() => { + _resetState(); + }); + + it("calls count_tracked once and report_progress per series with the right denominator", async () => { + // Two tracked series, both with MU IDs, both upstream-200 with one item. + const tracked: TrackedSeriesEntry[] = [ + { + seriesId: "series-1", + externalIds: { [EXTERNAL_ID_SOURCE_MANGAUPDATES]: "11111" } as Record, + }, + { + seriesId: "series-2", + externalIds: { [EXTERNAL_ID_SOURCE_MANGAUPDATES]: "22222" } as Record, + }, + ]; + const v1Feed = ` + + + https://www.mangaupdates.com/series/abc/series + + Series c.1 + SomeGroup + + + `; + + const { rpc, calls } = makeMockRpc((method) => { + if (method === "releases/count_tracked") return { total: tracked.length }; + if (method === "releases/list_tracked") return { tracked, nextOffset: undefined }; + if (method === "releases/record") return { ledgerId: "ld", deduped: false }; + if (method === "releases/report_progress") return { emitted: true }; + throw new Error(`unexpected method: ${method}`); + }); + + // Initialize plugin state directly (bypass createReleaseSourcePlugin). + // The SDK normally injects state through `onInitialize`; for this test + // we only need the RPC client wired up, since `poll` reads `state.*` + // for blocked groups + timeout but works fine with the defaults. + // + // `Response.text()` consumes the body, so each `fetch` call needs a + // fresh `Response` — `mockImplementation` returns a new instance each + // invocation. + const fetchImpl = vi + .fn() + .mockImplementation( + async () => new Response(v1Feed, { status: 200, headers: { etag: '"e"' } }), + ) as unknown as typeof fetch; + // Replace global fetch for this test so pollSeries -> fetcher uses it. + const origFetch = globalThis.fetch; + globalThis.fetch = fetchImpl; + try { + await poll({ sourceId: "src-1", sourceKey: "default", config: null, etag: null }, rpc); + } finally { + globalThis.fetch = origFetch; + } + + const countCalls = calls.filter((c) => c.method === "releases/count_tracked"); + expect(countCalls).toHaveLength(1); + expect((countCalls[0]?.params as { sourceId: string }).sourceId).toBe("src-1"); + + const progressCalls = calls.filter((c) => c.method === "releases/report_progress"); + // One emit per tracked series: 2 total. Denominator equals count. + expect(progressCalls).toHaveLength(2); + expect(progressCalls[0]?.params).toMatchObject({ current: 1, total: 2 }); + expect(progressCalls[1]?.params).toMatchObject({ current: 2, total: 2 }); + }); + + it("falls back to progressive denominator when count_tracked is unsupported", async () => { + // Older host: count_tracked returns METHOD_NOT_FOUND. The plugin + // should keep working and emit progress with `total = current`. + const tracked: TrackedSeriesEntry[] = [ + { + seriesId: "series-1", + externalIds: { [EXTERNAL_ID_SOURCE_MANGAUPDATES]: "11111" } as Record, + }, + ]; + const v1Feed = ` + + + https://www.mangaupdates.com/series/abc/series + Series c.1G + + `; + + const { rpc, calls } = makeMockRpc((method) => { + if (method === "releases/count_tracked") { + // Synthesize a JSON-RPC METHOD_NOT_FOUND error. + const err = Object.assign(new Error("Method not found"), { code: -32_601 }); + // Throwing inside `respond` is captured by the mock writeFn and + // surfaced as an error response; HostRpcClient wraps it in + // HostRpcError which the plugin catches. + throw err; + } + if (method === "releases/list_tracked") return { tracked, nextOffset: undefined }; + if (method === "releases/record") return { ledgerId: "ld", deduped: false }; + if (method === "releases/report_progress") return { emitted: true }; + throw new Error(`unexpected method: ${method}`); + }); + + const fetchImpl = vi + .fn() + .mockResolvedValue( + new Response(v1Feed, { status: 200, headers: { etag: '"e"' } }), + ) as unknown as typeof fetch; + const origFetch = globalThis.fetch; + globalThis.fetch = fetchImpl; + try { + await poll({ sourceId: "src-1", sourceKey: "default", config: null, etag: null }, rpc); + } finally { + globalThis.fetch = origFetch; + } + + const progressCalls = calls.filter((c) => c.method === "releases/report_progress"); + expect(progressCalls).toHaveLength(1); + // No total known => current == total. + expect(progressCalls[0]?.params).toMatchObject({ current: 1, total: 1 }); + }); }); diff --git a/plugins/release-mangaupdates/src/index.ts b/plugins/release-mangaupdates/src/index.ts index e1356013..6f3dee29 100644 --- a/plugins/release-mangaupdates/src/index.ts +++ b/plugins/release-mangaupdates/src/index.ts @@ -82,6 +82,10 @@ interface RecordResponse { deduped: boolean; } +interface CountTrackedResponse { + total: number; +} + async function listTracked( rpc: HostRpcClient, sourceId: string, @@ -95,6 +99,54 @@ async function listTracked( }); } +/** + * Total tracked-series denominator for this source, scoped by the + * plugin's `requires_external_ids` manifest declaration. Returns `null` + * when the host doesn't know the method (older host build) — callers + * fall back to progressive denominator emits in that case. + */ +async function countTracked(rpc: HostRpcClient, sourceId: string): Promise { + try { + const r = await rpc.call(RELEASES_METHODS.COUNT_TRACKED, { + sourceId, + }); + return r.total; + } catch (err) { + if (err instanceof HostRpcError && err.code === -32601) { + // Host doesn't know `count_tracked` — older build. Degrade silently. + return null; + } + const reason = err instanceof Error ? err.message : String(err); + logger.warn(`count_tracked failed for ${sourceId}: ${reason}`); + return null; + } +} + +/** + * Best-effort progress emit. Failures are swallowed — progress is a + * UX nice-to-have, never a reason to abort a poll. + */ +async function reportProgress( + rpc: HostRpcClient, + current: number, + total: number, + message?: string, +): Promise { + try { + await rpc.call( + RELEASES_METHODS.REPORT_PROGRESS, + message !== undefined ? { current, total, message } : { current, total }, + ); + } catch (err) { + if (err instanceof HostRpcError && err.code === -32601) { + // Older host without progress support — silently drop. + return; + } + const reason = err instanceof Error ? err.message : String(err); + logger.debug(`report_progress dropped: ${reason}`); + } +} + async function recordCandidate( rpc: HostRpcClient, sourceId: string, @@ -175,8 +227,24 @@ function effectiveLanguagesForSeries(_entry: TrackedSeriesEntry): string[] { /** * Map a `ParsedRssItem` to a `ReleaseCandidate`. Confidence is 1.0 because * the match is keyed by external ID — there's no fuzzy matching. + * + * `payloadUrl` priority: per-item link (legacy feed shape) → channel-level + * series page link (current v1 RSS shape) → last-resort `urn:mu:` URN. The + * URN fallback should never fire in practice; it exists so a malformed + * feed without even a channel link doesn't break the host's non-empty + * `payload_url` invariant. */ -function toCandidate(entry: TrackedSeriesEntry, item: ParsedRssItem): ReleaseCandidate { +function toCandidate( + entry: TrackedSeriesEntry, + item: ParsedRssItem, + channelLink: string | null, +): ReleaseCandidate { + const payloadUrl = + item.link.length > 0 + ? item.link + : channelLink && channelLink.length > 0 + ? channelLink + : `urn:mu:${item.externalReleaseId}`; const candidate: ReleaseCandidate = { seriesMatch: { codexSeriesId: entry.seriesId, @@ -191,7 +259,7 @@ function toCandidate(entry: TrackedSeriesEntry, item: ParsedRssItem): ReleaseCan chapters: item.chapter === null ? null : [{ start: item.chapter, end: item.chapter }], language: item.language, groupOrUploader: item.group, - payloadUrl: item.link.length > 0 ? item.link : `urn:mu:${item.externalReleaseId}`, + payloadUrl, observedAt: item.observedAt, }; return candidate; @@ -294,7 +362,7 @@ export async function pollSeries( } // result.kind === "ok" - const items = parseFeed(result.body); + const { items, channelLink } = parseFeed(result.body); const filters = resolveFilters({ languages: effectiveLanguagesForSeries(entry), blockedGroups: options.blockedGroups, @@ -305,7 +373,7 @@ export async function pollSeries( for (const item of items) { if (!passesFilters(item, filters)) continue; matched++; - const candidate = toCandidate(entry, item); + const candidate = toCandidate(entry, item, channelLink); const outcome = await recordCandidate(rpc, sourceId, candidate); if (!outcome) continue; if (outcome.deduped) { @@ -332,10 +400,24 @@ export async function pollSeries( // Top-level poll handler // ============================================================================= -async function poll(params: ReleasePollRequest, rpc: HostRpcClient): Promise { +/** + * Top-level poll handler. Exported for tests (no underscore prefix because + * it's actually a load-bearing function that just happens to live behind + * the SDK plugin wrapper at module scope; `_resetState` is the + * pattern for state-only test seams). + */ +export async function poll( + params: ReleasePollRequest, + rpc: HostRpcClient, +): Promise { const sourceId = params.sourceId; const blockedGroups = parseCommaList(state.blockedGroupsCsv); + // Pre-count so progress emits can carry a stable denominator. Falls + // back to progressive ('N polled' with no total) when the host doesn't + // implement count_tracked, keeping us forward-compatible. + const total = await countTracked(rpc, sourceId); + let parsed = 0; let matched = 0; let recorded = 0; @@ -369,6 +451,16 @@ async function poll(params: ReleasePollRequest, rpc: HostRpcClient): Promise 0) { diff --git a/plugins/release-mangaupdates/src/parser.test.ts b/plugins/release-mangaupdates/src/parser.test.ts index 4023be76..b48041ce 100644 --- a/plugins/release-mangaupdates/src/parser.test.ts +++ b/plugins/release-mangaupdates/src/parser.test.ts @@ -82,6 +82,41 @@ describe("parseTitle", () => { const t = parseTitle("c.143 by Group (en) "); expect(t.language).toBe("en"); }); + + it("parses chapter from a current-format title (series prefix, no group)", () => { + // The MU v1 RSS feed ships titles like 'Series Name v.13 c.116' with + // the group living in . Chapter and volume must still + // come out cleanly; group is null because the title doesn't carry it. + const t = parseTitle("Solo Leveling v.13 c.116"); + expect(t.chapter).toBe(116); + expect(t.volume).toBe(13); + expect(t.group).toBeNull(); + expect(t.language).toBe("en"); + }); + + it("strips letter suffix from chapter (c.113a -> 113)", () => { + // MangaUpdates uses 'a'/'b' suffixes for split chapter releases. The + // older `\b` regex required a word boundary the digit-letter join + // can't satisfy, so these dropped to chapter=null. Capture the integer + // and let the group-keyed externalReleaseId keep the halves distinct. + const a = parseTitle("Series v.13 c.113a"); + expect(a.chapter).toBe(113); + expect(a.volume).toBe(13); + const b = parseTitle("Series v.13 c.113b"); + expect(b.chapter).toBe(113); + expect(b.volume).toBe(13); + }); + + it("preserves decimal chapters when an `a/b` suffix is absent", () => { + const t = parseTitle("Series c.113.5"); + expect(t.chapter).toBe(113.5); + }); + + it("returns null chapter and volume for a series-name-only title", () => { + const t = parseTitle("Solo Leveling"); + expect(t.chapter).toBeNull(); + expect(t.volume).toBeNull(); + }); }); // ----------------------------------------------------------------------------- @@ -132,6 +167,52 @@ describe("parseItem", () => { expect(a?.externalReleaseId.startsWith("t:")).toBe(true); }); + it("includes the group in the deterministic id so different groups don't collide", () => { + // The current MU v1 RSS feed has no // per item, + // so all 3 fall to the deterministic-hash branch. If the hash didn't + // include the group, three groups posting the same chapter would all + // hash to the same externalReleaseId and dedupe down to one row. + const a = parseItem(` + Series c.200 + Asura + `); + const b = parseItem(` + Series c.200 + FLAME-SCANS + `); + const c = parseItem(` + Series c.200 + Asura + `); + expect(a?.externalReleaseId).not.toBe(b?.externalReleaseId); + // Same group + same title hashes to the same id (idempotent re-poll). + expect(a?.externalReleaseId).toBe(c?.externalReleaseId); + }); + + it("reads the scanlation group from on the v1 RSS feed", () => { + const xml = ` + Solo Leveling v.13 c.116 + Galaxy Degen Scans + `; + const item = parseItem(xml); + expect(item).not.toBeNull(); + if (!item) return; + expect(item.group).toBe("Galaxy Degen Scans"); + expect(item.chapter).toBe(116); + expect(item.volume).toBe(13); + }); + + it("skips items that carry neither chapter nor volume", () => { + // Series-name-only entries / oneshot announcements / series headers + // are inbox noise — the host has no useful sort key for them and they + // surface as empty `Ch / Vol` rows in the UI. + const xml = ` + Solo Leveling + Some Group + `; + expect(parseItem(xml)).toBeNull(); + }); + it("returns null for a malformed item missing title", () => { const xml = `https://example.com`; expect(parseItem(xml)).toBeNull(); @@ -203,7 +284,8 @@ const multilingualFeed = ` describe("parseFeed", () => { it("parses all items in a multi-language fixture", () => { - const items = parseFeed(multilingualFeed); + const { items, channelLink } = parseFeed(multilingualFeed); + expect(channelLink).toBeNull(); expect(items).toHaveLength(5); expect(items[0]?.language).toBe("en"); expect(items[1]?.language).toBe("es"); @@ -216,12 +298,52 @@ describe("parseFeed", () => { expect(items[4]?.language).toBe("en"); }); - it("returns an empty array for an empty channel", () => { - expect(parseFeed("")).toEqual([]); + it("returns an empty result for an empty channel", () => { + expect(parseFeed("")).toEqual({ + channelLink: null, + items: [], + }); }); - it("returns an empty array for malformed XML", () => { + it("returns an empty result for malformed XML", () => { // Non-fatal: parseFeed should never throw, just return whatever it can. - expect(parseFeed("<<>>")).toEqual([]); + expect(parseFeed("<<>>")).toEqual({ channelLink: null, items: [] }); + }); + + it("extracts the channel-level link from the v1 RSS shape", () => { + // Mirror of the real `https://api.mangaupdates.com/v1/series/{id}/rss` + // shape: chapters in the title, group in , no per-item + // links, channel-level link points at the series page. + const v1Feed = ` + + + Series Title - Releases on MangaUpdates + https://www.mangaupdates.com/series/uu4rl66/series-slug + ... + + Series Title v.13 c.116 + Galaxy Degen Scans + + + Series Title c.113a + Comikey + + + Series Title + OneshotGroup + + + `; + const { items, channelLink } = parseFeed(v1Feed); + expect(channelLink).toBe("https://www.mangaupdates.com/series/uu4rl66/series-slug"); + // Third item drops out: no chapter, no volume. + expect(items).toHaveLength(2); + expect(items[0]?.chapter).toBe(116); + expect(items[0]?.volume).toBe(13); + expect(items[0]?.group).toBe("Galaxy Degen Scans"); + // c.113a -> chapter 113, suffix discarded. + expect(items[1]?.chapter).toBe(113); + expect(items[1]?.volume).toBeNull(); + expect(items[1]?.group).toBe("Comikey"); }); }); diff --git a/plugins/release-mangaupdates/src/parser.ts b/plugins/release-mangaupdates/src/parser.ts index 321cc2c1..56b2abc4 100644 --- a/plugins/release-mangaupdates/src/parser.ts +++ b/plugins/release-mangaupdates/src/parser.ts @@ -3,12 +3,16 @@ * * Per-series feed: `https://api.mangaupdates.com/v1/series/{series_id}/rss` * - * Each `` is one scanlation release. The plugin extracts: - * - chapter / volume from the title - * - scanlation group from the title - * - language tag (parenthesized two-letter code) from the title - * - link (the MangaUpdates release page) used as `payloadUrl` - * - pubDate as `observedAt` + * The v1 RSS feed is intentionally sparse: + * - `` carries `{Series Name} {v.N}? {c.N}` — chapter and/or volume + * suffixed with optional letter (`c.113a`, `c.113b` for split chapters) + * - `<description>` carries the scanlation group name + * - per-item `<link>`, `<guid>`, `<pubDate>` are NOT present; only the + * channel-level `<link>` (the series page on mangaupdates.com) exists + * + * Items that carry neither chapter nor volume info are dropped — they're + * usually announcements ("oneshot release", series-name-only entries) and + * have no place in an inbox. * * Implementation note: we do NOT pull in a heavy XML parser. The MangaUpdates * RSS format is simple, well-formed, and stable. A small targeted regex @@ -112,23 +116,34 @@ export function parseTitle(title: string): { } { const trimmed = title.trim(); - // Chapter: c.N or ch.N (allow decimals). + // Chapter: c.N or ch.N. Decimals (`47.5`) and letter suffixes (`113a`, + // `113b` for split chapters) are both supported; the letter suffix is + // stripped so `c.113a` and `c.113b` map to chapter 113. Letter-suffix + // variants get distinct externalReleaseIds via the group, so they remain + // separate ledger rows even though they share an integer. The lookahead + // (`(?![0-9])`) replaces the older `\b` so the trailing letter doesn't + // block the match the way `\b` does between two word characters. let chapter: number | null = null; - const chMatch = trimmed.match(/\bc(?:h)?\.?\s*([0-9]+(?:\.[0-9]+)?)\b/i); + const chMatch = trimmed.match(/\bc(?:h)?\.?\s*([0-9]+(?:\.[0-9]+)?)[a-z]?(?![0-9])/i); if (chMatch?.[1]) { const n = Number.parseFloat(chMatch[1]); if (Number.isFinite(n)) chapter = n; } - // Volume: v.N or vol.N. + // Volume: v.N or vol.N. Letter suffixes accepted and discarded for the + // same reason as chapters. let volume: number | null = null; - const volMatch = trimmed.match(/\bv(?:ol)?\.?\s*([0-9]+)\b/i); + const volMatch = trimmed.match(/\bv(?:ol)?\.?\s*([0-9]+)[a-z]?(?![0-9])/i); if (volMatch?.[1]) { const n = Number.parseInt(volMatch[1], 10); if (Number.isFinite(n)) volume = n; } - // Group: "by <Group>" up to "(" or end. + // Group: legacy "by <Group>" pattern. The current MangaUpdates v1 RSS + // feed places the scanlation group in `<description>`, not the title; + // this branch is kept as a fallback so older / legacy feed shapes still + // surface a group. Captured up to `(` or end-of-string so a trailing + // `(en)` language tag doesn't bleed into the group name. let group: string | null = null; const groupMatch = trimmed.match(/\bby\s+(.+?)(?:\s*\([a-z]{2,3}\)\s*)?$/i); if (groupMatch?.[1]) { @@ -174,23 +189,29 @@ function pubDateToIso(raw: string | null): string { } /** - * Derive a stable external_release_id. Prefer `<guid>`, then the link URL, - * otherwise fall back to a deterministic hash of `(title + pubDate)`. + * Derive a stable external_release_id. * - * Stability is what matters: re-polling the same item must produce the same - * ID so the host's `(source_id, external_release_id)` dedup catches it. + * Priority: + * 1. `<guid>` if present (richest legacy format). + * 2. `<link>` if present (legacy format with per-item links). + * 3. Deterministic hash of `(title + group + pubDate)` for the current + * v1 RSS shape, which carries none of the above per-item fields. + * Including the group in the hash is what lets multiple groups + * releasing the same chapter ("c.200" by Asura, by FLAME-SCANS, + * by LeviatanScans) hash to distinct IDs and become distinct + * ledger rows. Same-group same-chapter re-polls collide on the + * hash and dedupe, which is what the host expects. */ function deriveExternalReleaseId( guid: string | null, link: string | null, title: string, + group: string | null, pubDate: string | null, ): string { if (guid && guid.trim().length > 0) return guid.trim(); if (link && link.trim().length > 0) return link.trim(); - // Deterministic fallback for feeds that omit both. djb2-ish hash keeps the - // ID short while staying stable across polls. - const fallback = `${title}|${pubDate ?? ""}`; + const fallback = `${title}|${group ?? ""}|${pubDate ?? ""}`; let h = 5381; for (let i = 0; i < fallback.length; i++) { h = ((h << 5) + h + fallback.charCodeAt(i)) | 0; @@ -200,7 +221,10 @@ function deriveExternalReleaseId( /** * Parse a single MangaUpdates `<item>` block into a `ParsedRssItem`. Returns - * null if the title is missing entirely (truly malformed item). + * null when the item is unusable: + * - missing `<title>` (truly malformed), or + * - title carries neither chapter nor volume (announcements, oneshot + * stubs, series-name-only entries — pure inbox noise). */ export function parseItem(itemXml: string): ParsedRssItem | null { const title = extractTagText(itemXml, "title"); @@ -209,11 +233,18 @@ export function parseItem(itemXml: string): ParsedRssItem | null { const link = extractTagText(itemXml, "link"); const guid = extractTagText(itemXml, "guid"); const pubDate = extractTagText(itemXml, "pubDate"); + const description = extractTagText(itemXml, "description"); + + const { chapter, volume, group: groupFromTitle, language } = parseTitle(title); + if (chapter === null && volume === null) return null; - const { chapter, volume, group, language } = parseTitle(title); + // The v1 RSS feed places the scanlation group in `<description>`. Prefer + // it; fall back to the legacy "by <Group>" title pattern. + const descTrimmed = description?.trim(); + const group = descTrimmed && descTrimmed.length > 0 ? descTrimmed : groupFromTitle; return { - externalReleaseId: deriveExternalReleaseId(guid, link, title, pubDate), + externalReleaseId: deriveExternalReleaseId(guid, link, title, group, pubDate), title, chapter, volume, @@ -224,13 +255,42 @@ export function parseItem(itemXml: string): ParsedRssItem | null { }; } +/** Parsed feed: items plus the channel-level link (if any). */ +export interface ParsedFeed { + /** Channel-level `<link>` — the series page on mangaupdates.com. Used as + * the `payloadUrl` for releases when no per-item link exists (the v1 + * RSS shape). `null` when the channel block is missing or malformed. */ + channelLink: string | null; + items: ParsedRssItem[]; +} + +/** + * Parse a full MangaUpdates per-series RSS feed body. Items that fail + * `parseItem` (missing title, or no chapter/volume) are dropped silently — + * the feed parser is best-effort tolerant. + */ +export function parseFeed(xml: string): ParsedFeed { + return { + channelLink: extractChannelLink(xml), + items: splitItems(xml) + .map(parseItem) + .filter((i): i is ParsedRssItem => i !== null), + }; +} + /** - * Parse a full MangaUpdates per-series RSS feed body into items. Bad items - * (missing title) are dropped silently — the feed should be best-effort - * tolerant. + * Extract the channel-level `<link>` from a feed. The v1 RSS feed uses + * `<channel><link>https://...</link></channel>` and that URL is the series + * page on mangaupdates.com. We prefer the first `<link>` *outside* any + * `<item>` block so per-item legacy links (which we don't expect at the + * channel level anyway) can never bleed in. */ -export function parseFeed(xml: string): ParsedRssItem[] { - return splitItems(xml) - .map(parseItem) - .filter((i): i is ParsedRssItem => i !== null); +function extractChannelLink(xml: string): string | null { + // Strip every <item>...</item> block before searching — cheap way to + // scope to the channel header. + const stripped = xml.replace(/<item\b[^>]*>[\s\S]*?<\/item>/gi, ""); + const link = extractTagText(stripped, "link"); + if (!link) return null; + const trimmed = link.trim(); + return trimmed.length > 0 ? trimmed : null; } diff --git a/plugins/sdk-typescript/src/types/releases.ts b/plugins/sdk-typescript/src/types/releases.ts index 1345acbb..a2ce26d4 100644 --- a/plugins/sdk-typescript/src/types/releases.ts +++ b/plugins/sdk-typescript/src/types/releases.ts @@ -21,6 +21,25 @@ export const RELEASES_METHODS = { /** List tracked series, scoped to what the plugin's manifest declared. */ LIST_TRACKED: "releases/list_tracked", + /** + * Count tracked series scoped to the plugin's `requiresExternalIds`. + * + * Plugins call this once at the start of a poll to learn the total + * denominator before iterating, so subsequent `REPORT_PROGRESS` calls + * carry a stable `current/total` ratio. Cheap (one batched DB lookup); + * safe to call from `poll`. + */ + COUNT_TRACKED: "releases/count_tracked", + /** + * Report intra-poll progress to the host. The host translates this into + * a `TaskProgressEvent` on the active task's broadcaster; the inbox + * progress bar updates live. Best-effort — calls outside an active + * task scope are silently dropped, and rapid back-to-back calls are + * rate-limited (~10/sec) by the host. Plugins SHOULD call this after + * each unit of work (e.g. after each polled series) with `current` set + * to the count of completed units and `total` from `COUNT_TRACKED`. + */ + REPORT_PROGRESS: "releases/report_progress", /** Submit a candidate to the host's release ledger. */ RECORD: "releases/record", /** Get persisted per-source state (etag, last_polled_at, last_error). */ diff --git a/src/api/routes/v1/dto/filter.rs b/src/api/routes/v1/dto/filter.rs index 83a0c145..27ad97f5 100644 --- a/src/api/routes/v1/dto/filter.rs +++ b/src/api/routes/v1/dto/filter.rs @@ -107,6 +107,15 @@ pub enum SeriesCondition { #[serde(rename = "hasUserRating")] has_user_rating: BoolOperator, }, + /// Filter by whether release tracking is enabled for the series. + /// + /// `IsTrue` returns only series whose `series_tracking.tracked` flag is + /// `true`. `IsFalse` returns everything else, including series with no + /// `series_tracking` row at all (the common case for a fresh library). + IsTracked { + #[serde(rename = "isTracked")] + is_tracked: BoolOperator, + }, } /// Book-level search conditions @@ -631,4 +640,39 @@ mod tests { _ => panic!("Expected HasUserRating condition with IsTrue operator"), } } + + #[test] + fn test_is_tracked_condition_is_true() { + let condition = SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsTrue, + }; + + let json = serde_json::to_string(&condition).unwrap(); + assert!(json.contains(r#""isTracked""#)); + assert!(json.contains(r#""operator":"isTrue""#)); + } + + #[test] + fn test_is_tracked_condition_is_false() { + let condition = SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsFalse, + }; + + let json = serde_json::to_string(&condition).unwrap(); + assert!(json.contains(r#""isTracked""#)); + assert!(json.contains(r#""operator":"isFalse""#)); + } + + #[test] + fn test_is_tracked_condition_deserialization() { + let json = r#"{"isTracked":{"operator":"isTrue"}}"#; + let condition: SeriesCondition = serde_json::from_str(json).unwrap(); + + match condition { + SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsTrue, + } => {} + _ => panic!("Expected IsTracked condition with IsTrue operator"), + } + } } diff --git a/src/events/mod.rs b/src/events/mod.rs index dc3e9a52..7daacf4a 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -12,7 +12,10 @@ mod task_context; mod types; pub use broadcaster::{EventBroadcaster, RecordedEvent}; -pub use task_context::{current_recording_broadcaster, with_recording_broadcaster}; +pub use task_context::{ + TaskIdentity, current_recording_broadcaster, current_task_identity, with_recording_broadcaster, + with_task_identity, +}; // TaskProgress is part of the public API for task progress reporting #[allow(unused_imports)] pub use types::{ diff --git a/src/events/task_context.rs b/src/events/task_context.rs index df5aad83..b5219e0a 100644 --- a/src/events/task_context.rs +++ b/src/events/task_context.rs @@ -19,6 +19,9 @@ //! call), so the task-local set up by [`crate::tasks::worker`] is in scope. use std::sync::Arc; +use std::sync::Mutex; + +use uuid::Uuid; use super::EventBroadcaster; @@ -27,6 +30,49 @@ tokio::task_local! { /// worker around `handler.handle(...)`. Read by reverse-RPC handlers via /// [`current_recording_broadcaster`]. static CURRENT_RECORDING_BROADCASTER: Arc<EventBroadcaster>; + /// Identity + progress-throttle state for the currently-executing task. + /// Set by the worker around `handler.handle(...)`. Read by reverse-RPC + /// handlers via [`current_task_identity`] when they need to construct a + /// `TaskProgressEvent` (which requires the task's id and type) or + /// rate-limit progress emits. + static CURRENT_TASK_IDENTITY: Arc<TaskIdentity>; +} + +/// Task identity exposed to reverse-RPC handlers via the +/// [`CURRENT_TASK_IDENTITY`] task-local. Carries the fields needed to build +/// a [`super::TaskProgressEvent`] plus a tiny throttle-state cell so +/// `releases/report_progress` can drop emits arriving faster than the +/// configured cadence. +#[derive(Debug)] +pub struct TaskIdentity { + pub task_id: Uuid, + pub task_type: String, + pub library_id: Option<Uuid>, + pub series_id: Option<Uuid>, + pub book_id: Option<Uuid>, + /// Last time a progress emit went through. `None` until the first emit. + /// Wrapped in a `Mutex` so reverse-RPC handlers (which see the identity + /// behind an `Arc`) can update it without a `&mut`. + pub last_progress_emit: Mutex<Option<std::time::Instant>>, +} + +impl TaskIdentity { + pub fn new( + task_id: Uuid, + task_type: impl Into<String>, + library_id: Option<Uuid>, + series_id: Option<Uuid>, + book_id: Option<Uuid>, + ) -> Self { + Self { + task_id, + task_type: task_type.into(), + library_id, + series_id, + book_id, + last_progress_emit: Mutex::new(None), + } + } } /// Run `fut` with `broadcaster` as the current task's recording broadcaster. @@ -51,6 +97,21 @@ pub fn current_recording_broadcaster() -> Option<Arc<EventBroadcaster>> { CURRENT_RECORDING_BROADCASTER.try_with(|b| b.clone()).ok() } +/// Run `fut` with `identity` as the current task's identity. +pub async fn with_task_identity<F, T>(identity: Arc<TaskIdentity>, fut: F) -> T +where + F: std::future::Future<Output = T>, +{ + CURRENT_TASK_IDENTITY.scope(identity, fut).await +} + +/// Snapshot the current task's identity, if any. +/// +/// Returns `None` when called outside of a `with_task_identity` scope. +pub fn current_task_identity() -> Option<Arc<TaskIdentity>> { + CURRENT_TASK_IDENTITY.try_with(|i| i.clone()).ok() +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/events/types.rs b/src/events/types.rs index 9b7e547a..7f62093e 100644 --- a/src/events/types.rs +++ b/src/events/types.rs @@ -179,6 +179,11 @@ pub enum EntityEvent { ledger_id: Uuid, #[serde(rename = "seriesId")] series_id: Uuid, + /// Series display title (`series_metadata.title`, falling back to the + /// series directory name). Carried in the event so notifications can + /// render a clickable series link without a second round-trip. + #[serde(rename = "seriesTitle")] + series_title: String, #[serde(rename = "sourceId")] source_id: Uuid, /// Plugin name that owns the source (`release_sources.plugin_id`). @@ -287,14 +292,19 @@ impl EntityChangeEvent { /// /// Wraps the variant construction so callers in the polling task and the /// reverse-RPC handler share one source of truth for the event shape. + /// `series_title` should be the canonical display title for the series + /// (typically `series_metadata.title`, falling back to the series + /// directory name); the frontend renders it as a clickable link. pub fn release_announced( row: &crate::db::entities::release_ledger::Model, plugin_id: &str, + series_title: String, ) -> Self { Self::new( EntityEvent::ReleaseAnnounced { ledger_id: row.id, series_id: row.series_id, + series_title, source_id: row.source_id, plugin_id: plugin_id.to_string(), chapter: row.chapter, diff --git a/src/services/filter.rs b/src/services/filter.rs index 559e72b6..a5b29665 100644 --- a/src/services/filter.rs +++ b/src/services/filter.rs @@ -146,6 +146,10 @@ impl FilterService { Self::filter_by_has_user_rating(db, has_user_rating, candidate_ids, user_id) .await } + + SeriesCondition::IsTracked { is_tracked } => { + Self::filter_by_is_tracked(db, is_tracked, candidate_ids).await + } } }) } @@ -906,6 +910,64 @@ impl FilterService { } } + /// Filter series by whether release tracking is enabled. + /// + /// - `IsTrue`: return series whose `series_tracking.tracked` flag is `true`. + /// - `IsFalse`: return everything else, *including* series that have no + /// `series_tracking` row at all (the common case for a fresh library). + async fn filter_by_is_tracked( + db: &DatabaseConnection, + operator: &BoolOperator, + candidate_ids: Option<&HashSet<Uuid>>, + ) -> Result<HashSet<Uuid>> { + use crate::db::entities::{series, series_tracking}; + use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; + + // Series with tracking explicitly enabled. + let tracked_series: HashSet<Uuid> = series_tracking::Entity::find() + .select_only() + .column(series_tracking::Column::SeriesId) + .filter(series_tracking::Column::Tracked.eq(true)) + .into_tuple() + .all(db) + .await? + .into_iter() + .collect(); + + match operator { + BoolOperator::IsTrue => { + if let Some(candidates) = candidate_ids { + Ok(tracked_series.intersection(candidates).cloned().collect()) + } else { + Ok(tracked_series) + } + } + BoolOperator::IsFalse => { + if let Some(candidates) = candidate_ids { + Ok(candidates + .iter() + .filter(|id| !tracked_series.contains(id)) + .cloned() + .collect()) + } else { + let all_series: HashSet<Uuid> = series::Entity::find() + .select_only() + .column(series::Column::Id) + .into_tuple() + .all(db) + .await? + .into_iter() + .collect(); + + Ok(all_series + .into_iter() + .filter(|id| !tracked_series.contains(id)) + .collect()) + } + } + } + } + async fn filter_by_status( db: &DatabaseConnection, operator: &FieldOperator, @@ -2438,4 +2500,32 @@ mod tests { _ => panic!("Expected HasExternalSourceId condition"), } } + + #[test] + fn test_series_condition_is_tracked_is_true() { + let condition = SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsTrue, + }; + + match condition { + SeriesCondition::IsTracked { is_tracked } => { + assert!(matches!(is_tracked, BoolOperator::IsTrue)); + } + _ => panic!("Expected IsTracked condition"), + } + } + + #[test] + fn test_series_condition_is_tracked_is_false() { + let condition = SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsFalse, + }; + + match condition { + SeriesCondition::IsTracked { is_tracked } => { + assert!(matches!(is_tracked, BoolOperator::IsFalse)); + } + _ => panic!("Expected IsTracked condition"), + } + } } diff --git a/src/services/plugin/permissions.rs b/src/services/plugin/permissions.rs index 695f5a17..803a97df 100644 --- a/src/services/plugin/permissions.rs +++ b/src/services/plugin/permissions.rs @@ -73,6 +73,8 @@ pub fn required_capability(method: &str) -> Option<RequiredCapability> { // Releases — gated on the `release_source` capability. methods::RELEASES_LIST_TRACKED + | methods::RELEASES_COUNT_TRACKED + | methods::RELEASES_REPORT_PROGRESS | methods::RELEASES_RECORD | methods::RELEASES_SOURCE_STATE_GET | methods::RELEASES_SOURCE_STATE_SET @@ -162,6 +164,8 @@ mod tests { fn releases_methods_require_release_source_capability() { for m in [ methods::RELEASES_LIST_TRACKED, + methods::RELEASES_COUNT_TRACKED, + methods::RELEASES_REPORT_PROGRESS, methods::RELEASES_RECORD, methods::RELEASES_SOURCE_STATE_GET, methods::RELEASES_SOURCE_STATE_SET, @@ -210,6 +214,14 @@ mod tests { required_capability(methods::RELEASES_RECORD), Some(RequiredCapability::ReleaseSource) ); + assert_eq!( + required_capability(methods::RELEASES_COUNT_TRACKED), + Some(RequiredCapability::ReleaseSource) + ); + assert_eq!( + required_capability(methods::RELEASES_REPORT_PROGRESS), + Some(RequiredCapability::ReleaseSource) + ); assert_eq!( required_capability(methods::STORAGE_GET), Some(RequiredCapability::AlwaysAllowed) diff --git a/src/services/plugin/protocol.rs b/src/services/plugin/protocol.rs index cf08858d..94cb06a3 100644 --- a/src/services/plugin/protocol.rs +++ b/src/services/plugin/protocol.rs @@ -265,6 +265,15 @@ pub mod methods { // Release-source reverse-RPC methods (plugin -> host) /// List tracked series scoped to what the source needs. pub const RELEASES_LIST_TRACKED: &str = "releases/list_tracked"; + /// Count tracked series scoped to what the source needs. Plugins call + /// this once at the start of a poll to learn the total denominator + /// before iterating, so progress emits can carry a stable + /// `current/total` ratio. + pub const RELEASES_COUNT_TRACKED: &str = "releases/count_tracked"; + /// Report intra-poll progress. Translates to a `TaskProgressEvent` + /// emitted on the active task's broadcaster. Best-effort — plugins + /// MAY ignore this if they don't have a meaningful denominator. + pub const RELEASES_REPORT_PROGRESS: &str = "releases/report_progress"; /// Record a release candidate in the ledger. pub const RELEASES_RECORD: &str = "releases/record"; /// Get the persisted state for a release source (etag, cursor, etc.). diff --git a/src/services/plugin/releases_handler.rs b/src/services/plugin/releases_handler.rs index 0597252d..5ea1c23f 100644 --- a/src/services/plugin/releases_handler.rs +++ b/src/services/plugin/releases_handler.rs @@ -42,6 +42,11 @@ use crate::services::release::matcher::{evaluate, resolve_threshold}; const DEFAULT_TRACKED_PAGE_SIZE: u64 = 200; /// Hard cap on `limit` to keep a single page bounded. const MAX_TRACKED_PAGE_SIZE: u64 = 1_000; +/// Minimum interval between successive `releases/report_progress` emits +/// for a given task (~10 emits/second). Plugins polling 500+ series at +/// network speed would otherwise flood the SSE pipeline; the final emit +/// (current >= total) is exempt so the progress bar always lands. +const MIN_PROGRESS_EMIT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100); /// Reverse-RPC handler for the `releases/*` namespace. /// @@ -97,6 +102,8 @@ impl ReleasesRequestHandler { match method { methods::RELEASES_LIST_TRACKED => self.handle_list_tracked(request).await, + methods::RELEASES_COUNT_TRACKED => self.handle_count_tracked(request).await, + methods::RELEASES_REPORT_PROGRESS => self.handle_report_progress(request).await, methods::RELEASES_RECORD => self.handle_record(request).await, methods::RELEASES_SOURCE_STATE_GET => self.handle_state_get(request).await, methods::RELEASES_SOURCE_STATE_SET => self.handle_state_set(request).await, @@ -237,6 +244,183 @@ impl ReleasesRequestHandler { JsonRpcResponse::success(id, serde_json::to_value(response).unwrap()) } + /// Count tracked series scoped to what the plugin's manifest declared. + /// + /// Mirrors the scoping rules of `handle_list_tracked`: when the plugin + /// declares `requires_external_ids`, the returned count is restricted + /// to tracked series that have at least one matching `series_external_ids` + /// row. When the plugin declares no required external IDs, the count + /// equals the total number of tracked series. + /// + /// Plugins call this once at the start of a poll to learn the total + /// denominator before iterating, so subsequent `report_progress` calls + /// can carry a stable `current/total` ratio. + async fn handle_count_tracked(&self, request: &JsonRpcRequest) -> JsonRpcResponse { + let id = request.id.clone(); + let params: CountTrackedRequest = match parse_params(&request.params) { + Ok(p) => p, + Err(resp) => return resp.with_id(id), + }; + + if let Err(resp) = self.assert_source_belongs(¶ms.source_id, &id).await { + return resp; + } + + // Fast path: no external-id scoping declared, count_tracked is enough. + if self.capability.requires_external_ids.is_empty() { + return match SeriesTrackingRepository::count_tracked(&self.db).await { + Ok(total) => { + let resp = CountTrackedResponse { total }; + JsonRpcResponse::success(id, serde_json::to_value(resp).unwrap()) + } + Err(e) => { + error!(error = %e, "tracked-series count failed"); + JsonRpcResponse::error( + Some(id), + JsonRpcError::new(error_codes::INTERNAL_ERROR, format!("db error: {}", e)), + ) + } + }; + } + + // Scoped path: count tracked series that have at least one + // `series_external_ids` row whose source matches the plugin's + // requirements (with both `api:` and `plugin:` prefix conventions + // accepted, mirroring `handle_list_tracked`). + let tracked_ids = match SeriesTrackingRepository::list_tracked_ids(&self.db, 0, 0).await { + Ok(ids) => ids, + Err(e) => { + error!(error = %e, "tracked-series listing failed during count"); + return JsonRpcResponse::error( + Some(id), + JsonRpcError::new(error_codes::INTERNAL_ERROR, format!("db error: {}", e)), + ); + } + }; + if tracked_ids.is_empty() { + let resp = CountTrackedResponse { total: 0 }; + return JsonRpcResponse::success(id, serde_json::to_value(resp).unwrap()); + } + + let by_series = + match SeriesExternalIdRepository::get_for_series_ids(&self.db, &tracked_ids).await { + Ok(map) => map, + Err(e) => { + error!(error = %e, "external_id batched lookup failed during count"); + return JsonRpcResponse::error( + Some(id), + JsonRpcError::new(error_codes::INTERNAL_ERROR, format!("db error: {}", e)), + ); + } + }; + + let total = tracked_ids + .iter() + .filter(|sid| { + by_series.get(sid).is_some_and(|rows| { + rows.iter().any(|row| { + let normalized = strip_external_id_namespace(&row.source); + self.capability + .requires_external_ids + .iter() + .any(|req| req == normalized) + }) + }) + }) + .count() as u64; + + let resp = CountTrackedResponse { total }; + JsonRpcResponse::success(id, serde_json::to_value(resp).unwrap()) + } + + /// Report intra-poll progress for the active task. + /// + /// Looks up the task identity + recording broadcaster from the worker's + /// task-locals (set by [`crate::tasks::worker`] around the handler + /// call). Synthesizes a [`TaskProgressEvent::progress`] and emits it. + /// Calls arriving more frequently than `MIN_PROGRESS_EMIT_INTERVAL` + /// are dropped — except the final emit (`current >= total`), which + /// always passes so the bar lands at 100%. + async fn handle_report_progress(&self, request: &JsonRpcRequest) -> JsonRpcResponse { + let id = request.id.clone(); + let params: ReportProgressRequest = match parse_params(&request.params) { + Ok(p) => p, + Err(resp) => return resp.with_id(id), + }; + + // The reverse-RPC dispatcher runs on the same tokio task as the + // forward call (the worker's `handler.handle(...)`), so the + // task-locals set up by the worker are in scope here. Outside a + // task (e.g. plugins poking the host on their own initiative), + // both calls return None and we silently no-op — there's no task + // to attach progress to. + let identity = match crate::events::current_task_identity() { + Some(id_arc) => id_arc, + None => { + debug!( + "releases/report_progress called outside a task scope; dropping (current={} total={})", + params.current, params.total + ); + return JsonRpcResponse::success( + id, + serde_json::to_value(ReportProgressResponse { emitted: false }).unwrap(), + ); + } + }; + let broadcaster = match crate::events::current_recording_broadcaster() { + Some(b) => b, + None => { + debug!("releases/report_progress: no broadcaster in scope, dropping"); + return JsonRpcResponse::success( + id, + serde_json::to_value(ReportProgressResponse { emitted: false }).unwrap(), + ); + } + }; + + // Rate-limit: drop emits arriving within MIN_PROGRESS_EMIT_INTERVAL + // of the previous one, EXCEPT the final emit (current >= total) so + // the UI lands at 100% even when the last few series flushed back + // to back. `now()` is taken inside the lock so concurrent emits + // race deterministically against the single timestamp slot. + let is_final = params.total > 0 && params.current >= params.total; + if !is_final { + let now = std::time::Instant::now(); + let mut last = identity.last_progress_emit.lock().expect("poisoned"); + if let Some(prev) = *last + && now.duration_since(prev) < MIN_PROGRESS_EMIT_INTERVAL + { + return JsonRpcResponse::success( + id.clone(), + serde_json::to_value(ReportProgressResponse { emitted: false }).unwrap(), + ); + } + *last = Some(now); + } else { + // Always update the timestamp for the final emit too, so + // any post-final stragglers stay rate-limited. + *identity.last_progress_emit.lock().expect("poisoned") = + Some(std::time::Instant::now()); + } + + let event = crate::events::TaskProgressEvent::progress( + identity.task_id, + identity.task_type.clone(), + params.current, + params.total, + params.message.clone(), + identity.library_id, + identity.series_id, + identity.book_id, + ); + let _ = broadcaster.emit_task(event); + + JsonRpcResponse::success( + id, + serde_json::to_value(ReportProgressResponse { emitted: true }).unwrap(), + ) + } + async fn handle_record(&self, request: &JsonRpcRequest) -> JsonRpcResponse { let id = request.id.clone(); let params: RecordRequest = match parse_params(&request.params) { @@ -388,9 +572,16 @@ impl ReleasesRequestHandler { "Skipping release_announced emit for non-announced state" ); } else if let Some(broadcaster) = crate::events::current_recording_broadcaster() { + let series_title = + crate::tasks::handlers::poll_release_source::lookup_series_title( + &self.db, + outcome.row.series_id, + ) + .await; let _ = broadcaster.emit(crate::events::EntityChangeEvent::release_announced( &outcome.row, &self.plugin_name, + series_title, )); } else { debug!( @@ -771,6 +962,48 @@ impl ReleasesRequestHandler { // Wire-format request/response types // ============================================================================= +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CountTrackedRequest { + /// Source the count is being requested for. Authorization-checked the + /// same way as `list_tracked` — the source must belong to this plugin. + source_id: Uuid, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CountTrackedResponse { + /// Total tracked series visible to this plugin. Filtered by the + /// plugin's `requires_external_ids` so the denominator matches what + /// the plugin will actually iterate. + total: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReportProgressRequest { + /// Items processed so far (1-indexed semantically: pass `current=1` + /// after the first unit of work, not before). + current: usize, + /// Total items expected. When `current >= total`, the emit is + /// considered final and bypasses the rate-limit so the bar lands + /// at 100%. + total: usize, + /// Optional human-readable detail (e.g. "Polling Series Name"); shown + /// alongside the progress bar. + #[serde(default)] + message: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReportProgressResponse { + /// `true` if a `TaskProgressEvent` was emitted, `false` if the call + /// was no-op'd (rate-limited or no task scope in context). Plugins + /// can ignore this; the field is exposed for tests / observability. + emitted: bool, +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] struct ListTrackedRequest { @@ -943,6 +1176,8 @@ pub fn is_releases_method(method: &str) -> bool { matches!( method, "releases/list_tracked" + | "releases/count_tracked" + | "releases/report_progress" | "releases/record" | "releases/source_state/get" | "releases/source_state/set" @@ -1245,6 +1480,7 @@ mod tests { match event.event { EntityEvent::ReleaseAnnounced { series_id: ev_series, + series_title, source_id: ev_source, plugin_id, chapter, @@ -1252,6 +1488,13 @@ mod tests { .. } => { assert_eq!(ev_series, series_id); + // `setup` inserts the series with directory name "Series". + // The handler resolves the title via series_metadata if + // present, otherwise falls back to `series.name`. + assert!( + !series_title.is_empty(), + "expected non-empty series_title in event" + ); assert_eq!(ev_source, source_id); assert_eq!(plugin_id, "release-nyaa"); assert_eq!(chapter, Some(143.0)); @@ -1501,6 +1744,8 @@ mod tests { #[test] fn is_releases_method_detects_namespace() { assert!(is_releases_method(methods::RELEASES_LIST_TRACKED)); + assert!(is_releases_method(methods::RELEASES_COUNT_TRACKED)); + assert!(is_releases_method(methods::RELEASES_REPORT_PROGRESS)); assert!(is_releases_method(methods::RELEASES_RECORD)); assert!(is_releases_method(methods::RELEASES_SOURCE_STATE_GET)); assert!(is_releases_method(methods::RELEASES_SOURCE_STATE_SET)); @@ -1509,6 +1754,254 @@ mod tests { assert!(!is_releases_method("storage/get")); } + // ------------------------------------------------------------------------- + // count_tracked tests + // ------------------------------------------------------------------------- + + /// Helper: track an additional series in the same test world used by + /// `setup`. Returns the new series id. + async fn add_tracked_series(db: &DatabaseConnection, name: &str) -> Uuid { + // `setup` already created a library; reuse the first one. + let libs = LibraryRepository::list_all(db).await.unwrap(); + let lib = libs.first().expect("setup library"); + let series = SeriesRepository::create(db, lib.id, name, None) + .await + .unwrap(); + SeriesTrackingRepository::upsert( + db, + series.id, + TrackingUpdate { + tracked: Some(true), + ..Default::default() + }, + ) + .await + .unwrap(); + series.id + } + + #[tokio::test] + async fn count_tracked_unscoped_returns_total_tracked() { + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let (_series, source_id) = setup(conn, "release-nyaa").await; + // Add two more tracked series; total = 3. + let _ = add_tracked_series(conn, "Series 2").await; + let _ = add_tracked_series(conn, "Series 3").await; + + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-nyaa".to_string(), + make_capability(false, vec![]), // no requires_external_ids + ); + let req = make_request( + methods::RELEASES_COUNT_TRACKED, + json!({"sourceId": source_id}), + ); + let resp = handler.handle_request(&req).await; + assert!(!resp.is_error(), "{:?}", resp.error); + let body: CountTrackedResponse = serde_json::from_value(resp.result.unwrap()).unwrap(); + assert_eq!(body.total, 3); + } + + #[tokio::test] + async fn count_tracked_scoped_only_counts_series_with_required_external_id() { + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let (s1, source_id) = setup(conn, "release-mangaupdates").await; + let s2 = add_tracked_series(conn, "Series 2").await; + let _s3_no_ext_id = add_tracked_series(conn, "Series 3").await; + + // s1 has the required external ID (`api:mangaupdates`), s2 has it + // under the `plugin:` prefix. Both should count. s3 has none. + SeriesExternalIdRepository::upsert(conn, s1, "api:mangaupdates", "abc1", None, None) + .await + .unwrap(); + SeriesExternalIdRepository::upsert(conn, s2, "plugin:mangaupdates", "abc2", None, None) + .await + .unwrap(); + + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-mangaupdates".to_string(), + make_capability(false, vec!["mangaupdates"]), + ); + let req = make_request( + methods::RELEASES_COUNT_TRACKED, + json!({"sourceId": source_id}), + ); + let resp = handler.handle_request(&req).await; + assert!(!resp.is_error(), "{:?}", resp.error); + let body: CountTrackedResponse = serde_json::from_value(resp.result.unwrap()).unwrap(); + assert_eq!( + body.total, 2, + "scoped count must filter out series without a matching external ID" + ); + } + + #[tokio::test] + async fn count_tracked_rejects_source_belonging_to_other_plugin() { + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let (_series, source_id) = setup(conn, "release-nyaa").await; + // Handler is for a *different* plugin name, so the source-belongs + // check should reject the call. + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-mangaupdates".to_string(), + make_capability(false, vec![]), + ); + let req = make_request( + methods::RELEASES_COUNT_TRACKED, + json!({"sourceId": source_id}), + ); + let resp = handler.handle_request(&req).await; + assert!(resp.is_error()); + } + + // ------------------------------------------------------------------------- + // report_progress tests + // ------------------------------------------------------------------------- + + #[tokio::test] + async fn report_progress_outside_task_scope_no_ops_cleanly() { + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-nyaa".to_string(), + make_capability(false, vec![]), + ); + let req = make_request( + methods::RELEASES_REPORT_PROGRESS, + json!({"current": 1, "total": 10}), + ); + let resp = handler.handle_request(&req).await; + assert!(!resp.is_error()); + let body: ReportProgressResponse = serde_json::from_value(resp.result.unwrap()).unwrap(); + assert!( + !body.emitted, + "no task scope -> no emit, but the call must succeed (don't break the plugin's poll)" + ); + } + + #[tokio::test] + async fn report_progress_inside_task_scope_emits_progress_event() { + use crate::events::EventBroadcaster; + + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-nyaa".to_string(), + make_capability(false, vec![]), + ); + + let broadcaster = Arc::new(EventBroadcaster::new(8)); + let mut rx = broadcaster.subscribe_tasks(); + let identity = Arc::new(crate::events::TaskIdentity::new( + Uuid::new_v4(), + "poll_release_source", + None, + None, + None, + )); + + let req = make_request( + methods::RELEASES_REPORT_PROGRESS, + json!({"current": 3, "total": 10, "message": "Polled 3/10 series"}), + ); + let resp = crate::events::with_task_identity( + identity.clone(), + crate::events::with_recording_broadcaster(broadcaster.clone(), async { + handler.handle_request(&req).await + }), + ) + .await; + assert!(!resp.is_error(), "{:?}", resp.error); + let body: ReportProgressResponse = serde_json::from_value(resp.result.unwrap()).unwrap(); + assert!(body.emitted); + + // The broadcaster should have one progress event with the values + // we passed in. + let event = rx.try_recv().expect("progress event was emitted"); + assert_eq!(event.task_id, identity.task_id); + assert_eq!(event.task_type, "poll_release_source"); + let p = event.progress.expect("progress payload"); + assert_eq!(p.current, 3); + assert_eq!(p.total, 10); + assert_eq!(p.message.as_deref(), Some("Polled 3/10 series")); + } + + #[tokio::test] + async fn report_progress_rate_limits_back_to_back_emits_but_lets_final_through() { + use crate::events::EventBroadcaster; + + let (db, _t) = create_test_db().await; + let conn = db.sea_orm_connection(); + let handler = ReleasesRequestHandler::new( + conn.clone(), + "release-nyaa".to_string(), + make_capability(false, vec![]), + ); + + let broadcaster = Arc::new(EventBroadcaster::new(16)); + let mut rx = broadcaster.subscribe_tasks(); + let identity = Arc::new(crate::events::TaskIdentity::new( + Uuid::new_v4(), + "poll_release_source", + None, + None, + None, + )); + + crate::events::with_task_identity( + identity.clone(), + crate::events::with_recording_broadcaster(broadcaster.clone(), async { + // First emit goes through (last_progress_emit was None). + let r1 = handler + .handle_request(&make_request( + methods::RELEASES_REPORT_PROGRESS, + json!({"current": 1, "total": 10}), + )) + .await; + let b1: ReportProgressResponse = + serde_json::from_value(r1.result.unwrap()).unwrap(); + assert!(b1.emitted); + + // Second emit, immediate, dropped by the rate-limit. + let r2 = handler + .handle_request(&make_request( + methods::RELEASES_REPORT_PROGRESS, + json!({"current": 2, "total": 10}), + )) + .await; + let b2: ReportProgressResponse = + serde_json::from_value(r2.result.unwrap()).unwrap(); + assert!(!b2.emitted, "back-to-back emit must be rate-limited"); + + // Final emit (current >= total) bypasses the rate-limit. + let r3 = handler + .handle_request(&make_request( + methods::RELEASES_REPORT_PROGRESS, + json!({"current": 10, "total": 10, "message": "done"}), + )) + .await; + let b3: ReportProgressResponse = + serde_json::from_value(r3.result.unwrap()).unwrap(); + assert!(b3.emitted, "final emit must always pass through"); + }), + ) + .await; + + // Drain the broadcaster: we expect exactly two emits. + let e1 = rx.try_recv().expect("first emit"); + assert_eq!(e1.progress.unwrap().current, 1); + let e2 = rx.try_recv().expect("final emit"); + assert_eq!(e2.progress.unwrap().current, 10); + assert!(rx.try_recv().is_err(), "no more events"); + } + // ------------------------------------------------------------------------- // latest_known_* advancement tests (Phase 6) // ------------------------------------------------------------------------- diff --git a/src/tasks/handlers/poll_release_source.rs b/src/tasks/handlers/poll_release_source.rs index a2bf7d52..8db5ee1c 100644 --- a/src/tasks/handlers/poll_release_source.rs +++ b/src/tasks/handlers/poll_release_source.rs @@ -353,6 +353,11 @@ impl TaskHandler for PollReleaseSourceHandler { // the same series, so we don't want N+1 queries here. let mut owned_cache: std::collections::HashMap<Uuid, OwnedReleaseKeys> = std::collections::HashMap::new(); + // Cache series-title lookups for the same reason: each emit + // needs the title for the toast, but a poll often produces many + // events for one series. + let mut series_title_cache: std::collections::HashMap<Uuid, String> = + std::collections::HashMap::new(); for cand in response.candidates { let series_id = cand.series_match.codex_series_id; @@ -409,10 +414,23 @@ impl TaskHandler for PollReleaseSourceHandler { outcome.row.state == ledger_state::ANNOUNCED; if landed_announced && let Some(broadcaster) = event_broadcaster { + let title = if let Some(cached) = + series_title_cache.get(&outcome.row.series_id) + { + cached.clone() + } else { + let resolved = + lookup_series_title(db, outcome.row.series_id) + .await; + series_title_cache + .insert(outcome.row.series_id, resolved.clone()); + resolved + }; emit_release_announced( broadcaster, &outcome.row, &source.plugin_id, + title, ); } } @@ -597,14 +615,41 @@ pub(crate) fn build_poll_summary( /// Emit a `ReleaseAnnounced` event for a freshly-inserted ledger row. /// +/// `series_title` is the human-readable label rendered by frontend +/// notifications (typically `series_metadata.title`, falling back to the +/// series directory name; see [`lookup_series_title`]). +/// /// Failure to broadcast (no subscribers, channel closed) is a benign noop — /// the ledger row is the source of truth, the SSE event is a UX nicety. pub(crate) fn emit_release_announced( broadcaster: &EventBroadcaster, row: &crate::db::entities::release_ledger::Model, plugin_id: &str, + series_title: String, ) { - let _ = broadcaster.emit(EntityChangeEvent::release_announced(row, plugin_id)); + let _ = broadcaster.emit(EntityChangeEvent::release_announced( + row, + plugin_id, + series_title, + )); +} + +/// Resolve the display title for a series, preferring `series_metadata.title` +/// and falling back to the directory-derived `series.name`. Returns an empty +/// string if the series row is missing (shouldn't happen for a valid ledger +/// insert, but we don't want a notification failure to surface as a panic). +pub(crate) async fn lookup_series_title(db: &DatabaseConnection, series_id: Uuid) -> String { + match SeriesRepository::get_with_metadata(db, series_id).await { + Ok(Some((series, metadata))) => metadata.map(|m| m.title).unwrap_or(series.name), + Ok(None) => String::new(), + Err(e) => { + warn!( + "Failed to look up title for series {} (release notification): {}", + series_id, e + ); + String::new() + } + } } /// Compute the initial ledger state for a candidate. Returns @@ -725,13 +770,19 @@ mod tests { created_at: Utc::now(), }; - emit_release_announced(&broadcaster, &row, "release-mangaupdates"); + emit_release_announced( + &broadcaster, + &row, + "release-mangaupdates", + "Test Series".to_string(), + ); let event = rx.try_recv().expect("expected one event"); match event.event { EntityEvent::ReleaseAnnounced { ledger_id, series_id, + series_title, source_id, plugin_id, chapter, @@ -740,6 +791,7 @@ mod tests { } => { assert_eq!(ledger_id, row.id); assert_eq!(series_id, row.series_id); + assert_eq!(series_title, "Test Series"); assert_eq!(source_id, row.source_id); assert_eq!(plugin_id, "release-mangaupdates"); assert_eq!(chapter, Some(143.0)); @@ -777,7 +829,7 @@ mod tests { observed_at: Utc::now(), created_at: Utc::now(), }; - emit_release_announced(&broadcaster, &row, "release-nyaa"); + emit_release_announced(&broadcaster, &row, "release-nyaa", "Whatever".to_string()); } #[test] diff --git a/src/tasks/handlers/user_plugin_recommendations.rs b/src/tasks/handlers/user_plugin_recommendations.rs index f0c7e6b9..48d12ee6 100644 --- a/src/tasks/handlers/user_plugin_recommendations.rs +++ b/src/tasks/handlers/user_plugin_recommendations.rs @@ -182,12 +182,47 @@ impl UserPluginRecommendationsHandler { } } +/// Phase labels used for progress emits. Five distinct phases give the UI +/// enough granularity to show "what's happening" without spamming events on +/// every tiny step. The numeric `current` advances by phase so the bar moves +/// monotonically; `total` stays at `TOTAL_PHASES`. +const TOTAL_PHASES: usize = 5; +const PHASE_INIT: (usize, &str) = (1, "Starting recommendation plugin"); +const PHASE_BUILD_LIBRARY: (usize, &str) = (2, "Building user library"); +const PHASE_CURATE_SEEDS: (usize, &str) = (3, "Curating seeds"); +const PHASE_CALL_PLUGIN: (usize, &str) = (4, "Generating recommendations"); +const PHASE_PERSIST: (usize, &str) = (5, "Persisting recommendations"); + +fn emit_phase( + broadcaster: Option<&Arc<EventBroadcaster>>, + task: &tasks::Model, + phase: (usize, &str), + detail: Option<String>, +) { + if let Some(b) = broadcaster { + let message = match detail { + Some(d) => format!("{}: {}", phase.1, d), + None => phase.1.to_string(), + }; + let _ = b.emit_task(crate::events::TaskProgressEvent::progress( + task.id, + "user_plugin_recommendations", + phase.0, + TOTAL_PHASES, + Some(message), + task.library_id, + task.series_id, + task.book_id, + )); + } +} + impl TaskHandler for UserPluginRecommendationsHandler { fn handle<'a>( &'a self, task: &'a tasks::Model, db: &'a DatabaseConnection, - _event_broadcaster: Option<&'a Arc<EventBroadcaster>>, + event_broadcaster: Option<&'a Arc<EventBroadcaster>>, ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'a>> { Box::pin(async move { // Extract task parameters @@ -212,6 +247,8 @@ impl TaskHandler for UserPluginRecommendationsHandler { task.id, plugin_id, user_id ); + emit_phase(event_broadcaster, task, PHASE_INIT, None); + // Read configured task timeout from settings let request_timeout = self.task_request_timeout().await; @@ -261,6 +298,8 @@ impl TaskHandler for UserPluginRecommendationsHandler { rec_settings.drop_threshold ); + emit_phase(event_broadcaster, task, PHASE_BUILD_LIBRARY, None); + // Build user library data let library = build_user_library(db, user_id).await.unwrap_or_else(|e| { warn!( @@ -313,6 +352,13 @@ impl TaskHandler for UserPluginRecommendationsHandler { exclude_ids.len() ); + emit_phase( + event_broadcaster, + task, + PHASE_CURATE_SEEDS, + Some(format!("from {} library entries", library.len())), + ); + // Curate seeds from library: rated entries first, then recent reads let seeds = curate_seeds(&library, &rec_settings); @@ -332,6 +378,13 @@ impl TaskHandler for UserPluginRecommendationsHandler { exclude_ids, }; + emit_phase( + event_broadcaster, + task, + PHASE_CALL_PLUGIN, + Some(format!("limit={}", rec_settings.max_recommendations)), + ); + let result = handle .call_method::<RecommendationRequest, RecommendationResponse>( methods::RECOMMENDATIONS_GET, @@ -354,6 +407,13 @@ impl TaskHandler for UserPluginRecommendationsHandler { let count = response.recommendations.len(); + emit_phase( + event_broadcaster, + task, + PHASE_PERSIST, + Some(format!("{} recommendations", count)), + ); + // Stamp generation time and persist to user_plugin_data for the GET endpoint response.generated_at = Some(Utc::now().to_rfc3339()); let cached_data = serde_json::to_value(&response) diff --git a/src/tasks/worker.rs b/src/tasks/worker.rs index 70ca269b..366d0408 100644 --- a/src/tasks/worker.rs +++ b/src/tasks/worker.rs @@ -151,6 +151,15 @@ impl TaskWorker { self } + /// Register or replace a task handler. Test-only: real callers register + /// handlers via the `with_*_service` builders, which set up the full + /// dependency graph each handler requires. + #[cfg(test)] + pub fn with_handler(mut self, task_type: &str, handler: Arc<dyn TaskHandler>) -> Self { + self.handlers.insert(task_type.to_string(), handler); + self + } + /// Set the event broadcaster for task progress events pub fn with_event_broadcaster(mut self, broadcaster: Arc<EventBroadcaster>) -> Self { self.event_broadcaster = Some(broadcaster); @@ -594,6 +603,18 @@ impl TaskWorker { anyhow::anyhow!("No handler registered for task type: {}", task.task_type) })?; + // Build the task identity exposed to reverse-RPC handlers via the + // task-local context. Used by `releases/report_progress` to + // construct a `TaskProgressEvent` (which needs the task id/type) + // and to rate-limit emits. + let task_identity = Arc::new(crate::events::TaskIdentity::new( + task.id, + task.task_type.clone(), + task.library_id, + task.series_id, + task.book_id, + )); + // In distributed mode, create a recording broadcaster to capture events // that need to be replayed by the TaskListener on the web server let (task_broadcaster, recorded_events): ( @@ -604,18 +625,21 @@ impl TaskWorker { let recording_broadcaster = Arc::new(EventBroadcaster::new_with_recording(1000, true)); let broadcaster_clone = recording_broadcaster.clone(); - // Execute the handler inside a task-local scope that exposes the - // recording broadcaster to any code on this task's await chain — - // including reverse-RPC handlers (e.g. `releases/record`), which + // Execute the handler inside task-local scopes that expose the + // recording broadcaster and the task identity to any code on + // this task's await chain — including reverse-RPC handlers + // (e.g. `releases/record`, `releases/report_progress`), which // are dispatched on this task by `RpcClient::call_with_timeout` // when the plugin tags reverse-RPCs with the parent forward - // request id. Without this, plugins that emit events via - // reverse-RPC (rather than synchronously through the handler's - // broadcaster argument) would have no recording context and - // their events would never replay. - let result = crate::events::with_recording_broadcaster( - recording_broadcaster.clone(), - handler.handle(&task, &self.db, Some(&recording_broadcaster)), + // request id. Without these scopes, plugins that emit events + // via reverse-RPC would have no recording context and their + // events would never replay. + let result = crate::events::with_task_identity( + task_identity.clone(), + crate::events::with_recording_broadcaster( + recording_broadcaster.clone(), + handler.handle(&task, &self.db, Some(&recording_broadcaster)), + ), ) .await; @@ -627,12 +651,24 @@ impl TaskWorker { Some(events) }; - // Return result info for later processing + // Return result info for later processing. A handler that + // returns `Ok(TaskResult { success: false, .. })` is signalling + // a logical failure (e.g. plugin RPC timeout, missing source) — + // route it to `fail_task` so the task row reflects reality + // instead of recording a green "completed" status. match result { - Ok(task_result) => { + Ok(task_result) if task_result.success => { self.complete_task(&task, task_result, started_at, events) .await?; } + Ok(task_result) => { + let err = anyhow::anyhow!( + task_result + .message + .unwrap_or_else(|| "task reported failure".to_string()) + ); + self.fail_task(&task, err, started_at).await?; + } Err(e) => { self.fail_task(&task, e, started_at).await?; } @@ -645,28 +681,44 @@ impl TaskWorker { }; // Execute task with shared broadcaster (single-process mode). - // Set the task-local to the shared broadcaster too, so reverse-RPC - // handlers see *the same* broadcaster the rest of the task uses. + // Set the task-locals to the shared broadcaster + task identity so + // reverse-RPC handlers see *the same* broadcaster the rest of the + // task uses, and can synthesize `TaskProgressEvent`s for the task. // The shared broadcaster has recording disabled here (web/single- // process mode), so emits flow straight to live SSE subscribers. let result = if let Some(ref shared) = task_broadcaster { - crate::events::with_recording_broadcaster( - shared.clone(), - handler.handle(&task, &self.db, task_broadcaster.as_ref()), + crate::events::with_task_identity( + task_identity.clone(), + crate::events::with_recording_broadcaster( + shared.clone(), + handler.handle(&task, &self.db, task_broadcaster.as_ref()), + ), ) .await } else { - handler - .handle(&task, &self.db, task_broadcaster.as_ref()) - .await + crate::events::with_task_identity( + task_identity.clone(), + handler.handle(&task, &self.db, task_broadcaster.as_ref()), + ) + .await }; - // Update task status based on result + // Update task status based on result. See the matching block in + // distributed mode above for the rationale on the `success: false` + // branch. match result { - Ok(task_result) => { + Ok(task_result) if task_result.success => { self.complete_task(&task, task_result, started_at, recorded_events) .await?; } + Ok(task_result) => { + let err = anyhow::anyhow!( + task_result + .message + .unwrap_or_else(|| "task reported failure".to_string()) + ); + self.fail_task(&task, err, started_at).await?; + } Err(e) => { self.fail_task(&task, e, started_at).await?; } @@ -886,7 +938,99 @@ impl TaskWorker { #[cfg(test)] mod tests { use super::*; + use crate::db::repositories::TaskRepository; + use crate::db::test_helpers::create_test_db; use crate::events::{EntityChangeEvent, EntityEvent, EntityType}; + use crate::tasks::handlers::TaskHandler; + use crate::tasks::types::{TaskResult, TaskType}; + + /// Stub handler that returns whatever `TaskResult` it was constructed with. + /// Used to drive the worker through specific result branches without + /// dragging in the real handlers' dependency graphs. + struct StubHandler { + result: TaskResult, + } + + impl TaskHandler for StubHandler { + fn handle<'a>( + &'a self, + _task: &'a crate::db::entities::tasks::Model, + _db: &'a sea_orm::DatabaseConnection, + _event_broadcaster: Option<&'a Arc<EventBroadcaster>>, + ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<TaskResult>> + Send + 'a>> + { + let r = self.result.clone(); + Box::pin(async move { Ok(r) }) + } + } + + /// Regression: a handler returning `Ok(TaskResult::failure(...))` must + /// land the task in the `failed` state. Before this fix, the worker + /// dispatched both `Ok` arms to `complete_task`, silently writing + /// `status = "completed"` over the handler's failure signal. + #[tokio::test] + async fn handler_failure_result_marks_task_failed() { + let (test_db, _temp) = create_test_db().await; + let db = test_db.sea_orm_connection().clone(); + let task_id = TaskRepository::enqueue(&db, TaskType::FindDuplicates, None) + .await + .expect("enqueue"); + + let stub = Arc::new(StubHandler { + result: TaskResult::failure("synthetic failure"), + }); + let worker = TaskWorker::new(db.clone()) + .with_handler("find_duplicates", stub) + .with_poll_interval(Duration::from_millis(10)); + + let processed = worker.process_once().await.expect("process_once"); + assert!(processed, "worker should have claimed the task"); + + let task = TaskRepository::get_by_id(&db, task_id) + .await + .expect("get_by_id") + .expect("task row"); + // FindDuplicates has retries enabled, so on the first failure the + // task is bounced back to `pending` for retry; the load-bearing + // assertion is "not completed". `last_error` must reflect the + // handler's message regardless of retry state. + assert_ne!( + task.status, "completed", + "Ok(TaskResult::failure) must not be recorded as completed" + ); + assert_eq!( + task.last_error.as_deref(), + Some("synthetic failure"), + "the handler's failure message must surface on the task row" + ); + } + + /// Symmetric positive-case: a `Ok(TaskResult::success(..))` still flows + /// to `complete_task` after the routing change. + #[tokio::test] + async fn handler_success_result_marks_task_completed() { + let (test_db, _temp) = create_test_db().await; + let db = test_db.sea_orm_connection().clone(); + let task_id = TaskRepository::enqueue(&db, TaskType::FindDuplicates, None) + .await + .expect("enqueue"); + + let stub = Arc::new(StubHandler { + result: TaskResult::success("done"), + }); + let worker = TaskWorker::new(db.clone()) + .with_handler("find_duplicates", stub) + .with_poll_interval(Duration::from_millis(10)); + + worker.process_once().await.expect("process_once"); + + let task = TaskRepository::get_by_id(&db, task_id) + .await + .expect("get_by_id") + .expect("task row"); + assert_eq!(task.status, "completed"); + assert!(task.last_error.is_none()); + } #[test] fn test_worker_creation() { diff --git a/tests/api/series.rs b/tests/api/series.rs index bbf656e6..bdd0e0b5 100644 --- a/tests/api/series.rs +++ b/tests/api/series.rs @@ -5494,6 +5494,97 @@ async fn test_list_series_filtered_by_has_external_source_id() { assert_eq!(series_list.data[0].title, "Series Without External ID"); } +// ============================================================================ +// IsTracked Filter Tests +// ============================================================================ + +#[tokio::test] +async fn test_list_series_filtered_by_is_tracked() { + let (db, _temp_dir) = setup_test_db().await; + + use codex::db::repositories::{SeriesTrackingRepository, TrackingUpdate}; + + let library = LibraryRepository::create(&db, "Library", "/lib", ScanningStrategy::Default) + .await + .unwrap(); + + // A series with tracking explicitly enabled. + let tracked_series = SeriesRepository::create(&db, library.id, "Tracked Series", None) + .await + .unwrap(); + SeriesTrackingRepository::upsert( + &db, + tracked_series.id, + TrackingUpdate { + tracked: Some(true), + ..Default::default() + }, + ) + .await + .unwrap(); + + // A series with a tracking row that has tracked=false (rare but possible + // after a user un-tracks something). + let untracked_row_series = + SeriesRepository::create(&db, library.id, "Untracked With Row", None) + .await + .unwrap(); + SeriesTrackingRepository::upsert( + &db, + untracked_row_series.id, + TrackingUpdate { + tracked: Some(false), + ..Default::default() + }, + ) + .await + .unwrap(); + + // A series with no tracking row at all (the common case for a fresh DB). + let _no_row_series = SeriesRepository::create(&db, library.id, "No Tracking Row", None) + .await + .unwrap(); + + let state = create_test_auth_state(db.clone()).await; + let token = create_admin_and_token(&db, &state).await; + let app = create_test_router(state).await; + + // isTracked = true returns only series with tracking explicitly enabled. + let request_body = SeriesListRequest { + condition: Some(SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsTrue, + }), + ..Default::default() + }; + let request = post_json_request_with_auth("/api/v1/series/list", &request_body, &token); + let (status, response): (StatusCode, Option<SeriesListResponse>) = + make_json_request(app.clone(), request).await; + + assert_eq!(status, StatusCode::OK); + let series_list = response.unwrap(); + assert_eq!(series_list.data.len(), 1); + assert_eq!(series_list.data[0].title, "Tracked Series"); + + // isTracked = false returns the inverse: tracked=false rows AND series + // with no tracking row at all. + let request_body = SeriesListRequest { + condition: Some(SeriesCondition::IsTracked { + is_tracked: BoolOperator::IsFalse, + }), + ..Default::default() + }; + let request = post_json_request_with_auth("/api/v1/series/list", &request_body, &token); + let (status, response): (StatusCode, Option<SeriesListResponse>) = + make_json_request(app, request).await; + + assert_eq!(status, StatusCode::OK); + let series_list = response.unwrap(); + assert_eq!(series_list.data.len(), 2); + let titles: Vec<&str> = series_list.data.iter().map(|s| s.title.as_str()).collect(); + assert!(titles.contains(&"Untracked With Row")); + assert!(titles.contains(&"No Tracking Row")); +} + // ============================================================================ // Has User Rating Filter Tests // ============================================================================ diff --git a/web/openapi.json b/web/openapi.json index e828f0d6..e7b43333 100644 --- a/web/openapi.json +++ b/web/openapi.json @@ -23881,6 +23881,7 @@ "required": [ "ledgerId", "seriesId", + "seriesTitle", "sourceId", "pluginId", "language", @@ -23911,6 +23912,10 @@ "type": "string", "format": "uuid" }, + "seriesTitle": { + "type": "string", + "description": "Series display title (`series_metadata.title`, falling back to the\nseries directory name). Carried in the event so notifications can\nrender a clickable series link without a second round-trip." + }, "sourceId": { "type": "string", "format": "uuid" @@ -34859,6 +34864,18 @@ "$ref": "#/components/schemas/BoolOperator" } } + }, + { + "type": "object", + "description": "Filter by whether release tracking is enabled for the series.\n\n`IsTrue` returns only series whose `series_tracking.tracked` flag is\n`true`. `IsFalse` returns everything else, including series with no\n`series_tracking` row at all (the common case for a fresh library).", + "required": [ + "isTracked" + ], + "properties": { + "isTracked": { + "$ref": "#/components/schemas/BoolOperator" + } + } } ], "description": "Series-level search conditions\n\nConditions can be composed using `allOf` (AND) and `anyOf` (OR).\nUses untagged enum for cleaner JSON without explicit type field." diff --git a/web/src/components/layout/ReleasesNavBadge.tsx b/web/src/components/layout/ReleasesNavBadge.tsx index 2b414b6d..f85867f0 100644 --- a/web/src/components/layout/ReleasesNavBadge.tsx +++ b/web/src/components/layout/ReleasesNavBadge.tsx @@ -9,7 +9,7 @@ export function ReleasesNavBadge() { const unseen = useReleaseAnnouncementsStore((s) => s.unseenCount); if (unseen <= 0) return null; return ( - <Badge color="orange" variant="filled" size="sm" circle> + <Badge color="orange" variant="filled" size="sm"> {unseen > 99 ? "99+" : unseen} </Badge> ); diff --git a/web/src/components/library/ActiveFilters.test.tsx b/web/src/components/library/ActiveFilters.test.tsx index fa36a657..48a1fc08 100644 --- a/web/src/components/library/ActiveFilters.test.tsx +++ b/web/src/components/library/ActiveFilters.test.tsx @@ -226,4 +226,38 @@ describe("ActiveFilters", () => { }); expect(removeButton).toBeInTheDocument(); }); + + it("should render isTracked include filter chip", () => { + render( + <TestWrapper initialRoute="/?trf=include"> + <ActiveFilters /> + </TestWrapper>, + ); + + expect(screen.getByText("Filters:")).toBeInTheDocument(); + expect(screen.getByText("Tracked")).toBeInTheDocument(); + }); + + it("should render isTracked exclude filter chip with NOT prefix", () => { + render( + <TestWrapper initialRoute="/?trf=exclude"> + <ActiveFilters /> + </TestWrapper>, + ); + + expect(screen.getByText(/NOT Tracked/)).toBeInTheDocument(); + }); + + it("should have remove button on isTracked chip", () => { + render( + <TestWrapper initialRoute="/?trf=include"> + <ActiveFilters /> + </TestWrapper>, + ); + + const removeButton = screen.getByRole("button", { + name: /Remove tracking filter/i, + }); + expect(removeButton).toBeInTheDocument(); + }); }); diff --git a/web/src/components/library/ActiveFilters.tsx b/web/src/components/library/ActiveFilters.tsx index 129a2ea3..f2ad95a0 100644 --- a/web/src/components/library/ActiveFilters.tsx +++ b/web/src/components/library/ActiveFilters.tsx @@ -24,6 +24,7 @@ export function ActiveFilters() { setSharingTagState, setCompletionState, setHasExternalSourceIdState, + setIsTrackedState, clearAll, } = useSeriesFilterState(); @@ -156,6 +157,33 @@ export function ActiveFilters() { ); } + // Add isTracked chip if active + if (filters.isTracked !== "neutral") { + const isExclude = filters.isTracked === "exclude"; + allChips.push( + <Badge + key="isTracked" + variant="filled" + color={isExclude ? "red" : "blue"} + size="md" + className={styles.chip} + rightSection={ + <ActionIcon + size="xs" + variant="transparent" + color="white" + onClick={() => setIsTrackedState("neutral")} + aria-label="Remove tracking filter" + > + <IconX size={12} /> + </ActionIcon> + } + > + {isExclude ? "NOT " : ""}Tracked + </Badge>, + ); + } + return ( <Group gap="xs" className={styles.container}> <Text size="sm" c="dimmed" fw={500}> diff --git a/web/src/components/library/SeriesFilterPanel.tsx b/web/src/components/library/SeriesFilterPanel.tsx index 2d317227..f60c0378 100644 --- a/web/src/components/library/SeriesFilterPanel.tsx +++ b/web/src/components/library/SeriesFilterPanel.tsx @@ -278,6 +278,26 @@ export function SeriesFilterPanel() { showModeToggle={false} /> + {/* Release Tracking Filter */} + <FilterGroup + title="Release Tracking" + options={[{ value: "tracked", label: "Tracked" }]} + state={{ + mode: "allOf", + values: + draftState.draftFilters.isTracked !== "neutral" + ? new Map([ + ["tracked", draftState.draftFilters.isTracked], + ]) + : new Map(), + }} + onValueChange={(_value, state) => + draftState.setIsTrackedState(state) + } + onModeChange={() => {}} + showModeToggle={false} + /> + {/* Metadata Section - Only show if there's data */} {hasMetadataFilters && ( <> diff --git a/web/src/hooks/useDraftSeriesFilterState.ts b/web/src/hooks/useDraftSeriesFilterState.ts index 5f25c29a..495b0da2 100644 --- a/web/src/hooks/useDraftSeriesFilterState.ts +++ b/web/src/hooks/useDraftSeriesFilterState.ts @@ -53,6 +53,9 @@ interface UseDraftSeriesFilterStateReturn { // Actions for hasUserRating filter setHasUserRatingState: (state: TriState) => void; + // Actions for isTracked filter + setIsTrackedState: (state: TriState) => void; + // Bulk actions on draft clearAllDraft: () => void; clearGroupDraft: (group: keyof SeriesFilterState) => void; @@ -98,6 +101,7 @@ function cloneFilterState(state: SeriesFilterState): SeriesFilterState { completion: state.completion, hasExternalSourceId: state.hasExternalSourceId, hasUserRating: state.hasUserRating, + isTracked: state.isTracked, }; } @@ -112,10 +116,11 @@ function filterStatesEqual( if (a.completion !== b.completion) return false; if (a.hasExternalSourceId !== b.hasExternalSourceId) return false; if (a.hasUserRating !== b.hasUserRating) return false; + if (a.isTracked !== b.isTracked) return false; // Compare FilterGroupState fields const groups: (keyof Omit< SeriesFilterState, - "completion" | "hasExternalSourceId" | "hasUserRating" + "completion" | "hasExternalSourceId" | "hasUserRating" | "isTracked" >)[] = [ "genres", "tags", @@ -187,7 +192,7 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { ( group: keyof Omit< SeriesFilterState, - "completion" | "hasExternalSourceId" | "hasUserRating" + "completion" | "hasExternalSourceId" | "hasUserRating" | "isTracked" >, updater: (current: FilterGroupState) => FilterGroupState, ) => { @@ -393,6 +398,17 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { [updateDraft], ); + // IsTracked actions + const setIsTrackedState = useCallback( + (state: TriState) => { + updateDraft((current) => ({ + ...current, + isTracked: state, + })); + }, + [updateDraft], + ); + // Clear all draft filters const clearAllDraft = useCallback(() => { setDraftFilters(createEmptySeriesFilterState()); @@ -420,6 +436,7 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { newParams.delete("cf"); newParams.delete("esf"); newParams.delete("urf"); + newParams.delete("trf"); // Add new filter params (will be empty for cleared filters) for (const [key, value] of filterParams) { newParams.set(key, value); @@ -456,6 +473,11 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { ...current, hasUserRating: "neutral", })); + } else if (group === "isTracked") { + setDraftFilters((current) => ({ + ...current, + isTracked: "neutral", + })); } else { setDraftFilters((current) => ({ ...current, @@ -482,6 +504,7 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { newParams.delete("cf"); newParams.delete("esf"); newParams.delete("urf"); + newParams.delete("trf"); // Add new filter params for (const [key, value] of filterParams) { newParams.set(key, value); @@ -512,6 +535,7 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { hasExternalSourceId: draftFilters.hasExternalSourceId !== "neutral" ? 1 : 0, hasUserRating: draftFilters.hasUserRating !== "neutral" ? 1 : 0, + isTracked: draftFilters.isTracked !== "neutral" ? 1 : 0, }), [draftFilters], ); @@ -546,6 +570,7 @@ export function useDraftSeriesFilterState(): UseDraftSeriesFilterStateReturn { setCompletionState, setHasExternalSourceIdState, setHasUserRatingState, + setIsTrackedState, clearAllDraft, clearGroupDraft, clearAllAndApply, diff --git a/web/src/hooks/useEntityEvents.test.ts b/web/src/hooks/useEntityEvents.test.ts index 8fcd6126..6d3c8e2e 100644 --- a/web/src/hooks/useEntityEvents.test.ts +++ b/web/src/hooks/useEntityEvents.test.ts @@ -7,7 +7,11 @@ import * as eventsApi from "@/api/events"; import { useAuthStore } from "@/store/authStore"; import { useCoverUpdatesStore } from "@/store/coverUpdatesStore"; import type { EntityChangeEvent } from "@/types"; -import { shouldNotifyRelease, useEntityEvents } from "./useEntityEvents"; +import { + formatReleaseLabel, + shouldNotifyRelease, + useEntityEvents, +} from "./useEntityEvents"; // Mock the events API vi.mock("@/api/events"); @@ -580,3 +584,63 @@ describe("shouldNotifyRelease", () => { ).toBe(true); }); }); + +// ============================================================================= +// formatReleaseLabel — aggregated chapter/volume label for the release toast +// ============================================================================= + +describe("formatReleaseLabel", () => { + it("falls back to 'New release' when nothing was announced", () => { + expect( + formatReleaseLabel({ + chapters: new Set(), + volumes: new Set(), + pluginId: "release-nyaa", + }), + ).toBe("New release"); + }); + + it("formats a single chapter", () => { + expect( + formatReleaseLabel({ + chapters: new Set([42]), + volumes: new Set(), + pluginId: "release-nyaa", + }), + ).toBe("Ch 42"); + }); + + it("formats a single volume", () => { + expect( + formatReleaseLabel({ + chapters: new Set(), + volumes: new Set([3]), + pluginId: "release-nyaa", + }), + ).toBe("Vol 3"); + }); + + it("sorts numerically and lists volumes before chapters", () => { + expect( + formatReleaseLabel({ + chapters: new Set([12, 11, 13]), + volumes: new Set([2, 1]), + pluginId: "release-nyaa", + }), + ).toBe("Vol 1, 2 / Ch 11, 12, 13"); + }); + + it("truncates with an ellipsis once the label gets too long", () => { + const chapters = new Set<number>(); + for (let i = 1; i <= 80; i++) chapters.add(i); + const out = formatReleaseLabel({ + chapters, + volumes: new Set(), + pluginId: "release-nyaa", + }); + // Capped at 70 chars: 69 chars + the ellipsis. + expect(out.length).toBe(70); + expect(out.endsWith("…")).toBe(true); + expect(out.startsWith("Ch ")).toBe(true); + }); +}); diff --git a/web/src/hooks/useEntityEvents.ts b/web/src/hooks/useEntityEvents.tsx similarity index 74% rename from web/src/hooks/useEntityEvents.ts rename to web/src/hooks/useEntityEvents.tsx index d33e293f..382c4eea 100644 --- a/web/src/hooks/useEntityEvents.ts +++ b/web/src/hooks/useEntityEvents.tsx @@ -1,7 +1,9 @@ +import { Anchor } from "@mantine/core"; import { notifications } from "@mantine/notifications"; import { useQueryClient } from "@tanstack/react-query"; import { useEffect, useState } from "react"; import { eventsApi } from "@/api/events"; +import { navigationService } from "@/services/navigation"; import { useAuthStore } from "@/store/authStore"; import { useCoverUpdatesStore } from "@/store/coverUpdatesStore"; import { useReleaseAnnouncementsStore } from "@/store/releaseAnnouncementsStore"; @@ -13,6 +15,44 @@ type ConnectionState = "connecting" | "connected" | "disconnected" | "failed"; const log = createDevLog("[SSE]"); +/** Per-series state for the aggregated release toast. Lives at module scope + * so a burst of `release_announced` events for the same series collapses + * into a single toast that updates in place rather than spawning N toasts. + * + * The set of chapters/volumes accumulates while the toast is visible; the + * `onClose` handler clears the entry, so the next event after dismissal + * starts a fresh toast. Exported for testability. */ +type ReleaseToastState = { + chapters: Set<number>; + volumes: Set<number>; + pluginId: string; +}; +export const releaseToastState = new Map<string, ReleaseToastState>(); + +/** Cap on the rendered chapter/volume label so a series with dozens of + * announced releases doesn't blow the toast width out. */ +const RELEASE_LABEL_MAX_CHARS = 70; + +/** Build the message body for the aggregated release toast. Lists volumes + * first (typically fewer entries), then chapters; truncates with `…` once + * the joined string exceeds `RELEASE_LABEL_MAX_CHARS`. Pure helper — + * exported for testing. */ +export function formatReleaseLabel(state: ReleaseToastState): string { + const sortAsc = (s: Set<number>) => [...s].sort((a, b) => a - b); + const parts: string[] = []; + if (state.volumes.size > 0) { + parts.push(`Vol ${sortAsc(state.volumes).join(", ")}`); + } + if (state.chapters.size > 0) { + parts.push(`Ch ${sortAsc(state.chapters).join(", ")}`); + } + let label = parts.length > 0 ? parts.join(" / ") : "New release"; + if (label.length > RELEASE_LABEL_MAX_CHARS) { + label = `${label.slice(0, RELEASE_LABEL_MAX_CHARS - 1).trimEnd()}…`; + } + return label; +} + /** Best-effort decode of a JSON-array string (settings + user_preferences * values are stored as JSON-encoded strings). Non-string entries and parse * failures collapse to an empty list. */ @@ -335,20 +375,67 @@ function handleEntityEvent( queryKey: ["series", event.seriesId, "full"], }); - // Surface a low-priority toast. Toast text uses chapter or volume - // when the source provided one; falls back to a neutral message. - const label = - event.chapter !== null && event.chapter !== undefined - ? `Ch ${event.chapter}` - : event.volume !== null && event.volume !== undefined - ? `Vol ${event.volume}` - : "New release"; - notifications.show({ - id: `release-${event.ledgerId}`, - title: "New release", - message: `${label} from ${event.pluginId}`, - color: "orange", - }); + // Surface a low-priority toast. To avoid spamming the user when a + // single poll lands a dozen releases for one series, we aggregate by + // `seriesId`: one toast per series, updated in place as more events + // arrive. The title is the series name (clickable); the body lists + // every volume/chapter announced while the toast is visible. + const seriesTitle = + event.seriesTitle.length > 0 ? event.seriesTitle : "New release"; + const toastId = `release-series-${event.seriesId}`; + const seriesPath = `/series/${event.seriesId}#releases`; + + const existing = releaseToastState.get(event.seriesId); + const state: ReleaseToastState = existing ?? { + chapters: new Set(), + volumes: new Set(), + pluginId: event.pluginId, + }; + if (event.chapter !== null && event.chapter !== undefined) { + state.chapters.add(event.chapter); + } + if (event.volume !== null && event.volume !== undefined) { + state.volumes.add(event.volume); + } + state.pluginId = event.pluginId; + + const message = `${formatReleaseLabel(state)} from ${event.pluginId}`; + // Mantine renders the toast inside its own portal, which sits above + // <BrowserRouter> in the tree, so a `<Link>` here would crash with + // "Cannot destructure property 'basename' of useContext(...)". + // Use the global navigationService (already wired to react-router's + // navigate) so SPA navigation still works from inside the toast. + const titleNode = ( + <Anchor + href={seriesPath} + onClick={(e) => { + e.preventDefault(); + notifications.hide(toastId); + navigationService.navigateTo(seriesPath); + }} + inherit + > + {seriesTitle} + </Anchor> + ); + + if (existing) { + notifications.update({ + id: toastId, + title: titleNode, + message, + color: "orange", + }); + } else { + releaseToastState.set(event.seriesId, state); + notifications.show({ + id: toastId, + title: titleNode, + message, + color: "orange", + onClose: () => releaseToastState.delete(event.seriesId), + }); + } break; } diff --git a/web/src/hooks/useSeriesFilterState.ts b/web/src/hooks/useSeriesFilterState.ts index 2615b47c..426bac7c 100644 --- a/web/src/hooks/useSeriesFilterState.ts +++ b/web/src/hooks/useSeriesFilterState.ts @@ -54,6 +54,9 @@ interface UseSeriesFilterStateReturn { // Actions for hasUserRating filter setHasUserRatingState: (state: TriState) => void; + // Actions for isTracked filter + setIsTrackedState: (state: TriState) => void; + // Bulk actions clearAll: () => void; clearGroup: (group: keyof SeriesFilterState) => void; @@ -107,6 +110,7 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { newParams.delete("cf"); newParams.delete("esf"); newParams.delete("urf"); + newParams.delete("trf"); // Add new filter params for (const [key, value] of filterParams) { newParams.set(key, value); @@ -125,7 +129,7 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { ( group: keyof Omit< SeriesFilterState, - "completion" | "hasExternalSourceId" | "hasUserRating" + "completion" | "hasExternalSourceId" | "hasUserRating" | "isTracked" >, updater: (current: FilterGroupState) => FilterGroupState, ) => { @@ -324,6 +328,15 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { [filters, updateFilters], ); + // IsTracked actions + const setIsTrackedState = useCallback( + (state: TriState) => { + const newFilters = { ...filters, isTracked: state }; + updateFilters(newFilters); + }, + [filters, updateFilters], + ); + // Clear all filters const clearAll = useCallback(() => { updateFilters(createEmptySeriesFilterState()); @@ -347,6 +360,12 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { hasUserRating: "neutral" as const, }; updateFilters(newFilters); + } else if (group === "isTracked") { + const newFilters = { + ...filters, + isTracked: "neutral" as const, + }; + updateFilters(newFilters); } else { updateGroup(group, (current) => ({ ...current, @@ -370,6 +389,7 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { completion: filters.completion !== "neutral" ? 1 : 0, hasExternalSourceId: filters.hasExternalSourceId !== "neutral" ? 1 : 0, hasUserRating: filters.hasUserRating !== "neutral" ? 1 : 0, + isTracked: filters.isTracked !== "neutral" ? 1 : 0, }), [filters], ); @@ -404,6 +424,7 @@ export function useSeriesFilterState(): UseSeriesFilterStateReturn { setCompletionState, setHasExternalSourceIdState, setHasUserRatingState, + setIsTrackedState, clearAll, clearGroup, hasActiveFilters, diff --git a/web/src/pages/settings/TasksSettings.tsx b/web/src/pages/settings/TasksSettings.tsx index 567e2ffe..0182f2ad 100644 --- a/web/src/pages/settings/TasksSettings.tsx +++ b/web/src/pages/settings/TasksSettings.tsx @@ -184,6 +184,52 @@ export function TasksSettings() { refetchInterval: 5000, }); + // Seed `activeProgress` from currently-processing tasks so the "Active + // Tasks" panel reflects them immediately on page load, even before any + // SSE progress event arrives. Without this, opening the page mid-poll + // shows an empty panel until the running task fires its next progress + // emit (which can be many seconds for slow polls or never for handlers + // that don't emit progress at all). + // + // Existing entries (already populated by SSE) win — we never overwrite + // a richer, more recent event with a bare polling snapshot. + useEffect(() => { + if (!tasks || tasks.length === 0) return; + setActiveProgress((prev) => { + let changed = false; + const next = new Map(prev); + const processingIds = new Set<string>(); + for (const t of tasks) { + if (t.status !== "processing") continue; + processingIds.add(t.id); + if (next.has(t.id)) continue; + next.set(t.id, { + taskId: t.id, + taskType: t.taskType, + status: "running", + progress: undefined, + error: undefined, + startedAt: t.startedAt ?? new Date().toISOString(), + completedAt: undefined, + libraryId: t.libraryId ?? undefined, + seriesId: t.seriesId ?? undefined, + bookId: t.bookId ?? undefined, + }); + changed = true; + } + // Drop running entries that are no longer in the processing list — + // they completed/failed without an SSE delete reaching us (e.g. SSE + // dropped the event, or the page just opened post-completion). + for (const [id, ev] of prev) { + if (ev.status === "running" && !processingIds.has(id)) { + next.delete(id); + changed = true; + } + } + return changed ? next : prev; + }); + }, [tasks]); + // Subscribe to real-time task progress useEffect(() => { const unsubscribe = subscribeToTaskProgress( diff --git a/web/src/types/api.generated.ts b/web/src/types/api.generated.ts index b57bc42c..4d8d90b1 100644 --- a/web/src/types/api.generated.ts +++ b/web/src/types/api.generated.ts @@ -9800,6 +9800,12 @@ export interface components { pluginId: string; /** Format: uuid */ seriesId: string; + /** + * @description Series display title (`series_metadata.title`, falling back to the + * series directory name). Carried in the event so notifications can + * render a clickable series link without a second round-trip. + */ + seriesTitle: string; /** Format: uuid */ sourceId: string; /** @enum {string} */ @@ -15794,6 +15800,8 @@ export interface components { hasExternalSourceId: components["schemas"]["BoolOperator"]; } | { hasUserRating: components["schemas"]["BoolOperator"]; + } | { + isTracked: components["schemas"]["BoolOperator"]; }; /** * @description Series context for template and condition evaluation. diff --git a/web/src/types/filters.ts b/web/src/types/filters.ts index 69f5a0a4..9c19f8d1 100644 --- a/web/src/types/filters.ts +++ b/web/src/types/filters.ts @@ -64,7 +64,8 @@ export type SeriesCondition = | { sharingTag: FieldOperator } | { completion: BoolOperator } | { hasExternalSourceId: BoolOperator } - | { hasUserRating: BoolOperator }; + | { hasUserRating: BoolOperator } + | { isTracked: BoolOperator }; // ============================================================================= // Book conditions (matches backend BookCondition) @@ -136,6 +137,7 @@ export interface SeriesFilterState { completion: TriState; hasExternalSourceId: TriState; hasUserRating: TriState; + isTracked: TriState; } /** @@ -178,6 +180,7 @@ export function createEmptySeriesFilterState(): SeriesFilterState { completion: "neutral", hasExternalSourceId: "neutral", hasUserRating: "neutral", + isTracked: "neutral", }; } @@ -344,6 +347,13 @@ export function seriesFilterStateToCondition( allConditions.push({ hasUserRating: { operator: "isFalse" } }); } + // Add isTracked condition + if (state.isTracked === "include") { + allConditions.push({ isTracked: { operator: "isTrue" } }); + } else if (state.isTracked === "exclude") { + allConditions.push({ isTracked: { operator: "isFalse" } }); + } + // Return combined condition if (allConditions.length === 0) { return undefined; @@ -538,6 +548,7 @@ export const FILTER_PARAM_KEYS = { completion: "cf", hasExternalSourceId: "esf", hasUserRating: "urf", + isTracked: "trf", } as const; /** @@ -586,6 +597,10 @@ export function serializeSeriesFilters( params.set(FILTER_PARAM_KEYS.hasUserRating, state.hasUserRating); } + if (state.isTracked !== "neutral") { + params.set(FILTER_PARAM_KEYS.isTracked, state.isTracked); + } + return params; } @@ -598,6 +613,7 @@ export function parseSeriesFilters(params: URLSearchParams): SeriesFilterState { FILTER_PARAM_KEYS.hasExternalSourceId, ); const hasUserRatingParam = params.get(FILTER_PARAM_KEYS.hasUserRating); + const isTrackedParam = params.get(FILTER_PARAM_KEYS.isTracked); return { genres: parseFilterGroup(params.get(FILTER_PARAM_KEYS.genres)), tags: parseFilterGroup(params.get(FILTER_PARAM_KEYS.tags)), @@ -619,6 +635,10 @@ export function parseSeriesFilters(params: URLSearchParams): SeriesFilterState { hasUserRatingParam === "include" || hasUserRatingParam === "exclude" ? hasUserRatingParam : "neutral", + isTracked: + isTrackedParam === "include" || isTrackedParam === "exclude" + ? isTrackedParam + : "neutral", }; }