diff --git a/CHANGELOG.md b/CHANGELOG.md index 15b4472..0a7f8d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- Cursor resume crashed with `ValueError: malformed node or string on line 1: ` whenever the stored pagination dict contained a boolean or `None`. Pagination is written to the backend via `json.dumps`, but the producer was reading it back with `ast.literal_eval`, which can't parse JSON's `true` / `false` / `null` (they become `ast.Name` nodes). Connectors like Notion (`has_more`, `data_sources_loaded`) and Cycle (GraphQL `pageInfo.hasNextPage`) could never resume — the first retry would hit this error and the pod would exit with `BACKEND_ERROR`. Switched the producer to `json.loads`, matching the write path. + ## [0.3.15] - 2026-04-17 ### Fixed diff --git a/bizon/engine/pipeline/producer.py b/bizon/engine/pipeline/producer.py index d4a8813..5abd519 100644 --- a/bizon/engine/pipeline/producer.py +++ b/bizon/engine/pipeline/producer.py @@ -1,4 +1,4 @@ -import ast +import json import multiprocessing import multiprocessing.synchronize import threading @@ -58,7 +58,7 @@ def get_or_create_cursor(self, job_id: str, session=None) -> Cursor: total_records=job.total_records_to_fetch, iteration=cursor_from_db.to_source_iteration + 1, rows_fetched=self.backend.get_number_of_written_rows_for_job(job_id=job_id), - pagination=ast.literal_eval(cursor_from_db.pagination), + pagination=json.loads(cursor_from_db.pagination), ) else: # Get the total number of records diff --git a/tests/engine/test_producer_recovery.py b/tests/engine/test_producer_recovery.py index 642e92b..01e2c60 100644 --- a/tests/engine/test_producer_recovery.py +++ b/tests/engine/test_producer_recovery.py @@ -144,3 +144,43 @@ def test_e2e_dummy_to_file_recovery(file_destination, my_sqlite_backend, sqlite_ assert cursor.source_name == "dummy" assert cursor.pagination == {"cursor": CURSOR} assert cursor.iteration == N_ITERATION + 1 # Should always be last iteration written to destination + 1 + + +def test_recovery_with_json_only_pagination_values(file_destination, my_sqlite_backend, sqlite_db_session): + """Regression test: pagination dicts with bool/None values must survive resume. + + These are valid JSON but not Python literals, so parsing with ast.literal_eval + raised `ValueError: malformed node or string`. Stored via json.dumps, they + must be read back with json.loads. + """ + N_ITERATION = 2 + CURSOR = str(uuid4()) + PAGINATION = {"cursor": CURSOR, "has_more": True, "next_page": None} + + file_destination.buffer.buffer_size = 0 + file_destination.write_or_buffer_records( + df_destination_records=df_destination_records, + iteration=N_ITERATION, + session=sqlite_db_session, + pagination=PAGINATION, + ) + runner = RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE)) + + bizon_config = runner.bizon_config + config = runner.config + kwargs = runner.get_kwargs() + source = AbstractRunner.get_source(bizon_config=bizon_config, config=config) + queue = AbstractRunner.get_queue(bizon_config=bizon_config, **kwargs) + + producer = AbstractRunner.get_producer( + bizon_config=bizon_config, + source=source, + queue=queue, + backend=my_sqlite_backend, + ) + + cursor = producer.get_or_create_cursor(job_id=file_destination.sync_metadata.job_id, session=sqlite_db_session) + + assert cursor is not None + assert cursor.pagination == PAGINATION + assert cursor.iteration == N_ITERATION + 1