-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgithub.py
More file actions
364 lines (327 loc) · 11.8 KB
/
github.py
File metadata and controls
364 lines (327 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import os
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Any, Dict, List
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from config import get_github_orgs
load_dotenv()
# Personal access token shared by both the GraphQL and REST clients.
token = os.getenv("GITHUB_TOKEN")
# Authorization header for GraphQL requests.
headers = {"Authorization": f"bearer {token}"}
# Per-thread storage: each worker thread lazily builds its own gql client.
_thread_local = threading.local()
def _get_client():
    """Return the gql client cached on the current thread, creating it on first use."""
    cached = getattr(_thread_local, "client", None)
    if cached is not None:
        return cached
    transport = AIOHTTPTransport(
        url="https://api.github.com/graphql",
        headers=headers,
    )
    cached = Client(transport=transport, fetch_schema_from_transport=False)
    _thread_local.client = cached
    return cached
def _execute(query, variable_values=None):
    """Run *query* on this thread's client, forwarding variables only when given."""
    extra = {} if variable_values is None else {"variable_values": variable_values}
    return _get_client().execute(query, **extra)
# headers used for REST API requests
rest_headers = {
    "Authorization": f"bearer {token}",
    # Ask for the raw unified-diff representation instead of JSON.
    "Accept": "application/vnd.github.v3.diff",
    # Use the latest stable REST API version
    "X-GitHub-Api-Version": "2022-11-28",
}
@lru_cache(maxsize=1)
def get_repo_ids():
    """Return the GraphQL node IDs of all tracked repositories.

    The result is cached for the life of the process.  Returns an empty
    list when no token is configured.  Entries that are malformed, fail
    the API call, or resolve to a missing/inaccessible repository
    (GraphQL returns ``"repository": null``) are skipped.
    """
    if not token:
        return []
    # List of repositories to track in the format "owner/name".
    repos = [
        "apollosproject/apollos-platforms",
        "apollosproject/apollos-cluster",
        "apollosproject/apollos-admin",
        "apollosproject/admin-transcriptions",
        "apollosproject/apollos-shovel",
        "apollosproject/apollos-embeds",
        "differential/crossroads-anywhere",
    ]
    ids = []
    # GraphQL query for fetching a repository ID by owner and name.
    repo_id_query = gql(
        """
        query RepoId($owner: String!, $name: String!) {
          repository(owner: $owner, name: $name) {
            id
          }
        }
        """
    )
    for full_name in repos:
        try:
            owner, name = full_name.split("/", 1)
        except ValueError:
            # Skip invalid entries.
            continue
        params = {"owner": owner, "name": name}
        try:
            data = _execute(repo_id_query, variable_values=params)
        except Exception:
            continue
        repository = data.get("repository")
        if repository is None:
            # Repo not found or not visible to this token; skip it
            # rather than crashing on a None subscript.
            continue
        ids.append(repository["id"])
    return ids
def get_prs(repo_id, pr_states):
    """Return all non-draft pull requests for one repository.

    Parameters
    ----------
    repo_id: GraphQL node ID of the repository.
    pr_states: list of PullRequestState values (e.g. ["OPEN"], ["MERGED"]).

    Pages through the pullRequests connection 100 at a time, newest
    updates first.  Returns [] when no token is configured or any page
    fails; returns whatever was collected if the node id no longer
    resolves mid-pagination.
    """
    if not token:
        return []
    query = gql(
        """
        query PRs ($repo_id: ID!, $pr_states: [PullRequestState!], $cursor: String) {
          node(id: $repo_id) {
            ... on Repository {
              pullRequests(
                first: 100,
                after: $cursor,
                states: $pr_states,
                orderBy: {field: UPDATED_AT, direction: DESC}
              ) {
                nodes {
                  author {
                    login
                  }
                  title
                  url
                  closedAt
                  isDraft
                  additions
                  reviews(
                    first: 10,
                    states: [APPROVED, CHANGES_REQUESTED]
                  ) {
                    nodes {
                      author {
                        login
                      }
                      state
                    }
                  }
                  timelineItems(
                    first: 50,
                    itemTypes: [REVIEW_REQUESTED_EVENT],
                  ) {
                    nodes {
                      ... on ReviewRequestedEvent {
                        createdAt
                        requestedReviewer {
                          ... on User {
                            login
                          }
                        }
                      }
                    }
                  }
                  reviewRequests(first: 10) {
                    nodes {
                      requestedReviewer {
                        ... on User {
                          login
                        }
                      }
                    }
                  }
                  number
                  mergeable
                  statusCheckRollup {
                    state
                  }
                }
                pageInfo {
                  endCursor
                  hasNextPage
                }
              }
            }
          }
        }
        """
    )
    all_prs = []
    cursor = None
    while True:
        params = {"repo_id": repo_id, "pr_states": pr_states, "cursor": cursor}
        try:
            data = _execute(query, variable_values=params)
        except Exception:
            return []
        node = data.get("node")
        if not node:
            # Stale or invalid repo id: GraphQL returns "node": null.
            # Stop paging instead of crashing on a None subscript.
            break
        payload = node["pullRequests"]
        all_prs.extend(payload["nodes"])
        page_info = payload["pageInfo"]
        if not page_info["hasNextPage"]:
            break
        cursor = page_info["endCursor"]
    # Drafts are never actionable for review dashboards; drop them.
    return [pr for pr in all_prs if not pr.get("isDraft", False)]
def has_failing_required_checks(pr):
    """Return True unless the PR's combined check rollup state is SUCCESS.

    Note this is stricter than "failing": a PENDING rollup, an ERROR, or
    a missing rollup (``statusCheckRollup`` absent or null, e.g. no checks
    have reported yet) all return True, so callers treat the PR as not yet
    ready.
    """
    rollup = pr.get("statusCheckRollup") or {}
    return rollup.get("state") != "SUCCESS"
def _get_all_prs(pr_states: List[str]) -> List[Dict[str, Any]]:
    """Fetch PRs for every tracked repository in parallel and merge the results."""
    repo_ids = get_repo_ids()
    if not repo_ids:
        return []
    merged: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=len(repo_ids)) as pool:
        pending = [pool.submit(get_prs, rid, pr_states) for rid in repo_ids]
        for fut in pending:
            try:
                merged.extend(fut.result())
            except Exception:
                # A single repo failing should not lose the others' results.
                continue
    return merged
