-
Notifications
You must be signed in to change notification settings - Fork 486
[server] When applying projection pushdown, return empty records instead of skipping them to ensure offset movement in the client. #2370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…ead of skipping them to ensure offset movement in the client.
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@loserwang1024 , I left some comments to optimize the test.
| DEFAULT_COMPRESSION); | ||
| } | ||
|
|
||
| private MemoryLogRecords logRecords( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used, remove
| // In this way, if skip empty batch, the read will in stuck forever. | ||
| conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO); | ||
| conf.set( | ||
| ConfigOptions.LOG_SEGMENT_FILE_SIZE, | ||
| new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); | ||
| conf.set( | ||
| ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET, | ||
| new MemorySize(5 * V0_RECORD_BATCH_HEADER_SIZE)); | ||
| final FlussClusterExtension flussClusterExtension = | ||
| FlussClusterExtension.builder() | ||
| .setNumOfTabletServers(3) | ||
| .setClusterConf(conf) | ||
| .build(); | ||
| flussClusterExtension.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my local environment, the original test takes about 15 seconds, but it increases to 55 seconds after applying this PR. I think we should optimize it. Here are several optimization opportunities:
- The test starts a new Fluss cluster in addition to the existing one from the test base class, which adds significant overhead.
- The issue only occurs when projection pushdown is enabled (
doProjection = true), so there’s no need to testdoProjection = false. - The table bucket count can be reduced to 1, as it’s sufficient for reproducing the problem.
- The number of empty record batches can be reduced from 10 to 2, this should still reliably reproduce the issue while avoiding 10 unnecessary RPC round-trips that dominate the runtime.
I suggest introducing a dedicated test class like CustomFlussClusterITCase (without extending ClientToServerITCaseBase) for tests that require manual cluster management, and adding a focused test method such as testProjectionPushdownWithEmptyBatches that incorporates all these optimizations.
Purpose
Linked issue: the detailed analysis is in #2369.
Brief change log
Not filter empty log in projection.
Tests
org.apache.fluss.client.table.FlussTableITCase#testFirstRowMergeEngine
API and Format
Documentation