From 9c6b99f87ab0682c9ad26302cab8f7deed52e834 Mon Sep 17 00:00:00 2001 From: Shlok Gilda Date: Tue, 5 May 2026 00:39:52 -0400 Subject: [PATCH 1/3] centralize materialized view registry and refactor refresh task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - add collectoss/application/db/materialized_views.py: single source of truth for all 15 existing materialized view definitions (SQL + unique index columns). get_refresh_sql() helper constructs REFRESH statements. - refactor collectoss/tasks/db/refresh_materialized_views.py: replace 14 hardcoded try/except-pass blocks with a dynamic loop over the registry. uses an AUTOCOMMIT connection so REFRESH CONCURRENTLY works correctly (it cannot run inside a transaction block). raises RuntimeError if any view fails both concurrent and non-concurrent refresh. - wire alembic_utils into collectoss/application/schema/alembic/env.py: register all views from the registry so that alembic revision --autogenerate detects SQL definition changes automatically. - add alembic-utils==0.8.8 to pyproject.toml and regenerate uv.lock. closes #243 (partial — heatmap views follow in a separate PR) Signed-off-by: Shlok Gilda --- .../application/db/materialized_views.py | 806 ++++++++++++++++++ collectoss/application/schema/alembic/env.py | 29 + .../tasks/db/refresh_materialized_views.py | 205 +---- pyproject.toml | 1 + uv.lock | 39 + 5 files changed, 904 insertions(+), 176 deletions(-) create mode 100644 collectoss/application/db/materialized_views.py diff --git a/collectoss/application/db/materialized_views.py b/collectoss/application/db/materialized_views.py new file mode 100644 index 000000000..b1598877b --- /dev/null +++ b/collectoss/application/db/materialized_views.py @@ -0,0 +1,806 @@ +"""Centralized registry of PostgreSQL materialized views in CollectOSS. + +This module is the single source of truth for materialized view definitions. 
+The refresh task iterates over MATERIALIZED_VIEWS to refresh all views. +Alembic migrations for NEW views should inline their SQL (migrations are +immutable snapshots), but this registry is the canonical reference for what +views exist and how to refresh them. + +alembic_utils autogenerate is wired up (phase 2 complete). Running +`alembic revision --autogenerate` will detect changes to view SQL definitions. +alembic_utils uses a round-trip comparison: it creates the Python-defined view +in a savepoint, reads back what PostgreSQL stored, and diffs that against the +live database. Manual SQL normalization is not required. + +WARNING: ALL views must be registered here or autogenerate will propose +dropping unregistered ones. Keep this list complete. + +alembic_utils does NOT manage indexes on materialized views. See the developer +note in collectoss/application/schema/alembic/env.py for the full post-replace +procedure (index recreation + non-concurrent refresh). +""" + + +# --------------------------------------------------------------------------- +# View 1: api_get_all_repo_prs (source: migration 4, index: migration 25) +# --------------------------------------------------------------------------- +_API_GET_ALL_REPO_PRS = """\ +SELECT pull_requests.repo_id, + count(*) AS pull_requests_all_time +FROM augur_data.pull_requests +GROUP BY pull_requests.repo_id""" + +# --------------------------------------------------------------------------- +# View 2: issue_reporter_created_at (source: augur_full.sql legacy DDL) +# NOTE: This view has only a non-unique btree index (repo_id); it cannot be +# refreshed CONCURRENTLY. The refresh task will fall back to non-concurrent. 
+# --------------------------------------------------------------------------- +_ISSUE_REPORTER_CREATED_AT = """\ +SELECT i.reporter_id, + i.created_at, + i.repo_id + FROM augur_data.issues i + ORDER BY i.created_at""" + +# --------------------------------------------------------------------------- +# View 3: explorer_entry_list (source: migration 4, index: migration 25) +# --------------------------------------------------------------------------- +_EXPLORER_ENTRY_LIST = """\ +SELECT DISTINCT r.repo_git, + r.repo_id, + r.repo_name, + rg.rg_name +FROM (augur_data.repo r + JOIN augur_data.repo_groups rg ON ((rg.repo_group_id = r.repo_group_id))) +ORDER BY rg.rg_name""" + +# --------------------------------------------------------------------------- +# View 4: explorer_commits_and_committers_daily_count +# (source: migration 4, index: migration 25) +# --------------------------------------------------------------------------- +_EXPLORER_COMMITS_AND_COMMITTERS_DAILY_COUNT = """\ +SELECT repo.repo_id, + repo.repo_name, + commits.cmt_committer_date, + count(commits.cmt_id) AS num_of_commits, + count(DISTINCT commits.cmt_committer_raw_email) AS num_of_unique_committers +FROM (augur_data.commits + LEFT JOIN augur_data.repo ON ((repo.repo_id = commits.repo_id))) +GROUP BY repo.repo_id, repo.repo_name, commits.cmt_committer_date +ORDER BY repo.repo_id, commits.cmt_committer_date""" + +# --------------------------------------------------------------------------- +# View 5: api_get_all_repos_commits (source: migration 4, index: migration 25) +# --------------------------------------------------------------------------- +_API_GET_ALL_REPOS_COMMITS = """\ +SELECT commits.repo_id, + count(DISTINCT commits.cmt_commit_hash) AS commits_all_time +FROM augur_data.commits +GROUP BY commits.repo_id""" + +# --------------------------------------------------------------------------- +# View 6: api_get_all_repos_issues (source: migration 4, index: migration 25) +# 
--------------------------------------------------------------------------- +_API_GET_ALL_REPOS_ISSUES = """\ +SELECT issues.repo_id, + count(*) AS issues_all_time +FROM augur_data.issues +WHERE (issues.pull_request IS NULL) +GROUP BY issues.repo_id""" + +# --------------------------------------------------------------------------- +# View 7: augur_new_contributors (source: migration 25, recreated) +# --------------------------------------------------------------------------- +_AUGUR_NEW_CONTRIBUTORS = """\ +SELECT a.id AS cntrb_id, + a.created_at, + a.repo_id, + a.action, + repo.repo_name, + a.login, + row_number() OVER (PARTITION BY a.id, a.repo_id ORDER BY a.created_at DESC) AS rank + FROM ( SELECT commits.cmt_ght_author_id AS id, + commits.cmt_author_timestamp AS created_at, + commits.repo_id, + 'commit'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.commits + LEFT JOIN augur_data.contributors ON (((contributors.cntrb_id)::text = (commits.cmt_ght_author_id)::text))) + GROUP BY commits.cmt_commit_hash, commits.cmt_ght_author_id, commits.repo_id, commits.cmt_author_timestamp, 'commit'::text, contributors.cntrb_login + UNION ALL + SELECT issues.reporter_id AS id, + issues.created_at, + issues.repo_id, + 'issue_opened'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.issues + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issues.reporter_id))) + WHERE (issues.pull_request IS NULL) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND (pull_requests.pr_merged_at IS NULL) AND ((pull_request_events.action)::text = 
'closed'::text)) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_merged'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND ((pull_request_events.action)::text = 'merged'::text)) + UNION ALL + SELECT issue_events.cntrb_id AS id, + issue_events.created_at, + issues.repo_id, + 'issue_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + (augur_data.issue_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issue_events.cntrb_id))) + WHERE ((issues.issue_id = issue_events.issue_id) AND (issues.pull_request IS NULL) AND ((issue_events.action)::text = 'closed'::text)) + UNION ALL + SELECT pull_request_reviews.cntrb_id AS id, + pull_request_reviews.pr_review_submitted_at AS created_at, + pull_requests.repo_id, + ('pull_request_review_'::text || (pull_request_reviews.pr_review_state)::text) AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_reviews + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_reviews.cntrb_id))) + WHERE (pull_requests.pull_request_id = pull_request_reviews.pull_request_id) + UNION ALL + SELECT pull_requests.pr_augur_contributor_id AS id, + pull_requests.pr_created_at AS created_at, + pull_requests.repo_id, + 'pull_request_open'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.pull_requests + LEFT JOIN augur_data.contributors ON ((pull_requests.pr_augur_contributor_id = contributors.cntrb_id))) + UNION ALL + SELECT message.cntrb_id AS id, + message.msg_timestamp AS created_at, + pull_requests.repo_id, + 'pull_request_comment'::text AS action, + 
contributors.cntrb_login AS login + FROM augur_data.pull_requests, + augur_data.pull_request_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((pull_request_message_ref.pull_request_id = pull_requests.pull_request_id) AND (pull_request_message_ref.msg_id = message.msg_id)) + UNION ALL + SELECT issues.reporter_id AS id, + message.msg_timestamp AS created_at, + issues.repo_id, + 'issue_comment'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + augur_data.issue_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((issue_message_ref.msg_id = message.msg_id) AND (issues.issue_id = issue_message_ref.issue_id) AND (issues.closed_at <> message.msg_timestamp))) a, + augur_data.repo + WHERE (a.repo_id = repo.repo_id) + ORDER BY a.created_at DESC""" + +# --------------------------------------------------------------------------- +# View 8: explorer_contributor_actions (source: migration 25, recreated) +# --------------------------------------------------------------------------- +_EXPLORER_CONTRIBUTOR_ACTIONS = """\ +SELECT a.id AS cntrb_id, + a.created_at, + a.repo_id, + a.action, + repo.repo_name, + a.login, + row_number() OVER (PARTITION BY a.id, a.repo_id ORDER BY a.created_at desc) AS rank + FROM ( SELECT commits.cmt_ght_author_id AS id, + commits.cmt_author_timestamp AS created_at, + commits.repo_id, + 'commit'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.commits + LEFT JOIN augur_data.contributors ON (((contributors.cntrb_id)::text = (commits.cmt_ght_author_id)::text))) + GROUP BY commits.cmt_commit_hash, commits.cmt_ght_author_id, commits.repo_id, commits.cmt_author_timestamp, 'commit'::text, contributors.cntrb_login + UNION ALL + SELECT issues.reporter_id AS id, + issues.created_at, + issues.repo_id, + 'issue_opened'::text AS action, + 
contributors.cntrb_login AS login + FROM (augur_data.issues + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issues.reporter_id))) + WHERE (issues.pull_request IS NULL) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND (pull_requests.pr_merged_at IS NULL) AND ((pull_request_events.action)::text = 'closed'::text)) + UNION ALL + SELECT pull_request_events.cntrb_id AS id, + pull_request_events.created_at, + pull_requests.repo_id, + 'pull_request_merged'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + (augur_data.pull_request_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_events.cntrb_id))) + WHERE ((pull_requests.pull_request_id = pull_request_events.pull_request_id) AND ((pull_request_events.action)::text = 'merged'::text)) + UNION ALL + SELECT issue_events.cntrb_id AS id, + issue_events.created_at, + issues.repo_id, + 'issue_closed'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + (augur_data.issue_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issue_events.cntrb_id))) + WHERE ((issues.issue_id = issue_events.issue_id) AND (issues.pull_request IS NULL) AND ((issue_events.action)::text = 'closed'::text)) + UNION ALL + SELECT pull_request_reviews.cntrb_id AS id, + pull_request_reviews.pr_review_submitted_at AS created_at, + pull_requests.repo_id, + ('pull_request_review_'::text || (pull_request_reviews.pr_review_state)::text) AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + 
(augur_data.pull_request_reviews + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = pull_request_reviews.cntrb_id))) + WHERE (pull_requests.pull_request_id = pull_request_reviews.pull_request_id) + UNION ALL + SELECT pull_requests.pr_augur_contributor_id AS id, + pull_requests.pr_created_at AS created_at, + pull_requests.repo_id, + 'pull_request_open'::text AS action, + contributors.cntrb_login AS login + FROM (augur_data.pull_requests + LEFT JOIN augur_data.contributors ON ((pull_requests.pr_augur_contributor_id = contributors.cntrb_id))) + UNION ALL + SELECT message.cntrb_id AS id, + message.msg_timestamp AS created_at, + pull_requests.repo_id, + 'pull_request_comment'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + augur_data.pull_request_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((pull_request_message_ref.pull_request_id = pull_requests.pull_request_id) AND (pull_request_message_ref.msg_id = message.msg_id)) + UNION ALL + SELECT issues.reporter_id AS id, + message.msg_timestamp AS created_at, + issues.repo_id, + 'issue_comment'::text AS action, + contributors.cntrb_login AS login + FROM augur_data.issues, + augur_data.issue_message_ref, + (augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + WHERE ((issue_message_ref.msg_id = message.msg_id) AND (issues.issue_id = issue_message_ref.issue_id) AND (issues.closed_at <> message.msg_timestamp))) a, + augur_data.repo + WHERE (a.repo_id = repo.repo_id) + ORDER BY a.created_at DESC""" + +# --------------------------------------------------------------------------- +# View 9: explorer_new_contributors (source: migration 25, recreated). NOTE(review): the issue_comment branch of this view filters on `issues.pull_request_id = NULL::bigint`; in PostgreSQL `= NULL` never evaluates to true, so that branch appears to contribute no rows -- confirm whether IS NULL was intended before changing the view. +# --------------------------------------------------------------------------- +_EXPLORER_NEW_CONTRIBUTORS = """\ +SELECT x.cntrb_id, + x.created_at, + x.month, + x.year, + x.repo_id, + x.repo_name, 
+ x.full_name, + x.login, + x.rank + FROM ( SELECT b.cntrb_id, + b.created_at, + b.month, + b.year, + b.repo_id, + b.repo_name, + b.full_name, + b.login, + b.action, + b.rank + FROM ( SELECT a.id AS cntrb_id, + a.created_at, + date_part('month'::text, (a.created_at)::date) AS month, + date_part('year'::text, (a.created_at)::date) AS year, + a.repo_id, + repo.repo_name, + a.full_name, + a.login, + a.action, + row_number() OVER (PARTITION BY a.id, a.repo_id ORDER BY a.created_at desc) AS rank + FROM ( SELECT canonical_full_names.canonical_id AS id, + issues.created_at, + issues.repo_id, + 'issue_opened'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM ((augur_data.issues + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issues.reporter_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + WHERE (issues.pull_request IS NULL) + GROUP BY canonical_full_names.canonical_id, issues.repo_id, issues.created_at, contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT canonical_full_names.canonical_id AS id, + to_timestamp((commits.cmt_author_date)::text, 'YYYY-MM-DD'::text) AS created_at, + commits.repo_id, + 'commit'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM ((augur_data.commits + LEFT JOIN augur_data.contributors ON (((contributors.cntrb_canonical)::text = (commits.cmt_author_email)::text))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) 
contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + GROUP BY commits.repo_id, canonical_full_names.canonical_email, canonical_full_names.canonical_id, commits.cmt_author_date, contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT message.cntrb_id AS id, + commit_comment_ref.created_at, + commits.repo_id, + 'commit_comment'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM augur_data.commit_comment_ref, + augur_data.commits, + ((augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + WHERE ((commits.cmt_id = commit_comment_ref.cmt_id) AND (commit_comment_ref.msg_id = message.msg_id)) + GROUP BY message.cntrb_id, commits.repo_id, commit_comment_ref.created_at, contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT issue_events.cntrb_id AS id, + issue_events.created_at, + issues.repo_id, + 'issue_closed'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM 
augur_data.issues, + ((augur_data.issue_events + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = issue_events.cntrb_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + WHERE ((issues.issue_id = issue_events.issue_id) AND (issues.pull_request IS NULL) AND (issue_events.cntrb_id IS NOT NULL) AND ((issue_events.action)::text = 'closed'::text)) + GROUP BY issue_events.cntrb_id, issues.repo_id, issue_events.created_at, contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT pull_requests.pr_augur_contributor_id AS id, + pull_requests.pr_created_at AS created_at, + pull_requests.repo_id, + 'open_pull_request'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM ((augur_data.pull_requests + LEFT JOIN augur_data.contributors ON ((pull_requests.pr_augur_contributor_id = contributors.cntrb_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + GROUP BY pull_requests.pr_augur_contributor_id, pull_requests.repo_id, pull_requests.pr_created_at, 
contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT message.cntrb_id AS id, + message.msg_timestamp AS created_at, + pull_requests.repo_id, + 'pull_request_comment'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM augur_data.pull_requests, + augur_data.pull_request_message_ref, + ((augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = (contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + WHERE ((pull_request_message_ref.pull_request_id = pull_requests.pull_request_id) AND (pull_request_message_ref.msg_id = message.msg_id)) + GROUP BY message.cntrb_id, pull_requests.repo_id, message.msg_timestamp, contributors.cntrb_full_name, contributors.cntrb_login + UNION ALL + SELECT issues.reporter_id AS id, + message.msg_timestamp AS created_at, + issues.repo_id, + 'issue_comment'::text AS action, + contributors.cntrb_full_name AS full_name, + contributors.cntrb_login AS login + FROM augur_data.issues, + augur_data.issue_message_ref, + ((augur_data.message + LEFT JOIN augur_data.contributors ON ((contributors.cntrb_id = message.cntrb_id))) + LEFT JOIN ( SELECT DISTINCT ON (contributors_1.cntrb_canonical) contributors_1.cntrb_full_name, + contributors_1.cntrb_canonical AS canonical_email, + contributors_1.data_collection_date, + contributors_1.cntrb_id AS canonical_id + FROM augur_data.contributors contributors_1 + WHERE ((contributors_1.cntrb_canonical)::text = 
(contributors_1.cntrb_email)::text) + ORDER BY contributors_1.cntrb_canonical) canonical_full_names ON (((canonical_full_names.canonical_email)::text = (contributors.cntrb_canonical)::text))) + WHERE ((issue_message_ref.msg_id = message.msg_id) AND (issues.issue_id = issue_message_ref.issue_id) AND (issues.pull_request_id = NULL::bigint)) + GROUP BY issues.reporter_id, issues.repo_id, message.msg_timestamp, contributors.cntrb_full_name, contributors.cntrb_login) a, + augur_data.repo + WHERE ((a.id IS NOT NULL) AND (a.repo_id = repo.repo_id)) + GROUP BY a.id, a.repo_id, a.action, a.created_at, repo.repo_name, a.full_name, a.login + ORDER BY a.id) b + WHERE (b.rank = ANY (ARRAY[(1)::bigint, (2)::bigint, (3)::bigint, (4)::bigint, (5)::bigint, (6)::bigint, (7)::bigint]))) x""" + +# --------------------------------------------------------------------------- +# View 10: explorer_pr_assignments (source: migration 26) +# --------------------------------------------------------------------------- +_EXPLORER_PR_ASSIGNMENTS = """\ +SELECT + pr.pull_request_id, + pr.repo_id AS ID, + pr.pr_created_at AS created, + pr.pr_closed_at AS closed, + pre.created_at AS assign_date, + pre.ACTION AS assignment_action, + pre.cntrb_id AS assignee, + pre.node_id AS node_id +FROM + ( + augur_data.pull_requests pr + LEFT JOIN augur_data.pull_request_events pre ON ( + ( + ( pr.pull_request_id = pre.pull_request_id ) + AND ( + ( pre.ACTION ) :: TEXT = ANY ( ARRAY [ ( 'unassigned' :: CHARACTER VARYING ) :: TEXT, ( 'assigned' :: CHARACTER VARYING ) :: TEXT ] ) + ) + ) + ) + )""" + +# --------------------------------------------------------------------------- +# View 11: explorer_pr_response (source: migration 26) +# --------------------------------------------------------------------------- +_EXPLORER_PR_RESPONSE = """\ +SELECT pr.pull_request_id, + pr.repo_id AS id, + pr.pr_augur_contributor_id AS cntrb_id, + m.msg_timestamp, + m.msg_cntrb_id, + pr.pr_created_at, + pr.pr_closed_at + FROM 
(augur_data.pull_requests pr + LEFT JOIN ( SELECT prr.pull_request_id, + m_1.msg_timestamp, + m_1.cntrb_id AS msg_cntrb_id + FROM augur_data.pull_request_review_message_ref prrmr, + augur_data.pull_requests pr_1, + augur_data.message m_1, + augur_data.pull_request_reviews prr + WHERE ((prrmr.pr_review_id = prr.pr_review_id) AND (prrmr.msg_id = m_1.msg_id) AND (prr.pull_request_id = pr_1.pull_request_id)) + UNION + SELECT prmr.pull_request_id, + m_1.msg_timestamp, + m_1.cntrb_id AS msg_cntrb_id + FROM augur_data.pull_request_message_ref prmr, + augur_data.pull_requests pr_1, + augur_data.message m_1 + WHERE ((prmr.pull_request_id = pr_1.pull_request_id) AND (prmr.msg_id = m_1.msg_id))) m ON ((m.pull_request_id = pr.pull_request_id)))""" + +# --------------------------------------------------------------------------- +# View 12: explorer_user_repos (source: migration 26) +# --------------------------------------------------------------------------- +_EXPLORER_USER_REPOS = """\ +SELECT a.login_name, + a.user_id, + b.group_id, + c.repo_id + FROM augur_operations.users a, + augur_operations.user_groups b, + augur_operations.user_repos c + WHERE ((a.user_id = b.user_id) AND (b.group_id = c.group_id)) + ORDER BY a.user_id""" + +# --------------------------------------------------------------------------- +# View 13: explorer_pr_response_times (source: migration 26) +# --------------------------------------------------------------------------- +_EXPLORER_PR_RESPONSE_TIMES = """\ +SELECT repo.repo_id, + pull_requests.pr_src_id, + repo.repo_name, + pull_requests.pr_src_author_association, + repo_groups.rg_name AS repo_group, + pull_requests.pr_src_state, + pull_requests.pr_merged_at, + pull_requests.pr_created_at, + pull_requests.pr_closed_at, + date_part('year'::text, (pull_requests.pr_created_at)::date) AS created_year, + date_part('month'::text, (pull_requests.pr_created_at)::date) AS created_month, + date_part('year'::text, (pull_requests.pr_closed_at)::date) AS 
closed_year, + date_part('month'::text, (pull_requests.pr_closed_at)::date) AS closed_month, + base_labels.pr_src_meta_label, + base_labels.pr_head_or_base, + ((EXTRACT(epoch FROM pull_requests.pr_closed_at) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (3600)::numeric) AS hours_to_close, + ((EXTRACT(epoch FROM pull_requests.pr_closed_at) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (86400)::numeric) AS days_to_close, + ((EXTRACT(epoch FROM response_times.first_response_time) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (3600)::numeric) AS hours_to_first_response, + ((EXTRACT(epoch FROM response_times.first_response_time) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (86400)::numeric) AS days_to_first_response, + ((EXTRACT(epoch FROM response_times.last_response_time) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (3600)::numeric) AS hours_to_last_response, + ((EXTRACT(epoch FROM response_times.last_response_time) - EXTRACT(epoch FROM pull_requests.pr_created_at)) / (86400)::numeric) AS days_to_last_response, + response_times.first_response_time, + response_times.last_response_time, + response_times.average_time_between_responses, + response_times.assigned_count, + response_times.review_requested_count, + response_times.labeled_count, + response_times.subscribed_count, + response_times.mentioned_count, + response_times.referenced_count, + response_times.closed_count, + response_times.head_ref_force_pushed_count, + response_times.merged_count, + response_times.milestoned_count, + response_times.unlabeled_count, + response_times.head_ref_deleted_count, + response_times.comment_count, + master_merged_counts.lines_added, + master_merged_counts.lines_removed, + all_commit_counts.commit_count, + master_merged_counts.file_count + FROM augur_data.repo, + augur_data.repo_groups, + ((((augur_data.pull_requests + LEFT JOIN ( SELECT pull_requests_1.pull_request_id, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 
'assigned'::text)) AS assigned_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'review_requested'::text)) AS review_requested_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'labeled'::text)) AS labeled_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'unlabeled'::text)) AS unlabeled_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'subscribed'::text)) AS subscribed_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'mentioned'::text)) AS mentioned_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'referenced'::text)) AS referenced_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'closed'::text)) AS closed_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'head_ref_force_pushed'::text)) AS head_ref_force_pushed_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'head_ref_deleted'::text)) AS head_ref_deleted_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'milestoned'::text)) AS milestoned_count, + count(*) FILTER (WHERE ((pull_request_events.action)::text = 'merged'::text)) AS merged_count, + min(message.msg_timestamp) AS first_response_time, + count(DISTINCT message.msg_timestamp) AS comment_count, + max(message.msg_timestamp) AS last_response_time, + ((max(message.msg_timestamp) - min(message.msg_timestamp)) / (count(DISTINCT message.msg_timestamp))::double precision) AS average_time_between_responses + FROM augur_data.pull_request_events, + augur_data.pull_requests pull_requests_1, + augur_data.repo repo_1, + augur_data.pull_request_message_ref, + augur_data.message + WHERE ((repo_1.repo_id = pull_requests_1.repo_id) AND (pull_requests_1.pull_request_id = pull_request_events.pull_request_id) AND (pull_requests_1.pull_request_id = pull_request_message_ref.pull_request_id) AND (pull_request_message_ref.msg_id = message.msg_id)) + GROUP BY 
pull_requests_1.pull_request_id) response_times ON ((pull_requests.pull_request_id = response_times.pull_request_id))) + LEFT JOIN ( SELECT pull_request_commits.pull_request_id, + count(DISTINCT pull_request_commits.pr_cmt_sha) AS commit_count + FROM augur_data.pull_request_commits, + augur_data.pull_requests pull_requests_1, + augur_data.pull_request_meta + WHERE ((pull_requests_1.pull_request_id = pull_request_commits.pull_request_id) AND (pull_requests_1.pull_request_id = pull_request_meta.pull_request_id) AND ((pull_request_commits.pr_cmt_sha)::text <> (pull_requests_1.pr_merge_commit_sha)::text) AND ((pull_request_commits.pr_cmt_sha)::text <> (pull_request_meta.pr_sha)::text)) + GROUP BY pull_request_commits.pull_request_id) all_commit_counts ON ((pull_requests.pull_request_id = all_commit_counts.pull_request_id))) + LEFT JOIN ( SELECT max(pull_request_meta.pr_repo_meta_id) AS max, + pull_request_meta.pull_request_id, + pull_request_meta.pr_head_or_base, + pull_request_meta.pr_src_meta_label + FROM augur_data.pull_requests pull_requests_1, + augur_data.pull_request_meta + WHERE ((pull_requests_1.pull_request_id = pull_request_meta.pull_request_id) AND ((pull_request_meta.pr_head_or_base)::text = 'base'::text)) + GROUP BY pull_request_meta.pull_request_id, pull_request_meta.pr_head_or_base, pull_request_meta.pr_src_meta_label) base_labels ON ((base_labels.pull_request_id = all_commit_counts.pull_request_id))) + LEFT JOIN ( SELECT sum(commits.cmt_added) AS lines_added, + sum(commits.cmt_removed) AS lines_removed, + pull_request_commits.pull_request_id, + count(DISTINCT commits.cmt_filename) AS file_count + FROM augur_data.pull_request_commits, + augur_data.commits, + augur_data.pull_requests pull_requests_1, + augur_data.pull_request_meta + WHERE (((commits.cmt_commit_hash)::text = (pull_request_commits.pr_cmt_sha)::text) AND (pull_requests_1.pull_request_id = pull_request_commits.pull_request_id) AND (pull_requests_1.pull_request_id = 
pull_request_meta.pull_request_id) AND (commits.repo_id = pull_requests_1.repo_id) AND ((commits.cmt_commit_hash)::text <> (pull_requests_1.pr_merge_commit_sha)::text) AND ((commits.cmt_commit_hash)::text <> (pull_request_meta.pr_sha)::text)) + GROUP BY pull_request_commits.pull_request_id) master_merged_counts ON ((base_labels.pull_request_id = master_merged_counts.pull_request_id))) + WHERE ((repo.repo_group_id = repo_groups.repo_group_id) AND (repo.repo_id = pull_requests.repo_id)) + ORDER BY response_times.merged_count DESC""" + +# --------------------------------------------------------------------------- +# View 14: explorer_issue_assignments (source: migration 26) +# --------------------------------------------------------------------------- +_EXPLORER_ISSUE_ASSIGNMENTS = """\ +SELECT + i.issue_id, + i.repo_id AS ID, + i.created_at AS created, + i.closed_at AS closed, + ie.created_at AS assign_date, + ie.ACTION AS assignment_action, + ie.cntrb_id AS assignee, + ie.node_id as node_id +FROM + ( + augur_data.issues i + LEFT JOIN augur_data.issue_events ie ON ( + ( + ( i.issue_id = ie.issue_id ) + AND ( + ( ie.ACTION ) :: TEXT = ANY ( ARRAY [ ( 'unassigned' :: CHARACTER VARYING ) :: TEXT, ( 'assigned' :: CHARACTER VARYING ) :: TEXT ] ) + ) + ) + ) + )""" + +# --------------------------------------------------------------------------- +# View 15: explorer_repo_languages (source: migration 28) +# --------------------------------------------------------------------------- +_EXPLORER_REPO_LANGUAGES = """\ +SELECT e.repo_id, + repo.repo_git, + repo.repo_name, + e.programming_language, + e.code_lines, + e.files + FROM augur_data.repo, + ( SELECT d.repo_id, + d.programming_language, + sum(d.code_lines) AS code_lines, + (count(*))::integer AS files + FROM ( SELECT repo_labor.repo_id, + repo_labor.programming_language, + repo_labor.code_lines + FROM augur_data.repo_labor, + ( SELECT repo_labor_1.repo_id, + max(repo_labor_1.data_collection_date) AS last_collected + FROM 
augur_data.repo_labor repo_labor_1 + GROUP BY repo_labor_1.repo_id) recent + WHERE ((repo_labor.repo_id = recent.repo_id) AND (repo_labor.data_collection_date > (recent.last_collected - ((5)::double precision * '00:01:00'::interval))))) d + GROUP BY d.repo_id, d.programming_language) e + WHERE (repo.repo_id = e.repo_id) + ORDER BY e.repo_id""" + + +# ============================================================================ +# Registry: single source of truth for all materialized views +# ============================================================================ + +MATERIALIZED_VIEWS = [ + # --- View 1: legacy DDL (augur_full.sql), no unique index --- + { + "name": "issue_reporter_created_at", + "schema": "augur_data", + "sql": _ISSUE_REPORTER_CREATED_AT, + "unique_index_columns": [], # only a non-unique btree on repo_id + }, + # --- Views 2-6: from migration 4, indexes from migration 25 --- + { + "name": "api_get_all_repo_prs", + "schema": "augur_data", + "sql": _API_GET_ALL_REPO_PRS, + "unique_index_columns": ["repo_id"], + }, + { + "name": "explorer_entry_list", + "schema": "augur_data", + "sql": _EXPLORER_ENTRY_LIST, + "unique_index_columns": ["repo_id"], + }, + { + "name": "explorer_commits_and_committers_daily_count", + "schema": "augur_data", + "sql": _EXPLORER_COMMITS_AND_COMMITTERS_DAILY_COUNT, + "unique_index_columns": ["repo_id", "cmt_committer_date"], + }, + { + "name": "api_get_all_repos_commits", + "schema": "augur_data", + "sql": _API_GET_ALL_REPOS_COMMITS, + "unique_index_columns": ["repo_id"], + }, + { + "name": "api_get_all_repos_issues", + "schema": "augur_data", + "sql": _API_GET_ALL_REPOS_ISSUES, + "unique_index_columns": ["repo_id"], + }, + # --- Views 6-8: from migration 25, recreated --- + { + "name": "augur_new_contributors", + "schema": "augur_data", + "sql": _AUGUR_NEW_CONTRIBUTORS, + "unique_index_columns": ["cntrb_id", "created_at", "repo_id", "repo_name", "login", "rank"], + }, + { + "name": "explorer_contributor_actions", + 
"schema": "augur_data", + "sql": _EXPLORER_CONTRIBUTOR_ACTIONS, + "unique_index_columns": ["cntrb_id", "created_at", "repo_id", "action", "repo_name", "login", "rank"], + }, + { + "name": "explorer_new_contributors", + "schema": "augur_data", + "sql": _EXPLORER_NEW_CONTRIBUTORS, + "unique_index_columns": ["cntrb_id", "created_at", "month", "year", "repo_id", "full_name", "repo_name", "login", "rank"], + }, + # --- Views 9-13: from migration 26 --- + { + "name": "explorer_pr_assignments", + "schema": "augur_data", + "sql": _EXPLORER_PR_ASSIGNMENTS, + "unique_index_columns": ["pull_request_id", "id", "node_id"], + }, + { + "name": "explorer_pr_response", + "schema": "augur_data", + "sql": _EXPLORER_PR_RESPONSE, + "unique_index_columns": ["pull_request_id", "id", "cntrb_id", "msg_cntrb_id", "msg_timestamp"], + }, + { + "name": "explorer_user_repos", + "schema": "augur_data", + "sql": _EXPLORER_USER_REPOS, + "unique_index_columns": ["login_name", "user_id", "group_id", "repo_id"], + }, + { + "name": "explorer_pr_response_times", + "schema": "augur_data", + "sql": _EXPLORER_PR_RESPONSE_TIMES, + "unique_index_columns": ["repo_id", "pr_src_id", "pr_src_meta_label"], + }, + { + "name": "explorer_issue_assignments", + "schema": "augur_data", + "sql": _EXPLORER_ISSUE_ASSIGNMENTS, + "unique_index_columns": ["issue_id", "id", "node_id"], + }, + # --- View 15: from migration 28 --- + { + "name": "explorer_repo_languages", + "schema": "augur_data", + "sql": _EXPLORER_REPO_LANGUAGES, + "unique_index_columns": ["repo_id", "programming_language"], + }, +] + + +def get_refresh_sql(view, concurrently=True): + """Construct a REFRESH MATERIALIZED VIEW statement. + + Args: + view: A dict from MATERIALIZED_VIEWS. + concurrently: If True, use CONCURRENTLY (requires a unique index + and the view to have been populated at least once). + + Returns: + SQL string ready to be wrapped in sqlalchemy.sql.text(). 
+ """ + mode = "CONCURRENTLY " if concurrently else "" + return ( + f"REFRESH MATERIALIZED VIEW {mode}" + f"{view['schema']}.{view['name']} WITH DATA;" + ) diff --git a/collectoss/application/schema/alembic/env.py b/collectoss/application/schema/alembic/env.py index d7f160d49..f7d1674c1 100644 --- a/collectoss/application/schema/alembic/env.py +++ b/collectoss/application/schema/alembic/env.py @@ -26,6 +26,35 @@ # target_metadata = mymodel.Base.metadata target_metadata = Base.metadata +# NOTE FOR DEVELOPERS: alembic_utils manages materialized view definitions via +# DROP + CREATE (replace). When a view is replaced: +# +# 1. ALL indexes are destroyed. Manually add CREATE UNIQUE INDEX statements +# after the replace op, using unique_index_columns from the registry +# (collectoss/application/db/materialized_views.py). +# +# 2. The view is recreated WITH NO DATA. You CANNOT run REFRESH CONCURRENTLY +# immediately — it requires both a unique index and pre-existing data. +# After recreating the index, run a non-concurrent refresh first: +# REFRESH MATERIALIZED VIEW augur_data.<view_name> WITH DATA; +# Only after that will the Celery refresh task's CONCURRENTLY succeed. +# +# WARNING: If MATERIALIZED_VIEWS is ever emptied, autogenerate will propose +# dropping all registered views. Keep the list complete. 
+from alembic_utils.pg_materialized_view import PGMaterializedView +from alembic_utils.replaceable_entity import register_entities +from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS +_materialized_view_entities = [ + PGMaterializedView( + schema=view["schema"], + signature=view["name"], + definition=view["sql"], + with_data=False, + ) + for view in MATERIALIZED_VIEWS +] +register_entities(_materialized_view_entities, entity_types=[PGMaterializedView]) + # other values from the config, defined by the needs of env.py, # can be acquired: # my_important_option = config.get_main_option("my_important_option") diff --git a/collectoss/tasks/db/refresh_materialized_views.py b/collectoss/tasks/db/refresh_materialized_views.py index 95f169722..d3734bbaf 100644 --- a/collectoss/tasks/db/refresh_materialized_views.py +++ b/collectoss/tasks/db/refresh_materialized_views.py @@ -3,7 +3,7 @@ import sqlalchemy as s from collectoss.tasks.init.celery_app import celery_app as celery -from collectoss.application.db.lib import execute_sql +from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS, get_refresh_sql from collectoss.tasks.git.util.facade_worker.facade_worker.config import FacadeHelper from collectoss.tasks.git.util.facade_worker.facade_worker.rebuildcache import invalidate_caches, rebuild_unknown_affiliation_and_web_caches @@ -11,189 +11,46 @@ @celery.task(bind=True) def refresh_materialized_views(self): - #self.logger = SystemLogger("data_collection_jobs").get_logger() - - engine = self.app.engine - logger = logging.getLogger(refresh_materialized_views.__name__) - #self.logger = logging.getLogger(refresh_materialized_views.__name__) - - mv1_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repo_prs with data; - COMMIT; - """) - - mv2_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_commits with data; - COMMIT; - """) - - mv3_refresh = 
s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.api_get_all_repos_issues with data; - COMMIT; - """) - - mv4_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.augur_new_contributors with data; - COMMIT; - """) - mv5_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_commits_and_committers_daily_count with data; - COMMIT; - """) - - mv6_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_new_contributors with data; - COMMIT; - """) - - mv7_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_entry_list with data; - COMMIT; - """) - - mv8_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_contributor_actions with data; - COMMIT; - """) - - mv9_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_user_repos with data; - COMMIT; - """) - - mv10_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response_times with data; - COMMIT; - """) - - mv11_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_assignments with data; - COMMIT; - """) - - mv12_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_issue_assignments with data; - COMMIT; - """) - - mv13_refresh = s.sql.text(""" - - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_pr_response with data; - COMMIT; - """) - - mv14_refresh = s.sql.text(""" - REFRESH MATERIALIZED VIEW concurrently augur_data.explorer_repo_languages with data; - COMMIT; - """) - - try: - execute_sql(mv1_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv2_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv3_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv4_refresh) - 
except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv5_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv6_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv7_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv8_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv9_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv10_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv11_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv12_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv13_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass - - try: - execute_sql(mv14_refresh) - except Exception as e: - logger.info(f"error is {e}") - pass + # REFRESH MATERIALIZED VIEW CONCURRENTLY cannot run inside a transaction + # block, so we use an autocommit connection rather than execute_sql(). 
+ failed_views = [] + with self.app.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + for view in MATERIALIZED_VIEWS: + view_fqn = f"{view['schema']}.{view['name']}" + logger.info(f"Refreshing materialized view: {view_fqn}") + try: + conn.execute(s.sql.text(get_refresh_sql(view, concurrently=True))) + except Exception as e: + logger.warning(f"Concurrent refresh failed for {view_fqn}, trying non-concurrent: {e}") + try: + conn.execute(s.sql.text(get_refresh_sql(view, concurrently=False))) + except Exception as e2: + logger.error(f"Non-concurrent refresh also failed for {view_fqn}: {e2}") + failed_views.append(view_fqn) + + if failed_views: + raise RuntimeError( + f"{len(failed_views)} materialized view(s) failed to refresh: {failed_views}" + ) #Now refresh facade tables - #Use this class to get all the settings and + #Use this class to get all the settings and #utility functions for facade facade_helper = FacadeHelper(logger) if facade_helper.nuke_stored_affiliations: logger.error("Nuke stored affiliations is deprecated!") - # deprecated because the UI component of facade where affiliations would be - # nuked upon change no longer exists, and this information can easily be derived + # deprecated because the UI component of facade where affiliations would be + # nuked upon change no longer exists, and this information can easily be derived # from queries and materialized views in the current version of CollectOSS. # This method is also a major performance bottleneck with little value. 
- + if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.fix_affiliations): logger.error("Fill empty affiliations is deprecated!") - # deprecated because the UI component of facade where affiliations would need - # to be fixed upon change no longer exists, and this information can easily be derived + # deprecated because the UI component of facade where affiliations would need + # to be fixed upon change no longer exists, and this information can easily be derived # from queries and materialized views in the current version of CollectOSS. # This method is also a major performance bottleneck with little value. @@ -202,13 +59,9 @@ def refresh_materialized_views(self): invalidate_caches(facade_helper) except Exception as e: logger.info(f"error is {e}") - + if not facade_helper.limited_run or (facade_helper.limited_run and facade_helper.rebuild_caches): try: rebuild_unknown_affiliation_and_web_caches(facade_helper) except Exception as e: logger.info(f"error is {e}") - - - - diff --git a/pyproject.toml b/pyproject.toml index 6445d832e..452262671 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ classifiers = [ ] dependencies = [ "alembic>=1.17.1", + "alembic-utils==0.8.8", "Beaker==1.11.0", "boto3==1.17.57", "bs4==0.0.1", diff --git a/uv.lock b/uv.lock index 01c49f632..f6a8bae45 100644 --- a/uv.lock +++ b/uv.lock @@ -43,6 +43,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a5/32/7df1d81ec2e50fb661944a35183d87e62d3f6c6d9f8aff64a4f245226d55/alembic-1.17.1-py3-none-any.whl", hash = "sha256:cbc2386e60f89608bb63f30d2d6cc66c7aaed1fe105bd862828600e5ad167023", size = 247848, upload-time = "2025-10-29T00:23:18.79Z" }, ] +[[package]] +name = "alembic-utils" +version = "0.8.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "alembic" }, + { name = "flupy" }, + { name = "parse" }, + { name = "sqlalchemy" }, + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/ec/7a/eae622a97ba1721fd7e420c60060a74852b800ac1fecbaa2e67a35941d6d/alembic_utils-0.8.8.tar.gz", hash = "sha256:99de5d13194f26536bc0322f0c1660020a305015700d8447ccfc20e7d1494e5b", size = 21638, upload-time = "2025-04-10T18:58:13.212Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/dd/01/d55bd80997df2ec1ff2fd40cd3eeadec93c4b3c5492df3c6852b29f9e393/alembic_utils-0.8.8-py3-none-any.whl", hash = "sha256:2c2545dc545833c5deb63bce2c3cde01c1807bf99da5efab2497bc8d817cb86e", size = 31044, upload-time = "2025-04-10T18:58:12.247Z" }, +] + [[package]] name = "amqp" version = "5.3.1" @@ -380,6 +396,7 @@ name = "collectoss" source = { editable = "." } dependencies = [ { name = "alembic" }, + { name = "alembic-utils" }, { name = "beaker" }, { name = "boto3" }, { name = "bs4" }, @@ -480,6 +497,7 @@ test = [ [package.metadata] requires-dist = [ { name = "alembic", specifier = ">=1.17.1" }, + { name = "alembic-utils", specifier = "==0.8.8" }, { name = "beaker", specifier = "==1.11.0" }, { name = "boto3", specifier = "==1.17.57" }, { name = "bs4", specifier = "==0.0.1" }, @@ -834,6 +852,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a6/ff/ee2f67c0ff146ec98b5df1df637b2bc2d17beeb05df9f427a67bd7a7d79c/flower-2.0.1-py2.py3-none-any.whl", hash = "sha256:9db2c621eeefbc844c8dd88be64aef61e84e2deb29b271e02ab2b5b9f01068e2", size = 383553, upload-time = "2023-08-13T14:37:41.552Z" }, ] +[[package]] +name = "flupy" +version = "1.2.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/fd/a5/15fe839297d761e04c4578b11013ed46353e63b44b5e42b59c2078602fa1/flupy-1.2.3.tar.gz", hash = "sha256:220b6d40dea238cd2d66784c0d4d2a5483447a48acd343385768e0c740af9609", size = 12327, upload-time = "2025-07-15T14:08:21.14Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/7c/26/d4d1629f846ae2913e88f74955a3c3f41f3863e74c5fbc1cb79af9550717/flupy-1.2.3-py3-none-any.whl", hash = "sha256:be0f5a393bad2b3534697fbab17081993cd3f5817169dd3a61e8b2e0887612e6", size = 12512, upload-time = "2025-07-18T20:15:21.384Z" }, +] + [[package]] name = "fonttools" version = "4.58.4" @@ -2038,6 +2068,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/da/6d/1235da14daddaa6e47f74ba0c255358f0ce7a6ee05da8bf8eb49161aa6b5/pandas-1.5.3-cp311-cp311-win_amd64.whl", hash = "sha256:bc4c368f42b551bf72fac35c5128963a171b40dce866fb066540eeaf46faa003", size = 10303385, upload-time = "2023-01-19T08:30:11.148Z" }, ] +[[package]] +name = "parse" +version = "1.22.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7b/a2/dd269daedd5ac3a244ca7855b4878d8655393fd4554d5c24a56bc31e302a/parse-1.22.0.tar.gz", hash = "sha256:d4987d68ccf08b6ba3bf80b5004ff7de61c4337cba2d8350ae5c9925794979d9", size = 36767, upload-time = "2026-05-02T01:36:25.575Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/69/3a/0c2cf5922c6133b74c1cebe4b66f6949818e2cf8121aa59e3ebcd64ac6ac/parse-1.22.0-py2.py3-none-any.whl", hash = "sha256:eea8ed34e2614cea65d9c1d4af9cb68cce26aea13d44bdcaf83c1b40884fe945", size = 20839, upload-time = "2026-05-02T01:36:24.403Z" }, +] + [[package]] name = "parso" version = "0.8.4" From 977e375ad6e86f7c2f4d5721d090e33d308c2bc4 Mon Sep 17 00:00:00 2001 From: Shlok Gilda Date: Tue, 5 May 2026 12:07:53 -0400 Subject: [PATCH 2/3] model materialized view registry as a dataclass Replace the list-of-dicts registry with a frozen MaterializedView dataclass exposing fqn, refresh_sql(), and to_pg_view(). Brings the registry's shape in line with the declarative ORM style used elsewhere in the codebase and gives callers attribute access + type checking instead of string-keyed dict lookups. unique_index_columns is a tuple so frozen=True actually means immutable. 
__repr__ is overridden to keep the multi-hundred-line view SQL out of debug logs. Refresh task and alembic env.py updated to use the new API; get_refresh_sql free function removed (only two call sites). Emitted REFRESH SQL is byte-identical to the previous version. --- .../application/db/materialized_views.py | 236 ++++++++++-------- collectoss/application/schema/alembic/env.py | 14 +- .../tasks/db/refresh_materialized_views.py | 15 +- 3 files changed, 137 insertions(+), 128 deletions(-) diff --git a/collectoss/application/db/materialized_views.py b/collectoss/application/db/materialized_views.py index b1598877b..dea4d1c6f 100644 --- a/collectoss/application/db/materialized_views.py +++ b/collectoss/application/db/materialized_views.py @@ -20,6 +20,42 @@ procedure (index recreation + non-concurrent refresh). """ +from dataclasses import dataclass + +from alembic_utils.pg_materialized_view import PGMaterializedView + + +@dataclass(frozen=True) +class MaterializedView: + name: str + schema: str + sql: str + unique_index_columns: tuple[str, ...] = () + + @property + def fqn(self) -> str: + return f"{self.schema}.{self.name}" + + def refresh_sql(self, concurrently: bool = True) -> str: + mode = "CONCURRENTLY " if concurrently else "" + return f"REFRESH MATERIALIZED VIEW {mode}{self.fqn} WITH DATA;" + + def to_pg_view(self, with_data: bool = False) -> PGMaterializedView: + return PGMaterializedView( + schema=self.schema, + signature=self.name, + definition=self.sql, + with_data=with_data, + ) + + def __repr__(self) -> str: + # Default dataclass repr would dump the full SQL string (some views + # are 100+ lines), making logs and debug output unreadable. 
+ return ( + f"MaterializedView(name={self.name!r}, schema={self.schema!r}, " + f"unique_index_columns={self.unique_index_columns!r})" + ) + # --------------------------------------------------------------------------- # View 1: api_get_all_repo_prs (source: migration 4, index: migration 25) @@ -689,118 +725,100 @@ # Registry: single source of truth for all materialized views # ============================================================================ -MATERIALIZED_VIEWS = [ +MATERIALIZED_VIEWS: list[MaterializedView] = [ # --- View 1: legacy DDL (augur_full.sql), no unique index --- - { - "name": "issue_reporter_created_at", - "schema": "augur_data", - "sql": _ISSUE_REPORTER_CREATED_AT, - "unique_index_columns": [], # only a non-unique btree on repo_id - }, + MaterializedView( + name="issue_reporter_created_at", + schema="augur_data", + sql=_ISSUE_REPORTER_CREATED_AT, + unique_index_columns=(), # only a non-unique btree on repo_id + ), # --- Views 2-6: from migration 4, indexes from migration 25 --- - { - "name": "api_get_all_repo_prs", - "schema": "augur_data", - "sql": _API_GET_ALL_REPO_PRS, - "unique_index_columns": ["repo_id"], - }, - { - "name": "explorer_entry_list", - "schema": "augur_data", - "sql": _EXPLORER_ENTRY_LIST, - "unique_index_columns": ["repo_id"], - }, - { - "name": "explorer_commits_and_committers_daily_count", - "schema": "augur_data", - "sql": _EXPLORER_COMMITS_AND_COMMITTERS_DAILY_COUNT, - "unique_index_columns": ["repo_id", "cmt_committer_date"], - }, - { - "name": "api_get_all_repos_commits", - "schema": "augur_data", - "sql": _API_GET_ALL_REPOS_COMMITS, - "unique_index_columns": ["repo_id"], - }, - { - "name": "api_get_all_repos_issues", - "schema": "augur_data", - "sql": _API_GET_ALL_REPOS_ISSUES, - "unique_index_columns": ["repo_id"], - }, + MaterializedView( + name="api_get_all_repo_prs", + schema="augur_data", + sql=_API_GET_ALL_REPO_PRS, + unique_index_columns=("repo_id",), + ), + MaterializedView( + name="explorer_entry_list", + 
schema="augur_data", + sql=_EXPLORER_ENTRY_LIST, + unique_index_columns=("repo_id",), + ), + MaterializedView( + name="explorer_commits_and_committers_daily_count", + schema="augur_data", + sql=_EXPLORER_COMMITS_AND_COMMITTERS_DAILY_COUNT, + unique_index_columns=("repo_id", "cmt_committer_date",), + ), + MaterializedView( + name="api_get_all_repos_commits", + schema="augur_data", + sql=_API_GET_ALL_REPOS_COMMITS, + unique_index_columns=("repo_id",), + ), + MaterializedView( + name="api_get_all_repos_issues", + schema="augur_data", + sql=_API_GET_ALL_REPOS_ISSUES, + unique_index_columns=("repo_id",), + ), # --- Views 6-8: from migration 25, recreated --- - { - "name": "augur_new_contributors", - "schema": "augur_data", - "sql": _AUGUR_NEW_CONTRIBUTORS, - "unique_index_columns": ["cntrb_id", "created_at", "repo_id", "repo_name", "login", "rank"], - }, - { - "name": "explorer_contributor_actions", - "schema": "augur_data", - "sql": _EXPLORER_CONTRIBUTOR_ACTIONS, - "unique_index_columns": ["cntrb_id", "created_at", "repo_id", "action", "repo_name", "login", "rank"], - }, - { - "name": "explorer_new_contributors", - "schema": "augur_data", - "sql": _EXPLORER_NEW_CONTRIBUTORS, - "unique_index_columns": ["cntrb_id", "created_at", "month", "year", "repo_id", "full_name", "repo_name", "login", "rank"], - }, + MaterializedView( + name="augur_new_contributors", + schema="augur_data", + sql=_AUGUR_NEW_CONTRIBUTORS, + unique_index_columns=("cntrb_id", "created_at", "repo_id", "repo_name", "login", "rank",), + ), + MaterializedView( + name="explorer_contributor_actions", + schema="augur_data", + sql=_EXPLORER_CONTRIBUTOR_ACTIONS, + unique_index_columns=("cntrb_id", "created_at", "repo_id", "action", "repo_name", "login", "rank",), + ), + MaterializedView( + name="explorer_new_contributors", + schema="augur_data", + sql=_EXPLORER_NEW_CONTRIBUTORS, + unique_index_columns=("cntrb_id", "created_at", "month", "year", "repo_id", "full_name", "repo_name", "login", "rank",), + ), # --- 
Views 9-13: from migration 26 --- - { - "name": "explorer_pr_assignments", - "schema": "augur_data", - "sql": _EXPLORER_PR_ASSIGNMENTS, - "unique_index_columns": ["pull_request_id", "id", "node_id"], - }, - { - "name": "explorer_pr_response", - "schema": "augur_data", - "sql": _EXPLORER_PR_RESPONSE, - "unique_index_columns": ["pull_request_id", "id", "cntrb_id", "msg_cntrb_id", "msg_timestamp"], - }, - { - "name": "explorer_user_repos", - "schema": "augur_data", - "sql": _EXPLORER_USER_REPOS, - "unique_index_columns": ["login_name", "user_id", "group_id", "repo_id"], - }, - { - "name": "explorer_pr_response_times", - "schema": "augur_data", - "sql": _EXPLORER_PR_RESPONSE_TIMES, - "unique_index_columns": ["repo_id", "pr_src_id", "pr_src_meta_label"], - }, - { - "name": "explorer_issue_assignments", - "schema": "augur_data", - "sql": _EXPLORER_ISSUE_ASSIGNMENTS, - "unique_index_columns": ["issue_id", "id", "node_id"], - }, + MaterializedView( + name="explorer_pr_assignments", + schema="augur_data", + sql=_EXPLORER_PR_ASSIGNMENTS, + unique_index_columns=("pull_request_id", "id", "node_id",), + ), + MaterializedView( + name="explorer_pr_response", + schema="augur_data", + sql=_EXPLORER_PR_RESPONSE, + unique_index_columns=("pull_request_id", "id", "cntrb_id", "msg_cntrb_id", "msg_timestamp",), + ), + MaterializedView( + name="explorer_user_repos", + schema="augur_data", + sql=_EXPLORER_USER_REPOS, + unique_index_columns=("login_name", "user_id", "group_id", "repo_id",), + ), + MaterializedView( + name="explorer_pr_response_times", + schema="augur_data", + sql=_EXPLORER_PR_RESPONSE_TIMES, + unique_index_columns=("repo_id", "pr_src_id", "pr_src_meta_label",), + ), + MaterializedView( + name="explorer_issue_assignments", + schema="augur_data", + sql=_EXPLORER_ISSUE_ASSIGNMENTS, + unique_index_columns=("issue_id", "id", "node_id",), + ), # --- View 15: from migration 28 --- - { - "name": "explorer_repo_languages", - "schema": "augur_data", - "sql": _EXPLORER_REPO_LANGUAGES, 
- "unique_index_columns": ["repo_id", "programming_language"], - }, + MaterializedView( + name="explorer_repo_languages", + schema="augur_data", + sql=_EXPLORER_REPO_LANGUAGES, + unique_index_columns=("repo_id", "programming_language",), + ), ] - - -def get_refresh_sql(view, concurrently=True): - """Construct a REFRESH MATERIALIZED VIEW statement. - - Args: - view: A dict from MATERIALIZED_VIEWS. - concurrently: If True, use CONCURRENTLY (requires a unique index - and the view to have been populated at least once). - - Returns: - SQL string ready to be wrapped in sqlalchemy.sql.text(). - """ - mode = "CONCURRENTLY " if concurrently else "" - return ( - f"REFRESH MATERIALIZED VIEW {mode}" - f"{view['schema']}.{view['name']} WITH DATA;" - ) diff --git a/collectoss/application/schema/alembic/env.py b/collectoss/application/schema/alembic/env.py index f7d1674c1..77f2bd2e4 100644 --- a/collectoss/application/schema/alembic/env.py +++ b/collectoss/application/schema/alembic/env.py @@ -30,8 +30,8 @@ # DROP + CREATE (replace). When a view is replaced: # # 1. ALL indexes are destroyed. Manually add CREATE UNIQUE INDEX statements -# after the replace op, using unique_index_columns from the registry -# (collectoss/application/db/materialized_views.py). +# after the replace op, using MaterializedView.unique_index_columns from +# the registry (collectoss/application/db/materialized_views.py). # # 2. The view is recreated WITH NO DATA. You CANNOT run REFRESH CONCURRENTLY # immediately — it requires both a unique index and pre-existing data. 
@@ -44,15 +44,7 @@ from alembic_utils.pg_materialized_view import PGMaterializedView from alembic_utils.replaceable_entity import register_entities from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS -_materialized_view_entities = [ - PGMaterializedView( - schema=view["schema"], - signature=view["name"], - definition=view["sql"], - with_data=False, - ) - for view in MATERIALIZED_VIEWS -] +_materialized_view_entities = [view.to_pg_view() for view in MATERIALIZED_VIEWS] register_entities(_materialized_view_entities, entity_types=[PGMaterializedView]) # other values from the config, defined by the needs of env.py, diff --git a/collectoss/tasks/db/refresh_materialized_views.py b/collectoss/tasks/db/refresh_materialized_views.py index d3734bbaf..94c833c76 100644 --- a/collectoss/tasks/db/refresh_materialized_views.py +++ b/collectoss/tasks/db/refresh_materialized_views.py @@ -3,7 +3,7 @@ import sqlalchemy as s from collectoss.tasks.init.celery_app import celery_app as celery -from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS, get_refresh_sql +from collectoss.application.db.materialized_views import MATERIALIZED_VIEWS from collectoss.tasks.git.util.facade_worker.facade_worker.config import FacadeHelper from collectoss.tasks.git.util.facade_worker.facade_worker.rebuildcache import invalidate_caches, rebuild_unknown_affiliation_and_web_caches @@ -18,17 +18,16 @@ def refresh_materialized_views(self): failed_views = [] with self.app.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: for view in MATERIALIZED_VIEWS: - view_fqn = f"{view['schema']}.{view['name']}" - logger.info(f"Refreshing materialized view: {view_fqn}") + logger.info(f"Refreshing materialized view: {view.fqn}") try: - conn.execute(s.sql.text(get_refresh_sql(view, concurrently=True))) + conn.execute(s.sql.text(view.refresh_sql(concurrently=True))) except Exception as e: - logger.warning(f"Concurrent refresh failed for {view_fqn}, trying 
non-concurrent: {e}") + logger.warning(f"Concurrent refresh failed for {view.fqn}, trying non-concurrent: {e}") try: - conn.execute(s.sql.text(get_refresh_sql(view, concurrently=False))) + conn.execute(s.sql.text(view.refresh_sql(concurrently=False))) except Exception as e2: - logger.error(f"Non-concurrent refresh also failed for {view_fqn}: {e2}") - failed_views.append(view_fqn) + logger.error(f"Non-concurrent refresh also failed for {view.fqn}: {e2}") + failed_views.append(view.fqn) if failed_views: raise RuntimeError( From 0db2f0555dada41e3d84023795fb7945bdaff2a0 Mon Sep 17 00:00:00 2001 From: Adrian Edwards Date: Mon, 18 May 2026 13:39:09 -0400 Subject: [PATCH 3/3] adjust refresh logic so that things get intelligently refreshed in the correct way based on whether indexes exist or not Signed-off-by: Adrian Edwards --- .../tasks/db/refresh_materialized_views.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/collectoss/tasks/db/refresh_materialized_views.py b/collectoss/tasks/db/refresh_materialized_views.py index 94c833c76..0febe34b8 100644 --- a/collectoss/tasks/db/refresh_materialized_views.py +++ b/collectoss/tasks/db/refresh_materialized_views.py @@ -19,15 +19,19 @@ def refresh_materialized_views(self): with self.app.engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: for view in MATERIALIZED_VIEWS: logger.info(f"Refreshing materialized view: {view.fqn}") - try: - conn.execute(s.sql.text(view.refresh_sql(concurrently=True))) - except Exception as e: - logger.warning(f"Concurrent refresh failed for {view.fqn}, trying non-concurrent: {e}") + + if len(view.unique_index_columns) > 0: try: - conn.execute(s.sql.text(view.refresh_sql(concurrently=False))) - except Exception as e2: - logger.error(f"Non-concurrent refresh also failed for {view.fqn}: {e2}") - failed_views.append(view.fqn) + conn.execute(s.sql.text(view.refresh_sql(concurrently=True))) + continue + except Exception as e: + 
logger.warning(f"Concurrent refresh failed for {view.fqn}, trying non-concurrent: {e}") + + try: + conn.execute(s.sql.text(view.refresh_sql(concurrently=False))) + except Exception as e2: + logger.error(f"Non-concurrent refresh failed for {view.fqn}: {e2}") + failed_views.append(view.fqn) if failed_views: raise RuntimeError(