1 change: 1 addition & 0 deletions pipeline/pom.xml
@@ -22,6 +22,7 @@
<module>differ</module>
<module>ingestion</module>
<module>spanner</module>
<module>timeseries-backfill</module>
<module>util</module>
</modules>

280 changes: 280 additions & 0 deletions pipeline/timeseries-backfill/README.md

Large diffs are not rendered by default.

114 changes: 114 additions & 0 deletions pipeline/timeseries-backfill/pom.xml
@@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.datacommons</groupId>
<artifactId>pipeline</artifactId>
<version>${revision}</version>
</parent>

<groupId>org.datacommons</groupId>
<artifactId>timeseries-backfill</artifactId>
<version>${revision}</version>
<name>Data Commons - Timeseries Backfill</name>

<dependencies>
<dependency>
<artifactId>spanner</artifactId>
<groupId>org.datacommons</groupId>
<version>${revision}</version>
</dependency>
<dependency>
<artifactId>data</artifactId>
<groupId>org.datacommons</groupId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-avro</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<configuration>
<skip>false</skip>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
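
The shade plugin's manifest names org.datacommons.ingestion.timeseries.TimeseriesBackfillPipeline as the entry point of the bundled jar, but that class is not rendered in this diff. A minimal, hypothetical sketch of such an entry point, assuming it wires up the TimeseriesBackfillOptions and ObservationExportFiles helpers that appear later in this change, could look like:

package org.datacommons.ingestion.timeseries;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical sketch only; the real pipeline class is part of this change but not shown here.
public class TimeseriesBackfillPipeline {
  public static void main(String[] args) {
    // Parse the pipeline options and enforce the inputExportDir/inputFiles exclusivity check.
    TimeseriesBackfillOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(TimeseriesBackfillOptions.class);
    ObservationExportFiles.validateOptions(options);

    Pipeline pipeline = Pipeline.create(options);
    // Transforms that read the resolved Avro files and write the Spanner rows would be added here.
    pipeline.run().waitUntilFinish();
  }
}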
@@ -0,0 +1,146 @@
package org.datacommons.ingestion.timeseries;

import com.google.cloud.Timestamp;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import org.slf4j.Logger;

/** Emits local progress logs for validator and DirectRunner runs. */
final class LocalProgressTracker implements AutoCloseable {
interface LogSink {
void info(String message);
}

private final String runnerName;
private final int progressEverySourceRows;
private final int heartbeatSeconds;
private final LogSink logSink;
private final LongSupplier currentTimeMillis;
private final ScheduledExecutorService heartbeatExecutor;
private final AtomicLong sourceRows;
private final AtomicLong timeSeriesRows;
private final AtomicLong timeSeriesAttributeRows;
private final AtomicLong statVarObservationRows;
private final AtomicLong lastRowProgressMillis;
private final AtomicBoolean closed;

LocalProgressTracker(
String runnerName, int progressEverySourceRows, int heartbeatSeconds, Logger logger) {
this(
runnerName,
progressEverySourceRows,
heartbeatSeconds,
logger::info,
System::currentTimeMillis,
createHeartbeatExecutor(runnerName, heartbeatSeconds));
}

LocalProgressTracker(
String runnerName,
int progressEverySourceRows,
int heartbeatSeconds,
LogSink logSink,
LongSupplier currentTimeMillis,
ScheduledExecutorService heartbeatExecutor) {
this.runnerName = runnerName;
this.progressEverySourceRows = progressEverySourceRows;
this.heartbeatSeconds = heartbeatSeconds;
this.logSink = logSink;
this.currentTimeMillis = currentTimeMillis;
this.heartbeatExecutor = heartbeatExecutor;
this.sourceRows = new AtomicLong();
this.timeSeriesRows = new AtomicLong();
this.timeSeriesAttributeRows = new AtomicLong();
this.statVarObservationRows = new AtomicLong();
this.lastRowProgressMillis = new AtomicLong(currentTimeMillis.getAsLong());
this.closed = new AtomicBoolean();

if (heartbeatExecutor != null) {
heartbeatExecutor.scheduleAtFixedRate(
this::logHeartbeatNow, heartbeatSeconds, heartbeatSeconds, TimeUnit.SECONDS);
}
}

boolean isEnabled() {
return progressEverySourceRows > 0 || heartbeatSeconds > 0;
}

void recordRow(int timeSeriesAttributeRowsDelta, int statVarObservationRowsDelta) {
long sourceRowCount = sourceRows.incrementAndGet();
timeSeriesRows.incrementAndGet();
timeSeriesAttributeRows.addAndGet(timeSeriesAttributeRowsDelta);
statVarObservationRows.addAndGet(statVarObservationRowsDelta);
lastRowProgressMillis.set(currentTimeMillis.getAsLong());

if (progressEverySourceRows > 0 && sourceRowCount % progressEverySourceRows == 0) {
logSink.info(buildProgressMessage("progress"));
}
}

void recordValidatorFlush(int mutationCount, Timestamp writeTimestamp) {
logSink.info(
runnerName + " flush: mutations=" + mutationCount + ", write_timestamp=" + writeTimestamp);
}

void logHeartbeatNow() {
if (heartbeatSeconds <= 0 || closed.get()) {
return;
}
logSink.info(buildProgressMessage("heartbeat"));
}

@Override
public void close() {
if (!closed.compareAndSet(false, true)) {
return;
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
}

private String buildProgressMessage(String kind) {
long sourceRowCount = sourceRows.get();
long idleSeconds =
TimeUnit.MILLISECONDS.toSeconds(
currentTimeMillis.getAsLong() - lastRowProgressMillis.get());
StringBuilder message =
new StringBuilder()
.append(runnerName)
.append(' ')
.append(kind)
.append(": source_rows=")
.append(sourceRowCount)
.append(", timeseries_rows=")
.append(timeSeriesRows.get())
.append(", timeseries_attribute_rows=")
.append(timeSeriesAttributeRows.get())
.append(", stat_var_observation_rows=")
.append(statVarObservationRows.get())
.append(", seconds_since_last_row=")
.append(idleSeconds);
if (sourceRowCount == 0) {
message.append(", no_source_rows_yet=true");
}
return message.toString();
}

private static ScheduledExecutorService createHeartbeatExecutor(
String runnerName, int heartbeatSeconds) {
if (heartbeatSeconds <= 0) {
return null;
}
ThreadFactory threadFactory =
runnable -> {
Thread thread = new Thread(runnable, runnerName + "-progress-heartbeat");
thread.setDaemon(true);
return thread;
};
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}
}
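
A minimal usage sketch for LocalProgressTracker (not part of this change), assuming a DirectRunner-style caller with an SLF4J logger and made-up row counts:

package org.datacommons.ingestion.timeseries;

import com.google.cloud.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative caller; the class name and counts below are hypothetical.
final class LocalProgressTrackerExample {
  private static final Logger LOG = LoggerFactory.getLogger(LocalProgressTrackerExample.class);

  static void runExample() {
    // Log progress every 10,000 source rows and a heartbeat every 30 seconds.
    try (LocalProgressTracker tracker =
        new LocalProgressTracker("DirectRunner", 10_000, 30, LOG)) {
      if (tracker.isEnabled()) {
        // One source row producing 2 attribute rows and 5 observation rows (made-up numbers).
        tracker.recordRow(2, 5);
        // A validator flush of 100 mutations at the commit timestamp.
        tracker.recordValidatorFlush(100, Timestamp.now());
      }
    } // close() stops the daemon heartbeat thread.
  }
}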
@@ -0,0 +1,128 @@
package org.datacommons.ingestion.timeseries;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.Reader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

/** Resolves Avro export files from either an export directory or an explicit file list. */
final class ObservationExportFiles {
private ObservationExportFiles() {}

static void validateOptions(TimeseriesBackfillOptions options) {
boolean hasInputExportDir = !options.getInputExportDir().isEmpty();
boolean hasInputFiles = !options.getInputFiles().isEmpty();
if (hasInputExportDir == hasInputFiles) {
throw new IllegalArgumentException(
"Exactly one of inputExportDir or inputFiles must be provided for the Avro pipeline.");
}
}

static List<String> resolveInputFiles(TimeseriesBackfillOptions options) {
if (!options.getInputFiles().isEmpty()) {
return parseCsv(options.getInputFiles());
}
return readManifest(
options, options.getInputExportDir(), options.getSourceObservationTableName());
}

private static List<String> readManifest(
TimeseriesBackfillOptions options, String exportDir, String sourceTableName) {
FileSystems.setDefaultPipelineOptions(options);
String normalizedExportDir = trimTrailingSlash(exportDir);
String manifestPath = normalizedExportDir + "/" + sourceTableName + "-manifest.json";
ResourceId manifestResource;
try {
manifestResource = FileSystems.matchSingleFileSpec(manifestPath).resourceId();
} catch (IOException e) {
throw new RuntimeException("Failed to find export manifest at " + manifestPath, e);
}

try (Reader reader =
Channels.newReader(FileSystems.open(manifestResource), StandardCharsets.UTF_8.name())) {
JsonElement root = JsonParser.parseReader(reader);
List<String> files = parseManifest(root, normalizedExportDir, sourceTableName);
if (!files.isEmpty()) {
return files;
}
throw new IllegalArgumentException(
"No Avro files for " + sourceTableName + " were found in " + manifestPath);
} catch (IOException e) {
throw new RuntimeException("Failed to read export manifest at " + manifestPath, e);
}
}

static List<String> parseManifest(JsonElement root, String exportDir, String sourceTableName) {
Pattern filePattern =
Pattern.compile("(^|.*/)" + Pattern.quote(sourceTableName) + "\\.avro-\\d{5}-of-\\d{5}$");
Set<String> files = new LinkedHashSet<>();
collectFiles(root, exportDir, filePattern, files);
return new ArrayList<>(files);
}

private static void collectFiles(
JsonElement element, String exportDir, Pattern filePattern, Set<String> files) {
if (element == null || element.isJsonNull()) {
return;
}
if (element.isJsonPrimitive() && element.getAsJsonPrimitive().isString()) {
String value = element.getAsString();
if (filePattern.matcher(value).find()) {
files.add(toAbsolutePath(exportDir, value));
}
return;
}
if (element.isJsonArray()) {
for (JsonElement child : element.getAsJsonArray()) {
collectFiles(child, exportDir, filePattern, files);
}
return;
}
JsonObject object = element.getAsJsonObject();
for (String key : object.keySet()) {
collectFiles(object.get(key), exportDir, filePattern, files);
}
}

private static List<String> parseCsv(String csv) {
List<String> fileSpecs = new ArrayList<>();
for (String part : csv.split(",")) {
String trimmed = part.trim();
if (!trimmed.isEmpty()) {
fileSpecs.add(trimmed);
}
}
if (!fileSpecs.isEmpty()) {
return fileSpecs;
}
throw new IllegalArgumentException("inputFiles must contain at least one Avro file path.");
}

private static String toAbsolutePath(String exportDir, String value) {
if (value.contains("://") || value.startsWith("/")) {
return value;
}
String relativePath = value.startsWith("./") ? value.substring(2) : value;
while (relativePath.startsWith("/")) {
relativePath = relativePath.substring(1);
}
return exportDir + "/" + relativePath;
}

private static String trimTrailingSlash(String value) {
if (value.endsWith("/")) {
return value.substring(0, value.length() - 1);
}
return value;
}
}
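
TimeseriesBackfillOptions is referenced throughout but not shown in this diff. A sketch of the shape it would need for ObservationExportFiles to compile, assuming a standard Beam options interface with empty-string defaults (the code calls isEmpty() on each value), might be:

package org.datacommons.ingestion.timeseries;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Assumed shape only; the actual interface may declare additional options.
public interface TimeseriesBackfillOptions extends PipelineOptions {
  @Description("Export directory containing <table>-manifest.json and the Avro shards.")
  @Default.String("")
  String getInputExportDir();

  void setInputExportDir(String value);

  @Description("Comma-separated Avro file specs; mutually exclusive with inputExportDir.")
  @Default.String("")
  String getInputFiles();

  void setInputFiles(String value);

  @Description("Source observation table name used to locate the manifest and Avro shards.")
  @Default.String("")
  String getSourceObservationTableName();

  void setSourceObservationTableName(String value);
}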