English | 中文
A complete demonstration of Change Data Capture (CDC) from MySQL to BigQuery using Google Cloud Platform services.
The pipeline continuously polls MySQL for changes based on the updated_at column and syncs modified records to BigQuery using the Storage Write API with UPSERT semantics.
IMPORTANT: This project is designed to demonstrate how to use Dataflow to perform CDC into BigQuery using the Storage Write API with UPSERT semantics. It is NOT a production-ready MySQL CDC solution.
For production MySQL CDC that handles INSERT, UPDATE, and DELETE operations, you should use binlog-based solutions such as:
- Debezium with Dataflow - Reads MySQL binlog for real-time change capture
- Google Datastream - Managed CDC service for MySQL to BigQuery
This demo periodically reads the MySQL table and relies solely on the `updated_at` column for change detection, which cannot detect DELETE operations.
| Component | Description |
|---|---|
| MySQL (Cloud SQL) | Source database with sample item table |
| Dataflow Pipeline | Java/Apache Beam pipeline for CDC processing |
| BigQuery | Destination data warehouse |
Before starting, ensure you have:
- Google Cloud SDK installed and configured

  ```bash
  gcloud --version
  ```

- Java 11+ and Maven 3.6+ for the Dataflow pipeline

  ```bash
  java -version
  mvn -version
  ```

- Python 3.8+ for the MySQL and BigQuery scripts

  ```bash
  python3 --version
  ```

- GCP Project with the following APIs enabled:
  - Cloud SQL Admin API
  - BigQuery API
  - Dataflow API
  - Compute Engine API

- Service Account with appropriate permissions:
  - Cloud SQL Admin
  - BigQuery Admin
  - Dataflow Admin
  - Storage Admin
```bash
# Navigate to project directory
cd bqcdc

# Review and edit configuration (optional)
# Default values work out of the box
cat conf.yml
```

```bash
# Create virtual environment and install dependencies
make setup

# Or if you prefer to install dependencies globally
make install_deps
```

```bash
# Create Cloud SQL instance, database, and seed data
# This may take 5-10 minutes for instance creation
make init_mysql
```

What this does:
- Creates Cloud SQL MySQL 8.0 instance
- Generates a secure root password (saved to `mysql.password`)
- Configures public access for demo purposes
- Creates the `dingocdc` database with the `item` table
- Inserts 10 sample records
```bash
# Create BigQuery dataset and table
make init_bq
```

What this does:
- Creates the `dingocdc` dataset
- Creates the `item` table with matching schema
```bash
# Build the Java pipeline JAR
make build_dataflow
```

Open Terminal 1 - Start the Dataflow job:

```bash
make run_cdc
```

Open Terminal 2 - Start continuous updates:

```bash
make update_mysql
```

This will randomly update item prices every 1-3 seconds until you press Ctrl+C.

```bash
# Query the BigQuery table to see synced data
bq query --project_id=du-hast-mich \
  "SELECT * FROM dingocdc.item ORDER BY updated_at DESC LIMIT 10"
```

Or use the BigQuery Console in GCP.
Edit `conf.yml` to customize:

```yaml
gcp:
  project_id: "du-hast-mich"            # Your GCP project ID
  region: "us-central1"                 # GCP region
  service_account_path: "~/workspace/google/sa.json"

mysql:
  instance_name: "dingomysql"           # Cloud SQL instance name
  db_name: "dingocdc"                   # Database name
  table_name: "item"                    # Table name
  tier: "db-f1-micro"                   # Machine type

bigquery:
  dataset: "dingocdc"                   # BigQuery dataset
  table_name: "item"                    # BigQuery table
  location: "US"                        # Dataset location

dataflow:
  job_name: "dingo-cdc"                 # Dataflow job name
  num_workers: 1                        # Initial number of workers
  max_workers: 2                        # Maximum number of workers
  machine_type: "e2-medium"             # Worker machine type

cdc:
  polling_interval_seconds: 10          # How often to poll for changes
  update_all_if_ts_null: true           # Startup behavior (see below)
```

| Option | Default | Description |
|---|---|---|
| `polling_interval_seconds` | 10 | How often (in seconds) the pipeline polls MySQL for changes |
| `update_all_if_ts_null` | true | Controls behavior on pipeline startup: `true` = full table sync first, `false` = only capture new changes |
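The two `cdc` options above are exposed to the pipeline as ordinary Beam pipeline options. Below is a minimal sketch of how such an options interface is typically declared; the interface name, getters, and defaults are illustrative assumptions, not necessarily what the repo's `CdcPipelineOptions.java` contains:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

// Illustrative options interface for the two CDC settings above.
public interface CdcDemoOptions extends DataflowPipelineOptions {

  @Description("How often (in seconds) to poll MySQL for changed rows")
  @Default.Integer(10)
  Integer getPollingIntervalSeconds();
  void setPollingIntervalSeconds(Integer value);

  @Description("First-poll behavior: true = full table sync, false = only record MAX(updated_at)")
  @Default.Boolean(true)
  Boolean getUpdateAllIfTsNull();
  void setUpdateAllIfTsNull(Boolean value);
}
```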
The Makefile provides the following targets:

| Target | Description |
|---|---|
| `make help` | Show all available commands |
| `make setup` | Create virtual env and install dependencies |
| `make init_mysql` | Create Cloud SQL instance and seed data |
| `make update_mysql` | Start continuous MySQL updates |
| `make init_bq` | Create BigQuery dataset and table |
| `make build_dataflow` | Build Dataflow pipeline JAR |
| `make run_cdc` | Launch Dataflow CDC job |
| `make status` | Show status of all components |
| `make cleanup_all` | Delete all GCP resources |
To use a custom Python interpreter, set the `PYTHON3` variable:

```bash
# Use a specific Python version
make PYTHON3=/usr/bin/python3.11 setup

# Use pyenv Python
make PYTHON3=~/.pyenv/shims/python3 init_mysql

# Use conda Python
make PYTHON3=/opt/conda/bin/python3 init_bq
```

The `item` table uses the following schema:

| Column | Type | Description |
|---|---|---|
| `id` | INTEGER | Primary key (1-10) |
| `description` | STRING | Item description |
| `price` | FLOAT | Item price (randomly updated) |
| `created_at` | DATETIME | Record creation timestamp |
| `updated_at` | DATETIME | Last update timestamp |
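Each column above maps directly onto a field of a BigQuery `TableRow`. The helper below is a minimal sketch of that mapping from a JDBC `ResultSet`, assuming the DATETIME values are sent to BigQuery as formatted strings; the pipeline's actual conversion code may differ:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.sql.ResultSet;
import java.sql.SQLException;

// Convert one MySQL row of the item table into a BigQuery TableRow.
// BigQuery DATETIME accepts "YYYY-MM-DD HH:MM:SS[.ffffff]" strings;
// java.sql.Timestamp.toString() yields a compatible form for MySQL DATETIME
// values (at most 6 fractional digits).
static TableRow toTableRow(ResultSet rs) throws SQLException {
  return new TableRow()
      .set("id", rs.getInt("id"))
      .set("description", rs.getString("description"))
      .set("price", rs.getDouble("price"))
      .set("created_at", rs.getTimestamp("created_at").toString())
      .set("updated_at", rs.getTimestamp("updated_at").toString());
}
```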
```
bqcdc/
├── conf.yml # Configuration file
├── Makefile # Build and run automation
├── README.md # This file
├── mysql.password # Generated MySQL password (gitignored)
├── .gitignore # Git ignore rules
│
├── mysql/ # MySQL-related scripts
│ ├── init_mysql.py # Initialize Cloud SQL instance
│ ├── update_mysql.py # Continuous update script
│ └── requirements.txt # Python dependencies
│
├── bigquery/ # BigQuery-related scripts
│ ├── init_bq.py # Initialize BigQuery
│ └── requirements.txt # Python dependencies
│
└── dataflow/ # Dataflow pipeline (Java/Maven)
├── pom.xml # Maven configuration
└── src/main/java/com/bindiego/cdc/
├── CdcPipeline.java # Streaming CDC pipeline with UPSERT
└── CdcPipelineOptions.java # Pipeline options interface
```

This pipeline implements a stateful streaming CDC approach using Apache Beam's `ValueState` to track the last processed `updated_at` timestamp in memory. The pipeline runs continuously and polls MySQL at a configurable interval.

```
┌────────────────────────────────────────────────────────────────────────────┐
│ STREAMING CDC FLOW DIAGRAM │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ Pipeline Start │ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Is lastTimestamp │ │
│ │ NULL? (First Poll) │ │
│ └────────┬────────────┘ │
│ │ │
│ ┌─────┴─────┐ │
│ │ │ │
│ YES NO │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────┐ │
│ │ updateAllIfTsNull? │ │
│ └──────────┬───────────┘ │
│ │ │
│ ┌──────┴──────┐ ┌────────────────────┐ │
│ │ │ │ │ │
│ TRUE FALSE │ │ │
│ │ │ │ ▼ │
│ ▼ ▼ │ ┌─────────────────────┐│
│ ┌───────────┐ ┌───────────────────┐ │ │ Query records WHERE ││
│ │ FULL SYNC │ │ Query MAX │ │ │ updated_at > ││
│ │ │ │ (updated_at) │ │ │ lastTimestamp ││
│ │ Query ALL │ │ │ │ └──────────┬──────────┘│
│ │ records │ │ Record timestamp │ │ │ │
│ │ │ │ (no data sync) │ │ ▼ │
│ │ Sync to │ │ │ │ ┌─────────────────────┐│
│ │ BigQuery │ │ Wait for next │ │ │ Sync changed records││
│ │ │ │ poll │ │ │ to BigQuery ││
│ │ Record │ └───────────────────┘ │ └──────────┬──────────┘│
│ │ max(ts) │ │ │ │
│ └───────────┘ │ ▼ │
│ │ ┌─────────────────────┐│
│ │ │ Update lastTimestamp││
│ │ │ to max(updated_at) ││
│ │ └──────────┬──────────┘│
│ │ │ │
│ └─────────────┤ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Wait polling_interval│ │
│ │ seconds, then repeat │ │
│ └─────────────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
```

Case A: `update_all_if_ts_null = true` (Full Table Sync)

- Query: `SELECT * FROM table ORDER BY updated_at`
- Emit ALL records to BigQuery
- Record `max(updated_at)` as the new `lastTimestamp`
- Subsequent polls: only capture records newer than `lastTimestamp`
Use Case: When you need existing data in BigQuery before capturing changes.
Case B: `update_all_if_ts_null = false` (Incremental Only)

- Query: `SELECT MAX(updated_at) FROM table`
- Record this timestamp as `lastTimestamp`
- Emit NOTHING to BigQuery (no initial sync)
- Wait for the next poll to capture new changes
Use Case: When you only want to capture NEW changes going forward, ignoring existing data.
For all polls after the first:
- Query: `SELECT * FROM table WHERE updated_at > lastTimestamp ORDER BY updated_at`
- Emit only the changed records to BigQuery
- Update `lastTimestamp` to `max(updated_at)` from the fetched records
- If no changes found, keep the existing `lastTimestamp`
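A condensed Java sketch of this polling logic as a stateful Beam `DoFn`, assuming the input is a single-key tick stream (for example `GenerateSequence` firing at the polling interval, keyed by `"cdc-poller"`). The JDBC helpers (`queryRows`, `queryScalar`) are placeholders, and the actual `CdcPipeline.java` may be structured differently:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Stateful DoFn: a single key ("cdc-poller") and one ValueState holding the
// last processed updated_at timestamp as a string.
class PollMySqlChangesFn extends DoFn<KV<String, Long>, TableRow> {

  @StateId("lastTs")
  private final StateSpec<ValueState<String>> lastTsSpec = StateSpecs.value(StringUtf8Coder.of());

  private final boolean updateAllIfTsNull;

  PollMySqlChangesFn(boolean updateAllIfTsNull) {
    this.updateAllIfTsNull = updateAllIfTsNull;
  }

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("lastTs") ValueState<String> lastTs) {
    String ts = lastTs.read();

    if (ts == null) {                         // first poll: no timestamp recorded yet
      if (updateAllIfTsNull) {                // Case A: full table sync
        List<TableRow> all = queryRows("SELECT * FROM item ORDER BY updated_at");
        all.forEach(c::output);
        if (!all.isEmpty()) {
          lastTs.write(maxUpdatedAt(all));    // record max(updated_at)
        }
      } else {                                // Case B: record the watermark, emit nothing
        lastTs.write(queryScalar("SELECT MAX(updated_at) FROM item"));
      }
      return;
    }

    // Subsequent polls: only rows strictly newer than the stored timestamp.
    List<TableRow> changed =
        queryRows("SELECT * FROM item WHERE updated_at > '" + ts + "' ORDER BY updated_at");
    if (!changed.isEmpty()) {
      changed.forEach(c::output);
      lastTs.write(maxUpdatedAt(changed));    // advance the watermark
    }                                         // else keep the existing lastTimestamp
  }

  // Placeholders for JDBC access (see the toTableRow sketch earlier); the real
  // pipeline handles connections, credentials, and error handling.
  private List<TableRow> queryRows(String sql) { throw new UnsupportedOperationException(sql); }
  private String queryScalar(String sql) { throw new UnsupportedOperationException(sql); }

  private static String maxUpdatedAt(List<TableRow> rows) {
    // Rows are ordered by updated_at, so the last one carries the maximum timestamp.
    return (String) rows.get(rows.size() - 1).get("updated_at");
  }
}
```

The timeline below traces this behavior with concrete timestamps:

```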
Time Event lastTimestamp BigQuery Action
───── ───── ───────────── ──────────────
T0 Pipeline starts NULL -
(update_all_if_ts_null=false)
Query MAX(updated_at)=10:00:00
Record timestamp 10:00:00 Nothing synced
T1 Poll #1 (10 sec later) 10:00:00 -
Query WHERE updated_at > 10:00
No records found 10:00:00 Nothing synced
T2 MySQL UPDATE item SET - -
price=99.99 WHERE id=5
(updated_at = 10:00:15)
T3 Poll #2 (10 sec later) 10:00:00 -
Query WHERE updated_at > 10:00
Found 1 record (id=5)
Sync to BigQuery 10:00:15 1 record inserted
Update timestamp
T4 Poll #3 (10 sec later) 10:00:15 -
Query WHERE updated_at > 10:00:15
No records found 10:00:15 Nothing synced
```

- BigQuery CDC with UPSERT: This pipeline uses BigQuery's native CDC feature with the Storage Write API (`STORAGE_API_AT_LEAST_ONCE` method). It uses `RowMutationInformation` with `MutationType.UPSERT` to update existing rows by primary key (`id`) rather than appending new rows. The `updated_at` timestamp is used as the sequence number for CDC ordering.
- Primary Key Required: The BigQuery table must have a PRIMARY KEY constraint on the `id` column. The `init_bq.py` script creates the table with `PRIMARY KEY (id) NOT ENFORCED`.
- State Persistence: The `lastTimestamp` is stored in Beam's state backend. If the pipeline restarts, the state may be lost depending on the runner configuration. For production, consider persisting the watermark to an external store.
- Timestamp Precision: Uses a `>` (greater than) comparison to avoid re-processing records with the exact same timestamp. Ensure your `updated_at` column has sufficient precision (milliseconds recommended).
- Single-Threaded Polling: Uses a single key (`"cdc-poller"`) to ensure all state is managed in one place. This serializes the polling but guarantees consistency.
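The UPSERT write in the first point can be sketched as follows, assuming Beam's `BigQueryIO.writeTableRows()` with `withRowMutationInformationFn` and the string change-sequence-number overload of `RowMutationInformation.of`, as available in recent Beam releases. Deriving the sequence number from `updated_at` as hex-encoded epoch milliseconds is an illustrative choice, not necessarily the repo's exact code:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.values.PCollection;

// Write CDC upserts: each TableRow updates or inserts the BigQuery row with the
// same primary key (id), ordered by a change sequence number derived from updated_at.
static void writeUpserts(PCollection<TableRow> rows) {
  rows.apply("UpsertToBigQuery",
      BigQueryIO.writeTableRows()
          .to("du-hast-mich:dingocdc.item")
          .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
          .withRowMutationInformationFn(row ->
              RowMutationInformation.of(
                  RowMutationInformation.MutationType.UPSERT,
                  // Hex-encoded epoch millis of updated_at as the change sequence number;
                  // assumes updated_at is a "YYYY-MM-DD HH:MM:SS[.fff]" string.
                  Long.toHexString(
                      java.sql.Timestamp.valueOf((String) row.get("updated_at")).getTime())))
          // The table already exists with PRIMARY KEY (id) NOT ENFORCED (created by init_bq.py).
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
```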
Important: This demo uses the `updated_at` column to identify data changes, which has limitations:

- No DELETE detection: Records deleted from MySQL will NOT be detected or removed from BigQuery. The polling approach only sees records that exist with `updated_at > lastTimestamp`.
- Requires timestamp column: Your source table must have a reliably updated timestamp column.
- Higher latency: Changes are detected on a polling interval (default 10 seconds), not in real time.
For production use cases, consider using binlog-based CDC solutions like:
- Google Datastream - Managed CDC service that reads MySQL binlog
- Debezium + Pub/Sub - Open-source binlog parser with message queue
These solutions capture INSERT, UPDATE, and DELETE operations in real-time.
However, the primary purpose of this demo is to illustrate how to write Apache Beam/Dataflow code with BigQuery's native CDC (Storage Write API with UPSERT semantics), not to provide a production-ready CDC solution. The polling mechanism is intentionally simple to keep the focus on the Dataflow pipeline implementation.
| Approach | Pros | Cons |
|---|---|---|
| This Demo (Polling + Storage Write API CDC) | True UPSERT semantics, no binlog access needed, uses native BigQuery CDC, simple to understand | No DELETE support, higher latency, depends on updated_at column |
| Google Datastream | Managed, real-time, binlog-based, supports DELETE | Additional service cost |
| Debezium + Pub/Sub | Real-time, open-source, supports DELETE | Complex setup, requires binlog access |
```bash
# Check if instance is running
gcloud sql instances describe dingomysql --format="value(state)"

# Verify public IP access
gcloud sql instances describe dingomysql --format="value(ipAddresses)"

# Check authorized networks
gcloud sql instances describe dingomysql --format="value(settings.ipConfiguration.authorizedNetworks)"
```

```bash
# List datasets
bq ls --project_id=du-hast-mich

# Describe table
bq show du-hast-mich:dingocdc.item
```

```bash
# List running jobs
gcloud dataflow jobs list --region=us-central1 --filter="state:Running"

# View job logs
gcloud dataflow jobs show JOB_ID --region=us-central1
```

To remove all GCP resources created by this demo:

```bash
# Cancel Dataflow jobs, delete BigQuery dataset, and Cloud SQL instance
make cleanup_all
```

Or individually:

```bash
make cleanup_dataflow   # Cancel Dataflow jobs
make cleanup_bq         # Delete BigQuery dataset
make cleanup_mysql      # Delete Cloud SQL instance
```

This demo uses minimal resources:
- Cloud SQL: `db-f1-micro` (~$9/month if running 24/7)
- Dataflow: 1-2x `e2-medium` workers with Streaming Engine (pay per use)
- BigQuery: Pay per query/storage

Recommendation: Run `make cleanup_all` when done to avoid charges.
| Dependency | Version | Notes |
|---|---|---|
| Apache Beam | 2.70.0 | Core streaming framework |
| google-auth-library | 1.34.0+ | Required for mTLS support (CertificateSourceUnavailableException) |
| MySQL Connector/J | 8.0.33 | JDBC driver for MySQL |
| Java | 11+ | Runtime requirement |
- Streaming Engine: Enabled via `--experiments=enable_streaming_engine` for better resource utilization
- Storage Write API: Uses the `STORAGE_API_AT_LEAST_ONCE` method for CDC writes
- Stateful Processing: Uses Beam's `ValueState` for tracking the last processed timestamp
- CDC with Primary Key: BigQuery table uses `PRIMARY KEY (id) NOT ENFORCED` for UPSERT semantics
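The Streaming Engine experiment and streaming mode can also be set programmatically when constructing the pipeline options. This is a hedged sketch of that alternative (`LaunchSketch` is a hypothetical class name, not the demo's actual launcher):

```java
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchSketch {
  public static void main(String[] args) {
    // Equivalent of passing --streaming --experiments=enable_streaming_engine on the command line.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    options.setExperiments(new ArrayList<>(Arrays.asList("enable_streaming_engine")));
    // ... build and run the pipeline with these options ...
  }
}
```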
