[SPARK-56335][SQL] Implement SupportsMetadataColumns in FileTable #55320

Draft
LuciferYang wants to merge 12 commits into apache:master from LuciferYang:SPARK-56335

@LuciferYang
Contributor

What changes were proposed in this pull request?

Expose the V1-compatible _metadata struct column (file_path, file_name,
file_size, file_block_start, file_block_length, file_modification_time)
on V2 file-based tables so that queries like
SELECT _metadata.file_path FROM parquet.`<path>` work against the V2 scan
path instead of forcing a V1 fallback.

Key changes:

  1. FileTable implements SupportsMetadataColumns: metadataColumns()
    returns a single _metadata struct column whose fields come from
    FileFormat.BASE_METADATA_FIELDS. Formats can extend metadataSchemaFields
    to expose additional fields (e.g., Parquet's row_index, tracked in
    SPARK-56371).

  2. FileScanBuilder.pruneColumns: Intercepts the _metadata field from
    the required schema, stores the pruned metadata struct on
    requestedMetadataFields, and keeps it out of readDataSchema so the
    format-specific reader stays unchanged.

  3. FileScan.readSchema: Re-exposes _metadata as a trailing struct field
    when metadata is requested, so V2ScanRelationPushDown can rebind the
    downstream attribute reference back to the scan output.

  4. MetadataAppendingFilePartitionReaderFactory (new): Wraps the
    format-specific reader factory and appends a single _metadata struct value
    (via JoinedRow + an inner GenericInternalRow) to each row. Columnar
    reads are disabled while metadata is requested since ConstantColumnVector
    is scalar and cannot represent a struct column; queries fall back to the row
    path.

  5. All six concrete scans (Parquet/ORC/CSV/JSON/Text/Avro): Take
    requestedMetadataFields as a trailing default-valued case-class parameter
    and call the new wrapWithMetadataIfNeeded helper when constructing their
    reader factory. Their ScanBuilder.build() implementations pass the field
    through from FileScanBuilder.
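
The split described in items 2 to 4 can be sketched without any Spark dependency. The block below is only a simplified model: `Field`, `Pruned`, `pruneColumns`, and `appendMetadata` are hypothetical stand-ins for the roles of `StructField`, `FileScanBuilder.pruneColumns`, `FileScan.readSchema`, and the `JoinedRow`-based append, not the classes or signatures used in this PR.

```scala
// Simplified model of the pruning and appending flow; not Spark's real classes.
object MetadataPruningSketch {
  // Hypothetical stand-in for a schema field (name plus nested fields).
  final case class Field(name: String, children: Seq[Field] = Nil)

  val MetadataName = "_metadata"

  // Mirrors the roles of readDataSchema and requestedMetadataFields.
  final case class Pruned(
      readDataSchema: Seq[Field],
      requestedMetadataFields: Option[Field]) {
    // Re-expose _metadata as a trailing field only when it was requested.
    def readSchema: Seq[Field] = readDataSchema ++ requestedMetadataFields.toSeq
  }

  // Keep _metadata out of the data schema so the format reader is unchanged.
  def pruneColumns(required: Seq[Field]): Pruned = {
    val (meta, data) = required.partition(_.name == MetadataName)
    Pruned(data, meta.headOption)
  }

  // Append one struct value (here a nested Seq) holding the requested
  // per-file constants to a data row.
  def appendMetadata(
      dataRow: Seq[Any],
      fileConstants: Map[String, Any],
      pruned: Pruned): Seq[Any] =
    pruned.requestedMetadataFields match {
      case Some(m) => dataRow :+ m.children.map(f => fileConstants(f.name))
      case None    => dataRow
    }
}
```

In this model the data reader only ever sees `readDataSchema`, which is the property the description relies on to leave the format-specific readers untouched.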

Parquet's generated row_index metadata field is intentionally out of scope;
follow-up work is tracked in SPARK-56371.

Why are the changes needed?

Before this change, _metadata on a DSv2 file table was unresolvable and the
query fell back to the V1 FileSourceScanExec path. This is one of the
remaining blockers for deprecating the V1 file sources (SPARK-56170).

Does this PR introduce any user-facing change?

Yes. _metadata.* queries now work against V2 file sources with the same
semantics as V1.

How was this patch tested?

New FileMetadataColumnsV2Suite (24 tests) exercises read and projection paths
for Parquet/ORC/JSON/CSV/Text, forcing the V2 path via
spark.sql.sources.useV1SourceList = "", and asserts the metadata struct values
against the underlying file's java.io.File stats.
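
As a rough illustration of the assertion style, expected values can be derived from `java.io.File` along these lines. The helper name `expectedMetadata` is hypothetical, and the mapping (URI string for `file_path`, block start 0 and block length equal to the file size for a file read as one split) is an assumption modelled on how the V1 metadata tests are commonly written, not a copy of the new suite.

```scala
import java.io.File
import java.sql.Timestamp

object ExpectedMetadataSketch {
  // Expected _metadata values for a small file read as a single split.
  // Assumption: file_path is the file URI and the block spans the whole file.
  def expectedMetadata(f: File): Map[String, Any] = Map(
    "file_path" -> f.toURI.toString,
    "file_name" -> f.getName,
    "file_size" -> f.length(),
    "file_block_start" -> 0L,
    "file_block_length" -> f.length(),
    "file_modification_time" -> new Timestamp(f.lastModified())
  )
}
```

A test would then compare each `_metadata.<field>` value returned by the V2 scan with the corresponding entry of this map.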

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…Frame API writes and delete FallBackFileSourceV2

Key changes:
- FileWrite: added partitionSchema, customPartitionLocations,
  dynamicPartitionOverwrite, isTruncate; path creation and truncate
  logic; dynamic partition overwrite via FileCommitProtocol
- FileTable: createFileWriteBuilder with SupportsDynamicOverwrite
  and SupportsTruncate; capabilities now include TRUNCATE and
  OVERWRITE_DYNAMIC; fileIndex skips file existence checks when
  userSpecifiedSchema is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use
  createFileWriteBuilder with partition/truncate/overwrite support
- DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for
  non-partitioned Append and Overwrite via df.write.save(path)
- DataFrameWriter.insertInto: V1 fallback for file sources
  (TODO: SPARK-56175)
- DataFrameWriter.saveAsTable: V1 fallback for file sources
  (TODO: SPARK-56230, needs StagingTableCatalog)
- DataSourceV2Utils.getTableProvider: V1 fallback for file sources
  (TODO: SPARK-56175)
- Removed FallBackFileSourceV2 rule
- V2SessionCatalog.createTable: V1 FileFormat data type validation

…catalog table loading, and gate removal

Key changes:
- FileTable extends SupportsPartitionManagement with createPartition,
  dropPartition, listPartitionIdentifiers, partitionSchema
- Partition operations sync to catalog metastore (best-effort)
- V2SessionCatalog.loadTable returns FileTable instead of V1Table,
  sets catalogTable and useCatalogFileIndex on FileTable
- V2SessionCatalog.getDataSourceOptions includes storage.properties
  for proper option propagation (header, ORC bloom filter, etc.)
- V2SessionCatalog.createTable validates data types via FileTable
- FileTable.columns() restores NOT NULL constraints from catalogTable
- FileTable.partitioning() falls back to userSpecifiedPartitioning
  or catalog partition columns
- FileTable.fileIndex uses CatalogFileIndex when catalog has
  registered partitions (custom partition locations)
- FileTable.schema checks column name duplication for non-catalog
  tables only
- DataSourceV2Utils.getTableProvider: removed FileDataSourceV2 gate
- DataFrameWriter.insertInto: enabled V2 for file sources
- DataFrameWriter.saveAsTable: V1 fallback (TODO: SPARK-56230)
- ResolveSessionCatalog: V1 fallback for FileTable-backed commands
  (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition,
  ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions,
  DropPartitions, SetTableLocation, CREATE TABLE validation,
  REPLACE TABLE blocking)
- FindDataSourceTable: streaming V1 fallback for FileTable
  (TODO: SPARK-56233)
- DataSource.planForWritingFileFormat: graceful V2 handling

Enable bucketed writes for V2 file tables via catalog BucketSpec.

Key changes:
- FileWrite: add bucketSpec field, use V1WritesUtils.getWriterBucketSpec()
  instead of hardcoded None
- FileTable: createFileWriteBuilder passes catalogTable.bucketSpec
  to the write pipeline
- FileDataSourceV2: getTable uses collect to skip BucketTransform
  (handled via catalogTable.bucketSpec instead)
- FileWriterFactory: use DynamicPartitionDataConcurrentWriter for
  bucketed writes since V2's RequiresDistributionAndOrdering cannot
  express hash-based ordering
- All 6 format Write/Table classes updated with BucketSpec parameter
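
The "uses collect to skip BucketTransform" item can be pictured with a small model. The `Transform` hierarchy below is a hypothetical stand-in for the connector transform expressions, and `partitionColumns` is an illustrative name, so this only shows the idiom rather than the code in `FileDataSourceV2.getTable`.

```scala
object SkipBucketTransformSketch {
  // Hypothetical stand-ins for partition transforms.
  sealed trait Transform
  final case class IdentityTransform(column: String) extends Transform
  final case class BucketTransform(numBuckets: Int, columns: Seq[String]) extends Transform

  // collect keeps only the cases the partial function matches, so bucket
  // transforms are silently skipped instead of raising a MatchError as a
  // plain map with the same pattern would.
  def partitionColumns(transforms: Seq[Transform]): Seq[String] =
    transforms.collect { case IdentityTransform(c) => c }
}
```

Bucketing information is then expected to arrive separately, which in the commit is `catalogTable.bucketSpec`.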

Note: bucket pruning and bucket join (read-path optimization) are
not included in this patch (tracked under SPARK-56231).

Add RepairTableExec to sync filesystem partition directories with
catalog metastore for V2 file tables.

Key changes:
- New RepairTableExec: scans filesystem partitions via
  FileTable.listPartitionIdentifiers(), compares with catalog,
  registers missing partitions and drops orphaned entries
- DataSourceV2Strategy: route RepairTable and RecoverPartitions
  for FileTable to new V2 exec node
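
The comparison step amounts to two set differences. A minimal sketch, assuming partitions are identified by column-to-value maps (`PartitionSpec`, `RepairPlan`, and `plan` are hypothetical names, not part of `RepairTableExec`):

```scala
object RepairPartitionsSketch {
  // Hypothetical partition identifier: partition column -> value.
  type PartitionSpec = Map[String, String]

  final case class RepairPlan(
      toRegister: Set[PartitionSpec], // on the filesystem, missing from the catalog
      toDrop: Set[PartitionSpec])     // in the catalog, no directory on the filesystem

  def plan(onFilesystem: Set[PartitionSpec], inCatalog: Set[PartitionSpec]): RepairPlan =
    RepairPlan(onFilesystem.diff(inCatalog), inCatalog.diff(onFilesystem))
}
```

Partitions present on both sides appear in neither set, so a repair over an already consistent table produces an empty plan.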

Implement SupportsOverwriteV2 for V2 file tables to support static
partition overwrite (INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...).

Key changes:
- FileTable: replace SupportsTruncate with SupportsOverwriteV2 on
  WriteBuilder, implement overwrite(predicates)
- FileWrite: extend toBatch() to delete only the matching partition
  directory, ordered by partitionSchema
- FileTable.CAPABILITIES: add OVERWRITE_BY_FILTER
- All 6 format Write/Table classes: plumb overwritePredicates parameter

This is a prerequisite for SPARK-56304 (ifPartitionNotExists).
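
To make "delete only the matching partition directory, ordered by partitionSchema" concrete, the sketch below maps equality predicates to a relative directory. It rests on assumptions not stated in the commit: predicates are modelled as a plain column-to-value map, they must cover a leading prefix of the partition columns, and path escaping of values is omitted. `partitionDir` is a hypothetical helper.

```scala
object StaticOverwriteSketch {
  // Build the relative directory for a static partition overwrite.
  // Returns None when the predicates do not form a leading prefix of the
  // partition columns (assumed to be distinct), since no single directory
  // corresponds to them in that case.
  def partitionDir(
      partitionSchema: Seq[String],
      predicates: Map[String, String]): Option[String] = {
    // Walk columns in partitionSchema order, not predicate order.
    val prefix = partitionSchema.takeWhile(predicates.contains)
    if (predicates.nonEmpty && prefix.size == predicates.size)
      Some(prefix.map(c => s"$c=${predicates(c)}").mkString("/"))
    else None
  }
}
```

For `PARTITION(p=1)` on a table partitioned by `p`, this yields `p=1`, which is the only directory the overwrite would need to clear.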

…EAD)

### What changes were proposed in this pull request?

Implements `MicroBatchStream` support for V2 file tables, enabling structured streaming reads through the V2 path instead of falling back to V1 `FileStreamSource`.

Key changes:
- New `FileMicroBatchStream` class implementing `MicroBatchStream`, `SupportsAdmissionControl`, and `SupportsTriggerAvailableNow` — handles file discovery, offset management, rate limiting, and partition planning
- Override `FileScan.toMicroBatchStream()` to return `FileMicroBatchStream`
- Add `withFileIndex` method to `FileScan` and all 6 concrete scans for creating batch-specific scans
- Add `MICRO_BATCH_READ` to `FileTable.CAPABILITIES`
- Update `ResolveDataSource` to allow `FileDataSourceV2` into the V2 streaming path (respects `USE_V1_SOURCE_LIST` for backward compatibility)
- Remove the `FileTable` streaming fallback in `FindDataSourceTable`
- Reuses V1 infrastructure (`FileStreamSourceLog`, `FileStreamSourceOffset`, `SeenFilesMap`) for checkpoint compatibility

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming reads are supported. Without this, the V1 `FileStreamSource` fallback prevents deprecation of V1 file source code.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming reads still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2ReadSuite` with 6 E2E tests. Existing `FileStreamSourceSuite` (76 tests) passes with V1 forced via `USE_V1_SOURCE_LIST`.

…ITE)

### What changes were proposed in this pull request?

Implements `StreamingWrite` support for V2 file tables, enabling structured streaming writes through the V2 path instead of falling back to V1 `FileStreamSink`.

Key changes:
- New `FileStreamingWrite` class implementing `StreamingWrite` — uses `ManifestFileCommitProtocol` for file commit and `FileStreamSinkLog` for metadata tracking
- New `FileStreamingWriterFactory` bridging `DataWriterFactory` to `StreamingDataWriterFactory`
- Override `FileWrite.toStreaming()` to return `FileStreamingWrite`
- Add `STREAMING_WRITE` to `FileTable.CAPABILITIES`
- Idempotent `commit(epochId, messages)` — skips already-committed batches
- Supports `retention` option for metadata log cleanup (V1 parity)
- Checkpoint compatible with V1 `FileStreamSink` (same `_spark_metadata` format)
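
The idempotent commit can be sketched with an in-memory log. `SinkLog` below is a hypothetical stand-in for the role `FileStreamSinkLog` plays, and the "skip when the epoch is not newer than the latest logged batch" rule is an assumption based on how the V1 sink behaves, not a reading of `FileStreamingWrite`.

```scala
import scala.collection.mutable

object IdempotentCommitSketch {
  // In-memory stand-in for a sink metadata log keyed by epoch id.
  final class SinkLog {
    private val batches = mutable.Map.empty[Long, Seq[String]]
    def latest: Option[Long] = if (batches.isEmpty) None else Some(batches.keys.max)
    def add(epochId: Long, files: Seq[String]): Unit = batches(epochId) = files
    def allFiles: Seq[String] = batches.toSeq.sortBy(_._1).flatMap(_._2)
  }

  // Returns true if the epoch was recorded, false if it was skipped
  // because an epoch with the same or a later id is already in the log.
  def commit(log: SinkLog, epochId: Long, files: Seq[String]): Boolean =
    if (log.latest.exists(_ >= epochId)) false
    else { log.add(epochId, files); true }
}
```

Replaying an epoch after a restart is then a no-op, so files written by the retried attempt never become visible through the log.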

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming writes are supported. Without this, the V1 `FileStreamSink` fallback prevents deprecation of V1 file source code. Together with SPARK-56232 (streaming read), this completes the streaming support needed for V1 deprecation.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming writes still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2WriteSuite` with 4 E2E tests. Existing `FileStreamSinkV1Suite` passes. All 108 streaming file tests pass.

Exposes the V1-compatible `_metadata` struct column (`file_path`, `file_name`,
`file_size`, `file_block_start`, `file_block_length`, `file_modification_time`)
on V2 file-based tables so that queries like
`SELECT _metadata.file_path FROM parquet.`<path>`` work against the V2 scan
path instead of forcing a V1 fallback.

The wiring is:

* `FileTable` implements `SupportsMetadataColumns.metadataColumns()` and returns
  a single `_metadata` struct column whose fields come from
  `FileFormat.BASE_METADATA_FIELDS`. Formats may extend `metadataSchemaFields`
  later to expose additional fields (e.g., Parquet's `row_index`, tracked in
  SPARK-56371).
* `FileScanBuilder.pruneColumns` intercepts the `_metadata` field from the
  required schema, stores the pruned metadata struct on
  `requestedMetadataFields`, and keeps it out of `readDataSchema` so the
  format-specific reader stays unchanged.
* `FileScan.readSchema` re-exposes `_metadata` as a trailing struct field when
  metadata is requested, so `V2ScanRelationPushDown` can rebind the downstream
  attribute reference back to the scan output.
* A new `MetadataAppendingFilePartitionReaderFactory` wraps the format-specific
  reader factory and appends a single `_metadata` struct value (via
  `JoinedRow` + an inner `GenericInternalRow`) to each row. Columnar reads are
  disabled while metadata is requested since `ConstantColumnVector` is scalar
  and cannot represent a struct column; queries fall back to the row path.
* All six concrete scans (Parquet/ORC/CSV/JSON/Text/Avro) take
  `requestedMetadataFields` as a trailing default-valued case-class parameter
  and call the new `wrapWithMetadataIfNeeded` helper when constructing their
  reader factory. Their `ScanBuilder.build()` implementations pass the field
  through from `FileScanBuilder`.

Parquet's generated `row_index` metadata field is intentionally out of scope;
follow-up work is tracked in SPARK-56371.

Before this change, `_metadata` on a DSv2 file table was unresolvable and the
query fell back to the V1 `FileSourceScanExec` path, which is one of the
remaining blockers for deprecating the V1 file sources (SPARK-56170).

Yes. `_metadata.*` queries now work against the V2 file sources with the same
semantics as V1.

New `FileMetadataColumnsV2Suite` exercises read and projection paths for
Parquet/ORC/JSON/CSV/Text, forcing the V2 path via `useV1SourceList`, and
asserts the metadata struct values against the underlying file's
`java.io.File` stats. All 16 tests pass.

@LuciferYang marked this pull request as draft on April 13, 2026, 07:05
@LuciferYang
Contributor Author

This is the 12th PR for SPARK-56170
