
Save table and database create statements as tars#298

Open
kirillgarbar wants to merge 10 commits into yandex:main from kirillgarbar:tar-metadata

Conversation

@kirillgarbar
Contributor

@kirillgarbar kirillgarbar commented Mar 2, 2026

Summary by Sourcery

Store table and database CREATE statements in tarball-based metadata and adjust backup/restore logic to use the new format while remaining compatible with the legacy layout.

New Features:

  • Support uploading and downloading multiple database CREATE statements as a single tarball in backup storage.
  • Support uploading and downloading multiple table CREATE statements per database as tarballs in backup storage.
  • Introduce async-pipeline stages and pipelines to upload and download in-memory data as TAR streams instead of individual files.

Bug Fixes:

  • Prevent partial table backups by checking metadata change times before backing up data and create statements, cleaning up frozen data when metadata changes.

Enhancements:

  • Refine table backup flow to separate metadata-change checks from data backup and delay uploading CREATE statements until all tables are processed.
  • Batch retrieval of table CREATE statements during restore to reduce storage calls.
  • Refactor TAR stream reading/writing stages into reusable base classes and add in-memory TAR unpacking.
  • Extend calculators to estimate aligned in-memory data sizes for TAR uploads.
  • Simplify database backup logic by separating databases with embedded metadata and consolidating metadata registration and upload.
  • Add helper to run async pipelines and collect all produced results.
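The last bullet can be sketched in isolation. This is a hypothetical stand-in for the new helper; the real one runs inside ch_backup's async pipeline framework, whose stream types may differ.

```python
import asyncio
from typing import Any, AsyncIterable, List


async def run_and_collect_all(stream: AsyncIterable[Any]) -> List[Any]:
    """Drain an async pipeline's output stream and return every produced item.

    Illustrative only: the actual helper in this PR wires into ch_backup's
    pipeline executor rather than a bare async iterable.
    """
    return [item async for item in stream]
```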

@sourcery-ai

sourcery-ai bot commented Mar 2, 2026

Reviewer's Guide

This PR changes backup/restore of ClickHouse database and table CREATE statements to use tarball-based bulk upload/download instead of per-object files, introduces generic tar-streaming primitives for in‑memory data in the async pipeline, and adjusts backup logic to only persist metadata after confirming table metadata stability and to batch table metadata upload per database.

Sequence diagram for batched table CREATE backup using tarball

sequenceDiagram
    participant TB as TableBackup
    participant CH as ClickHouse
    participant BL as BackupLayout
    participant SL as StorageLoader
    participant PE as PipelineExecutor
    participant UP as upload_data_tarball_pipeline

    TB->>CH: create_shadow_increment()
    TB->>TB: ThreadExecPool over tables_
    loop for each table
        TB->>CH: _freeze_table(context, db, table, backup_name, ...)
    end

    TB->>TB: _check_metadata_change_time(table, backup_name, change_times)
    alt metadata unchanged
        TB->>TB: collect (table.name, table.create_statement)
        TB->>TB: backup_meta.add_table(TableMetadata)
        TB->>TB: _backup_frozen_table_data(...)
        TB->>TB: _backup_cloud_storage_metadata(...)
    else metadata changed
        TB->>CH: remove_freezed_data(backup_name, table)
    end

    TB-->>BL: upload_create_statements(backup_meta, db, create_statements_to_backup)
    activate BL
    BL->>SL: upload_data_tarball(file_names, data_list, remote_path, is_async=True, encryption)
    activate SL
    SL->>PE: upload_data_tarball(file_names, data_list, remote_path, is_async, encryption, compression=False)
    activate PE
    PE->>UP: run(upload_data_tarball_pipeline(...))
    activate UP
    UP->>UP: ReadDataTarballStage -> EncryptStage? -> UploadingStage
    UP-->>PE: complete
    deactivate UP
    PE-->>SL: done
    deactivate PE
    SL-->>BL: done
    deactivate SL
    BL-->>TB: metadata upload scheduled
    deactivate BL

    TB->>BL: wait()
    TB->>CH: remove_freezed_data()
    TB-->>TB: backup done

Sequence diagram for database restore using tarball CREATE statements

sequenceDiagram
    participant DBL as DatabaseLogic
    participant BL as BackupLayout
    participant SL as StorageLoader
    participant PE as PipelineExecutor
    participant DP as download_data_tarball_pipeline
    participant CH as ClickHouse

    DBL->>BL: get_database_create_statements(backup_meta, db_names)
    activate BL
    BL->>SL: download_data_tarball(remote_path, encryption=backup_meta.encrypted, compression=True?)
    activate SL
    SL->>PE: download_data_tarball(remote_path, is_async=False, encryption, compression)
    activate PE
    PE->>DP: run(download_data_tarball_pipeline(...))
    activate DP
    DP->>DP: DownloadStorageStage -> DecryptStage? -> DecompressStage? -> UnpackTarballStage
    DP-->>PE: List[(name, sql_bytes)]
    deactivate DP
    PE-->>SL: List[(name, sql_bytes)]
    deactivate PE
    SL-->>BL: List[(name, sql_bytes)]
    deactivate SL
    BL->>BL: decode bytes to utf-8
    BL-->>DBL: List[(db_name, create_sql)]
    deactivate BL

    DBL->>DBL: build dict(db_name -> create_sql)
    loop for each db to restore
        alt db.has_embedded_metadata()
            DBL->>DBL: embedded_schema_db_sql(db)
        else
            DBL->>DBL: db_sql = create_statements[db.name]
        end
        DBL->>CH: execute CREATE DATABASE or ATTACH db_sql
    end

Class diagram for new tarball-based metadata and pipeline components

classDiagram
    class BackupLayout {
        +upload_backup_metadata(backup: BackupMetadata) void
        +upload_database_create_statements(backup_meta: BackupMetadata, databases: list~Database~) list~Database~
        +upload_create_statements(backup_meta: BackupMetadata, db: Database, create_statements: list~tuple~) void
        +get_database_create_statement(backup_meta: BackupMetadata, db_name: str) str
        +get_database_create_statements(backup_meta: BackupMetadata, db_names: list~str~) list~tuple~
        +get_table_create_statement(backup_meta: BackupMetadata, db_name: str, table_name: str) str
        +get_table_create_statements(backup_meta: BackupMetadata, db_name: str, table_names: list~str~) list~tuple~
    }

    class DatabaseLogic {
        +backup(context: BackupContext, databases: list~Database~) list~Database~
        +restore(context: BackupContext, databases: list~Database~) list~Database~
    }

    class TableBackup {
        +_backup(context: BackupContext, db: Database, tables_: list~Table~, backup_name: str, ...) void
        +_check_metadata_change_time(context: BackupContext, table: Table, backup_name: str, change_times: dict~str,TableMetadataChangeTime~) bool
        +_backup_frozen_table_data(context: BackupContext, table: Table, backup_name: str) void
        +_get_tables_from_meta(context: BackupContext, meta: list~TableMetadata~) list~Table~
    }

    class StorageLoader {
        +upload_data(data: bytes, remote_path: str, is_async: bool, encryption: bool) str
        +upload_data_tarball(file_names: list~str~, data_list: list~bytes~, remote_path: str, is_async: bool, encryption: bool, compression: bool) str
        +download_data(remote_path: str, is_async: bool, encryption: bool, compression: bool, encoding: str) Any
        +download_data_tarball(remote_path: str, is_async: bool, encryption: bool, compression: bool) list~tuple~
    }

    class PipelineExecutor {
        +upload_data(data: bytes, remote_path: str, is_async: bool, encryption: bool, compression: bool) void
        +upload_data_tarball(file_names: list~str~, data_list: list~bytes~, remote_path: str, is_async: bool, encryption: bool, compression: bool) void
        +download_data(remote_path: str, is_async: bool, encryption: bool, compression: bool) bytes
        +download_data_tarball(remote_path: str, is_async: bool, encryption: bool, compression: bool) list~tuple~
    }

    class PipelineBuilder {
        +build_read_files_tarball_stage(dir_path: Path) PipelineBuilder
        +build_read_data_tarball_stage(file_names: list~str~, data_list: list~bytes~) PipelineBuilder
        +build_unpack_data_tarball_stage() PipelineBuilder
    }

    class ReadFilesTarballStageBase {
        -chunk_size: int
        +__call__() Iterator~bytes~
        +_read_file_content(file_path: Path) Iterator~bytes~
        +_read_content(stream: BinaryIO, size: int) Iterator~bytes~
    }

    class ReadDataTarballStage {
        +__init__(config: dict, file_names: List~str~, data_list: List~bytes~)
        +__call__() Iterator~bytes~
    }

    class TarStreamProcessorBase {
        -tarstream: BytesFIFO
        -state: State
        -bytes_to_process: int
        -tarinfo: TarInfo
        +__call__(data: bytes, index: int) void
        +on_done() Any
        +_read_header() bool
        +_process_long_name() bool
        +_process_data() bool
        +_skip_bytes() bool
        +_on_file_complete() void
        +_on_file_start() void
        +_write_data(data: bytes) void
    }

    class WriteFilesStage {
        -dir: Path
        -fobj: IO
        +__init__(config: dict, dir_path: Path, buffer_size: int)
        +_on_file_complete() void
        +_on_file_start() void
        +_write_data(data: bytes) void
    }

    class UnpackTarballStage {
        -current_data: bytearray
        -results: list~tuple~
        +__init__(config: dict, buffer_size: int)
        +_on_file_complete() void
        +_on_file_start() void
        +_write_data(data: bytes) void
        +on_done() Iterator~tuple~
    }

    class Calculators {
        +calc_aligned_files_size(files: List~Path~, alignment: int) int
        +calc_aligned_data_size(data_list: List~bytes~, alignment: int) int
        +calc_tarball_size_scan(dir_path: Path, aligned_files_size: int, ...) int
        +calc_tarball_size(file_names: List~str~, aligned_data_size: int) int
    }

    BackupLayout --> StorageLoader : uses
    DatabaseLogic --> BackupLayout : uses
    TableBackup --> BackupLayout : uses
    TableBackup --> DatabaseLogic : coordinate backup/restore
    StorageLoader --> PipelineExecutor : delegates
    PipelineExecutor --> PipelineBuilder : builds pipelines
    PipelineBuilder --> ReadDataTarballStage : creates
    PipelineBuilder --> UnpackTarballStage : creates
    ReadFilesTarballStageBase <|-- ReadDataTarballStage
    TarStreamProcessorBase <|-- WriteFilesStage
    TarStreamProcessorBase <|-- UnpackTarballStage
    Calculators <.. PipelineBuilder : uses calc_aligned_data_size
    Calculators <.. upload_data_tarball_pipeline : uses calc_aligned_data_size
    TarStreamProcessorBase ..> State : uses
    TarStreamProcessorBase ..> BytesFIFO : uses
    TarStreamProcessorBase ..> TarInfo : uses

File-Level Changes

Change Details Files
Batch table CREATE statement backup per database and only after verifying table metadata has not changed during freeze, while reusing the same metadata for data/cloud-storage backup.
  • Replace per-table _backup_freezed_table with _check_metadata_change_time to only validate metadata timestamps and skip unstable tables, moving metadata registration and data backup into the freeze-completion loop in _backup
  • Accumulate (table_name, create_statement) pairs for successfully validated tables and upload them with backup_layout.upload_create_statements in a finally block
  • Log table backup at data-backup time in _backup_frozen_table_data instead of in the removed _backup_freezed_table helper
ch_backup/logic/table.py
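The stability check described above can be sketched as a small predicate. The function name, argument shapes, and the mtime-comparison strategy are assumptions for illustration; the real `_check_metadata_change_time` compares change times recorded by ClickHouse, not raw filesystem mtimes.

```python
import os


def metadata_unchanged(metadata_path: str, mtime_before_freeze: float) -> bool:
    """Return True if a table's .sql metadata file is unchanged since the
    pre-freeze snapshot.

    Hypothetical sketch of the check in _check_metadata_change_time: if the
    metadata changed while the table was being frozen, the frozen data is
    stale, so the table is skipped and its frozen data cleaned up instead of
    being backed up.
    """
    return os.path.getmtime(metadata_path) == mtime_before_freeze
```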
Switch database metadata backup to tarball-based bulk upload/download, and adjust database backup/restore flow accordingly.
  • Split databases into ones with embedded metadata and ones needing external CREATE statements; bulk-upload external CREATE statements with upload_database_create_statements and then add all databases to BackupMetadata
  • On restore, prefetch all required database CREATE statements with get_database_create_statements into a dict and use it instead of per-database get_database_create_statement calls
  • Remove the old _backup_database helper which handled databases one by one
ch_backup/logic/database.py
ch_backup/backup/layout.py
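The database split can be sketched as a simple partition. The `Database` class and its embedded-metadata predicate below are simplified stand-ins; the real model and predicate live in ch_backup.

```python
from typing import Iterable, List, Tuple


class Database:
    """Minimal stand-in for ch_backup's Database model (illustrative only)."""

    def __init__(self, name: str):
        self.name = name

    def has_embedded_metadata(self) -> bool:
        # Assumed predicate for illustration: the real implementation decides
        # which databases ClickHouse can recreate without a stored CREATE.
        return self.name in ("default", "system")


def split_databases(
    databases: Iterable[Database],
) -> Tuple[List[Database], List[Database]]:
    """Partition databases into (embedded, external).

    Only the external ones need their CREATE statements bulk-uploaded as a
    tarball; both groups are then registered in BackupMetadata.
    """
    embedded = [db for db in databases if db.has_embedded_metadata()]
    external = [db for db in databases if not db.has_embedded_metadata()]
    return embedded, external
```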
Introduce tarball-based upload/download of arbitrary in-memory data and refactor TAR stream processing to support both filesystem writes and in-memory unpacking.
  • Add ReadDataTarballStage for streaming in-memory data (file_names + bytes) into a TAR stream, reusing padding logic from ReadFilesTarballStageBase
  • Refactor WriteFilesStage into a generic TarStreamProcessorBase with overridable hooks, keep WriteFilesStage for filesystem output, and add UnpackTarballStage that collects TAR entries into (filename, bytes) pairs
  • Add upload_data_tarball_pipeline and download_data_tarball_pipeline plus run_and_collect_all to wire new stages into the async pipeline framework
ch_backup/storage/async_pipeline/stages/filesystem/read_files_tarball_stage.py
ch_backup/storage/async_pipeline/stages/filesystem/write_files_stage.py
ch_backup/storage/async_pipeline/pipelines.py
ch_backup/storage/async_pipeline/pipeline_executor.py
ch_backup/storage/async_pipeline/pipeline_builder.py
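The end-to-end effect of these stages is an in-memory TAR round trip. A minimal sketch using the standard library's `tarfile`, approximating what `ReadDataTarballStage` streams out and what `UnpackTarballStage.on_done()` collects (the real stages work on chunked streams rather than whole blobs):

```python
import io
import tarfile
from typing import Iterable, List, Tuple


def pack_data_tarball(items: Iterable[Tuple[str, bytes]]) -> bytes:
    """Pack (file_name, data) pairs into a single in-memory TAR blob."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, data in items:
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def unpack_data_tarball(blob: bytes) -> List[Tuple[str, bytes]]:
    """Inverse operation: extract (file_name, data) pairs from a TAR blob."""
    out: List[Tuple[str, bytes]] = []
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r") as tar:
        for member in tar.getmembers():
            fobj = tar.extractfile(member)
            if fobj is not None:
                out.append((member.name, fobj.read()))
    return out
```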
Expose tarball upload/download on the storage loader and wire it into BackupLayout to read/write table and database CREATE statement tarballs with backward compatibility.
  • Add StorageLoader.upload_data_tarball and download_data_tarball methods delegating to the pipeline executor
  • Implement BackupLayout.upload_database_create_statements and upload_create_statements which pack multiple CREATE statements into a single tarball and upload via upload_data_tarball
  • Implement BackupLayout.get_database_create_statements and get_table_create_statements, which detect new tarball layout, otherwise fall back to old per-object metadata files for compatibility
ch_backup/storage/loader.py
ch_backup/backup/layout.py
Add calculator helper for sizing in-memory tarballs used in the new pipelines.
  • Introduce calc_aligned_data_size to compute total size of in-memory data chunks with alignment padding, and use it when estimating tarball size for upload_data_tarball_pipeline
ch_backup/calculators.py
ch_backup/storage/async_pipeline/pipelines.py
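The alignment math is straightforward: TAR stores each member's data padded up to 512-byte blocks, so the aligned size of a list of chunks is the sum of each length rounded up to the block boundary. A sketch of the new calculator (the real signature in `ch_backup/calculators.py` may differ):

```python
from typing import List

TAR_BLOCKSIZE = 512  # TAR pads each member's data to 512-byte blocks


def calc_aligned_data_size(data_list: List[bytes], alignment: int = TAR_BLOCKSIZE) -> int:
    """Total size of in-memory chunks, each rounded up to the alignment.

    Used to pre-size the tarball for upload_data_tarball_pipeline; member
    headers and end-of-archive padding are accounted for separately.
    """
    return sum(
        (len(data) + alignment - 1) // alignment * alignment for data in data_list
    )
```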
Optimize table restore metadata fetch by bulk-downloading CREATE statements per database instead of per-table.
  • Replace _get_table_from_meta with _get_tables_from_meta which builds Table objects with empty create_statement, bulk-downloads CREATE statements per database via get_table_create_statements, and fills them in
  • Use the new _get_tables_from_meta in restore instead of mapping meta entries one by one
ch_backup/logic/table.py
ch_backup/backup/layout.py



@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues and left some high-level feedback:

  • In _get_tables_from_meta, the list comprehension [meta.name for meta in meta] both shadows the meta argument and passes table names from all databases to get_table_create_statements; this will cause incorrect fallback behavior for non‑tar layouts (wrong db_name used) and is confusing, so it should be refactored to use a distinct variable and only pass names for the current database.
  • The new tarball APIs (upload_database_create_statements, upload_create_statements, get_*_create_statements) rely on raw names as tar member paths; if there is any possibility of clashes or names needing escaping (e.g., special characters or case sensitivity), consider reusing the existing escape_metadata_file_name/quoting logic for tar entry names to keep behavior consistent with the old per‑file layout.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_get_tables_from_meta`, the list comprehension `[meta.name for meta in meta]` both shadows the `meta` argument and passes table names from *all* databases to `get_table_create_statements`; this will cause incorrect fallback behavior for non‑tar layouts (wrong db_name used) and is confusing, so it should be refactored to use a distinct variable and only pass names for the current `database`.
- The new tarball APIs (`upload_database_create_statements`, `upload_create_statements`, `get_*_create_statements`) rely on raw names as tar member paths; if there is any possibility of clashes or names needing escaping (e.g., special characters or case sensitivity), consider reusing the existing `escape_metadata_file_name`/quoting logic for tar entry names to keep behavior consistent with the old per‑file layout.

## Individual Comments

### Comment 1
<location path="ch_backup/logic/table.py" line_range="369-373" />
<code_context>
         logging.debug("Retrieving tables from tables metadata")
-        tables_to_preprocess: List[Table] = list(
-            map(lambda meta: self._get_table_from_meta(context, meta), tables_meta)
+        tables_to_preprocess: List[Table] = self._get_tables_from_meta(
+            context, tables_meta
         )
</code_context>
<issue_to_address>
**issue (bug_risk):** Incorrect table_names list passed to get_table_create_statements mixes tables from all databases

`_get_tables_from_meta` passes `[meta.name for meta in meta]` into `get_table_create_statements`, which includes tables from all databases, not just the current one. While the tarball path ignores this list, the fallback path will attempt to read `metadata/{db}/{table}.sql` for invalid `(database, table)` combinations, causing `download_data` to fail. Please filter `meta` by `database` before building `table_names`, e.g.:

```python
table_names = [m.name for m in meta if m.database == database]
statements = context.backup_layout.get_table_create_statements(
    context.backup_meta, database, table_names,
)
```
</issue_to_address>

### Comment 2
<location path="ch_backup/storage/async_pipeline/stages/filesystem/write_files_stage.py" line_range="177" />
<code_context>
+        self._fobj.write(data)
+
+
+class UnpackTarballStage(TarStreamProcessorBase):
+    """
+    Unpack TAR stream to in-memory data pairs (filename, data).
</code_context>
<issue_to_address>
**issue (bug_risk):** Tuple is used in type hints but not imported, which will raise NameError at runtime

In `UnpackTarballStage`, `Tuple` is used in annotations (e.g. `self._results: list[Tuple[str, bytes]]`, `def on_done(self) -> Iterator[Tuple[str, bytes]]`) but not imported. Since this module doesn’t use `from __future__ import annotations`, these are evaluated at runtime and will raise `NameError`.

Please import `Tuple` (and `Iterator`, if not already imported) from `typing`, or switch this module to string annotations / `from __future__ import annotations` so the type names aren’t evaluated at runtime.
</issue_to_address>
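The failure mode is easy to reproduce: without `from __future__ import annotations`, a function's return annotation is evaluated when the function is defined, so an unimported name raises `NameError` immediately. A toy demonstration (the `on_done` body here is a made-up stand-in, not the stage's real logic):

```python
# At this point neither Iterator nor Tuple is imported, so defining a
# function whose annotation uses them fails at definition time:
try:
    exec("def broken() -> Iterator[Tuple[str, bytes]]: ...")
except NameError as exc:
    print(exc)  # e.g. "name 'Iterator' is not defined"

# The fix is simply to import the names the annotations use:
from typing import Iterator, Tuple


def on_done() -> Iterator[Tuple[str, bytes]]:
    """Toy stand-in for UnpackTarballStage.on_done (illustrative only)."""
    yield ("db1.sql", b"CREATE DATABASE db1")
```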


