diff --git a/app/services/jobs/crawl.server.ts b/app/services/jobs/crawl.server.ts index 84210072..ef764853 100644 --- a/app/services/jobs/crawl.server.ts +++ b/app/services/jobs/crawl.server.ts @@ -56,6 +56,8 @@ export const crawlJob = defineJob({ const repoCount = organization.repositories.length const updatedPrNumbers = new Map>() + const FETCH_ALL_SENTINEL = '2000-01-01T00:00:00Z' + // Step 2: Fetch per repo for (let i = 0; i < organization.repositories.length; i++) { const repo = organization.repositories[i] @@ -81,35 +83,33 @@ export const crawlJob = defineJob({ }) } - // Step 2b: Fetch PR list - const allPullRequests = await step.run( - `fetch-prs:${repoLabel}`, - async () => { - step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`) - return await fetcher.pullrequests() - }, - ) - - // Determine which PRs need detail fetching (cached for deterministic resume) + // Step 2b: Determine lastFetchedAt (before list fetch for early termination) const lastFetchedAt = await step.run( `last-fetched-at:${repoLabel}`, async () => { - if (input.refresh) return '2000-01-01T00:00:00Z' + if (input.refresh) return FETCH_ALL_SENTINEL return ( (await store.getLatestUpdatedAt().catch(() => null)) ?? - '2000-01-01T00:00:00Z' + FETCH_ALL_SENTINEL ) }, ) + // Step 2c: Fetch lightweight PR list (number + updatedAt only) const prNumberSet = input.prNumbers ? new Set(input.prNumbers) : null - const prsToFetch = prNumberSet - ? allPullRequests.filter((pr) => prNumberSet.has(pr.number)) - : input.refresh - ? allPullRequests - : allPullRequests.filter((pr) => pr.updatedAt > lastFetchedAt) - - // Step 2c: Fetch details per PR + const prsToFetch: Array<{ number: number }> = prNumberSet + ? // --pr 指定時: リスト取得をスキップし、指定番号だけ処理する + (input.prNumbers?.map((n) => ({ number: n })) ?? []) + : await step.run(`fetch-prs:${repoLabel}`, async () => { + step.progress(i + 1, repoCount, `Fetching PR list: ${repoLabel}...`) + const stopBefore = + input.refresh || lastFetchedAt === FETCH_ALL_SENTINEL + ? undefined + : lastFetchedAt + return await fetcher.pullrequestList(stopBefore) + }) + + // Step 2d: Fetch details per PR (including PR metadata) const repoUpdated = new Set() for (let j = 0; j < prsToFetch.length; j++) { const pr = prsToFetch[j] @@ -122,16 +122,23 @@ export const crawlJob = defineJob({ `Fetching ${repoLabel}#${pr.number} (${j + 1}/${prsToFetch.length})...`, ) try { - const [commits, discussions, reviews, timelineItems, files] = - await Promise.all([ - fetcher.commits(pr.number), - fetcher.comments(pr.number), - fetcher.reviews(pr.number), - fetcher.timelineItems(pr.number), - fetcher.files(pr.number), - ]) - pr.files = files - await store.savePrData(pr, { + const [ + prMetadata, + commits, + discussions, + reviews, + timelineItems, + files, + ] = await Promise.all([ + fetcher.pullrequest(pr.number), + fetcher.commits(pr.number), + fetcher.comments(pr.number), + fetcher.reviews(pr.number), + fetcher.timelineItems(pr.number), + fetcher.files(pr.number), + ]) + prMetadata.files = files + await store.savePrData(prMetadata, { commits, reviews, discussions, diff --git a/batch/github/fetcher.ts b/batch/github/fetcher.ts index c5e3335d..07f03e07 100644 --- a/batch/github/fetcher.ts +++ b/batch/github/fetcher.ts @@ -82,6 +82,85 @@ const GetPullRequestsQuery = graphql(` } `) +const GetPullRequestListQuery = graphql(` + query GetPullRequestList( + $owner: String! + $repo: String! + $cursor: String + $first: Int = 100 + ) { + repository(owner: $owner, name: $repo) { + pullRequests( + first: $first + after: $cursor + orderBy: { field: UPDATED_AT, direction: DESC } + ) { + pageInfo { + hasNextPage + endCursor + } + nodes { + number + updatedAt + } + } + } + } +`) + +const GetPullRequestQuery = graphql(` + query GetPullRequest($owner: String!, $repo: String!, $number: Int!) { + repository(owner: $owner, name: $repo) { + pullRequest(number: $number) { + databaseId + number + state + title + body + url + isDraft + createdAt + updatedAt + mergedAt + closedAt + additions + deletions + changedFiles + headRefName + baseRefName + mergeCommit { + oid + } + author { + __typename + login + } + assignees(first: 100) { + nodes { + login + } + } + reviewRequests(first: 100) { + nodes { + requestedReviewer { + __typename + ... on User { + login + } + ... on Bot { + login + } + ... on Mannequin { + login + } + } + } + } + } + } + } +`) + const GetPullRequestTimelineQuery = graphql(` query GetPullRequestTimeline($owner: String!, $repo: String!, $number: Int!) { repository(owner: $owner, name: $repo) { @@ -591,34 +670,50 @@ const GetTagsQuery = graphql(` * GitHub GraphQL の 502/504 タイムアウトに対してページサイズを縮小してリトライする。 * @returns 'retry' ならページサイズ縮小済みで continue すべき、それ以外は結果 or throw */ +/** error から HTTP status を取り出す */ +function getErrorStatus(error: unknown): number | null { + return error && typeof error === 'object' && 'status' in error + ? (error as { status: number }).status + : null +} + +/** error が transient (502/504) かどうか */ +function isTransientError(error: unknown): boolean { + const status = getErrorStatus(error) + return status === 502 || status === 504 +} + +/** error から partial data を取り出す(GraphQL の data + errors レスポンス) */ +function getPartialData(error: unknown): T | null { + if ( + error && + typeof error === 'object' && + 'data' in error && + error.data != null + ) { + return error.data as T + } + return null +} + function handleGraphQLError( error: unknown, pageSize: { value: number }, minPageSize: number, label: string, ): { action: 'retry' } | { action: 'use'; data: T } { - const status = - error && typeof error === 'object' && 'status' in error - ? (error as { status: number }).status - : null - - // HTTP 502/504 - if ((status === 502 || status === 504) && pageSize.value > minPageSize) { + // HTTP 502/504 → ページサイズ削減してリトライ + if (isTransientError(error) && pageSize.value > minPageSize) { pageSize.value = Math.max(minPageSize, Math.floor(pageSize.value / 2)) logger.warn( - `${label}: GitHub API timeout (${status}), reducing page size to ${pageSize.value}`, + `${label}: GitHub API timeout (${getErrorStatus(error)}), reducing page size to ${pageSize.value}`, ) return { action: 'retry' } } // GraphQL partial error (data + errors) - if ( - error && - typeof error === 'object' && - 'data' in error && - error.data != null - ) { - const partialData = error.data as T + const partialData = getPartialData(error) + if (partialData != null) { // data はあるが中身が空の場合はタイムアウト起因の可能性が高い const repo = partialData && @@ -635,7 +730,9 @@ function handleGraphQLError( } logger.warn( `${label}: GraphQL partial error`, - 'errors' in error ? JSON.stringify(error.errors) : '', + error && typeof error === 'object' && 'errors' in error + ? JSON.stringify((error as { errors: unknown }).errors) + : '', ) return { action: 'use', data: partialData } } @@ -710,6 +807,87 @@ export function buildRequestedAtMap( return map } +function shapePullRequestNode( + node: { + databaseId?: number | null + number: number + state: string + title: string + body?: string | null + url: string + isDraft: boolean + createdAt: string + updatedAt: string + mergedAt?: string | null + closedAt?: string | null + additions?: number | null + deletions?: number | null + changedFiles?: number | null + headRefName: string + baseRefName: string + mergeCommit?: { oid: string } | null + author?: { __typename: string; login: string } | null + assignees: { nodes?: Array<{ login: string } | null> | null } + reviewRequests?: { + nodes?: Array<{ + requestedReviewer?: { + __typename: string + login?: string + } | null + } | null> | null + } | null + } | null, + owner: string, + repo: string, +): ShapedGitHubPullRequest | null { + if (!node || !node.databaseId) return null + + const state = node.state === 'OPEN' ? 'open' : ('closed' as 'open' | 'closed') + + const reviewers: { login: string; requestedAt: string | null }[] = [] + if (node.reviewRequests?.nodes) { + for (const rr of node.reviewRequests.nodes) { + const reviewer = rr?.requestedReviewer + if ( + reviewer?.login && + (reviewer.__typename === 'User' || + reviewer.__typename === 'Bot' || + reviewer.__typename === 'Mannequin') + ) { + reviewers.push({ login: reviewer.login, requestedAt: null }) + } + } + } + + return { + id: node.databaseId, + organization: owner, + repo, + number: node.number, + state, + title: node.title, + body: node.body ?? null, + url: node.url, + author: node.author?.login ?? null, + authorIsBot: node.author?.__typename === 'Bot', + assignees: + node.assignees.nodes?.filter((n) => n != null).map((n) => n.login) ?? [], + reviewers, + draft: node.isDraft, + sourceBranch: node.headRefName, + targetBranch: node.baseRefName, + createdAt: node.createdAt, + updatedAt: node.updatedAt, + mergedAt: node.mergedAt ?? null, + closedAt: node.closedAt ?? null, + mergeCommitSha: node.mergeCommit?.oid ?? null, + additions: node.additions ?? null, + deletions: node.deletions ?? null, + changedFiles: node.changedFiles ?? null, + files: [], + } +} + interface createFetcherProps { owner: string repo: string @@ -761,24 +939,37 @@ export function shapeTagNode(node: { export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { /** タイムアウト付き GraphQL リクエスト */ - function graphqlWithTimeout( + async function graphqlWithTimeout( query: string, variables: Record, + maxRetries = 2, ): Promise { - return Promise.race([ - octokit.graphql(query, variables), - new Promise((_, reject) => - setTimeout( - () => - reject( - Object.assign(new Error('GraphQL request timeout'), { - status: 504, - }), + for (let attempt = 0; ; attempt++) { + try { + return await Promise.race([ + octokit.graphql(query, variables), + new Promise((_, reject) => + setTimeout( + () => + reject( + Object.assign(new Error('GraphQL request timeout'), { + status: 504, + }), + ), + REQUEST_TIMEOUT_MS, ), - REQUEST_TIMEOUT_MS, - ), - ), - ]) + ), + ]) + } catch (error) { + if (isTransientError(error) && attempt < maxRetries) { + logger.warn( + `GraphQL transient error (${getErrorStatus(error)}), retrying (${attempt + 1}/${maxRetries})...`, + ) + continue + } + throw error + } + } } /** @@ -838,68 +1029,108 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { } for (const node of pullRequests.nodes) { - if (!node || !node.databaseId) continue + const shaped = shapePullRequestNode(node, owner, repo) + if (shaped) allPulls.push(shaped) + } - // GraphQL state は OPEN/CLOSED/MERGED、REST 互換で open/closed に変換 - const state = - node.state === 'OPEN' ? 'open' : ('closed' as 'open' | 'closed') + hasNextPage = pullRequests.pageInfo.hasNextPage + cursor = pullRequests.pageInfo.endCursor ?? null + } - // reviewRequests(現在の pending reviewer)を取得 - // requestedAt は pullrequestsWithDetails() のフルリフレッシュ時のみ取得 - const reviewers: { login: string; requestedAt: string | null }[] = [] - if (node.reviewRequests?.nodes) { - for (const rr of node.reviewRequests.nodes) { - const reviewer = rr?.requestedReviewer - if ( - reviewer && - (reviewer.__typename === 'User' || - reviewer.__typename === 'Bot' || - reviewer.__typename === 'Mannequin') - ) { - reviewers.push({ - login: reviewer.login, - requestedAt: null, - }) - } - } - } + return allPulls + } - allPulls.push({ - id: node.databaseId, - organization: owner, + /** + * stopBefore が指定されると、updatedAt がそれより古い PR が出た時点でページング停止 + */ + const pullrequestList = async (stopBefore?: string) => { + type ListResult = ResultOf + + const queryStr = print(GetPullRequestListQuery) + const items: Array<{ number: number; updatedAt: string }> = [] + let cursor: string | null = null + let hasNextPage = true + const pageSizeRef = { value: 100 } + + while (hasNextPage) { + let result: ListResult + try { + result = await graphqlWithTimeout(queryStr, { + owner, repo, - number: node.number, - state, - title: node.title, - body: node.body ?? null, - url: node.url, - author: node.author?.login ?? null, - authorIsBot: node.author?.__typename === 'Bot', - assignees: - node.assignees.nodes - ?.filter((n) => n != null) - .map((n) => n.login) ?? [], - reviewers, - draft: node.isDraft, - sourceBranch: node.headRefName, - targetBranch: node.baseRefName, - createdAt: node.createdAt, - updatedAt: node.updatedAt, - mergedAt: node.mergedAt ?? null, - closedAt: node.closedAt ?? null, - mergeCommitSha: node.mergeCommit?.oid ?? null, - additions: node.additions ?? null, - deletions: node.deletions ?? null, - changedFiles: node.changedFiles ?? null, - files: [], + cursor, + first: pageSizeRef.value, }) + } catch (error: unknown) { + const handled = handleGraphQLError( + error, + pageSizeRef, + 10, + 'pullrequestList()', + ) + if (handled.action === 'retry') continue + result = handled.data } + const pullRequests = result?.repository?.pullRequests + if (!pullRequests?.nodes) { + if (pageSizeRef.value > 10) { + pageSizeRef.value = Math.max(10, Math.floor(pageSizeRef.value / 2)) + logger.warn( + `pullrequestList(): empty response, reducing page size to ${pageSizeRef.value}`, + ) + continue + } + logger.warn( + 'pullrequestList(): unexpected empty response (already at min page size)', + JSON.stringify({ + hasRepository: !!result?.repository, + hasPullRequests: !!pullRequests, + hasNodes: !!pullRequests?.nodes, + pageSize: pageSizeRef.value, + cursor, + }), + ) + break + } + + let stopped = false + for (const node of pullRequests.nodes) { + if (!node) continue + // ISO 8601 UTC 文字列同士なので lexicographic 比較 = 時系列比較 + if (stopBefore && node.updatedAt < stopBefore) { + stopped = true + break + } + items.push({ number: node.number, updatedAt: node.updatedAt }) + } + + if (stopped) break hasNextPage = pullRequests.pageInfo.hasNextPage cursor = pullRequests.pageInfo.endCursor ?? null } - return allPulls + return items + } + + const pullrequest = async ( + pullNumber: number, + ): Promise => { + type PRResult = ResultOf + + const queryStr = print(GetPullRequestQuery) + const result = await graphqlWithTimeout(queryStr, { + owner, + repo, + number: pullNumber, + }) + + const node = result?.repository?.pullRequest ?? null + const shaped = shapePullRequestNode(node, owner, repo) + if (!shaped) { + throw new Error(`PR #${pullNumber} not found in ${owner}/${repo}`) + } + return shaped } const commits = async (pullNumber: number) => { @@ -1372,6 +1603,8 @@ export const createFetcher = ({ owner, repo, octokit }: createFetcherProps) => { return { pullrequests, + pullrequestList, + pullrequest, pullrequestsWithDetails, commits, comments,