diff --git a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java index 8f655f8ded..0f5942fc56 100644 --- a/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java +++ b/rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java @@ -15,6 +15,9 @@ */ package org.openrewrite; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.paramnames.ParameterNamesModule; import com.univocity.parsers.csv.CsvParser; import com.univocity.parsers.csv.CsvParserSettings; import com.univocity.parsers.csv.CsvWriter; @@ -28,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; @@ -44,20 +48,34 @@ * Supports configurable output stream creation (e.g., for GZIP compression) * and static prefix/suffix columns (e.g., repository metadata). * + *

+ * When {@link #getRows} is called, open writers for the requested table are + * closed (not merely flushed) so that compression trailers such as + * the GZIP footer are written, producing a fully valid file on disk. If + * {@link #insertRow} is called again for the same table, a new writer is + * created in append mode. For this reason the {@code outputStreamFactory} + * must open the stream with {@link java.nio.file.StandardOpenOption#CREATE + * CREATE} and {@link java.nio.file.StandardOpenOption#APPEND APPEND} semantics + * so that data written before the close is preserved. For compressed streams + * this produces a multi-member archive (e.g., concatenated GZIP members), + * which {@link java.util.zip.GZIPInputStream} handles transparently. + * *

{@code
  * // Plain CSV
  * new CsvDataTableStore(outputDir)
  *
  * // GZIP compressed with repository columns (write-only)
  * new CsvDataTableStore(outputDir,
- *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ *     path -> new GZIPOutputStream(Files.newOutputStream(path,
+ *         StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
  *     ".csv.gz",
  *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
  *     Map.of("org1", orgValue))
  *
  * // GZIP compressed with read-back support
  * new CsvDataTableStore(outputDir,
- *     path -> new GZIPOutputStream(Files.newOutputStream(path)),
+ *     path -> new GZIPOutputStream(Files.newOutputStream(path,
+ *         StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
  *     path -> new GZIPInputStream(Files.newInputStream(path)),
  *     ".csv.gz",
  *     Map.of("repositoryOrigin", origin, "repositoryPath", path),
@@ -73,6 +91,8 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
     private final Map prefixColumns;
     private final Map suffixColumns;
     private final ConcurrentHashMap writers = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap rowMetadata = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap> knownTables = new ConcurrentHashMap<>();
 
     /**
      * Create a store that writes plain CSV files.
@@ -112,7 +132,11 @@ public CsvDataTableStore(Path outputDir,
      * and additional static columns prepended/appended to each row.
      *
      * @param outputDir           directory to write files into
-     * @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
+     * @param outputStreamFactory creates an output stream for each file path. Must use
+     *                            {@link java.nio.file.StandardOpenOption#CREATE CREATE} and
+     *                            {@link java.nio.file.StandardOpenOption#APPEND APPEND} so that
+     *                            rows written before a mid-run {@link #getRows} call are preserved
+     *                            when the writer is re-opened for subsequent inserts.
      * @param inputStreamFactory  creates an input stream for each file path (e.g., wrapping with GZIPInputStream)
      * @param fileExtension       file extension including dot (e.g., ".csv" or ".csv.gz")
      * @param prefixColumns       static columns prepended to each row, in insertion order
@@ -139,7 +163,7 @@ public CsvDataTableStore(Path outputDir,
 
     private static OutputStream defaultOutputStream(Path path) {
         try {
-            return Files.newOutputStream(path);
+            return Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
@@ -155,6 +179,9 @@ private static InputStream defaultInputStream(Path path) {
 
     @Override
     public  void insertRow(DataTable dataTable, ExecutionContext ctx, Row row) {
+        String metaKey = metaKey(dataTable.getName(), dataTable.getGroup());
+        rowMetadata.computeIfAbsent(metaKey, k -> RowMetadata.from(dataTable));
+        knownTables.putIfAbsent(fileKey(dataTable), dataTable);
         String fileKey = fileKey(dataTable);
         BucketWriter writer = writers.computeIfAbsent(fileKey, k -> createBucketWriter(dataTable));
         writer.writeRow(row);
@@ -162,25 +189,43 @@ public  void insertRow(DataTable dataTable, ExecutionContext ctx, Row
 
     @Override
     public Stream getRows(String dataTableName, @Nullable String group) {
-        // Flush any open writers for this data table so all rows are on disk
-        for (BucketWriter writer : writers.values()) {
+        // Close (not just flush) matching writers so that compression trailers
+        // (e.g., GZIP footer) are written, making the files fully readable.
+        // Removed writers will be lazily re-created in append mode on the next insertRow().
+        Iterator> it = writers.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry entry = it.next();
+            BucketWriter writer = entry.getValue();
             if (writer.dataTable.getName().equals(dataTableName) &&
                 Objects.equals(writer.dataTable.getGroup(), group)) {
-                writer.flush();
+                writer.close();
+                it.remove();
             }
         }
 
-        List allRows = new ArrayList<>();
+        RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group));
+
+        List allRows = new ArrayList<>();
         //noinspection DataFlowIssue
         File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
         if (files == null) {
             return Stream.empty();
         }
 
+        // Build set of file paths with still-open writers (other tables).
+        // These files have incomplete compression trailers and cannot be read.
+        Set activeWriterPaths = new HashSet<>();
+        for (Map.Entry entry : writers.entrySet()) {
+            activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
+        }
+
         int prefixCount = prefixColumns.size();
         int suffixCount = suffixColumns.size();
 
         for (File file : files) {
+            if (activeWriterPaths.contains(file.toPath())) {
+                continue;
+            }
             try (InputStream is = inputStreamFactory.apply(file.toPath())) {
                 DataTableDescriptor descriptor = readDescriptor(is);
                 if (descriptor == null ||
@@ -191,7 +236,7 @@ public Stream getRows(String dataTableName, @Nullable String group) {
                 // readDescriptor consumed comment lines; now parse the remaining CSV
                 // (header + data rows). Re-read the full file with CsvParser.
             } catch (IOException e) {
-                continue;
+                throw new UncheckedIOException(e);
             }
 
             try (InputStream is = inputStreamFactory.apply(file.toPath())) {
@@ -205,13 +250,14 @@ public Stream getRows(String dataTableName, @Nullable String group) {
                 while ((row = parser.parseNext()) != null) {
                     // Strip prefix and suffix columns, returning only data table columns
                     int dataCount = row.length - prefixCount - suffixCount;
+                    String[] dataRow;
                     if (dataCount <= 0) {
-                        allRows.add(row);
+                        dataRow = row;
                     } else {
-                        String[] dataRow = new String[dataCount];
+                        dataRow = new String[dataCount];
                         System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
-                        allRows.add(dataRow);
                     }
+                    allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
                 }
                 parser.stopParsing();
             } catch (IOException e) {
@@ -224,11 +270,7 @@ public Stream getRows(String dataTableName, @Nullable String group) {
 
     @Override
     public Collection> getDataTables() {
-        List> result = new ArrayList<>(writers.size());
-        for (BucketWriter writer : writers.values()) {
-            result.add(writer.dataTable);
-        }
-        return Collections.unmodifiableCollection(result);
+        return Collections.unmodifiableCollection(knownTables.values());
     }
 
     @Override
@@ -242,6 +284,7 @@ public void close() {
     private BucketWriter createBucketWriter(DataTable dataTable) {
         String fileKey = fileKey(dataTable);
         Path path = outputDir.resolve(fileKey + fileExtension);
+        boolean append = Files.exists(path);
 
         DataTableDescriptor descriptor = dataTableDescriptorFromDataTable(dataTable);
         List fieldNames = new ArrayList<>();
@@ -259,15 +302,17 @@ private BucketWriter createBucketWriter(DataTable dataTable) {
         OutputStream os = outputStreamFactory.apply(path);
         try {
             CsvWriterSettings settings = new CsvWriterSettings();
-            settings.setHeaderWritingEnabled(true);
+            settings.setHeaderWritingEnabled(!append);
             settings.getFormat().setComment('#');
             CsvWriter csvWriter = new CsvWriter(os, settings);
 
-            // Write metadata as comments
-            csvWriter.commentRow(" @name " + dataTable.getName());
-            csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
-            csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
-            csvWriter.writeHeaders(headers);
+            if (!append) {
+                // Write metadata as comments only for new files
+                csvWriter.commentRow(" @name " + dataTable.getName());
+                csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
+                csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
+                csvWriter.writeHeaders(headers);
+            }
 
             return new BucketWriter(dataTable, csvWriter, os, fieldNames, headers.size());
         } catch (Exception e) {
@@ -326,19 +371,57 @@ synchronized void writeRow(Object row) {
             csvWriter.writeRow((Object[]) values);
         }
 
-        synchronized void flush() {
-            csvWriter.flush();
+        void close() {
+            csvWriter.close();
             try {
-                os.flush();
+                os.close();
             } catch (IOException ignored) {
             }
         }
+    }
 
-        void close() {
-            csvWriter.close();
+    private static String metaKey(String dataTableName, @Nullable String group) {
+        return dataTableName + "\0" + (group != null ? group : "");
+    }
+
+    /**
+     * Holds the row class and its @Column field names so that
+     * String[] rows read from CSV can be deserialized back to typed objects
+     * via Jackson's {@link ObjectMapper#convertValue}.
+     */
+    private static class RowMetadata {
+        private static final ObjectMapper MAPPER = new ObjectMapper()
+                .registerModule(new ParameterNamesModule())
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        final String rowClassName;
+        final List fieldNames;
+
+        RowMetadata(String rowClassName, List fieldNames) {
+            this.rowClassName = rowClassName;
+            this.fieldNames = fieldNames;
+        }
+
+        static RowMetadata from(DataTable dataTable) {
+            Class rowClass = dataTable.getType();
+            List names = new ArrayList<>();
+            for (Field f : rowClass.getDeclaredFields()) {
+                if (f.isAnnotationPresent(Column.class)) {
+                    names.add(f.getName());
+                }
+            }
+            return new RowMetadata(rowClass.getName(), names);
+        }
+
+        Object toRow(String[] values) {
+            Map map = new LinkedHashMap<>();
+            for (int i = 0; i < fieldNames.size(); i++) {
+                map.put(fieldNames.get(i), i < values.length ? values[i] : "");
+            }
             try {
-                os.close();
-            } catch (IOException ignored) {
+                return MAPPER.convertValue(map, Class.forName(rowClassName));
+            } catch (ClassNotFoundException e) {
+                throw new IllegalStateException("Row class not found: " + rowClassName, e);
             }
         }
     }
diff --git a/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java b/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
index e3ce3becb0..1c364856d4 100644
--- a/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
+++ b/rewrite-core/src/main/java/org/openrewrite/table/SourcesFileErrors.java
@@ -18,6 +18,7 @@
 import lombok.Value;
 import org.openrewrite.Column;
 import org.openrewrite.DataTable;
+import org.openrewrite.ExecutionContext;
 import org.openrewrite.Recipe;
 
 public class SourcesFileErrors extends DataTable {
@@ -41,4 +42,9 @@ public static class Row {
                 description = "The stack trace of the failure.")
         String stackTrace;
     }
+
+    @Override
+    protected boolean allowWritingInThisCycle(ExecutionContext ctx) {
+        return true;
+    }
 }
diff --git a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
index 4728adba2b..88b6763e62 100644
--- a/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
+++ b/rewrite-core/src/test/java/org/openrewrite/DataTableStoreTest.java
@@ -23,10 +23,16 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.openrewrite.internal.InMemoryLargeSourceSet;
+import org.openrewrite.text.PlainText;
+import org.openrewrite.text.PlainTextVisitor;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -279,8 +285,8 @@ void csvStoreGetRowsReadsBackWrittenData(@TempDir Path tempDir) {
 
             List rows = store.getRows(table.getName(), null).collect(Collectors.toList());
             assertThat(rows).hasSize(2);
-            assertThat((String[]) rows.get(0)).containsExactly("alice");
-            assertThat((String[]) rows.get(1)).containsExactly("bob");
+            assertThat(rows.get(0)).isEqualTo(new TestTable.Row("alice"));
+            assertThat(rows.get(1)).isEqualTo(new TestTable.Row("bob"));
         }
     }
 
@@ -293,8 +299,8 @@ void csvStoreGetRowsMultipleColumns(@TempDir Path tempDir) {
 
             List rows = store.getRows(table.getName(), null).collect(Collectors.toList());
             assertThat(rows).hasSize(2);
-            assertThat((String[]) rows.get(0)).containsExactly("1", "hello");
-            assertThat((String[]) rows.get(1)).containsExactly("2", "world");
+            assertThat(rows.get(0)).isEqualTo(new MultiColTable.Row(1, "hello"));
+            assertThat(rows.get(1)).isEqualTo(new MultiColTable.Row(2, "world"));
         }
     }
 
@@ -319,11 +325,11 @@ void csvStoreGetRowsMatchesByGroup(@TempDir Path tempDir) {
 
             List groupedRows = store.getRows(grouped.getName(), "group-a").collect(Collectors.toList());
             assertThat(groupedRows).hasSize(1);
-            assertThat((String[]) groupedRows.getFirst()).containsExactly("grouped-row");
+            assertThat(groupedRows.getFirst()).isEqualTo(new TestTable.Row("grouped-row"));
 
             List ungroupedRows = store.getRows(ungrouped.getName(), null).collect(Collectors.toList());
             assertThat(ungroupedRows).hasSize(1);
-            assertThat((String[]) ungroupedRows.getFirst()).containsExactly("ungrouped-row");
+            assertThat(ungroupedRows.getFirst()).isEqualTo(new TestTable.Row("ungrouped-row"));
         }
     }
 
@@ -354,7 +360,7 @@ void csvStoreGetRowsStripsPrefixAndSuffixColumns(@TempDir Path tempDir) {
             List rows = store.getRows(table.getName(), null).collect(Collectors.toList());
             assertThat(rows).hasSize(1);
             // Should only contain the data column, not prefix/suffix
-            assertThat((String[]) rows.getFirst()).containsExactly("alice");
+            assertThat(rows.getFirst()).isEqualTo(new TestTable.Row("alice"));
         }
     }
 
@@ -385,9 +391,9 @@ void csvStoreGetRowsHandlesSpecialCharacters(@TempDir Path tempDir) {
 
             List rows = store.getRows(table.getName(), null).collect(Collectors.toList());
             assertThat(rows).hasSize(3);
-            assertThat((String[]) rows.get(0)).containsExactly("value with, comma");
-            assertThat((String[]) rows.get(1)).containsExactly("value with \"quotes\"");
-            assertThat((String[]) rows.get(2)).containsExactly("value with\nnewline");
+            assertThat(rows.get(0)).isEqualTo(new TestTable.Row("value with, comma"));
+            assertThat(rows.get(1)).isEqualTo(new TestTable.Row("value with \"quotes\""));
+            assertThat(rows.get(2)).isEqualTo(new TestTable.Row("value with\nnewline"));
         }
     }
 
@@ -448,4 +454,125 @@ void sanitizeDifferentInputsProduceDifferentHashes() {
         String b = CsvDataTableStore.sanitize("Find methods 'delete(..)'");
         assertThat(a).isNotEqualTo(b);
     }
+
+    // =========================================================================
+    // CsvDataTableStore: write in cycle 1, read back in cycle 2
+    // =========================================================================
+
+    /**
+     * A scanning recipe that writes to a data table during cycle 1's scanner,
+     * then reads rows back from the DataTableStore in cycle 2's getInitialValue.
+     */
+    static class WriteThenReadRecipe extends ScanningRecipe> {
+        transient TestTable table = new TestTable(this);
+
+        @Override
+        public String getDisplayName() {
+            return "Write then read";
+        }
+
+        @Override
+        public String getDescription() {
+            return "Writes data table rows in cycle 1, reads them back in cycle 2.";
+        }
+
+        @Override
+        public boolean causesAnotherCycle() {
+            return true;
+        }
+
+        @Override
+        public List getInitialValue(ExecutionContext ctx) {
+            // On cycle 2+, the store already contains rows written in cycle 1
+            List readBack = new ArrayList<>();
+            DataTableStore store = DataTableExecutionContextView.view(ctx).getDataTableStore();
+            try (Stream rows = store.getRows(table.getName(), null)) {
+                rows.forEach(row -> {
+                    if (row instanceof TestTable.Row) {
+                        readBack.add(((TestTable.Row) row).getName());
+                    } else {
+                        readBack.add(((String[]) row)[0]);
+                    }
+                });
+            }
+            return readBack;
+        }
+
+        @Override
+        public TreeVisitor getScanner(List acc) {
+            return new PlainTextVisitor<>() {
+                @Override
+                public PlainText visitText(PlainText text, ExecutionContext ctx) {
+                    // DataTable.insertRow only writes during cycle 1
+                    table.insertRow(ctx, new TestTable.Row(text.getText()));
+                    return text;
+                }
+            };
+        }
+
+        @Override
+        public TreeVisitor getVisitor(List acc) {
+            return new PlainTextVisitor<>() {
+                @Override
+                public PlainText visitText(PlainText text, ExecutionContext ctx) {
+                    if (acc.isEmpty()) {
+                        // Cycle 1: no read-back data yet; make a change to trigger cycle 2
+                        return text.withText(text.getText() + "-scanned");
+                    }
+                    // Cycle 2: append the data read back from the store
+                    return text.withText(text.getText() + "-read:" + String.join(",", acc));
+                }
+            };
+        }
+    }
+
+    @Test
+    void csvStoreIntermixedWritesAndReads(@TempDir Path tempDir) {
+        try (CsvDataTableStore store = new CsvDataTableStore(tempDir)) {
+            TestTable table = new TestTable(Recipe.noop());
+
+            // First batch of writes
+            store.insertRow(table, ctx(), new TestTable.Row("alice"));
+            store.insertRow(table, ctx(), new TestTable.Row("bob"));
+
+            // Mid-run read (closes the writer internally)
+            List firstRead = store.getRows(table.getName(), null).collect(Collectors.toList());
+            assertThat(firstRead).hasSize(2);
+            assertThat(firstRead.get(0)).isEqualTo(new TestTable.Row("alice"));
+            assertThat(firstRead.get(1)).isEqualTo(new TestTable.Row("bob"));
+
+            // Second batch of writes (writer re-created in append mode)
+            store.insertRow(table, ctx(), new TestTable.Row("charlie"));
+
+            // Second read should see all three rows
+            List secondRead = store.getRows(table.getName(), null).collect(Collectors.toList());
+            assertThat(secondRead).hasSize(3);
+            assertThat(secondRead.get(0)).isEqualTo(new TestTable.Row("alice"));
+            assertThat(secondRead.get(1)).isEqualTo(new TestTable.Row("bob"));
+            assertThat(secondRead.get(2)).isEqualTo(new TestTable.Row("charlie"));
+        }
+    }
+
+    @Test
+    void csvStoreWriteInCycle1ReadBackInCycle2(@TempDir Path tempDir) {
+        ExecutionContext ctx = ctx();
+        DataTableExecutionContextView.view(ctx).setDataTableStore(new CsvDataTableStore(tempDir));
+
+        List sources = List.of(
+                PlainText.builder().text("hello").sourcePath(Path.of("test.txt")).build()
+        );
+
+        RecipeRun run = new RecipeScheduler().scheduleRun(
+                new WriteThenReadRecipe(),
+                new InMemoryLargeSourceSet(sources),
+                ctx, 2, 1
+        );
+
+        // The recipe should have run 2 cycles:
+        //   Cycle 1: scanner writes "hello" to data table, visitor appends "-scanned"
+        //   Cycle 2: getInitialValue reads back ["hello"], visitor appends "-read:hello"
+        PlainText after = (PlainText) run.getChangeset().getAllResults().getFirst().getAfter();
+        assertThat(after).isNotNull();
+        assertThat(after.getText()).isEqualTo("hello-scanned-read:hello");
+    }
 }