Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/das/src/entities/LabelEvent.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,10 @@ export class LabelEvent {

@Column({ type: "timestamptz" })
timestamp: string;

// Globally-unique id of the GraphQL LabeledEvent/UnlabeledEvent node — the
// only path-independent event identity. Set on authoritative backfill rows;
// NULL on provisional live-webhook rows (the payload doesn't carry it).
@Column({ name: "github_node_id", type: "varchar", nullable: true })
githubNodeId: string | null;
}
84 changes: 77 additions & 7 deletions packages/das/src/webhook/github-fetcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ const MAX_FILE_SIZE_BYTES = 1_000_000;
// Starting batch size for batched GraphQL file-content requests. Halves on failure.
const GRAPHQL_FILES_BATCH_SIZE = 50;

// Max skew, in seconds, between a live label webhook's mirror-receive time and
// the GitHub event createdAt that backfill records. Used to pair a provisional
// live row to its authoritative backfill row (#129). Covers normal webhook
// delivery latency plus clock skew; kept small so genuine repeat actions
// (add -> remove -> re-add) aren't merged.
// MUST match the hard-coded `interval '120 seconds'` in
// packages/db/07_label_events.sql — change both together.
const LABEL_EVENT_RECONCILE_WINDOW_SECONDS = 120;

/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of `ms`
 * milliseconds fires.
 */
const sleep = (ms: number): Promise<void> => {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
};

