Skip to content
This repository was archived by the owner on Jul 7, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
6041b91
Autoschema
Vadoid Oct 17, 2022
c2c6d03
Autoschema
Vadoid Oct 17, 2022
f5a3a01
Autoschema
Vadoid Oct 17, 2022
b688a28
Autoschema
Vadoid Oct 17, 2022
78cf155
Autoschema
Vadoid Oct 17, 2022
ebd53e9
Autoschema
Vadoid Oct 17, 2022
3477c0b
Autoschema
Vadoid Oct 17, 2022
6ffe111
Autoschema
Vadoid Oct 17, 2022
a60c157
Autoschema
Vadoid Oct 17, 2022
6ecebdd
Autoschema
Vadoid Oct 17, 2022
c7ba624
Autoschema
Vadoid Oct 17, 2022
6a060ed
Autoschema
Vadoid Oct 17, 2022
021b969
Autoschema
Vadoid Oct 17, 2022
49baf4c
Autoschema
Vadoid Oct 17, 2022
192f5a7
Autoschema
Vadoid Oct 17, 2022
489c851
Autoschema
Vadoid Oct 17, 2022
36b2e9f
Autoschema
Vadoid Oct 17, 2022
659b1c4
Autoschema
Vadoid Oct 17, 2022
11333cb
Autoschema
Vadoid Oct 17, 2022
e3cdacf
Autoschema
Vadoid Oct 17, 2022
5b690c1
Autoschema
Vadoid Oct 17, 2022
6043676
Autoschema
Vadoid Oct 17, 2022
3afd0ce
Autoschema
Vadoid Oct 17, 2022
581b71c
Autoschema
Vadoid Oct 17, 2022
6afd985
Autoschema
Vadoid Oct 17, 2022
81a8847
Autoschema
Vadoid Oct 17, 2022
d406808
Autoschema
Vadoid Oct 17, 2022
2b5fd70
Autoschema
Vadoid Oct 17, 2022
685cf53
Autoschema
Vadoid Oct 17, 2022
278f5c2
Autoschema
Vadoid Oct 17, 2022
8c50390
Autoschema
Vadoid Oct 17, 2022
4f6d993
Autoschema
Vadoid Oct 17, 2022
672af69
Autoschema
Vadoid Oct 18, 2022
fd246e2
Autoschema
Vadoid Oct 18, 2022
2c4ac00
Autoschema
Vadoid Oct 18, 2022
e5bb02f
Autoschema
Vadoid Oct 18, 2022
a25dcac
Autoschema
Vadoid Oct 18, 2022
e11ad26
Autoschema
Vadoid Oct 18, 2022
106b89e
Autoschema
Vadoid Oct 18, 2022
cbea299
Autoschema
Vadoid Oct 18, 2022
2869e27
Autoschema
Vadoid Oct 18, 2022
9c6826b
Autoschema
Vadoid Oct 18, 2022
816822c
Autoschema
Vadoid Oct 18, 2022
ab241c9
Autoschema
Vadoid Oct 18, 2022
bec35a6
Autoschema
Vadoid Oct 18, 2022
8470a88
Autoschema
Vadoid Oct 18, 2022
7c6b7a5
Autoschema
Vadoid Oct 18, 2022
b5f2021
Autoschema
Vadoid Oct 18, 2022
3b4012e
Autoschema
Vadoid Oct 18, 2022
aebfc94
Autoschema
Vadoid Oct 18, 2022
5dda048
Autoschema
Vadoid Oct 18, 2022
9ef6f1a
Autoschema
Vadoid Oct 18, 2022
b667a33
Autoschema
Vadoid Oct 18, 2022
9c62e4d
Autoschema
Vadoid Oct 18, 2022
b6f3047
Autoschema
Vadoid Oct 18, 2022
a9b77fa
Autoschema
Vadoid Oct 18, 2022
f92526e
Finalising Generic SHP Load
Vadoid Oct 18, 2022
08bc296
Example call
Vadoid Oct 18, 2022
ab6fca6
Geobeam Util updates
Vadoid Oct 18, 2022
4f5c7ea
Geobeam Util updates
Vadoid Oct 18, 2022
d3636a5
Geobeam Util updates
Vadoid Oct 18, 2022
c466fbf
Geobeam Util updates
Vadoid Oct 18, 2022
145c900
Geobeam Util updates
Vadoid Oct 18, 2022
6da35b0
Geobeam Util updates
Vadoid Oct 18, 2022
e78a0ad
Geobeam Util updates
Vadoid Oct 18, 2022
3bce551
Geobeam Util updates
Vadoid Oct 18, 2022
e03651c
Version Updates
Vadoid Oct 19, 2022
97a51e8
Version Updates
Vadoid Oct 19, 2022
e311e3d
Typos
Vadoid Oct 19, 2022
84b9740
Example fixes
Vadoid Oct 19, 2022
0b04658
Util fix
Vadoid Oct 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM apache/beam_python3.8_sdk:2.41.0
### Changelog ###
#10.2022 - Dockerfile versions bump to core beam sdk 3.9:2.41 and latest versions of libs (to be ran in GCP Console with Dataflow runner)
#Use with cloudbuild like:
#gcloud builds submit --tag gcr.io/[project]]/geobeam-3.9-base --timeout 3600s --machine-type n2-highcpu-16

