diff --git a/Dockerfile b/Dockerfile index f8c9a08..75d3519 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM apache/beam_python3.8_sdk:2.41.0 +### Changelog ### +#10.2022 - Dockerfile version bump to the core Beam SDK image (Python 3.9, Beam 2.41) and the latest versions of libs (to be run in GCP Console with the Dataflow runner) +#Use with cloudbuild like: +#gcloud builds submit --tag gcr.io/[project]/geobeam-3.9-base --timeout 3600s --machine-type n2-highcpu-16 + +FROM apache/beam_python3.9_sdk:2.41.0 ARG WORKDIR=/pipeline RUN mkdir -p ${WORKDIR} @@ -22,18 +27,18 @@ ENV CCACHE_DISABLE=1 ENV PATH=$PATH:$WORKDIR/build/usr/local/bin RUN apt-get update -y \ - && apt-get install libffi-dev g++ cmake automake pkg-config -y \ + && apt-get install libffi-dev git g++ make cmake automake pkg-config -y \ && apt-get clean -ENV CURL_VERSION 7.73.0 +ENV CURL_VERSION 7.85.0 RUN wget -q https://curl.haxx.se/download/curl-${CURL_VERSION}.tar.gz \ && tar -xzf curl-${CURL_VERSION}.tar.gz && cd curl-${CURL_VERSION} \ - && ./configure --prefix=/usr/local \ + && ./configure --prefix=/usr/local --without-ssl \ && echo "building CURL ${CURL_VERSION}..." 
\ && make --quiet -j$(nproc) && make --quiet install \ && cd $WORKDIR && rm -rf curl-${CURL_VERSION}.tar.gz curl-${CURL_VERSION} -ENV GEOS_VERSION 3.9.0 +ENV GEOS_VERSION 3.11.0 RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \ && tar -xjf geos-${GEOS_VERSION}.tar.bz2 \ && cd geos-${GEOS_VERSION} \ @@ -42,8 +47,8 @@ RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \ && make --quiet -j$(nproc) && make --quiet install \ && cd $WORKDIR && rm -rf geos-${GEOS_VERSION}.tar.bz2 geos-${GEOS_VERSION} -ENV SQLITE_VERSION 3330000 -ENV SQLITE_YEAR 2020 +ENV SQLITE_VERSION 3390400 +ENV SQLITE_YEAR 2022 RUN wget -q https://sqlite.org/${SQLITE_YEAR}/sqlite-autoconf-${SQLITE_VERSION}.tar.gz \ && tar -xzf sqlite-autoconf-${SQLITE_VERSION}.tar.gz && cd sqlite-autoconf-${SQLITE_VERSION} \ && ./configure --prefix=/usr/local \ @@ -62,7 +67,7 @@ RUN wget -q https://download.osgeo.org/proj/proj-${PROJ_VERSION}.tar.gz \ && cmake --build . --target install \ && cd $WORKDIR && rm -rf proj-${PROJ_VERSION}.tar.gz proj-${PROJ_VERSION} -ENV OPENJPEG_VERSION 2.3.1 +ENV OPENJPEG_VERSION 2.5.0 RUN wget -q -O openjpeg-${OPENJPEG_VERSION}.tar.gz https://github.com/uclouvain/openjpeg/archive/v${OPENJPEG_VERSION}.tar.gz \ && tar -zxf openjpeg-${OPENJPEG_VERSION}.tar.gz \ && cd openjpeg-${OPENJPEG_VERSION} \ @@ -81,9 +86,6 @@ RUN wget -q https://download.osgeo.org/gdal/${GDAL_VERSION}/gdal-${GDAL_VERSION} && cmake --build . --target install \ && cd $WORKDIR && rm -rf gdal-${GDAL_VERSION}.tar.gz gdal-${GDAL_VERSION} -RUN apt-get remove g++ cmake automake pkg-config -y \ - && apt-get clean - RUN cd $WORKDIR RUN ldconfig RUN pip install --upgrade pip diff --git a/geobeam/examples/Dockerfile b/geobeam/examples/Dockerfile index c2678b9..e8793cc 100644 --- a/geobeam/examples/Dockerfile +++ b/geobeam/examples/Dockerfile @@ -12,9 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-FROM gcr.io/dataflow-geobeam/base +### Changelog ### +#10.2022 - Dockerfile version bump to the core Beam SDK image (Python 3.9, Beam 2.41) and the latest versions of libs (to be run in GCP Console with the Dataflow runner) +#Use with cloudbuild like: +#gcloud builds submit --tag gcr.io/[project]/geobeam-example --timeout 3600s --machine-type n2-highcpu-16 -COPY geobeam/examples/requirements.txt ./requirements.txt +FROM gcr.io/vadimzaripov-477-2022062208552/geobeam-base + +COPY requirements.txt ./requirements.txt RUN pip install --upgrade pip RUN pip install -r requirements.txt diff --git a/geobeam/examples/requirements.txt b/geobeam/examples/requirements.txt index aec58c7..e91bc4c 100644 --- a/geobeam/examples/requirements.txt +++ b/geobeam/examples/requirements.txt @@ -1,4 +1,4 @@ -pyproj==3.0.0.post1 -fiona==1.8.18 -shapely==1.7.1 -rasterio==1.1.8 +pyproj==3.4.0 +fiona==1.8.22 +shapely==1.8.5.post1 +rasterio==1.3.2 diff --git a/geobeam/examples/shapefile_generic.py b/geobeam/examples/shapefile_generic.py new file mode 100644 index 0000000..9128060 --- /dev/null +++ b/geobeam/examples/shapefile_generic.py @@ -0,0 +1,93 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Example pipeline that loads any shapefile into BigQuery, creating the table from the SHP schema first
+"""
+
+import apache_beam as beam
+from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
+from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
+from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
+from geobeam.io import ShapefileSource
+
+from geobeam.fn import format_record, make_valid, filter_invalid
+from geobeam.util import create_table_from_shp
+
+
+def orient_polygon(element):
+    """Orient Polygon/MultiPolygon rings; pass other geometries through.
+
+    Takes and returns a ``(props, geom)`` tuple. ``geom`` is always returned
+    as a GeoJSON-like mapping (never a shapely object), whichever branch ran.
+    """
+    from shapely.geometry import MultiPolygon, mapping, polygon, shape
+
+    props, geom = element
+    geom_shape = shape(geom)
+
+    if geom_shape.geom_type == 'Polygon':
+        return props, mapping(polygon.orient(geom_shape))
+
+    if geom_shape.geom_type == 'MultiPolygon':
+        parts = [polygon.orient(part) for part in geom_shape.geoms]
+        return props, mapping(MultiPolygon(parts))
+
+    return props, geom
+
+
+def run(pipeline_args, known_args):
+    """Read the shapefile, clean its geometries and load them into BigQuery.
+
+    The destination table must already exist (CREATE_NEVER) and its rows are
+    replaced on every run (WRITE_TRUNCATE).
+    """
+    pipeline_options = PipelineOptions([
+        '--experiments', 'use_beam_bq_sink',
+    ] + pipeline_args)
+    destination = beam_bigquery.TableReference(
+        datasetId=known_args.dataset, tableId=known_args.table)
+
+    with beam.Pipeline(options=pipeline_options) as p:
+        (p
+         | beam.io.Read(ShapefileSource(known_args.gcs_url,
+                                        layer_name=known_args.layer_name))
+         | 'MakeValid' >> beam.Map(make_valid)
+         | 'FilterInvalid' >> beam.Filter(filter_invalid)
+         | 'OrientPolygon' >> beam.Map(orient_polygon)
+         | 'FormatRecords' >> beam.Map(format_record)
+         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+             destination,
+             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
+             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))
+
+
+if __name__ == '__main__':
+    import argparse
+    import logging
+
+    logging.getLogger().setLevel(logging.INFO)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--gcs_url', required=True)
+    parser.add_argument('--dataset', required=True)
+    parser.add_argument('--table', required=True)
+    parser.add_argument('--layer_name')
+    # NOTE(review): --in_epsg is parsed but never used by this pipeline.
+    parser.add_argument('--in_epsg', type=int, default=None)
+    known_args, pipeline_args = parser.parse_known_args()
+
+    create_table_from_shp(known_args, pipeline_args)
+    run(pipeline_args, known_args)
diff --git a/geobeam/examples/shapefile_generic_example_call.md b/geobeam/examples/shapefile_generic_example_call.md
new file mode 100644
index 0000000..5fb527f
--- /dev/null
+++ b/geobeam/examples/shapefile_generic_example_call.md
@@ -0,0 +1,11 @@
+python -m shapefile_generic \
+--runner DataflowRunner \
+--sdk_container_image "gcr.io/[gcp project]/geobeam-example" \
+--temp_location [gcs temp location with /tmp folder] \
+--service_account_email [service account to be used] \
+--region us-central1 \
+--gcs_url [shapefile zip] \
+--layer_name [layer name in shapefile] \
+--project [gcp project] \
+--dataset [bq dataset] \
+--table [bq table]
\ No newline at end of file
diff --git a/geobeam/util.py b/geobeam/util.py
index 1669991..266bae5 100644
--- a/geobeam/util.py
+++ b/geobeam/util.py
@@ -171,6 +171,109 @@ def get_bigquery_schema_dataflow(filepath, layer_name=None, gdb_name=None):
     return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema})
 
 
+def _split_gcs_url(gcs_url):
+    """Split ``gs://bucket/path/to/object`` into ``(bucket, object_path)``.
+
+    Raises:
+        ValueError: if the URL is empty, is not a ``gs://`` URL, or lacks a
+            bucket or an object path.
+    """
+    scheme, _, remainder = (gcs_url or '').partition('://')
+    bucket_name, _, object_path = remainder.partition('/')
+    if scheme != 'gs' or not bucket_name or not object_path:
+        raise ValueError(
+            f'Expected a URL like gs://bucket/path/to/file.zip, got {gcs_url!r}')
+    return bucket_name, object_path
+
+
+def create_table_from_shp(known_args, pipeline_args):
+    """Create the destination BigQuery table from a shapefile's schema.
+
+    Reads the shapefile profile with fiona, maps every property to a BigQuery
+    column type, appends a ``geom`` GEOGRAPHY column and creates the
+    ``project.dataset.table`` table unless it already exists. The table is
+    created here because handing a JSON schema to ``beam.io.WriteToBigQuery``
+    proved unreliable (it was sent with escaped quotes and BigQuery rejected it).
+
+    Args:
+        known_args: Parsed script arguments; reads ``gcs_url``, ``layer_name``,
+            ``dataset`` and ``table``.
+        pipeline_args: Remaining Beam arguments; ``--project`` is read from
+            them, falling back to the BigQuery client's default project.
+
+    Raises:
+        ValueError: if a layer is requested and ``gcs_url`` is not a usable
+            ``gs://bucket/object`` URL.
+        FileNotFoundError: if a layer is requested and the object is missing.
+        KeyError: if a shapefile field type has no BigQuery equivalent.
+    """
+    import logging
+
+    import fiona
+    from fiona import prop_type
+    from fiona.io import ZipMemoryFile
+    from google.cloud import bigquery
+    from google.cloud import storage
+    from google.cloud.exceptions import NotFound
+
+    from apache_beam.options.pipeline_options import GoogleCloudOptions
+    from apache_beam.options.pipeline_options import PipelineOptions
+
+    bq_field_types = {
+        'int': 'INTEGER',
+        'str': 'STRING',
+        'float': 'FLOAT',
+        'bool': 'BOOL',
+        'date': 'DATE',
+        'time': 'TIME',
+        'datetime': 'DATETIME',
+        'bytes': 'BYTES',
+    }
+
+    gcs_url = known_args.gcs_url
+    layer_name = known_args.layer_name
+
+    if layer_name is not None:
+        # The layer lives inside the zip archive: fetch it and open it in memory.
+        bucket_name, object_path = _split_gcs_url(gcs_url)
+        blob = storage.Client().bucket(bucket_name).get_blob(object_path)
+        if blob is None:
+            raise FileNotFoundError(f'{gcs_url} does not exist')
+        with ZipMemoryFile(blob.download_as_bytes()) as archive:
+            with archive.open(f'{layer_name}.shp') as collection:
+                profile = collection.profile
+    else:
+        with fiona.open(gcs_url) as collection:
+            profile = collection.profile
+
+    schema = [
+        bigquery.SchemaField(
+            field_name,
+            bq_field_types[fiona.schema.FIELD_TYPES_MAP_REV[prop_type(field_type)]])
+        for field_name, field_type in profile['schema']['properties'].items()
+    ]
+    schema.append(bigquery.SchemaField('geom', 'GEOGRAPHY'))
+
+    client = bigquery.Client()
+    # Read --project through the typed options view instead of relying on the
+    # position of the value inside display_data().
+    gcp_options = PipelineOptions(pipeline_args).view_as(GoogleCloudOptions)
+    project = gcp_options.project or client.project
+    table_id = f'{project}.{known_args.dataset}.{known_args.table}'
+
+    try:
+        client.get_table(table_id)
+    except NotFound:
+        logging.info('Table %s is not found. Creating.', table_id)
+        table = client.create_table(bigquery.Table(table_id, schema=schema))
+        logging.info(
+            'Created table %s.%s.%s',
+            table.project, table.dataset_id, table.table_id)
+    else:
+        # The load job uses WRITE_TRUNCATE, so an existing table is kept as is.
+        logging.info('Table %s already exists.', table_id)
+
+
 if __name__ == '__main__':
     import argparse
     import json
diff --git a/setup.py b/setup.py
index 26a99b5..f8d67a1 100644
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-# Copyright 2021 Google LLC
+# Copyright 2022 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,6 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+### Changelog ###
+#10.2022 - VZ - Version bump in sync with base dockerfile
+
 from __future__ import absolute_import
 
 import setuptools
@@ -20,9 +23,9 @@ import geobeam
 
 
 REQUIRED_PACKAGES = [
     'apache_beam[gcp]==2.41.0',
-    'fiona==1.8.21',
-    'shapely==1.8.4',
+    'fiona==1.8.22',
+    'shapely==1.8.5.post1',
     'rasterio==1.3.2',
     'google-cloud-storage==2.5.0',
     'esridump==1.11.0'