From 2d8261278bb29c515517971c6fc2ddaeaaf80fda Mon Sep 17 00:00:00 2001 From: Natalie Diaz Date: Wed, 4 Mar 2026 14:58:46 -0800 Subject: [PATCH 1/4] Update facet id generation to match mixer --- pipeline/data/pom.xml | 8 +- .../datacommons/ingestion/data/DataUtils.java | 67 ++++++++++++ .../ingestion/data/Observation.java | 6 +- .../ingestion/data/DataUtilsTest.java | 100 ++++++++++++++++++ .../ingestion/util/CacheReaderTest.java | 25 +++-- 5 files changed, 191 insertions(+), 15 deletions(-) create mode 100644 pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java create mode 100644 pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java diff --git a/pipeline/data/pom.xml b/pipeline/data/pom.xml index c43e68f4..0d1b53ee 100644 --- a/pipeline/data/pom.xml +++ b/pipeline/data/pom.xml @@ -43,10 +43,16 @@ protobuf-java ${protobuf.java.version} - + com.google.cloud google-cloud-storage + + junit + junit + ${junit.version} + test + diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java new file mode 100644 index 00000000..9ed3227c --- /dev/null +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java @@ -0,0 +1,67 @@ +package org.datacommons.ingestion.data; + +import com.google.common.base.Joiner; +import java.nio.charset.StandardCharsets; + +/** Util functions for the pipeline data model. */ +public class DataUtils { + + // Standard FNV-1a 32-bit constants + private static final int FNV_32_INIT = 0x811c9dc5; + private static final int FNV_32_PRIME = 0x01000193; + + /** + * Generates a consistent facet ID using the FNV-1a 32-bit hash algorithm. + * + *

This is designed to replicate the legacy Go facet ID generation implementation in Mixer's + * GetFacetID function. See + * https://github.com/datacommonsorg/mixer/blob/0618c1f3ef80703c98fc97f6c6c6e5cd3d7c13d3/internal/util/util.go#L497-L515 + * + * @param importName The name of the import this observation belongs to. + * @param measurementMethod The measurement method of the observation. + * @param observationPeriod The observation period of the observation. + * @param scalingFactor The scaling factor of the observation. + * @param unit The unit of the observation. + * @param isDcAggregate Whether the observation is a DC aggregate. + * @return A consistent facet ID string. + */ + public static String generateFacetId( + String importName, + String measurementMethod, + String observationPeriod, + String scalingFactor, + String unit, + boolean isDcAggregate) { + // Only include fields that are set in hash. + // This is so the hashes stay consistent if more fields are added. + String s = + Joiner.on("-").join(importName, measurementMethod, observationPeriod, scalingFactor, unit); + if (isDcAggregate) { + s += "-IsDcAggregate"; + } + + int hash = fnv1a32(s); + + // Go's fmt.Sprint on a uint32 treats it as unsigned. + // We must do the same in Java to avoid negative string values. + return Integer.toUnsignedString(hash); + } + + /** + * Computes the 32-bit FNV-1a hash of a string. + * + *

Note: Java does not provide a built-in FNV-1a implementation, so we implement it manually + * here. + * + * @param data The input string to hash. + * @return The FNV-1a 32-bit hash as an integer. + */ + private static int fnv1a32(String data) { + int hash = FNV_32_INIT; + for (byte b : data.getBytes(StandardCharsets.UTF_8)) { + hash ^= (b & 0xff); // Bitwise XOR with the unsigned byte value + hash *= FNV_32_PRIME; + } + return hash; + } +} diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java index cf046947..3b4cc79e 100644 --- a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java @@ -270,11 +270,9 @@ public Builder provenanceUrl(String provenanceUrl) { } public Observation build() { - int intHash = - Objects.hash( + this.facetId = + DataUtils.generateFacetId( importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate); - // Convert to positive long and then to string - this.facetId = String.valueOf((long) intHash & 0x7fffffffL); return new Observation(this); } } diff --git a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java new file mode 100644 index 00000000..1fe82916 --- /dev/null +++ b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java @@ -0,0 +1,100 @@ +package org.datacommons.ingestion.data; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class DataUtilsTest { + + private final String expectedId; + private final String importName; + private 
final String measurementMethod; + private final String observationPeriod; + private final String scalingFactor; + private final String unit; + private final boolean isDcAggregate; + + public DataUtilsTest( + String expectedId, + String importName, + String measurementMethod, + String observationPeriod, + String scalingFactor, + String unit, + boolean isDcAggregate) { + this.expectedId = expectedId; + this.importName = importName; + this.measurementMethod = measurementMethod; + this.observationPeriod = observationPeriod; + this.scalingFactor = scalingFactor; + this.unit = unit; + this.isDcAggregate = isDcAggregate; + } + + // This method provides the data for the test below + @Parameters(name = "Test {index}: expected {0} for {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] { + // Format: expectedId, importName, measurementMethod, observationPeriod, scalingFactor, + // unit, isDcAggregate + {"3981252704", "WorldDevelopmentIndicators", "", "P1Y", "", "", false}, + { + "10983471", + "CensusACS5YearSurvey_SubjectTables_S2601A", + "CensusACS5yrSurveySubjectTable", + "", + "", + "", + false + }, + {"2825511676", "CDC_Mortality_UnderlyingCause", "", "", "", "", false}, + {"1226172227", "CensusACS1YearSurvey", "CensusACS1yrSurvey", "", "", "", false}, + {"2176550201", "USCensusPEP_Annual_Population", "CensusPEPSurvey", "P1Y", "", "", false}, + {"2645850372", "CensusACS5YearSurvey_AggCountry", "CensusACS5yrSurvey", "", "", "", true}, + { + "1541763368", + "USDecennialCensus_RedistrictingRelease", + "USDecennialCensus", + "", + "", + "", + false + }, + { + "4181918134", + "OECDRegionalDemography_Population", + "OECDRegionalStatistics", + "P1Y", + "", + "", + false + }, + { + "1964317807", + "CensusACS5YearSurvey_SubjectTables_S0101", + "CensusACS5yrSurveySubjectTable", + "", + "", + "", + false + }, + {"2517965213", "CensusPEP", "CensusPEPSurvey", "", "", "", false} + }); + } + + @Test + public void testGenerateFacetId() { + String facetId = 
+ DataUtils.generateFacetId( + importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate); + + assertEquals(expectedId, facetId); + } +} diff --git a/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java b/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java index 636fd7b7..5a913365 100644 --- a/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java +++ b/pipeline/util/src/test/java/org/datacommons/ingestion/util/CacheReaderTest.java @@ -206,40 +206,45 @@ public void testParseTimeSeriesRow() { new NodesEdges() .addNode( Node.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") - .value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") - .name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") + .value("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") + .name("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815") .types(List.of("StatVarObsSeries")) .build()) .addNode( Node.builder() - .subjectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=") - .value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 870755137") + .subjectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=") + .value("Mean_PrecipitableWater_Atmosphere | geoId/sch2915390 | 4134842815") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("variableMeasured") .objectId("Mean_PrecipitableWater_Atmosphere") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + 
"dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("observationAbout") .objectId("geoId/sch2915390") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("name") - .objectId("jVWNIHt73yOspqKD0fnvTCH8GCW7m38F3gW+JB+aWms=") + .objectId("jKdXZgRFpeibUoXpgItXgC+oCoPMFsqP5UFyqNJ+Xss=") .provenance("dc/base/NOAA_GFS_WeatherForecast") .build()) .addEdge( Edge.builder() - .subjectId("dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_870755137") + .subjectId( + "dc/os/Mean_PrecipitableWater_Atmosphere_geoId_sch2915390_4134842815") .predicate("typeOf") .objectId("StatVarObsSeries") .provenance("dc/base/NOAA_GFS_WeatherForecast") From 42cd45a2e8b550baeaa00be0d5e1483856a0a12e Mon Sep 17 00:00:00 2001 From: Natalie Diaz Date: Wed, 4 Mar 2026 15:23:09 -0800 Subject: [PATCH 2/4] gemini --- .../datacommons/ingestion/data/DataUtils.java | 25 +++++------ .../ingestion/data/Observation.java | 44 +++++++++++-------- .../ingestion/data/DataUtilsTest.java | 14 ++++-- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java index 9ed3227c..02ec744c 100644 --- a/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/DataUtils.java @@ -17,26 +17,21 @@ public class DataUtils { * GetFacetID function. See * https://github.com/datacommonsorg/mixer/blob/0618c1f3ef80703c98fc97f6c6c6e5cd3d7c13d3/internal/util/util.go#L497-L515 * - * @param importName The name of the import this observation belongs to. - * @param measurementMethod The measurement method of the observation. 
- * @param observationPeriod The observation period of the observation. - * @param scalingFactor The scaling factor of the observation. - * @param unit The unit of the observation. - * @param isDcAggregate Whether the observation is a DC aggregate. + * @param builder The Observation builder containing the fields to hash. * @return A consistent facet ID string. */ - public static String generateFacetId( - String importName, - String measurementMethod, - String observationPeriod, - String scalingFactor, - String unit, - boolean isDcAggregate) { + public static String generateFacetId(Observation.Builder builder) { // Only include fields that are set in hash. // This is so the hashes stay consistent if more fields are added. String s = - Joiner.on("-").join(importName, measurementMethod, observationPeriod, scalingFactor, unit); - if (isDcAggregate) { + Joiner.on("-") + .join( + builder.getImportName(), + builder.getMeasurementMethod(), + builder.getObservationPeriod(), + builder.getScalingFactor(), + builder.getUnit()); + if (builder.getIsDcAggregate()) { s += "-IsDcAggregate"; } diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java index 3b4cc79e..79e693d0 100644 --- a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java @@ -184,22 +184,6 @@ public boolean equals(Object o) { && Objects.equals(isDcAggregate, that.isDcAggregate); } - @Override - public int hashCode() { - return Objects.hash( - observationAbout, - variableMeasured, - observations, - observationPeriod, - measurementMethod, - unit, - scalingFactor, - importName, - provenanceUrl, - facetId, - isDcAggregate); - } - // Builder for Observation public static class Builder { private String observationAbout = ""; @@ -269,10 +253,32 @@ public Builder provenanceUrl(String provenanceUrl) { return this; 
} + public String getImportName() { + return importName; + } + + public String getMeasurementMethod() { + return measurementMethod; + } + + public String getObservationPeriod() { + return observationPeriod; + } + + public String getScalingFactor() { + return scalingFactor; + } + + public String getUnit() { + return unit; + } + + public boolean getIsDcAggregate() { + return isDcAggregate; + } + public Observation build() { - this.facetId = - DataUtils.generateFacetId( - importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate); + this.facetId = DataUtils.generateFacetId(this); return new Observation(this); } } diff --git a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java index 1fe82916..f3d54cc8 100644 --- a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java +++ b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java @@ -38,7 +38,7 @@ public DataUtilsTest( } // This method provides the data for the test below - @Parameters(name = "Test {index}: expected {0} for {1}") + @Parameters(name = "Test {index}: expected {0} for {index}") public static Collection data() { return Arrays.asList( new Object[][] { @@ -91,9 +91,15 @@ public static Collection data() { @Test public void testGenerateFacetId() { - String facetId = - DataUtils.generateFacetId( - importName, measurementMethod, observationPeriod, scalingFactor, unit, isDcAggregate); + Observation.Builder builder = + Observation.builder() + .importName(importName) + .measurementMethod(measurementMethod) + .observationPeriod(observationPeriod) + .scalingFactor(scalingFactor) + .unit(unit) + .isDcAggregate(isDcAggregate); + String facetId = DataUtils.generateFacetId(builder); assertEquals(expectedId, facetId); } From 28bc2ba7ba25c1274aa1a0657cff09579d480902 Mon Sep 17 00:00:00 2001 From: Natalie Diaz Date: Wed, 4 Mar 2026 15:24:34 -0800 
Subject: [PATCH 3/4] Remove redundant {index} from DataUtilsTest parameter name --- .../test/java/org/datacommons/ingestion/data/DataUtilsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java index f3d54cc8..5d81e9cc 100644 --- a/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java +++ b/pipeline/data/src/test/java/org/datacommons/ingestion/data/DataUtilsTest.java @@ -38,7 +38,7 @@ public DataUtilsTest( } // This method provides the data for the test below - @Parameters(name = "Test {index}: expected {0} for {index}") + @Parameters(name = "Test {index}: expected {0}") public static Collection data() { return Arrays.asList( new Object[][] { From 5bdecc19e37dad7360b7f9c6edc89d3eed6d6ad9 Mon Sep 17 00:00:00 2001 From: Natalie Diaz Date: Wed, 4 Mar 2026 16:16:40 -0800 Subject: [PATCH 4/4] Restore Observation.hashCode() --- .../datacommons/ingestion/data/Observation.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java index 79e693d0..1ca7d43e 100644 --- a/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java +++ b/pipeline/data/src/main/java/org/datacommons/ingestion/data/Observation.java @@ -184,6 +184,22 @@ public boolean equals(Object o) { && Objects.equals(isDcAggregate, that.isDcAggregate); } + @Override + public int hashCode() { + return Objects.hash( + observationAbout, + variableMeasured, + observations, + observationPeriod, + measurementMethod, + unit, + scalingFactor, + importName, + provenanceUrl, + facetId, + isDcAggregate); + } + // Builder for Observation public static class Builder { private String observationAbout = "";