def prs_by_approver():
    """Group merged PRs by the login of each reviewer who approved them."""
    grouped = {}
    for pr in _get_all_prs(["MERGED"]):
        for review in pr["reviews"]["nodes"]:
            author = review.get("author")
            if not author or review.get("state") != "APPROVED":
                continue
            grouped.setdefault(author["login"], []).append(pr)
    return grouped
def _get_merged_prs(days: int = 30):
    """Return merged PRs within the last ``days`` days using GitHub search.

    Searches every configured org via the GraphQL search API, paging 100
    results at a time up to a hard cap of 10 pages.  Returns an empty list
    when no token or orgs are configured, or if any page request fails.
    """
    if not token:
        return []
    orgs = get_github_orgs()
    if not orgs:
        return []
    # Date-only cutoff for the search qualifier (merged:>=YYYY-MM-DD).
    cutoff = datetime.utcnow() - timedelta(days=days)
    cutoff_date = cutoff.date().isoformat()
    org_filter = " ".join(f"org:{org}" for org in orgs)
    search_query = f"{org_filter} is:pr is:merged merged:>={cutoff_date}"
    query = gql(
        """
        query SearchMergedPRs($query: String!, $cursor: String) {
          search(type: ISSUE, query: $query, first: 100, after: $cursor) {
            nodes {
              ... on PullRequest {
                author { login }
                reviews(first: 10, states: [APPROVED]) {
                  nodes {
                    author { login }
                    state
                  }
                }
              }
            }
            pageInfo {
              hasNextPage
              endCursor
            }
          }
        }
        """
    )
    prs = []
    cursor = None
    max_pages = 10
    # Count each fetch against the cap (the old post-increment allowed an
    # eleventh page to slip through).
    for _ in range(max_pages):
        try:
            data = _execute(
                query, variable_values={"query": search_query, "cursor": cursor}
            )
        except Exception:
            return []
        payload = data.get("search", {}) or {}
        nodes = payload.get("nodes", []) or []
        # Non-PullRequest search hits come back as empty objects; drop them.
        prs.extend(node for node in nodes if node)
        page_info = payload.get("pageInfo", {}) or {}
        if not page_info.get("hasNextPage"):
            break
        cursor = page_info.get("endCursor")
    return prs
