diff --git a/packages/das/src/entities/LabelEvent.entity.ts b/packages/das/src/entities/LabelEvent.entity.ts index fc5a329..2ae1664 100644 --- a/packages/das/src/entities/LabelEvent.entity.ts +++ b/packages/das/src/entities/LabelEvent.entity.ts @@ -28,4 +28,10 @@ export class LabelEvent { @Column({ type: "timestamptz" }) timestamp: string; + + // Globally-unique id of the GraphQL LabeledEvent/UnlabeledEvent node — the + // only path-independent event identity. Set on authoritative backfill rows; + // NULL on provisional live-webhook rows (the payload doesn't carry it). + @Column({ name: "github_node_id", type: "varchar", nullable: true }) + githubNodeId: string | null; } diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index c50e067..fd944d4 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -31,6 +31,13 @@ const MAX_FILE_SIZE_BYTES = 1_000_000; // Starting batch size for batched GraphQL file-content requests. Halves on failure. const GRAPHQL_FILES_BATCH_SIZE = 50; +// Max skew between a live label webhook's mirror-receive time and the GitHub +// event createdAt that backfill records, used to pair a provisional live row to +// its authoritative backfill row (#129). Covers normal webhook delivery latency +// plus clock skew; kept small so genuine repeat actions (add -> remove -> re-add) +// aren't merged. MUST match the interval in packages/db/07_label_events.sql. +const LABEL_EVENT_RECONCILE_WINDOW_SECONDS = 120; + const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms)); @@ -732,6 +739,7 @@ export class GitHubFetcherService implements OnModuleInit { nodes { __typename ... on LabeledEvent { + id createdAt label { name } actor { @@ -740,6 +748,7 @@ export class GitHubFetcherService implements OnModuleInit { } } ... on UnlabeledEvent { + id createdAt label { name } actor { @@ -921,6 +930,7 @@ export class GitHubFetcherService implements OnModuleInit { createdAt } ... on LabeledEvent { + id createdAt label { name } actor { @@ -929,6 +939,7 @@ export class GitHubFetcherService implements OnModuleInit { } } ... on UnlabeledEvent { + id createdAt label { name } actor { @@ -1027,12 +1038,14 @@ export class GitHubFetcherService implements OnModuleInit { } /** - * Insert LABELED_EVENT / UNLABELED_EVENT timeline nodes into label_events. - * Idempotent: relies on the uq_label_events_natural_key UNIQUE index so - * re-running backfill (or BullMQ retries) collapses to a no-op for events - * already written. Actor role is resolved at read time via - * contributor_repo_roles using stored PR/issue, review, and comment - * association evidence; GraphQL's actor type doesn't expose authorAssociation. + * Insert LABELED_EVENT / UNLABELED_EVENT timeline nodes into label_events as + * the authoritative record (github_node_id set, timestamp = GitHub createdAt). + * Idempotent across backfill re-runs via the uq_label_events_github_node_id + * partial unique index — same node id collapses to a no-op. After writing, it + * reconciles away any provisional live-webhook duplicates (see #129). Actor + * role is resolved at read time via contributor_repo_roles using stored + * PR/issue, review, and comment association evidence; GraphQL's actor type + * doesn't expose authorAssociation. */ private async saveLabelTimelineEvents( repoFullName: string, @@ -1041,7 +1054,7 @@ export class GitHubFetcherService implements OnModuleInit { nodes: any[], ): Promise { const rows = nodes - .filter((node) => node && node.label?.name && node.createdAt) + .filter((node) => node && node.id && node.label?.name && node.createdAt) .map((node) => ({ repoFullName, targetNumber, @@ -1053,6 +1066,7 @@ export class GitHubFetcherService implements OnModuleInit { : null, actorLogin: node.actor?.login ?? null, timestamp: node.createdAt, + githubNodeId: node.id, })); if (rows.length === 0) return; @@ -1063,5 +1077,61 @@ export class GitHubFetcherService implements OnModuleInit { .values(rows) .orIgnore() .execute(); + + await this.reconcileProvisionalLabelEvents( + repoFullName, + targetNumber, + targetType, + ); + } + + /** + * Collapse the live↔backfill cross-path duplicate (#129). The live webhook + * can only write a provisional row (github_node_id NULL, timestamp = mirror + * receive time); backfill writes the authoritative row (github_node_id set, + * timestamp = GitHub createdAt). The two timestamps never match, so they can't + * be deduped by key. Here, once the authoritative rows exist, delete each + * provisional row that an authoritative row supersedes. + * + * Pairing is 1:1 and nearest-in-time: each authoritative row claims at most + * the single closest provisional row (same repo/target/label/action) within + * the delivery-latency window. A provisional row with no authoritative row to + * claim it survives — so events backfill hasn't captured yet (or were dropped + * by the timeline's truncation) are never lost. + */ + private async reconcileProvisionalLabelEvents( + repoFullName: string, + targetNumber: number, + targetType: "pr" | "issue", + ): Promise { + await this.labelEventRepo.query( + ` + DELETE FROM label_events + WHERE id IN ( + SELECT DISTINCT ON (auth.id) prov.id + FROM label_events auth + JOIN label_events prov + ON prov.repo_full_name = auth.repo_full_name + AND prov.target_number IS NOT DISTINCT FROM auth.target_number + AND prov.target_type = auth.target_type + AND prov.label_name = auth.label_name + AND prov.action = auth.action + AND prov.github_node_id IS NULL + AND auth.timestamp BETWEEN prov.timestamp - ($4::int * interval '1 second') + AND prov.timestamp + ($4::int * interval '1 second') + WHERE auth.repo_full_name = $1 + AND auth.target_number IS NOT DISTINCT FROM $2 + AND auth.target_type = $3 + AND auth.github_node_id IS NOT NULL + ORDER BY auth.id, abs(extract(epoch FROM (auth.timestamp - prov.timestamp))) + ) + `, + [ + repoFullName, + targetNumber, + targetType, + LABEL_EVENT_RECONCILE_WINDOW_SECONDS, + ], + ); } } diff --git a/packages/das/src/webhook/handlers/installation.handler.ts b/packages/das/src/webhook/handlers/installation.handler.ts index 2638652..7eaf630 100644 --- a/packages/das/src/webhook/handlers/installation.handler.ts +++ b/packages/das/src/webhook/handlers/installation.handler.ts @@ -38,22 +38,19 @@ export class InstallationHandler { payload.repositories ?? payload.repositories_added ?? []; for (const repo of repos) { - // Check existence first so we only set added_at on insert, not on every - // re-fire of installation.created / installation_repositories.added. - const existing = await this.repoRepo.findOneBy({ - repoFullName: repo.full_name, - }); - if (existing) { - await this.repoRepo.update(repo.full_name, { - installationId: String(installationId), - }); - } else { - await this.repoRepo.insert({ + // Atomic upsert: insert with addedAt on first encounter; on conflict only + // update installationId so addedAt is never overwritten on re-fires. + await this.repoRepo + .createQueryBuilder() + .insert() + .into(Repo) + .values({ repoFullName: repo.full_name, installationId: String(installationId), addedAt: new Date().toISOString(), - }); - } + }) + .orUpdate(["installationId"], ["repoFullName"]) + .execute(); this.logger.log(`Tracking repo: ${repo.full_name}`); } diff --git a/packages/das/src/webhook/handlers/label.handler.ts b/packages/das/src/webhook/handlers/label.handler.ts index ab813bb..6412380 100644 --- a/packages/das/src/webhook/handlers/label.handler.ts +++ b/packages/das/src/webhook/handlers/label.handler.ts @@ -33,13 +33,17 @@ export class LabelHandler { const targetNumber: number = source === "pr" ? payload.pull_request.number : payload.issue.number; - // Append to label_events log. Actor's repo role is resolved at read time - // via contributor_repo_roles (see pr_labels_by_actor view) using stored - // PR/issue, review, and comment association evidence — neither the webhook - // sender nor GraphQL LabeledEvent.actor expose author_association. - // orIgnore() makes the insert idempotent under the uq_label_events_natural_key - // constraint; same-delivery retries are already gated upstream by - // webhook_deliveries, this is defense-in-depth. + // Append a PROVISIONAL row to label_events. The webhook payload carries no + // LabeledEvent/UnlabeledEvent node id, so github_node_id is left NULL and the + // timestamp is the mirror-receive time (not GitHub's event time). Backfill + // later writes the authoritative row (github_node_id set, timestamp = + // createdAt) and reconciles this provisional duplicate away — the two clocks + // never match, so they can't be deduped by key (see #129). Actor's repo role + // is resolved at read time via contributor_repo_roles (see pr_labels_by_actor + // view) using stored PR/issue, review, and comment association evidence — + // neither the webhook sender nor GraphQL LabeledEvent.actor expose + // author_association. same-delivery retries are gated upstream by + // webhook_deliveries; orIgnore() is harmless defense-in-depth. await this.labelEventRepo .createQueryBuilder() .insert() @@ -52,6 +56,7 @@ export class LabelHandler { actorGithubId: sender ? String(sender.id) : null, actorLogin: sender?.login ?? null, timestamp: new Date().toISOString(), + githubNodeId: null, }) .orIgnore() .execute(); diff --git a/packages/db/07_label_events.sql b/packages/db/07_label_events.sql index 8c2374f..4f11ec8 100644 --- a/packages/db/07_label_events.sql +++ b/packages/db/07_label_events.sql @@ -2,6 +2,16 @@ -- Actor's repo role (author_association) is NOT stored here — neither the -- webhook sender nor GraphQL LabeledEvent.actor expose it. The labels views -- resolve the role at read time via contributor_repo_roles. +-- +-- Event identity is github_node_id: the globally-unique id of the +-- LabeledEvent/UnlabeledEvent GraphQL node. It is the only path-independent +-- identifier — backfill carries it; the live webhook payload does not. So the +-- live path writes a *provisional* row (github_node_id NULL, timestamp = mirror +-- receive time) and backfill writes the *authoritative* row (github_node_id set, +-- timestamp = GitHub createdAt). Backfill then reconciles: it deletes the +-- provisional duplicate once the authoritative row lands. `timestamp` alone can +-- NOT be the dedup key — it is sourced from two clocks (mirror vs GitHub) that +-- never coincide for the same action (see issue #129). CREATE TABLE IF NOT EXISTS label_events ( id SERIAL PRIMARY KEY, @@ -12,12 +22,56 @@ CREATE TABLE IF NOT EXISTS label_events ( action VARCHAR(20) NOT NULL, actor_github_id VARCHAR(255), actor_login VARCHAR(255), - timestamp TIMESTAMPTZ NOT NULL + timestamp TIMESTAMPTZ NOT NULL, + github_node_id VARCHAR ); +-- Existing deployments: add the identity column (NULL for every historic row; +-- those rows are reconciled/back-identified on the next backfill). +ALTER TABLE label_events ADD COLUMN IF NOT EXISTS github_node_id VARCHAR; + CREATE INDEX IF NOT EXISTS idx_label_events_target ON label_events(repo_full_name, target_number, timestamp); -CREATE UNIQUE INDEX IF NOT EXISTS uq_label_events_natural_key - ON label_events (repo_full_name, target_number, target_type, - label_name, action, timestamp) - NULLS NOT DISTINCT; +-- One-time cross-path dedup of historic rows. Gated on the presence of the +-- pre-#129 uq_label_events_natural_key index, used here purely as a "not yet +-- migrated" sentinel so this runs exactly once and is a no-op on every later +-- (re)deploy. Before github_node_id existed, the live path stored mirror-receive +-- time and backfill stored GitHub createdAt for the SAME action, and the old +-- natural key (which includes timestamp) never collapsed them — so every action +-- seen by both paths produced two rows. Collapse each such cluster to its +-- earliest row (closest to GitHub's event time). Genuine repeat actions +-- (add -> remove -> re-add) are spaced far wider than the webhook delivery +-- window, so they survive. The 120s window MUST match +-- LABEL_EVENT_RECONCILE_WINDOW_SECONDS in github-fetcher.service.ts. +DO $$ +BEGIN + IF to_regclass('public.uq_label_events_natural_key') IS NOT NULL THEN + DELETE FROM label_events later + WHERE later.github_node_id IS NULL + AND EXISTS ( + SELECT 1 + FROM label_events earlier + WHERE earlier.repo_full_name = later.repo_full_name + AND earlier.target_number IS NOT DISTINCT FROM later.target_number + AND earlier.target_type = later.target_type + AND earlier.label_name = later.label_name + AND earlier.action = later.action + AND earlier.github_node_id IS NULL + AND earlier.timestamp < later.timestamp + AND later.timestamp - earlier.timestamp <= interval '120 seconds' + ); + END IF; +END $$; + +-- Retire the old guard: it includes `timestamp` (so it never collapses +-- cross-path duplicates) and, worse, with NULLS NOT DISTINCT it would collapse +-- two genuinely-distinct events that happen to share a createdAt second — the +-- exact case github_node_id is meant to keep apart. +DROP INDEX IF EXISTS uq_label_events_natural_key; + +-- Dedup guard repointed onto the stable GitHub identity. Partial so the many +-- provisional live rows (github_node_id NULL) are never constrained against each +-- other; backfill↔backfill collapses by true identity. +CREATE UNIQUE INDEX IF NOT EXISTS uq_label_events_github_node_id + ON label_events (github_node_id) + WHERE github_node_id IS NOT NULL;