fix(connector_sdk): Adding IBM db2 log based replication example #546
fivetran-JenasVimal merged 12 commits into main
Conversation
🧹 Python Code Quality Check ✅ No issues found in Python files. This comment is auto-updated with every commit.
Pull request overview
Adds a new Connector SDK example demonstrating IBM Db2 log-based replication (CDC) using the ASN Capture / Change Data (CD) table approach.
Changes:
- Introduces a new `ibm_db2_log_based_replication` connector implementation that performs an initial load and then applies changes from `ASN.IBMSNAP_EMPCD`.
- Adds example documentation (`README.md`) describing the setup and how the CDC pipeline works.
- Adds `requirements.txt` and `configuration.json` for the example connector.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| connectors/ibm_db2_log_based_replication/connector.py | New connector implementation for initial load + CD-table-driven incremental sync |
| connectors/ibm_db2_log_based_replication/configuration.json | Example configuration added for running the connector |
| connectors/ibm_db2_log_based_replication/requirements.txt | Adds ibm_db dependency pin |
| connectors/ibm_db2_log_based_replication/README.md | Documentation for setup, configuration, and behavior |
```python
# ASN schema and CD table name as created by setup_cdc.sh
ASN_SCHEMA = "ASN"
CD_TABLE = "IBMSNAP_EMPCD"

# How many CD rows to process before writing an intermediate checkpoint
CHECKPOINT_INTERVAL = 500
```
Module constants don’t follow the repo’s convention of private, double-underscore, upper snake case for connector constants (e.g., __CHECKPOINT_INTERVAL). Rename these constants accordingly to align with the Connector SDK Python guidelines used in this repo.
```python
# Save the final state so the next sync knows where to continue from.
# Learn more about checkpointing:
# https://fivetran.com/docs/connectors/connector-sdk/best-practices#largedatasetrecommendation
```
This op.checkpoint() call is also missing the required standard checkpoint comment block immediately above it (the repo expects the full checkpoint explanation before every checkpoint operation).
Suggested change (replaces the comment above):

```python
# Save the progress by checkpointing the state. This is important for ensuring that the sync process can resume
# from the correct position in case of next sync or interruptions.
# You should checkpoint even if you are not using incremental sync, as it tells Fivetran it is safe to write to destination.
# For large datasets, checkpoint regularly (e.g., every N records) not only at the end.
# Learn more about how and where to checkpoint by reading our best practices documentation
# (https://fivetran.com/docs/connector-sdk/best-practices#optimizingperformancewhenhandlinglargedatasets).
```
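The "checkpoint every N records" guidance can be sketched like this; `op` is a stand-in for the SDK's Operations object, and the table and column names are illustrative:

```python
CHECKPOINT_INTERVAL = 500  # rows between intermediate checkpoints

def apply_cd_rows(rows, state, op):
    """Upsert CD rows, checkpointing every CHECKPOINT_INTERVAL rows and once at the end."""
    processed = 0
    for row in rows:
        op.upsert(table="employee", data=row)
        state["last_commit_sequence"] = row["IBMSNAP_COMMITSEQ"]
        processed += 1
        if processed % CHECKPOINT_INTERVAL == 0:
            # Intermediate checkpoint so an interrupted sync can resume here.
            op.checkpoint(state)
    # Final checkpoint so the next sync knows where to continue from.
    op.checkpoint(state)
    return processed
```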
```python
if last_log_marker is None:
    # ── First sync: full initial load ──────────────────────────────────
    # Reads directly from the source EMPLOYEE table once to populate the
    # destination. After this, all changes come from the ASN CD table.
    log.info("No previous state found. Starting initial full load.")
```
If the connector crashes during perform_initial_load(), the intermediate checkpoints set initial_load_complete=False, but update() only checks for last_log_marker and will skip the initial load on the next run. This can leave the destination partially loaded forever. Use initial_load_complete to decide whether to (re)run/continue the initial load, or store a resumable cursor for the initial scan (e.g., last processed PK) and only set initial_load_complete=True after the scan finishes successfully.
Suggested change:

```python
initial_load_complete = state.get("initial_load_complete")
if last_log_marker is None or not initial_load_complete:
    # ── First sync or incomplete initial load: full initial load ────────
    # Reads directly from the source EMPLOYEE table once to populate the
    # destination. After this, all changes come from the ASN CD table.
    if last_log_marker is None:
        log.info("No previous state found. Starting initial full load.")
    else:
        log.warning(
            "Previous state indicates the initial load did not complete successfully. "
            "Restarting initial full load."
        )
```
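One way to make the initial load itself resumable, as the review suggests: keep a primary-key cursor in state and flip `initial_load_complete` only after the full scan finishes. `fetch_batch` and the column names here are hypothetical stand-ins for the connector's actual query logic:

```python
def perform_initial_load(fetch_batch, state, op, batch_size=1000):
    """Scan the source table in PK order, checkpointing a resumable cursor per batch."""
    last_pk = state.get("initial_load_cursor")
    while True:
        batch = fetch_batch(after_pk=last_pk, limit=batch_size)
        if not batch:
            break
        for row in batch:
            op.upsert(table="employee", data=row)
            last_pk = row["EMPNO"]
        # Persist the cursor; the load is explicitly not complete yet.
        state["initial_load_cursor"] = last_pk
        state["initial_load_complete"] = False
        op.checkpoint(state)
    # Only now is it safe to skip the initial load on the next sync.
    state["initial_load_complete"] = True
    op.checkpoint(state)
```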
## Getting started

### 1. Start the Db2 Docker container

```bash
docker-compose up -d
```
The ## Getting started section should include the standard Setup Guide sentence from the README template, and headings should not include numbers (e.g., ### 1. ...). Also, the README is missing the required ## Features section from the example README structure.
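A possible shape for the README under the structure the reviewer asks for; the exact Setup Guide sentence comes from the repo's template (placeholder below), and the section contents are illustrative:

```
# IBM Db2 Log-Based Replication Connector Example

## Features
- Initial full load of the source EMPLOYEE table
- Incremental sync driven by the ASN Capture CD table

## Getting started
<standard Setup Guide sentence from the repo's README template>

### Start the Db2 Docker container
...
```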
| """ | ||
| Define the schema function which lets you configure the schema your connector delivers. | ||
| See the technical reference documentation for more details on the schema function: | ||
| https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema |
The schema() docstring doesn’t match the required template (notably the documentation link path). In this repo, the schema docstring is expected to match the template connector’s wording/link exactly for consistency.
Suggested change (replace the documentation link with):

```python
https://fivetran.com/docs/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-methods#schema
```
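With the template link from the suggestion, the schema function would look roughly like this; the table and key names are taken from this example's EMPLOYEE table and are illustrative:

```python
def schema(configuration: dict):
    """
    Define the schema function which lets you configure the schema your connector delivers.
    See the technical reference documentation for more details on the schema function:
    https://fivetran.com/docs/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-methods#schema
    """
    return [
        {
            "table": "employee",       # destination table (illustrative)
            "primary_key": ["empno"],  # primary key column(s) (illustrative)
        }
    ]
```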
Force-pushed from 36e7a8e to 704f3d3
…d CDC connector Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… naming conventions
- Rename abbreviated variables: conn→connection, stmt→statement, sql→query, row→database_row, conn_str→connection_string, current_seq→current_commit_sequence
- Rename connect_to_db→connect_to_database, standardize_row→normalize_row, get_current_log_marker→get_current_commit_sequence
- Replace LOGMARKER cursor with IBMSNAP_COMMITSEQ hex cursor for correctness
- Fix CD table reference: ASN.IBMSNAP_EMPCD→DB2INST1.CDEMPLOYEE
- State key renamed: last_log_marker→last_commit_sequence
- Add required checkpoint comment block before every op.checkpoint() call
- Add required upsert comment block before every op.upsert() call
- Add template-compliant module docstring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ow also checks the datatype of the configurations
fivetran-chinmayichandrasekar
left a comment
Left a couple of suggestions
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
fivetran-chinmayichandrasekar
left a comment
LGTM
Jira ticket
Closes RD-971676
Description of Change
Adds a new IBM Db2 log-based replication example.
IBM Db2 Log-Based Replication Connector
This connector syncs data from IBM Db2 to your destination using log-based replication: instead of repeatedly querying the source table, it watches Db2's transaction log for changes and syncs only what changed.
How it works
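A rough sketch of how the captured changes drive the sync, using names from this example (`DB2INST1.CDEMPLOYEE`, `IBMSNAP_*` control columns). The `op` object stands in for the SDK's Operations, and the exact column handling is an assumption, not the connector's literal code:

```python
def apply_change(cd_row, op):
    # SQL Replication CD tables mark each change with IBMSNAP_OPERATION:
    # 'I' = insert, 'U' = update, 'D' = delete.
    operation = cd_row.pop("IBMSNAP_OPERATION")
    cd_row.pop("IBMSNAP_COMMITSEQ", None)  # cursor column, not destination data
    cd_row.pop("IBMSNAP_INTENTSEQ", None)
    if operation in ("I", "U"):
        op.upsert(table="employee", data=cd_row)
    elif operation == "D":
        op.delete(table="employee", keys={"EMPNO": cd_row["EMPNO"]})
```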
Testing
fivetran debug incremental sync: Leo was added
DuckDB warehouse
Checklist
Some tips and links to help validate your PR:
fivetran debug command.