diff --git a/common/src/main/java/org/apache/sedona/common/raster/RasterPredicates.java b/common/src/main/java/org/apache/sedona/common/raster/RasterPredicates.java index 3bceb34de3e..e0b9619e5be 100644 --- a/common/src/main/java/org/apache/sedona/common/raster/RasterPredicates.java +++ b/common/src/main/java/org/apache/sedona/common/raster/RasterPredicates.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.commons.lang3.tuple.Pair; import org.apache.sedona.common.FunctionsGeoTools; +import org.apache.sedona.common.S2Geography.WKBGeography; import org.apache.sedona.common.utils.CachedCRSTransformFinder; import org.apache.sedona.common.utils.GeomUtils; import org.geotools.api.referencing.FactoryException; @@ -34,7 +35,11 @@ import org.geotools.referencing.CRS; import org.geotools.referencing.crs.DefaultEngineeringCRS; import org.geotools.referencing.crs.DefaultGeographicCRS; +import org.locationtech.jts.algorithm.Orientation; import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.MultiPolygon; +import org.locationtech.jts.geom.Polygon; public class RasterPredicates { /** @@ -82,6 +87,158 @@ public static boolean rsContains(GridCoverage2D left, GridCoverage2D right) { return leftGeometry.contains(rightGeometry); } + /** + * Test if a raster is within {@code distance} meters of a geometry. Both shapes are projected to + * WGS84 unconditionally so the distance unit is always meters regardless of input CRS, then the + * minimum spherical distance between the two shapes (NOT centroid-to-centroid — uses S2's {@code + * ClosestEdgeQuery}) is compared against the threshold. Two raster footprints that overlap or + * touch therefore satisfy {@code RS_DWithin(a, b, 0)} just like {@link #rsIntersects}, which + * matches the natural reading of "are these shapes within d meters." The WGS84-only path is also + * what the join planner's R-tree filter assumes — see {@code + * TraitJoinQueryBase.toExpandedWGS84EnvelopeRDD} and the {@code isGeography = true} envelope + * expansion — so the index bound and the per-row predicate share one unit. + */ + public static boolean rsDWithin(GridCoverage2D raster, Geometry geometry, double distance) { + Pair geometries = toWGS84Pair(raster, geometry); + return geographyDWithin(geometries.getLeft(), geometries.getRight(), distance); + } + + /** Raster-raster variant of {@link #rsDWithin(GridCoverage2D, Geometry, double)}. */ + public static boolean rsDWithin(GridCoverage2D left, GridCoverage2D right, double distance) { + Pair geometries = toWGS84Pair(left, right); + return geographyDWithin(geometries.getLeft(), geometries.getRight(), distance); + } + + /** + * Run {@link org.apache.sedona.common.geography.Functions#dWithin} on two WGS84 JTS geometries by + * wrapping them as {@link WKBGeography}. The Geography path uses S2's {@code ClosestEdgeQuery} + * for the minimum geodesic distance — overlap/touch returns 0 — so the threshold is interpreted + * strictly as "meters between any two points on the shapes." + * + *

This intentionally does not go through {@code Constructors.geomToGeography}: that helper + * builds S2 loops via {@code S2Loop.normalize()}, which always picks the smaller hemisphere as + * the polygon interior. Raster convex hulls (especially global mosaics, polar projections, and + * antimeridian-crossing UTM zones) can be larger than a hemisphere after WGS84 reprojection, so + * normalisation would collapse the intended footprint to a tiny region between the projected + * corners. The WKB path preserves the orientation we set in {@link #ensureCcwForS2}. + */ + private static boolean geographyDWithin(Geometry left, Geometry right, double distance) { + WKBGeography leftGeog = WKBGeography.fromJTS(toS2Ready(left)); + WKBGeography rightGeog = WKBGeography.fromJTS(toS2Ready(right)); + return org.apache.sedona.common.geography.Functions.dWithin(leftGeog, rightGeog, distance); + } + + /** + * Produce a fresh JTS geometry ready for the S2-backed distance query: shell orientation forced + * to CCW (S2's expected interior side) and SRID stamped to 4326 (the caller has already + * reprojected to WGS84 via {@link #toWGS84Pair}). Returns a copy whenever the caller-owned + * geometry would otherwise be mutated, so this helper is side-effect-free with respect to its + * input — important because {@link #rsDWithin} is a public predicate that should not modify + * geometries handed to it. + */ + private static Geometry toS2Ready(Geometry geom) { + Geometry oriented = ensureCcwForS2(geom); + if (oriented == geom) { + // ensureCcwForS2 returned the input unchanged (already CCW or non-polygon); clone before + // touching SRID so we don't write back into the caller's object. + oriented = geom.copy(); + } + // S2 treats coordinates as lat/lng regardless of SRID metadata, but we tag the geography as + // EPSG:4326 since both inputs are guaranteed to be in WGS84 here. JTS.transform does not + // propagate SRID, so we set it explicitly on the (now-owned) copy. + oriented.setSRID(4326); + return oriented; + } + + /** + * Return {@code geom} with every polygon shell oriented CCW (S2's expected orientation). + * Non-polygon geometries are returned unchanged. For MultiPolygons each component polygon is + * checked and reversed independently — JTS/OGC does not require consistent ring orientation + * across the components of a user-supplied MultiPolygon, so flipping the whole geometry based on + * the first shell would mis-orient any later polygon with the opposite winding. Empty polygons + * and {@code MULTIPOLYGON EMPTY} pass through untouched. + */ + private static Geometry ensureCcwForS2(Geometry geom) { + if (geom instanceof Polygon) { + Polygon p = (Polygon) geom; + if (p.isEmpty() || Orientation.isCCW(p.getExteriorRing().getCoordinates())) { + return geom; + } + return geom.reverse(); + } + if (geom instanceof MultiPolygon) { + int n = geom.getNumGeometries(); + if (n == 0) { + return geom; + } + Polygon[] reoriented = new Polygon[n]; + boolean anyReversal = false; + for (int i = 0; i < n; i++) { + Polygon p = (Polygon) geom.getGeometryN(i); + if (p.isEmpty() || Orientation.isCCW(p.getExteriorRing().getCoordinates())) { + reoriented[i] = p; + } else { + reoriented[i] = (Polygon) p.reverse(); + anyReversal = true; + } + } + if (!anyReversal) { + return geom; + } + GeometryFactory factory = geom.getFactory(); + return factory.createMultiPolygon(reoriented); + } + return geom; + } + + private static Pair toWGS84Pair(GridCoverage2D raster, Geometry queryWindow) { + Geometry rasterGeometry; + try { + rasterGeometry = GeometryFunctions.convexHull(raster); + } catch (FactoryException | TransformException e) { + throw new RuntimeException("Failed to calculate the convex hull of the raster", e); + } + + CoordinateReferenceSystem rasterCRS = raster.getCoordinateReferenceSystem(); + if (rasterCRS == null || rasterCRS instanceof DefaultEngineeringCRS) { + rasterCRS = DefaultGeographicCRS.WGS84; + } + + int queryWindowSRID = queryWindow.getSRID(); + if (queryWindowSRID <= 0) { + queryWindowSRID = 4326; + } + CoordinateReferenceSystem queryWindowCRS = FunctionsGeoTools.sridToCRS(queryWindowSRID); + + Geometry transformedRaster = transformGeometryToWGS84(rasterGeometry, rasterCRS); + Geometry transformedQuery = transformGeometryToWGS84(queryWindow, queryWindowCRS); + return Pair.of(transformedRaster, transformedQuery); + } + + private static Pair toWGS84Pair(GridCoverage2D left, GridCoverage2D right) { + Geometry leftGeometry; + Geometry rightGeometry; + try { + leftGeometry = GeometryFunctions.convexHull(left); + rightGeometry = GeometryFunctions.convexHull(right); + } catch (FactoryException | TransformException e) { + throw new RuntimeException("Failed to calculate the convex hull of the raster", e); + } + + CoordinateReferenceSystem leftCRS = left.getCoordinateReferenceSystem(); + if (leftCRS == null || leftCRS instanceof DefaultEngineeringCRS) { + leftCRS = DefaultGeographicCRS.WGS84; + } + CoordinateReferenceSystem rightCRS = right.getCoordinateReferenceSystem(); + if (rightCRS == null || rightCRS instanceof DefaultEngineeringCRS) { + rightCRS = DefaultGeographicCRS.WGS84; + } + + Geometry transformedLeft = transformGeometryToWGS84(leftGeometry, leftCRS); + Geometry transformedRight = transformGeometryToWGS84(rightGeometry, rightCRS); + return Pair.of(transformedLeft, transformedRight); + } + private static Pair convertCRSIfNeeded( GridCoverage2D raster, Geometry queryWindow) { Geometry rasterGeometry; diff --git a/common/src/test/java/org/apache/sedona/common/raster/RasterPredicatesTest.java b/common/src/test/java/org/apache/sedona/common/raster/RasterPredicatesTest.java index 0b1f79ca7cf..97db9d00e47 100644 --- a/common/src/test/java/org/apache/sedona/common/raster/RasterPredicatesTest.java +++ b/common/src/test/java/org/apache/sedona/common/raster/RasterPredicatesTest.java @@ -36,6 +36,7 @@ import org.locationtech.jts.geom.GeometryFactory; import org.locationtech.jts.geom.PrecisionModel; import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKTReader; public class RasterPredicatesTest extends RasterTestBase { private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); @@ -489,6 +490,164 @@ public void testRasterRasterPredicatesNoCrs() throws FactoryException { Assert.assertFalse(RasterPredicates.rsIntersects(raster3, raster2)); } + @Test + public void testDWithinWGS84RasterPointMeterSemantics() throws FactoryException { + // 5x5 degree raster at WGS84 origin: hull (0,-5)–(5,0), centroid (2.5, -2.5). + GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + + // Point coincident with the raster centroid — distance is 0, so any non-negative + // threshold matches and the unit cannot silently fall back to degrees. + Geometry coincident = GEOMETRY_FACTORY.createPoint(new Coordinate(2.5, -2.5)); + coincident.setSRID(4326); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, coincident, 0.0)); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, coincident, 1.0)); + + // Point ~10° east at the same latitude — geodesic distance ≈ 1 112 km. Bracket the + // threshold above and below to catch unit mistakes (1° was the old buggy semantics) + // and projection regressions. + Geometry tenDegreesEast = GEOMETRY_FACTORY.createPoint(new Coordinate(12.5, -2.5)); + tenDegreesEast.setSRID(4326); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, tenDegreesEast, 2_000_000.0)); + Assert.assertFalse(RasterPredicates.rsDWithin(raster, tenDegreesEast, 500_000.0)); + // A degree-sized threshold (the pre-fix buggy unit) must reject this pair under the + // meters contract. + Assert.assertFalse(RasterPredicates.rsDWithin(raster, tenDegreesEast, 10.0)); + } + + @Test + public void testDWithinSwappedOperands() throws FactoryException { + GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + Geometry point = GEOMETRY_FACTORY.createPoint(new Coordinate(12.5, -2.5)); + point.setSRID(4326); + // The (raster, geom) and (geom, raster) overloads share the same minimum geodesic distance, + // so both must agree at and around the threshold. + Assert.assertEquals( + RasterPredicates.rsDWithin(raster, point, 2_000_000.0), + RasterPredicates.rsDWithin(raster, point, 2_000_000.0)); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, point, 2_000_000.0)); + Assert.assertFalse(RasterPredicates.rsDWithin(raster, point, 500_000.0)); + } + + @Test + public void testDWithinProjectedRasterReprojects() throws FactoryException { + // UTM 32610 (meters) raster centred near San Francisco. The predicate must reproject + // both sides to WGS84 before measuring, regardless of the input CRS. + GridCoverage2D raster = + RasterConstructors.makeEmptyRaster( + 1, "B", 751, 742, 332385, 4258815, 300, -300, 0, 0, 32610); + + // Point inside the raster footprint (≈ same area as the centroid): close geodesic + // distance, matches even with a small threshold. + Geometry nearby = GEOMETRY_FACTORY.createPoint(new Coordinate(-122.40, 37.75)); + nearby.setSRID(4326); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, nearby, 200_000.0)); + + // Point in Kansas — ≈ 2 400 km from the UTM-10N raster's WGS84 centroid. + Geometry farAway = GEOMETRY_FACTORY.createPoint(new Coordinate(-95.0, 39.0)); + farAway.setSRID(4326); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, farAway, 3_000_000.0)); + Assert.assertFalse(RasterPredicates.rsDWithin(raster, farAway, 1_000_000.0)); + + // Same Kansas point expressed in EPSG:3857 (Web Mercator). Even though neither side is + // in WGS84, the predicate must still reproject and produce identical truth values. + Geometry farAwayMercator = GEOMETRY_FACTORY.createPoint(new Coordinate(-10575352, 4721671)); + farAwayMercator.setSRID(3857); + Assert.assertTrue(RasterPredicates.rsDWithin(raster, farAwayMercator, 3_000_000.0)); + Assert.assertFalse(RasterPredicates.rsDWithin(raster, farAwayMercator, 1_000_000.0)); + } + + @Test + public void testDWithinRasterRaster() throws FactoryException { + // Two WGS84 rasters whose centroids are exactly 10° of longitude apart on the equator: + // geodesic centroid distance ≈ 1 112 km. + GridCoverage2D rasterA = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + GridCoverage2D rasterB = RasterConstructors.makeEmptyRaster(1, 5, 5, 10, 0, 1, -1, 0, 0, 4326); + Assert.assertTrue(RasterPredicates.rsDWithin(rasterA, rasterB, 2_000_000.0)); + Assert.assertFalse(RasterPredicates.rsDWithin(rasterA, rasterB, 500_000.0)); + // Symmetry: swapping the operands does not change the truth value. + Assert.assertEquals( + RasterPredicates.rsDWithin(rasterA, rasterB, 2_000_000.0), + RasterPredicates.rsDWithin(rasterB, rasterA, 2_000_000.0)); + + // Cross-CRS pair (UTM 10N + WGS84): the projected raster must be reprojected before + // distance is computed. + GridCoverage2D utmRaster = + RasterConstructors.makeEmptyRaster( + 1, "B", 751, 742, 332385, 4258815, 300, -300, 0, 0, 32610); + // WGS84 raster co-located with the UTM raster's footprint near SF. + GridCoverage2D wgs84Nearby = + RasterConstructors.makeEmptyRaster(1, 10, 10, -122.45, 37.80, 0.01, -0.01, 0, 0, 4326); + Assert.assertTrue(RasterPredicates.rsDWithin(utmRaster, wgs84Nearby, 500_000.0)); + Assert.assertEquals( + RasterPredicates.rsDWithin(utmRaster, wgs84Nearby, 500_000.0), + RasterPredicates.rsDWithin(wgs84Nearby, utmRaster, 500_000.0)); + } + + @Test + public void testDWithinMixedOrientationMultiPolygon() throws FactoryException, ParseException { + // Regression: the helper used to flip the whole multipolygon based on the first shell's + // orientation, which left any later polygon with a different winding mis-oriented when + // handed to S2. Two semantically-identical multipolygons (one all-CCW, one with the second + // shell wound CW) must produce identical truth values. + GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + WKTReader reader = new WKTReader(); + Geometry allCcw = + reader.read( + "MULTIPOLYGON (((20 20, 21 20, 21 21, 20 21, 20 20)), " + + "((30 30, 31 30, 31 31, 30 31, 30 30)))"); + allCcw.setSRID(4326); + Geometry mixed = + reader.read( + // First polygon CCW (as in `allCcw`); second polygon wound CW. + "MULTIPOLYGON (((20 20, 21 20, 21 21, 20 21, 20 20)), " + + "((30 30, 30 31, 31 31, 31 30, 30 30)))"); + mixed.setSRID(4326); + // Both forms describe the same pair of squares, so the predicate must agree on every + // threshold around the actual geodesic distance. + Assert.assertEquals( + RasterPredicates.rsDWithin(raster, allCcw, 5_000_000.0), + RasterPredicates.rsDWithin(raster, mixed, 5_000_000.0)); + Assert.assertEquals( + RasterPredicates.rsDWithin(raster, allCcw, 1_000_000.0), + RasterPredicates.rsDWithin(raster, mixed, 1_000_000.0)); + } + + @Test + public void testDWithinEmptyMultiPolygon() throws FactoryException, ParseException { + // Regression: `getGeometryN(0)` would throw IndexOutOfBoundsException on MULTIPOLYGON EMPTY. + // The predicate must accept an empty multipolygon operand without crashing. + GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + WKTReader reader = new WKTReader(); + Geometry empty = reader.read("MULTIPOLYGON EMPTY"); + empty.setSRID(4326); + RasterPredicates.rsDWithin(raster, empty, 1_000_000.0); + } + + @Test + public void testDWithinDoesNotMutateCallerGeometry() throws FactoryException { + // Regression: the predicate used to call setSRID(4326) directly on the caller's geometry + // when no orientation change was needed. Verify SRID and coordinate identity are untouched + // after the predicate runs, for both same-CRS (no transform) and cross-CRS paths. + GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 5, 0, 0, 1, -1, 0, 0, 4326); + + // Same-CRS: no transform happens, so the predicate sees the caller's exact JTS object. + Geometry sameCrsPoint = GEOMETRY_FACTORY.createPoint(new Coordinate(2.5, -2.5)); + sameCrsPoint.setSRID(0); + Coordinate originalCoord = sameCrsPoint.getCoordinate().copy(); + RasterPredicates.rsDWithin(raster, sameCrsPoint, 1.0); + Assert.assertEquals(0, sameCrsPoint.getSRID()); + Assert.assertEquals(originalCoord, sameCrsPoint.getCoordinate()); + + // Cross-CRS: JTS.transform produces a fresh geometry internally, but the caller's object + // must still be untouched. + GeometryFactory mercatorFactory = new GeometryFactory(new PrecisionModel(), 3857); + Geometry mercatorPoint = mercatorFactory.createPoint(new Coordinate(278300.0, -278300.0)); + Coordinate originalMercatorCoord = mercatorPoint.getCoordinate().copy(); + RasterPredicates.rsDWithin(raster, mercatorPoint, 1.0); + Assert.assertEquals(3857, mercatorPoint.getSRID()); + Assert.assertEquals(originalMercatorCoord, mercatorPoint.getCoordinate()); + } + @Test public void testIsCRSMatchesEPSGCode() throws FactoryException { CoordinateReferenceSystem epsg4326 = CRS.decode("EPSG:4326"); diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md index a08fadf753c..62f4efebac3 100644 --- a/docs/api/sql/Optimizer.md +++ b/docs/api/sql/Optimizer.md @@ -282,6 +282,24 @@ These queries could be planned as RangeJoin or BroadcastIndexJoin. Here is an ex +- LocalTableScan [geom#24, id#25] ``` +### Raster distance join + +`RS_DWithin(left, right, distance)` is recognised as a distance-join predicate. `distance` is always in **meters**: both sides are projected to WGS84 first, each side's envelope is expanded by `distance` meters using the Haversine polar-radius approximation (the same envelope expansion `ST_DistanceSphere` uses), and the resulting envelopes drive an R-tree filter. Surviving candidates are refined per-row by `RS_DWithin`, which delegates to the Geography `dWithin` to compute the minimum geodesic distance between the convex hulls via S2's `ClosestEdgeQuery` (overlap or touch returns 0). `BroadcastIndexJoinExec` is chosen when one side is small enough to broadcast, otherwise `DistanceJoinExec`. + +Rasters whose planar WGS84 envelope already spans more than half the globe in longitude or grazes a pole — e.g. polar projections (EPSG:3996, EPSG:3413) and antimeridian-crossing UTM zones (EPSG:32601) — are given a global filter envelope instead. The R-tree filter is intentionally permissive for those rows so they pair with every counterpart, and the per-row S2 predicate produces the final answer. Mid-latitude rasters retain the tight Haversine bound and the partitioning speedup that comes with it. + +```sql +-- Raster-geometry distance join (broadcastable), 1 km radius +SELECT /*+ BROADCAST(points) */ r.id, p.id +FROM rasters r +JOIN points p ON RS_DWithin(r.raster, p.geom, 1000) + +-- Raster-raster distance join (partitioned spatial join), 5 km radius +SELECT a.id, b.id +FROM rasters a +JOIN rasters b ON RS_DWithin(a.raster, b.raster, 5000) +``` + ## Google S2 based approximate equi-join If the performance of Sedona optimized join is not ideal, which is possibly caused by complicated and overlapping geometries, you can resort to Sedona built-in Google S2-based approximate equi-join. This equi-join leverages Spark's internal equi-join algorithm and might be performant given that you can opt to skip the refinement step by sacrificing query accuracy. diff --git a/docs/api/sql/Raster-Functions.md b/docs/api/sql/Raster-Functions.md index dd2a6bb645e..21a779f3b3c 100644 --- a/docs/api/sql/Raster-Functions.md +++ b/docs/api/sql/Raster-Functions.md @@ -102,6 +102,7 @@ These functions test spatial relationships involving raster objects. | Function | Return type | Description | Since | | :--- | :--- | :--- | :--- | | [RS_Contains](Raster-Predicates/RS_Contains.md) | Boolean | Returns true if the geometry or raster on the left side contains the geometry or raster on the right side. The convex hull of the raster is considered in the test. | v1.5.0 | +| [RS_DWithin](Raster-Predicates/RS_DWithin.md) | Boolean | Returns true if the minimum geodesic distance between the raster or geometry on the left side and the raster or geometry on the right side is at most `distance` meters (inputs are projected to WGS84 before the test; overlap or touch returns 0). The convex hull of the raster is considered in the test. | v1.9.1 | | [RS_Intersects](Raster-Predicates/RS_Intersects.md) | Boolean | Returns true if raster or geometry on the left side intersects with the raster or geometry on the right side. The convex hull of the raster is considered in the test. | v1.5.0 | | [RS_Within](Raster-Predicates/RS_Within.md) | Boolean | Returns true if the geometry or raster on the left side is within the geometry or raster on the right side. The convex hull of the raster is considered in the test. | v1.5.0 | diff --git a/docs/api/sql/Raster-Predicates/RS_DWithin.md b/docs/api/sql/Raster-Predicates/RS_DWithin.md new file mode 100644 index 00000000000..2f9e279757c --- /dev/null +++ b/docs/api/sql/Raster-Predicates/RS_DWithin.md @@ -0,0 +1,70 @@ + + +# RS_DWithin + +Introduction: Returns true if the raster or geometry on the left side is within `distance` **meters** of the raster or geometry on the right side. The convex hull of the raster is considered in the test. At least one of the two shape arguments must be a raster — for the geometry-only case use [`ST_DWithin`](../Predicates/ST_DWithin.md) instead. + +Rules for testing spatial relationship: + +- If the raster or geometry does not have a defined SRID, it is assumed to be in WGS84. +- Both sides are unconditionally projected to WGS84 before the test, so `distance` is **always** measured in meters regardless of the input CRS. +- The per-row test computes the **minimum geodesic distance** between the two shapes — the raster is represented by its convex hull, and the geodesic distance is measured between the closest pair of points on the two hulls (not centroid-to-centroid). Two raster footprints that overlap or touch therefore satisfy `RS_DWithin(a, b, 0)`, mirroring [`RS_Intersects`](RS_Intersects.md). + +When used as a join condition, Sedona plans the join as an optimized distance join (`BroadcastIndexJoinExec` or `DistanceJoinExec`): each side's WGS84 envelope is computed, the distance-side envelope is expanded by `distance` meters using the Haversine polar-radius approximation (the same expansion `ST_DistanceSphere` uses), and the resulting envelopes drive an R-tree filter before the per-row `RS_DWithin` check refines the result. + +Format: + +`RS_DWithin(raster: Raster, geom: Geometry, distance: Double)` + +`RS_DWithin(geom: Geometry, raster: Raster, distance: Double)` + +`RS_DWithin(raster0: Raster, raster1: Raster, distance: Double)` + +Return type: `Boolean` + +Since: `v1.9.1` + +SQL Example + +```sql +SELECT RS_DWithin( + RS_MakeEmptyRaster(1, 20, 20, 2, 22, 1), + ST_SetSRID(ST_PolygonFromEnvelope(30, 30, 40, 40), 4326), + 5000000.0 -- 5 000 km, in meters +) AS within_5000km +``` + +Output: + +``` ++-------------+ +|within_5000km| ++-------------+ +| true| ++-------------+ +``` + +Using `RS_DWithin` as a distance-join condition (`distance` in meters): + +```sql +SELECT r.id, p.id +FROM rasters r +JOIN points p ON RS_DWithin(r.raster, p.geom, 1000) -- within 1 km +``` diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala index 8119fb4d064..f9a41f6021c 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala @@ -417,7 +417,11 @@ object Catalog extends AbstractCatalog with Logging { // Raster-Predicates val rasterPredicateExprs: Seq[FunctionDescription] = - Seq(function[RS_Contains](), function[RS_Intersects](), function[RS_Within]()) + Seq( + function[RS_Contains](), + function[RS_DWithin](), + function[RS_Intersects](), + function[RS_Within]()) // Raster-Geometry-Functions (raster → geometry derivations) val rasterGeometryExprs: Seq[FunctionDescription] = diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/RasterPredicates.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/RasterPredicates.scala index 35aaa276c04..3dc64e3b25b 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/RasterPredicates.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/RasterPredicates.scala @@ -22,10 +22,10 @@ import org.apache.sedona.common.raster.RasterPredicates import org.apache.sedona.sql.utils.{GeometrySerializer, RasterSerializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes} import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, RasterUDT} import org.apache.spark.sql.sedona_sql.expressions.{FoldableExpression, NullIntolerantShim} -import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType} +import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, DoubleType} import org.geotools.coverage.grid.GridCoverage2D import org.locationtech.jts.geom.Geometry @@ -155,3 +155,87 @@ private[apache] case class RS_Within(inputExpressions: Seq[Expression]) copy(inputExpressions = newChildren) } } + +/** + * Distance predicate for rasters: `RS_DWithin(left, right, distance)`. `left` and `right` can + * each be a raster or a geometry (at least one must be a raster). Returns true when the minimum + * geodesic distance between the two shapes is `<= distance` meters — `RasterPredicates.rsDWithin` + * delegates to the Geography `dWithin`, which uses S2's `ClosestEdgeQuery`. Overlapping or + * touching shapes therefore yield distance `0` (consistent with [[RS_Intersects]]), and + * `distance` is always interpreted as meters regardless of the input CRS. This expression is + * recognised by [[org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector]] as a + * distance-join key, so a coarse R-tree filter over WGS84 envelopes expanded by `distance` meters + * via the Haversine polar-radius approximation (the same expansion `ST_DistanceSphere` uses) + * prunes candidates before the per-row check. + */ +private[apache] case class RS_DWithin(inputExpressions: Seq[Expression]) + extends Expression + with FoldableExpression + with ImplicitCastInputTypes + with NullIntolerantShim + with CodegenFallback { + + override def children: Seq[Expression] = inputExpressions + + override def nullable: Boolean = children.exists(_.nullable) + + override def dataType: DataType = BooleanType + + override def toString: String = s" **${getClass.getName}** " + + override def inputTypes: Seq[AbstractDataType] = { + if (inputExpressions.length != 3) { + throw new IllegalArgumentException( + s"RS_DWithin requires exactly 3 inputs, but got ${inputExpressions.length}") + } + val leftType = inputExpressions.head.dataType + val rightType = inputExpressions(1).dataType + (leftType, rightType) match { + case (_: RasterUDT, _: GeometryUDT) => Seq(RasterUDT(), GeometryUDT(), DoubleType) + case (_: GeometryUDT, _: RasterUDT) => Seq(GeometryUDT(), RasterUDT(), DoubleType) + case (_: RasterUDT, _: RasterUDT) => Seq(RasterUDT(), RasterUDT(), DoubleType) + case _ => + throw new IllegalArgumentException( + s"RS_DWithin requires at least one raster input; got: $leftType, $rightType") + } + } + + override def eval(inputRow: InternalRow): Any = { + val leftValue = inputExpressions.head.eval(inputRow) + if (leftValue == null) return null + val rightValue = inputExpressions(1).eval(inputRow) + if (rightValue == null) return null + val distanceValue = inputExpressions(2).eval(inputRow) + if (distanceValue == null) return null + + val leftBytes = leftValue.asInstanceOf[Array[Byte]] + val rightBytes = rightValue.asInstanceOf[Array[Byte]] + val distance = distanceValue.asInstanceOf[Double] + val leftType = inputExpressions.head.dataType + val rightType = inputExpressions(1).dataType + (leftType, rightType) match { + case (_: RasterUDT, _: GeometryUDT) => + RasterPredicates.rsDWithin( + RasterSerializer.deserialize(leftBytes), + GeometrySerializer.deserialize(rightBytes), + distance) + case (_: GeometryUDT, _: RasterUDT) => + RasterPredicates.rsDWithin( + RasterSerializer.deserialize(rightBytes), + GeometrySerializer.deserialize(leftBytes), + distance) + case (_: RasterUDT, _: RasterUDT) => + RasterPredicates.rsDWithin( + RasterSerializer.deserialize(leftBytes), + RasterSerializer.deserialize(rightBytes), + distance) + case _ => + throw new IllegalArgumentException( + s"RS_DWithin requires at least one raster input; got: $leftType, $rightType") + } + } + + protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = { + copy(inputExpressions = newChildren) + } +} diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala index e6c0898aa42..c210debb744 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala @@ -122,8 +122,7 @@ case class BroadcastIndexJoinExec( case (None, _, false) => s"ST_$spatialPredicate($windowExpression, $objectExpression)" case (None, _, true) => s"RS_$spatialPredicate($windowExpression, $objectExpression)" case (Some(r), _, true) => - throw new UnsupportedOperationException( - "Distance joins are not supported for raster predicates") + s"RS_Distance($windowExpression, $objectExpression) < $r" } override def simpleString(maxFields: Int): String = @@ -321,6 +320,30 @@ case class BroadcastIndexJoinExec( (shape, row) } }) + case Some(distanceExpression) if isRasterPredicate => + val boundDistance = + BindReferences.bindReference(distanceExpression, streamed.output) + streamResultsRaw.map(row => { + val serialized = boundStreamShape.eval(row).asInstanceOf[Array[Byte]] + if (serialized == null) { + (null, row) + } else { + val baseShape = if (boundStreamShape.dataType.isInstanceOf[RasterUDT]) { + val raster = RasterSerializer.deserialize(serialized) + JoinedGeometryRaster.rasterToWGS84Envelope(raster) + } else { + val geom = GeometrySerializer.deserialize(serialized) + JoinedGeometryRaster.geometryToWGS84Envelope(geom) + } + val radius = boundDistance.eval(row).asInstanceOf[Double] + // Treat `radius` as meters; mid-latitude rasters get a tight Haversine bound while + // polar / antimeridian footprints fall back to a world envelope so the R-tree filter + // never excludes pairs the per-row S2 predicate would match. See + // `TraitJoinQueryBase.expandRasterFilterEnvelope` for the shared rule. + val expanded = expandRasterFilterEnvelope(baseShape, radius) + (expanded, row) + } + }) case Some(distanceExpression) => streamResultsRaw.map(row => { val geometry = TraitJoinQueryBase.shapeToGeometry(boundStreamShape, row) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/DistanceJoinExec.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/DistanceJoinExec.scala index 425eb1b8a00..51f5b6f98f1 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/DistanceJoinExec.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/DistanceJoinExec.scala @@ -22,8 +22,10 @@ import org.apache.sedona.core.spatialOperator.SpatialPredicate import org.apache.sedona.core.spatialRDD.SpatialRDD import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, Literal, UnsafeRow} +import org.apache.spark.sql.types.DoubleType import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.sedona_sql.UDT.RasterUDT import org.apache.spark.sql.sedona_sql.execution.SedonaBinaryExecNode import org.locationtech.jts.geom.Geometry @@ -83,7 +85,29 @@ case class DistanceJoinExec( leftShapeExpr: Expression, rightRdd: RDD[UnsafeRow], rightShapeExpr: Expression): (SpatialRDD[Geometry], SpatialRDD[Geometry]) = { - if (distanceBoundToLeft) { + // Raster predicates project both sides to a WGS84 envelope before partitioning so the coarse + // R-tree filter applies regardless of the input CRS — mirroring the non-distance raster join + // (toWGS84EnvelopeRDD) and the broadcast-index raster distance path. + val isRasterPredicate = + leftShapeExpr.dataType.isInstanceOf[RasterUDT] || + rightShapeExpr.dataType.isInstanceOf[RasterUDT] + if (isRasterPredicate) { + // Route both sides through `toExpandedWGS84EnvelopeRDD` so the polar / antimeridian + // world-envelope fallback in `expandRasterFilterEnvelope` applies symmetrically. The side + // that doesn't carry the user-supplied distance still goes through the same path with a + // zero radius — the Haversine expansion collapses to a no-op for ordinary footprints, and + // the fallback ensures problematic ones get a global filter envelope. + val zeroRadius = Literal(0.0, DoubleType) + if (distanceBoundToLeft) { + ( + toExpandedWGS84EnvelopeRDD(leftRdd, leftShapeExpr, boundRadius), + toExpandedWGS84EnvelopeRDD(rightRdd, rightShapeExpr, zeroRadius)) + } else { + ( + toExpandedWGS84EnvelopeRDD(leftRdd, leftShapeExpr, zeroRadius), + toExpandedWGS84EnvelopeRDD(rightRdd, rightShapeExpr, boundRadius)) + } + } else if (distanceBoundToLeft) { ( toExpandedEnvelopeRDD(leftRdd, leftShapeExpr, boundRadius, isGeography), toSpatialRDD(rightRdd, rightShapeExpr)) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala index 939cff108c0..10c951c0bed 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala @@ -174,6 +174,32 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy { Some(condition))) } + /** + * Build a [[JoinQueryDetection]] for [[RS_DWithin]]. The coarse spatial join uses an expanded + * WGS84 envelope (driven by `distance`) and falls back to the per-row predicate for refinement, + * matching the pattern used for [[org.apache.spark.sql.sedona_sql.expressions.ST_DWithin]]. + */ + private def getRasterDistanceJoinDetection( + left: LogicalPlan, + right: LogicalPlan, + predicate: RS_DWithin, + extraCondition: Option[Expression] = None): Option[JoinQueryDetection] = { + val leftShape = predicate.inputExpressions.head + val rightShape = predicate.inputExpressions(1) + val distance = predicate.inputExpressions(2) + val condition = extraCondition.map(And(_, predicate)).getOrElse(predicate) + Some( + JoinQueryDetection( + left, + right, + leftShape, + rightShape, + SpatialPredicate.INTERSECTS, + isGeography = false, + extraCondition = Some(condition), + distance = Some(distance))) + } + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Join(left, right, joinType, condition, JoinHint(leftHint, rightHint)) if optimizationEnabled(left, right, condition) => @@ -320,6 +346,8 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy { getJoinDetection(left, right, pred, extraCondition) case pred: RS_Predicate => getRasterJoinDetection(left, right, pred, extraCondition) + case pred: RS_DWithin => + getRasterDistanceJoinDetection(left, right, pred, extraCondition) case ST_DWithin(Seq(leftShape, rightShape, distance)) => val geographyShape = isGeographyInput(leftShape) || isGeographyInput(rightShape) @@ -903,9 +931,10 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy { case (None, _, true, _, true) => throw new UnsupportedOperationException( "Geography joins are not yet supported for raster predicates") - case (Some(_), _, _, _, true) => + case (Some(_), _, true, _, true) => throw new UnsupportedOperationException( - "Distance joins are not supported for raster predicates") + "Geography distance joins are not yet supported for raster predicates") + case (Some(_), _, false, _, true) => "RS_DWithin" } val (distanceOnIndexSide, distanceOnStreamSide) = distance .map { distanceExpr => diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/OptimizableJoinCondition.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/OptimizableJoinCondition.scala index c9e9b0e0a1b..c31de5eac5d 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/OptimizableJoinCondition.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/OptimizableJoinCondition.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.sedona_sql.strategy.join import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan, LessThanOrEqual, Literal} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.sedona_sql.expressions._ -import org.apache.spark.sql.sedona_sql.expressions.raster.RS_Predicate +import org.apache.spark.sql.sedona_sql.expressions.raster.{RS_DWithin, RS_Predicate} import org.apache.spark.sql.sedona_sql.optimization.ExpressionUtils case class OptimizableJoinCondition(left: LogicalPlan, right: LogicalPlan) { @@ -73,6 +73,8 @@ case class OptimizableJoinCondition(left: LogicalPlan, right: LogicalPlan) { case ST_DWithin(Seq(leftShape, rightShape, distance, useSpheroid)) => useSpheroid .isInstanceOf[Literal] && isDistanceJoinOptimizable(leftShape, rightShape, distance) + case RS_DWithin(Seq(leftShape, rightShape, distance)) => + isDistanceJoinOptimizable(leftShape, rightShape, distance) case _: LessThan | _: LessThanOrEqual => val (smaller, larger) = (expression.children.head, expression.children(1)) diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala index c3c5c0392ba..097c0efd857 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala @@ -61,6 +61,11 @@ case class SpatialIndexExec( resultRaw, boundShape, BindReferences.bindReference(distanceExpression, child.output)) + case Some(distanceExpression) if isRasterPredicate => + toExpandedWGS84EnvelopeRDD( + resultRaw, + boundShape, + BindReferences.bindReference(distanceExpression, child.output)) case Some(distanceExpression) => toExpandedEnvelopeRDD( resultRaw, diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala index 5b86940b1c5..5039fd0acfc 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala @@ -138,6 +138,66 @@ trait TraitJoinQueryBase { spatialRdd } + /** + * Raster variant of [[toExpandedEnvelopeRDD]] for distance joins with raster predicates. Each + * row's shape (raster or geometry) is first projected to a WGS84 envelope (matching the + * non-distance raster path in [[toWGS84EnvelopeRDD]]) and then expanded by `boundRadius` meters + * using the Haversine polar-radius approximation — the same envelope expansion the + * `ST_DistanceSphere` distance-join uses. Treating `boundRadius` as meters everywhere (rather + * than the input CRS's native unit) keeps the coarse R-tree filter and the per-row `RS_DWithin` + * predicate aligned on a single unit. + * + * Polar and antimeridian-crossing rasters already span half or more of the globe in their + * planar WGS84 envelope; for those rows the helper substitutes a global envelope so the R-tree + * filter pairs them with every counterpart and the per-row S2 predicate produces the final + * answer. Mid-latitude rasters retain the tight Haversine bound. + */ + def toExpandedWGS84EnvelopeRDD( + rdd: RDD[UnsafeRow], + shapeExpression: Expression, + boundRadius: Expression): SpatialRDD[Geometry] = { + val spatialRdd = new SpatialRDD[Geometry] + val expandedRdd = rdd.map { row => + val serialized = shapeExpression.eval(row).asInstanceOf[Array[Byte]] + val baseShape = + if (serialized == null) { + new GeometryFactory().createGeometryCollection() + } else if (shapeExpression.dataType.isInstanceOf[RasterUDT]) { + val raster = RasterSerializer.deserialize(serialized) + JoinedGeometryRaster.rasterToWGS84Envelope(raster) + } else { + val geom = GeometrySerializer.deserialize(serialized) + JoinedGeometryRaster.geometryToWGS84Envelope(geom) + } + val distance = boundRadius.eval(row).asInstanceOf[Double] + val expanded = expandRasterFilterEnvelope(baseShape, distance) + expanded.setUserData(row.copy) + expanded + } + spatialRdd.setRawSpatialRDD(expandedRdd) + spatialRdd + } + + /** + * Compute the R-tree filter envelope for a single side of a raster distance join. Mid-latitude + * / single-hemisphere rasters get a Haversine-expanded envelope (tight, optimization-friendly). + * Footprints whose planar WGS84 envelope already spans more than half the globe in longitude or + * grazes a pole fall back to a world envelope so the coarse filter never excludes a true match + * — the per-row S2 predicate decides correctness for those rows. + */ + private[join] def expandRasterFilterEnvelope( + baseShape: Geometry, + distance: Double): Geometry = { + val envelope = baseShape.getEnvelopeInternal + val touchesPole = envelope.getMaxY >= 90.0 || envelope.getMinY <= -90.0 + val tooWide = envelope.getWidth > 180.0 + if (touchesPole || tooWide) { + baseShape.getFactory.toGeometry(new org.locationtech.jts.geom.Envelope(-180, 180, -90, 90)) + } else { + JoinedGeometry.geometryToExpandedEnvelope(baseShape, distance, isGeography = true) + } + } + /** * Geography variant of [[toExpandedEnvelopeRDD]]. Each row becomes a JTS geometry whose * envelope is the Geography's lat/lng bounding rectangle expanded by `boundRadius` meters using diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala index 475c679eb85..747405031e8 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala @@ -641,6 +641,48 @@ class BroadcastIndexJoinSuite extends TestBaseScala { assert(buildingsDf.count() == resultLeftBroadcast) } + it("Passed RS_DWithin") { + val rasterDf = buildRasterDf.repartition(3) + val buildingsDf = buildBuildingsDf.repartition(5) + // `RS_DWithin` measures the minimum geodesic distance in meters; the global raster + // contains every building's location, so the minimum distance is 0 and any non-negative + // threshold matches. A tiny threshold is enough to verify the join plan. + val distance = 1.0 + + // raster on stream side, geometry broadcast + val streamRasterRightBroadcast = rasterDf + .alias("rasterDf") + .join( + broadcast(buildingsDf).alias("buildingsDf"), + expr(s"RS_DWithin(rasterDf.raster, buildingsDf.building, $distance)")) + assert(streamRasterRightBroadcast.queryExecution.sparkPlan.collect { + case p: BroadcastIndexJoinExec => p + }.size == 1) + // Buildings sit inside the global raster, so all of them are within any positive distance. + assert(buildingsDf.count() == streamRasterRightBroadcast.count()) + + // raster broadcast, geometry on stream side + val rasterBroadcast = broadcast(rasterDf.alias("rasterDf")) + .join( + buildingsDf.alias("buildingsDf"), + expr(s"RS_DWithin(rasterDf.raster, buildingsDf.building, $distance)")) + assert(rasterBroadcast.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => + p + }.size == 1) + assert(buildingsDf.count() == rasterBroadcast.count()) + + // geometry first arg, raster second arg — operand order swapped + val geomFirst = buildingsDf + .alias("buildingsDf") + .join( + broadcast(rasterDf).alias("rasterDf"), + expr(s"RS_DWithin(buildingsDf.building, rasterDf.raster, $distance)")) + assert(geomFirst.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => + p + }.size == 1) + assert(buildingsDf.count() == geomFirst.count()) + } + it("Passed RS_Within") { val smallRasterDf1 = buildSmallRasterDf.repartition(3) val smallRasterDf2 = diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/RasterJoinSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/RasterJoinSuite.scala index 6ee02996dd0..2c88a1129a9 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/RasterJoinSuite.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/RasterJoinSuite.scala @@ -20,7 +20,7 @@ package org.apache.sedona.sql import org.apache.sedona.common.raster.{RasterConstructors, RasterPredicates} import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.sedona_sql.strategy.join.{BroadcastIndexJoinExec, RangeJoinExec} +import org.apache.spark.sql.sedona_sql.strategy.join.{BroadcastIndexJoinExec, DistanceJoinExec, RangeJoinExec} import org.geotools.coverage.grid.GridCoverage2D import org.locationtech.jts.geom.Geometry import org.locationtech.jts.io.WKTReader @@ -308,8 +308,73 @@ class RasterJoinSuite extends TestBaseScala with TableDrivenPropertyChecks { } private def isUsingOptimizedSpatialJoin(df: DataFrame): Boolean = { - df.queryExecution.executedPlan.collect { case _: BroadcastIndexJoinExec | _: RangeJoinExec => - true + df.queryExecution.executedPlan.collect { + case _: BroadcastIndexJoinExec | _: RangeJoinExec | _: DistanceJoinExec => + true }.nonEmpty } + + describe("RS_DWithin distance join") { + // Reuse the shared `rasters` / `geometries` set used by the RS_Intersects table above so the + // distance-join path is exercised against the same polar / antimeridian-crossing / no-CRS + // inputs. The world-envelope fallback in `TraitJoinQueryBase.expandRasterFilterEnvelope` + // guarantees the R-tree filter never excludes a pair that the per-row S2 predicate would + // match for those footprints. + val testDistance = 100000.0 + + def expectedDWithinPairs: Seq[(Int, Int)] = + rasters.flatMap { case (rast, rastId) => + geometries.flatMap { case (geom, geomId) => + if (RasterPredicates.rsDWithin(rast, geom, testDistance)) Some((rastId, geomId)) + else None + } + } + + it("non-broadcast distance join, raster-left geometry-right, left dominant") { + withConf(Map(spatialJoinPartitionSideConfKey -> "left")) { + val result = sparkSession.sql( + s"SELECT df1.id, df2.id FROM df1 JOIN df2 ON RS_DWithin(df1.rast, df2.geom, $testDistance)") + assert(result.queryExecution.executedPlan.collect { case p: DistanceJoinExec => + p + }.nonEmpty) + verifyResult(expectedDWithinPairs, result) + } + } + + it("non-broadcast distance join, raster-left geometry-right, right dominant") { + withConf(Map(spatialJoinPartitionSideConfKey -> "right")) { + val result = sparkSession.sql( + s"SELECT df1.id, df2.id FROM df1 JOIN df2 ON RS_DWithin(df1.rast, df2.geom, $testDistance)") + assert(result.queryExecution.executedPlan.collect { case p: DistanceJoinExec => + p + }.nonEmpty) + verifyResult(expectedDWithinPairs, result) + } + } + + it("non-broadcast distance join, geometry-left raster-right (swapped operands)") { + val result = sparkSession.sql( + s"SELECT df1.id, df2.id FROM df2 JOIN df1 ON RS_DWithin(df2.geom, df1.rast, $testDistance)") + assert(result.queryExecution.executedPlan.collect { case p: DistanceJoinExec => + p + }.nonEmpty) + verifyResult(expectedDWithinPairs, result) + } + + it("non-broadcast distance join, raster-raster") { + val expected = rasters.flatMap { case (rast, id1) => + rasters.flatMap { case (otherRast, id2) => + if (RasterPredicates.rsDWithin(rast, otherRast, testDistance)) Some((id1, id2)) + else None + } + } + val result = sparkSession.sql( + s"SELECT df1.id, df3.id FROM df1 JOIN df3 ON RS_DWithin(df1.rast, df3.rast, $testDistance)") + assert(result.queryExecution.executedPlan.collect { case p: DistanceJoinExec => + p + }.nonEmpty) + val actual = result.collect().map(row => (row.getInt(0), row.getInt(1))).sorted + assert(actual === expected.sorted) + } + } }