[Python] Add Watch transform with growth_of polling SDF#39023
Draft
Eliaaazzz wants to merge 1 commit into
Draft
Conversation
Add an experimental Watch transform that watches a growing set of outputs per input element. Watch.growth_of(poll_fn) runs a periodic poll loop as a splittable DoFn and emits an unbounded PCollection of (input, output) pairs. Each process() performs one poll round and self-checkpoints via defer_remainder. New outputs are deduplicated with a stable 128-bit blake2b hash of the encoded output, and a manual watermark estimator advances per poll. Per-input termination supports never() and after_total_of(); polling also stops when a poll returns PollResult.complete(). The DoFn is its own RestrictionProvider, and restriction state serializes through a tagged GrowthState coder. Tests cover termination conditions, coder round-trips, the restriction tracker claim/checkpoint/dedup logic, and DirectRunner end-to-end runs.
0cd6937 to
5b039a0
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #39023 +/- ##
============================================
- Coverage 55.16% 54.15% -1.01%
Complexity 1676 1676
============================================
Files 1068 1071 +3
Lines 167265 166847 -418
Branches 1208 1208
============================================
- Hits 92265 90352 -1913
- Misses 72816 74311 +1495
Partials 2184 2184
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This adds an experimental Watch transform to the Python SDK. It addresses #21521.
Watch watches a growing set of outputs for each input element. It calls a user poll function on an interval and emits an unbounded PCollection of input and output pairs. Polling stops per input when the poll reports completion or when a termination condition fires. The transform is a splittable DoFn, so each process call performs one poll round and then self checkpoints with defer_remainder, which keeps the polling loop durable across bundles. New outputs are deduplicated with a stable 128 bit blake2b hash of the encoded output, and a manual watermark estimator advances once per poll.
This is an initial minimal version intended for review and iteration. It implements growth_of, PollResult, PollFn, the never and after_total_of termination conditions, the growth state restriction with its coder, and the polling splittable DoFn. It is validated end to end on the DirectRunner. The output coder must be deterministic for dedup to hold across workers, and the transform logs a warning when the resolved coder is not deterministic.
The following are out of scope for this initial change and are not included here: side input poll functions, the remaining termination conditions, a separate bounded stage for exploding large poll results, and bounded growth state eviction.
Testing
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.