FROM apache/beam_python3.9_sdk:2.41.0

ARG WORKDIR=/pipeline
RUN mkdir -p ${WORKDIR}
Expand All @@ -22,18 +27,18 @@ ENV CCACHE_DISABLE=1
ENV PATH=$PATH:$WORKDIR/build/usr/local/bin

RUN apt-get update -y \
&& apt-get install libffi-dev g++ cmake automake pkg-config -y \
&& apt-get install libffi-dev git g++ make cmake automake pkg-config -y \
&& apt-get clean

ENV CURL_VERSION 7.73.0
ENV CURL_VERSION 7.85.0
RUN wget -q https://curl.haxx.se/download/curl-${CURL_VERSION}.tar.gz \
&& tar -xzf curl-${CURL_VERSION}.tar.gz && cd curl-${CURL_VERSION} \
&& ./configure --prefix=/usr/local \
&& ./configure --prefix=/usr/local --without-ssl \
&& echo "building CURL ${CURL_VERSION}..." \
&& make --quiet -j$(nproc) && make --quiet install \
&& cd $WORKDIR && rm -rf curl-${CURL_VERSION}.tar.gz curl-${CURL_VERSION}

ENV GEOS_VERSION 3.9.0
ENV GEOS_VERSION 3.11.0
RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \
&& tar -xjf geos-${GEOS_VERSION}.tar.bz2 \
&& cd geos-${GEOS_VERSION} \
Expand All @@ -42,8 +47,8 @@ RUN wget -q https://download.osgeo.org/geos/geos-${GEOS_VERSION}.tar.bz2 \
&& make --quiet -j$(nproc) && make --quiet install \
&& cd $WORKDIR && rm -rf geos-${GEOS_VERSION}.tar.bz2 geos-${GEOS_VERSION}

ENV SQLITE_VERSION 3330000
ENV SQLITE_YEAR 2020
ENV SQLITE_VERSION 3390400
ENV SQLITE_YEAR 2022
RUN wget -q https://sqlite.org/${SQLITE_YEAR}/sqlite-autoconf-${SQLITE_VERSION}.tar.gz \
&& tar -xzf sqlite-autoconf-${SQLITE_VERSION}.tar.gz && cd sqlite-autoconf-${SQLITE_VERSION} \
&& ./configure --prefix=/usr/local \
Expand All @@ -62,7 +67,7 @@ RUN wget -q https://download.osgeo.org/proj/proj-${PROJ_VERSION}.tar.gz \
&& cmake --build . --target install \
&& cd $WORKDIR && rm -rf proj-${PROJ_VERSION}.tar.gz proj-${PROJ_VERSION}

