From b350559fc4bb08b49c7705190ceaeddb32a5fc85 Mon Sep 17 00:00:00 2001 From: Martin Studer Date: Tue, 7 Nov 2017 16:57:16 +0100 Subject: [PATCH 1/4] Upgrade to databricks spark-avro 4.0.0 (officially supporting Spark 2.2) --- build.sbt | 2 +- .../spark/avro/SchemaConverters.scala | 368 ++++++++++++------ .../spark/bigquery/BigQuerySQLContext.scala | 2 +- 3 files changed, 250 insertions(+), 122 deletions(-) diff --git a/build.sbt b/build.sbt index 821f798..dcbca8b 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ spAppendScalaVersion := true spIncludeMaven := true libraryDependencies ++= Seq( - "com.databricks" %% "spark-avro" % "3.0.0", + "com.databricks" %% "spark-avro" % "4.0.0", "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.7.5-hadoop2" exclude ("com.google.guava", "guava-jdk5"), "org.slf4j" % "slf4j-simple" % "1.7.21", diff --git a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala index 350d117..648d2ab 100644 --- a/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala +++ b/src/main/scala/com/databricks/spark/avro/SchemaConverters.scala @@ -17,29 +17,31 @@ package com.databricks.spark.avro import java.nio.ByteBuffer -import java.util.HashMap -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.avro.generic.GenericData.Fixed import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.{Schema, SchemaBuilder} import org.apache.avro.SchemaBuilder._ import org.apache.avro.Schema.Type._ -import org.apache.spark.sql.Row + +import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ /** - * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice - * versa. - */ + * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice + * versa. 
+ */ object SchemaConverters { + class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex) + case class SchemaType(dataType: DataType, nullable: Boolean) /** - * This function takes an avro schema and returns a sql schema. - */ + * This function takes an avro schema and returns a sql schema. + */ def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { case INT => SchemaType(IntegerType, nullable = false) @@ -53,7 +55,7 @@ object SchemaConverters { case ENUM => SchemaType(StringType, nullable = false) case RECORD => - val fields = avroSchema.getFields.map { f => + val fields = avroSchema.getFields.asScala.map { f => val schemaType = toSqlType(f.schema()) StructField(f.name, schemaType.dataType, schemaType.nullable) } @@ -73,37 +75,46 @@ object SchemaConverters { nullable = false) case UNION => - if (avroSchema.getTypes.exists(_.getType == NULL)) { + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { // In case of a union with null, eliminate it and make a recursive call - val remainingUnionTypes = avroSchema.getTypes.filterNot(_.getType == NULL) + val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) if (remainingUnionTypes.size == 1) { - toSqlType(remainingUnionTypes.get(0)).copy(nullable = true) + toSqlType(remainingUnionTypes.head).copy(nullable = true) } else { - toSqlType(Schema.createUnion(remainingUnionTypes)).copy(nullable = true) + toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true) } - } else avroSchema.getTypes.map(_.getType) match { + } else avroSchema.getTypes.asScala.map(_.getType) match { case Seq(t1) => toSqlType(avroSchema.getTypes.get(0)) case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => SchemaType(LongType, nullable = false) case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => SchemaType(DoubleType, nullable = false) - case other => throw new UnsupportedOperationException( - s"This mix of union types is not supported 
(see README): $other") + case _ => + // Convert complex unions to struct types where field names are member0, member1, etc. + // This is consistent with the behavior when converting between Avro and Parquet. + val fields = avroSchema.getTypes.asScala.zipWithIndex.map { + case (s, i) => + val schemaType = toSqlType(s) + // All fields are nullable because only one of them is set at a time + StructField(s"member$i", schemaType.dataType, nullable = true) + } + + SchemaType(StructType(fields), nullable = false) } - case other => throw new UnsupportedOperationException(s"Unsupported type $other") + case other => throw new IncompatibleSchemaException(s"Unsupported type $other") } } /** - * This function converts sparkSQL StructType into avro schema. This method uses two other - * converter methods in order to do the conversion. - */ + * This function converts sparkSQL StructType into avro schema. This method uses two other + * converter methods in order to do the conversion. + */ def convertStructToAvro[T]( - structType: StructType, - schemaBuilder: RecordBuilder[T], - recordNamespace: String): T = { + structType: StructType, + schemaBuilder: RecordBuilder[T], + recordNamespace: String): T = { val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields() structType.fields.foreach { field => val newField = fieldsAssembler.name(field.name).`type`() @@ -120,99 +131,190 @@ object SchemaConverters { } /** - * Returns a function that is used to convert avro types to their - * corresponding sparkSQL representations. - */ - def createConverterToSQL(schema: Schema): Any => Any = { - schema.getType match { - // Avro strings are in Utf8, so we have to call toString on them - case STRING | ENUM => (item: Any) => if (item == null) null else item.toString - case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity - // Byte arrays are reused by avro, so we have to make a copy of them. 
- case FIXED => (item: Any) => if (item == null) { - null - } else { - item.asInstanceOf[Fixed].bytes().clone() - } - case BYTES => (item: Any) => if (item == null) { - null - } else { - val bytes = item.asInstanceOf[ByteBuffer] - val javaBytes = new Array[Byte](bytes.remaining) - bytes.get(javaBytes) - javaBytes - } - case RECORD => - val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema)) - (item: Any) => if (item == null) { - null - } else { - val record = item.asInstanceOf[GenericRecord] - val converted = new Array[Any](fieldConverters.size) - var idx = 0 - while (idx < fieldConverters.size) { - converted(idx) = fieldConverters.apply(idx)(record.get(idx)) - idx += 1 - } - Row.fromSeq(converted.toSeq) - } - case ARRAY => - val elementConverter = createConverterToSQL(schema.getElementType) - (item: Any) => if (item == null) { - null - } else { - item.asInstanceOf[GenericData.Array[Any]].map(elementConverter) - } - case MAP => - val valueConverter = createConverterToSQL(schema.getValueType) - (item: Any) => if (item == null) { - null - } else { - item.asInstanceOf[HashMap[Any, Any]].map(x => (x._1.toString, valueConverter(x._2))).toMap - } - case UNION => - if (schema.getTypes.exists(_.getType == NULL)) { - val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL) - if (remainingUnionTypes.size == 1) { - createConverterToSQL(remainingUnionTypes.get(0)) - } else { - createConverterToSQL(Schema.createUnion(remainingUnionTypes)) + * Returns a converter function to convert row in avro format to GenericRow of catalyst. + * + * @param sourceAvroSchema Source schema before conversion inferred from avro file or passed in + * by user. + * @param targetSqlType Target catalyst sql type after the conversion. + * @return returns a converter function to convert row in avro format to GenericRow of catalyst. 
+ */ + def createConverterToSQL(sourceAvroSchema: Schema, targetSqlType: DataType): AnyRef => AnyRef = { + + def createConverter(avroSchema: Schema, + sqlType: DataType, path: List[String]): AnyRef => AnyRef = { + val avroType = avroSchema.getType + (sqlType, avroType) match { + // Avro strings are in Utf8, so we have to call toString on them + case (StringType, STRING) | (StringType, ENUM) => + (item: AnyRef) => if (item == null) null else item.toString + // Byte arrays are reused by avro, so we have to make a copy of them. + case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) | + (FloatType, FLOAT) | (LongType, LONG) => + identity + case (BinaryType, FIXED) => + (item: AnyRef) => + if (item == null) { + null + } else { + item.asInstanceOf[Fixed].bytes().clone() + } + case (BinaryType, BYTES) => + (item: AnyRef) => + if (item == null) { + null + } else { + val byteBuffer = item.asInstanceOf[ByteBuffer] + val bytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(bytes) + bytes + } + + case (struct: StructType, RECORD) => + val length = struct.fields.length + val converters = new Array[AnyRef => AnyRef](length) + val avroFieldIndexes = new Array[Int](length) + var i = 0 + while (i < length) { + val sqlField = struct.fields(i) + val avroField = avroSchema.getField(sqlField.name) + if (avroField != null) { + val converter = createConverter(avroField.schema(), sqlField.dataType, + path :+ sqlField.name) + converters(i) = converter + avroFieldIndexes(i) = avroField.pos() + } else if (!sqlField.nullable) { + throw new IncompatibleSchemaException( + s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " + + "in Avro schema\n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + i += 1 } - } else schema.getTypes.map(_.getType) match { - case Seq(t1) => - createConverterToSQL(schema.getTypes.get(0)) - case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - (item: Any) => { - 
item match { - case l: Long => l - case i: Int => i.toLong - case null => null + + (item: AnyRef) => { + if (item == null) { + null + } else { + val record = item.asInstanceOf[GenericRecord] + + val result = new Array[Any](length) + var i = 0 + while (i < converters.length) { + if (converters(i) != null) { + val converter = converters(i) + result(i) = converter(record.get(avroFieldIndexes(i))) + } + i += 1 } + new GenericRow(result) } - case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - (item: Any) => { - item match { - case d: Double => d - case f: Float => f.toDouble - case null => null + } + case (arrayType: ArrayType, ARRAY) => + val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType, + path) + val allowsNull = arrayType.containsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element => + if (element == null && !allowsNull) { + throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + elementConverter(element) + } } } - case other => throw new UnsupportedOperationException( - s"This mix of union types is not supported (see README): $other") - } - case other => throw new UnsupportedOperationException(s"invalid avro type: $other") + } + case (mapType: MapType, MAP) if mapType.keyType == StringType => + val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path) + val allowsNull = mapType.valueContainsNull + (item: AnyRef) => { + if (item == null) { + null + } else { + item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { x => + if (x._2 == null && !allowsNull) { + throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " + + "allowed to be null") + } else { + (x._1.toString, valueConverter(x._2)) + } + }.toMap + } + } + case (sqlType, UNION) => + if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { + val 
remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) + if (remainingUnionTypes.size == 1) { + createConverter(remainingUnionTypes.head, sqlType, path) + } else { + createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path) + } + } else avroSchema.getTypes.asScala.map(_.getType) match { + case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path) + case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType => + (item: AnyRef) => { + item match { + case null => null + case l: java.lang.Long => l + case i: java.lang.Integer => new java.lang.Long(i.longValue()) + } + } + case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType => + (item: AnyRef) => { + item match { + case null => null + case d: java.lang.Double => d + case f: java.lang.Float => new java.lang.Double(f.doubleValue()) + } + } + case other => + sqlType match { + case t: StructType if t.fields.length == avroSchema.getTypes.size => + val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map { + case (field, schema) => + createConverter(schema, field.dataType, path :+ field.name) + } + + (item: AnyRef) => if (item == null) { + null + } else { + val i = GenericData.get().resolveUnion(avroSchema, item) + val converted = new Array[Any](fieldConverters.length) + converted(i) = fieldConverters(i)(item) + new GenericRow(converted) + } + case _ => throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible " + + s"(avroType = $other, sqlType = $sqlType). \n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } + } + case (left, right) => + throw new IncompatibleSchemaException( + s"Cannot convert Avro schema to catalyst type because schema at path " + + s"${path.mkString(".")} is not compatible (avroType = $right, sqlType = $left). 
\n" + + s"Source Avro schema: $sourceAvroSchema.\n" + + s"Target Catalyst type: $targetSqlType") + } } + createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) } /** - * This function is used to convert some sparkSQL type to avro type. Note that this function won't - * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). - */ + * This function is used to convert some sparkSQL type to avro type. Note that this function won't + * be used to construct fields of avro record (convertFieldTypeToAvro is used for that). + */ private def convertTypeToAvro[T]( - dataType: DataType, - schemaBuilder: BaseTypeBuilder[T], - structName: String, - recordNamespace: String): T = { + dataType: DataType, + schemaBuilder: BaseTypeBuilder[T], + structName: String, + recordNamespace: String): T = { dataType match { case ByteType => schemaBuilder.intType() case ShortType => schemaBuilder.intType() @@ -225,6 +327,7 @@ object SchemaConverters { case BinaryType => schemaBuilder.bytesType() case BooleanType => schemaBuilder.booleanType() case TimestampType => schemaBuilder.longType() + case DateType => schemaBuilder.longType() case ArrayType(elementType, _) => val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) @@ -242,20 +345,20 @@ object SchemaConverters { schemaBuilder.record(structName).namespace(recordNamespace), recordNamespace) - case other => throw new IllegalArgumentException(s"Unexpected type $dataType.") + case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.") } } /** - * This function is used to construct fields of the avro record, where schema of the field is - * specified by avro representation of dataType. Since builders for record fields are different - * from those for everything else, we have to use a separate method. - */ + * This function is used to construct fields of the avro record, where schema of the field is + * specified by avro representation of dataType. 
Since builders for record fields are different + * from those for everything else, we have to use a separate method. + */ private def convertFieldTypeToAvro[T]( - dataType: DataType, - newFieldBuilder: BaseFieldTypeBuilder[T], - structName: String, - recordNamespace: String): FieldDefault[T, _] = { + dataType: DataType, + newFieldBuilder: BaseFieldTypeBuilder[T], + structName: String, + recordNamespace: String): FieldDefault[T, _] = { dataType match { case ByteType => newFieldBuilder.intType() case ShortType => newFieldBuilder.intType() @@ -268,24 +371,49 @@ object SchemaConverters { case BinaryType => newFieldBuilder.bytesType() case BooleanType => newFieldBuilder.booleanType() case TimestampType => newFieldBuilder.longType() + case DateType => newFieldBuilder.longType() case ArrayType(elementType, _) => val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull) - val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace) + val elementSchema = convertTypeToAvro( + elementType, + builder, + structName, + getNewRecordNamespace(elementType, recordNamespace, structName)) newFieldBuilder.array().items(elementSchema) case MapType(StringType, valueType, _) => val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull) - val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace) + val valueSchema = convertTypeToAvro( + valueType, + builder, + structName, + getNewRecordNamespace(valueType, recordNamespace, structName)) newFieldBuilder.map().values(valueSchema) case structType: StructType => convertStructToAvro( structType, - newFieldBuilder.record(structName).namespace(recordNamespace), - recordNamespace) + newFieldBuilder.record(structName).namespace(s"$recordNamespace.$structName"), + s"$recordNamespace.$structName") + + case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.") + } + } + + /** + * Returns a new namespace depending on the data type of 
the element. + * If the data type is a StructType it returns the current namespace concatenated + * with the element name, otherwise it returns the current namespace as it is. + */ + private[avro] def getNewRecordNamespace( + elementDataType: DataType, + currentRecordNamespace: String, + elementName: String): String = { - case other => throw new UnsupportedOperationException(s"Unexpected type $dataType.") + elementDataType match { + case StructType(_) => s"$currentRecordNamespace.$elementName" + case _ => currentRecordNamespace } } diff --git a/src/main/scala/com/spotify/spark/bigquery/BigQuerySQLContext.scala b/src/main/scala/com/spotify/spark/bigquery/BigQuerySQLContext.scala index a65acd4..3e45a58 100644 --- a/src/main/scala/com/spotify/spark/bigquery/BigQuerySQLContext.scala +++ b/src/main/scala/com/spotify/spark/bigquery/BigQuerySQLContext.scala @@ -113,7 +113,7 @@ class BigQuerySQLContext(sqlContext: SQLContext) { val schema = new Schema.Parser().parse(schemaString) val structType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType] - val converter = SchemaConverters.createConverterToSQL(schema) + val converter = SchemaConverters.createConverterToSQL(schema, structType) .asInstanceOf[GenericData.Record => Row] sqlContext.createDataFrame(rdd.map(converter), structType) } From 1da9ef5bde431ba5e55e6ebbc35e1ee254d69f4e Mon Sep 17 00:00:00 2001 From: Nicola Lambiase Date: Wed, 8 Nov 2017 11:15:56 +0100 Subject: [PATCH 2/4] Upgraded bigquery-connector to version 0.10.2-hadoop2 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index dcbca8b..c7682d2 100644 --- a/build.sbt +++ b/build.sbt @@ -28,7 +28,7 @@ spIncludeMaven := true libraryDependencies ++= Seq( "com.databricks" %% "spark-avro" % "4.0.0", - "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.7.5-hadoop2" + "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.2-hadoop2" exclude ("com.google.guava", "guava-jdk5"), 
"org.slf4j" % "slf4j-simple" % "1.7.21", "joda-time" % "joda-time" % "2.9.3", From 5c481f7fd1b33f45f9828eee224bdfe71955aa00 Mon Sep 17 00:00:00 2001 From: Omer Demirel Date: Wed, 8 Nov 2017 14:07:31 +0100 Subject: [PATCH 3/4] Converting Joda time to Java time --- Databricks.md | 3 +-- build.sbt | 1 - .../com/spotify/spark/bigquery/BigQueryClient.scala | 9 +++++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Databricks.md b/Databricks.md index 6993ba7..6f68530 100644 --- a/Databricks.md +++ b/Databricks.md @@ -40,8 +40,7 @@ assemblyJarName := "uber-SampleInDatabricks-1.0-SNAPSHOT.jar" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.0" % "provided", "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided", - "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided", - "joda-time" % "joda-time" % "2.9.6" + "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided" ) // META-INF discarding diff --git a/build.sbt b/build.sbt index c7682d2..e875e7c 100644 --- a/build.sbt +++ b/build.sbt @@ -31,7 +31,6 @@ libraryDependencies ++= Seq( "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.2-hadoop2" exclude ("com.google.guava", "guava-jdk5"), "org.slf4j" % "slf4j-simple" % "1.7.21", - "joda-time" % "joda-time" % "2.9.3", "org.scalatest" %% "scalatest" % "2.2.1" % "test" ) diff --git a/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala b/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala index e14120e..6cac9d7 100644 --- a/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala +++ b/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala @@ -19,6 +19,8 @@ package com.spotify.spark.bigquery import java.util.UUID import java.util.concurrent.TimeUnit +import java.time.Instant +import java.time.format.DateTimeFormatter import com.google.api.client.googleapis.auth.oauth2.GoogleCredential import com.google.api.client.googleapis.json.GoogleJsonResponseException @@ -28,10 +30,9 @@ import 
com.google.api.services.bigquery.model._ import com.google.api.services.bigquery.{Bigquery, BigqueryScopes} import com.google.cloud.hadoop.io.bigquery._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.Progressable -import org.joda.time.Instant -import org.joda.time.format.DateTimeFormat import org.slf4j.{Logger, LoggerFactory} import scala.collection.JavaConverters._ @@ -97,7 +98,7 @@ private[bigquery] class BigQueryClient(conf: Configuration) { private val PRIORITY = if (inConsole) "INTERACTIVE" else "BATCH" private val TABLE_ID_PREFIX = "spark_bigquery" private val JOB_ID_PREFIX = "spark_bigquery" - private val TIME_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss") + private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss") /** * Perform a BigQuery SELECT query and save results to a temporary table. @@ -163,7 +164,7 @@ private[bigquery] class BigQueryClient(conf: Configuration) { } private def temporaryTable(location: String): TableReference = { - val now = Instant.now().toString(TIME_FORMATTER) + val now = TIME_FORMATTER.format(Instant.now) val tableId = TABLE_ID_PREFIX + "_" + now + "_" + Random.nextInt(Int.MaxValue) new TableReference() .setProjectId(projectId) From 2ac252a968dc63307cee95ad13fa5b282a770f42 Mon Sep 17 00:00:00 2001 From: Omer Demirel Date: Wed, 8 Nov 2017 15:52:16 +0100 Subject: [PATCH 4/4] defaulting the time zone in Java 8 Time --- .../scala/com/spotify/spark/bigquery/BigQueryClient.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala b/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala index 6cac9d7..ae4c711 100644 --- a/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala +++ b/src/main/scala/com/spotify/spark/bigquery/BigQueryClient.scala @@ -19,7 +19,7 @@ package com.spotify.spark.bigquery import 
java.util.UUID import java.util.concurrent.TimeUnit -import java.time.Instant +import java.time.{Instant, ZoneId} import java.time.format.DateTimeFormatter import com.google.api.client.googleapis.auth.oauth2.GoogleCredential @@ -98,7 +98,8 @@ private[bigquery] class BigQueryClient(conf: Configuration) { private val PRIORITY = if (inConsole) "INTERACTIVE" else "BATCH" private val TABLE_ID_PREFIX = "spark_bigquery" private val JOB_ID_PREFIX = "spark_bigquery" - private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss") + private val timeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss") + .withZone(ZoneId.systemDefault) /** * Perform a BigQuery SELECT query and save results to a temporary table. @@ -164,7 +165,7 @@ private[bigquery] class BigQueryClient(conf: Configuration) { } private def temporaryTable(location: String): TableReference = { - val now = TIME_FORMATTER.format(Instant.now) + val now = timeFormatter.format(Instant.now) val tableId = TABLE_ID_PREFIX + "_" + now + "_" + Random.nextInt(Int.MaxValue) new TableReference() .setProjectId(projectId)