feat(table): add support for merge-on-read delete #721
alexandre-normand wants to merge 1 commit into apache:main
Conversation
5079248 to 114fc57
```diff
 var PositionalDeleteSchema = NewSchema(0,
 	NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name: "file_path", Required: true},
-	NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos", Required: true},
+	NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos", Required: true},
```
The spec says that pos is a long, so I updated this to be Int64 rather than Int32.
c9e30c4 to a7a7ce6
a7a7ce6 to 6d41651
laskoviymishka left a comment
Overall, this looks correct (I mostly compared it with Iceberg Java), but I think a couple of things need attention:
- panic/recover is used in normal write-path error handling;
- position-delete fanout needs additional tests;
- missing focused local unit tests for invariant-heavy code (enrichRecordsWithPosDeleteFields, position-delete fanout).
Regarding the API change: the ToDataFile change is internal (package scope), but the ManifestWriter.ToManifestFile signature change is public and should be explicitly called out in the compatibility/release notes.
I would advocate for not changing this API and instead adding a new one with the extra argument, but this decision, in my opinion, is not critical.
```go
currentSpec, err := meta.CurrentSpec()
if err != nil || currentSpec == nil {
	panic(fmt.Errorf("%w: cannot write files without a current spec", err))
```
Can we avoid panic/recover for expected failures here?
CurrentSpec() and schema conversion errors are regular error cases, and returning an error iterator directly would be clearer and safer than panicking. Also, the recover block only handles string and error; any other panic type could produce yield(nil, nil), which masks the failure.
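For illustration, a minimal sketch of the error-iterator approach, assuming the write path yields an iter.Seq2[arrow.Record, error] as the scan path does; errorSeq and the arrow-go import path are assumptions, not code from this PR:

```go
package table

import (
	"iter"

	"github.com/apache/arrow-go/v18/arrow"
)

// errorSeq yields a single terminal error so callers observe the failure as a
// regular error value instead of a recovered panic.
func errorSeq(err error) iter.Seq2[arrow.Record, error] {
	return func(yield func(arrow.Record, error) bool) {
		yield(nil, err)
	}
}
```

The write path shown in the diff above could then return errorSeq(fmt.Errorf("%w: cannot write files without a current spec", err)) instead of panicking, and the panic-type handling in the recover block becomes unnecessary.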
```go
// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns declared in the
// PositionalDeleteArrowSchema so that, during the pipeline filtering stages that shed
// filtered-out records, we still have a way to preserve the original position of those records.
func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath iceberg.DataFile) recProcessFn {
```
Please add focused unit tests for enrichRecordsWithPosDeleteFields (a sketch of the first case follows this list):
- monotonically increasing pos across multiple input batches
- exact appended schema/column ordering
- empty batch behavior
- failure mode if required fields are missing from PositionalDeleteArrowSchema.
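A rough sketch of the monotonic-position test, under assumptions that are not in the PR: recProcessFn behaves like func(arrow.Record) (arrow.Record, error), makeBatch is a hypothetical fixture builder, and the enriched record's last column is the int64 pos column:

```go
import (
	"context"
	"testing"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/iceberg-go"
)

func TestEnrichRecordsWithPosDeleteFields_MonotonicPos(t *testing.T) {
	var dataFile iceberg.DataFile // fixture construction omitted
	process := enrichRecordsWithPosDeleteFields(context.Background(), dataFile)

	var want int64
	for _, rows := range []int{3, 5, 2} { // positions must keep increasing across batches
		enriched, err := process(makeBatch(t, rows))
		if err != nil {
			t.Fatal(err)
		}
		pos := enriched.Column(int(enriched.NumCols()) - 1).(*array.Int64)
		for i := 0; i < pos.Len(); i++ {
			if got := pos.Value(i); got != want {
				t.Fatalf("pos[%d] = %d, want %d", i, got, want)
			}
			want++
		}
	}
}
```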
```diff
 }

-func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
+func (w *ManifestWriter) ToManifestFile(location string, length int64, content ManifestContent) (ManifestFile, error) {
```
I think the table/internal DataFileStatistics.ToDataFile signature change is acceptable (it's internal, so who cares?), but ManifestWriter.ToManifestFile is externally visible and now requires a new content arg.
This is a backward-incompatible change, so two questions:
- Is it really needed?
- If it is needed, we need to state it in the changelog.
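One sketch of the backward-compatible option suggested earlier: keep the old signature as a thin wrapper and put the new parameter on a new method. ToManifestFileForContent is a hypothetical name, and ManifestContentData is assumed to be the existing default content constant:

```go
// ToManifestFile keeps the pre-change signature so existing callers continue
// to compile; it delegates with the default (data) content type.
func (w *ManifestWriter) ToManifestFile(location string, length int64) (ManifestFile, error) {
	return w.ToManifestFileForContent(location, length, ManifestContentData)
}
```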
```go
// positionDeletePartitionedFanoutWriter distributes Arrow position delete records across
// multiple partitions based on a partition specification, writing data to separate delete
// files for each partition using a fanout pattern with configurable parallelism.
type positionDeletePartitionedFanoutWriter struct {
```
Could we add unit tests for positionDeletePartitionedFanoutWriter covering:
- mixed file_path values in one batch (assert/handle explicitly)
- missing partitionDataByFilePath entry (error instead of silent nil)
- empty batch behavior and cancellation path.
Also, consider avoiding defer record.Release() inside the loop to prevent delayed releases in long runs.
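A sketch of the eager-release pattern being suggested; drainAndRelease and writeRecord are illustrative names, not from the PR:

```go
import "github.com/apache/arrow-go/v18/arrow"

// drainAndRelease releases each record as soon as it has been written, instead
// of stacking defers that only run when the surrounding function returns.
func drainAndRelease(records []arrow.Record, writeRecord func(arrow.Record) error) error {
	for _, rec := range records {
		err := writeRecord(rec)
		rec.Release() // runs now, not at function exit
		if err != nil {
			return err
		}
	}
	return nil
}
```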
This adds support for merge-on-read deletes. It offers an alternative to copy-on-write by generating position delete files instead of rewriting existing data files.
I'm not very confident in the elegance of my solution as I'm still new to the internals of iceberg-go, but the high-level is:
- enrich each RecordBatch with the file path and position before the original position is lost due to filtering.

Testing
Integration tests were added to exercise both the partitioned and unpartitioned paths, and the test data is designed to actually produce a position delete file rather than just take the quick path that drops an entire data file because all of its records are deleted.
Indirect fixes
While working on this change and adding the testing for the partitioned table deletions, I realized that the manifest evaluation was not built correctly when the filter touched a field that was part of a partition spec. It needed to use code similar to what's done during scanning: build projections and a manifest evaluator per partition spec id. This is fixed in this PR, but it technically also applies to the copy-on-write and overwrite paths, so the fix goes beyond the scope of merge-on-read.
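A very rough sketch of the shape of that fix, mirroring the scan path: build and cache one manifest evaluator per partition spec ID, since each spec projects the row filter differently. newManifestEvaluator is assumed to be the helper the scanner already uses, and specsByID is a hypothetical lookup of specs by ID:

```go
func filterManifests(
	manifests []iceberg.ManifestFile,
	specsByID map[int32]iceberg.PartitionSpec,
	schema *iceberg.Schema,
	rowFilter iceberg.BooleanExpression,
	caseSensitive bool,
) ([]iceberg.ManifestFile, error) {
	// One evaluator per partition spec ID, built lazily and reused.
	evaluators := map[int32]func(iceberg.ManifestFile) (bool, error){}
	var kept []iceberg.ManifestFile
	for _, mf := range manifests {
		specID := mf.PartitionSpecID()
		eval, ok := evaluators[specID]
		if !ok {
			var err error
			eval, err = newManifestEvaluator(specsByID[specID], schema, rowFilter, caseSensitive)
			if err != nil {
				return nil, err
			}
			evaluators[specID] = eval
		}
		keep, err := eval(mf)
		if err != nil {
			return nil, err
		}
		if keep {
			kept = append(kept, mf)
		}
	}
	return kept, nil
}
```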