diff --git a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
index 8f655f8ded..0f5942fc56 100644
--- a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
+++ b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
@@ -15,6 +15,9 @@
*/
package org.openrewrite;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.CsvWriter;
@@ -28,6 +31,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
@@ -44,20 +48,34 @@
* Supports configurable output stream creation (e.g., for GZIP compression)
* and static prefix/suffix columns (e.g., repository metadata).
*
+ * <p>
+ * When {@link #getRows} is called, open writers for the requested table are
+ * closed (not merely flushed) so that compression trailers such as
+ * the GZIP footer are written, producing a fully valid file on disk. If
+ * {@link #insertRow} is called again for the same table, a new writer is
+ * created in append mode. For this reason the {@code outputStreamFactory}
+ * must open the stream with {@link java.nio.file.StandardOpenOption#CREATE
+ * CREATE} and {@link java.nio.file.StandardOpenOption#APPEND APPEND} semantics
+ * so that data written before the close is preserved. For compressed streams
+ * this produces a multi-member archive (e.g., concatenated GZIP members),
+ * which {@link java.util.zip.GZIPInputStream} handles transparently.
+ *
*
* <pre>{@code
* // Plain CSV
* new CsvDataTableStore(outputDir)
*
* // GZIP compressed with repository columns (write-only)
* new CsvDataTableStore(outputDir,
- * path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ * path -> new GZIPOutputStream(Files.newOutputStream(path,
+ * StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
* ".csv.gz",
* Map.of("repositoryOrigin", origin, "repositoryPath", path),
* Map.of("org1", orgValue))
*
* // GZIP compressed with read-back support
* new CsvDataTableStore(outputDir,
- * path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ * path -> new GZIPOutputStream(Files.newOutputStream(path,
+ * StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
* path -> new GZIPInputStream(Files.newInputStream(path)),
* ".csv.gz",
* Map.of("repositoryOrigin", origin, "repositoryPath", path),
@@ -73,6 +91,8 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
private final Map<String, String> prefixColumns;
private final Map<String, String> suffixColumns;
private final ConcurrentHashMap<String, BucketWriter> writers = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, RowMetadata> rowMetadata = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, DataTable<?>> knownTables = new ConcurrentHashMap<>();
/**
* Create a store that writes plain CSV files.
@@ -112,7 +132,11 @@ public CsvDataTableStore(Path outputDir,
* and additional static columns prepended/appended to each row.
*
* @param outputDir directory to write files into
- * @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
+ * @param outputStreamFactory creates an output stream for each file path. Must use
+ * {@link java.nio.file.StandardOpenOption#CREATE CREATE} and
+ * {@link java.nio.file.StandardOpenOption#APPEND APPEND} so that
+ * rows written before a mid-run {@link #getRows} call are preserved
+ * when the writer is re-opened for subsequent inserts.
* @param inputStreamFactory creates an input stream for each file path (e.g., wrapping with GZIPInputStream)
* @param fileExtension file extension including dot (e.g., ".csv" or ".csv.gz")
* @param prefixColumns static columns prepended to each row, in insertion order
@@ -139,7 +163,7 @@ public CsvDataTableStore(Path outputDir,
private static OutputStream defaultOutputStream(Path path) {
try {
- return Files.newOutputStream(path);
+ return Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -155,6 +179,9 @@ private static InputStream defaultInputStream(Path path) {
@Override
public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row row) {
+ String metaKey = metaKey(dataTable.getName(), dataTable.getGroup());
+ rowMetadata.computeIfAbsent(metaKey, k -> RowMetadata.from(dataTable));
+ knownTables.putIfAbsent(fileKey(dataTable), dataTable);
String fileKey = fileKey(dataTable);
BucketWriter writer = writers.computeIfAbsent(fileKey, k -> createBucketWriter(dataTable));
writer.writeRow(row);
@@ -162,25 +189,43 @@ public void insertRow(DataTable dataTable, ExecutionContext ctx, Row
@Override
public Stream<?> getRows(String dataTableName, @Nullable String group) {
- // Flush any open writers for this data table so all rows are on disk
- for (BucketWriter writer : writers.values()) {
+ // Close (not just flush) matching writers so that compression trailers
+ // (e.g., GZIP footer) are written, making the files fully readable.
+ // Removed writers will be lazily re-created in append mode on the next insertRow().
+ Iterator<Map.Entry<String, BucketWriter>> it = writers.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, BucketWriter> entry = it.next();
+ BucketWriter writer = entry.getValue();
if (writer.dataTable.getName().equals(dataTableName) &&
Objects.equals(writer.dataTable.getGroup(), group)) {
- writer.flush();
+ writer.close();
+ it.remove();
}
}
- List<String[]> allRows = new ArrayList<>();
+ RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group));
+
+ List<Object> allRows = new ArrayList<>();
//noinspection DataFlowIssue
File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
if (files == null) {
return Stream.empty();
}
+ // Build set of file paths with still-open writers (other tables).
+ // These files have incomplete compression trailers and cannot be read.
+ Set<Path> activeWriterPaths = new HashSet<>();
+ for (Map.Entry<String, BucketWriter> entry : writers.entrySet()) {
+ activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
+ }
+
int prefixCount = prefixColumns.size();
int suffixCount = suffixColumns.size();
for (File file : files) {
+ if (activeWriterPaths.contains(file.toPath())) {
+ continue;
+ }
try (InputStream is = inputStreamFactory.apply(file.toPath())) {
DataTableDescriptor descriptor = readDescriptor(is);
if (descriptor == null ||
@@ -191,7 +236,7 @@ public Stream> getRows(String dataTableName, @Nullable String group) {
// readDescriptor consumed comment lines; now parse the remaining CSV
// (header + data rows). Re-read the full file with CsvParser.
} catch (IOException e) {
- continue;
+ throw new UncheckedIOException(e);
}
try (InputStream is = inputStreamFactory.apply(file.toPath())) {
@@ -205,13 +250,14 @@ public Stream> getRows(String dataTableName, @Nullable String group) {
while ((row = parser.parseNext()) != null) {
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
+ String[] dataRow;
if (dataCount <= 0) {
- allRows.add(row);
+ dataRow = row;
} else {
- String[] dataRow = new String[dataCount];
+ dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
- allRows.add(dataRow);
}
+ allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
}
parser.stopParsing();
} catch (IOException e) {
@@ -224,11 +270,7 @@ public Stream> getRows(String dataTableName, @Nullable String group) {
@Override
public Collection<DataTable<?>> getDataTables() {
- List<DataTable<?>> result = new ArrayList<>(writers.size());
- for (BucketWriter writer : writers.values()) {
- result.add(writer.dataTable);
- }
- return Collections.unmodifiableCollection(result);
+ return Collections.unmodifiableCollection(knownTables.values());
}
@Override
@@ -242,6 +284,7 @@ public void close() {
private BucketWriter createBucketWriter(DataTable<?> dataTable) {
String fileKey = fileKey(dataTable);
Path path = outputDir.resolve(fileKey + fileExtension);
+ boolean append = Files.exists(path);
DataTableDescriptor descriptor = dataTableDescriptorFromDataTable(dataTable);
List<String> fieldNames = new ArrayList<>();
@@ -259,15 +302,17 @@ private BucketWriter createBucketWriter(DataTable> dataTable) {
OutputStream os = outputStreamFactory.apply(path);
try {
CsvWriterSettings settings = new CsvWriterSettings();
- settings.setHeaderWritingEnabled(true);
+ settings.setHeaderWritingEnabled(!append);
settings.getFormat().setComment('#');
CsvWriter csvWriter = new CsvWriter(os, settings);
- // Write metadata as comments
- csvWriter.commentRow(" @name " + dataTable.getName());
- csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
- csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
- csvWriter.writeHeaders(headers);
+ if (!append) {
+ // Write metadata as comments only for new files
+ csvWriter.commentRow(" @name " + dataTable.getName());
+ csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
+ csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
+ csvWriter.writeHeaders(headers);
+ }
return new BucketWriter(dataTable, csvWriter, os, fieldNames, headers.size());
} catch (Exception e) {
@@ -326,19 +371,57 @@ synchronized void writeRow(Object row) {
csvWriter.writeRow((Object[]) values);
}
- synchronized void flush() {
- csvWriter.flush();
+ void close() {
+ csvWriter.close();
try {
- os.flush();
+ os.close();
} catch (IOException ignored) {
}
}
+ }
- void close() {
- csvWriter.close();
+ private static String metaKey(String dataTableName, @Nullable String group) {
+ return dataTableName + "\0" + (group != null ? group : "");
+ }
+
+ /**
+ * Holds the row class and its @Column field names so that
+ * String[] rows read from CSV can be deserialized back to typed objects
+ * via Jackson's {@link ObjectMapper#convertValue}.
+ */
+ private static class RowMetadata {
+ private static final ObjectMapper MAPPER = new ObjectMapper()
+ .registerModule(new ParameterNamesModule())
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ final String rowClassName;
+ final List<String> fieldNames;
+
+ RowMetadata(String rowClassName, List<String> fieldNames) {
+ this.rowClassName = rowClassName;
+ this.fieldNames = fieldNames;
+ }
+
+ static RowMetadata from(DataTable<?> dataTable) {
+ Class> rowClass = dataTable.getType();
+ List<String> names = new ArrayList<>();
+ for (Field f : rowClass.getDeclaredFields()) {
+ if (f.isAnnotationPresent(Column.class)) {
+ names.add(f.getName());
+ }
+ }
+ return new RowMetadata(rowClass.getName(), names);
+ }
+
+ Object toRow(String[] values) {
+ Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ map.put(fieldNames.get(i), i < values.length ? values[i] : "");
+ }
try {
- os.close();
- } catch (IOException ignored) {
+ return MAPPER.convertValue(map, Class.forName(rowClassName));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Row class not found: " + rowClassName, e);
}
}
}
diff --git a/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java b/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
index e3ce3becb0..1c364856d4 100644
--- a/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
+++ b/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
@@ -18,6 +18,7 @@
import lombok.Value;
import org.openrewrite.Column;
import org.openrewrite.DataTable;
+import org.openrewrite.ExecutionContext;
import org.openrewrite.Recipe;
public class SourcesFileErrors extends DataTable<SourcesFileErrors.Row> {
@@ -41,4 +42,9 @@ public static class Row {
description = "The stack trace of the failure.")
String stackTrace;
}
+
+ @Override
+ protected boolean allowWritingInThisCycle(ExecutionContext ctx) {
+ return true;
+ }
}
diff --git a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
index 4728adba2b..88b6763e62 100644
--- a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
+++ b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
@@ -23,10 +23,16 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.openrewrite.internal.InMemoryLargeSourceSet;
+import org.openrewrite.text.PlainText;
+import org.openrewrite.text.PlainTextVisitor;
import static org.assertj.core.api.Assertions.assertThat;
@@ -279,8 +285,8 @@ void csvStoreGetRowsReadsBackWrittenData(@TempDir Path tempDir) {
List<?> rows = store.getRows(table.getName(), null).collect(Collectors.toList());
assertThat(rows).hasSize(2);
- assertThat((String[]) rows.get(0)).containsExactly("alice");
- assertThat((String[]) rows.get(1)).containsExactly("bob");
+ assertThat(rows.get(0)).isEqualTo(new TestTable.Row("alice"));
+ assertThat(rows.get(1)).isEqualTo(new TestTable.Row("bob"));
}
}
@@ -293,8 +299,8 @@ void csvStoreGetRowsMultipleColumns(@TempDir Path tempDir) {
List<?> rows = store.getRows(table.getName(), null).collect(Collectors.toList());
assertThat(rows).hasSize(2);
- assertThat((String[]) rows.get(0)).containsExactly("1", "hello");
- assertThat((String[]) rows.get(1)).containsExactly("2", "world");
+ assertThat(rows.get(0)).isEqualTo(new MultiColTable.Row(1, "hello"));
+ assertThat(rows.get(1)).isEqualTo(new MultiColTable.Row(2, "world"));
}
}
@@ -319,11 +325,11 @@ void csvStoreGetRowsMatchesByGroup(@TempDir Path tempDir) {
List<?> groupedRows = store.getRows(grouped.getName(), "group-a").collect(Collectors.toList());
assertThat(groupedRows).hasSize(1);
- assertThat((String[]) groupedRows.getFirst()).containsExactly("grouped-row");
+ assertThat(groupedRows.getFirst()).isEqualTo(new TestTable.Row("grouped-row"));
List<?> ungroupedRows = store.getRows(ungrouped.getName(), null).collect(Collectors.toList());
assertThat(ungroupedRows).hasSize(1);
- assertThat((String[]) ungroupedRows.getFirst()).containsExactly("ungrouped-row");
+ assertThat(ungroupedRows.getFirst()).isEqualTo(new TestTable.Row("ungrouped-row"));
}
}
@@ -354,7 +360,7 @@ void csvStoreGetRowsStripsPrefixAndSuffixColumns(@TempDir Path tempDir) {
List<?> rows = store.getRows(table.getName(), null).collect(Collectors.toList());
assertThat(rows).hasSize(1);
// Should only contain the data column, not prefix/suffix
- assertThat((String[]) rows.getFirst()).containsExactly("alice");
+ assertThat(rows.getFirst()).isEqualTo(new TestTable.Row("alice"));
}
}
@@ -385,9 +391,9 @@ void csvStoreGetRowsHandlesSpecialCharacters(@TempDir Path tempDir) {
List<?> rows = store.getRows(table.getName(), null).collect(Collectors.toList());
assertThat(rows).hasSize(3);
- assertThat((String[]) rows.get(0)).containsExactly("value with, comma");
- assertThat((String[]) rows.get(1)).containsExactly("value with \"quotes\"");
- assertThat((String[]) rows.get(2)).containsExactly("value with\nnewline");
+ assertThat(rows.get(0)).isEqualTo(new TestTable.Row("value with, comma"));
+ assertThat(rows.get(1)).isEqualTo(new TestTable.Row("value with \"quotes\""));
+ assertThat(rows.get(2)).isEqualTo(new TestTable.Row("value with\nnewline"));
}
}
@@ -448,4 +454,125 @@ void sanitizeDifferentInputsProduceDifferentHashes() {
String b = CsvDataTableStore.sanitize("Find methods 'delete(..)'");
assertThat(a).isNotEqualTo(b);
}
+
+ // =========================================================================
+ // CsvDataTableStore: write in cycle 1, read back in cycle 2
+ // =========================================================================
+
+ /**
+ * A scanning recipe that writes to a data table during cycle 1's scanner,
+ * then reads rows back from the DataTableStore in cycle 2's getInitialValue.
+ */
+ static class WriteThenReadRecipe extends ScanningRecipe<List<String>> {
+ transient TestTable table = new TestTable(this);
+
+ @Override
+ public String getDisplayName() {
+ return "Write then read";
+ }
+
+ @Override
+ public String getDescription() {
+ return "Writes data table rows in cycle 1, reads them back in cycle 2.";
+ }
+
+ @Override
+ public boolean causesAnotherCycle() {
+ return true;
+ }
+
+ @Override
+ public List<String> getInitialValue(ExecutionContext ctx) {
+ // On cycle 2+, the store already contains rows written in cycle 1
+ List<String> readBack = new ArrayList<>();
+ DataTableStore store = DataTableExecutionContextView.view(ctx).getDataTableStore();
+ try (Stream<?> rows = store.getRows(table.getName(), null)) {
+ rows.forEach(row -> {
+ if (row instanceof TestTable.Row) {
+ readBack.add(((TestTable.Row) row).getName());
+ } else {
+ readBack.add(((String[]) row)[0]);
+ }
+ });
+ }
+ return readBack;
+ }
+
+ @Override
+ public TreeVisitor<?, ExecutionContext> getScanner(List<String> acc) {
+ return new PlainTextVisitor<>() {
+ @Override
+ public PlainText visitText(PlainText text, ExecutionContext ctx) {
+ // DataTable.insertRow only writes during cycle 1
+ table.insertRow(ctx, new TestTable.Row(text.getText()));
+ return text;
+ }
+ };
+ }
+
+ @Override
+ public TreeVisitor<?, ExecutionContext> getVisitor(List<String> acc) {
+ return new PlainTextVisitor<>() {
+ @Override
+ public PlainText visitText(PlainText text, ExecutionContext ctx) {
+ if (acc.isEmpty()) {
+ // Cycle 1: no read-back data yet; make a change to trigger cycle 2
+ return text.withText(text.getText() + "-scanned");
+ }
+ // Cycle 2: append the data read back from the store
+ return text.withText(text.getText() + "-read:" + String.join(",", acc));
+ }
+ };
+ }
+ }
+
+ @Test
+ void csvStoreIntermixedWritesAndReads(@TempDir Path tempDir) {
+ try (CsvDataTableStore store = new CsvDataTableStore(tempDir)) {
+ TestTable table = new TestTable(Recipe.noop());
+
+ // First batch of writes
+ store.insertRow(table, ctx(), new TestTable.Row("alice"));
+ store.insertRow(table, ctx(), new TestTable.Row("bob"));
+
+ // Mid-run read (closes the writer internally)
+ List<?> firstRead = store.getRows(table.getName(), null).collect(Collectors.toList());
+ assertThat(firstRead).hasSize(2);
+ assertThat(firstRead.get(0)).isEqualTo(new TestTable.Row("alice"));
+ assertThat(firstRead.get(1)).isEqualTo(new TestTable.Row("bob"));
+
+ // Second batch of writes (writer re-created in append mode)
+ store.insertRow(table, ctx(), new TestTable.Row("charlie"));
+
+ // Second read should see all three rows
+ List<?> secondRead = store.getRows(table.getName(), null).collect(Collectors.toList());
+ assertThat(secondRead).hasSize(3);
+ assertThat(secondRead.get(0)).isEqualTo(new TestTable.Row("alice"));
+ assertThat(secondRead.get(1)).isEqualTo(new TestTable.Row("bob"));
+ assertThat(secondRead.get(2)).isEqualTo(new TestTable.Row("charlie"));
+ }
+ }
+
+ @Test
+ void csvStoreWriteInCycle1ReadBackInCycle2(@TempDir Path tempDir) {
+ ExecutionContext ctx = ctx();
+ DataTableExecutionContextView.view(ctx).setDataTableStore(new CsvDataTableStore(tempDir));
+
+ List<SourceFile> sources = List.of(
+ PlainText.builder().text("hello").sourcePath(Path.of("test.txt")).build()
+ );
+
+ RecipeRun run = new RecipeScheduler().scheduleRun(
+ new WriteThenReadRecipe(),
+ new InMemoryLargeSourceSet(sources),
+ ctx, 2, 1
+ );
+
+ // The recipe should have run 2 cycles:
+ // Cycle 1: scanner writes "hello" to data table, visitor appends "-scanned"
+ // Cycle 2: getInitialValue reads back ["hello"], visitor appends "-read:hello"
+ PlainText after = (PlainText) run.getChangeset().getAllResults().getFirst().getAfter();
+ assertThat(after).isNotNull();
+ assertThat(after.getText()).isEqualTo("hello-scanned-read:hello");
+ }
}