Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Cursor resume crashed with `ValueError: malformed node or string on line 1: <ast.Name object>` whenever the stored pagination dict contained a boolean or `None`. Pagination is written to the backend via `json.dumps`, but the producer was reading it back with `ast.literal_eval`, which can't parse JSON's `true` / `false` / `null` (they become `ast.Name` nodes). Connectors like Notion (`has_more`, `data_sources_loaded`) and Cycle (GraphQL `pageInfo.hasNextPage`) could never resume — the first retry would hit this error and the pod would exit with `BACKEND_ERROR`. Switched the producer to `json.loads`, matching the write path.

## [0.3.15] - 2026-04-17

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions bizon/engine/pipeline/producer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ast
import json
import multiprocessing
import multiprocessing.synchronize
import threading
Expand Down Expand Up @@ -58,7 +58,7 @@ def get_or_create_cursor(self, job_id: str, session=None) -> Cursor:
total_records=job.total_records_to_fetch,
iteration=cursor_from_db.to_source_iteration + 1,
rows_fetched=self.backend.get_number_of_written_rows_for_job(job_id=job_id),
pagination=ast.literal_eval(cursor_from_db.pagination),
pagination=json.loads(cursor_from_db.pagination),
)
else:
# Get the total number of records
Expand Down
40 changes: 40 additions & 0 deletions tests/engine/test_producer_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,43 @@ def test_e2e_dummy_to_file_recovery(file_destination, my_sqlite_backend, sqlite_
assert cursor.source_name == "dummy"
assert cursor.pagination == {"cursor": CURSOR}
assert cursor.iteration == N_ITERATION + 1 # Should always be last iteration written to destination + 1


def test_recovery_with_json_only_pagination_values(file_destination, my_sqlite_backend, sqlite_db_session):
    """Resume must round-trip pagination dicts holding booleans and ``None``.

    Regression guard: such values are serialized as JSON ``true`` / ``null``,
    which are not Python literals, so reading them back with
    ``ast.literal_eval`` raised ``ValueError: malformed node or string``.
    Because the pagination is stored via ``json.dumps``, the producer has to
    decode it with ``json.loads``.
    """
    last_written_iteration = 2
    cursor_token = str(uuid4())
    expected_pagination = {"cursor": cursor_token, "has_more": True, "next_page": None}

    # NOTE(review): a zero buffer size presumably makes the write below hit the
    # backend immediately instead of being buffered -- confirm against the
    # destination implementation.
    file_destination.buffer.buffer_size = 0
    file_destination.write_or_buffer_records(
        df_destination_records=df_destination_records,
        iteration=last_written_iteration,
        session=sqlite_db_session,
        pagination=expected_pagination,
    )

    # Build a producer from the dummy-to-file config, wired to the SQLite
    # backend fixture.
    runner = RunnerFactory.create_from_config_dict(yaml.safe_load(BIZON_CONFIG_DUMMY_TO_FILE))
    bizon_config, config = runner.bizon_config, runner.config
    queue_kwargs = runner.get_kwargs()
    producer = AbstractRunner.get_producer(
        bizon_config=bizon_config,
        source=AbstractRunner.get_source(bizon_config=bizon_config, config=config),
        queue=AbstractRunner.get_queue(bizon_config=bizon_config, **queue_kwargs),
        backend=my_sqlite_backend,
    )

    job_id = file_destination.sync_metadata.job_id
    cursor = producer.get_or_create_cursor(job_id=job_id, session=sqlite_db_session)

    assert cursor is not None
    # The decoded pagination must equal what was written, bool/None included.
    assert cursor.pagination == expected_pagination
    # The resumed cursor starts one iteration after the one written above.
    assert cursor.iteration == last_written_iteration + 1
Loading