Phase 1: daily incremental assembly version tracking #9

Reviewer's Guide

Implements Phase 1 of daily incremental assembly version tracking by adding an incremental updater that detects newly superseded assemblies by comparing the current JSONL against the previous TSV and appends them to a historical TSV, a companion backfill tool that fetches and merges missing assembly versions, and a comprehensive pytest suite covering both flows and their key helper functions.

Sequence diagram for incremental historical update flow

```mermaid
sequenceDiagram
actor Operator
participant IncrementalCLI as update_historical_incremental_main
participant IncrementalFlow as run_incremental_historical_update
participant Loader as load_previous_parsed_by_base
participant Comparator as identify_newly_superseded
participant Summary as print_superseded_summary
participant Warn as print_missing_versions_warning
participant Writer as append_superseded_to_tsv
participant PreviousTSV as assembly_current_tsv_previous
participant NewJSONL as assembly_data_report_jsonl
participant HistoricalTSV as assembly_historical_tsv
Operator->>IncrementalCLI: invoke with input_path, previous_tsv, historical_tsv
IncrementalCLI->>IncrementalFlow: run_incremental_historical_update(new_jsonl, previous_tsv, historical_tsv)
IncrementalFlow->>Loader: load_previous_parsed_by_base(previous_tsv)
Loader->>PreviousTSV: open and read
PreviousTSV-->>Loader: rows grouped by base_accession and version
Loader-->>IncrementalFlow: previous_by_base
alt no_previous_data
IncrementalFlow->>IncrementalCLI: return summary with zero counts
IncrementalCLI-->>Operator: print first run message
else previous_data_exists
IncrementalFlow->>Comparator: identify_newly_superseded(new_jsonl, previous_by_base)
Comparator->>NewJSONL: stream jsonl lines
NewJSONL-->>Comparator: current assemblies
Comparator-->>IncrementalFlow: newly_superseded, missing
IncrementalFlow->>Summary: print_superseded_summary(newly_superseded)
IncrementalFlow->>Warn: print_missing_versions_warning(missing)
IncrementalFlow->>Writer: append_superseded_to_tsv(newly_superseded, historical_tsv)
Writer->>HistoricalTSV: read existing rows if file exists
Writer->>HistoricalTSV: write merged rows with new superseded entries
Writer-->>IncrementalFlow: write complete
IncrementalFlow-->>IncrementalCLI: summary dict with counts and missing list
IncrementalCLI-->>Operator: print completion summary
end
```

Sequence diagram for missing version backfill flow

```mermaid
sequenceDiagram
actor Operator
participant BackfillCLI as backfill_missing_versions_main
participant BackfillFlow as backfill_missing_versions
participant ConfigLoader as utils_load_config
participant MissingLoader as load_missing_versions
participant HistoricalLoader as load_existing_historical
participant Finder as find_all_assembly_versions
participant Parser as parse_historical_version
participant TSVWriter as write_to_tsv
participant MissingJSON as missing_versions_json
participant HistoricalTSV as assembly_historical_tsv
participant NCBIFTP as ncbi_ftp_service
Operator->>BackfillCLI: invoke with missing_json, yaml_path, work_dir
BackfillCLI->>BackfillFlow: backfill_missing_versions(missing_json, yaml_path, work_dir)
BackfillFlow->>ConfigLoader: load_config(yaml_path)
ConfigLoader-->>BackfillFlow: config with meta file_name
BackfillFlow->>MissingLoader: load_missing_versions(missing_json)
MissingLoader->>MissingJSON: open and read
MissingJSON-->>MissingLoader: list of missing version records
MissingLoader-->>BackfillFlow: missing list
alt no_missing_entries
BackfillFlow-->>BackfillCLI: print no missing versions
BackfillCLI-->>Operator: exit
else has_missing_entries
BackfillFlow->>HistoricalLoader: load_existing_historical(historical_tsv)
HistoricalLoader->>HistoricalTSV: open and read if exists
HistoricalTSV-->>HistoricalLoader: existing rows by genbankAccession
HistoricalLoader-->>BackfillFlow: existing mapping
loop each_missing_entry
BackfillFlow->>Finder: find_all_assembly_versions(new_accession, work_dir)
Finder->>NCBIFTP: list all assembly versions for series
NCBIFTP-->>Finder: version metadata list
Finder-->>BackfillFlow: all_versions
alt target_version_found
BackfillFlow->>Parser: parse_historical_version(version_data, config, base_accession, version_num, current_accession)
Parser->>NCBIFTP: fetch required metadata and files
NCBIFTP-->>Parser: assembly data for missing version
Parser-->>BackfillFlow: parsed_row
BackfillFlow->>BackfillFlow: update parsed mapping with parsed_row
else target_version_missing
BackfillFlow->>BackfillFlow: increment failed counter
end
end
alt any_succeeded
BackfillFlow->>TSVWriter: write_to_tsv(parsed, config)
TSVWriter->>HistoricalTSV: write merged records
TSVWriter-->>BackfillFlow: write complete
end
BackfillFlow-->>BackfillCLI: print backfill summary
BackfillCLI-->>Operator: completion status
end
```

Flow diagram for incremental update and backfill data lifecycle

```mermaid
flowchart TD
subgraph DailyIncrementalUpdate
A_new_jsonl["assembly_data_report.jsonl<br/>current day parse output"]
A_prev_tsv["assembly_current.tsv.previous<br/>previous run parsed TSV"]
A_script["update_historical_incremental.py<br/>run_incremental_historical_update"]
A_hist_tsv["assembly_historical.tsv<br/>historical versions store"]
A_new_jsonl --> A_script
A_prev_tsv --> A_script
A_hist_tsv --> A_script
A_script --> A_hist_tsv
A_script --> A_missing_in_memory["missing versions list<br/>(for backfill)"]
end
subgraph BackfillMissingVersions
B_missing_json["missing_versions.json<br/>list of missing assembly versions"]
B_yaml["assembly_historical.types.yaml"]
B_script["backfill_missing_versions.py<br/>backfill_missing_versions"]
B_cache["cache directories in work_dir"]
B_hist_tsv["assembly_historical.tsv<br/>updated with backfilled rows"]
B_ncbi["NCBI FTP and metadata services"]
B_missing_json --> B_script
B_yaml --> B_script
B_script --> B_cache
B_script --> B_ncbi
B_ncbi --> B_script
B_hist_tsv --> B_script
B_script --> B_hist_tsv
end
A_missing_in_memory -. used to create .-> B_missing_json
```
Hey - I've found 5 issues and left some high-level feedback:
- In `identify_newly_superseded`, defaulting `release_date` to `datetime.now().isoformat()` when `releaseDate` is missing makes the process non-deterministic between runs; consider using a stable default (e.g. `None` or a sentinel value) or explicitly requiring `releaseDate` upstream so historical records are reproducible.
- In `build_superseded_row`, the base accession is derived via `previous_row["accession"].split(".")[0]` while elsewhere you rely on `parse_accession`; it would be more robust and consistent to reuse `parse_accession` here so any future accession-format changes are handled in one place.
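For the second point, a minimal sketch of the suggested change, assuming `parse_accession` returns a `(base_accession, version)` tuple as the review implies (the exact signature may differ):

```python
# Hypothetical excerpt from build_superseded_row; names follow the review text.

# Before: accession-format logic duplicated at the call site
base_acc = previous_row["accession"].split(".")[0]

# After: reuse the shared helper so accession-format changes live in one place
base_acc, _version = parse_accession(previous_row["accession"])
```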
## Individual Comments
### Comment 1
<location path="flows/parsers/update_historical_incremental.py" line_range="168" />
<code_context>
+ continue
+
+ previous_version = new_version - 1
+ release_date = assembly.get("releaseDate", datetime.now().isoformat())
+
+ if base_acc not in previous_by_base:
</code_context>
<issue_to_address>
**issue (bug_risk):** Using `datetime.now().isoformat()` as a fallback release date may introduce misleading historical metadata.
This creates synthetic metadata that varies by run and doesn’t reflect the source data, which hurts reproducibility and historical accuracy. Prefer either skipping these assemblies, using a clear sentinel (e.g. empty string or explicit placeholder), or logging and leaving the field unset so downstream consumers can distinguish real from inferred dates.
</issue_to_address>
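A minimal sketch of one deterministic alternative, assuming an empty string is an acceptable sentinel for downstream consumers (the helper name and logger setup are illustrative):

```python
import logging

logger = logging.getLogger(__name__)


def stable_release_date(assembly: dict, base_acc: str) -> str:
    """Return the source releaseDate, or a stable empty sentinel when absent."""
    release_date = assembly.get("releaseDate", "")
    if not release_date:
        # Deterministic across runs, and distinguishable from a real date,
        # unlike datetime.now().isoformat()
        logger.warning("No releaseDate for %s; storing empty sentinel", base_acc)
    return release_date
```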
### Comment 2
<location path="flows/parsers/update_historical_incremental.py" line_range="383" />
<code_context>
+ [INPUT_PATH, PREVIOUS_TSV, HISTORICAL_TSV],
+ description="Daily incremental update of historical assembly records",
+ )
+ results = run_incremental_historical_update(
+ new_jsonl=args.input_path,
+ previous_tsv=args.previous_tsv,
</code_context>
<issue_to_address>
**issue (bug_risk):** The incremental updater never writes the `missing_versions` list to the JSON file expected by the backfill script.
`backfill_missing_versions` takes a `--missing_json` file produced by the incremental updater, but this flow only returns `missing_versions` and logs a warning; it never writes the JSON. Unless another component handles this, the documented CLI backfill can’t be used. Please add a way here (e.g., optional arg or helper) to persist `missing_versions` to `missing_versions.json` in the expected format/location.
</issue_to_address>
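One possible shape for the missing piece, assuming `load_missing_versions` in the backfill script reads a plain JSON list of record dicts (the helper name and schema are illustrative and would need to match the real format):

```python
import json
from pathlib import Path


def write_missing_versions(missing_versions: list[dict], out_path: str) -> None:
    """Persist missing-version records for the backfill CLI to consume."""
    Path(out_path).write_text(
        json.dumps(missing_versions, indent=2) + "\n", encoding="utf-8"
    )


# e.g. after the orchestrator returns its summary:
# write_missing_versions(results["missing_versions"], "missing_versions.json")
```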
### Comment 3
<location path="tests/test_incremental_update.py" line_range="296" />
<code_context>
+# TestIncrementalOrchestrator
+# ---------------------------------------------------------------------------
+
+class TestIncrementalOrchestrator:
+ """run_incremental_historical_update orchestrator behaviour."""
+
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test case covering missing-version detection in the orchestrator result
These tests don’t currently exercise the path where `identify_newly_superseded` yields missing versions and populates `missing_versions` in the result. Please add a scenario that:
- uses `previous_tsv` with only v1 for a base,
- provides a JSONL containing v3 but not v2,
- asserts `run_incremental_historical_update` returns `missing_versions_count == 1`, and
- checks that `missing_versions` contains the expected `base_accession` and `missing_version`.
This will verify the end‑to‑end wiring of missing‑version detection in the orchestrator result.
Suggested implementation:
```python
class TestIncrementalOrchestrator:
    """run_incremental_historical_update orchestrator behaviour."""

    def test_missing_version_detected_in_orchestrator_result(self, tmp_path):
        """Orchestrator exposes missing-versions reported by identify_newly_superseded."""
        # previous TSV with only v1 as the base
        previous_tsv = tmp_path / "historical.tsv"
        previous_records = [
            {
                "accession": "BASE123",
                "assembly_version": 1,
                "superseded_by": "",
                "superseded_on": "",
            }
        ]
        append_superseded_to_tsv(previous_records, str(previous_tsv))

        # JSONL containing v3 but not v2
        jsonl_path = tmp_path / "updates.jsonl"
        jsonl_path.write_text(
            json.dumps(
                {
                    "accession": "BASE123",
                    "assembly_version": 3,
                    "superseded_by": "",
                    "run_date": "2024-01-01",
                }
            )
            + "\n",
            encoding="utf-8",
        )

        # run orchestrator and assert missing-version detection is surfaced
        result = run_incremental_historical_update(
            input_jsonl_path=str(jsonl_path),
            previous_tsv_path=str(previous_tsv),
            output_tsv_path=str(tmp_path / "out.tsv"),
        )
        assert result.missing_versions_count == 1
        assert result.missing_versions == [
            {"base_accession": "BASE123", "missing_version": 2}
        ]
```
Depending on the existing contents of `tests/test_incremental_update.py`, you may need to:
1. Ensure `json` is imported at the top of the file:
`import json`
2. Confirm that `append_superseded_to_tsv` and `run_incremental_historical_update` are already imported or available in scope; if not, import them from the module under test using the same pattern as other tests in this file.
3. Adjust the argument names in the `run_incremental_historical_update(...)` call if the actual signature differs (e.g. positional args instead of `input_jsonl_path=...`, or different parameter names).
4. Align the field names in `previous_records` and JSONL payload with the actual schema used elsewhere in this test file (e.g. `assembly_version` vs `version`, `accession` vs `base_accession`), and adjust the expected `missing_versions` dictionaries accordingly if a typed object or dataclass is returned instead of plain dicts.
</issue_to_address>
### Comment 4
<location path="tests/test_incremental_update.py" line_range="464-473" />
<code_context>
+ def test_no_write_when_version_not_found_in_ftp(
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for the case where no versions are returned from FTP at all
There’s a related edge case where `find_all_assembly_versions` returns an empty list (no versions at all). That path currently increments `failed` and continues, but isn’t covered.
Please add a test with `mock_find.return_value = []` that asserts:
- `parse_historical_version` is not called
- `write_to_tsv` is not called (if all entries fail)
- `backfill_missing_versions` returns without raising
to cover the “FTP is empty, skip gracefully” branch.
Suggested implementation:
```python
        assert "GCA_000222935.1" in written_parsed

    @patch.object(backfill_module, "write_to_tsv")
    @patch.object(backfill_module, "parse_historical_version")
    @patch.object(backfill_module, "find_all_assembly_versions")
    @patch.object(backfill_module, "utils")
    def test_no_versions_returned_from_ftp(
        self, mock_utils, mock_find, mock_parse, mock_write, tmp_path
    ):
        """If FTP has no versions at all, skip gracefully without writing."""
        mock_utils.load_config.return_value = MagicMock(
            meta={"file_name": str(tmp_path / "historical.tsv")}
        )
        # No versions discovered on FTP
        mock_find.return_value = []

        missing_json = self._write_missing_json(
            tmp_path,
            [
                {
                    "accession": "GCA_000412225.1",
                    "refseq_accession": "GCF_000412225.1",
                    "assembly_name": "Example assembly 1",
                },
                {
                    "accession": "GCA_000222935.1",
                    "refseq_accession": "GCF_000222935.1",
                    "assembly_name": "Example assembly 2",
                },
            ],
        )

        # Should not raise even though all entries "fail" to resolve
        backfill_module.backfill_missing_versions(
            missing_json=missing_json,
            work_dir=str(tmp_path),
        )

        # No parsing or writing should happen if nothing can be resolved
        mock_parse.assert_not_called()
        mock_write.assert_not_called()
```
1. Ensure `backfill_module` is already imported in this test module (it likely is, since it's used in nearby tests).
2. If the helper `_write_missing_json` expects a different schema for the entries, adjust the dictionaries in the new test accordingly to match the rest of the test file.
3. If `backfill_missing_versions` requires additional arguments in other tests (e.g. config paths, logging), pass them here in the same way to keep the call consistent.
</issue_to_address>
### Comment 5
<location path="tests/test_incremental_update.py" line_range="430-439" />
<code_context>
+ def test_merges_with_existing_historical(
</code_context>
<issue_to_address>
**suggestion (testing):** Consider testing behaviour when parsing a missing version fails mid-run
To better cover `backfill_missing_versions`, please add a case where `parse_historical_version` raises for one of multiple `missing_json` entries while others succeed. Then assert that `write_to_tsv` still receives the successfully parsed records and that processing continues despite the exception, confirming the intended best-effort behaviour.
Suggested implementation:
```python
    @patch.object(backfill_module, "write_to_tsv")
    @patch.object(backfill_module, "find_all_assembly_versions")
    @patch.object(backfill_module, "parse_historical_version")
    @patch.object(backfill_module, "utils")
    def test_merges_with_existing_historical(
        self, mock_utils, mock_parse, mock_find, mock_write, tmp_path
    ):
        """
        New rows must be merged with existing historical rows before writing.

        Also verify best-effort behaviour: if parsing one of multiple missing
        versions fails, successfully parsed versions are still written and
        processing continues.
        """
        historical_tsv = tmp_path / "historical.tsv"
        existing_historical_row = {
            "genbankAccession": "GCA_000412225.1",
            "version_status": "superseded",
        }
        write_tsv(historical_tsv, [existing_historical_row])

        mock_utils.load_config.return_value = MagicMock(
            meta={"file_name": str(historical_tsv)}
        )

        # Two missing versions: one will parse successfully, the other will raise.
        missing_versions = [
            {"accession": "GCA_000412225.2"},
            {"accession": "GCA_000412225.3"},
        ]
        # Adjust this return value structure if find_all_assembly_versions returns
        # something different in your implementation.
        mock_find.return_value = (missing_versions, [])

        successful_parsed_row = {
            "genbankAccession": "GCA_000412225.2",
            "version_status": "current",
        }

        def parse_side_effect(version_data, *args, **kwargs):
            # First missing version parses successfully, second raises.
            if version_data["accession"] == "GCA_000412225.3":
                raise ValueError("Failed to parse historical version")
            return successful_parsed_row

        mock_parse.side_effect = parse_side_effect

        # Run the backfill logic; this should call parse_historical_version for
        # each missing version and write_to_tsv once with both historical and
        # successfully parsed rows.
        backfill_module.backfill_missing_versions(work_dir=str(tmp_path))

        # Both missing versions should have been attempted.
        assert mock_parse.call_count == 2

        # write_to_tsv should be called once with merged rows that include:
        # - the original historical row
        # - the successfully parsed new row
        mock_write.assert_called_once()
        write_path, written_rows = mock_write.call_args[0]
        assert str(write_path) == str(historical_tsv)
        assert existing_historical_row in written_rows
        assert successful_parsed_row in written_rows

        # We should not have written any row corresponding to the failed parse.
        accessions = {row["genbankAccession"] for row in written_rows}
        assert "GCA_000412225.3" not in accessions
```
The above test assumes:
1. `backfill_missing_versions` accepts a `work_dir` keyword argument and internally uses `utils.load_config` and `find_all_assembly_versions`. If the signature differs, update the call accordingly.
2. `find_all_assembly_versions` returns a `(missing_versions, existing_versions)` tuple where `missing_versions` is iterable and each element is a dict with an `"accession"` key. Adjust `mock_find.return_value` to match the actual return type/shape if necessary.
3. `write_to_tsv(path, rows)` is called by `backfill_missing_versions` with the `historical_tsv` path as the first positional argument and a list of dict rows as the second. If your implementation uses different argument positions or names, update the `write_path, written_rows = mock_write.call_args[0]` unpacking and subsequent assertions to match.
</issue_to_address>
Closing to revise before re-review.
Force-pushed dfe6087 to 5f8d54c
Force-pushed 5f8d54c to 0ea8206
Hi @fchen13,

Sorry it's taken me a while to get round to looking at this. Before I test whether it runs, I just want to clarify some parser-updater conventions. It looks like you have an updater and a parser here - can you change the names to reflect which one updates the JSON and which one parses that into TSV, and move the updater to the updaters directory?

Since you started on this, I've added an AGENTS file and more contributing instructions to main that should help with defining the difference between the script types. Worth noting that updaters aren't tied to the shared_args in the same way as parsers. If you merge the latest main branch back into your branch you should be able to make use of the AGENTS file, but I also found there was a validation issue relating to the way the internal imports were defined, so you may find there are mismatches in the import syntax to deal with when you merge.

Hopefully the file renaming and param changes will be straightforward - let me know when they are done and I'll have a closer look at the code and try a test run.
fchen13 left a comment:
- the daily incremental update has been renamed to parse_assembly_versions()
- the backfill missing version has been renamed to update_assembly_versions(), emit_event added, wrapper/plugin removed
- test_assembly_versions() includes all updated imports and references
@rjchallis Sorry that I forgot to tag you in my last comment. The changes made in the most recent commit are listed in my review above.
rjchallis left a comment:
Thanks @fchen13. There are still some departures from the pipeline conventions here that I think we need to resolve. I confess I'm having trouble wrapping my head around exactly how to organise this, as it doesn't quite fit the pattern of other datasets. This implementation is also slightly complicated by the fact that the backfill parser breaks some conventions, but it does so as a one-off script, so the division of functionality is less critical.
I think the main problem to get around is the mix of fetching and parsing in the updater. This file should really only concern itself with producing a JSONL file for any historic assemblies that are new since the last run. Some of the current approach is based on efficiency and reuse of parsed data, but for this pipeline, it is probably better to focus on clear separation of responsibilities, even if that means a few extra fetches.
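To illustrate the separation being described, a sketch of an updater step that only emits JSONL for new historic assemblies and leaves all parsing to the parser (function and field names are illustrative, not the pipeline's actual API):

```python
import json


def write_new_historic_assemblies(records: list[dict], jsonl_path: str) -> None:
    """Emit one JSON object per line; parsing into TSV happens elsewhere."""
    with open(jsonl_path, "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")
```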
Both of the scripts contain command-line argument definitions. For the updater, these should be moved to the main args module file alongside the other arg definitions. For the parser, having non-standard args will break the fetch-parse-validate flow, so it is not allowed. These extra args set file/dir names that can be derived from the standard args, so there is no need to add them. You can set up a helper function to set the names of any additional files based on the yaml/tsv and assume they will always be in the same directory as a file defined by a shared arg. This is the approach used in the main ncbi assembly data updater/parser.
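A minimal sketch of that kind of helper, assuming companion files always sit in the same directory as a file named by a shared arg (the function name and example file names are illustrative):

```python
from pathlib import Path


def companion_path(shared_arg_file: str, file_name: str) -> str:
    """Derive the path of an additional file that lives alongside a file
    already defined by a standard shared arg."""
    return str(Path(shared_arg_file).parent / file_name)


# e.g. assume the historical TSV sits next to the parser's output TSV:
# historical_tsv = companion_path(args.tsv_path, "assembly_historical.tsv")
```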
Also, the updater shouldn't import functions from a parser; any shared functions must live in a separate module file. Assume that any of the current parsers/updaters could be renamed, moved, or deleted at some point, so it is important to ensure that code doesn't rely on these files existing under their current names/locations.
Both parsers follow the plugin/flow/shared_args conventions established in Phase 0 (parse_backfill_historical_versions.py).
Summary by Sourcery
Introduce a daily incremental pipeline for tracking newly superseded NCBI assembly versions and targeted backfill of missing historical versions.
New Features:
- An incremental updater that detects newly superseded assemblies by comparing the current JSONL against the previous run's TSV and appends them to a historical TSV.
- A companion backfill tool that fetches and merges missing assembly versions into the historical TSV.

Tests:
- A pytest suite covering both flows and their key helper functions.