Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 113 additions & 30 deletions rewrite-core/src/main/java/org/openrewrite/CsvDataTableStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package org.openrewrite;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import com.univocity.parsers.csv.CsvWriter;
Expand All @@ -28,6 +31,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
Expand All @@ -44,20 +48,34 @@
* Supports configurable output stream creation (e.g., for GZIP compression)
* and static prefix/suffix columns (e.g., repository metadata).
*
* <p>
* When {@link #getRows} is called, open writers for the requested table are
* <em>closed</em> (not merely flushed) so that compression trailers such as
* the GZIP footer are written, producing a fully valid file on disk. If
* {@link #insertRow} is called again for the same table, a new writer is
* created in append mode. For this reason the {@code outputStreamFactory}
* <strong>must</strong> open the stream with {@link java.nio.file.StandardOpenOption#CREATE
* CREATE} and {@link java.nio.file.StandardOpenOption#APPEND APPEND} semantics
* so that data written before the close is preserved. For compressed streams
* this produces a multi-member archive (e.g., concatenated GZIP members),
* which {@link java.util.zip.GZIPInputStream} handles transparently.
*
* <pre>{@code
* // Plain CSV
* new CsvDataTableStore(outputDir)
*
* // GZIP compressed with repository columns (write-only)
* new CsvDataTableStore(outputDir,
* path -> new GZIPOutputStream(Files.newOutputStream(path)),
* path -> new GZIPOutputStream(Files.newOutputStream(path,
* StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
* ".csv.gz",
* Map.of("repositoryOrigin", origin, "repositoryPath", path),
* Map.of("org1", orgValue))
*
* // GZIP compressed with read-back support
* new CsvDataTableStore(outputDir,
* path -> new GZIPOutputStream(Files.newOutputStream(path)),
* path -> new GZIPOutputStream(Files.newOutputStream(path,
* StandardOpenOption.CREATE, StandardOpenOption.APPEND)),
* path -> new GZIPInputStream(Files.newInputStream(path)),
* ".csv.gz",
* Map.of("repositoryOrigin", origin, "repositoryPath", path),
Expand All @@ -73,6 +91,8 @@ public class CsvDataTableStore implements DataTableStore, AutoCloseable {
private final Map<String, String> prefixColumns;
private final Map<String, String> suffixColumns;
private final ConcurrentHashMap<String, BucketWriter> writers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, RowMetadata> rowMetadata = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, DataTable<?>> knownTables = new ConcurrentHashMap<>();

/**
* Create a store that writes plain CSV files.
Expand Down Expand Up @@ -112,7 +132,11 @@ public CsvDataTableStore(Path outputDir,
* and additional static columns prepended/appended to each row.
*
* @param outputDir directory to write files into
* @param outputStreamFactory creates an output stream for each file path (e.g., wrapping with GZIPOutputStream)
* @param outputStreamFactory creates an output stream for each file path. <strong>Must</strong> use
* {@link java.nio.file.StandardOpenOption#CREATE CREATE} and
* {@link java.nio.file.StandardOpenOption#APPEND APPEND} so that
* rows written before a mid-run {@link #getRows} call are preserved
* when the writer is re-opened for subsequent inserts.
* @param inputStreamFactory creates an input stream for each file path (e.g., wrapping with GZIPInputStream)
* @param fileExtension file extension including dot (e.g., ".csv" or ".csv.gz")
* @param prefixColumns static columns prepended to each row, in insertion order
Expand All @@ -139,7 +163,7 @@ public CsvDataTableStore(Path outputDir,

private static OutputStream defaultOutputStream(Path path) {
try {
return Files.newOutputStream(path);
return Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -155,32 +179,53 @@ private static InputStream defaultInputStream(Path path) {

/**
 * Writes {@code row} to the CSV file that backs {@code dataTable}.
 * <p>
 * Each call also registers the table in {@code rowMetadata} and {@code knownTables};
 * both registrations are "first one wins", so repeated calls for the same table do not
 * replace an earlier entry. A {@link BucketWriter} is created on demand when none is
 * currently held in {@code writers} for the table's file key.
 *
 * @param dataTable the table the row belongs to; its name, group and file key select the target file
 * @param ctx       the execution context (not used by this implementation)
 * @param row       the row to write
 */
@Override
public <Row> void insertRow(DataTable<Row> dataTable, ExecutionContext ctx, Row row) {
    String metaKey = metaKey(dataTable.getName(), dataTable.getGroup());
    rowMetadata.computeIfAbsent(metaKey, k -> RowMetadata.from(dataTable));

    // Resolve the file key once and reuse it for both the table registry and the writer lookup.
    String fileKey = fileKey(dataTable);
    knownTables.putIfAbsent(fileKey, dataTable);

    BucketWriter writer = writers.computeIfAbsent(fileKey, k -> createBucketWriter(dataTable));
    writer.writeRow(row);
}

@Override
public Stream<?> getRows(String dataTableName, @Nullable String group) {
// Flush any open writers for this data table so all rows are on disk
for (BucketWriter writer : writers.values()) {
// Close (not just flush) matching writers so that compression trailers
// (e.g., GZIP footer) are written, making the files fully readable.
// Removed writers will be lazily re-created in append mode on the next insertRow().
Iterator<Map.Entry<String, BucketWriter>> it = writers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, BucketWriter> entry = it.next();
BucketWriter writer = entry.getValue();
if (writer.dataTable.getName().equals(dataTableName) &&
Objects.equals(writer.dataTable.getGroup(), group)) {
writer.flush();
writer.close();
it.remove();
}
}

List<String[]> allRows = new ArrayList<>();
RowMetadata meta = rowMetadata.get(metaKey(dataTableName, group));

List<Object> allRows = new ArrayList<>();
//noinspection DataFlowIssue
File[] files = outputDir.toFile().listFiles((dir, name) -> name.endsWith(fileExtension));
if (files == null) {
return Stream.empty();
}

// Build set of file paths with still-open writers (other tables).
// These files have incomplete compression trailers and cannot be read.
Set<Path> activeWriterPaths = new HashSet<>();
for (Map.Entry<String, BucketWriter> entry : writers.entrySet()) {
activeWriterPaths.add(outputDir.resolve(entry.getKey() + fileExtension));
}

int prefixCount = prefixColumns.size();
int suffixCount = suffixColumns.size();

for (File file : files) {
if (activeWriterPaths.contains(file.toPath())) {
continue;
}
try (InputStream is = inputStreamFactory.apply(file.toPath())) {
DataTableDescriptor descriptor = readDescriptor(is);
if (descriptor == null ||
Expand All @@ -191,7 +236,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
// readDescriptor consumed comment lines; now parse the remaining CSV
// (header + data rows). Re-read the full file with CsvParser.
} catch (IOException e) {
continue;
throw new UncheckedIOException(e);
}

try (InputStream is = inputStreamFactory.apply(file.toPath())) {
Expand All @@ -205,13 +250,14 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {
while ((row = parser.parseNext()) != null) {
// Strip prefix and suffix columns, returning only data table columns
int dataCount = row.length - prefixCount - suffixCount;
String[] dataRow;
if (dataCount <= 0) {
allRows.add(row);
dataRow = row;
} else {
String[] dataRow = new String[dataCount];
dataRow = new String[dataCount];
System.arraycopy(row, prefixCount, dataRow, 0, dataCount);
allRows.add(dataRow);
}
allRows.add(meta != null ? meta.toRow(dataRow) : dataRow);
}
parser.stopParsing();
} catch (IOException e) {
Expand All @@ -224,11 +270,7 @@ public Stream<?> getRows(String dataTableName, @Nullable String group) {

@Override
public Collection<DataTable<?>> getDataTables() {
List<DataTable<?>> result = new ArrayList<>(writers.size());
for (BucketWriter writer : writers.values()) {
result.add(writer.dataTable);
}
return Collections.unmodifiableCollection(result);
return Collections.unmodifiableCollection(knownTables.values());
}

@Override
Expand All @@ -242,6 +284,7 @@ public void close() {
private BucketWriter createBucketWriter(DataTable<?> dataTable) {
String fileKey = fileKey(dataTable);
Path path = outputDir.resolve(fileKey + fileExtension);
boolean append = Files.exists(path);

DataTableDescriptor descriptor = dataTableDescriptorFromDataTable(dataTable);
List<String> fieldNames = new ArrayList<>();
Expand All @@ -259,15 +302,17 @@ private BucketWriter createBucketWriter(DataTable<?> dataTable) {
OutputStream os = outputStreamFactory.apply(path);
try {
CsvWriterSettings settings = new CsvWriterSettings();
settings.setHeaderWritingEnabled(true);
settings.setHeaderWritingEnabled(!append);
settings.getFormat().setComment('#');
CsvWriter csvWriter = new CsvWriter(os, settings);

// Write metadata as comments
csvWriter.commentRow(" @name " + dataTable.getName());
csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
csvWriter.writeHeaders(headers);
if (!append) {
// Write metadata as comments only for new files
csvWriter.commentRow(" @name " + dataTable.getName());
csvWriter.commentRow(" @instanceName " + dataTable.getInstanceName());
csvWriter.commentRow(" @group " + (dataTable.getGroup() != null ? dataTable.getGroup() : ""));
csvWriter.writeHeaders(headers);
}

return new BucketWriter(dataTable, csvWriter, os, fieldNames, headers.size());
} catch (Exception e) {
Expand Down Expand Up @@ -326,19 +371,57 @@ synchronized void writeRow(Object row) {
csvWriter.writeRow((Object[]) values);
}

synchronized void flush() {
csvWriter.flush();
void close() {
csvWriter.close();
try {
os.flush();
os.close();
} catch (IOException ignored) {
}
}
}

void close() {
csvWriter.close();
/**
 * Builds the lookup key for a data table from its name and group.
 * <p>
 * The two parts are joined with a NUL character; a {@code null} group is treated
 * as the empty string.
 *
 * @param dataTableName the data table name
 * @param group         the data table group, or {@code null} if the table has none
 * @return {@code dataTableName}, a NUL separator, then the group (or nothing when the group is {@code null})
 */
private static String metaKey(String dataTableName, @Nullable String group) {
    String groupPart = group == null ? "" : group;
    return dataTableName + "\0" + groupPart;
}

/**
* Holds the row class name and the names of its {@link Column}-annotated fields so that
* {@code String[]} rows read from CSV can be deserialized back to typed objects
* via Jackson's {@link ObjectMapper#convertValue}.
*/
private static class RowMetadata {
private static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new ParameterNamesModule())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

final String rowClassName;
final List<String> fieldNames;

RowMetadata(String rowClassName, List<String> fieldNames) {
this.rowClassName = rowClassName;
this.fieldNames = fieldNames;
}

static RowMetadata from(DataTable<?> dataTable) {
Class<?> rowClass = dataTable.getType();
List<String> names = new ArrayList<>();
for (Field f : rowClass.getDeclaredFields()) {
if (f.isAnnotationPresent(Column.class)) {
names.add(f.getName());
}
}
return new RowMetadata(rowClass.getName(), names);
}

Object toRow(String[] values) {
Map<String, String> map = new LinkedHashMap<>();
for (int i = 0; i < fieldNames.size(); i++) {
map.put(fieldNames.get(i), i < values.length ? values[i] : "");
}
try {
os.close();
} catch (IOException ignored) {
return MAPPER.convertValue(map, Class.forName(rowClassName));
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Row class not found: " + rowClassName, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.Value;
import org.openrewrite.Column;
import org.openrewrite.DataTable;
import org.openrewrite.ExecutionContext;
import org.openrewrite.Recipe;

public class SourcesFileErrors extends DataTable<SourcesFileErrors.Row> {
Expand All @@ -41,4 +42,9 @@ public static class Row {
description = "The stack trace of the failure.")
String stackTrace;
}

/**
 * Unconditionally returns {@code true}; {@code ctx} is not consulted.
 * <p>
 * NOTE(review): presumably this lets rows be inserted into this table in every
 * recipe cycle rather than only the cycles the base {@link DataTable} would
 * permit — confirm against the overridden method's contract.
 *
 * @param ctx the execution context (ignored)
 * @return always {@code true}
 */
@Override
protected boolean allowWritingInThisCycle(ExecutionContext ctx) {
return true;
}
}
Loading