ENV OPENJPEG_VERSION 2.3.1
ENV OPENJPEG_VERSION 2.5.0
RUN wget -q -O openjpeg-${OPENJPEG_VERSION}.tar.gz https://github.com/uclouvain/openjpeg/archive/v${OPENJPEG_VERSION}.tar.gz \
&& tar -zxf openjpeg-${OPENJPEG_VERSION}.tar.gz \
&& cd openjpeg-${OPENJPEG_VERSION} \
Expand All @@ -81,9 +86,6 @@ RUN wget -q https://download.osgeo.org/gdal/${GDAL_VERSION}/gdal-${GDAL_VERSION}
&& cmake --build . --target install \
&& cd $WORKDIR && rm -rf gdal-${GDAL_VERSION}.tar.gz gdal-${GDAL_VERSION}

RUN apt-get remove g++ cmake automake pkg-config -y \
&& apt-get clean

RUN cd $WORKDIR
RUN ldconfig
RUN pip install --upgrade pip
Expand Down
9 changes: 7 additions & 2 deletions geobeam/examples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/dataflow-geobeam/base
### Changelog ###
#10.2022 - Dockerfile versions bump to core beam sdk 3.9:2.41 and latest versions of libs (to be ran in GCP Console with Dataflow runner)
#Use with cloudbuild like:
#gcloud builds submit --tag gcr.io/[project]]/geobeam-example --timeout 3600s --machine-type n2-highcpu-16

COPY geobeam/examples/requirements.txt ./requirements.txt
FROM gcr.io/vadimzaripov-477-2022062208552/geobeam-base

COPY requirements.txt ./requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

