This Python script scans a Google Cloud Platform (GCP) project's BigQuery datasets and tables to analyze storage costs. It compares the Logical (default) vs. Physical (compressed) storage billing models and recommends the most cost-effective option for each dataset.
- Project-Wide Scan: Iterates through all datasets and tables in a given GCP project.
- Dataset-Level Aggregation: Aggregates storage statistics and costs at the Dataset level, where billing models are applied.
- Cost Comparison: Calculates estimated monthly costs for both Logical and Physical storage models using standard pricing.
- Optimization Recommendations: Suggests switching to "Physical" if it offers savings, or "Logical" if it's cheaper.
- Sorted Results: Outputs results sorted by "Potential Monthly Savings ($)" in descending order, highlighting the biggest opportunities first.
- CSV Export: Saves detailed results including dataset sizes, table counts, costs, and potential savings to a CSV file.
- Python 3.7+
- A Google Cloud Project with BigQuery API enabled.
- Google Cloud credentials with
`bigquery.tables.list` and `bigquery.tables.get` permissions.
-
Clone this repository:
git clone https://github.com/your-username/bigquery-storage-optimizer.git
cd bigquery-storage-optimizer
-
Create and activate a virtual environment:
python3 -m venv venv
source venv/bin/activate
-
Install dependencies:
pip install -r requirements.txt
-
Authenticate with Google Cloud (if running locally):
gcloud auth application-default login
Or ensure your service account key environment variable is set.
-
Run the script:
python optimize_bq_storage.py YOUR_PROJECT_ID --output bq_optimization_results.csv
-
Check the Output: By default the script will generate
bq_optimization_results.csv in the current directory. Open this file to view the analysis and recommended actions.
The script uses standard US multi-region pricing for estimation:
- Logical Storage: ~$0.02/GiB (Active), ~$0.01/GiB (Long Term)
- Physical Storage: ~$0.04/GiB (Active), ~$0.02/GiB (Long Term)
Note: Actual costs may vary by region and specific negotiated rates. This tool provides an estimation to identify potential optimization targets.
For reference, here is the full code of the optimization script (optimize_bq_storage.py):
import argparse
import csv
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
# Pricing constants (approximate US multi-region prices, adjust as needed)
# All values are USD per GiB per month; get_storage_cost() converts byte
# counts using a 1024**3 divider, so these are GiB (not decimal GB) rates.
# Logical Storage
PRICE_LOGICAL_ACTIVE = 0.02  # active logical (uncompressed) bytes
PRICE_LOGICAL_LONG_TERM = 0.01  # long-term logical bytes (billed at half rate)
# Physical Storage
PRICE_PHYSICAL_ACTIVE = 0.04  # active physical (compressed) bytes
PRICE_PHYSICAL_LONG_TERM = 0.02  # long-term physical (compressed) bytes
def get_storage_cost(active_bytes, long_term_bytes, price_active, price_long_term):
    """Return the estimated monthly storage cost in dollars.

    Both byte counts are converted to GiB (1024**3 bytes) and each tier is
    charged at its own per-GiB monthly rate.
    """
    bytes_per_gib = 1024 ** 3
    active_cost = (active_bytes / bytes_per_gib) * price_active
    long_term_cost = (long_term_bytes / bytes_per_gib) * price_long_term
    return active_cost + long_term_cost
def _get_table_stat(obj, attr_name, dict_key):
    """Return an integer storage statistic from a Table object, or 0 if absent.

    Tries the client-library attribute first, then falls back to the raw API
    response in ``_properties`` (newer storage fields are not exposed as
    attributes on older google-cloud-bigquery versions).
    """
    val = getattr(obj, attr_name, None)
    if val is not None:
        return int(val)
    props = getattr(obj, '_properties', {})
    if dict_key in props:
        return int(props[dict_key])
    return 0


def _table_physical_bytes(table):
    """Return ``(total_physical_bytes, long_term_physical_bytes)`` for a table.

    Prefers ``numTotalPhysicalBytes``, which per the BigQuery Table resource
    already includes active, long-term, time-travel and fail-safe bytes, so
    nothing is added on top of it.  Only when that field is missing is the
    total rebuilt from its individual components.  (The previous version
    added time-travel/fail-safe bytes to ``numTotalPhysicalBytes``, double
    counting them and inflating the physical-cost estimate.)
    """
    lt_physical = _get_table_stat(table, 'num_long_term_physical_bytes', 'numLongTermPhysicalBytes')
    total = _get_table_stat(table, 'num_total_physical_bytes', 'numTotalPhysicalBytes')
    if total == 0:
        # Rebuild from components (partial responses / older client versions).
        active = _get_table_stat(table, 'num_active_physical_bytes', 'numActivePhysicalBytes')
        time_travel = _get_table_stat(table, 'num_time_travel_physical_bytes', 'numTimeTravelPhysicalBytes')
        fail_safe = _get_table_stat(table, 'num_fail_safe_physical_bytes', 'numFailSafePhysicalBytes')
        total = active + lt_physical + time_travel + fail_safe
    return total, lt_physical


