While reviewing #4912, Yicong-Huang noted that there should be an API on `ProgressiveUtils` (or a sibling helper) that resolves a stream of insertion/retraction-flagged tuples down to a final materialized result, applying retractions to undo prior insertions.
A search of the current codebase confirms that no such public API exists today:
- `ProgressiveUtils` exposes only producers (`addInsertionFlag`, `addRetractionFlag`) and per-tuple readers (`isInsertion`, `getTupleFlagAndValue`).
- No downstream consumer references `insertRetractFlagAttr`, `__internal_is_insertion`, or `getTupleFlagAndValue`, so nothing in production reads the flag column and applies retractions.
This issue tracks adding that API. Suggested shape:
```scala
object ProgressiveUtils {
  // Fold a stream of flagged tuples into the materialized "current" set:
  // an insertion-flagged tuple is added; a retraction-flagged tuple removes
  // any previously-inserted tuple that matches by value.
  def materialize(flagged: Iterator[Tuple]): Set[Tuple] = ???
}
```
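A minimal sketch of the fold itself, assuming each flagged tuple has already been decoded into an `(isInsertion, value)` pair (roughly what `getTupleFlagAndValue` is for, although its exact return type is not confirmed here). It is generic over the payload type so it runs without Texera's `Tuple`; `MaterializeSketch` is a hypothetical name.

```scala
object MaterializeSketch {

  // Hypothetical stand-in for the proposed API: each element is an
  // (isInsertion, value) pair rather than a flagged Texera Tuple.
  def materialize[T](flagged: Iterator[(Boolean, T)]): Set[T] =
    flagged.foldLeft(Set.empty[T]) { case (current, (isInsertion, value)) =>
      if (isInsertion) current + value // insertion: add (no-op if already present)
      else current - value             // retraction: remove if present, else no-op
    }

  def main(args: Array[String]): Unit = {
    val stream = Iterator(
      (true, "a"),
      (true, "b"),
      (false, "a"), // retracts the earlier "a"
      (false, "z")  // no matching insertion, so it changes nothing
    )
    println(materialize(stream)) // Set(b)
  }
}
```

Because the result is a `Set`, inserting the same value twice and retracting it once removes it entirely; if duplicate rows must survive, a count-per-value map (bag semantics) would be needed instead.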
Open questions for the implementer:
- Should the result keep ordering (use `LinkedHashSet`/`Vector`), or is `Set` fine?
- Equality basis for "matches by value": `Tuple` already has a value-based `equals`, so the default `Set` semantics should be enough, but it is worth confirming that downstream sinks agree.
- What should happen if a retraction arrives for a tuple that was never inserted? Today an unflagged tuple defaults to being read as an insertion; the materializer should presumably ignore an unmatched retraction (or log a warning).
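If ordering matters and unmatched retractions should be surfaced, one option is a `LinkedHashSet`, which keeps first-insertion order, plus a callback invoked when a retraction matches nothing. This is only a sketch: `materializeOrdered` and `onUnmatchedRetraction` are hypothetical names, the callback stands in for whatever logging the real implementation would use, and tuples are again modeled as `(isInsertion, value)` pairs.

```scala
import scala.collection.mutable

object OrderedMaterializeSketch {

  // Hypothetical ordered variant: LinkedHashSet preserves insertion order,
  // and onUnmatchedRetraction is called for retractions that remove nothing.
  def materializeOrdered[T](
      flagged: Iterator[(Boolean, T)],
      onUnmatchedRetraction: T => Unit
  ): Vector[T] = {
    val current = mutable.LinkedHashSet.empty[T]
    flagged.foreach {
      case (true, value) =>
        current += value // re-adding an existing value keeps its position
      case (false, value) =>
        // remove returns false when the value is not currently present
        if (!current.remove(value)) onUnmatchedRetraction(value)
    }
    current.toVector
  }

  def main(args: Array[String]): Unit = {
    val result = materializeOrdered[String](
      Iterator((true, "a"), (true, "b"), (false, "z"), (false, "a"), (true, "a")),
      value => println(s"warning: unmatched retraction of $value")
    )
    println(result) // Vector(b, a)
  }
}
```

A value that is retracted and then re-inserted moves to the end of the order, which may or may not be what downstream sinks expect.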
Tests to add alongside the API:
- Insertion-only stream → all inserted tuples present.
- Insertion + matching retraction → the retracted tuple is gone.
- Out-of-order retraction (retraction first, no prior matching insertion) → consistent behavior (probably no-op).
- Insertion + retraction + re-insertion → the tuple is back in the result.
- Mixed-type tuple payloads (the round-trip is already exercised in `ProgressiveUtilsSpec`).
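The test cases listed above could be expressed as plain assertions over the same kind of fold. In this sketch rows are modeled as `Seq[Any]` so that mixed-type payloads compare by value; the real tests would use actual `Tuple`s, and the names `MaterializeScenarios` and `Row` are hypothetical.

```scala
object MaterializeScenarios {

  // Same hypothetical fold as the proposed API, over (isInsertion, value) pairs.
  def materialize[T](flagged: Iterator[(Boolean, T)]): Set[T] =
    flagged.foldLeft(Set.empty[T]) { case (acc, (isInsertion, value)) =>
      if (isInsertion) acc + value else acc - value
    }

  // Rows modeled as Seq[Any] so mixed-type payloads compare by value.
  type Row = Seq[Any]
  val r1: Row = Seq(1, "alice", 3.5)
  val r2: Row = Seq(2L, "bob", true)

  def main(args: Array[String]): Unit = {
    // Insertion-only stream: every inserted row is present.
    assert(materialize(Iterator(true -> r1, true -> r2)) == Set(r1, r2))
    // Insertion + matching retraction: the retracted row is gone.
    assert(materialize(Iterator(true -> r1, true -> r2, false -> r1)) == Set(r2))
    // Retraction with no prior matching insertion: treated as a no-op.
    assert(materialize(Iterator(false -> r1, true -> r2)) == Set(r2))
    // Insertion + retraction + re-insertion: the row is back.
    assert(materialize(Iterator(true -> r1, false -> r1, true -> r1)) == Set(r1))
    // Mixed-type payload matched by value, not by reference.
    assert(materialize(Iterator(true -> Seq(1, "alice", 3.5), false -> r1)).isEmpty)
    println("all scenarios passed")
  }
}
```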
This is out of scope for #4912 (which is test-only) and was intentionally split out so that the test PR stays narrow.