Skip to content

[Python] Add Watch transform with growth_of polling SDF#39023

Draft
Eliaaazzz wants to merge 1 commit into
apache:masterfrom
Eliaaazzz:python-watch-transform
Draft

[Python] Add Watch transform with growth_of polling SDF#39023
Eliaaazzz wants to merge 1 commit into
apache:masterfrom
Eliaaazzz:python-watch-transform

Conversation

@Eliaaazzz

Copy link
Copy Markdown
Contributor

This adds an experimental Watch transform to the Python SDK. It addresses #21521.

Watch watches a growing set of outputs for each input element. It calls a user poll function on an interval and emits an unbounded PCollection of input and output pairs. Polling stops per input when the poll reports completion or when a termination condition fires. The transform is a splittable DoFn, so each process call performs one poll round and then self checkpoints with defer_remainder, which keeps the polling loop durable across bundles. New outputs are deduplicated with a stable 128 bit blake2b hash of the encoded output, and a manual watermark estimator advances once per poll.

This is an initial minimal version intended for review and iteration. It implements growth_of, PollResult, PollFn, the never and after_total_of termination conditions, the growth state restriction with its coder, and the polling splittable DoFn. It is validated end to end on the DirectRunner. The output coder must be deterministic for dedup to hold across workers, and the transform logs a warning when the resolved coder is not deterministic.

The following are out of scope for this initial change and are not included here: side input poll functions, the remaining termination conditions, a separate bounded stage for exploding large poll results, and bounded growth state eviction.

Testing

  • Unit tests for the termination conditions and for the growth state coder round trips, including ordered completed hashes and a fixed width 16 byte hash coder.
  • Restriction tracker tests for claim, dedup, checkpoint, replay, boundedness, and a terminal split residual for every stop reason, so a checkpoint after a terminal poll cannot resume polling.
  • DirectRunner end to end tests for single poll completion, multiple inputs, and multi round dedup.
  • Local run reports 32 tests passing and pylint at 10.00 of 10.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Add an experimental Watch transform that watches a growing set of outputs
per input element. Watch.growth_of(poll_fn) runs a periodic poll loop as a
splittable DoFn and emits an unbounded PCollection of (input, output) pairs.

Each process() performs one poll round and self-checkpoints via
defer_remainder. New outputs are deduplicated with a stable 128-bit blake2b
hash of the encoded output, and a manual watermark estimator advances per
poll. Per-input termination supports never() and after_total_of(); polling
also stops when a poll returns PollResult.complete(). The DoFn is its own
RestrictionProvider, and restriction state serializes through a tagged
GrowthState coder.

Tests cover termination conditions, coder round-trips, the restriction
tracker claim/checkpoint/dedup logic, and DirectRunner end-to-end runs.
@Eliaaazzz Eliaaazzz force-pushed the python-watch-transform branch from 0cd6937 to 5b039a0 Compare June 19, 2026 01:48
@codecov

codecov Bot commented Jun 19, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 88.21656% with 37 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.15%. Comparing base (8801afd) to head (5b039a0).
⚠️ Report is 1065 commits behind head on master.

Files with missing lines Patch % Lines
sdks/python/apache_beam/io/watch.py 88.21% 37 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #39023      +/-   ##
============================================
- Coverage     55.16%   54.15%   -1.01%     
  Complexity     1676     1676              
============================================
  Files          1068     1071       +3     
  Lines        167265   166847     -418     
  Branches       1208     1208              
============================================
- Hits          92265    90352    -1913     
- Misses        72816    74311    +1495     
  Partials       2184     2184              
Flag Coverage Δ
python 78.76% <88.21%> (-2.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant