From 6041b9119310b0fc41b8ad939d4a6abfd9238509 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 16:26:37 +0100 Subject: [PATCH 01/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 geobeam/examples/shapefile_parcel_schema_detect.py diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py new file mode 100644 index 0000000..48934cb --- /dev/null +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -0,0 +1,154 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Example pipeline that loads a county parcel shape dataset into BigQuery. +""" + +def orient_polygon(element): + from shapely.geometry import shape, polygon, MultiPolygon + + props, geom = element + geom_shape = shape(geom) + + if geom_shape.geom_type == 'Polygon': + oriented_geom = polygon.orient(geom_shape) + return props, oriented_geom + + if geom_shape.geom_type == 'MultiPolygon': + pgons = [] + for pgon in geom_shape.geoms: + pgons.append(polygon.orient(pgon)) + oriented_mpgon = MultiPolygon(pgons) + return props, oriented_mpgon + + return props, geom + + +def typecast_fields(record): + return { + **record, + 'LRSN': str(record['LRSN']) + } + +def get_schema(known_args): + + gcs_url=known_args.gcs_url + +bucket_name = gcs_url.split('/')[2] +file_name = '/'.join(gcs_url.split('/')[3:]) +zip_name = gcs_url.split('/')[-1].split('.')[0] + +storage_client = storage.Client() +blob = storage_client.bucket(bucket_name).get_blob(file_name) +source_bucket = storage_client.bucket(bucket_name) +blob_uri = gcs_url + +blob_2 = source_bucket.blob(file_name) +data = blob.download_as_string() + +profile = None + +if layer_name is not None: + with fiona.io.ZipMemoryFile(data) as zip: + with zip.open(f'{zip_name}.shp') as collection: + print(collection) + profile = collection.profile +elif layer_name is not None: + profile = BytesCollection(data, layer=layer_name).profile +else: + profile = fiona.open(gcs_url).profile + +from fiona import prop_type + +BQ_FIELD_TYPES = { + 'int': 'INT64', + 'str': 'STRING', + 'float': 'FLOAT64', + 'bool': 'BOOL', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'bytes': 'BYTES' +} + +bq_schema = [] + +for field_name, field_type in profile['schema']['properties'].items(): + fiona_type = prop_type(field_type) + bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + bq_schema.append({ + 'name': field_name, + 'type': bq_type + }) + +bq_schema.append({ + 'name': 'geom', + 'type': 'GEOGRAPHY', + 'description': '{} reprojected from {}. source: {}'.format( + profile['schema']['geometry'], profile['crs']['init'], profile['driver']) +}) + +#return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) + + +def run(pipeline_args, known_args): + """ + Invoked by the Beam runner + """ + + import apache_beam as beam + from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery + from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions + from geobeam.io import ShapefileSource + from geobeam.fn import format_record, make_valid, filter_invalid + + pipeline_options = PipelineOptions([ + '--experiments', 'use_beam_bq_sink', + ] + pipeline_args) + + with beam.Pipeline(options=pipeline_options) as p: + (p + | beam.io.Read(ShapefileSource(known_args.gcs_url, + layer_name=known_args.layer_name)) + | 'OrientPolygon' >> beam.Map(orient_polygon) + #| 'MakeValid' >> beam.Map(make_valid) + #| 'FilterInvalid' >> beam.Filter(filter_invalid) + | 'FormatRecords' >> beam.Map(format_record) + | 'TypecastRecords' >> beam.Map(typecast_fields) + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + beam_bigquery.TableReference( + datasetId=known_args.dataset, + tableId=known_args.table), + schema=get_schema(known_args), + method=beam.io.WriteToBigQuery.Method.FILE_LOADS, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, + create_disposition=beam.io.BigQueryDisposition.CREATE_IFNEEDED)) + + +if __name__ == '__main__': + import logging + import argparse + + logging.getLogger().setLevel(logging.INFO) + + parser = argparse.ArgumentParser() + parser.add_argument('--gcs_url') + parser.add_argument('--dataset') + parser.add_argument('--table') + parser.add_argument('--layer_name') + parser.add_argument('--in_epsg', type=int, default=None) + known_args, pipeline_args = parser.parse_known_args() + + run(pipeline_args, known_args) From c2c6d039f6ab7d900d4025547e6aec271b7d5964 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 16:42:16 +0100 Subject: [PATCH 02/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 98 +++++++++---------- 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 48934cb..9e7fa31 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -44,61 +44,61 @@ def typecast_fields(record): def get_schema(known_args): - gcs_url=known_args.gcs_url - -bucket_name = gcs_url.split('/')[2] -file_name = '/'.join(gcs_url.split('/')[3:]) -zip_name = gcs_url.split('/')[-1].split('.')[0] + + gcs_url = known_args.gcs_url + bucket_name = gcs_url.split('/')[2] + file_name = '/'.join(gcs_url.split('/')[3:]) + zip_name = gcs_url.split('/')[-1].split('.')[0] -storage_client = storage.Client() -blob = storage_client.bucket(bucket_name).get_blob(file_name) -source_bucket = storage_client.bucket(bucket_name) -blob_uri = gcs_url + storage_client = storage.Client() + blob = storage_client.bucket(bucket_name).get_blob(file_name) + source_bucket = storage_client.bucket(bucket_name) + blob_uri = gcs_url -blob_2 = source_bucket.blob(file_name) -data = blob.download_as_string() + blob_2 = source_bucket.blob(file_name) + data = blob.download_as_string() -profile = None - -if layer_name is not None: - with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp') as collection: - print(collection) - profile = collection.profile -elif layer_name is not None: - profile = BytesCollection(data, layer=layer_name).profile -else: - profile = fiona.open(gcs_url).profile + profile = None + + if layer_name is not None: + with fiona.io.ZipMemoryFile(data) as zip: + with zip.open(f'{zip_name}.shp') as collection: + print(collection) + profile = collection.profile + elif layer_name is not None: + profile = BytesCollection(data, layer=layer_name).profile + else: + profile = fiona.open(gcs_url).profile -from fiona import prop_type - -BQ_FIELD_TYPES = { - 'int': 'INT64', - 'str': 'STRING', - 'float': 'FLOAT64', - 'bool': 'BOOL', - 'date': 'DATE', - 'time': 'TIME', - 'datetime': 'DATETIME', - 'bytes': 'BYTES' -} - -bq_schema = [] - -for field_name, field_type in profile['schema']['properties'].items(): - fiona_type = prop_type(field_type) - bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + from fiona import prop_type + + BQ_FIELD_TYPES = { + 'int': 'INT64', + 'str': 'STRING', + 'float': 'FLOAT64', + 'bool': 'BOOL', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'bytes': 'BYTES' + } + + bq_schema = [] + + for field_name, field_type in profile['schema']['properties'].items(): + fiona_type = prop_type(field_type) + bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + bq_schema.append({ + 'name': field_name, + 'type': bq_type + }) + bq_schema.append({ - 'name': field_name, - 'type': bq_type + 'name': 'geom', + 'type': 'GEOGRAPHY', + 'description': '{} reprojected from {}. source: {}'.format( + profile['schema']['geometry'], profile['crs']['init'], profile['driver']) }) - -bq_schema.append({ - 'name': 'geom', - 'type': 'GEOGRAPHY', - 'description': '{} reprojected from {}. source: {}'.format( - profile['schema']['geometry'], profile['crs']['init'], profile['driver']) -}) #return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) From f5a3a014728aa440f4b5fb6c193c7315bd021724 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 16:47:46 +0100 Subject: [PATCH 03/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 9e7fa31..3dbc773 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -16,6 +16,20 @@ Example pipeline that loads a county parcel shape dataset into BigQuery. """ +import apache_beam as beam +import geobeam +from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery +from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from geobeam.io import ShapefileSource +from geobeam.fn import format_record, make_valid, filter_invalid +from geobeam.util import get_bigquery_schema_dataflow,get_bigquery_schema + +from google.cloud import storage +import fiona +import json +from fiona import BytesCollection + def orient_polygon(element): from shapely.geometry import shape, polygon, MultiPolygon @@ -107,13 +121,6 @@ def run(pipeline_args, known_args): """ Invoked by the Beam runner """ - - import apache_beam as beam - from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery - from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions - from geobeam.io import ShapefileSource - from geobeam.fn import format_record, make_valid, filter_invalid - pipeline_options = PipelineOptions([ '--experiments', 'use_beam_bq_sink', ] + pipeline_args) From b688a282a28057aa97a3ac06618a77d65639565c Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 16:50:17 +0100 Subject: [PATCH 04/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 3dbc773..ab39ca5 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -73,6 +73,7 @@ def get_schema(known_args): data = blob.download_as_string() profile = None + layer_name= None if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: From 78cf1554b123db9997aac76a2f5d119de30c3664 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 20:03:05 +0100 Subject: [PATCH 05/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index ab39ca5..7be8abb 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -73,7 +73,7 @@ def get_schema(known_args): data = blob.download_as_string() profile = None - layer_name= None + layer_name= known_args.layer_name if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: From ebd53e9fe7c7ed8aff374a6be58c9244c8a8d844 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 20:04:51 +0100 Subject: [PATCH 06/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 7be8abb..dfa258e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -142,7 +142,7 @@ def run(pipeline_args, known_args): schema=get_schema(known_args), method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - create_disposition=beam.io.BigQueryDisposition.CREATE_IFNEEDED)) + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) if __name__ == '__main__': From 3477c0b0235fb0beb93b11195ae5f175f1bf8cee Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 20:29:10 +0100 Subject: [PATCH 07/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index dfa258e..cf21336 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -30,6 +30,7 @@ import json from fiona import BytesCollection +''' def orient_polygon(element): from shapely.geometry import shape, polygon, MultiPolygon @@ -50,11 +51,14 @@ def orient_polygon(element): return props, geom + def typecast_fields(record): return { **record, 'LRSN': str(record['LRSN']) } + +''' def get_schema(known_args): @@ -130,11 +134,9 @@ def run(pipeline_args, known_args): (p | beam.io.Read(ShapefileSource(known_args.gcs_url, layer_name=known_args.layer_name)) - | 'OrientPolygon' >> beam.Map(orient_polygon) - #| 'MakeValid' >> beam.Map(make_valid) - #| 'FilterInvalid' >> beam.Filter(filter_invalid) + | 'MakeValid' >> beam.Map(make_valid) + | 'FilterInvalid' >> beam.Filter(filter_invalid) | 'FormatRecords' >> beam.Map(format_record) - | 'TypecastRecords' >> beam.Map(typecast_fields) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( beam_bigquery.TableReference( datasetId=known_args.dataset, @@ -143,6 +145,7 @@ def run(pipeline_args, known_args): method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) + if __name__ == '__main__': From 6ffe111a916d5f73dbba71d2eed3e372acc3c4df Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 21:28:11 +0100 Subject: [PATCH 08/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index cf21336..eb6fbfa 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -92,9 +92,9 @@ def get_schema(known_args): from fiona import prop_type BQ_FIELD_TYPES = { - 'int': 'INT64', + 'int': 'INTEGER', 'str': 'STRING', - 'float': 'FLOAT64', + 'float': 'FLOAT', 'bool': 'BOOL', 'date': 'DATE', 'time': 'TIME', @@ -119,7 +119,7 @@ def get_schema(known_args): profile['schema']['geometry'], profile['crs']['init'], profile['driver']) }) -#return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) + return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) def run(pipeline_args, known_args): From a60c157c6399da2791b24712b9a5101380752a3d Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 21:36:40 +0100 Subject: [PATCH 09/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index eb6fbfa..4ce6aef 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -81,7 +81,7 @@ def get_schema(known_args): if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp') as collection: + with zip.open(f'{zip_name}.shp',layer_name) as collection: print(collection) profile = collection.profile elif layer_name is not None: @@ -119,7 +119,7 @@ def get_schema(known_args): profile['schema']['geometry'], profile['crs']['init'], profile['driver']) }) - return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) + return json.JSONEncoder(sort_keys=True).encode(bq_schema) def run(pipeline_args, known_args): From 6ecebddbd98da88bc8e30168ed5008ce14cb1d49 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 21:43:39 +0100 Subject: [PATCH 10/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 4ce6aef..038e052 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -115,9 +115,7 @@ def get_schema(known_args): bq_schema.append({ 'name': 'geom', 'type': 'GEOGRAPHY', - 'description': '{} reprojected from {}. source: {}'.format( - profile['schema']['geometry'], profile['crs']['init'], profile['driver']) - }) + }) return json.JSONEncoder(sort_keys=True).encode(bq_schema) From c7ba624e007becee09a90c51b01439070350e7d5 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:05:51 +0100 Subject: [PATCH 11/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 038e052..60a7aaa 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -139,7 +139,8 @@ def run(pipeline_args, known_args): beam_bigquery.TableReference( datasetId=known_args.dataset, tableId=known_args.table), - schema=get_schema(known_args), + #schema=get_schema(known_args), + schema='SCHEMA_AUTODETECT', method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) From 6a060ed5517445e323b126e4156f761bf392647c Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:11:23 +0100 Subject: [PATCH 12/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 60a7aaa..63fc605 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -117,7 +117,8 @@ def get_schema(known_args): 'type': 'GEOGRAPHY', }) - return json.JSONEncoder(sort_keys=True).encode(bq_schema) + #return json.JSONEncoder(sort_keys=True).encode(bq_schema) + return bq_schema def run(pipeline_args, known_args): From 021b969ec6196b6bdc5e56ac00b4120c61d57ba3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:15:16 +0100 Subject: [PATCH 13/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 63fc605..d3a800f 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -140,8 +140,8 @@ def run(pipeline_args, known_args): beam_bigquery.TableReference( datasetId=known_args.dataset, tableId=known_args.table), - #schema=get_schema(known_args), - schema='SCHEMA_AUTODETECT', + schema=get_schema(known_args), + #schema='SCHEMA_AUTODETECT', method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) From 49baf4c5dccf1a36b6fe2af85320e25ba8c55947 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:23:59 +0100 Subject: [PATCH 14/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index d3a800f..80f360f 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -117,8 +117,8 @@ def get_schema(known_args): 'type': 'GEOGRAPHY', }) - #return json.JSONEncoder(sort_keys=True).encode(bq_schema) - return bq_schema + return json.JSONEncoder(sort_keys=True).encode(bq_schema) + #return bq_schema def run(pipeline_args, known_args): From 192f5a7fcf1b3030898912768e9596d6baf1a4df Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:31:01 +0100 Subject: [PATCH 15/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 80f360f..f171e3b 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -117,7 +117,7 @@ def get_schema(known_args): 'type': 'GEOGRAPHY', }) - return json.JSONEncoder(sort_keys=True).encode(bq_schema) + return json.JSONEncoder(sort_keys=True).encode({"field": bq_schema}) #return bq_schema From 489c85111f812d388476cc93ac0102b308ec8142 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:35:29 +0100 Subject: [PATCH 16/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index f171e3b..ae42b4f 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -117,7 +117,7 @@ def get_schema(known_args): 'type': 'GEOGRAPHY', }) - return json.JSONEncoder(sort_keys=True).encode({"field": bq_schema}) + return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) #return bq_schema From 36b2e9f6a48d948c6908ceeddead3b452fc40292 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 22:47:41 +0100 Subject: [PATCH 17/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index ae42b4f..6eeba26 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -133,7 +133,7 @@ def run(pipeline_args, known_args): (p | beam.io.Read(ShapefileSource(known_args.gcs_url, layer_name=known_args.layer_name)) - | 'MakeValid' >> beam.Map(make_valid) + #| 'MakeValid' >> beam.Map(make_valid) | 'FilterInvalid' >> beam.Filter(filter_invalid) | 'FormatRecords' >> beam.Map(format_record) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( From 659b1c4b4274bb8cc8c70bdbf34dcd2e37a3571d Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 23:42:02 +0100 Subject: [PATCH 18/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 127 ++++++++++-------- 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 6eeba26..02ff3f2 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -17,6 +17,7 @@ """ import apache_beam as beam +from google.cloud import bigquery import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json @@ -60,65 +61,80 @@ def typecast_fields(record): ''' -def get_schema(known_args): - - - gcs_url = known_args.gcs_url - bucket_name = gcs_url.split('/')[2] - file_name = '/'.join(gcs_url.split('/')[3:]) - zip_name = gcs_url.split('/')[-1].split('.')[0] + +gcs_url = known_args.gcs_url +bucket_name = gcs_url.split('/')[2] +file_name = '/'.join(gcs_url.split('/')[3:]) +zip_name = gcs_url.split('/')[-1].split('.')[0] - storage_client = storage.Client() - blob = storage_client.bucket(bucket_name).get_blob(file_name) - source_bucket = storage_client.bucket(bucket_name) - blob_uri = gcs_url +storage_client = storage.Client() +blob = storage_client.bucket(bucket_name).get_blob(file_name) +source_bucket = storage_client.bucket(bucket_name) +blob_uri = gcs_url - blob_2 = source_bucket.blob(file_name) - data = blob.download_as_string() +blob_2 = source_bucket.blob(file_name) +data = blob.download_as_string() - profile = None - layer_name= known_args.layer_name - - if layer_name is not None: - with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp',layer_name) as collection: - print(collection) - profile = collection.profile - elif layer_name is not None: - profile = BytesCollection(data, layer=layer_name).profile - else: - profile = fiona.open(gcs_url).profile +profile = None +layer_name= known_args.layer_name + +if layer_name is not None: + with fiona.io.ZipMemoryFile(data) as zip: + with zip.open(f'{zip_name}.shp',layer_name) as collection: + print(collection) + profile = collection.profile +elif layer_name is not None: + profile = BytesCollection(data, layer=layer_name).profile +else: + profile = fiona.open(gcs_url).profile - from fiona import prop_type - - BQ_FIELD_TYPES = { - 'int': 'INTEGER', - 'str': 'STRING', - 'float': 'FLOAT', - 'bool': 'BOOL', - 'date': 'DATE', - 'time': 'TIME', - 'datetime': 'DATETIME', - 'bytes': 'BYTES' - } - - bq_schema = [] +from fiona import prop_type + +BQ_FIELD_TYPES = { + 'int': 'INTEGER', + 'str': 'STRING', + 'float': 'FLOAT', + 'bool': 'BOOL', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'bytes': 'BYTES' +} + +bq_schema = [] + +for field_name, field_type in profile['schema']['properties'].items(): + fiona_type = prop_type(field_type) + bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + bq_schema.append({ + 'name': field_name, + 'type': bq_type + }) - for field_name, field_type in profile['schema']['properties'].items(): - fiona_type = prop_type(field_type) - bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] - bq_schema.append({ - 'name': field_name, - 'type': bq_type +bq_schema.append({ + 'name': 'geom', + 'type': 'GEOGRAPHY', }) - - bq_schema.append({ - 'name': 'geom', - 'type': 'GEOGRAPHY', - }) - return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) - #return bq_schema +schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) + +# Construct a BigQuery client object. +client = bigquery.Client() + +# TODO(developer): Set table_id to the ID of the table to create. +table_id = format(known_args.project, known_args.dataset, known_args.table) + +bigquerySchema = [] + +bigqueryColumns = json.loads(schema_json) +for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + +table = bigquery.Table(table_id, schema=bigquerySchema) +table = client.create_table(table) # Make an API request. +print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) +) def run(pipeline_args, known_args): @@ -133,18 +149,16 @@ def run(pipeline_args, known_args): (p | beam.io.Read(ShapefileSource(known_args.gcs_url, layer_name=known_args.layer_name)) - #| 'MakeValid' >> beam.Map(make_valid) + | 'MakeValid' >> beam.Map(make_valid) | 'FilterInvalid' >> beam.Filter(filter_invalid) | 'FormatRecords' >> beam.Map(format_record) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( beam_bigquery.TableReference( datasetId=known_args.dataset, tableId=known_args.table), - schema=get_schema(known_args), - #schema='SCHEMA_AUTODETECT', method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) + create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)) @@ -158,6 +172,7 @@ def run(pipeline_args, known_args): parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') + parser.add_argument('--project') parser.add_argument('--layer_name') parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() From 11333cbcf0c08b06c58b3f09d8330287bc765b35 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 23:46:35 +0100 Subject: [PATCH 19/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 132 +++++++++--------- 1 file changed, 67 insertions(+), 65 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 02ff3f2..1b9cb6e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -17,7 +17,6 @@ """ import apache_beam as beam -from google.cloud import bigquery import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json @@ -61,80 +60,82 @@ def typecast_fields(record): ''' - -gcs_url = known_args.gcs_url -bucket_name = gcs_url.split('/')[2] -file_name = '/'.join(gcs_url.split('/')[3:]) -zip_name = gcs_url.split('/')[-1].split('.')[0] +def create_table(known_args): + + + gcs_url = known_args.gcs_url + bucket_name = gcs_url.split('/')[2] + file_name = '/'.join(gcs_url.split('/')[3:]) + zip_name = gcs_url.split('/')[-1].split('.')[0] -storage_client = storage.Client() -blob = storage_client.bucket(bucket_name).get_blob(file_name) -source_bucket = storage_client.bucket(bucket_name) -blob_uri = gcs_url + storage_client = storage.Client() + blob = storage_client.bucket(bucket_name).get_blob(file_name) + source_bucket = storage_client.bucket(bucket_name) + blob_uri = gcs_url -blob_2 = source_bucket.blob(file_name) -data = blob.download_as_string() + blob_2 = source_bucket.blob(file_name) + data = blob.download_as_string() -profile = None -layer_name= known_args.layer_name - -if layer_name is not None: - with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp',layer_name) as collection: - print(collection) - profile = collection.profile -elif layer_name is not None: - profile = BytesCollection(data, layer=layer_name).profile -else: - profile = fiona.open(gcs_url).profile + profile = None + layer_name= known_args.layer_name + + if layer_name is not None: + with fiona.io.ZipMemoryFile(data) as zip: + with zip.open(f'{zip_name}.shp',layer_name) as collection: + print(collection) + profile = collection.profile + elif layer_name is not None: + profile = BytesCollection(data, layer=layer_name).profile + else: + profile = fiona.open(gcs_url).profile -from fiona import prop_type - -BQ_FIELD_TYPES = { - 'int': 'INTEGER', - 'str': 'STRING', - 'float': 'FLOAT', - 'bool': 'BOOL', - 'date': 'DATE', - 'time': 'TIME', - 'datetime': 'DATETIME', - 'bytes': 'BYTES' -} - -bq_schema = [] - -for field_name, field_type in profile['schema']['properties'].items(): - fiona_type = prop_type(field_type) - bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] - bq_schema.append({ - 'name': field_name, - 'type': bq_type - }) + from fiona import prop_type + + BQ_FIELD_TYPES = { + 'int': 'INTEGER', + 'str': 'STRING', + 'float': 'FLOAT', + 'bool': 'BOOL', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'bytes': 'BYTES' + } + + bq_schema = [] -bq_schema.append({ - 'name': 'geom', - 'type': 'GEOGRAPHY', + for field_name, field_type in profile['schema']['properties'].items(): + fiona_type = prop_type(field_type) + bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + bq_schema.append({ + 'name': field_name, + 'type': bq_type }) + + bq_schema.append({ + 'name': 'geom', + 'type': 'GEOGRAPHY', + }) -schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) + schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) -# Construct a BigQuery client object. -client = bigquery.Client() + # Construct a BigQuery client object. + client = bigquery.Client() -# TODO(developer): Set table_id to the ID of the table to create. -table_id = format(known_args.project, known_args.dataset, known_args.table) + # TODO(developer): Set table_id to the ID of the table to create. + table_id = format(known_args.project, known_args.dataset, known_args.table) -bigquerySchema = [] + bigquerySchema = [] -bigqueryColumns = json.loads(schema_json) -for col in bigqueryColumns: - bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) -table = bigquery.Table(table_id, schema=bigquerySchema) -table = client.create_table(table) # Make an API request. -print( - "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) -) + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) + ) def run(pipeline_args, known_args): @@ -158,7 +159,7 @@ def run(pipeline_args, known_args): tableId=known_args.table), method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)) + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) @@ -171,10 +172,11 @@ def run(pipeline_args, known_args): parser = argparse.ArgumentParser() parser.add_argument('--gcs_url') parser.add_argument('--dataset') - parser.add_argument('--table') parser.add_argument('--project') + parser.add_argument('--table') parser.add_argument('--layer_name') parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() + create_table(known_args) run(pipeline_args, known_args) From e3cdacfa5f0f15a95131e4fcec4751885d1f1c31 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Mon, 17 Oct 2022 23:47:14 +0100 Subject: [PATCH 20/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 1b9cb6e..4e0e084 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -17,6 +17,7 @@ """ import apache_beam as beam +from google.cloud import bigquery import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json From 5b690c1b9f4bdf201e02c7377083a30678e00df2 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:04:49 +0100 Subject: [PATCH 21/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 4e0e084..b50075b 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -124,7 +124,8 @@ def create_table(known_args): client = bigquery.Client() # TODO(developer): Set table_id to the ID of the table to create. - table_id = format(known_args.project, known_args.dataset, known_args.table) + #table_id = format(known_args.project, known_args.dataset, known_args.table) + table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" bigquerySchema = [] From 604367686dcdaa174a89cbdd9d66d1b0fbdd100c Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:10:45 +0100 Subject: [PATCH 22/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index b50075b..ec23a9e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -60,7 +60,7 @@ def typecast_fields(record): } ''' - + def create_table(known_args): @@ -134,6 +134,7 @@ def create_table(known_args): bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.delete_table(table) table = client.create_table(table) # Make an API request. print( "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) From 3afd0ced28d63b70eb0bf4ec774cd6ba827e0fbb Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:12:29 +0100 Subject: [PATCH 23/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index ec23a9e..6f64af7 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -126,6 +126,7 @@ def create_table(known_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" + table = client.delete_table(table_id) bigquerySchema = [] @@ -134,7 +135,6 @@ def create_table(known_args): bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.delete_table(table) table = client.create_table(table) # Make an API request. print( "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) From 581b71ce754792dcfe7a0e49eca754917e07baad Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:21:13 +0100 Subject: [PATCH 24/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 6f64af7..0c87143 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -18,6 +18,7 @@ import apache_beam as beam from google.cloud import bigquery +from google.api_core.exceptions import NotFound import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json @@ -126,19 +127,24 @@ def create_table(known_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" - table = client.delete_table(table_id) - - bigquerySchema = [] - - bigqueryColumns = json.loads(schema_json) - for col in bigqueryColumns: - bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) - - table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) # Make an API request. - print( - "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) - ) + + if not self.exists(table_id): + raise NotFoundException("Table does not exist") + + table_ref = self.client.dataset(self.dataset_id).table(table_id) + try: + self.client.delete_table(table_ref) + except NotFound: + # Ignore 404 error which may occur if table already deleted + table = client.delete_table(table_id) + bigquerySchema = [] + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + except self.http_error as ex: + self.process_http_error(ex) def run(pipeline_args, known_args): From 6afd985b33c7edc3da3aae7a1424a90d11c768af Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:23:42 +0100 Subject: [PATCH 25/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 0c87143..5c29f48 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -128,12 +128,12 @@ def create_table(known_args): #table_id = format(known_args.project, known_args.dataset, known_args.table) table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" - if not self.exists(table_id): + if not exists(table_id): raise NotFoundException("Table does not exist") - table_ref = self.client.dataset(self.dataset_id).table(table_id) + table_ref = client.dataset(dataset_id).table(table_id) try: - self.client.delete_table(table_ref) + client.delete_table(table_ref) except NotFound: # Ignore 404 error which may occur if table already deleted table = client.delete_table(table_id) @@ -143,8 +143,8 @@ def create_table(known_args): bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) table = bigquery.Table(table_id, schema=bigquerySchema) table = client.create_table(table) # Make an API request. - except self.http_error as ex: - self.process_http_error(ex) + except http_error as ex: + process_http_error(ex) def run(pipeline_args, known_args): From 81a88479de6d30d3ff9baf784bd7692f8e6ff5d3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:26:42 +0100 Subject: [PATCH 26/71] Autoschema --- .../shapefile_parcel_schema_detect.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 5c29f48..e62dea8 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -18,7 +18,6 @@ import apache_beam as beam from google.cloud import bigquery -from google.api_core.exceptions import NotFound import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json @@ -128,23 +127,24 @@ def create_table(known_args): #table_id = format(known_args.project, known_args.dataset, known_args.table) table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" - if not exists(table_id): - raise NotFoundException("Table does not exist") - - table_ref = client.dataset(dataset_id).table(table_id) try: - client.delete_table(table_ref) + client.get_table(table_id) # Make an API request. + print("Table {} already exists.".format(table_id)) except NotFound: - # Ignore 404 error which may occur if table already deleted - table = client.delete_table(table_id) - bigquerySchema = [] - bigqueryColumns = json.loads(schema_json) - for col in bigqueryColumns: - bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) - table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) # Make an API request. - except http_error as ex: - process_http_error(ex) + print("Table {} is not found.".format(table_id)) + table = client.delete_table(table_id) + + bigquerySchema = [] + + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) + ) def run(pipeline_args, known_args): From d4068084bacf5f87ccb7247a2604d3704b94b199 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:27:18 +0100 Subject: [PATCH 27/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index e62dea8..b1ee437 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -18,6 +18,7 @@ import apache_beam as beam from google.cloud import bigquery +from google.cloud.exceptions import NotFound import geobeam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json From 2b5fd701375f0a61c645660f30ec6c2bf763cedf Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:29:24 +0100 Subject: [PATCH 28/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index b1ee437..0c98667 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -131,9 +131,10 @@ def create_table(known_args): try: client.get_table(table_id) # Make an API request. print("Table {} already exists.".format(table_id)) + table = client.delete_table(table_id) except NotFound: print("Table {} is not found.".format(table_id)) - table = client.delete_table(table_id) + bigquerySchema = [] From 685cf53ea27d835223a2b7105617e75575876419 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:30:39 +0100 Subject: [PATCH 29/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 0c98667..a8eec2e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -181,9 +181,9 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() + parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') - parser.add_argument('--project') parser.add_argument('--table') parser.add_argument('--layer_name') parser.add_argument('--in_epsg', type=int, default=None) From 278f5c20e91fd7016ac788116623d36a611ae98e Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:36:14 +0100 Subject: [PATCH 30/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index a8eec2e..2f4f00c 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -181,7 +181,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project') + #parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 8c503903e6b6639d6b1df9153f0b30581d8ae5d6 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:39:37 +0100 Subject: [PATCH 31/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 2f4f00c..ae3ce2b 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -181,7 +181,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - #parser.add_argument('--project') + parser.add_argument('--project',type=str, default=None) parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 4f6d9936c16af53d4875420bb2a73f12d36c8d8d Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 00:46:14 +0100 Subject: [PATCH 32/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index ae3ce2b..57c7f0f 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -161,7 +161,7 @@ def run(pipeline_args, known_args): (p | beam.io.Read(ShapefileSource(known_args.gcs_url, layer_name=known_args.layer_name)) - | 'MakeValid' >> beam.Map(make_valid) + #| 'MakeValid' >> beam.Map(make_valid) | 'FilterInvalid' >> beam.Filter(filter_invalid) | 'FormatRecords' >> beam.Map(format_record) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( @@ -181,7 +181,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project',type=str, default=None) + parser.add_argument('--project', type=str, default=None) parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 672af696ad128ffc9d9f10fe4d00e1f9e82d0575 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:04:13 +0100 Subject: [PATCH 33/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 57c7f0f..2f4f00c 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -161,7 +161,7 @@ def run(pipeline_args, known_args): (p | beam.io.Read(ShapefileSource(known_args.gcs_url, layer_name=known_args.layer_name)) - #| 'MakeValid' >> beam.Map(make_valid) + | 'MakeValid' >> beam.Map(make_valid) | 'FilterInvalid' >> beam.Filter(filter_invalid) | 'FormatRecords' >> beam.Map(format_record) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( @@ -181,7 +181,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project', type=str, default=None) + #parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From fd246e23f0b867a6ec500ea2e3220e7081a36a8b Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:11:40 +0100 Subject: [PATCH 34/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 2f4f00c..648a5a6 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -62,7 +62,7 @@ def typecast_fields(record): ''' -def create_table(known_args): +def create_table(known_args,pipeline_args): gcs_url = known_args.gcs_url @@ -126,7 +126,7 @@ def create_table(known_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) - table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" + table_id=f"{pipeline_args.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From 2c4ac004213e837b64a8dd976f0cd37f3c6e25f6 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:12:30 +0100 Subject: [PATCH 35/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 648a5a6..893f607 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -189,5 +189,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args) + create_table(known_args,pipeline_args) run(pipeline_args, known_args) From e5bb02ff260c994e2f3c7fcf995135d2a39f3f4c Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:14:47 +0100 Subject: [PATCH 36/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 893f607..b690a2e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -64,6 +64,7 @@ def typecast_fields(record): def create_table(known_args,pipeline_args): + gcp_args = pipeline_args.view_as(GoogleCloudOptions) gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] @@ -126,7 +127,7 @@ def create_table(known_args,pipeline_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) - table_id=f"{pipeline_args.project}.{known_args.dataset}.{known_args.table}" + table_id=f"{gcp_args.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From a25dcac9e5e3aee6075dbc0003085433a1d493b3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:18:43 +0100 Subject: [PATCH 37/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index b690a2e..87b0325 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -62,10 +62,8 @@ def typecast_fields(record): ''' -def create_table(known_args,pipeline_args): +def create_table(known_args): - gcp_args = pipeline_args.view_as(GoogleCloudOptions) - gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] file_name = '/'.join(gcs_url.split('/')[3:]) @@ -127,7 +125,7 @@ def create_table(known_args,pipeline_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) - table_id=f"{gcp_args.project}.{known_args.dataset}.{known_args.table}" + table_id=f"{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. @@ -190,5 +188,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args,pipeline_args) + create_table(known_args) run(pipeline_args, known_args) From e11ad26587c06afa00492472bd13d2c9efc959a5 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:22:08 +0100 Subject: [PATCH 38/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 87b0325..8c77dd9 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -125,7 +125,10 @@ def create_table(known_args): # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) - table_id=f"{known_args.dataset}.{known_args.table}" + project_id = "vadimzaripov-477-2022062208552" + + #TODO: FIX THE DAMNED PROJECT ID + table_id=f"{project_id}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From 106b89e69058ac9828b879c1f2867b3c66d806e1 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:36:35 +0100 Subject: [PATCH 39/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index 8c77dd9..bc96cf4 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -32,7 +32,7 @@ import json from fiona import BytesCollection -''' + def orient_polygon(element): from shapely.geometry import shape, polygon, MultiPolygon @@ -53,7 +53,7 @@ def orient_polygon(element): return props, geom - +''' def typecast_fields(record): return { **record, @@ -165,6 +165,7 @@ def run(pipeline_args, known_args): layer_name=known_args.layer_name)) | 'MakeValid' >> beam.Map(make_valid) | 'FilterInvalid' >> beam.Filter(filter_invalid) + | 'OrientPolygon' >> beam.Map(orient_polygon) | 'FormatRecords' >> beam.Map(format_record) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( beam_bigquery.TableReference( From cbea2997377eb0f48c2703700a039af5102c51cf Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:52:39 +0100 Subject: [PATCH 40/71] Autoschema --- geobeam/examples/shapefile_parcel_schema_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_parcel_schema_detect.py index bc96cf4..c3ba08e 100644 --- a/geobeam/examples/shapefile_parcel_schema_detect.py +++ b/geobeam/examples/shapefile_parcel_schema_detect.py @@ -82,7 +82,7 @@ def create_table(known_args): if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp',layer_name) as collection: + with zip.open(f'{zip_name}.shp',layer=layer_name) as collection: print(collection) profile = collection.profile elif layer_name is not None: From 2869e2748e3040360bd48c8d8c8faa9b8a4d56cc Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:53:00 +0100 Subject: [PATCH 41/71] Autoschema --- .../{shapefile_parcel_schema_detect.py => shapefile_generic.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename geobeam/examples/{shapefile_parcel_schema_detect.py => shapefile_generic.py} (100%) diff --git a/geobeam/examples/shapefile_parcel_schema_detect.py b/geobeam/examples/shapefile_generic.py similarity index 100% rename from geobeam/examples/shapefile_parcel_schema_detect.py rename to geobeam/examples/shapefile_generic.py From 9c6826b109cf13752f1f977f81ff1ca33ef969eb Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:55:00 +0100 Subject: [PATCH 42/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index c3ba08e..58f670d 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -82,7 +82,7 @@ def create_table(known_args): if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp',layer=layer_name) as collection: + with zip.open(f'{zip_name}.shp',layer={layer_name}) as collection: print(collection) profile = collection.profile elif layer_name is not None: From 816822cf39793f30ddb9f3a1949e63b1336d2010 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 01:57:26 +0100 Subject: [PATCH 43/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 58f670d..ee55149 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -82,7 +82,7 @@ def create_table(known_args): if layer_name is not None: with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{zip_name}.shp',layer={layer_name}) as collection: + with zip.open(f'{layer_name}.shp',layer=layer_name) as collection: print(collection) profile = collection.profile elif layer_name is not None: From ab241c96ce32ce4e6eeffd1bfc767a1da1ef6927 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 10:44:49 +0100 Subject: [PATCH 44/71] Autoschema --- geobeam/examples/shapefile_generic.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index ee55149..d41a2e5 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -25,7 +25,6 @@ from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions from geobeam.io import ShapefileSource from geobeam.fn import format_record, make_valid, filter_invalid -from geobeam.util import get_bigquery_schema_dataflow,get_bigquery_schema from google.cloud import storage import fiona @@ -53,15 +52,6 @@ def orient_polygon(element): return props, geom -''' -def typecast_fields(record): - return { - **record, - 'LRSN': str(record['LRSN']) - } - -''' - def create_table(known_args): gcs_url = known_args.gcs_url @@ -120,15 +110,14 @@ def create_table(known_args): schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) - # Construct a BigQuery client object. client = bigquery.Client() # TODO(developer): Set table_id to the ID of the table to create. #table_id = format(known_args.project, known_args.dataset, known_args.table) - project_id = "vadimzaripov-477-2022062208552" + #project_id = "vadimzaripov-477-2022062208552" #TODO: FIX THE DAMNED PROJECT ID - table_id=f"{project_id}.{known_args.dataset}.{known_args.table}" + table_id=f"{known_args.project_id}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. @@ -184,7 +173,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - #parser.add_argument('--project') + parser.add_argument('--project_id') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From bec35a664f5e41fb9654c8a24570558b88d2fafb Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 10:49:50 +0100 Subject: [PATCH 45/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index d41a2e5..ce0a7ac 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -117,7 +117,7 @@ def create_table(known_args): #project_id = "vadimzaripov-477-2022062208552" #TODO: FIX THE DAMNED PROJECT ID - table_id=f"{known_args.project_id}.{known_args.dataset}.{known_args.table}" + table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From 8470a88e4d7b447edc0ac4f8571a7a0bfc3e5057 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 10:53:02 +0100 Subject: [PATCH 46/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index ce0a7ac..1bec947 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -173,7 +173,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project_id') + parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 7c6b7a56339d5e551a0cf672dc354009f9558be3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 10:54:39 +0100 Subject: [PATCH 47/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 1bec947..023ff51 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -173,7 +173,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project') + #parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From b5f2021184f4d45a507dcab83f2dee4f99aed54e Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 10:57:09 +0100 Subject: [PATCH 48/71] Autoschema --- geobeam/examples/shapefile_generic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 023ff51..9627dad 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -52,7 +52,7 @@ def orient_polygon(element): return props, geom -def create_table(known_args): +def create_table(known_args,pipeline_args): gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] @@ -117,7 +117,7 @@ def create_table(known_args): #project_id = "vadimzaripov-477-2022062208552" #TODO: FIX THE DAMNED PROJECT ID - table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" + table_id=f"{pipeline_args.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. @@ -181,5 +181,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args) + create_table(known_args,pipeline_args) run(pipeline_args, known_args) From 3b4012ee7950572f18508acf80edaa1c9dd58980 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 11:20:51 +0100 Subject: [PATCH 49/71] Autoschema --- geobeam/examples/shapefile_generic.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 9627dad..6f92533 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -52,7 +52,7 @@ def orient_polygon(element): return props, geom -def create_table(known_args,pipeline_args): +def create_table(known_args): gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] @@ -117,7 +117,12 @@ def create_table(known_args,pipeline_args): #project_id = "vadimzaripov-477-2022062208552" #TODO: FIX THE DAMNED PROJECT ID - table_id=f"{pipeline_args.project}.{known_args.dataset}.{known_args.table}" + + known_args_ext = known_args.extend([ + '--project=' + known_args.project + ]) + + table_id=f"{known_args_ext.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. @@ -162,7 +167,7 @@ def run(pipeline_args, known_args): tableId=known_args.table), method=beam.io.WriteToBigQuery.Method.FILE_LOADS, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, - create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)) + create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)) @@ -181,5 +186,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args,pipeline_args) + create_table(known_args) run(pipeline_args, known_args) From aebfc9416d4003f29c82aa6767baf831f047bd01 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 11:25:12 +0100 Subject: [PATCH 50/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 6f92533..756a5fe 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -118,7 +118,7 @@ def create_table(known_args): #TODO: FIX THE DAMNED PROJECT ID - known_args_ext = known_args.extend([ + known_args_ext = known_args_extend([ '--project=' + known_args.project ]) From 5dda048d147fdfac26c1bf0696ad6c7371931b7c Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 11:28:40 +0100 Subject: [PATCH 51/71] Autoschema --- geobeam/examples/shapefile_generic.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 756a5fe..cf48146 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -118,11 +118,8 @@ def create_table(known_args): #TODO: FIX THE DAMNED PROJECT ID - known_args_ext = known_args_extend([ - '--project=' + known_args.project - ]) - table_id=f"{known_args_ext.project}.{known_args.dataset}.{known_args.table}" + table_id=f"{known_args.projectid}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From 9ef6f1a721d1d7a688f00bfe91c8151b9bb6affa Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 11:34:02 +0100 Subject: [PATCH 52/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index cf48146..143f0de 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -119,7 +119,7 @@ def create_table(known_args): #TODO: FIX THE DAMNED PROJECT ID - table_id=f"{known_args.projectid}.{known_args.dataset}.{known_args.table}" + table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From b667a33d3bba9b106f477fc3dbebb71722af045e Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 11:37:45 +0100 Subject: [PATCH 53/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 143f0de..35df910 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -175,7 +175,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - #parser.add_argument('--project') + parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 9c62e4d26fc18752af3663bcb998a2f95624ab61 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 12:20:18 +0100 Subject: [PATCH 54/71] Autoschema --- geobeam/examples/shapefile_generic.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 35df910..543c3b0 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -52,7 +52,7 @@ def orient_polygon(element): return props, geom -def create_table(known_args): +def create_table(known_args,pipeline_args): gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] @@ -118,8 +118,9 @@ def create_table(known_args): #TODO: FIX THE DAMNED PROJECT ID - - table_id=f"{known_args.project}.{known_args.dataset}.{known_args.table}" + beam_options = PipelineOptions(pipeline_args) + options = list(beam_options.display_data().values()) + table_id=f"{options}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. @@ -183,5 +184,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args) + create_table(known_args,pipeline_args) run(pipeline_args, known_args) From b6f3047d275552fd4b65438468a0c7fe5e64aae9 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 12:21:35 +0100 Subject: [PATCH 55/71] Autoschema --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 543c3b0..29d1a03 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -176,7 +176,7 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - parser.add_argument('--project') + #parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From a9b77fa888df4f9550f1497198d2deac58d7c3f3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 12:22:54 +0100 Subject: [PATCH 56/71] Autoschema --- geobeam/examples/shapefile_generic.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 29d1a03..063f433 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -118,9 +118,8 @@ def create_table(known_args,pipeline_args): #TODO: FIX THE DAMNED PROJECT ID - beam_options = PipelineOptions(pipeline_args) - options = list(beam_options.display_data().values()) - table_id=f"{options}.{known_args.dataset}.{known_args.table}" + options = list(PipelineOptions(pipeline_args).display_data().values()) + table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}" try: client.get_table(table_id) # Make an API request. From f92526e50e0dc476478fb0ae4727dff7b4160568 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 12:40:12 +0100 Subject: [PATCH 57/71] Finalising Generic SHP Load --- geobeam/examples/shapefile_generic.py | 63 +++++++++++++-------------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 063f433..d09cf57 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -16,23 +16,29 @@ Example pipeline that loads a county parcel shape dataset into BigQuery. """ -import apache_beam as beam + from google.cloud import bigquery from google.cloud.exceptions import NotFound -import geobeam +from google.cloud import storage + +import apache_beam as beam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions + +import geobeam from geobeam.io import ShapefileSource from geobeam.fn import format_record, make_valid, filter_invalid -from google.cloud import storage + import fiona -import json from fiona import BytesCollection +import json + -def orient_polygon(element): +#function to orient polygons correctly (based on the linestrings) +def orient_polygon(element): from shapely.geometry import shape, polygon, MultiPolygon props, geom = element @@ -51,8 +57,10 @@ def orient_polygon(element): return props, geom +#function read shapefile based on the layer submitted, derive schema and create BQ table if doesn't exist +#beam.io.WriteToBigQuery in run function for some reason struggles with the standard json schema definitions like {"NAME":"TYPE"} (sends it with escaped " and BQ isn't happy about it) -def create_table(known_args,pipeline_args): +def create_table(known_args,pipeline_args): gcs_url = known_args.gcs_url bucket_name = gcs_url.split('/')[2] @@ -111,37 +119,27 @@ def create_table(known_args,pipeline_args): schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) client = bigquery.Client() - - # TODO(developer): Set table_id to the ID of the table to create. - #table_id = format(known_args.project, known_args.dataset, known_args.table) - #project_id = "vadimzaripov-477-2022062208552" - - #TODO: FIX THE DAMNED PROJECT ID - - options = list(PipelineOptions(pipeline_args).display_data().values()) + + options = list(PipelineOptions(pipeline_args).display_data().values()) #impoassible to acquire project from known_args, had to be creative with PipelineOptions table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}" try: - client.get_table(table_id) # Make an API request. + client.get_table(table_id) print("Table {} already exists.".format(table_id)) - table = client.delete_table(table_id) + #table = client.delete_table(table_id) #We are using WRITE_TRUNCATE in BigQuery, so no need to delete, if exists except NotFound: - print("Table {} is not found.".format(table_id)) - - - bigquerySchema = [] - - bigqueryColumns = json.loads(schema_json) - for col in bigqueryColumns: - bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) - - table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) # Make an API request. - print( - "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) - ) - - + print("Table {} is not found. Creating.".format(table_id)) + bigquerySchema = [] + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) + ) + +#Primary run function def run(pipeline_args, known_args): """ Invoked by the Beam runner @@ -175,7 +173,6 @@ def run(pipeline_args, known_args): logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() - #parser.add_argument('--project') parser.add_argument('--gcs_url') parser.add_argument('--dataset') parser.add_argument('--table') From 08bc296165dc53ce1c03538ec5b5a5c857ab72c3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 12:57:07 +0100 Subject: [PATCH 58/71] Example call --- geobeam/examples/sahpefile_generic_example_call.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 geobeam/examples/sahpefile_generic_example_call.md diff --git a/geobeam/examples/sahpefile_generic_example_call.md b/geobeam/examples/sahpefile_generic_example_call.md new file mode 100644 index 0000000..7100293 --- /dev/null +++ b/geobeam/examples/sahpefile_generic_example_call.md @@ -0,0 +1,11 @@ +python -m shapefile_generic \ +--runner DataflowRunner \ +--sdk_container_image gcr.io/vadimzaripov-477-2022062208552/geobeam-example \ +--temp_location gs://vz-geobeam-pipeline-tmp/tmp \ +--service_account_email geobeam@vadimzaripov-477-2022062208552.iam.gserviceaccount.com \ +--project vadimzaripov-477-2022062208552 \ +--region us-central1 \ +--gcs_url "gs://exp_bucket_vz/World Heritage Sites.zip" \ +--layer_name WorldHeritageSites_20Aug2021 \ +--dataset experiments \ +--table WorldHeritageSites_20Aug2021 \ No newline at end of file From ab6fca69fc29a9b2c8aa882f8291931330ce1c02 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 14:57:24 +0100 Subject: [PATCH 59/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 13 ++-- geobeam/util.py | 93 +++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index d09cf57..8d8ad43 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -16,10 +16,11 @@ Example pipeline that loads a county parcel shape dataset into BigQuery. """ - +''' from google.cloud import bigquery from google.cloud.exceptions import NotFound from google.cloud import storage +''' import apache_beam as beam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery @@ -29,12 +30,13 @@ import geobeam from geobeam.io import ShapefileSource from geobeam.fn import format_record, make_valid, filter_invalid +from geobeam.util import create_table_from_shp - +''' import fiona from fiona import BytesCollection import json - +''' #function to orient polygons correctly (based on the linestrings) @@ -57,6 +59,7 @@ def orient_polygon(element): return props, geom +''' #function read shapefile based on the layer submitted, derive schema and create BQ table if doesn't exist #beam.io.WriteToBigQuery in run function for some reason struggles with the standard json schema definitions like {"NAME":"TYPE"} (sends it with escaped " and BQ isn't happy about it) @@ -138,7 +141,7 @@ def create_table(known_args,pipeline_args): print( "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) ) - +''' #Primary run function def run(pipeline_args, known_args): """ @@ -180,5 +183,5 @@ def run(pipeline_args, known_args): parser.add_argument('--in_epsg', type=int, default=None) known_args, pipeline_args = parser.parse_known_args() - create_table(known_args,pipeline_args) + create_table_from_shp(known_args,pipeline_args) run(pipeline_args, known_args) diff --git a/geobeam/util.py b/geobeam/util.py index 1669991..ee9200a 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -171,6 +171,99 @@ def get_bigquery_schema_dataflow(filepath, layer_name=None, gdb_name=None): return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema}) +#function read shapefile based on the layer submitted, derive schema and create BQ table if doesn't exist +#beam.io.WriteToBigQuery in run function for some reason struggles with the standard json schema definitions like {"NAME":"TYPE"} (sends it with escaped " and BQ isn't happy about it) + +def create_table_from_shp(known_args,pipeline_args): + + import fiona + from fiona import BytesCollection + import json + + from google.cloud import bigquery + from google.cloud.exceptions import NotFound + from google.cloud import storage + + + gcs_url = known_args.gcs_url + bucket_name = gcs_url.split('/')[2] + file_name = '/'.join(gcs_url.split('/')[3:]) + zip_name = gcs_url.split('/')[-1].split('.')[0] + + storage_client = storage.Client() + blob = storage_client.bucket(bucket_name).get_blob(file_name) + source_bucket = storage_client.bucket(bucket_name) + blob_uri = gcs_url + + blob_2 = source_bucket.blob(file_name) + data = blob.download_as_string() + + profile = None + layer_name= known_args.layer_name + + if layer_name is not None: + with fiona.io.ZipMemoryFile(data) as zip: + with zip.open(f'{layer_name}.shp') as collection: + print(collection) + profile = collection.profile + elif layer_name is not None: + profile = BytesCollection(data, layer=layer_name).profile + else: + profile = fiona.open(gcs_url).profile + + from fiona import prop_type + + BQ_FIELD_TYPES = { + 'int': 'INTEGER', + 'str': 'STRING', + 'float': 'FLOAT', + 'bool': 'BOOL', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'bytes': 'BYTES' + } + + bq_schema = [] + + for field_name, field_type in profile['schema']['properties'].items(): + fiona_type = prop_type(field_type) + bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] + bq_schema.append({ + 'name': field_name, + 'type': bq_type + }) + + bq_schema.append({ + 'name': 'geom', + 'type': 'GEOGRAPHY', + }) + + schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) + + client = bigquery.Client() + + options = list(PipelineOptions(pipeline_args).display_data().values()) #impoassible to acquire project from known_args, had to be creative with PipelineOptions + table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}" + + try: + client.get_table(table_id) + print("Table {} already exists.".format(table_id)) + #table = client.delete_table(table_id) #We are using WRITE_TRUNCATE in BigQuery, so no need to delete, if exists + except NotFound: + print("Table {} is not found. Creating.".format(table_id)) + bigquerySchema = [] + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: + bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) + ) + + + if __name__ == '__main__': import argparse import json From 4f5c7eacfcbf7982ce60f1a38574e902b170cbc8 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:06:46 +0100 Subject: [PATCH 60/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 103 +------------------------- geobeam/util.py | 2 + 2 files changed, 6 insertions(+), 99 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 8d8ad43..46e1154 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,31 +13,18 @@ # limitations under the License. """ -Example pipeline that loads a county parcel shape dataset into BigQuery. +Example pipeline that loads any shape dataset into BigQuery and creates a table with the schema derived from SHP file """ -''' -from google.cloud import bigquery -from google.cloud.exceptions import NotFound -from google.cloud import storage -''' - import apache_beam as beam from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions -import geobeam -from geobeam.io import ShapefileSource +#import geobeam from geobeam.fn import format_record, make_valid, filter_invalid from geobeam.util import create_table_from_shp -''' -import fiona -from fiona import BytesCollection -import json -''' - #function to orient polygons correctly (based on the linestrings) def orient_polygon(element): @@ -59,89 +46,7 @@ def orient_polygon(element): return props, geom -''' -#function read shapefile based on the layer submitted, derive schema and create BQ table if doesn't exist -#beam.io.WriteToBigQuery in run function for some reason struggles with the standard json schema definitions like {"NAME":"TYPE"} (sends it with escaped " and BQ isn't happy about it) - -def create_table(known_args,pipeline_args): - - gcs_url = known_args.gcs_url - bucket_name = gcs_url.split('/')[2] - file_name = '/'.join(gcs_url.split('/')[3:]) - zip_name = gcs_url.split('/')[-1].split('.')[0] - - storage_client = storage.Client() - blob = storage_client.bucket(bucket_name).get_blob(file_name) - source_bucket = storage_client.bucket(bucket_name) - blob_uri = gcs_url - - blob_2 = source_bucket.blob(file_name) - data = blob.download_as_string() - - profile = None - layer_name= known_args.layer_name - - if layer_name is not None: - with fiona.io.ZipMemoryFile(data) as zip: - with zip.open(f'{layer_name}.shp',layer=layer_name) as collection: - print(collection) - profile = collection.profile - elif layer_name is not None: - profile = BytesCollection(data, layer=layer_name).profile - else: - profile = fiona.open(gcs_url).profile - - from fiona import prop_type - - BQ_FIELD_TYPES = { - 'int': 'INTEGER', - 'str': 'STRING', - 'float': 'FLOAT', - 'bool': 'BOOL', - 'date': 'DATE', - 'time': 'TIME', - 'datetime': 'DATETIME', - 'bytes': 'BYTES' - } - - bq_schema = [] - - for field_name, field_type in profile['schema']['properties'].items(): - fiona_type = prop_type(field_type) - bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]] - bq_schema.append({ - 'name': field_name, - 'type': bq_type - }) - - bq_schema.append({ - 'name': 'geom', - 'type': 'GEOGRAPHY', - }) - - schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema) - - client = bigquery.Client() - - options = list(PipelineOptions(pipeline_args).display_data().values()) #impoassible to acquire project from known_args, had to be creative with PipelineOptions - table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}" - - try: - client.get_table(table_id) - print("Table {} already exists.".format(table_id)) - #table = client.delete_table(table_id) #We are using WRITE_TRUNCATE in BigQuery, so no need to delete, if exists - except NotFound: - print("Table {} is not found. Creating.".format(table_id)) - bigquerySchema = [] - bigqueryColumns = json.loads(schema_json) - for col in bigqueryColumns: - bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) - table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) # Make an API request. - print( - "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) - ) -''' + #Primary run function def run(pipeline_args, known_args): """ diff --git a/geobeam/util.py b/geobeam/util.py index ee9200a..3d4dd17 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -183,6 +183,8 @@ def create_table_from_shp(known_args,pipeline_args): from google.cloud import bigquery from google.cloud.exceptions import NotFound from google.cloud import storage + + from geobeam.io import ShapefileSource gcs_url = known_args.gcs_url From d3636a5893ce0661f5689873b567fb5d1cef35bf Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:11:00 +0100 Subject: [PATCH 61/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 46e1154..c19c983 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -23,7 +23,7 @@ #import geobeam from geobeam.fn import format_record, make_valid, filter_invalid -from geobeam.util import create_table_from_shp +from .. import create_table_from_shp #function to orient polygons correctly (based on the linestrings) From c466fbf965d73dc5a3242498d84e80f389c48a01 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:14:10 +0100 Subject: [PATCH 62/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index c19c983..8ddaaf4 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -23,7 +23,7 @@ #import geobeam from geobeam.fn import format_record, make_valid, filter_invalid -from .. import create_table_from_shp +from ..util import create_table_from_shp #function to orient polygons correctly (based on the linestrings) From 145c900b518556fe102dde7491e62377f29872f3 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:16:17 +0100 Subject: [PATCH 63/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 8ddaaf4..40ed9bb 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -23,7 +23,7 @@ #import geobeam from geobeam.fn import format_record, make_valid, filter_invalid -from ..util import create_table_from_shp +from ...geobeam.util import create_table_from_shp #function to orient polygons correctly (based on the linestrings) From 6da35b082e202101ee0caae1adc89b2144cc5845 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:19:16 +0100 Subject: [PATCH 64/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 40ed9bb..46e1154 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -23,7 +23,7 @@ #import geobeam from geobeam.fn import format_record, make_valid, filter_invalid -from ...geobeam.util import create_table_from_shp +from geobeam.util import create_table_from_shp #function to orient polygons correctly (based on the linestrings) From e78a0ad12b988d42f327c22cb3f3347b55a62a2a Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:47:44 +0100 Subject: [PATCH 65/71] Geobeam Util updates --- geobeam/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/geobeam/util.py b/geobeam/util.py index 3d4dd17..64a752e 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -185,6 +185,7 @@ def create_table_from_shp(known_args,pipeline_args): from google.cloud import storage from geobeam.io import ShapefileSource + from apache_beam.options.pipeline_options import PipelineOptions gcs_url = known_args.gcs_url From 3bce551beb6bf3660f3ae646fa784f0346580423 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Tue, 18 Oct 2022 15:50:39 +0100 Subject: [PATCH 66/71] Geobeam Util updates --- geobeam/examples/shapefile_generic.py | 1 + geobeam/util.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py index 46e1154..9128060 100644 --- a/geobeam/examples/shapefile_generic.py +++ b/geobeam/examples/shapefile_generic.py @@ -20,6 +20,7 @@ from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions +from geobeam.io import ShapefileSource #import geobeam from geobeam.fn import format_record, make_valid, filter_invalid diff --git a/geobeam/util.py b/geobeam/util.py index 64a752e..99b5031 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -183,8 +183,7 @@ def create_table_from_shp(known_args,pipeline_args): from google.cloud import bigquery from google.cloud.exceptions import NotFound from google.cloud import storage - - from geobeam.io import ShapefileSource + from apache_beam.options.pipeline_options import PipelineOptions From e03651c24292b207716d5e048fc0a245972661c8 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Wed, 19 Oct 2022 10:01:58 +0100 Subject: [PATCH 67/71] Version Updates --- geobeam/examples/Dockerfile | 9 +++++++-- geobeam/examples/requirements.txt | 8 ++++---- geobeam/util.py | 4 ++-- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/geobeam/examples/Dockerfile b/geobeam/examples/Dockerfile index c2678b9..e8793cc 100644 --- a/geobeam/examples/Dockerfile +++ b/geobeam/examples/Dockerfile @@ -12,9 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM gcr.io/dataflow-geobeam/base +### Changelog ### +#10.2022 - Dockerfile versions bump to core beam sdk 3.9:2.41 and latest versions of libs (to be ran in GCP Console with Dataflow runner) +#Use with cloudbuild like: +#gcloud builds submit --tag gcr.io/[project]]/geobeam-example --timeout 3600s --machine-type n2-highcpu-16 -COPY geobeam/examples/requirements.txt ./requirements.txt +FROM gcr.io/vadimzaripov-477-2022062208552/geobeam-base + +COPY requirements.txt ./requirements.txt RUN pip install --upgrade pip RUN pip install -r requirements.txt diff --git a/geobeam/examples/requirements.txt b/geobeam/examples/requirements.txt index aec58c7..e91bc4c 100644 --- a/geobeam/examples/requirements.txt +++ b/geobeam/examples/requirements.txt @@ -1,4 +1,4 @@ -pyproj==3.0.0.post1 -fiona==1.8.18 -shapely==1.7.1 -rasterio==1.1.8 +pyproj==3.4.0 +fiona==1.8.22 +shapely==1.8.5.post1 +rasterio==1.3.2 diff --git a/geobeam/util.py b/geobeam/util.py index 99b5031..c562ca0 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -245,7 +245,7 @@ def create_table_from_shp(known_args,pipeline_args): client = bigquery.Client() - options = list(PipelineOptions(pipeline_args).display_data().values()) #impoassible to acquire project from known_args, had to be creative with PipelineOptions + options = list(PipelineOptions(pipeline_args).display_data().values()) #impossible to acquire project from known_args, had to be creative with PipelineOptions table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}" try: @@ -259,7 +259,7 @@ def create_table_from_shp(known_args,pipeline_args): for col in bigqueryColumns: bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) # Make an API request. + table = client.create_table(table) print( "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) ) From 97a51e85991e4177783e1061d0152ad6a9d6f122 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Wed, 19 Oct 2022 10:03:38 +0100 Subject: [PATCH 68/71] Version Updates --- Dockerfile | 26 ++++++++++++++------------ setup.py | 11 +++++++---- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/Dockerfile b/Dockerfile index f8c9a08..75d3519 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM apache/beam_python3.8_sdk:2.41.0 +### Changelog ### +#10.2022 - Dockerfile versions bump to core beam sdk 3.9:2.41 and latest versions of libs (to be ran in GCP Console with Dataflow runner) +#Use with cloudbuild like: +#gcloud builds submit --tag gcr.io/[project]]/geobeam-3.9-base --timeout 3600s --machine-type n2-highcpu-16 + +FROM apache/beam_python3.9_sdk:2.41.0 ARG WORKDIR=/pipeline RUN mkdir -p ${WORKDIR} @@ -22,18 +27,18 @@ ENV CCACHE_DISABLE=1 ENV PATH=$PATH:$WORKDIR/build/usr/local/bin RUN apt-get update -y \ - && apt-get install libffi-dev g++ cmake automake pkg-config -y \ + && apt-get install libffi-dev git g++ make cmake automake pkg-config -y \ && apt-get clean -ENV CURL_VERSION 7.73.0 +ENV CURL_VERSION 7.85.0 RUN wget -q https://curl.haxx.se/download/curl-${CURL_VERSION}.tar.gz \ && tar -xzf curl-${CURL_VERSION}.tar.gz && cd curl-${CURL_VERSION} \ - && ./configure --prefix=/usr/local \ + && ./configure --prefix=/usr/local --without-ssl \ && echo "building CURL ${CURL_VERSION}..." \ && make --quiet -j$(nproc) && make --quiet install \ && cd $WORKDIR && rm -rf curl-${CURL_VERSION}.tar.gz curl-${CURL_VERSION} -ENV GEOS_VERSION 3.9.0 +ENV GEOS_VERSION 3.11.0 RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \ && tar -xjf geos-${GEOS_VERSION}.tar.bz2 \ && cd geos-${GEOS_VERSION} \ @@ -42,8 +47,8 @@ RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \ && make --quiet -j$(nproc) && make --quiet install \ && cd $WORKDIR && rm -rf geos-${GEOS_VERSION}.tar.bz2 geos-${GEOS_VERSION} -ENV SQLITE_VERSION 3330000 -ENV SQLITE_YEAR 2020 +ENV SQLITE_VERSION 3390400 +ENV SQLITE_YEAR 2022 RUN wget -q https://sqlite.org/${SQLITE_YEAR}/sqlite-autoconf-${SQLITE_VERSION}.tar.gz \ && tar -xzf sqlite-autoconf-${SQLITE_VERSION}.tar.gz && cd sqlite-autoconf-${SQLITE_VERSION} \ && ./configure --prefix=/usr/local \ @@ -62,7 +67,7 @@ RUN wget -q https://download.osgeo.org/proj/proj-${PROJ_VERSION}.tar.gz \ && cmake --build . --target install \ && cd $WORKDIR && rm -rf proj-${PROJ_VERSION}.tar.gz proj-${PROJ_VERSION} -ENV OPENJPEG_VERSION 2.3.1 +ENV OPENJPEG_VERSION 2.5.0 RUN wget -q -O openjpeg-${OPENJPEG_VERSION}.tar.gz https://github.com/uclouvain/openjpeg/archive/v${OPENJPEG_VERSION}.tar.gz \ && tar -zxf openjpeg-${OPENJPEG_VERSION}.tar.gz \ && cd openjpeg-${OPENJPEG_VERSION} \ @@ -81,9 +86,6 @@ RUN wget -q https://download.osgeo.org/gdal/${GDAL_VERSION}/gdal-${GDAL_VERSION} && cmake --build . --target install \ && cd $WORKDIR && rm -rf gdal-${GDAL_VERSION}.tar.gz gdal-${GDAL_VERSION} -RUN apt-get remove g++ cmake automake pkg-config -y \ - && apt-get clean - RUN cd $WORKDIR RUN ldconfig RUN pip install --upgrade pip diff --git a/setup.py b/setup.py index 26a99b5..f8d67a1 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +### Changelog ### +#10.2022 - VZ - Version bump in sync with base dockerfile + from __future__ import absolute_import import setuptools @@ -20,9 +23,9 @@ import geobeam REQUIRED_PACKAGES = [ - 'apache_beam[gcp]==2.41.0', - 'fiona==1.8.21', - 'shapely==1.8.4', + 'apache_beam[gcp]==2.42.0', + 'fiona==1.8.22', + 'shapely==1.8.5.post1', 'rasterio==1.3.2', 'google-cloud-storage==2.5.0', 'esridump==1.11.0' From e311e3d8c4f658a6f59b3415e933e5ef1771a7d6 Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Wed, 19 Oct 2022 10:19:56 +0100 Subject: [PATCH 69/71] Typos --- ...ic_example_call.md => shapefile_generic_example_call.md} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename geobeam/examples/{sahpefile_generic_example_call.md => shapefile_generic_example_call.md} (65%) diff --git a/geobeam/examples/sahpefile_generic_example_call.md b/geobeam/examples/shapefile_generic_example_call.md similarity index 65% rename from geobeam/examples/sahpefile_generic_example_call.md rename to geobeam/examples/shapefile_generic_example_call.md index 7100293..80a0d69 100644 --- a/geobeam/examples/sahpefile_generic_example_call.md +++ b/geobeam/examples/shapefile_generic_example_call.md @@ -1,9 +1,9 @@ python -m shapefile_generic \ --runner DataflowRunner \ ---sdk_container_image gcr.io/vadimzaripov-477-2022062208552/geobeam-example \ ---temp_location gs://vz-geobeam-pipeline-tmp/tmp \ +--sdk_container_image "gcr.io/vadimzaripov-477-2022062208552/geobeam-example" \ +--temp_location "gs://vz-geobeam-pipeline-tmp/tmp" \ --service_account_email geobeam@vadimzaripov-477-2022062208552.iam.gserviceaccount.com \ ---project vadimzaripov-477-2022062208552 \ +--project "vadimzaripov-477-2022062208552" \ --region us-central1 \ --gcs_url "gs://exp_bucket_vz/World Heritage Sites.zip" \ --layer_name WorldHeritageSites_20Aug2021 \ From 84b974018e0383967638c9d344fd696953aac61f Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Wed, 19 Oct 2022 10:22:46 +0100 Subject: [PATCH 70/71] Example fixes --- geobeam/examples/shapefile_generic_example_call.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/geobeam/examples/shapefile_generic_example_call.md b/geobeam/examples/shapefile_generic_example_call.md index 80a0d69..5fb527f 100644 --- a/geobeam/examples/shapefile_generic_example_call.md +++ b/geobeam/examples/shapefile_generic_example_call.md @@ -1,11 +1,11 @@ python -m shapefile_generic \ --runner DataflowRunner \ --sdk_container_image "gcr.io/vadimzaripov-477-2022062208552/geobeam-example" \ ---temp_location "gs://vz-geobeam-pipeline-tmp/tmp" \ ---service_account_email geobeam@vadimzaripov-477-2022062208552.iam.gserviceaccount.com \ ---project "vadimzaripov-477-2022062208552" \ +--temp_location [gcs temp location with /tmp folder] \ +--service_account_email [service account to be used] \ --region us-central1 \ ---gcs_url "gs://exp_bucket_vz/World Heritage Sites.zip" \ ---layer_name WorldHeritageSites_20Aug2021 \ ---dataset experiments \ ---table WorldHeritageSites_20Aug2021 \ No newline at end of file +--gcs_url [shapefile zip] \ +--layer_name [layer name in shapefile]] \ +--project [gcp project] \ +--dataset [bq dataset] \ +--table [bq table] \ No newline at end of file From 0b0465880215a5da80ed8d48531718de05d164bd Mon Sep 17 00:00:00 2001 From: Vadim Zaripov Date: Thu, 20 Oct 2022 12:23:08 +0100 Subject: [PATCH 71/71] Util fix --- geobeam/util.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/geobeam/util.py b/geobeam/util.py index c562ca0..266bae5 100644 --- a/geobeam/util.py +++ b/geobeam/util.py @@ -252,17 +252,17 @@ def create_table_from_shp(known_args,pipeline_args): client.get_table(table_id) print("Table {} already exists.".format(table_id)) #table = client.delete_table(table_id) #We are using WRITE_TRUNCATE in BigQuery, so no need to delete, if exists - except NotFound: - print("Table {} is not found. Creating.".format(table_id)) - bigquerySchema = [] - bigqueryColumns = json.loads(schema_json) - for col in bigqueryColumns: + except: + print("Table {} is not found. Creating.".format(table_id)) + bigquerySchema = [] + bigqueryColumns = json.loads(schema_json) + for col in bigqueryColumns: bigquerySchema.append(bigquery.SchemaField(col['name'], col['type'])) - table = bigquery.Table(table_id, schema=bigquerySchema) - table = client.create_table(table) - print( - "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) - ) + table = bigquery.Table(table_id, schema=bigquerySchema) + table = client.create_table(table) # Make an API request. + print( + "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id) + )