def scan_project(project_id, output_file):
    """Scans all datasets and tables in the project to find optimization opportunities.

    Aggregates logical and physical storage bytes per dataset (the level at
    which BigQuery applies a storage billing model), estimates the monthly
    cost under both models, and writes the results to ``output_file`` as CSV,
    sorted by potential monthly savings in descending order.

    Args:
        project_id: GCP project ID to scan.
        output_file: Path of the CSV file to write.
    """
    client = bigquery.Client(project=project_id)
    print(f"Scanning project: {project_id}...")
    results = []
    try:
        datasets = list(client.list_datasets())
    except Exception as e:
        print(f"Error listing datasets: {e}")
        return
    for dataset_item in datasets:
        dataset_id = dataset_item.dataset_id
        try:
            # list_datasets() returns partial objects; fetch the full dataset
            # to read storage_billing_model.
            dataset = client.get_dataset(dataset_item.reference)
            # The field is unset for datasets using the default model, which
            # bills as LOGICAL.
            current_billing_model = getattr(dataset, 'storage_billing_model', 'LOGICAL') or 'LOGICAL'
            print(f"  Scanning dataset: {dataset_id} (Current Bill Model: {current_billing_model})...")
        except Exception as e:
            print(f"  Error getting dataset {dataset_id}: {e}")
            current_billing_model = 'LOGICAL'
        # Per-dataset accumulators.
        ds_logical_bytes = 0
        ds_long_term_logical_bytes = 0
        ds_long_term_physical_bytes = 0
        ds_total_physical_billing_bytes = 0  # active + long term + time travel + fail safe
        try:
            tables = list(client.list_tables(dataset_item.reference))
        except Exception as e:
            print(f"  Error listing tables in {dataset_id}: {e}")
            continue
        table_count = 0
        for table_item in tables:
            try:
                table = client.get_table(table_item.reference)
            except NotFound:
                continue  # Table might have been deleted during scan
            # Skip views and external tables: they carry no BigQuery storage.
            if table.table_type != 'TABLE':
                continue
            table_count += 1
            ds_logical_bytes += _get_table_stat(table, 'num_bytes', 'numBytes')
            ds_long_term_logical_bytes += _get_table_stat(table, 'num_long_term_bytes', 'numLongTermBytes')
            t_total_physical, t_lt_physical = _table_physical_bytes(table)
            ds_total_physical_billing_bytes += t_total_physical
            ds_long_term_physical_bytes += t_lt_physical
        # "Active" bytes = total - long term.  For physical storage this
        # prices time-travel/fail-safe bytes at the active rate, which is a
        # reasonable approximation for a cost estimate.
        ds_active_logical_bytes = ds_logical_bytes - ds_long_term_logical_bytes
        ds_active_physical_bytes = ds_total_physical_billing_bytes - ds_long_term_physical_bytes
        cost_logical = get_storage_cost(ds_active_logical_bytes, ds_long_term_logical_bytes, PRICE_LOGICAL_ACTIVE, PRICE_LOGICAL_LONG_TERM)
        cost_physical = get_storage_cost(ds_active_physical_bytes, ds_long_term_physical_bytes, PRICE_PHYSICAL_ACTIVE, PRICE_PHYSICAL_LONG_TERM)
        recommendation = "KEEP"
        savings = 0.0
        # Only recommend a switch away from the dataset's current model;
        # savings stay 0 when the cheaper model is already in use.
        if cost_physical < cost_logical and current_billing_model != 'PHYSICAL':
            recommendation = "SWITCH TO PHYSICAL"
            savings = cost_logical - cost_physical
        elif cost_logical < cost_physical and current_billing_model == 'PHYSICAL':
            recommendation = "SWITCH TO LOGICAL"
            savings = cost_physical - cost_logical
        if table_count > 0:
            results.append({
                "Project": project_id,
                "Dataset": dataset_id,
                "Table Count": table_count,
                "Current Model": current_billing_model,
                "Logical Bytes": ds_logical_bytes,
                "Physical Bytes": ds_total_physical_billing_bytes,
                "Logical Cost ($)": round(cost_logical, 4),
                "Physical Cost ($)": round(cost_physical, 4),
                "Recommendation": recommendation,
                "Potential Monthly Savings ($)": round(savings, 4)
            })
    # Biggest savings opportunities first.
    results.sort(key=lambda x: x.get("Potential Monthly Savings ($)", 0), reverse=True)
    if results:
        keys = results[0].keys()
        with open(output_file, 'w', newline='') as f:
            dict_writer = csv.DictWriter(f, fieldnames=keys)
            dict_writer.writeheader()
            dict_writer.writerows(results)
        print(f"Scan complete. Results saved to {output_file}")
    else:
        print("No tables found or no data to write.")
if __name__ == "__main__":
    # CLI entry point: required project id plus an optional CSV destination.
    cli = argparse.ArgumentParser(description="Scan BigQuery tables to optimize storage billing models.")
    cli.add_argument("project_id", help="The GCP Project ID to scan.")
    cli.add_argument("--output", default="bq_optimization_results.csv", help="Output CSV filename.")
    cli_args = cli.parse_args()
    scan_project(cli_args.project_id, cli_args.output)