Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# bigrquery (development version)

* `bq_perform_query()` and friends gain a `labels` argument that attaches [BigQuery labels](https://cloud.google.com/bigquery/docs/labels-intro) to the resulting job, which is useful for cost allocation. `dbConnect()` gains a matching `labels` argument that is forwarded to every job run on the connection. Both arguments default to `getOption("bigrquery.labels")` (@JulianUmbhau, #673).
* BigQuery error messages containing `{` or `}` are no longer mistaken for cli expressions, so the underlying server message is shown instead of a cli parse failure (#677).
* `bq_perform_upload()` and friends now default to 22 digits of accuracy and let you change this value with the new `json_digits` argument.
* Always upload `POSIXt` objects with 6 digits (i.e. microsecond) precision (#660).
Expand Down
83 changes: 68 additions & 15 deletions R/bq-perform.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,28 @@ NULL
#' snake_case names are automatically converted to camelCase.
#' @param print_header Whether to print out a header row in the results.
#' @param billing Identifier of project to bill.
#' @param labels A named list of strings used to attach
#' [BigQuery labels](https://cloud.google.com/bigquery/docs/labels-intro)
#' to the resulting job, e.g. `list(env = "prod", team = "data")`. This
#' is most useful for cost allocation and other FinOps reporting.
#' Defaults to the value of `getOption("bigrquery.labels")`.
bq_perform_extract <- function(
x,
destination_uris,
destination_format = "NEWLINE_DELIMITED_JSON",
compression = "NONE",
...,
print_header = TRUE,
billing = x$project
billing = x$project,
labels = getOption("bigrquery.labels")
) {
x <- as_bq_table(x)
destination_uris <- as.character(destination_uris) # for gs_object
check_string(destination_format)
check_string(compression)
check_bool(print_header)
check_string(billing)
check_labels(labels)

url <- bq_path(billing, jobs = "")
body <- list(
Expand All @@ -81,7 +88,8 @@ bq_perform_extract <- function(
destinationFormat = unbox(destination_format),
compression = unbox(compression),
printHeader = unbox(print_header)
)
),
labels = labels
)
)

Expand Down Expand Up @@ -127,7 +135,8 @@ bq_perform_upload <- function(
write_disposition = "WRITE_EMPTY",
...,
billing = x$project,
json_digits = NULL
json_digits = NULL,
labels = getOption("bigrquery.labels")
) {
x <- as_bq_table(x)
if (!is.data.frame(values)) {
Expand All @@ -139,6 +148,7 @@ bq_perform_upload <- function(
check_string(write_disposition)
check_string(billing)
json_digits <- check_digits(json_digits)
check_labels(labels)

load <- list(
sourceFormat = unbox(source_format),
Expand All @@ -154,11 +164,21 @@ bq_perform_upload <- function(
load$autodetect <- unbox(TRUE)
}

metadata <- list(configuration = list(load = load))
metadata <- list(
configuration = list(
load = load,
labels = labels
)
)
metadata <- bq_body(metadata, ...)
metadata <- list(
"type" = "application/json; charset=UTF-8",
"content" = jsonlite::toJSON(metadata, pretty = TRUE, digits = json_digits)
"content" = jsonlite::toJSON(
metadata,
auto_unbox = TRUE,
pretty = TRUE,
digits = json_digits
)
)

if (source_format == "NEWLINE_DELIMITED_JSON") {
Expand Down Expand Up @@ -251,7 +271,8 @@ bq_perform_load <- function(
nskip = 0,
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
...
...,
labels = getOption("bigrquery.labels")
) {
x <- as_bq_table(x)
source_uris <- as.character(source_uris)
Expand All @@ -260,6 +281,7 @@ bq_perform_load <- function(
check_number_decimal(nskip, min = 0)
check_string(create_disposition)
check_string(write_disposition)
check_labels(labels)

load <- list(
sourceUris = as.list(source_uris),
Expand All @@ -280,7 +302,12 @@ bq_perform_load <- function(
load$autodetect <- TRUE
}

body <- list(configuration = list(load = load))
body <- list(
configuration = list(
load = load,
labels = labels
)
)

url <- bq_path(billing, jobs = "")
res <- bq_post(
Expand Down Expand Up @@ -322,7 +349,8 @@ bq_perform_query <- function(
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
use_legacy_sql = FALSE,
priority = "INTERACTIVE"
priority = "INTERACTIVE",
labels = getOption("bigrquery.labels")
) {
query <- as_query(query)
check_string(billing)
Expand All @@ -331,6 +359,7 @@ bq_perform_query <- function(
check_string(write_disposition)
check_bool(use_legacy_sql)
check_string(priority)
check_labels(labels)

query <- list(
query = unbox(query),
Expand All @@ -357,7 +386,12 @@ bq_perform_query <- function(
}

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query))
body <- list(
configuration = list(
query = query,
labels = labels
)
)

res <- bq_post(
url,
Expand All @@ -375,17 +409,25 @@ bq_perform_query_dry_run <- function(
...,
default_dataset = NULL,
parameters = NULL,
use_legacy_sql = FALSE
use_legacy_sql = FALSE,
labels = getOption("bigrquery.labels")
) {
query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = use_legacy_sql
)
check_labels(labels)

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))
body <- list(
configuration = list(
query = query,
labels = labels,
dryRun = unbox(TRUE)
)
)

res <- bq_post(
url,
Expand All @@ -403,17 +445,25 @@ bq_perform_query_schema <- function(
billing,
...,
default_dataset = NULL,
parameters = NULL
parameters = NULL,
labels = getOption("bigrquery.labels")
) {
query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = FALSE
)
check_labels(labels)

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))
body <- list(
configuration = list(
query = query,
labels = labels,
dryRun = unbox(TRUE)
)
)

res <- bq_post(
url,
Expand Down Expand Up @@ -459,10 +509,12 @@ bq_perform_copy <- function(
create_disposition = "CREATE_IF_NEEDED",
write_disposition = "WRITE_EMPTY",
...,
billing = NULL
billing = NULL,
labels = getOption("bigrquery.labels")
) {
billing <- billing %||% dest$project
url <- bq_path(billing, jobs = "")
check_labels(labels)

body <- list(
configuration = list(
Expand All @@ -471,7 +523,8 @@ bq_perform_copy <- function(
destinationTable = tableReference(dest),
createDisposition = unbox(create_disposition),
writeDisposition = unbox(write_disposition)
)
),
labels = labels
)
)

Expand Down
12 changes: 9 additions & 3 deletions R/dbi-connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ BigQueryConnection <- function(
page_size = 1e4,
quiet = NA,
use_legacy_sql = FALSE,
bigint = c("integer", "integer64", "numeric", "character")
bigint = c("integer", "integer64", "numeric", "character"),
labels = NULL
) {
connection_capture()

Expand All @@ -20,7 +21,8 @@ BigQueryConnection <- function(
page_size = as.integer(page_size),
quiet = quiet,
use_legacy_sql = use_legacy_sql,
bigint = match.arg(bigint)
bigint = match.arg(bigint),
labels = labels
)
}

Expand All @@ -36,7 +38,8 @@ setClass(
use_legacy_sql = "logical",
page_size = "integer",
quiet = "logical",
bigint = "character"
bigint = "character",
labels = "ANY"
)
)

Expand Down Expand Up @@ -113,6 +116,7 @@ setMethod(
default_dataset = ds,
quiet = conn@quiet,
parameters = params,
labels = conn@labels,
...
)
bq_job_wait(job, quiet = conn@quiet)
Expand Down Expand Up @@ -256,6 +260,7 @@ dbWriteTable_bq <- function(
create_disposition = create_disposition,
write_disposition = write_disposition,
billing = conn@billing,
labels = conn@labels,
...
)
invisible(TRUE)
Expand Down Expand Up @@ -307,6 +312,7 @@ dbAppendTable_bq <- function(conn, name, value, ..., row.names = NULL) {
create_disposition = "CREATE_NEVER",
write_disposition = "WRITE_APPEND",
billing = conn@billing,
labels = conn@labels,
...
)
on_connection_updated(conn, toString(tb))
Expand Down
5 changes: 4 additions & 1 deletion R/dbi-driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ setMethod(
quiet = NA,
use_legacy_sql = FALSE,
bigint = c("integer", "integer64", "numeric", "character"),
labels = getOption("bigrquery.labels"),
...
) {
check_string(project)
Expand All @@ -93,6 +94,7 @@ setMethod(
check_bool(quiet, allow_na = TRUE)
check_bool(use_legacy_sql)
bigint <- arg_match(bigint)
check_labels(labels, call = quote(dbConnect()))

BigQueryConnection(
project = project,
Expand All @@ -101,7 +103,8 @@ setMethod(
page_size = page_size,
quiet = quiet,
use_legacy_sql = use_legacy_sql,
bigint = bigint
bigint = bigint,
labels = labels
)
}
)
Expand Down
1 change: 1 addition & 0 deletions R/dbi-result.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ BigQueryResult <- function(conn, sql, params = NULL, ...) {
default_dataset = ds,
quiet = conn@quiet,
parameters = params,
labels = conn@labels,
...
)

Expand Down
34 changes: 28 additions & 6 deletions R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ tbl.BigQueryConnection <- function(src, from, ...) {
schema <- bq_perform_query_schema(
sql,
billing = src$con@billing,
default_dataset = dataset
default_dataset = dataset,
labels = src$con@labels
)
vars <- map_chr(schema, "[[", "name")

Expand Down Expand Up @@ -94,7 +95,8 @@ db_compute.BigQueryConnection <- function(
tb <- bq_project_query(
con@project,
sql,
destination_table = destination_table
destination_table = destination_table,
labels = con@labels
)
} else {
ds <- bq_dataset(con@project, con@dataset)
Expand All @@ -103,7 +105,8 @@ db_compute.BigQueryConnection <- function(
tb <- bq_dataset_query(
ds,
query = sql,
destination_table = destination_table
destination_table = destination_table,
labels = con@labels
)
}

Expand Down Expand Up @@ -133,7 +136,13 @@ db_copy_to.BigQueryConnection <- function(

tb <- as_bq_table(con, table)
write <- if (overwrite) "WRITE_TRUNCATE" else "WRITE_EMPTY"
bq_table_upload(tb, values, fields = types, write_disposition = write)
bq_table_upload(
tb,
values,
fields = types,
write_disposition = write,
labels = con@labels
)

table
}
Expand Down Expand Up @@ -181,10 +190,23 @@ collect.tbl_BigQueryConnection <- function(
sql <- dbplyr::db_sql_render(con, x)

if (is.null(con@dataset)) {
tb <- bq_project_query(billing, sql, quiet = con@quiet, ...)
tb <- bq_project_query(
billing,
sql,
quiet = con@quiet,
labels = con@labels,
...
)
} else {
ds <- as_bq_dataset(con)
tb <- bq_dataset_query(ds, sql, quiet = con@quiet, billing = billing, ...)
tb <- bq_dataset_query(
ds,
sql,
quiet = con@quiet,
billing = billing,
labels = con@labels,
...
)
}
}

Expand Down
Loading
Loading