Save table and database create statements as tars #298

Open

kirillgarbar wants to merge 10 commits into yandex:main from
Conversation
Reviewer's Guide

This PR changes backup/restore of ClickHouse database and table CREATE statements to use tarball-based bulk upload/download instead of per-object files, introduces generic tar-streaming primitives for in-memory data in the async pipeline, and adjusts backup logic to persist metadata only after confirming table metadata stability and to batch table metadata uploads per database.

Sequence diagram for batched table CREATE backup using tarball

sequenceDiagram
participant TB as TableBackup
participant CH as ClickHouse
participant BL as BackupLayout
participant SL as StorageLoader
participant PE as PipelineExecutor
participant UP as upload_data_tarball_pipeline
TB->>CH: create_shadow_increment()
TB->>TB: ThreadExecPool over tables_
loop for each table
TB->>CH: _freeze_table(context, db, table, backup_name, ...)
end
TB->>TB: _check_metadata_change_time(table, backup_name, change_times)
alt metadata unchanged
TB->>TB: collect (table.name, table.create_statement)
TB->>TB: backup_meta.add_table(TableMetadata)
TB->>TB: _backup_frozen_table_data(...)
TB->>TB: _backup_cloud_storage_metadata(...)
else metadata changed
TB->>CH: remove_freezed_data(backup_name, table)
end
TB-->>BL: upload_create_statements(backup_meta, db, create_statements_to_backup)
activate BL
BL->>SL: upload_data_tarball(file_names, data_list, remote_path, is_async=True, encryption)
activate SL
SL->>PE: upload_data_tarball(file_names, data_list, remote_path, is_async, encryption, compression=False)
activate PE
PE->>UP: run(upload_data_tarball_pipeline(...))
activate UP
UP->>UP: ReadDataTarballStage -> EncryptStage? -> UploadingStage
UP-->>PE: complete
deactivate UP
PE-->>SL: done
deactivate PE
SL-->>BL: done
deactivate SL
BL-->>TB: metadata upload scheduled
deactivate BL
TB->>BL: wait()
TB->>CH: remove_freezed_data()
TB-->>TB: backup done
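The batching step above collects `(name, create_statement)` pairs and hands them to the tarball upload path. The sketch below shows the core idea with the stdlib `tarfile` module; the PR's actual `ReadDataTarballStage` streams 512-byte-aligned chunks through the async pipeline instead of materializing the whole archive, and `pack_create_statements` is a hypothetical helper, not code from the diff.

```python
import io
import tarfile


def pack_create_statements(statements):
    """Pack (name, sql_text) pairs into an uncompressed in-memory tarball.

    Minimal illustration only: the real pipeline streams the tar and
    optionally encrypts it before upload.
    """
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, sql in statements:
            payload = sql.encode("utf-8")
            info = tarfile.TarInfo(name=f"{name}.sql")
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


blob = pack_create_statements([
    ("events", "CREATE TABLE db1.events ..."),
    ("users", "CREATE TABLE db1.users ..."),
])
```

One archive per database replaces one storage request per table, which is the main win for databases with many tables.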
Sequence diagram for database restore using tarball CREATE statements

sequenceDiagram
participant DBL as DatabaseLogic
participant BL as BackupLayout
participant SL as StorageLoader
participant PE as PipelineExecutor
participant DP as download_data_tarball_pipeline
participant CH as ClickHouse
DBL->>BL: get_database_create_statements(backup_meta, db_names)
activate BL
BL->>SL: download_data_tarball(remote_path, encryption=backup_meta.encrypted, compression=True?)
activate SL
SL->>PE: download_data_tarball(remote_path, is_async=False, encryption, compression)
activate PE
PE->>DP: run(download_data_tarball_pipeline(...))
activate DP
DP->>DP: DownloadStorageStage -> DecryptStage? -> DecompressStage? -> UnpackTarballStage
DP-->>PE: List[(name, sql_bytes)]
deactivate DP
PE-->>SL: List[(name, sql_bytes)]
deactivate PE
SL-->>BL: List[(name, sql_bytes)]
deactivate SL
BL->>BL: decode bytes to utf-8
BL-->>DBL: List[(db_name, create_sql)]
deactivate BL
DBL->>DBL: build dict(db_name -> create_sql)
loop for each db to restore
alt db.has_embedded_metadata()
DBL->>DBL: embedded_schema_db_sql(db)
else
DBL->>DBL: db_sql = create_statements[db.name]
end
DBL->>CH: execute CREATE DATABASE or ATTACH db_sql
end
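The restore path above ends with a list of `(name, sql_bytes)` pairs that are decoded to UTF-8. A minimal sketch of that unpack-and-decode step, again using the stdlib `tarfile` module rather than the PR's streaming `UnpackTarballStage` (`unpack_create_statements` is a hypothetical name):

```python
import io
import tarfile


def unpack_create_statements(blob: bytes):
    """Extract (member_name, decoded_sql) pairs from an in-memory tarball."""
    results = []
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r") as tar:
        for member in tar.getmembers():
            if member.isfile():
                payload = tar.extractfile(member).read()
                results.append((member.name, payload.decode("utf-8")))
    return results


# Build a tiny sample archive to demonstrate the round trip.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    data = b"CREATE DATABASE db1 ENGINE = Atomic"
    info = tarfile.TarInfo(name="db1.sql")
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))

statements = unpack_create_statements(buf.getvalue())
# statements == [("db1.sql", "CREATE DATABASE db1 ENGINE = Atomic")]
```

In the real pipeline the decrypt/decompress stages run before this step, matching the `DecryptStage? -> DecompressStage? -> UnpackTarballStage` chain in the diagram.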
Class diagram for new tarball-based metadata and pipeline components

classDiagram
class BackupLayout {
+upload_backup_metadata(backup: BackupMetadata) void
+upload_database_create_statements(backup_meta: BackupMetadata, databases: list~Database~) list~Database~
+upload_create_statements(backup_meta: BackupMetadata, db: Database, create_statements: list~tuple~) void
+get_database_create_statement(backup_meta: BackupMetadata, db_name: str) str
+get_database_create_statements(backup_meta: BackupMetadata, db_names: list~str~) list~tuple~
+get_table_create_statement(backup_meta: BackupMetadata, db_name: str, table_name: str) str
+get_table_create_statements(backup_meta: BackupMetadata, db_name: str, table_names: list~str~) list~tuple~
}
class DatabaseLogic {
+backup(context: BackupContext, databases: list~Database~) list~Database~
+restore(context: BackupContext, databases: list~Database~) list~Database~
}
class TableBackup {
+_backup(context: BackupContext, db: Database, tables_: list~Table~, backup_name: str, ...) void
+_check_metadata_change_time(context: BackupContext, table: Table, backup_name: str, change_times: dict~str,TableMetadataChangeTime~) bool
+_backup_frozen_table_data(context: BackupContext, table: Table, backup_name: str) void
+_get_tables_from_meta(context: BackupContext, meta: list~TableMetadata~) list~Table~
}
class StorageLoader {
+upload_data(data: bytes, remote_path: str, is_async: bool, encryption: bool) str
+upload_data_tarball(file_names: list~str~, data_list: list~bytes~, remote_path: str, is_async: bool, encryption: bool, compression: bool) str
+download_data(remote_path: str, is_async: bool, encryption: bool, compression: bool, encoding: str) Any
+download_data_tarball(remote_path: str, is_async: bool, encryption: bool, compression: bool) list~tuple~
}
class PipelineExecutor {
+upload_data(data: bytes, remote_path: str, is_async: bool, encryption: bool, compression: bool) void
+upload_data_tarball(file_names: list~str~, data_list: list~bytes~, remote_path: str, is_async: bool, encryption: bool, compression: bool) void
+download_data(remote_path: str, is_async: bool, encryption: bool, compression: bool) bytes
+download_data_tarball(remote_path: str, is_async: bool, encryption: bool, compression: bool) list~tuple~
}
class PipelineBuilder {
+build_read_files_tarball_stage(dir_path: Path) PipelineBuilder
+build_read_data_tarball_stage(file_names: list~str~, data_list: list~bytes~) PipelineBuilder
+build_unpack_data_tarball_stage() PipelineBuilder
}
class ReadFilesTarballStageBase {
-chunk_size: int
+__call__() Iterator~bytes~
+_read_file_content(file_path: Path) Iterator~bytes~
+_read_content(stream: BinaryIO, size: int) Iterator~bytes~
}
class ReadDataTarballStage {
+__init__(config: dict, file_names: List~str~, data_list: List~bytes~)
+__call__() Iterator~bytes~
}
class TarStreamProcessorBase {
-tarstream: BytesFIFO
-state: State
-bytes_to_process: int
-tarinfo: TarInfo
+__call__(data: bytes, index: int) void
+on_done() Any
+_read_header() bool
+_process_long_name() bool
+_process_data() bool
+_skip_bytes() bool
+_on_file_complete() void
+_on_file_start() void
+_write_data(data: bytes) void
}
class WriteFilesStage {
-dir: Path
-fobj: IO
+__init__(config: dict, dir_path: Path, buffer_size: int)
+_on_file_complete() void
+_on_file_start() void
+_write_data(data: bytes) void
}
class UnpackTarballStage {
-current_data: bytearray
-results: list~tuple~
+__init__(config: dict, buffer_size: int)
+_on_file_complete() void
+_on_file_start() void
+_write_data(data: bytes) void
+on_done() Iterator~tuple~
}
class Calculators {
+calc_aligned_files_size(files: List~Path~, alignment: int) int
+calc_aligned_data_size(data_list: List~bytes~, alignment: int) int
+calc_tarball_size_scan(dir_path: Path, aligned_files_size: int, ...) int
+calc_tarball_size(file_names: List~str~, aligned_data_size: int) int
}
BackupLayout --> StorageLoader : uses
DatabaseLogic --> BackupLayout : uses
TableBackup --> BackupLayout : uses
TableBackup --> DatabaseLogic : coordinate backup/restore
StorageLoader --> PipelineExecutor : delegates
PipelineExecutor --> PipelineBuilder : builds pipelines
PipelineBuilder --> ReadDataTarballStage : creates
PipelineBuilder --> UnpackTarballStage : creates
ReadFilesTarballStageBase <|-- ReadDataTarballStage
TarStreamProcessorBase <|-- WriteFilesStage
TarStreamProcessorBase <|-- UnpackTarballStage
Calculators <.. PipelineBuilder : uses calc_aligned_data_size
Calculators <.. upload_data_tarball_pipeline : uses calc_aligned_data_size
TarStreamProcessorBase ..> State : uses
TarStreamProcessorBase ..> BytesFIFO : uses
TarStreamProcessorBase ..> TarInfo : uses
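The `Calculators` helpers (`calc_aligned_data_size`, `calc_tarball_size`) let the pipeline know the archive size up front without building it. Their bodies are not shown in the diff; the sketch below reconstructs the arithmetic implied by their names from the TAR format itself: every member is a 512-byte header plus its payload padded to a 512-byte boundary, and the archive ends with two zero blocks. Names longer than 100 characters would need extra GNU longname members (the `_process_long_name` state in `TarStreamProcessorBase` hints at this), which this sketch ignores.

```python
import io
import math
import tarfile

BLOCK = 512      # TAR stores everything in 512-byte blocks
RECORD = 10240   # stdlib tarfile pads the finished archive to 20-block records


def calc_aligned_data_size(data_list, alignment=BLOCK):
    """Sum of payload sizes, each rounded up to the next block boundary."""
    return sum(-(-len(d) // alignment) * alignment for d in data_list)


def calc_tarball_size(file_names, aligned_data_size):
    """One 512-byte header per member, plus aligned payloads, plus the
    two zero blocks that terminate the archive (long names ignored)."""
    return len(file_names) * BLOCK + aligned_data_size + 2 * BLOCK


names = ["a.sql", "b.sql"]
datas = [b"x" * 10, b"y" * 600]
predicted = calc_tarball_size(names, calc_aligned_data_size(datas))
# 2 headers (1024) + 512 + 1024 payload + 1024 trailer = 3584 bytes

# Cross-check against an archive actually produced by tarfile, which
# additionally pads the whole output up to a 10240-byte record boundary.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    for name, payload in zip(names, datas):
        info = tarfile.TarInfo(name=name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
actual = len(buf.getvalue())
assert actual == math.ceil(predicted / RECORD) * RECORD
```

Knowing the size ahead of time matters for storage backends (e.g. S3 multipart uploads) that want a content length before the stream starts.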
Hey - I've found 2 issues, and left some high level feedback:
- In `_get_tables_from_meta`, the list comprehension `[meta.name for meta in meta]` both shadows the `meta` argument and passes table names from all databases to `get_table_create_statements`; this will cause incorrect fallback behavior for non-tar layouts (wrong db_name used) and is confusing, so it should be refactored to use a distinct variable and only pass names for the current `database`.
- The new tarball APIs (`upload_database_create_statements`, `upload_create_statements`, `get_*_create_statements`) rely on raw names as tar member paths; if there is any possibility of clashes or names needing escaping (e.g., special characters or case sensitivity), consider reusing the existing `escape_metadata_file_name`/quoting logic for tar entry names to keep behavior consistent with the old per-file layout.
## Individual Comments
### Comment 1
<location path="ch_backup/logic/table.py" line_range="369-373" />
<code_context>
logging.debug("Retrieving tables from tables metadata")
- tables_to_preprocess: List[Table] = list(
- map(lambda meta: self._get_table_from_meta(context, meta), tables_meta)
+ tables_to_preprocess: List[Table] = self._get_tables_from_meta(
+ context, tables_meta
)
</code_context>
<issue_to_address>
**issue (bug_risk):** Incorrect table_names list passed to get_table_create_statements mixes tables from all databases
`_get_tables_from_meta` passes `[meta.name for meta in meta]` into `get_table_create_statements`, which includes tables from all databases, not just the current one. While the tarball path ignores this list, the fallback path will attempt to read `metadata/{db}/{table}.sql` for invalid `(database, table)` combinations, causing `download_data` to fail. Please filter `meta` by `database` before building `table_names`, e.g.:
```python
table_names = [m.name for m in meta if m.database == database]
statements = context.backup_layout.get_table_create_statements(
context.backup_meta, database, table_names,
)
```
</issue_to_address>
### Comment 2
<location path="ch_backup/storage/async_pipeline/stages/filesystem/write_files_stage.py" line_range="177" />
<code_context>
+ self._fobj.write(data)
+
+
+class UnpackTarballStage(TarStreamProcessorBase):
+ """
+ Unpack TAR stream to in-memory data pairs (filename, data).
</code_context>
<issue_to_address>
**issue (bug_risk):** Tuple is used in type hints but not imported, which will raise NameError at runtime
In `UnpackTarballStage`, `Tuple` is used in annotations (e.g. `self._results: list[Tuple[str, bytes]]`, `def on_done(self) -> Iterator[Tuple[str, bytes]]`) but not imported. Since this module doesn’t use `from __future__ import annotations`, these are evaluated at runtime and will raise `NameError`.
Please import `Tuple` (and `Iterator`, if not already imported) from `typing`, or switch this module to string annotations / `from __future__ import annotations` so the type names aren’t evaluated at runtime.
</issue_to_address>
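To illustrate the mechanics behind Comment 2: Python evaluates signature annotations when the `def` statement runs, so an un-imported `Tuple` in a return annotation raises `NameError` at import time. The sketch below reproduces the failure and the import fix on a simplified stand-in function (not the actual `UnpackTarballStage` code).

```python
# Broken shape: `Tuple` appears in a signature annotation, which is
# evaluated when the `def` statement executes, so defining the function
# raises NameError immediately.
broken = """
from typing import Iterator
def on_done() -> Iterator[Tuple[str, bytes]]:
    yield ("name.sql", b"data")
"""
try:
    exec(broken, {})
    raised = False
except NameError:
    raised = True

# Fix 1: import the missing name.
fixed = """
from typing import Iterator, Tuple
def on_done() -> Iterator[Tuple[str, bytes]]:
    yield ("name.sql", b"data")
"""
ns = {}
exec(fixed, ns)
result = list(ns["on_done"]())

# Fix 2 (alternative): put `from __future__ import annotations` at the
# top of the module, which turns all annotations into strings that are
# never evaluated at runtime.
```

Either fix works; the `__future__` import is the lower-maintenance option if the module already leans on postponed evaluation elsewhere.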
Summary by Sourcery
Store table and database CREATE statements in tarball-based metadata and adjust backup/restore logic to use the new format while remaining compatible with the legacy layout.