BigQuery Billing Type Auto-Optimiser

This Python script scans a Google Cloud Platform (GCP) project's BigQuery datasets and tables to analyze storage costs. It compares the Logical (default) vs. Physical (compressed) storage billing models and recommends the most cost-effective option for each dataset.

Features

  • Project-Wide Scan: Iterates through all datasets and tables in a given GCP project.
  • Dataset-Level Aggregation: Aggregates storage statistics and costs at the Dataset level, where billing models are applied.
  • Cost Comparison: Calculates estimated monthly costs for both Logical and Physical storage models using standard pricing.
  • Optimization Recommendations: Flags datasets where switching to Physical billing would cut cost, or where switching back to Logical is cheaper; otherwise recommends keeping the current model.
  • Sorted Results: Outputs results sorted by "Potential Monthly Savings ($)" in descending order, highlighting the biggest opportunities first.
  • CSV Export: Saves detailed results including dataset sizes, table counts, costs, and potential savings to a CSV file.

Prerequisites

  • Python 3.7+
  • A Google Cloud Project with BigQuery API enabled.
  • Google Cloud credentials with permission to read dataset and table metadata (e.g. the BigQuery Metadata Viewer role, which includes bigquery.datasets.get, bigquery.tables.list, and bigquery.tables.get).

Installation

  1. Clone this repository:

    git clone https://github.com/your-username/bigquery-storage-optimizer.git
    cd bigquery-storage-optimizer
  2. Create and activate a virtual environment:

    python3 -m venv venv
    source venv/bin/activate
  3. Install dependencies:

    pip install -r requirements.txt

Usage

  1. Authenticate with Google Cloud (if running locally):

    gcloud auth application-default login

    Or ensure the GOOGLE_APPLICATION_CREDENTIALS environment variable points to a service account key file.

  2. Run the script:

    python optimize_bq_storage.py YOUR_PROJECT_ID --output bq_optimization_results.csv
  3. Check the Output: By default the script will generate bq_optimization_results.csv in the current directory. Open this file to view the analysis and recommended actions.
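Because the rows are pre-sorted, the biggest opportunities sit at the top of the CSV. A minimal sketch for pulling the top recommendations back out of the file (the column names match the script's CSV header; the sample rows written below are purely hypothetical):

```python
import csv

def top_savings(path, n=5):
    """Return the n rows with the largest potential monthly savings."""
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    rows.sort(key=lambda r: float(r["Potential Monthly Savings ($)"]), reverse=True)
    return [(r["Dataset"], r["Recommendation"], r["Potential Monthly Savings ($)"])
            for r in rows[:n]]

# Hypothetical example file, using the same field names the script writes:
with open("sample_results.csv", "w", newline="") as f:
    writer = csv.DictWriter(
        f, fieldnames=["Dataset", "Recommendation", "Potential Monthly Savings ($)"])
    writer.writeheader()
    writer.writerow({"Dataset": "events", "Recommendation": "SWITCH TO PHYSICAL",
                     "Potential Monthly Savings ($)": "12.5"})
    writer.writerow({"Dataset": "logs", "Recommendation": "KEEP",
                     "Potential Monthly Savings ($)": "0.0"})

print(top_savings("sample_results.csv", 1))
# → [('events', 'SWITCH TO PHYSICAL', '12.5')]
```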

Logic

The script uses standard US multi-region pricing for estimation:

  • Logical Storage: ~$0.02/GiB/month (Active), ~$0.01/GiB/month (Long Term)
  • Physical Storage: ~$0.04/GiB/month (Active), ~$0.02/GiB/month (Long Term)

Note: Actual costs may vary by region and specific negotiated rates. This tool provides an estimation to identify potential optimization targets.
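As a worked example of the comparison, take a hypothetical dataset holding 1 TiB of logical data (100 GiB of it long-term) that compresses down to 200 GiB physical (50 GiB long-term). The helper below mirrors the script's get_storage_cost():

```python
GIB = 1024 ** 3

def monthly_cost(active_bytes, long_term_bytes, price_active, price_long_term):
    """Monthly storage cost in dollars, mirroring get_storage_cost()."""
    return (active_bytes / GIB) * price_active + (long_term_bytes / GIB) * price_long_term

logical_total, logical_long_term = 1024 * GIB, 100 * GIB
physical_total, physical_long_term = 200 * GIB, 50 * GIB

cost_logical = monthly_cost(logical_total - logical_long_term, logical_long_term, 0.02, 0.01)
cost_physical = monthly_cost(physical_total - physical_long_term, physical_long_term, 0.04, 0.02)

print(f"Logical:  ${cost_logical:.2f}/month")   # (1024-100)*0.02 + 100*0.01 = 19.48
print(f"Physical: ${cost_physical:.2f}/month")  # (200-50)*0.04 + 50*0.02 = 7.00
```

Even though Physical storage is priced at roughly twice the per-GiB rate, good compression (here ~5x) makes it the cheaper model, which is exactly the situation the script is built to detect.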

Script Source

For reference, here is the full code of the optimization script (optimize_bq_storage.py):

import argparse
import csv
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

# Pricing constants (approximate US multi-region prices, adjust as needed)
# Logical Storage
PRICE_LOGICAL_ACTIVE = 0.02
PRICE_LOGICAL_LONG_TERM = 0.01

# Physical Storage
PRICE_PHYSICAL_ACTIVE = 0.04
PRICE_PHYSICAL_LONG_TERM = 0.02

def get_storage_cost(active_bytes, long_term_bytes, price_active, price_long_term):
    """Calculates monthly storage cost."""
    gb_divider = 1024 ** 3
    active_gb = active_bytes / gb_divider
    long_term_gb = long_term_bytes / gb_divider
    return (active_gb * price_active) + (long_term_gb * price_long_term)

def scan_project(project_id, output_file):
    """Scans all datasets and tables in the project to find optimization opportunities."""
    client = bigquery.Client(project=project_id)
    
    print(f"Scanning project: {project_id}...")
    
    results = []

    try:
        datasets = list(client.list_datasets())
    except Exception as e:
        print(f"Error listing datasets: {e}")
        return

    for dataset_item in datasets:
        dataset_id = dataset_item.dataset_id
        
        try:
            # Fetch full dataset to get storage_billing_model
            dataset = client.get_dataset(dataset_item.reference)
            # Default to Logical if not set
            current_billing_model = getattr(dataset, 'storage_billing_model', 'LOGICAL') 
            if not current_billing_model: 
                current_billing_model = 'LOGICAL'
                
            print(f"  Scanning dataset: {dataset_id} (Current Bill Model: {current_billing_model})...")
        except Exception as e:
            print(f"  Error getting dataset {dataset_id}: {e}")
            current_billing_model = 'LOGICAL'

        # Dataset accumulators
        ds_logical_bytes = 0
        ds_long_term_logical_bytes = 0
        ds_long_term_physical_bytes = 0
        ds_total_physical_billing_bytes = 0  # active + long term + time travel + fail-safe

        try:
            tables = list(client.list_tables(dataset_item.reference))
        except Exception as e:
            print(f"    Error listing tables in {dataset_id}: {e}")
            continue
        
        table_count = 0
        for table_item in tables:
            try:
                table = client.get_table(table_item.reference)
            except NotFound:
                continue # Table might have been deleted during scan

            # Skip views and external tables
            if table.table_type != 'TABLE':
                continue
            
            table_count += 1

            # Helper to safely get property from attribute or raw dict
            def get_val(obj, attr_name, dict_key):
                val = getattr(obj, attr_name, None)
                if val is not None:
                    return int(val)
                # Fallback to _properties dictionary if available
                props = getattr(obj, '_properties', {})
                if dict_key in props:
                    return int(props[dict_key])
                return 0

            # Get storage stats
            t_logical = get_val(table, 'num_bytes', 'numBytes')
            t_lt_logical = get_val(table, 'num_long_term_bytes', 'numLongTermBytes')
            
            # Prefer explicit active + long-term physical bytes; time-travel and
            # fail-safe bytes are billed at physical storage rates, so add them on top.
            p_active = get_val(table, 'num_active_physical_bytes', 'numActivePhysicalBytes')
            t_lt_physical = get_val(table, 'num_long_term_physical_bytes', 'numLongTermPhysicalBytes')
            t_tt_physical = get_val(table, 'num_time_travel_physical_bytes', 'numTimeTravelPhysicalBytes')
            t_fs_physical = get_val(table, 'num_fail_safe_physical_bytes', 'numFailSafePhysicalBytes')

            if p_active + t_lt_physical > 0:
                t_total_physical = p_active + t_lt_physical + t_tt_physical + t_fs_physical
            else:
                # Fall back to the reported total, which already includes
                # time-travel and fail-safe bytes, so don't add them again.
                t_total_physical = get_val(table, 'num_total_physical_bytes', 'numTotalPhysicalBytes')
            
            # Add to dataset totals
            ds_logical_bytes += t_logical
            ds_long_term_logical_bytes += t_lt_logical
            ds_total_physical_billing_bytes += t_total_physical
            ds_long_term_physical_bytes += t_lt_physical

            # Active physical bytes are derived later as (total - long term).
            # Time-travel and fail-safe bytes are billed at the active physical
            # rate, so folding them into that difference is a reasonable
            # approximation for this estimate.

        # Calculate Dataset Metrics
        ds_active_logical_bytes = ds_logical_bytes - ds_long_term_logical_bytes
        ds_active_physical_bytes = ds_total_physical_billing_bytes - ds_long_term_physical_bytes

        # Calculate Costs for whole dataset
        cost_logical = get_storage_cost(ds_active_logical_bytes, ds_long_term_logical_bytes, PRICE_LOGICAL_ACTIVE, PRICE_LOGICAL_LONG_TERM)
        cost_physical = get_storage_cost(ds_active_physical_bytes, ds_long_term_physical_bytes, PRICE_PHYSICAL_ACTIVE, PRICE_PHYSICAL_LONG_TERM)

        recommendation = "KEEP"
        savings = 0.0

        if cost_physical < cost_logical:
            if current_billing_model != 'PHYSICAL':
                recommendation = "SWITCH TO PHYSICAL"
                savings = cost_logical - cost_physical
        elif cost_logical < cost_physical:
            if current_billing_model == 'PHYSICAL':
                recommendation = "SWITCH TO LOGICAL"
                savings = cost_physical - cost_logical
        
        if table_count > 0:
            results.append({
                "Project": project_id,
                "Dataset": dataset_id,
                "Table Count": table_count,
                "Current Model": current_billing_model,
                "Logical Bytes": ds_logical_bytes,
                "Physical Bytes": ds_total_physical_billing_bytes,
                "Logical Cost ($)": round(cost_logical, 4),
                "Physical Cost ($)": round(cost_physical, 4),
                "Recommendation": recommendation,
                "Potential Monthly Savings ($)": round(savings, 4)
            })

    # Sort results by Potential Monthly Savings (Desc)
    results.sort(key=lambda x: x.get("Potential Monthly Savings ($)", 0), reverse=True)

    # Write to CSV
    if results:
        keys = results[0].keys()
        with open(output_file, 'w', newline='') as f:
            dict_writer = csv.DictWriter(f, fieldnames=keys)
            dict_writer.writeheader()
            dict_writer.writerows(results)
        print(f"Scan complete. Results saved to {output_file}")
    else:
        print("No tables found or no data to write.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Scan BigQuery tables to optimize storage billing models.")
    parser.add_argument("project_id", help="The GCP Project ID to scan.")
    parser.add_argument("--output", default="bq_optimization_results.csv", help="Output CSV filename.")
    
    args = parser.parse_args()
    
    scan_project(args.project_id, args.output)
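The script only reports; applying a recommendation is a separate, dataset-level change. A sketch of the two usual routes, assuming the bq CLI is installed and mydataset is a placeholder for one of your datasets (note that BigQuery limits how often a dataset's billing model can be changed, with a waiting period between changes):

```shell
# Option 1: update the dataset with the bq CLI
bq update --storage_billing_model=PHYSICAL YOUR_PROJECT_ID:mydataset

# Option 2: the equivalent DDL statement
bq query --use_legacy_sql=false \
  "ALTER SCHEMA mydataset SET OPTIONS(storage_billing_model = 'PHYSICAL')"
```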

About

BigQuery storage billing model optimization tool with cost analysis and recommendations.