Expand Down Expand Up @@ -732,6 +739,7 @@ export class GitHubFetcherService implements OnModuleInit {
nodes {
__typename
... on LabeledEvent {
id
createdAt
label { name }
actor {
Expand All @@ -740,6 +748,7 @@ export class GitHubFetcherService implements OnModuleInit {
}
}
... on UnlabeledEvent {
id
createdAt
label { name }
actor {
Expand Down Expand Up @@ -921,6 +930,7 @@ export class GitHubFetcherService implements OnModuleInit {
createdAt
}
... on LabeledEvent {
id
createdAt
label { name }
actor {
Expand All @@ -929,6 +939,7 @@ export class GitHubFetcherService implements OnModuleInit {
}
}
... on UnlabeledEvent {
id
createdAt
label { name }
actor {
Expand Down Expand Up @@ -1027,12 +1038,14 @@ export class GitHubFetcherService implements OnModuleInit {
}

/**
* Insert LABELED_EVENT / UNLABELED_EVENT timeline nodes into label_events.
* Idempotent: relies on the uq_label_events_natural_key UNIQUE index so
* re-running backfill (or BullMQ retries) collapses to a no-op for events
* already written. Actor role is resolved at read time via
* contributor_repo_roles using stored PR/issue, review, and comment
* association evidence; GraphQL's actor type doesn't expose authorAssociation.
* Insert LABELED_EVENT / UNLABELED_EVENT timeline nodes into label_events as
* the authoritative record (github_node_id set, timestamp = GitHub createdAt).
* Idempotent across backfill re-runs via the uq_label_events_github_node_id
* partial unique index — same node id collapses to a no-op. After writing, it
* reconciles away any provisional live-webhook duplicates (see #129). Actor
* role is resolved at read time via contributor_repo_roles using stored
* PR/issue, review, and comment association evidence; GraphQL's actor type
* doesn't expose authorAssociation.
*/
private async saveLabelTimelineEvents(
repoFullName: string,
Expand All @@ -1041,7 +1054,7 @@ export class GitHubFetcherService implements OnModuleInit {
nodes: any[],
): Promise<void> {
const rows = nodes
.filter((node) => node && node.label?.name && node.createdAt)
.filter((node) => node && node.id && node.label?.name && node.createdAt)
.map((node) => ({
repoFullName,
targetNumber,
Expand All @@ -1053,6 +1066,7 @@ export class GitHubFetcherService implements OnModuleInit {
: null,
actorLogin: node.actor?.login ?? null,
timestamp: node.createdAt,
githubNodeId: node.id,
}));

if (rows.length === 0) return;
Expand All @@ -1063,5 +1077,61 @@ export class GitHubFetcherService implements OnModuleInit {
.values(rows)
.orIgnore()
.execute();

await this.reconcileProvisionalLabelEvents(
repoFullName,
targetNumber,
targetType,
);
}

/**
 * Delete provisional label_events rows that an authoritative row supersedes
 * (the live↔backfill cross-path duplicate, #129).
 *
 * A row counts as provisional when github_node_id IS NULL and as authoritative
 * when github_node_id IS NOT NULL. Only authoritative rows for the given
 * repo / target number / target type are considered. Each of them selects at
 * most one provisional row: same repo, target, label and action, with the
 * authoritative timestamp inside ±LABEL_EVENT_RECONCILE_WINDOW_SECONDS of the
 * provisional timestamp, closest in time first. Every selected provisional row
 * is deleted.
 *
 * The selection is keyed on the authoritative row only (DISTINCT ON (auth.id)),
 * so two authoritative rows may select the same provisional row. A provisional
 * row that no authoritative row selects is left in place.
 *
 * @param repoFullName Repository whose rows are reconciled (bound as $1).
 * @param targetNumber PR or issue number (bound as $2, compared null-safely).
 * @param targetType   "pr" or "issue" (bound as $3).
 */
private async reconcileProvisionalLabelEvents(
  repoFullName: string,
  targetNumber: number,
  targetType: "pr" | "issue",
): Promise<void> {
  // Positional bindings $1..$4; $4 is the pairing window in seconds.
  const bindings = [
    repoFullName,
    targetNumber,
    targetType,
    LABEL_EVENT_RECONCILE_WINDOW_SECONDS,
  ];

  // Self-join: `auth` is the authoritative side, `prov` the provisional side.
  const deleteSupersededSql = `
DELETE FROM label_events
WHERE id IN (
SELECT DISTINCT ON (auth.id) prov.id
FROM label_events auth
JOIN label_events prov
ON prov.repo_full_name = auth.repo_full_name
AND prov.target_number IS NOT DISTINCT FROM auth.target_number
AND prov.target_type = auth.target_type
AND prov.label_name = auth.label_name
AND prov.action = auth.action
AND prov.github_node_id IS NULL
AND auth.timestamp BETWEEN prov.timestamp - ($4::int * interval '1 second')
AND prov.timestamp + ($4::int * interval '1 second')
WHERE auth.repo_full_name = $1
AND auth.target_number IS NOT DISTINCT FROM $2
AND auth.target_type = $3
AND auth.github_node_id IS NOT NULL
ORDER BY auth.id, abs(extract(epoch FROM (auth.timestamp - prov.timestamp)))
)
`;

  await this.labelEventRepo.query(deleteSupersededSql, bindings);
}
}
23 changes: 10 additions & 13 deletions packages/das/src/webhook/handlers/installation.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,19 @@ export class InstallationHandler {
payload.repositories ?? payload.repositories_added ?? [];

for (const repo of repos) {
// Check existence first so we only set added_at on insert, not on every
// re-fire of installation.created / installation_repositories.added.
const existing = await this.repoRepo.findOneBy({
repoFullName: repo.full_name,
});
if (existing) {
await this.repoRepo.update(repo.full_name, {
installationId: String(installationId),
});
} else {
await this.repoRepo.insert({
// Atomic upsert: insert with addedAt on first encounter; on conflict only
// update installationId so addedAt is never overwritten on re-fires.
await this.repoRepo
.createQueryBuilder()
.insert()
.into(Repo)
.values({
repoFullName: repo.full_name,
installationId: String(installationId),
addedAt: new Date().toISOString(),
});
}
})
.orUpdate(["installationId"], ["repoFullName"])
.execute();
this.logger.log(`Tracking repo: ${repo.full_name}`);
}

Expand Down
19 changes: 12 additions & 7 deletions packages/das/src/webhook/handlers/label.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ export class LabelHandler {
const targetNumber: number =
source === "pr" ? payload.pull_request.number : payload.issue.number;

// Append to label_events log. Actor's repo role is resolved at read time
// via contributor_repo_roles (see pr_labels_by_actor view) using stored
// PR/issue, review, and comment association evidence — neither the webhook
// sender nor GraphQL LabeledEvent.actor expose author_association.
// orIgnore() makes the insert idempotent under the uq_label_events_natural_key
// constraint; same-delivery retries are already gated upstream by
// webhook_deliveries, this is defense-in-depth.
// Append a PROVISIONAL row to label_events. The webhook payload carries no
// LabeledEvent/UnlabeledEvent node id, so github_node_id is left NULL and the
// timestamp is the mirror-receive time (not GitHub's event time). Backfill
// later writes the authoritative row (github_node_id set, timestamp =
// createdAt) and reconciles this provisional duplicate away — the two clocks
// never match, so they can't be deduped by key (see #129). Actor's repo role
// is resolved at read time via contributor_repo_roles (see pr_labels_by_actor
// view) using stored PR/issue, review, and comment association evidence —
// neither the webhook sender nor GraphQL LabeledEvent.actor expose
// author_association. Same-delivery retries are gated upstream by
// webhook_deliveries; orIgnore() is harmless defense-in-depth.
await this.labelEventRepo
.createQueryBuilder()
.insert()
Expand All @@ -52,6 +56,7 @@ export class LabelHandler {
actorGithubId: sender ? String(sender.id) : null,
actorLogin: sender?.login ?? null,
timestamp: new Date().toISOString(),
githubNodeId: null,
})
.orIgnore()
.execute();
Expand Down
64 changes: 59 additions & 5 deletions packages/db/07_label_events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
-- Actor's repo role (author_association) is NOT stored here — neither the
-- webhook sender nor GraphQL LabeledEvent.actor expose it. The labels views
-- resolve the role at read time via contributor_repo_roles.
--
-- Event identity is github_node_id: the globally-unique id of the
-- LabeledEvent/UnlabeledEvent GraphQL node. It is the only path-independent
-- identifier — backfill carries it; the live webhook payload does not. So the
-- live path writes a *provisional* row (github_node_id NULL, timestamp = mirror
-- receive time) and backfill writes the *authoritative* row (github_node_id set,
-- timestamp = GitHub createdAt). Backfill then reconciles: it deletes the
-- provisional duplicate once the authoritative row lands. `timestamp` alone can
-- NOT be the dedup key — it is sourced from two clocks (mirror vs GitHub) that
-- never coincide for the same action (see issue #129).

CREATE TABLE IF NOT EXISTS label_events (
id SERIAL PRIMARY KEY,
Expand All @@ -12,12 +22,56 @@ CREATE TABLE IF NOT EXISTS label_events (
action VARCHAR(20) NOT NULL,
actor_github_id VARCHAR(255),
actor_login VARCHAR(255),
timestamp TIMESTAMPTZ NOT NULL
timestamp TIMESTAMPTZ NOT NULL,
github_node_id VARCHAR
);

-- Existing deployments: add the identity column (NULL for every historic row;
-- those rows are reconciled/back-identified on the next backfill).
ALTER TABLE label_events ADD COLUMN IF NOT EXISTS github_node_id VARCHAR;

CREATE INDEX IF NOT EXISTS idx_label_events_target ON label_events(repo_full_name, target_number, timestamp);

CREATE UNIQUE INDEX IF NOT EXISTS uq_label_events_natural_key
ON label_events (repo_full_name, target_number, target_type,
label_name, action, timestamp)
NULLS NOT DISTINCT;
-- One-time cross-path dedup of historic rows. Gated on the presence of the
-- pre-#129 uq_label_events_natural_key index, used here purely as a "not yet
-- migrated" sentinel so this runs exactly once and is a no-op on every later
-- (re)deploy. Before github_node_id existed, the live path stored mirror-receive
-- time and backfill stored GitHub createdAt for the SAME action, and the old
-- natural key (which includes timestamp) never collapsed them — so every action
-- seen by both paths produced two rows. Collapse each such cluster to its
-- earliest row (closest to GitHub's event time). Genuine repeat actions
-- (add -> remove -> re-add) are spaced far wider than the webhook delivery
-- window, so they survive. The 120s window MUST match
-- LABEL_EVENT_RECONCILE_WINDOW_SECONDS in github-fetcher.service.ts.
-- One-time cleanup of duplicate rows that carry no github_node_id.
DO $$
BEGIN
-- Sentinel check: to_regclass() returns NULL when the named relation does not
-- exist, so the DELETE below is skipped once the old index has been dropped.
-- NOTE(review): the lookup is qualified with schema "public" — assumes
-- label_events and its indexes live there; confirm for non-default schemas.
IF to_regclass('public.uq_label_events_natural_key') IS NOT NULL THEN
-- Delete a row ("later") when another row ("earlier") exists for the same
-- repo / target / label / action with a strictly earlier timestamp that is at
-- most 120 seconds before it. Only rows with github_node_id IS NULL are
-- considered on either side.
-- NOTE(review): the match is pairwise, so a chain of rows each <= 120s after
-- the previous one collapses to the first row even if the whole chain spans
-- more than 120s — confirm this is the intended "cluster" behaviour.
DELETE FROM label_events later
WHERE later.github_node_id IS NULL
AND EXISTS (
SELECT 1
FROM label_events earlier
WHERE earlier.repo_full_name = later.repo_full_name
-- Null-safe comparison: two NULL target numbers are treated as equal.
AND earlier.target_number IS NOT DISTINCT FROM later.target_number
AND earlier.target_type = later.target_type
AND earlier.label_name = later.label_name
AND earlier.action = later.action
AND earlier.github_node_id IS NULL
-- Strict "<": rows with identical timestamps never delete one another.
AND earlier.timestamp < later.timestamp
AND later.timestamp - earlier.timestamp <= interval '120 seconds'
);
END IF;
END $$;

-- Retire the old guard: it includes `timestamp` (so it never collapses
-- cross-path duplicates) and, worse, with NULLS NOT DISTINCT it would collapse
-- two genuinely-distinct events that happen to share a createdAt second — the
-- exact case github_node_id is meant to keep apart.
DROP INDEX IF EXISTS uq_label_events_natural_key;

-- Dedup guard repointed onto the stable GitHub identity. Partial so the many
-- provisional live rows (github_node_id NULL) are never constrained against each
-- other; backfill↔backfill collapses by true identity.
CREATE UNIQUE INDEX IF NOT EXISTS uq_label_events_github_node_id
ON label_events (github_node_id)
WHERE github_node_id IS NOT NULL;
Loading