Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ services:
environment:
# GitHub Configuration
GITHUB_REPOS: "mozilla-firefox/firefox"
GITHUB_TOKEN: "" # Not needed for mock API

# Use mock GitHub API instead of real API
GITHUB_API_URL: "http://mock-github-api:5000"
Expand Down
255 changes: 177 additions & 78 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@

logger = logging.getLogger(__name__)

# HTTP status codes treated as transient server-side failures worth retrying.
_RETRYABLE_STATUS_CODES: frozenset[int] = frozenset({500, 502, 503, 504})
# Maximum attempts for transient (5xx / HTML error page / network) failures.
_MAX_RETRIES: int = 5
# Exponential backoff parameters: start at 1s, double per retry, cap at 60s.
_RETRY_BASE_DELAY: float = 1.0
_RETRY_MAX_DELAY: float = 60.0
_RETRY_MULTIPLIER: float = 2.0
# How many 401 responses may trigger a token refresh before failing hard.
_MAX_AUTH_RETRIES: int = 2
# Per-request timeout in seconds for every GitHub API call.
_REQUEST_TIMEOUT: float = 30.0


@dataclass(frozen=True)
class AccessToken:
Expand Down Expand Up @@ -213,7 +221,7 @@ def extract_pull_requests(
while True:
if refresh_auth:
refresh_auth()
resp = github_get(session, base_url, params=params)
resp = github_get(session, base_url, params=params, refresh_auth=refresh_auth)

batch = resp.json()
pages += 1
Expand All @@ -229,13 +237,13 @@ def extract_pull_requests(
if not pr_number:
continue
pr["commit_data"] = extract_commits(
session, repo, pr_number, github_api_url
session, repo, pr_number, github_api_url, refresh_auth=refresh_auth
)
pr["reviewer_data"] = extract_reviewers(
session, repo, pr_number, github_api_url
session, repo, pr_number, github_api_url, refresh_auth=refresh_auth
)
pr["comment_data"] = extract_comments(
session, repo, pr_number, github_api_url
session, repo, pr_number, github_api_url, refresh_auth=refresh_auth
)

yield batch
Expand Down Expand Up @@ -276,6 +284,7 @@ def extract_commits(
repo: str,
pr_number: int,
github_api_url: str = "https://api.github.com",
refresh_auth: Optional[Callable[[], None]] = None,
) -> list[dict]:
"""
Extract commits and files for a specific pull request.
Expand All @@ -294,13 +303,13 @@ def extract_commits(

logger.info(f"Commits URL: {commits_url}")

resp = github_get(session, commits_url)
resp = github_get(session, commits_url, refresh_auth=refresh_auth)

commits = resp.json()
for commit in commits:
commit_sha = commit.get("sha")
commit_url = f"{github_api_url}/repos/{repo}/commits/{commit_sha}"
commit_data = github_get(session, commit_url).json()
commit_data = github_get(session, commit_url, refresh_auth=refresh_auth).json()
commit["files"] = commit_data.get("files", [])

logger.info(f"Extracted {len(commits)} commits for PR #{pr_number}")
Expand All @@ -312,6 +321,7 @@ def extract_reviewers(
repo: str,
pr_number: int,
github_api_url: str = "https://api.github.com",
refresh_auth: Optional[Callable[[], None]] = None,
) -> list[dict]:
"""
Extract reviewers for a specific pull request.
Expand All @@ -330,7 +340,7 @@ def extract_reviewers(

logger.info(f"Reviewers URL: {reviewers_url}")

reviewers = github_get(session, reviewers_url).json()
reviewers = github_get(session, reviewers_url, refresh_auth=refresh_auth).json()

filtered = [r for r in reviewers if r.get("user") is not None]
skipped = len(reviewers) - len(filtered)
Expand All @@ -346,6 +356,7 @@ def extract_comments(
repo: str,
pr_number: int,
github_api_url: str = "https://api.github.com",
refresh_auth: Optional[Callable[[], None]] = None,
) -> list[dict]:
"""
Extract comments for a specific pull request.
Expand All @@ -364,7 +375,7 @@ def extract_comments(

logger.info(f"Comments URL: {comments_url}")

comments = github_get(session, comments_url).json()
comments = github_get(session, comments_url, refresh_auth=refresh_auth).json()

filtered = [c for c in comments if c.get("user") is not None and c.get("body")]
skipped = len(comments) - len(filtered)
Expand All @@ -389,35 +400,109 @@ def sleep_for_rate_limit(resp: requests.Response) -> None:
time.sleep(sleep_time)


def _is_html_error_page(resp: requests.Response) -> bool:
    """Return True when GitHub returns an HTML error page instead of JSON."""
    if resp.status_code < 400:
        return False
    return "text/html" in resp.headers.get("Content-Type", "")


def github_get(
    session: requests.Session,
    url: str,
    params: Optional[dict] = None,
    refresh_auth: Optional[Callable[[], None]] = None,
) -> requests.Response:
    """
    Make a GitHub API GET request, retrying on transient errors and expired tokens.

    Retry behaviour:
        - 403 rate-limit: sleeps until reset, then retries (unbounded, existing behaviour).
        - 401 bad credentials: calls refresh_auth() then retries, up to
          _MAX_AUTH_RETRIES times. If refresh_auth is None the error is
          treated as non-retryable.
        - 5xx / HTML error page: exponential backoff up to _MAX_RETRIES attempts.
        - Network-level errors (Timeout, ConnectionError): same exponential backoff.
        - All other non-200 responses (404, 422 ...): raises SystemExit immediately.

    Args:
        session: Authenticated requests session
        url: URL to fetch
        params: Optional query parameters
        refresh_auth: Optional callable that refreshes the session's Authorization
            header. Called automatically when a 401 response is received.

    Returns:
        Successful response (status 200)

    Raises:
        SystemExit: When all retries are exhausted or a non-retryable error occurs.
    """
    auth_retries = _MAX_AUTH_RETRIES
    transient_retries = _MAX_RETRIES
    backoff = _RETRY_BASE_DELAY

    while True:
        try:
            resp = session.get(url, params=params, timeout=_REQUEST_TIMEOUT)
        except (
            requests.exceptions.Timeout,
            requests.exceptions.ConnectionError,
        ) as exc:
            if transient_retries > 0:
                logger.warning(
                    f"Network error for {url}: {exc}. "
                    f"Retrying in {backoff:.0f}s ({transient_retries} retries left)"
                )
                time.sleep(backoff)
                backoff = min(backoff * _RETRY_MULTIPLIER, _RETRY_MAX_DELAY)
                transient_retries -= 1
                continue
            raise SystemExit(
                f"GitHub API request failed after retries for {url}: {exc}"
            )

        if resp.status_code == 200:
            return resp

        if resp.status_code == 403:
            # The header may be missing or non-numeric (e.g. behind a proxy);
            # default to "not exhausted" instead of crashing on int().
            try:
                remaining = int(resp.headers.get("X-RateLimit-Remaining", "1"))
            except ValueError:
                remaining = 1
            if remaining == 0:
                sleep_for_rate_limit(resp)
                continue
            # A non-rate-limit 403 falls through to the generic handling below.

        if resp.status_code == 401:
            if auth_retries > 0 and refresh_auth is not None:
                logger.warning(
                    f"401 for {url}, refreshing auth token ({auth_retries} retries left)"
                )
                refresh_auth()
                auth_retries -= 1
                continue
            if refresh_auth is None:
                auth_error_detail = "with no refresh_auth configured"
            else:
                auth_error_detail = f"after {_MAX_AUTH_RETRIES} refresh attempts"
            raise SystemExit(
                f"GitHub API auth error 401 for {url} {auth_error_detail}: "
                f"{resp.text or 'No response text'}"
            )

        if resp.status_code in _RETRYABLE_STATUS_CODES or _is_html_error_page(resp):
            if transient_retries > 0:
                logger.warning(
                    f"Transient error {resp.status_code} for {url}. "
                    f"Retrying in {backoff:.0f}s ({transient_retries} retries left)"
                )
                time.sleep(backoff)
                backoff = min(backoff * _RETRY_MULTIPLIER, _RETRY_MAX_DELAY)
                transient_retries -= 1
                continue
            raise SystemExit(
                f"GitHub API error {resp.status_code} for {url} after {_MAX_RETRIES} retries: "
                f"{resp.text or 'No response text'}"
            )

        raise SystemExit(
            f"GitHub API error {resp.status_code} for {url}: {resp.text or 'No response text'}"
        )
Expand Down Expand Up @@ -826,81 +911,95 @@ def _main() -> int:
total_processed = 0
snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

failed_repos: list[str] = []

for repo in github_repos:
# Delete any existing rows for this (repo, snapshot_date) before loading.
# This makes every run idempotent: if a previous run crashed mid-way and left
# partial data, a rerun will clean up the partial write and reload cleanly.
if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date):
logger.info(
f"Deleting partial/existing snapshot for {repo} on {snapshot_date} before reload"
)
delete_existing_snapshot(
bigquery_client, bigquery_dataset, repo, snapshot_date
)
try:
# Delete any existing rows for this (repo, snapshot_date) before loading.
# This makes every run idempotent: if a previous run crashed mid-way and left
# partial data, a rerun will clean up the partial write and reload cleanly.
if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date):
logger.info(
f"Deleting partial/existing snapshot for {repo} on {snapshot_date} before reload"
)
delete_existing_snapshot(
bigquery_client, bigquery_dataset, repo, snapshot_date
)

# Build a per-repo token refresh callable. It is called by the generator
# before each page fetch, so every API request (PRs + commits + reviewers +
# comments) uses a valid token. The access_token_cache means this only hits
# the GitHub API when the cached token has <60 seconds remaining.
refresh_auth: Optional[Callable[[], None]] = None
if github_app_id and github_private_key:

def _make_refresh(
_repo: str = repo,
) -> Callable[[], None]:
def _refresh() -> None:
try:
app_jwt = generate_github_jwt(github_app_id, github_private_key)
access_token = get_installation_access_token(
app_jwt, _repo, github_api_url
)
except Exception as e:
raise RuntimeError(
f"Failed to obtain GitHub App access token for {_repo}: {e}. "
"Check that GITHUB_APP_ID is correct and GITHUB_PRIVATE_KEY "
"is a valid PEM-encoded RSA private key."
) from e
session.headers["Authorization"] = f"Bearer {access_token}"

return _refresh

refresh_auth = _make_refresh()
# Set the token immediately so the first generator page is authenticated.
refresh_auth()
# Build a per-repo token refresh callable. It is called by the generator
# before each page fetch, so every API request (PRs + commits + reviewers +
# comments) uses a valid token. The access_token_cache means this only hits
# the GitHub API when the cached token has <60 seconds remaining.
refresh_auth: Optional[Callable[[], None]] = None
if github_app_id and github_private_key:

def _make_refresh(
_repo: str = repo,
) -> Callable[[], None]:
def _refresh() -> None:
try:
app_jwt = generate_github_jwt(
github_app_id, github_private_key
)
access_token = get_installation_access_token(
app_jwt, _repo, github_api_url
)
except Exception as e:
raise RuntimeError(
f"Failed to obtain GitHub App access token for {_repo}: {e}. "
"Check that GITHUB_APP_ID is correct and GITHUB_PRIVATE_KEY "
"is a valid PEM-encoded RSA private key."
) from e
session.headers["Authorization"] = f"Bearer {access_token}"

return _refresh

refresh_auth = _make_refresh()
# Set the token immediately so the first generator page is authenticated.
refresh_auth()

for chunk_count, chunk in enumerate(
extract_pull_requests(
session,
repo,
chunk_size=100,
github_api_url=github_api_url,
refresh_auth=refresh_auth,
),
start=1,
):
logger.info(f"Processing chunk {chunk_count} with {len(chunk)} PRs")

# Transform
transformed_data = transform_data(chunk, repo)

# Load
load_data(
bigquery_client,
bigquery_dataset,
transformed_data,
snapshot_date,
use_streaming_insert=bool(emulator_host),
)

for chunk_count, chunk in enumerate(
extract_pull_requests(
session,
repo,
chunk_size=100,
github_api_url=github_api_url,
refresh_auth=refresh_auth,
),
start=1,
):
logger.info(f"Processing chunk {chunk_count} with {len(chunk)} PRs")

# Transform
transformed_data = transform_data(chunk, repo)

# Load
load_data(
bigquery_client,
bigquery_dataset,
transformed_data,
snapshot_date,
use_streaming_insert=bool(emulator_host),
)
total_processed += len(chunk)
logger.info(
f"Completed chunk {chunk_count}. Total PRs processed: {total_processed}"
)
except (SystemExit, RuntimeError) as exc:
logger.error(f"Failed to process repo {repo}: {exc}")
failed_repos.append(repo)
continue

total_processed += len(chunk)
logger.info(
f"Completed chunk {chunk_count}. Total PRs processed: {total_processed}"
)
if failed_repos:
logger.error(
f"ETL completed with failures. Failed repos: {', '.join(failed_repos)}"
)
return 1

logger.info(
f"GitHub ETL process completed successfully. Total PRs processed: {total_processed}"
)

return 0


Expand Down
1 change: 1 addition & 0 deletions tests/test_extract_comments.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_api_error_comments(mock_session):
error_response = Mock()
error_response.status_code = 404
error_response.text = "Not Found"
error_response.headers = {}

mock_session.get.return_value = error_response

Expand Down
Loading
Loading