def merged_prs_by_author(days: int = 30) -> Dict[str, List[Dict[str, Any]]]:
    """Return merged PRs grouped by author within the given timeframe.

    PRs whose author is null (deleted/ghost accounts — GitHub sends
    ``"author": null``, which ``.get("author", {})`` does NOT replace
    with ``{}``) are skipped instead of raising AttributeError.
    """
    prs = _get_merged_prs(days)
    prs_by_author: Dict[str, List[Dict[str, Any]]] = {}
    for pr in prs:
        author = (pr.get("author") or {}).get("login")
        if not author:
            continue
        prs_by_author.setdefault(author, []).append(pr)
    return prs_by_author
def merged_prs_by_reviewer(days: int = 30) -> Dict[str, List[Dict[str, Any]]]:
    """Return merged PRs grouped by reviewer within the given timeframe."""
    grouped: Dict[str, List[Dict[str, Any]]] = {}
    for pr in _get_merged_prs(days):
        for review in pr.get("reviews", {}).get("nodes", []):
            author = review.get("author")
            if author and review.get("state") == "APPROVED":
                grouped.setdefault(author["login"], []).append(pr)
    return grouped
def get_prs_waiting_for_review_by_reviewer():
    """Return PRs waiting on review, grouped by reviewer.

    Includes pull requests with an open review request that was made more
    than 24 hours ago, even if the PR has previously been reviewed.  Only
    includes mergeable PRs with fewer than 200 lines added, no approval,
    and passing checks.  Each PR appears at most once per reviewer.
    """
    all_prs = _get_all_prs(["OPEN"])
    # GitHub timestamps are UTC ("...Z"); compare against a UTC cutoff,
    # not local time, so the 24h window is correct in every timezone.
    cutoff = (datetime.utcnow() - timedelta(hours=24)).isoformat()
    stuck_prs = {}
    for pr in all_prs:
        additions = pr.get("additions")
        if additions is None or additions >= 200:
            continue
        # only consider pull requests that are mergeable
        if pr.get("mergeable") != "MERGEABLE":
            continue
        # Logins with a still-open review request.  Team reviewers come
        # back as null/{} (the query only expands `... on User`); drop them
        # instead of crashing on a None subscript.
        open_requests = {
            req["requestedReviewer"]["login"]
            for req in pr["reviewRequests"]["nodes"]
            if req.get("requestedReviewer")
        }
        if not open_requests:
            continue
        if any(r.get("state") == "APPROVED" for r in pr["reviews"]["nodes"]):
            continue
        if has_failing_required_checks(pr):
            # waiting on author to fix checks
            continue
        seen = set()  # reviewers already credited with this PR
        for event in pr["timelineItems"]["nodes"]:
            requested = event.get("requestedReviewer")
            if not requested or event["createdAt"] >= cutoff:
                continue
            reviewer = requested["login"]
            # Only count requests that are still open, and don't append
            # the same PR twice when multiple request events exist.
            if reviewer not in open_requests or reviewer in seen:
                continue
            seen.add(reviewer)
            stuck_prs.setdefault(reviewer, []).append(pr)
    return stuck_prs
def get_prs_with_changes_requested_by_reviewer():
    """Return open PRs with change requests, grouped by the reviewer who requested changes."""
    grouped = {}
    for pr in _get_all_prs(["OPEN"]):
        for review in pr.get("reviews", {}).get("nodes", []):
            author = review.get("author")
            if author and review.get("state") == "CHANGES_REQUESTED":
                grouped.setdefault(author["login"], []).append(pr)
    return grouped
def get_pr_diff(owner: str, repo: str, number: int, timeout: float = 30.0) -> str:
    """Return the unified diff for a pull request.

    Parameters
    ----------
    owner: repository owner login.
    repo: repository name.
    number: pull request number.
    timeout: seconds to wait for the API before giving up (new, defaulted,
        so existing callers are unaffected).

    Raises ``requests.HTTPError`` for non-2xx responses and
    ``requests.Timeout`` if the request exceeds ``timeout``.
    """
    url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{number}"
    # Without a timeout, requests can block forever on a stalled connection.
    resp = requests.get(url, headers=rest_headers, timeout=timeout)
    resp.raise_for_status()
    return resp.text