Expand Down
8 changes: 4 additions & 4 deletions geobeam/examples/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pyproj==3.0.0.post1
fiona==1.8.18
shapely==1.7.1
rasterio==1.1.8
pyproj==3.4.0
fiona==1.8.22
shapely==1.8.5.post1
rasterio==1.3.2
93 changes: 93 additions & 0 deletions geobeam/examples/shapefile_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Example pipeline that loads any shape dataset into BigQuery and creates a table with the schema derived from SHP file
"""

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from geobeam.io import ShapefileSource

#import geobeam
from geobeam.fn import format_record, make_valid, filter_invalid
from geobeam.util import create_table_from_shp


#function to orient polygons correctly (based on the linestrings)
def orient_polygon(element):
from shapely.geometry import shape, polygon, MultiPolygon

props, geom = element
geom_shape = shape(geom)

if geom_shape.geom_type == 'Polygon':
oriented_geom = polygon.orient(geom_shape)
return props, oriented_geom

if geom_shape.geom_type == 'MultiPolygon':
pgons = []
for pgon in geom_shape.geoms:
pgons.append(polygon.orient(pgon))
oriented_mpgon = MultiPolygon(pgons)
return props, oriented_mpgon

return props, geom


#Primary run function
def run(pipeline_args, known_args):
"""
Invoked by the Beam runner
"""
pipeline_options = PipelineOptions([
'--experiments', 'use_beam_bq_sink',
] + pipeline_args)

with beam.Pipeline(options=pipeline_options) as p:
(p
| beam.io.Read(ShapefileSource(known_args.gcs_url,
layer_name=known_args.layer_name))
| 'MakeValid' >> beam.Map(make_valid)
| 'FilterInvalid' >> beam.Filter(filter_invalid)
| 'OrientPolygon' >> beam.Map(orient_polygon)
| 'FormatRecords' >> beam.Map(format_record)
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
beam_bigquery.TableReference(
datasetId=known_args.dataset,
tableId=known_args.table),
method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))



if __name__ == '__main__':
import logging
import argparse

logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
parser.add_argument('--gcs_url')
parser.add_argument('--dataset')
parser.add_argument('--table')
parser.add_argument('--layer_name')
parser.add_argument('--in_epsg', type=int, default=None)
known_args, pipeline_args = parser.parse_known_args()

create_table_from_shp(known_args,pipeline_args)
run(pipeline_args, known_args)
11 changes: 11 additions & 0 deletions geobeam/examples/shapefile_generic_example_call.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
python -m shapefile_generic \
--runner DataflowRunner \
--sdk_container_image "gcr.io/vadimzaripov-477-2022062208552/geobeam-example" \
--temp_location [gcs temp location with /tmp folder] \
--service_account_email [service account to be used] \
--region us-central1 \
--gcs_url [shapefile zip] \
--layer_name [layer name in shapefile]] \
--project [gcp project] \
--dataset [bq dataset] \
--table [bq table]
95 changes: 95 additions & 0 deletions geobeam/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,101 @@ def get_bigquery_schema_dataflow(filepath, layer_name=None, gdb_name=None):
return json.JSONEncoder(sort_keys=True).encode({"fields": bq_schema})


#function read shapefile based on the layer submitted, derive schema and create BQ table if doesn't exist
#beam.io.WriteToBigQuery in run function for some reason struggles with the standard json schema definitions like {"NAME":"TYPE"} (sends it with escaped " and BQ isn't happy about it)

def create_table_from_shp(known_args,pipeline_args):

import fiona
from fiona import BytesCollection
import json

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.cloud import storage

from apache_beam.options.pipeline_options import PipelineOptions


gcs_url = known_args.gcs_url
bucket_name = gcs_url.split('/')[2]
file_name = '/'.join(gcs_url.split('/')[3:])
zip_name = gcs_url.split('/')[-1].split('.')[0]

storage_client = storage.Client()
blob = storage_client.bucket(bucket_name).get_blob(file_name)
source_bucket = storage_client.bucket(bucket_name)
blob_uri = gcs_url

blob_2 = source_bucket.blob(file_name)
data = blob.download_as_string()

profile = None
layer_name= known_args.layer_name

if layer_name is not None:
with fiona.io.ZipMemoryFile(data) as zip:
with zip.open(f'{layer_name}.shp') as collection:
print(collection)
profile = collection.profile
elif layer_name is not None:
profile = BytesCollection(data, layer=layer_name).profile
else:
profile = fiona.open(gcs_url).profile

from fiona import prop_type

BQ_FIELD_TYPES = {
'int': 'INTEGER',
'str': 'STRING',
'float': 'FLOAT',
'bool': 'BOOL',
'date': 'DATE',
'time': 'TIME',
'datetime': 'DATETIME',
'bytes': 'BYTES'
}

bq_schema = []

for field_name, field_type in profile['schema']['properties'].items():
fiona_type = prop_type(field_type)
bq_type = BQ_FIELD_TYPES[fiona.schema.FIELD_TYPES_MAP_REV[fiona_type]]
bq_schema.append({
'name': field_name,
'type': bq_type
})

bq_schema.append({
'name': 'geom',
'type': 'GEOGRAPHY',
})

schema_json = json.JSONEncoder(sort_keys=True).encode(bq_schema)

client = bigquery.Client()

options = list(PipelineOptions(pipeline_args).display_data().values()) #impossible to acquire project from known_args, had to be creative with PipelineOptions
table_id=f"{options[1]}.{known_args.dataset}.{known_args.table}"

try:
client.get_table(table_id)
print("Table {} already exists.".format(table_id))
#table = client.delete_table(table_id) #We are using WRITE_TRUNCATE in BigQuery, so no need to delete, if exists
except:
print("Table {} is not found. Creating.".format(table_id))
bigquerySchema = []
bigqueryColumns = json.loads(schema_json)
for col in bigqueryColumns:
bigquerySchema.append(bigquery.SchemaField(col['name'], col['type']))
table = bigquery.Table(table_id, schema=bigquerySchema)
table = client.create_table(table) # Make an API request.
print(
"Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)



if __name__ == '__main__':
import argparse
import json
Expand Down
11 changes: 7 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

### Changelog ###
#10.2022 - VZ - Version bump in sync with base dockerfile

from __future__ import absolute_import

import setuptools
Expand All @@ -20,9 +23,9 @@
import geobeam

REQUIRED_PACKAGES = [
'apache_beam[gcp]==2.41.0',
'fiona==1.8.21',
'shapely==1.8.4',
'apache_beam[gcp]==2.42.0',
'fiona==1.8.22',
'shapely==1.8.5.post1',
'rasterio==1.3.2',
'google-cloud-storage==2.5.0',
'esridump==1.11.0'
Expand Down