
lambiase/spark-bigquery

 
 


spark-bigquery


This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by spotify/spark-bigquery, but there are several differences and enhancements:

  • Use of the Structured Streaming API

  • Use within Pyspark

  • Saving via Decorators

  • Allow saving to partitioned tables

  • Easy integration with Databricks

  • Use of Standard SQL

  • Run Data Manipulation Language (DML) queries

  • Update schemas on writes using setSchemaUpdateOptions

  • JSON is used as an intermediate format instead of Avro. This allows fields at different nesting levels to have the same name:

{
  "obj": {
    "data": {
      "data": {}
    }
  }
}
  • DataFrame's schema is automatically adapted to a legal one:

    1. Illegal characters are replaced with _
    2. Field names are converted to lower case to avoid ambiguity
    3. Duplicate field names are given a numeric suffix (_1, _2, etc.)
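The three renaming rules above can be illustrated with a small standalone Scala sketch. FieldNames.sanitize is a hypothetical helper, not the connector's own code, and it assumes that "legal" means ASCII letters, digits and underscores, and that the first occurrence of a duplicated name keeps it unchanged.

```scala
object FieldNames {
  // Assumption: anything other than ASCII letters, digits and '_' is illegal.
  private val Illegal = "[^a-zA-Z0-9_]".r

  // Hypothetical helper applying the three rules in order.
  def sanitize(names: Seq[String]): Seq[String] = {
    // Rules 1 and 2: replace illegal characters with '_' and lower-case the name.
    val cleaned = names.map(n => Illegal.replaceAllIn(n, "_").toLowerCase)
    // Rule 3: keep the first occurrence, suffix later duplicates with _1, _2, ...
    // (simplification: does not check whether a suffixed name already exists)
    val seen = scala.collection.mutable.Map.empty[String, Int]
    cleaned.map { name =>
      val count = seen.getOrElse(name, 0)
      seen(name) = count + 1
      if (count == 0) name else s"${name}_$count"
    }
  }
}
```

For example, the columns "User Name", "user-name" and "Age" would become user_name, user_name_1 and age.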

Docker!

I created a container that launches Zeppelin with Spark and the connector, for ease of use and quick startup. You can find it here.

Usage

Including spark-bigquery into your project

Maven

<repositories>
  <repository>
    <id>oss-sonatype</id>
    <name>oss-sonatype</name>
    <url>https://oss.sonatype.org/content/repositories/releases/</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.github.samelamin</groupId>
    <artifactId>spark-bigquery_${scala.binary.version}</artifactId>
    <version>0.2.2</version>
  </dependency>
</dependencies>

SBT

To use it in a local SBT console, first add the package as a dependency, then set up your project details:

resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.2"

// Then, in the SBT console:
import com.samelamin.spark.bigquery._

// Set up GCP credentials
sqlContext.setGcpJsonKeyFile("<JSON_KEY_FILE>")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("<BILLING_PROJECT>")
sqlContext.setBigQueryGcsBucket("<GCS_BUCKET>")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("<DATASET_LOCATION>")

Structured Streaming from S3/HDFS to BigQuery

S3 and HDFS are de facto standards for storing large datasets. This package lets you stream any new data added to them into a BigQuery table of your choice.

import com.samelamin.spark.bigquery._

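// File-based streaming sources need an explicit schema; here it is
// inferred once from a batch read of the same location
val schema = spark.read.json("s3a://bucket").schema
val df = spark.readStream.schema(schema).json("s3a://bucket")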

df.writeStream
      .option("checkpointLocation", "s3a://checkpoint/dir")
      .option("tableReferenceSink","my-project:my_dataset.my_table")
      .format("com.samelamin.spark.bigquery")
      .start()

Structured Streaming from BigQuery Table

You can use this connector to stream from a BigQuery table. The connector uses a timestamp column to track offsets.

import com.samelamin.spark.bigquery._

val df = spark
          .readStream
          .option("tableReferenceSource","my-project:my_dataset.my_table")
          .format("com.samelamin.spark.bigquery")
          .load()

You can also specify a custom timestamp column:

import com.samelamin.spark.bigquery._

sqlContext.setBQTableTimestampColumn("column_name")
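As a rough illustration of how a timestamp column can act as a streaming offset, the plain Scala sketch below (no Spark involved) reads only the rows newer than the last committed offset, then advances the offset to the newest timestamp in that batch. TimestampOffsets, Record and nextBatch are hypothetical names, and the connector's actual offset handling may differ; the ts function stands in for the configured timestamp column (bq_load_timestamp by default).

```scala
import java.time.Instant

// Hypothetical record type; bqLoadTimestamp mirrors the connector's default column.
final case class Record(id: Int, bqLoadTimestamp: Instant)

object TimestampOffsets {
  // Returns the rows newer than lastOffset together with the offset to commit next.
  // `ts` extracts the timestamp column from a row.
  // Rows whose timestamp equals the offset are treated as already processed.
  def nextBatch[A](rows: Seq[A], lastOffset: Option[Instant])(ts: A => Instant): (Seq[A], Option[Instant]) = {
    val batch = lastOffset match {
      case Some(offset) => rows.filter(r => ts(r).isAfter(offset))
      case None         => rows // first batch: nothing committed yet, read everything
    }
    val newOffset =
      if (batch.isEmpty) lastOffset // no new data: keep the previous offset
      else Some(batch.map(ts).reduce((a, b) => if (a.isAfter(b)) a else b))
    (batch, newOffset)
  }
}
```

Calling nextBatch(rows, lastOffset)(_.bqLoadTimestamp) on each trigger yields the new rows plus the offset to store in the checkpoint.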

Saving DataFrame using BigQuery Hadoop writer API

By default, every table created by this connector has a bq_load_timestamp column, set to the current timestamp when the data is loaded.

import com.samelamin.spark.bigquery._

val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")

You can also save to a single partition by using a table decorator, e.g. saving to 'dataset-id.table-name$YYYYMMDD'.
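Formatting the decorator date by hand is easy to get wrong, so a small helper can do it. PartitionDecorator.forDay is a hypothetical helper, not part of the connector; it simply appends "$" and the day as YYYYMMDD to whatever table reference it is given.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object PartitionDecorator {
  // Day partitions are addressed as YYYYMMDD.
  private val DayFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Hypothetical helper: appends "$YYYYMMDD" to the given table reference.
  def forDay(tableRef: String, day: LocalDate): String =
    tableRef + "$" + day.format(DayFormat)
}
```

The result could then be passed to the writer, for instance df.saveAsBigQueryTable(PartitionDecorator.forDay("dataset-id.table-name", LocalDate.now())).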

Saving DataFrame using Pyspark

from pyspark.sql import SparkSession

BQ_PROJECT_ID = "projectId"
DATASET_ID = "datasetId"
TABLE_NAME = "tableName"
jsonFile = "/path/to/json"  # GCP service account JSON key file
GcsBucket = "gcs-bucket"
session = SparkSession.builder.getOrCreate()
bq = session._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
bq.setGcpJsonKeyFile(jsonFile)
bq.setBigQueryProjectId(BQ_PROJECT_ID)
bq.setGSProjectId(BQ_PROJECT_ID)
bq.setBigQueryGcsBucket(GcsBucket)
bq.setBigQueryDatasetLocation("US")
tableName = "{0}:{1}.{2}".format(BQ_PROJECT_ID, DATASET_ID, TABLE_NAME)
# df is the PySpark DataFrame to save
bqDF = session._sc._jvm.com.samelamin.spark.bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable(tableName, False, 0, None, None)

Reading DataFrame From BigQuery

import com.samelamin.spark.bigquery._


// Load everything from a table
val table = sqlContext.bigQueryTable("bigquery-public-data:samples.shakespeare")

// Load results from a SQL query
// Defaults to the legacy SQL dialect
// To use standard SQL, set --conf spark.hadoop.USE_STANDARD_SQL_DIALECT=true
val df = sqlContext.bigQuerySelect(
  "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]")

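Legacy SQL and Standard SQL reference tables differently: legacy SQL writes [project:dataset.table], while Standard SQL writes project.dataset.table enclosed in backticks. If you enable Standard SQL, a query like the one above has to be rewritten accordingly. The hypothetical TableRefs.toStandardSql helper below sketches that rewrite with a regular expression; it only handles fully qualified references and is not a SQL parser, so other legacy-only syntax is left as is.

```scala
import scala.util.matching.Regex

object TableRefs {
  // Matches fully qualified legacy references: [project:dataset.table]
  private val LegacyRef: Regex = """\[([^:\]]+):([^.\]]+)\.([^\]]+)\]""".r

  // Hypothetical helper: rewrites each legacy reference into the Standard SQL
  // form `project.dataset.table`.
  def toStandardSql(query: String): String =
    LegacyRef.replaceAllIn(query, (m: Regex.Match) =>
      Regex.quoteReplacement(s"`${m.group(1)}.${m.group(2)}.${m.group(3)}`"))
}
```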
Reading DataFrame From BigQuery in Pyspark

from pyspark.sql import DataFrame

bq = spark._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(spark._wrapped._jsqlContext)
df = DataFrame(bq.bigQuerySelect("SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"), spark._wrapped)

Running DML Queries

import com.samelamin.spark.bigquery._

// Run a DML statement (Standard SQL)
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")

Note that DML queries must be written in Standard SQL.
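Because the DML statement is passed as a plain string, values should be written as Standard SQL literals. The hypothetical Dml.update helper below sketches one way to build the UPDATE statement from the example: the table and column names are wrapped in backticks, and string values become single-quoted literals with backslashes and quotes escaped. It only handles string values and does not validate the identifiers it is given.

```scala
object Dml {
  // Standard SQL string literal: single-quoted, with \ and ' escaped by a backslash.
  private def stringLiteral(s: String): String =
    "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"

  // Hypothetical helper: UPDATE <table> SET <column> = <new> WHERE <column> = <old>
  def update(table: String, column: String, newValue: String, oldValue: String): String =
    s"UPDATE `$table` SET `$column` = ${stringLiteral(newValue)} WHERE `$column` = ${stringLiteral(oldValue)}"
}
```

The resulting string would then be passed to sqlContext.runDMLQuery.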

Update Schemas

You can also allow saving a DataFrame to update the destination table's schema:

import com.samelamin.spark.bigquery._

sqlContext.setAllowSchemaUpdates()
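Even with schema updates allowed, BigQuery only accepts certain changes: adding NULLABLE or REPEATED fields (ALLOW_FIELD_ADDITION) and relaxing REQUIRED fields to NULLABLE (ALLOW_FIELD_RELAXATION). The sketch below models those rules for top-level fields only. It assumes that setAllowSchemaUpdates enables these BigQuery options and that the new schema keeps every existing field with its type; SchemaUpdates.isAllowed is a hypothetical helper, not part of the connector.

```scala
object SchemaUpdates {
  sealed trait Mode
  case object Required extends Mode
  case object Nullable extends Mode
  case object Repeated extends Mode

  final case class Field(name: String, fieldType: String, mode: Mode)

  // Hypothetical check: is `proposed` an accepted update of `current`?
  def isAllowed(current: Seq[Field], proposed: Seq[Field]): Boolean = {
    val byName = proposed.map(f => f.name -> f).toMap
    // Existing fields must keep their type; the mode may stay or relax REQUIRED -> NULLABLE.
    val existingOk = current.forall { old =>
      byName.get(old.name).exists { updated =>
        updated.fieldType == old.fieldType &&
          (updated.mode == old.mode || (old.mode == Required && updated.mode == Nullable))
      }
    }
    // Newly added fields must not be REQUIRED.
    val currentNames = current.map(_.name).toSet
    val additionsOk = proposed.filterNot(f => currentNames(f.name)).forall(_.mode != Required)
    existingOk && additionsOk
  }
}
```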

Notes on using this API:

  • Structured Streaming needs a partitioned table, which is created by default when writing a stream.
  • Structured Streaming needs a timestamp column from which offsets are retrieved. By default, all tables are created with a bq_load_timestamp column whose default value is the current timestamp.
  • For use with Databricks, please follow this guide.

License

Copyright 2016 samelamin.

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

About

Google BigQuery support for Spark, Structured Streaming, SQL, and DataFrames with easy Databricks integration.

Languages

  • Scala 95.2%
  • Java 4.8%