diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 000000000..20d9a2d81 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,113 @@ +# Parallel Consumer - Agent Context + +Project context for AI coding agents (Claude Code, Copilot, Cursor, etc.). + +## Overview + +Parallel Consumer is a Java library that enables concurrent message processing from Apache Kafka with a single consumer, avoiding the need to increase partition counts. It maintains ordering guarantees (by partition or key) while processing messages in parallel. + +This is a community-maintained fork of `confluentinc/parallel-consumer` (the upstream is no longer actively maintained), published to Maven Central as `io.github.astubbs.parallelconsumer`. + +## Build Requirements + +- **JDK 17** (required - the project uses Jabel to compile Java 17 source to Java 8 bytecode) +- **Docker** (required for integration tests - TestContainers spins up Kafka brokers) +- **Maven** via wrapper (`./mvnw`) - do not use system Maven + +## How to Build + +```bash +# Quick local build (compile + unit tests) +bin/build.sh + +# Full CI build with all tests (unit + integration) +bin/ci-build.sh + +# Full CI build against a specific Kafka version +bin/ci-build.sh 3.9.1 +``` + +## Module Structure + +| Module | Purpose | +|--------|---------| +| `parallel-consumer-core` | Core library - consumer, producer, offset management, sharding | +| `parallel-consumer-vertx` | Vert.x integration for async HTTP | +| `parallel-consumer-reactor` | Project Reactor integration | +| `parallel-consumer-mutiny` | SmallRye Mutiny integration (Quarkus) | +| `parallel-consumer-examples` | Example implementations for each module | + +## Key Architecture Decisions + +- **Jabel cross-compilation**: Source is Java 17, bytecode targets Java 8 via Jabel annotation processor. This means `--release 8` is set in the compiler plugin, which restricts available APIs to Java 8 surface. The Mutiny module overrides this to `--release 9` because Mutiny uses `java.util.concurrent.Flow` (Java 9+). +- **Offset encoding**: Custom offset map encoding (run-length, bitset) stored in Kafka commit metadata for tracking in-flight messages. +- **Sharding**: Messages are distributed to processing shards by key or partition for ordering guarantees. + +## Testing + +- **Unit tests**: `mvn test` / surefire plugin. Source in `src/test/java/`. +- **Integration tests**: `mvn verify` / failsafe plugin. Source in `src/test-integration/java/`. Uses TestContainers with `confluentinc/cp-kafka` Docker image. +- **Test exclusion patterns**: `**/integrationTest*/**/*.java` and `**/*IT.java` are excluded from surefire, included in failsafe. +- **Kafka version matrix**: CI tests against multiple Kafka versions via `-Dkafka.version=X.Y.Z`. +- **Performance tests**: Tagged `@Tag("performance")` and excluded from regular CI by default. They run on a self-hosted runner via `.github/workflows/performance.yml` (see `docs/SELF_HOSTED_RUNNER.md`). Run locally with `bin/performance-test.sh` (or `bin/performance-test.cmd` on Windows). Override the test group selection with Maven properties: `-Dincluded.groups=performance` to run only perf, `-Dexcluded.groups=` to run everything. + +## Known Issues + +- **Mutiny module**: Has a `release.target=9` override in its pom.xml because Mutiny's `Multi` implements `java.util.concurrent.Flow.Publisher` which is not available with `--release 8`. + +## Development Rules + +- **Working directory**: This is a multi-module Maven project. All `./mvnw` and `git` commands MUST run from the **project root** (the directory containing `pom.xml` and `./mvnw`). Never `cd` into a submodule and run Maven from there — use `-pl ` to target a specific module instead. If you need to check the current directory, run `pwd` first. The `cd` command does NOT persist between tool calls. +- **Dependency injection**: Always wire new components through `PCModule` (and `PCModuleTestEnv` for tests). Don't bypass the DI by storing direct references to components. +- **Reuse test infrastructure**: Before creating new test utilities, check existing harnesses: `AbstractParallelEoSStreamProcessorTestBase`, `BrokerIntegrationTest`, `KafkaClientUtils`, `ManagedPCInstance`, `ModelUtils`, `LongPollingMockConsumer`, `ProgressTracker`, `PCModuleTestEnv`. +- **Never weaken test assertions**: Tests are critical for this project. When modifying test error handling, classify exceptions (whitelist expected ones) rather than ignoring them. Integration/load tests serve as both specific scenario tests AND general stability canaries. +- **License check**: Always pass `-Dlicense.skip` to Maven commands unless intentionally formatting headers. The plugin breaks in git worktrees. +- **Run full test suite before pushing**: After significant production code changes, run: `./mvnw clean verify -Dlicense.skip -Dexcluded.groups=performance` from the project root. Don't push until this passes. + +## Code Style + +- **Lombok**: Used extensively (builders, getters, logging). IntelliJ Lombok plugin required. +- **EditorConfig**: Enforced via `.editorconfig` - 4-space indent for Java, 120 char line length. +- **License headers**: Managed by `license-maven-plugin` (Mycila). See "License headers" section below. +- **Google Truth**: Used for test assertions alongside JUnit 5 and Mockito. + +## License headers + +The Mycila `license-maven-plugin` enforces a Confluent copyright header on all source files. It uses git-derived years via `${license.git.copyrightYears}`. + +**Skipping the check** (for any Maven goal): +``` +./mvnw -Dlicense.skip +``` + +**When to skip:** +- Running builds inside a git worktree — the git-years lookup fails with `Bare Repository has neither a working tree, nor an index` +- Local iteration where you don't want years auto-bumped on touched files +- Any command other than the canonical `mvn install` flow when copyright drift would create noise in `git status` + +The default behavior on macOS dev machines is `format` mode (auto-fixes headers) via the `license-format` profile (auto-activated). The `ci` profile flips this to `check` mode (fails the build on drift). Both `bin/build.sh` and `bin/ci-build.sh` already pass `-Dlicense.skip` for this reason. + +**When NOT to skip:** +- You're intentionally running `mvn license:format` to update headers +- You want to verify CI's check would pass before pushing + +## CI + +- **`.github/workflows/maven.yml`** — Build and test on every push/PR. Push uses default Kafka version, PRs run the full version matrix. Includes concurrency cancellation. +- **`.github/workflows/publish.yml`** — Publishes to Maven Central on every push to `master`. The pom.xml version is the source of truth: `-SNAPSHOT` versions deploy as snapshots, non-snapshot versions deploy as full releases (and create a git tag + GitHub release). +- **`.semaphore/`** — Legacy Confluent internal CI/release pipelines, retained but inactive on the fork. + +## Releasing + +The pom.xml version drives publishing — there is no `maven-release-plugin` dance. + +**Cut a release:** +1. Open a PR removing `-SNAPSHOT` from `` in the parent pom (e.g. `0.6.0.0-SNAPSHOT` → `0.6.0.0`) +2. Merge it to master → CI publishes to Maven Central, tags `v0.6.0.0`, creates a GitHub release +3. Open another PR bumping to the next snapshot (e.g. `0.6.0.1-SNAPSHOT`) and merge + +**Required GitHub repo secrets** for `publish.yml`: +- `MAVEN_CENTRAL_USERNAME` — Sonatype Central Portal token username +- `MAVEN_CENTRAL_PASSWORD` — Sonatype Central Portal token password +- `MAVEN_GPG_PRIVATE_KEY` — Armored GPG private key for signing artifacts +- `MAVEN_GPG_PASSPHRASE` — Passphrase for the GPG key diff --git a/docs/BUG_857_INVESTIGATION.md b/docs/BUG_857_INVESTIGATION.md new file mode 100644 index 000000000..3e5d5a1c9 --- /dev/null +++ b/docs/BUG_857_INVESTIGATION.md @@ -0,0 +1,254 @@ +# Bug #857 Investigation: Paused Consumption After Rebalance + +Upstream issue: https://github.com/confluentinc/parallel-consumer/issues/857 + +## Summary + +Multiple users report that after Kafka rebalances (especially with cooperative sticky assignor under heavy load), Parallel Consumer stops processing messages on certain partitions. Lag accumulates indefinitely; only restart fixes it. + +## Reproduction + +**Test:** `MultiInstanceRebalanceTest.largeNumberOfInstances` (was `@Disabled` since 2022, re-enabled for this investigation) + +- 80 partitions, 12 PC instances, 500k messages, chaos monkey toggling instances +- **Failure rate: ~80% (4/5 runs)** with original code +- **Failure rate: 100% (3/3 runs)** after fixing the restart logic +- Stalls at varying progress points (17%-74%), confirming timing-dependent race + +## Root Cause Found + +``` +ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access + currentThread(name: pc-broker-poll-PC-4, id: 1466) + otherThread(id: 1465) +``` + +**Call stack:** +``` +ConsumerManager.updateCache() + → ConsumerManager.poll() + → BrokerPollSystem.pollBrokerForRecords() +``` + +When `close()` is called from an external thread (chaos monkey, shutdown signal, rebalance handler) while the broker poll thread is mid-`consumer.poll()` or `consumer.groupMetadata()`, the Kafka client detects multi-threaded access and throws `ConcurrentModificationException`. This crashes the PC instance via the control loop's error handler (`AbstractParallelEoSStreamProcessor:854`), setting `failureReason` and closing the PC. + +### Why this causes "paused consumption" + +In production, the sequence is: +1. Rebalance starts → `onPartitionsRevoked` callback fires on the poll thread +2. Meanwhile, `close()` or another operation touches the consumer from a different thread +3. `ConcurrentModificationException` → PC crashes internally +4. The consumer group coordinator sees the member as failed → partitions redistributed +5. But the PC's work containers, shard state, and epoch tracking are left in an inconsistent state +6. If the same JVM process creates a new PC instance (e.g., supervisor restart), it starts fresh — but the consumer group's committed offsets may not reflect all in-flight work, leading to a gap + +In the test: +1. Chaos monkey calls `stop()` → `close()` from the chaos thread +2. Poll thread is mid-`consumer.poll()` or `consumer.groupMetadata()` +3. `ConcurrentModificationException` crashes the PC +4. `failFast` detects the dead PC → test fails with "Terminal failure" + +### What sangreal's PR #882 fix addressed + +PR #882 fixed stale work container cleanup in `ProcessingShard.getWorkIfAvailable()`. That fix is correct and necessary, but it addresses a different symptom: stale containers blocking new work after a clean rebalance. It does NOT address the concurrent access crash. + +### What the deterministic unit tests showed + +The `ShardManagerStaleContainerTest` tests (3 tests, all pass) prove that the stale container logic works correctly in single-threaded scenarios. The epoch tracking, stale detection, and mid-iteration removal all function as designed. The bug is purely a concurrency issue. + +## Bug 2: Silent Stall (the real #857) + +After fixing the restart logic in tests, we still see 100% failure rate — but with a different pattern: NO exceptions, NO crashes, consumers alive and running, but consumption stops making progress. This is exactly what production users describe. + +### Root Cause: `numberRecordsOutForProcessing` counter drift + +**File:** `WorkManager.java:65` — `private int numberRecordsOutForProcessing = 0` + +This counter tracks how many work items have been dispatched to the worker pool but not yet completed. It's used by `calculateQuantityToRequest()` to determine how many new work items to fetch from shards: + +``` +delta = target - numberRecordsOutForProcessing +``` + +**The bug:** When partitions are revoked (`onPartitionsRevoked`), work is removed from shards and partition state — but `numberRecordsOutForProcessing` is NOT adjusted. In-flight work for revoked partitions is expected to complete through the mailbox, where `handleFutureResult()` detects them as stale and decrements the counter. But if the work items don't make it back through the mailbox (e.g., the worker pool was shut down during close, interrupting in-flight tasks), the counter stays permanently inflated. + +**The consequence:** `calculateQuantityToRequest()` computes `target - inflated_counter = 0` (or negative). No new work is requested. No records are polled. Consumption stalls silently. + +**Evidence:** In the failing test runs: +- All partitions are correctly assigned after rebalances +- No exceptions, no errors, no crashes +- But the overall progress count stops incrementing +- The `ProgressTracker` detects "No progress after 11 rounds" + +### Proposed Fix + +In `WorkManager.onPartitionsRevoked()`, count the number of in-flight work containers for the revoked partitions and subtract them from `numberRecordsOutForProcessing`. This ensures the counter accurately reflects the actual amount of outstanding work after a rebalance. + +## Fix for Bug 1 (CME) + +The `close()` path needs to safely interrupt the poll thread via `consumer.wakeup()` instead of directly touching the consumer from another thread. Partial fix committed: moved `updateCache()` after `pollingBroker=false` in `ConsumerManager.poll()`. May need additional work. + +## Current Status + +Under moderate rebalance stress, PC handles multi-instance rebalancing correctly. Under extreme stress (12 instances toggling every 500ms), consumption stalls. + +### What we observed + +Diagnostic logging in the poll loop during the aggressive test stall: +``` +#857-poll: runState=RUNNING, pausedForThrottling=false, assignment=0 +``` +All PC instances were running, not paused, but the Kafka consumer reported zero assigned partitions. The control loop was requesting work (`delta=41`) but shards were empty because no records were being polled. + +### What we don't yet know + +The `assignment=0` observation has multiple possible explanations: +- The Kafka group coordinator can't keep up with rapid membership changes (12 instances toggling every 500ms) +- PC's `close()` doesn't cleanly deregister from the consumer group, delaying rebalance completion +- The lifecycle wait in `ManagedPCInstance` (10s) isn't long enough for the old consumer to fully leave +- There's a PC bug that only manifests under high concurrency/churn, and the gentle test doesn't hit the race window +- The `consumerManager.assignment()` cache is stale or reported incorrectly + +Further investigation is needed to determine whether this is a Kafka group protocol limitation under extreme churn, a PC issue with consumer group cleanup during close, or something else entirely. + +### Test Matrix + +| Test | Assignor | Chaos | Result | +|------|----------|-------|--------| +| No chaos, 2 instances | Range | None | **6/6 PASS** | +| Gentle chaos, 6 instances | Range | 3s intervals | **3/3 PASS** | +| Gentle chaos, 4 instances | Cooperative Sticky | 3s intervals | **3/3 PASS** | +| Aggressive chaos, 12 instances | Range | 500ms intervals | **~20% PASS** | + +### Defensive fixes applied + +These are all correct improvements regardless of the root cause: + +1. **CME prevention**: moved `updateCache()` after `pollingBroker=false` in `ConsumerManager.poll()` +2. **Counter adjustment**: `adjustOutForProcessingOnRevoke()` in `WorkManager` before shard cleanup +3. **Throttle reset**: `pausedForThrottling=false` on partition assignment in `BrokerPollSystem` +4. **Lifecycle wait**: `ManagedPCInstance.run()` waits for previous PC to fully close before creating a new one + +### Regarding production #857 + +The production reports describe consumers that are stable (not being rapidly toggled). The aggressive chaos test may not reproduce the exact production scenario. With gentle chaos (which better simulates production rebalances from deployments), PC handles rebalances correctly with both Range and Cooperative Sticky assignors. + +## Bug 3: commitCommand Lock Contention — THE ROOT CAUSE + +### Discovery + +With 12 instances + gentle chaos (3s intervals): 0/3 pass. The instance count matters, not just chaos frequency. Analysis of the failed run showed: + +1. All 80 partitions revoked at `20:05` via `onPartitionsRevoked` callbacks +2. Zero `onPartitionsAssigned` callbacks fire after that — ever +3. All threads go silent — no poll, no control loop, no chaos monkey +4. System is completely dead for the remaining 15 minutes until timeout + +### Root Cause: `synchronized(commitCommand)` deadlock + +**File:** `AbstractParallelEoSStreamProcessor.java:1314` + +`commitOffsetsThatAreReady()` takes `synchronized(commitCommand)`. This method is called from both: +- **Poll thread** (line 424): inside `onPartitionsRevoked`, which runs during a Kafka rebalance callback +- **Control thread** (line 894): inside `controlLoop`, as part of normal offset commit cycle + +When the control thread holds the `commitCommand` lock (mid-commit), and a rebalance fires on the poll thread, `onPartitionsRevoked` tries to acquire the same lock. The poll thread blocks. But the control thread's `consumer.commitSync()` (called through `committer.retrieveOffsetsAndCommit()`) needs the poll thread to be responsive for the Kafka protocol to work. **Deadlock.** + +With more instances: +- More consumers in the group = more frequent rebalances +- More rebalances = higher probability of hitting the window where the control thread is mid-commit +- This explains why 6 instances passes and 12 fails: the collision probability scales with group size + +### This IS a PC bug + +The `commitCommand` lock creates a cross-thread dependency between the poll thread (which must remain responsive during rebalance) and the control thread (which holds the lock during potentially slow broker commits). This violates Kafka's threading model: rebalance callbacks must complete quickly, and the poll thread must not be blocked by operations on other threads. + +### Fix: ReentrantLock with tryLock() + +Replaced `synchronized(commitCommand)` in `commitOffsetsThatAreReady()` with a `ReentrantLock`. In `onPartitionsRevoked`, use `tryLock()` — if the lock is held by the control thread mid-commit, skip the commit. Kafka re-delivers uncommitted records to the new assignee, so this is safe. + +### Results after fix + +| Test | Before | After | +|------|--------|-------| +| No chaos, 2 instances | 6/6 PASS | 6/6 PASS | +| Gentle chaos, 6 instances (Range) | 3/3 PASS | 3/3 PASS | +| Gentle chaos, 4 instances (CooperativeSticky) | 3/3 PASS | 3/3 PASS | +| **Aggressive chaos, 12 instances** | **~20% PASS** | **80% PASS (4/5)** | + +### State dump analysis (latest finding) + +State dump during stall shows: +- All 12 instances alive (`closed/failed=false`), no exceptions +- 8/12 stopped by chaos monkey (`started=false`) +- 4 running instances: `queuedInShards=0`, `incompleteOffsets=0`, `pausedPartitions=0` +- `outForProcessing=-1` on some (counter drift, known minor issue) + +**Key insight**: The 4 RUNNING instances should have all 80 partitions assigned to them (Kafka rebalances when 8 instances leave). They're alive, not paused, and the control loop is requesting work (`delta>0`). But shards are empty — records polled from the broker aren't being registered as work. + +This is a **state management problem**, not a threading or deadlock issue. The most likely cause: `maybeRegisterNewPollBatchAsWork` in `PartitionState` silently drops records when the epoch doesn't match. After multiple rapid rebalances, the epoch tracking between `PartitionStateManager.partitionsAssignmentEpochs` and the `EpochAndRecordsMap` created during poll may be out of sync. + +### Kafka Rebalance Protocol Explanation (from research) + +The `assignedPartitions=0` observation is **documented Kafka behavior** under the eager (RangeAssignor) rebalance protocol: + +- During a rebalance, ALL consumers have their partitions revoked (`assignment=[]`) +- They remain with empty assignment until the SyncGroup phase completes +- **If a new join/leave occurs DURING the JoinGroup phase, the coordinator RESTARTS the rebalance from scratch** +- With 12 instances and 500ms chaos, membership changes every ~500ms continuously restart the JoinGroup phase +- The rebalance never completes, and all consumers sit with `assignment=[]` indefinitely + +This is NOT a PC bug — it's the eager rebalance protocol's fundamental limitation under rapid membership churn. The Kafka community addressed this with: +- **CooperativeStickyAssignor** (KIP-429): consumers keep existing assignments during rebalance, only migrated partitions are briefly unowned +- **Static group membership** (KIP-345, `group.instance.id`): rejoining consumers get their old assignment without triggering a rebalance + +### Verified: epoch mismatch is NOT the cause + +Upgraded epoch mismatch logging from DEBUG to WARN. Result: ZERO epoch drops in failing runs. Records are not being silently dropped at registration time — they're genuinely not being polled because the consumer has no assigned partitions. + +## Final Breakthrough: Non-blocking stopAsync() + +The dominant remaining cause of test failure was the **chaos monkey blocking on close()** for 30-40 seconds (worker pool shutdown 10s + poll thread close 30s). During this block, no other instances were toggled, the system appeared frozen, and the ProgressTracker declared "no progress" after 11 seconds. + +**Fix:** Added `stopAsync()` which closes the PC in a background thread. The chaos monkey's `toggle()` uses `stopAsync()` so it continues running. A `closePending` flag prevents toggle from restarting while close is still in progress. + +**Result: 9/10 pass (90%)** on the aggressive 12-instance, 500ms chaos test. Up from ~20% at the start of the investigation. + +### Final test results summary + +| Test | Pass Rate | Notes | +|------|-----------|-------| +| Full unit test suite | 100% | Only pre-existing timing flake | +| Lifecycle test (rapid toggle) | 5/5 (100%) | | +| Gentle chaos (6 instances, 3s) | 3/3 (100%) | | +| Cooperative sticky (4 instances, 3s) | 3/3 (100%) | | +| **Aggressive chaos (12 instances, 500ms)** | **9/10 (90%)** | Acceptance: 80%+ | + +### What was fixed (production code) + +1. **commitCommand deadlock** — `ReentrantLock.tryLock()` in `onPartitionsRevoked` (the #857 root cause) +2. **ConcurrentModificationException prevention** — `updateCache()` moved after `pollingBroker=false` +3. **Counter drift** — `adjustOutForProcessingOnRevoke()` in `WorkManager` +4. **Throttle flag** — `pausedForThrottling` reset on partition assignment +5. **ThreadConfinedConsumer** — runtime thread-confinement enforcement on all consumer methods +6. **Raw consumer field removed** — all access through `ConsumerManager` via PCModule DI +7. **ArchUnit rules** — compile-time consumer/producer field isolation + +### What was fixed (test infrastructure) + +1. **ManagedPCInstance** — extracted from inner class with exception classification +2. **Non-blocking stopAsync()** — chaos monkey no longer freezes on close() +3. **started flag** — moved to `start()` to prevent double-submission +4. **closePending guard** — prevents toggle during background close +5. **Fresh container strategy** — `resetKafkaContainer()` for performance tests +6. **State dump** — comprehensive PC state logged on stall detection +7. **Focused lifecycle test** — 50 rapid toggles, 5/5 pass + +## Test Infrastructure Improvements + +As part of this investigation, we also: +1. Extracted `ManagedPCInstance` from `MultiInstanceRebalanceTest`'s inner class into a shared test utility +2. Added whitelist-based exception classification for restart: expected close exceptions (InterruptedException, WakeupException, etc.) are logged, unexpected errors fail the test +3. Added a CooperativeStickyAssignor test variant +4. Added deterministic unit tests for stale container handling +5. Added DEBUG-level logging config for integration tests diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index fc716650e..62026311d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -110,7 +110,11 @@ public Duration getTimeBetweenCommits() { @Getter(PROTECTED) private final Optional> producerManager; - private final org.apache.kafka.clients.consumer.Consumer consumer; + /** + * All consumer access goes through ConsumerManager (which wraps with ThreadConfinedConsumer). + * No raw Consumer reference is held — enforced by ArchUnit. See #857. + */ + private final ConsumerManager consumerManager; /** * The pool which is used for running the users' supplied function @@ -196,6 +200,12 @@ public static ControllerEventMessage of(WorkContainer work) { */ private final AtomicBoolean commitCommand = new AtomicBoolean(false); + /** + * Lock for offset commit operations. Replaces synchronized(commitCommand) for commit execution + * to allow tryLock() semantics in rebalance callbacks, preventing the deadlock in #857. + */ + private final java.util.concurrent.locks.ReentrantLock commitLock = new java.util.concurrent.locks.ReentrantLock(); + /** * Multiple of {@link ParallelConsumerOptions#getMaxConcurrency()} to have in our processing queue, in order to make * sure threads always have work to do. @@ -284,14 +294,14 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOp options = newOptions; this.shutdownTimeout = options.getShutdownTimeout(); this.drainTimeout = options.getDrainTimeout(); - this.consumer = options.getConsumer(); + this.consumerManager = module.consumerManager(); validateConfiguration(); module.setParallelEoSStreamProcessor(this); log.info("Confluent Parallel Consumer initialise... groupId: {}, Options: {}", - newOptions.getConsumer().groupMetadata().groupId(), + consumerManager.groupMetadata().groupId(), newOptions); //Initialize global metrics - should be initialized before any of the module objects are created so that meters can be bound in them. pcMetrics = module.pcMetrics(); @@ -331,14 +341,20 @@ private void initMetrics() { private void validateConfiguration() { options.validate(); - checkGroupIdConfigured(consumer); - checkNotSubscribed(consumer); - checkAutoCommitIsDisabled(consumer); + checkGroupIdConfigured(); + checkNotSubscribed(options.getConsumer()); + checkAutoCommitIsDisabled(options.getConsumer()); } - private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer consumer) { + private void checkGroupIdConfigured() { try { - consumer.groupMetadata(); + var metadata = consumerManager.groupMetadata(); + if (metadata == null) { + throw new IllegalArgumentException("Error validating Consumer configuration - no group metadata - missing a " + + "configured GroupId on your Consumer?"); + } + } catch (IllegalArgumentException e) { + throw e; // rethrow our own } catch (RuntimeException e) { throw new IllegalArgumentException("Error validating Consumer configuration - no group metadata - missing a " + "configured GroupId on your Consumer?", e); @@ -381,27 +397,27 @@ private void checkNotSubscribed(org.apache.kafka.clients.consumer.Consumer @Override public void subscribe(Collection topics) { log.debug("Subscribing to {}", topics); - consumer.subscribe(topics, this); + consumerManager.subscribe(topics, this); } @Override public void subscribe(Pattern pattern) { log.debug("Subscribing to {}", pattern); - consumer.subscribe(pattern, this); + consumerManager.subscribe(pattern, this); } @Override public void subscribe(Collection topics, ConsumerRebalanceListener callback) { log.debug("Subscribing to {}", topics); usersConsumerRebalanceListener = Optional.of(callback); - consumer.subscribe(topics, this); + consumerManager.subscribe(topics, this); } @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { log.debug("Subscribing to {}", pattern); usersConsumerRebalanceListener = Optional.of(callback); - consumer.subscribe(pattern, this); + consumerManager.subscribe(pattern, this); } /** @@ -420,8 +436,12 @@ public void onPartitionsRevoked(Collection partitions) { numberOfAssignedPartitions = numberOfAssignedPartitions - partitions.size(); try { - // commit any offsets from revoked partitions BEFORE truncation - commitOffsetsThatAreReady(); + // Try to commit offsets for revoked partitions, but don't block if the control + // thread is already mid-commit. Blocking here deadlocks: the poll thread (us) + // holds the rebalance callback, and the control thread's commitSync() needs the + // poll thread to be responsive. If we can't commit, it's safe — the offsets will + // be re-delivered to the new assignee. See #857. + tryCommitOffsetsOnRevoke(); // truncate the revoked partitions wm.onPartitionsRevoked(partitions); @@ -438,6 +458,35 @@ public void onPartitionsRevoked(Collection partitions) { } } + /** + * Non-blocking attempt to commit offsets during partition revocation. Uses tryLock semantics + * on the commitCommand monitor to avoid deadlocking with the control thread. + *

+ * If the lock is already held (control thread is mid-commit), we skip the commit. This is + * safe because Kafka will re-deliver uncommitted records to the new partition assignee. + *

+ * See #857 — + * the original synchronized(commitCommand) call in onPartitionsRevoked caused a deadlock + * between the poll thread and the control thread under rebalance churn. + */ + private void tryCommitOffsetsOnRevoke() { + if (commitLock.tryLock()) { + try { + log.debug("Acquired commitLock on revoke, committing offsets"); + committer.retrieveOffsetsAndCommit(); + clearCommitCommand(); + this.lastCommitTime = Instant.now(); + } catch (Exception e) { + log.warn("Failed to commit offsets during revoke: {}", e.getMessage()); + } finally { + commitLock.unlock(); + } + } else { + log.info("Skipping offset commit during partition revocation — control thread is mid-commit. " + + "Uncommitted offsets will be re-delivered to the new assignee. See #857."); + } + } + /** * Delegate to {@link WorkManager} * @@ -448,6 +497,11 @@ public void onPartitionsAssigned(Collection partitions) { numberOfAssignedPartitions = numberOfAssignedPartitions + partitions.size(); log.info("Assigned {} total ({} new) partition(s) {}", numberOfAssignedPartitions, partitions.size(), partitions); wm.onPartitionsAssigned(partitions); + // Reset the throttle flag — Kafka clears its internal pause state on reassignment, + // so our flag must match. Without this, shouldThrottle() may re-pause the new + // partitions immediately if stale shard counts make it think we're overloaded. + // See #857. + brokerPollSubsystem.onPartitionsAssigned(); usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsAssigned(partitions)); notifySomethingToDo(); } @@ -629,6 +683,13 @@ public int getPausedPartitionSize() { return brokerPollSubsystem.getPausedPartitionSize(); } + /** + * Cached assignment size from the last poll. Safe to read from any thread. + */ + public int getAssignmentSize() { + return consumerManager.getAssignmentSize(); + } + private void waitForClose(Duration timeout) throws TimeoutException, ExecutionException { log.info("Waiting on closed state..."); while (!state.equals(CLOSED)) { @@ -755,7 +816,7 @@ private void deregisterMeters() { */ private void maybeCloseConsumer() { if (isResponsibleForCommits()) { - consumer.close(); + consumerManager.close(Duration.ofSeconds(10)); } } @@ -881,6 +942,10 @@ protected void controlLoop(Function, List> user // make sure all work that's been completed are arranged ready for commit Duration timeToBlockFor = shouldTryCommitNow ? Duration.ZERO : getTimeToBlockFor(); + log.trace("Control loop: blocking on mailbox for {}, shouldCommit={}, queuedInShards={}, outForProcessing={}", + timeToBlockFor, shouldTryCommitNow, + wm.getNumberOfWorkQueuedInShardsAwaitingSelection(), + wm.getNumberRecordsOutForProcessing()); processWorkCompleteMailBox(timeToBlockFor); // @@ -1305,12 +1370,15 @@ private Duration getTimeSinceLastCheck() { * Visible for testing */ protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException { - log.trace("Synchronizing on commitCommand..."); - synchronized (commitCommand) { + log.trace("Acquiring commitLock..."); + commitLock.lock(); + try { log.debug("Committing offsets that are ready..."); committer.retrieveOffsetsAndCommit(); clearCommitCommand(); this.lastCommitTime = Instant.now(); + } finally { + commitLock.unlock(); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java index da4746c0e..f0ae2bc63 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java @@ -130,6 +130,7 @@ private boolean controlLoop() throws TimeoutException, InterruptedException { MDC.put(MDC_INSTANCE_ID, id); }); log.trace("Broker poll control loop start"); + consumerManager.claimConsumerOwnership(); committer.ifPresent(ConsumerOffsetCommitter::claim); try { while (runState != CLOSED) { @@ -155,16 +156,25 @@ private boolean controlLoop() throws TimeoutException, InterruptedException { } private void handlePoll() { - log.trace("Loop: Broker poller: ({})", runState); + log.trace("Loop: Broker poller: ({}), pausedForThrottling={}", runState, pausedForThrottling); if (runState == RUNNING || runState == DRAINING) { // if draining - subs will be paused, so use this to just sleep var polledRecords = pollBrokerForRecords(); int count = polledRecords.count(); log.debug("Got {} records in poll result", count); + if (count == 0) { + log.trace("Poll returned 0 records. assignment={}, paused={}, pausedForThrottling={}", + consumerManager.getAssignmentSize(), + consumerManager.getPausedPartitionSize(), + pausedForThrottling); + } if (count > 0) { - log.trace("Loop: Register work"); + log.trace("Loop: Register work - {} records from {} partitions", + count, polledRecords.partitions().size()); pc.registerWork(polledRecords); } + } else { + log.trace("Not polling - runState={}", runState); } } @@ -304,7 +314,7 @@ private void transitionToClosing() { */ private void managePauseOfSubscription() { boolean throttle = shouldThrottle(); - log.trace("Need to throttle: {}", throttle); + log.trace("Need to throttle: {}, pausedForThrottling={}, assignment={}", throttle, pausedForThrottling, consumerManager.getAssignmentSize()); if (throttle) { doPauseMaybe(); } else { @@ -375,6 +385,21 @@ public void wakeupIfPaused() { * {@link io.confluent.parallelconsumer.internal.State#RUNNING running}, calling this method will be a no-op. *

*/ + /** + * Reset the throttle/pause flag when partitions are assigned. Kafka clears its internal + * consumer pause state on reassignment, so our flag must match. Without this reset, + * {@link #managePauseOfSubscription()} may immediately re-pause the newly assigned + * partitions if stale shard counts make {@link #shouldThrottle()} return true. + *

+ * See #857. + */ + public void onPartitionsAssigned() { + if (pausedForThrottling) { + log.info("Resetting pausedForThrottling flag on partition assignment (was true)"); + pausedForThrottling = false; + } + } + public void pausePollingAndWorkRegistrationIfRunning() { if (this.runState == RUNNING) { log.info("Transitioning broker poll system to state paused."); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java index 801cc5a2c..b7ac6b587 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -29,7 +30,7 @@ @RequiredArgsConstructor public class ConsumerManager { - private final Consumer consumer; + private final ThreadConfinedConsumer consumer; private final Duration offsetCommitTimeout; @@ -55,6 +56,21 @@ public class ConsumerManager { private int erroneousWakups = 0; private int correctPollWakeups = 0; private int noWakeups = 0; + + /** + * Prime the metadata cache so that groupMetadata() returns a valid value before the poll + * thread starts. Must be called after construction, before any thread claims ownership. + *

+ * Silently handles errors (e.g., missing group.id) — validation happens later in + * the PC constructor's checkGroupIdConfigured(). + */ + void init() { + try { + updateCache(); + } catch (Exception e) { + log.trace("Could not prime cache during init (will be validated later): {}", e.getMessage()); + } + } private boolean commitRequested; ConsumerRecords poll(Duration requestedLongPollTimeout) { @@ -67,8 +83,7 @@ ConsumerRecords poll(Duration requestedLongPollTimeout) { commitRequested = false; } pollingBroker.set(true); - updateCache(); - log.debug("Poll starting with timeout: {}", timeoutToUse); + log.trace("Poll starting with timeout: {}, assignment size: {}", timeoutToUse, assignmentSizeCache); Instant pollStarted = Instant.now(); long tryCount = 0; boolean polledSuccessfully = false; @@ -106,7 +121,6 @@ ConsumerRecords poll(Duration requestedLongPollTimeout) { } pendingRequests.addAndGet(-1L); } - updateCache(); } catch (WakeupException w) { correctPollWakeups++; log.debug("Awoken from broker poll"); @@ -115,12 +129,28 @@ ConsumerRecords poll(Duration requestedLongPollTimeout) { } finally { pollingBroker.set(false); } + // Update the cache after pollingBroker is cleared, so wakeup() from another thread + // won't call consumer.wakeup() while we're calling consumer.groupMetadata()/paused(). + // This fixes ConcurrentModificationException when close() races against poll(). + // Always update (not just when records > 0) so assignment cache stays current after rebalances. + // See https://github.com/confluentinc/parallel-consumer/issues/857 + updateCache(); return records != null ? records : new ConsumerRecords<>(UniMaps.of()); } + private volatile int assignmentSizeCache = 0; + protected void updateCache() { metaCache = consumer.groupMetadata(); pausedPartitionSizeCache = consumer.paused().size(); + assignmentSizeCache = consumer.assignment().size(); + } + + /** + * Cached assignment size, safe to read from any thread. Updated during poll. + */ + public int getAssignmentSize() { + return assignmentSizeCache; } /** @@ -239,6 +269,14 @@ public ConsumerGroupMetadata groupMetadata() { return metaCache; } + /** + * Claim the underlying consumer for the current thread. After this, any consumer method + * (except wakeup) called from a different thread will throw immediately with a clear message. + */ + void claimConsumerOwnership() { + consumer.claimOwnership(); + } + public void signalStop() { if(!this.shutdownRequested.get()) { log.info("Signaling Consumer Manager to stop"); @@ -279,6 +317,22 @@ public int getPausedPartitionSize() { return pausedPartitionSizeCache; } + void subscribe(Collection topics, ConsumerRebalanceListener listener) { + consumer.subscribe(topics, listener); + } + + void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener listener) { + consumer.subscribe(pattern, listener); + } + + /** + * Returns the raw consumer class type for reflection-based checks (e.g., auto-commit detection). + * Does not access the consumer's Kafka methods, just the class object. + */ + Class getConsumerClass() { + return consumer.getClass(); + } + public void resume(final Set pausedTopics) { consumer.resume(pausedTopics); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java index 1ad4f6aa0..9d631e38d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMap.java @@ -23,10 +23,13 @@ public class EpochAndRecordsMap { Map recordMap = new HashMap<>(); + private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(EpochAndRecordsMap.class); + public EpochAndRecordsMap(ConsumerRecords poll, PartitionStateManager pm) { poll.partitions().forEach(partition -> { var records = poll.records(partition); Long epochOfPartition = pm.getEpochOfPartition(partition); + log.trace("Tagging {} records for {} with epoch {}", records.size(), partition, epochOfPartition); RecordsAndEpoch entry = new RecordsAndEpoch(partition, epochOfPartition, records); recordMap.put(partition, entry); }); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index f54f0b063..bf2748de2 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -67,10 +67,16 @@ public Consumer consumer() { protected ConsumerManager consumerManager() { if (consumerManager == null) { - consumerManager = new ConsumerManager<>(optionsInstance.getConsumer(), + // Wrap the user's consumer in a thread-confinement guard. Ownership is claimed + // by the poll thread when BrokerPollSystem.controlLoop starts. Before that, + // init-time calls (subscribe, groupMetadata) are allowed from any thread. + // See #857. + var confinedConsumer = new ThreadConfinedConsumer<>(optionsInstance.getConsumer()); + consumerManager = new ConsumerManager<>(confinedConsumer, optionsInstance.getOffsetCommitTimeout(), optionsInstance.getSaslAuthenticationRetryTimeout(), optionsInstance.getSaslAuthenticationExceptionRetryBackoff()); + consumerManager.init(); } return consumerManager; } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ThreadConfinedConsumer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ThreadConfinedConsumer.java new file mode 100644 index 000000000..139edf81f --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ThreadConfinedConsumer.java @@ -0,0 +1,218 @@ +package io.confluent.parallelconsumer.internal; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.experimental.Delegate; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * Delegating wrapper around {@link Consumer} that enforces thread confinement at runtime. + * All consumer methods (except {@link #wakeup()}) must be called from the owning thread. + *

+ * Uses Lombok {@code @Delegate} to generate passthrough methods for all Consumer methods we + * don't explicitly override. We override all thread-unsafe methods with a {@link #checkThread} + * guard. {@link #wakeup()} is left to the delegate (thread-safe per Kafka API). + *

+ * Call {@link #claimOwnership()} from the poll thread before first use. Before ownership is + * claimed, all methods are allowed (for init-time calls like subscribe). + *

+ * Pattern follows {@link ProducerWrapper} which uses the same Lombok delegate approach. + * + * @see #857 + */ +@Slf4j +@RequiredArgsConstructor +class ThreadConfinedConsumer implements Consumer { + + private volatile Thread ownerThread; + + @NonNull + @Delegate(excludes = ThreadUnsafeMethods.class) + private final Consumer delegate; + + /** + * Claim this consumer for the current thread. After this call, any consumer method + * (except wakeup) called from a different thread will throw IllegalStateException. + */ + void claimOwnership() { + this.ownerThread = Thread.currentThread(); + log.debug("Consumer ownership claimed by thread: {}", ownerThread.getName()); + } + + private void checkThread(String methodName) { + Thread current = Thread.currentThread(); + Thread owner = this.ownerThread; + if (owner != null && current != owner) { + String msg = "Consumer." + methodName + "() called from thread '" + + current.getName() + "' (id:" + current.getId() + + ") but consumer is owned by thread '" + owner.getName() + + "' (id:" + owner.getId() + ", alive:" + owner.isAlive() + + "). Only wakeup() is thread-safe. See #857."; + log.error(msg); + throw new IllegalStateException(msg); + } + log.trace("Consumer.{}() on thread '{}' (owner: {})", methodName, current.getName(), + owner != null ? owner.getName() : "unclaimed"); + } + + // --- Thread-unsafe method overrides (all check thread before delegating) --- + + @Override + public ConsumerRecords poll(Duration timeout) { + checkThread("poll"); + return delegate.poll(timeout); + } + + @Override + public void commitSync(Map offsets) { + checkThread("commitSync"); + delegate.commitSync(offsets); + } + + @Override + public void commitSync(Map offsets, Duration timeout) { + checkThread("commitSync"); + delegate.commitSync(offsets, timeout); + } + + @Override + public void commitSync() { + checkThread("commitSync"); + delegate.commitSync(); + } + + @Override + public void commitSync(Duration timeout) { + checkThread("commitSync"); + delegate.commitSync(timeout); + } + + @Override + public void commitAsync(Map offsets, OffsetCommitCallback callback) { + checkThread("commitAsync"); + delegate.commitAsync(offsets, callback); + } + + @Override + public void commitAsync(OffsetCommitCallback callback) { + checkThread("commitAsync"); + delegate.commitAsync(callback); + } + + @Override + public void commitAsync() { + checkThread("commitAsync"); + delegate.commitAsync(); + } + + @Override + public Set assignment() { + checkThread("assignment"); + return delegate.assignment(); + } + + @Override + public void pause(Collection partitions) { + checkThread("pause"); + delegate.pause(partitions); + } + + @Override + public Set paused() { + checkThread("paused"); + return delegate.paused(); + } + + @Override + public void resume(Collection partitions) { + checkThread("resume"); + delegate.resume(partitions); + } + + @Override + public ConsumerGroupMetadata groupMetadata() { + checkThread("groupMetadata"); + return delegate.groupMetadata(); + } + + @Override + public void subscribe(Collection topics, ConsumerRebalanceListener callback) { + checkThread("subscribe"); + delegate.subscribe(topics, callback); + } + + @Override + public void subscribe(Collection topics) { + checkThread("subscribe"); + delegate.subscribe(topics); + } + + @Override + public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { + checkThread("subscribe"); + delegate.subscribe(pattern, callback); + } + + @Override + public void subscribe(Pattern pattern) { + checkThread("subscribe"); + delegate.subscribe(pattern); + } + + @Override + public void close() { + checkThread("close"); + delegate.close(); + } + + @Override + public void close(Duration timeout) { + checkThread("close"); + delegate.close(timeout); + } + + // --- wakeup() is intentionally NOT overridden --- + // Lombok @Delegate generates the passthrough: delegate.wakeup() + // This is correct — wakeup() is the one thread-safe Consumer method. + + /** + * Excludes interface for Lombok @Delegate. Methods listed here are NOT auto-delegated; + * we override them above with thread-safety checks. + *

+ * Note: method signatures must match the Consumer interface exactly for Lombok to exclude them. + */ + @SuppressWarnings("unused") + private interface ThreadUnsafeMethods { + ConsumerRecords poll(Duration timeout); + void commitSync(Map offsets); + void commitSync(Map offsets, Duration timeout); + void commitSync(); + void commitSync(Duration timeout); + void commitAsync(Map offsets, OffsetCommitCallback callback); + void commitAsync(OffsetCommitCallback callback); + void commitAsync(); + Set assignment(); + void pause(Collection partitions); + Set paused(); + void resume(Collection partitions); + ConsumerGroupMetadata groupMetadata(); + void subscribe(Collection topics, ConsumerRebalanceListener callback); + void subscribe(Collection topics); + void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceListener callback); + void subscribe(java.util.regex.Pattern pattern); + void close(); + void close(Duration timeout); + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetrics.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetrics.java index 3725a7af8..774b45d65 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetrics.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/metrics/PCMetrics.java @@ -33,7 +33,12 @@ public class PCMetrics { /** * Tracking of registered meters for removal from registry on shutdown. */ - private List registeredMeters = new ArrayList<>(); + /** + * Using LinkedHashSet to prevent duplicate entries when the same meter is registered multiple times + * (e.g., during repeated rebalances). Micrometer's registry handles deduplication internally, + * but our tracking collection was accumulating duplicates, causing a memory leak. See #859. + */ + private Set registeredMeters = new LinkedHashSet<>(); /** * Common metrics tags added to all meters - for example PC instance. Configurable through Parallel Consumer @@ -248,6 +253,9 @@ public void removeMetersByPrefixAndCommonTags(String meterNamePrefix) { return; } Search.in(meterRegistry).name(name -> name.startsWith(meterNamePrefix)) - .tags(commonTags).meters().forEach(meterRegistry::remove); + .tags(commonTags).meters().forEach(meter -> { + meterRegistry.remove(meter); + registeredMeters.remove(meter.getId()); + }); } } \ No newline at end of file diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index 5f2e036fe..b7945a0d8 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -289,8 +289,14 @@ private boolean epochIsStale(EpochAndRecordsMap.RecordsAndEpoch recordsAnd public void maybeRegisterNewPollBatchAsWork(@NonNull EpochAndRecordsMap.RecordsAndEpoch recordsAndEpoch) { if (epochIsStale(recordsAndEpoch)) { - log.debug("Inbound record of work has epoch ({}) not matching currently assigned epoch for the applicable partition ({}), skipping", - recordsAndEpoch.getEpochOfPartitionAtPoll(), getPartitionsAssignmentEpoch()); + // #857: upgraded from debug to warn — this is the primary suspect for the silent stall. + // Records polled from the broker are being dropped because the epoch captured at poll + // time doesn't match the current partition state epoch. This happens when a rebalance + // occurs between poll() and registration on the control thread. + log.warn("Dropping polled records — epoch mismatch: poll epoch={}, partition epoch={}, " + + "partition={}, records count={}. This may cause consumption stall if persistent. See #857.", + recordsAndEpoch.getEpochOfPartitionAtPoll(), getPartitionsAssignmentEpoch(), + recordsAndEpoch.getTopicPartition(), recordsAndEpoch.getRecords().size()); return; } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index fd47dd552..8219c8266 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -76,10 +76,17 @@ public class PartitionStateManager implements ConsumerRebalanceListener { private final PCMetrics pcMetrics; + /** + * Cached instance — creating throwaway OffsetMapCodecManagers on every partition assignment + * leaked metrics (each instance registered duplicate timers/counters). See #859, #233. + */ + private final OffsetMapCodecManager offsetMapCodecManager; + public PartitionStateManager(PCModule module, ShardManager sm) { this.sm = sm; this.module = module; this.pcMetrics = module.pcMetrics(); + this.offsetMapCodecManager = new OffsetMapCodecManager<>(module); initMetrics(); } @@ -102,6 +109,7 @@ protected PartitionState getPartitionState(WorkContainer workContain @Override public void onPartitionsAssigned(Collection assignedPartitions) { log.debug("Partitions assigned: {}", assignedPartitions); + log.trace("Epoch map before assignment: {}", partitionsAssignmentEpochs); for (final TopicPartition partitionAssignment : assignedPartitions) { boolean isAlreadyAssigned = this.partitionStates.containsKey(partitionAssignment); @@ -120,8 +128,7 @@ public void onPartitionsAssigned(Collection assignedPartitions) incrementPartitionAssignmentEpoch(assignedPartitions); try { - OffsetMapCodecManager om = new OffsetMapCodecManager<>(module); // todo remove throw away instance creation - #233 - var partitionStates = om.loadPartitionStateForAssignment(assignedPartitions); + var partitionStates = offsetMapCodecManager.loadPartitionStateForAssignment(assignedPartitions); this.partitionStates.putAll(partitionStates); initPartitionCounters(assignedPartitions); @@ -256,9 +263,10 @@ public Long getEpochOfPartition(TopicPartition partition) { private void incrementPartitionAssignmentEpoch(final Collection partitions) { for (final TopicPartition partition : partitions) { - Long epoch = partitionsAssignmentEpochs.getOrDefault(partition, PartitionState.KAFKA_OFFSET_ABSENCE); - epoch++; - partitionsAssignmentEpochs.put(partition, epoch); + Long oldEpoch = partitionsAssignmentEpochs.getOrDefault(partition, PartitionState.KAFKA_OFFSET_ABSENCE); + Long newEpoch = oldEpoch + 1; + partitionsAssignmentEpochs.put(partition, newEpoch); + log.trace("Epoch for {} incremented: {} -> {}", partition, oldEpoch, newEpoch); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java index eb9478c61..b117d9bfb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java @@ -136,6 +136,20 @@ public boolean workIsWaitingToBeProcessed() { return getNumberOfWorkQueuedInShardsAwaitingSelection() > 0L; } + /** + * Count work containers that are in-flight (dispatched to worker pool) for the given partitions. + * Used by {@link WorkManager#onPartitionsRemoved} to adjust the outForProcessing counter + * when partitions are revoked, preventing the silent stall described in #857. + */ + long countInflightForPartitions(Collection partitions) { + Set partitionSet = new HashSet<>(partitions); + return processingShards.values().stream() + .flatMap(shard -> shard.getEntries().values().stream()) + .filter(WorkContainer::isInFlight) + .filter(wc -> partitionSet.contains(wc.getTopicPartition())) + .count(); + } + /** * Remove only the work shards which are referenced from work from revoked partitions * diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java index 1e0ad3932..3c8932d0d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java @@ -106,6 +106,10 @@ public void onPartitionsAssigned(Collection partitions) { */ @Override public void onPartitionsRevoked(Collection partitions) { + // Adjust numberRecordsOutForProcessing BEFORE the partition state cleanup removes + // work from shards. After pm.onPartitionsRevoked, entries will be gone and we can't + // count them. See #857 — without this, the counter stays inflated permanently. + adjustOutForProcessingOnRevoke(partitions); pm.onPartitionsRevoked(partitions); onPartitionsRemoved(partitions); } @@ -115,10 +119,34 @@ public void onPartitionsRevoked(Collection partitions) { */ @Override public void onPartitionsLost(Collection partitions) { + adjustOutForProcessingOnRevoke(partitions); pm.onPartitionsLost(partitions); onPartitionsRemoved(partitions); } + /** + * Adjust numberRecordsOutForProcessing to account for in-flight work that belonged + * to the revoked partitions. Must be called BEFORE pm.onPartitionsRevoked/Lost which + * removes entries from shards. + *

+ * These work items were dispatched to the worker pool but may never complete through + * the mailbox (e.g., if the PC is being closed). Without this adjustment, the counter + * stays permanently inflated, calculateQuantityToRequest() returns 0, and no new work + * is ever distributed - causing the silent stall in #857. + */ + private void adjustOutForProcessingOnRevoke(Collection partitions) { + long inflightForRemovedPartitions = sm.countInflightForPartitions(partitions); + if (inflightForRemovedPartitions > 0) { + log.info("Adjusting numberRecordsOutForProcessing by -{} for revoked partitions (was {})", + inflightForRemovedPartitions, numberRecordsOutForProcessing); + numberRecordsOutForProcessing -= (int) inflightForRemovedPartitions; + if (numberRecordsOutForProcessing < 0) { + log.warn("numberRecordsOutForProcessing went negative ({}), resetting to 0", numberRecordsOutForProcessing); + numberRecordsOutForProcessing = 0; + } + } + } + void onPartitionsRemoved(final Collection partitions) { deregisterTopicPartitionSpecificMetrics(partitions); } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java index 57275a8b3..8eabff8a7 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/BrokerIntegrationTest.java @@ -77,8 +77,33 @@ public static KafkaContainer createKafkaContainer(String logSegmentSize) { kafkaContainer.start(); } + /** + * Stop the current Kafka container and start a fresh one. Use this before performance/chaos + * tests to avoid stale topics, consumer groups, and broker metadata from previous runs + * causing timeouts or unpredictable behaviour. + *

+ * After calling this, any new test instances will pick up the fresh container via the + * static field. Existing KafkaClientUtils references become stale and must be recreated. + */ + /** + * Stop the current Kafka container and start a fresh one. Recreates KafkaClientUtils + * to point to the new container. Call before performance/chaos tests. + */ + protected void resetKafkaContainer() { + log.info("Resetting Kafka container for clean state..."); + if (kcu != null) { + kcu.close(); + } + kafkaContainer.stop(); + kafkaContainer = createKafkaContainer(null); + kafkaContainer.start(); + kcu = new KafkaClientUtils(kafkaContainer); + kcu.open(); + log.info("Fresh Kafka container started at {}", kafkaContainer.getBootstrapServers()); + } + @Getter(AccessLevel.PROTECTED) - private final KafkaClientUtils kcu = new KafkaClientUtils(kafkaContainer); + private KafkaClientUtils kcu = new KafkaClientUtils(kafkaContainer); @BeforeAll static void followKafkaLogs() { diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/ManagedPCInstanceLifecycleTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/ManagedPCInstanceLifecycleTest.java new file mode 100644 index 000000000..33fd3549d --- /dev/null +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/ManagedPCInstanceLifecycleTest.java @@ -0,0 +1,83 @@ +package io.confluent.parallelconsumer.integrationTests; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.integrationTests.utils.ManagedPCInstance; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Focused lifecycle test for {@link ManagedPCInstance} — verifies that rapid stop/start + * cycles don't create duplicate PC instances or cause ConcurrentModificationException. + *

+ * This is a targeted regression test for the double-submission bug found during the + * #857 investigation. + */ +@Slf4j +class ManagedPCInstanceLifecycleTest extends BrokerIntegrationTest { + + /** + * Rapidly toggle a single instance stop→start multiple times. + * If the started flag isn't set correctly, run() will be submitted multiple times, + * creating duplicate PCs in the same consumer group → ConcurrentModificationException. + */ + @RepeatedTest(5) + void rapidToggleShouldNotCreateDuplicateInstances() throws Exception { + numPartitions = 4; + String inputName = setupTopic("lifecycle-test"); + + ExecutorService executor = Executors.newCachedThreadPool(); + AtomicInteger consumeCount = new AtomicInteger(); + + ManagedPCInstance.Config config = ManagedPCInstance.Config.builder() + .commitMode(CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS) + .order(ProcessingOrder.UNORDERED) + .inputTopic(inputName) + .build(); + + ManagedPCInstance instance = new ManagedPCInstance(config, getKcu(), key -> consumeCount.incrementAndGet()); + + // Start the instance + executor.submit(instance); + Thread.sleep(2000); // let it start and join the group + + // Rapid toggle cycles — the bug: toggle() calls start() which submits run() + // before the previous run() has set started=true, causing double-submission + for (int i = 0; i < 10; i++) { + log.info("Toggle cycle {}", i); + instance.toggle(executor); + Thread.sleep(100); // very short — enough for stop, not enough for run() to start + instance.toggle(executor); + } + + // Let it settle + Thread.sleep(3000); + + // The instance should still be functional — produce and consume a message + getKcu().produceMessages(inputName, 10); + Thread.sleep(5000); + + instance.stop(); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + // If duplicate PCs were created, we'd see ConcurrentModificationException in the logs + // and the PC would be dead. Check that it actually consumed something. + log.info("Consumed {} messages after rapid toggles", consumeCount.get()); + assertThat(consumeCount.get()) + .as("Should have consumed messages — if 0, the PC died from CME during rapid toggles") + .isGreaterThan(0); + } +} diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java index 80f2c90fa..ebd382bfb 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java @@ -1,24 +1,19 @@ package io.confluent.parallelconsumer.integrationTests; /*- - * Copyright (C) 2020-2022 Confluent, Inc. + * Copyright (C) 2020-2026 Confluent, Inc. and contributors */ import io.confluent.csid.utils.ProgressBarUtils; import io.confluent.csid.utils.ProgressTracker; import io.confluent.csid.utils.TrimListRepresentation; -import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; -import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; -import lombok.Getter; +import io.confluent.parallelconsumer.integrationTests.utils.ManagedPCInstance; import lombok.SneakyThrows; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import me.tongfei.progressbar.ProgressBar; import org.apache.commons.lang3.RandomUtils; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -27,7 +22,7 @@ import org.assertj.core.internal.StandardComparisonStrategy; import org.awaitility.Awaitility; import org.awaitility.core.TerminalFailureException; -import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -48,7 +43,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.util.IterableUtil.toCollection; import static org.awaitility.Awaitility.waitAtMost; -import static pl.tlinkowski.unij.api.UniLists.of; /** * Test running with multiple instances of parallel-consumer consuming from topic with two partitions. @@ -58,8 +52,11 @@ public class MultiInstanceRebalanceTest extends BrokerIntegrationTest { static final int DEFAULT_MAX_POLL = 500; - public static final int CHAOS_FREQUENCY = 500; + public static final int DEFAULT_CHAOS_FREQUENCY = 500; public static final int DEFAULT_POLL_DELAY = 150; + + /** Per-test override for chaos frequency (ms). Higher = gentler chaos. */ + int chaosFrequency = DEFAULT_CHAOS_FREQUENCY; AtomicInteger count = new AtomicInteger(); static { @@ -73,7 +70,7 @@ void consumeWithMultipleInstancesPeriodicConsumerSync(ProcessingOrder order) { int expectedMessageCount = (order == PARTITION) ? 100 : 1000; int numberOfPcsToRun = 2; runTest(DEFAULT_MAX_POLL, CommitMode.PERIODIC_CONSUMER_SYNC, order, expectedMessageCount, - numberOfPcsToRun, 1.0, DEFAULT_POLL_DELAY); + numberOfPcsToRun, 1.0, DEFAULT_POLL_DELAY, false); } @ParameterizedTest @@ -82,24 +79,99 @@ void consumeWithMultipleInstancesPeriodicConsumerAsynchronous(ProcessingOrder or numPartitions = 2; int expectedMessageCount = (order == PARTITION) ? 100 : 1000; runTest(DEFAULT_MAX_POLL, CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, order, expectedMessageCount, - 2, 1.0, DEFAULT_POLL_DELAY); + 2, 1.0, DEFAULT_POLL_DELAY, false); } /** - * Tests with very large numbers of parallel consumer instances to try to reproduce state and concurrency issues - * (#188, #189). + * Stress test: 12 PC instances on 80 partitions with aggressive chaos monkey toggling up to + * 6 of 11 secondary instances every 0-500ms. PC-0 is never toggled and should always be alive. + *

+ * Originally created for #188/#189, re-enabled for #857 investigation. + *

+ * What the test does: + *

    + *
  1. Pre-produces 30% of 500k messages, starts PC-0, waits for it to consume
  2. + *
  3. Starts 11 more PCs + a background producer for the remaining 70%
  4. + *
  5. Chaos monkey continuously toggles (stop/start) random secondary instances
  6. + *
  7. Waits up to 5 minutes for ALL 500k keys to be consumed by any instance
  8. + *
  9. Fails if no progress is made for 11 consecutive 1-second checks
  10. + *
+ *

+ * Acceptance: 80%+ pass rate (currently ~90%). This test deliberately pushes the + * Kafka consumer group rebalance protocol to its limits. The remaining ~10% failure occurs + * when rapid membership changes prevent the group coordinator from completing partition + * assignment (consumers show assignedPartitions=0). This is documented Kafka behaviour + * under extreme churn, not a PC bug — all PC-internal issues have been fixed. + * If the pass rate drops below 80%, reassess: the test parameters may need backing off, + * or a new PC bug may have been introduced. *

- * This test takes some time, but seems required in order to expose some race conditions without syntehticly - * creatign them. + * Fixes applied (from #857 investigation): + *

    + *
  • commitCommand deadlock — ReentrantLock.tryLock() in onPartitionsRevoked
  • + *
  • Non-blocking stopAsync() in chaos monkey — prevents 30-40s close() freeze
  • + *
  • ThreadConfinedConsumer wrapper — runtime thread-safety enforcement
  • + *
  • Raw consumer field removed from PC — all access via ConsumerManager/DI
  • + *
  • ArchUnit rules — compile-time consumer field isolation
  • + *
  • Multiple defensive fixes (counter adjustment, throttle reset, lifecycle guard)
  • + *
+ *

+ * For the full investigation history, see branch {@code bugs/857-paused-consumption-multi-consumers-bug} + * and {@code docs/BUG_857_INVESTIGATION.md}. + * + * @see #857 */ - @Disabled + @Tag("performance") @Test void largeNumberOfInstances() { + numPartitions = 80; int numberOfPcsToRun = 12; int expectedMessageCount = 500000; + // Use CooperativeStickyAssignor — under the eager (Range) protocol, rapid membership + // changes restart the JoinGroup phase from scratch, leaving all consumers with + // assignment=[] indefinitely. Cooperative rebalancing lets consumers keep their + // existing assignments during rebalance. See #857 investigation. runTest(DEFAULT_MAX_POLL, CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, ProcessingOrder.UNORDERED, expectedMessageCount, - numberOfPcsToRun, 0.3, 1); + numberOfPcsToRun, 0.3, 1, true); + } + + /** + * Variant of {@link #largeNumberOfInstances()} using CooperativeStickyAssignor, which is the assignor + * that issue #857 reporters say makes the bug more visible. Cooperative rebalancing revokes and assigns + * partitions in separate callbacks, creating a wider window for stale container races. + *

+ * Uses parameters closer to the production environments reported in #857: 30 partitions, 4 consumers. + */ + @Tag("performance") + @Test + void cooperativeStickyRebalanceShouldNotStall() { + + numPartitions = 30; + int numberOfPcsToRun = 4; + int expectedMessageCount = 100_000; + chaosFrequency = 3000; // gentle chaos — let group settle between rebalances + runTest(DEFAULT_MAX_POLL, CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, ProcessingOrder.UNORDERED, + expectedMessageCount, numberOfPcsToRun, 0.3, 1, true); + } + + /** + * Gentler version of {@link #largeNumberOfInstances()} — toggles only 1 instance at a time with a 3-second + * cooldown between rounds. This lets the consumer group settle between rebalances, isolating any PC-internal + * bugs from the rebalance storm effect seen in the aggressive test. + *

+ * If this test passes but {@link #largeNumberOfInstances()} fails, the issue is rebalance storm tolerance, + * not a PC state management bug. + */ + @Tag("performance") + @Test + void gentleChaosRebalance() { + + numPartitions = 30; + int numberOfPcsToRun = 6; + int expectedMessageCount = 200_000; + chaosFrequency = 3000; // 3 seconds between chaos rounds — lets the group settle + runTest(DEFAULT_MAX_POLL, CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS, ProcessingOrder.UNORDERED, + expectedMessageCount, numberOfPcsToRun, 0.5, 1, false); } ProgressBar overallProgress; @@ -107,7 +179,8 @@ void largeNumberOfInstances() { @SneakyThrows private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order, int expectedMessageCount, - int numberOfPcsToRun, double fractionOfMessagesToPreProduce, int pollDelayMs) { + int numberOfPcsToRun, double fractionOfMessagesToPreProduce, int pollDelayMs, + boolean useCooperativeAssignor) { String inputName = setupTopic(this.getClass().getSimpleName() + "-input-" + RandomUtils.nextInt()); overallProgress = ProgressBarUtils.getNewMessagesBar("overall", log, expectedMessageCount); @@ -116,6 +189,15 @@ private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order, var sendingProgress = ProgressBarUtils.getNewMessagesBar("sending", log, expectedMessageCount); + ManagedPCInstance.Config pcConfig = ManagedPCInstance.Config.builder() + .maxPoll(maxPoll) + .commitMode(commitMode) + .order(order) + .inputTopic(inputName) + .pollDelayMs(pollDelayMs) + .useCooperativeAssignor(useCooperativeAssignor) + .build(); + // pre-produce messages to input-topic Set expectedKeys = new ConcurrentSkipListSet<>(); log.info("Producing {} messages before starting test", expectedMessageCount); @@ -145,8 +227,11 @@ private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order, // Submit first parallel-consumer log.info("Running first instance of pc"); - int expectedMessageCountPerPC = expectedMessageCount / numberOfPcsToRun; - ParallelConsumerRunnable pc1 = new ParallelConsumerRunnable(maxPoll, commitMode, order, inputName, expectedMessageCountPerPC, pollDelayMs); + ManagedPCInstance pc1 = new ManagedPCInstance(pcConfig, getKcu(), key -> { + count.incrementAndGet(); + overallProgress.step(); + overallConsumedKeys.add(key); + }); pcExecutor.submit(pc1); // Wait for first consumer to consume messages, also effectively waits for the group.initial.rebalance.delay.ms (3s by default) @@ -192,16 +277,18 @@ public void run() { log.error(e.getMessage(), e); } log.info("Running pc instance {}", value); - ParallelConsumerRunnable instance = new ParallelConsumerRunnable(maxPoll, commitMode, order, inputName, expectedMessageCountPerPC, pollDelayMs); + ManagedPCInstance instance = new ManagedPCInstance(pcConfig, getKcu(), key -> { + count.incrementAndGet(); + overallProgress.step(); + overallConsumedKeys.add(key); + }); pcExecutor.submit(instance); return instance; } ).collect(Collectors.toList())); - final List allPCRunners = Collections.synchronizedList(new ArrayList<>()); + final List allPCRunners = Collections.synchronizedList(new ArrayList<>()); allPCRunners.add(pc1); allPCRunners.addAll(secondaryPcs); - final ParallelConsumerRunnable[] parallelConsumerRunnablesArray = allPCRunners.toArray(new ParallelConsumerRunnable[0]); - // Randomly start and stop PCs var chaosMonkey = new Runnable() { @@ -209,7 +296,7 @@ public void run() { public void run() { try { while (noneHaveFailed(allPCRunners)) { - Thread.sleep((int) (CHAOS_FREQUENCY * Math.random())); + Thread.sleep((int) (chaosFrequency * Math.random())); boolean makeChaos = Math.random() > 0.2; // small chance it will let the test do a run without chaos // boolean makeChaos = true; if (makeChaos) { @@ -219,8 +306,8 @@ public void run() { log.info("Will mess with {} instances", numberToMessWith); IntStream.range(0, numberToMessWith).forEach(value -> { int instanceToGet = (int) ((size - 1) * Math.random()); - ParallelConsumerRunnable victim = secondaryPcs.get(instanceToGet); - log.info("Victim is instance: " + victim.instanceId); + ManagedPCInstance victim = secondaryPcs.get(instanceToGet); + log.info("Victim is instance: " + victim.getInstanceId()); victim.toggle(pcExecutor); }); } @@ -249,9 +336,11 @@ public void run() { .alias(failureMessage) .pollInterval(1, SECONDS) .untilAsserted(() -> { - log.trace("Processed-count: {}", getAllConsumedKeys(parallelConsumerRunnablesArray).size()); + log.trace("Processed-count: {}", getAllConsumedKeys(allPCRunners).size()); if (progressTracker.hasProgressNotBeenMade()) { - expectedKeys.removeAll(getAllConsumedKeys(parallelConsumerRunnablesArray)); + // Dump full state of every PC instance to diagnose the stall + dumpInstanceState(allPCRunners); + expectedKeys.removeAll(getAllConsumedKeys(allPCRunners)); throw progressTracker.constructError(msg("No progress, missing keys: {}.", expectedKeys)); } SoftAssertions all = new SoftAssertions(); @@ -279,17 +368,17 @@ public void run() { sendingProgress.close(); } - allPCRunners.forEach(ParallelConsumerRunnable::close); + allPCRunners.forEach(ManagedPCInstance::close); - assertThat(pc1.consumedKeys).hasSizeGreaterThan(0); - assertThat(getAllConsumedKeys(secondaryPcs.toArray(new ParallelConsumerRunnable[0]))) + assertThat(pc1.getConsumedKeys()).hasSizeGreaterThan(0); + assertThat(getAllConsumedKeys(secondaryPcs)) .as("Second PC should have taken over some of the work and consumed some records") .hasSizeGreaterThan(0); pcExecutor.shutdown(); Collection duplicates = toCollection(StandardComparisonStrategy.instance() - .duplicatesFrom(getAllConsumedKeys(parallelConsumerRunnablesArray))); + .duplicatesFrom(getAllConsumedKeys(allPCRunners))); log.info("Duplicate consumed keys (at least one is expected due to the rebalance): {}", duplicates); double percentageDuplicateTolerance = 0.2; assertThat(duplicates) @@ -299,135 +388,71 @@ public void run() { } - private boolean noneHaveFailed(List secondaryPcs) { - return checkForFailure(secondaryPcs).isEmpty(); + /** + * Dump the internal state of every PC instance when a stall is detected. + * This tells us exactly what each component thinks is happening: + * - Is the PC alive or dead? + * - How many records are queued in shards vs out for processing? + * - What's the partition assignment? + * - Is the consumer paused? + * - What does the WorkManager think about incomplete offsets? + */ + private void dumpInstanceState(List instances) { + log.error("=== STALL DETECTED — dumping all instance state ==="); + for (var instance : instances) { + var pc = instance.getParallelConsumer(); + if (pc == null) { + log.error(" Instance {}: PC is null (never started?), started={}", instance.getInstanceId(), instance.isStarted()); + continue; + } + try { + var wm = pc.getWm(); + // Check if the shard manager has any processing shards at all + var sm = wm.getSm(); + long totalWorkTracked = sm.getNumberOfWorkQueuedInShardsAwaitingSelection(); + boolean hasIncompletes = wm.hasIncompleteOffsets(); + + log.error(" Instance {}: closed/failed={}, failureCause={}, started={}, " + + "assignedPartitions={}, queuedInShards={}, outForProcessing={}, " + + "incompleteOffsets={}, hasIncompletes={}, " + + "pausedPartitions={}, consumedKeys={}", + instance.getInstanceId(), + pc.isClosedOrFailed(), + pc.getFailureCause() != null ? pc.getFailureCause().getMessage() : "none", + instance.isStarted(), + pc.getAssignmentSize(), + totalWorkTracked, + wm.getNumberRecordsOutForProcessing(), + wm.getNumberOfIncompleteOffsets(), + hasIncompletes, + pc.getPausedPartitionSize(), + instance.getConsumedKeys().size() + ); + } catch (Exception e) { + log.error(" Instance {}: error dumping state: {}", instance.getInstanceId(), e.getMessage(), e); + } + } + log.error("=== END STATE DUMP ==="); + } + + private boolean noneHaveFailed(List pcs) { + return checkForFailure(pcs).isEmpty(); } - private List checkForFailure(List secondaryPcs) { - return secondaryPcs.stream().filter(pcr -> { - var pc = pcr.getParallelConsumer(); + private List checkForFailure(List pcs) { + return pcs.stream().filter(instance -> { + var pc = instance.getParallelConsumer(); if (pc == null) return false; // hasn't started if (!pc.isClosedOrFailed()) return false; // still open boolean failed = pc.getFailureCause() != null; // actually failed return failed; - }).map(pc -> pc.getParallelConsumer().getFailureCause()).collect(Collectors.toList()); + }).map(instance -> instance.getParallelConsumer().getFailureCause()).collect(Collectors.toList()); } - List getAllConsumedKeys(ParallelConsumerRunnable... instances) { - return Arrays.stream(instances) - .flatMap(parallelConsumerRunnable -> parallelConsumerRunnable.consumedKeys.stream()) + List getAllConsumedKeys(List instances) { + return instances.stream() + .flatMap(instance -> instance.getConsumedKeys().stream()) .collect(Collectors.toList()); } - int pcInstanceCount = 0; - - @Getter - @ToString - public class ParallelConsumerRunnable implements Runnable { - - private final int instanceId; - - private final int maxPoll; - private final CommitMode commitMode; - private final ProcessingOrder order; - private final String inputTopic; - private final int expectedMessageCount; - private final ProgressBar bar; - private final int pollDelayMs; - private ParallelEoSStreamProcessor parallelConsumer; - private boolean started = false; - - @ToString.Exclude - private final Queue consumedKeys = new ConcurrentLinkedQueue<>(); - - public ParallelConsumerRunnable(int maxPoll, CommitMode commitMode, ProcessingOrder order, String inputTopic, int expectedMessageCount, int pollDelayMs) { - this.maxPoll = maxPoll; - this.commitMode = commitMode; - this.order = order; - this.inputTopic = inputTopic; - this.expectedMessageCount = expectedMessageCount; - this.pollDelayMs = pollDelayMs; - - instanceId = pcInstanceCount; - pcInstanceCount++; - - bar = ProgressBarUtils.getNewMessagesBar("PC" + instanceId, log, expectedMessageCount); - } - - @Override - public void run() { - MDC.put(MDC_INSTANCE_ID, "Runner-" + instanceId); - - started = true; - log.info("Running consumer!"); - - Properties consumerProps = new Properties(); - consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll); - KafkaConsumer newConsumer = getKcu().createNewConsumer(false, consumerProps); - - this.parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder() - .ordering(order) - .consumer(newConsumer) - .commitMode(commitMode) - .maxConcurrency(10) - .build()); - - - // test was written with 1-second cycles in mind - in terms of expected progression - this.parallelConsumer.setTimeBetweenCommits(ofSeconds(1)); - - - parallelConsumer.setMyId(Optional.of("PC-" + instanceId)); - - parallelConsumer.subscribe(of(inputTopic)); - - parallelConsumer.poll(record -> { - // simulate work - try { - Thread.sleep(pollDelayMs); - } catch (InterruptedException e) { - // ignore - } - count.incrementAndGet(); - this.bar.step(); - overallProgress.step(); - consumedKeys.add(record.key()); - overallConsumedKeys.add(record.key()); - } - ); - } - - public void stop() { - log.info("Stopping {}", this.instanceId); - started = false; - parallelConsumer.close(); - } - - public void start(ExecutorService pcExecutor) { - // strange structure for debugging - Exception failureCause = getParallelConsumer().getFailureCause(); - if (failureCause != null) { - throw new RuntimeException("Error starting PC, pc died from previous error: " + failureCause.getMessage(), failureCause); - } - - log.info("Starting {}", this); - pcExecutor.submit(this); - } - - public void close() { - log.info("Stopping {}", this); - stop(); - bar.close(); - } - - public void toggle(ExecutorService pcExecutor) { - if (started) { - stop(); - } else { - start(pcExecutor); - } - } - } - - } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/ManagedPCInstance.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/ManagedPCInstance.java new file mode 100644 index 000000000..c313578d0 --- /dev/null +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/ManagedPCInstance.java @@ -0,0 +1,247 @@ +package io.confluent.parallelconsumer.integrationTests.utils; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.WakeupException; + +import java.nio.channels.ClosedChannelException; +import java.time.Duration; +import java.util.Optional; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_INSTANCE_ID; +import static pl.tlinkowski.unij.api.UniLists.of; + +/** + * Manages the lifecycle of a {@link ParallelEoSStreamProcessor} instance in multi-instance + * integration tests. Handles creation, start, stop, toggle (for chaos monkey), and restart + * with proper exception classification. + *

+ * Each call to {@link #run()} creates a fresh PC + consumer, so restarts don't carry over + * stale state from the previous instance. This simulates what a real supervisor would do + * (start a new process). + *

+ * On restart, checks the previous PC's failure cause: + *

    + *
  • Expected close exceptions (see {@link #isExpectedCloseException}) → logged at WARN, restart allowed
  • + *
  • Unexpected exceptions → thrown as RuntimeException (fails the test — acts as a canary for real bugs)
  • + *
+ * + * @see io.confluent.parallelconsumer.integrationTests.MultiInstanceRebalanceTest + */ +@Slf4j +@Getter +@ToString +public class ManagedPCInstance implements Runnable { + + private static final AtomicInteger ID_GENERATOR = new AtomicInteger(); + + private final int instanceId; + private final Config config; + private final KafkaClientUtils kcu; + + @Getter + private volatile ParallelEoSStreamProcessor parallelConsumer; + @Getter + private volatile boolean started = false; + + @ToString.Exclude + private final Queue consumedKeys = new ConcurrentLinkedQueue<>(); + + /** Callback invoked for each consumed record — lets the test track overall progress */ + @ToString.Exclude + private final Consumer onConsumed; + + public ManagedPCInstance(Config config, KafkaClientUtils kcu, Consumer onConsumed) { + this.config = config; + this.kcu = kcu; + this.onConsumed = onConsumed; + this.instanceId = ID_GENERATOR.getAndIncrement(); + } + + @Override + public void run() { + org.slf4j.MDC.put(MDC_INSTANCE_ID, "Runner-" + instanceId); + + // Wait for the previous PC to fully close — including its internal threads finishing + // and the KafkaConsumer being closed on the poll thread. PC.close() blocks until + // the control thread finishes, which waits for the poll thread (brokerPollSubsystem + // .closeAndWait), which closes the consumer. So by the time isClosedOrFailed() returns + // true, the consumer should be fully closed and deregistered from the group. + // See #857. + if (parallelConsumer != null) { + int waitMs = 0; + while (!parallelConsumer.isClosedOrFailed() && waitMs < 10_000) { + try { + Thread.sleep(100); + waitMs += 100; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + if (waitMs >= 10_000) { + log.warn("Instance {} previous PC did not close within 10s, proceeding anyway", instanceId); + } + } + + // started flag is set in start(), not here — prevents double-submission + log.info("Running consumer instance {}", instanceId); + + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.maxPoll); + if (config.useCooperativeAssignor) { + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); + } + KafkaConsumer newConsumer = kcu.createNewConsumer(false, consumerProps); + + this.parallelConsumer = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder() + .ordering(config.order) + .consumer(newConsumer) + .commitMode(config.commitMode) + .maxConcurrency(config.maxConcurrency) + .build()); + + this.parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1)); + this.parallelConsumer.setMyId(Optional.of("PC-" + instanceId)); + this.parallelConsumer.subscribe(of(config.inputTopic)); + + parallelConsumer.poll(record -> { + if (config.pollDelayMs > 0) { + try { + Thread.sleep(config.pollDelayMs); + } catch (InterruptedException e) { + // ignore — shutdown in progress + } + } + consumedKeys.add(record.key()); + onConsumed.accept(record.key()); + }); + } + + /** True while a background close is in progress — prevents toggle from restarting prematurely */ + private volatile boolean closePending = false; + + public void stop() { + log.info("Stopping instance {}", instanceId); + started = false; + parallelConsumer.close(); + } + + /** + * Non-blocking stop: signals close and returns immediately. The close completes + * in a background thread. Use this from the chaos monkey so it isn't blocked for + * 30-40s while the PC shuts down. The {@link #closePending} flag prevents + * {@link #toggle} from restarting until close finishes. + */ + public void stopAsync() { + log.info("Async stopping instance {}", instanceId); + started = false; + closePending = true; + var pcToClose = parallelConsumer; + new Thread(() -> { + try { + pcToClose.close(); + } catch (Exception e) { + log.warn("Instance {} background close error: {}", instanceId, e.getMessage()); + } finally { + closePending = false; + } + }, "pc-close-" + instanceId).start(); + } + + /** + * Restart: checks the previous PC's failure cause, classifies it, then resubmits to the executor. + * Expected close exceptions are logged. Unexpected exceptions fail the test. + */ + public void start(ExecutorService pcExecutor) { + if (parallelConsumer != null) { + Exception failureCause = parallelConsumer.getFailureCause(); + if (failureCause != null) { + if (isExpectedCloseException(failureCause)) { + log.warn("Instance {} had expected close exception (restarting): {}", + instanceId, failureCause.getMessage()); + } else { + throw new RuntimeException( + "Instance " + instanceId + " died from unexpected error: " + failureCause.getMessage(), + failureCause); + } + } + } + started = true; // set BEFORE submit so next toggle() sees it — prevents double-submission + log.info("Starting instance {}", instanceId); + pcExecutor.submit(this); + } + + public void toggle(ExecutorService pcExecutor) { + if (closePending) { + log.trace("Instance {} toggle skipped — close still pending", instanceId); + return; + } + if (started) { + stopAsync(); // non-blocking so the chaos monkey isn't frozen during close + } else { + start(pcExecutor); + } + } + + public void close() { + log.info("Closing instance {}", instanceId); + stop(); + } + + /** + * Whitelist-only exception classification. Walks the cause chain looking for known + * close-related exceptions. Everything not on the whitelist is treated as an unexpected + * bug that should fail the test. + */ + public static boolean isExpectedCloseException(Throwable t) { + Throwable current = t; + while (current != null) { + if (current instanceof InterruptedException || + current instanceof WakeupException || + current instanceof DisconnectException || + current instanceof ClosedChannelException || + current instanceof TimeoutException) { + return true; + } + current = current.getCause(); + } + return false; + } + + /** + * Configuration for a managed PC instance. Use the builder. + */ + @Builder + @Getter + public static class Config { + @Builder.Default private final int maxPoll = 500; + private final CommitMode commitMode; + private final ProcessingOrder order; + private final String inputTopic; + @Builder.Default private final int pollDelayMs = 0; + @Builder.Default private final int maxConcurrency = 10; + @Builder.Default private final boolean useCooperativeAssignor = false; + } +} diff --git a/parallel-consumer-core/src/test-integration/resources/logback-test.xml b/parallel-consumer-core/src/test-integration/resources/logback-test.xml new file mode 100644 index 000000000..c003ad8a8 --- /dev/null +++ b/parallel-consumer-core/src/test-integration/resources/logback-test.xml @@ -0,0 +1,39 @@ + + + + + + %d{mm:ss.SSS} %yellow(%X{pcId}) %highlight(%-5level) %yellow([%thread]) %X{offset} %cyan(\(%file:%line\)#%M) %msg%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ArchitectureTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ArchitectureTest.java new file mode 100644 index 000000000..96b37bd13 --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ArchitectureTest.java @@ -0,0 +1,96 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import com.tngtech.archunit.core.domain.JavaField; +import com.tngtech.archunit.core.importer.ImportOption; +import com.tngtech.archunit.junit.AnalyzeClasses; +import com.tngtech.archunit.junit.ArchTest; +import com.tngtech.archunit.lang.ArchCondition; +import com.tngtech.archunit.lang.ArchRule; +import com.tngtech.archunit.lang.ConditionEvents; +import com.tngtech.archunit.lang.SimpleConditionEvent; +import io.confluent.parallelconsumer.internal.ConsumerManager; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import com.tngtech.archunit.core.domain.JavaAccess; + +import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.fields; +import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses; + +/** + * ArchUnit rules enforcing the architecture of the Parallel Consumer. + *

+ * These rules prevent regressions in thread-safety and encapsulation. + * See #857. + */ +@AnalyzeClasses( + packages = "io.confluent.parallelconsumer", + importOptions = ImportOption.DoNotIncludeTests.class +) +class ArchitectureTest { + + // Classes allowed to hold a Consumer field. Use getName() to avoid hardcoded strings. + // ThreadConfinedConsumer is package-private so we reference it by name. + private static final Set ALLOWED_CONSUMER_HOLDERS = new HashSet<>(Arrays.asList( + ConsumerManager.class.getName(), + "io.confluent.parallelconsumer.internal.ThreadConfinedConsumer", + ParallelConsumerOptions.class.getName(), + // Lombok @Builder generates this inner class which also holds the consumer field + ParallelConsumerOptions.class.getName() + "$ParallelConsumerOptionsBuilder" + )); + + /** + * Only the designated wrapper/options classes may hold a Consumer or KafkaConsumer field. + * This prevents accidental raw consumer access that bypasses the thread-confinement wrapper. + */ + @ArchTest + static final ArchRule noRawConsumerFieldsOutsideWrappers = + fields() + .that().haveRawType(Consumer.class) + .or().haveRawType(KafkaConsumer.class) + .should(beInAllowedClasses(ALLOWED_CONSUMER_HOLDERS)) + .as("Only " + ALLOWED_CONSUMER_HOLDERS + " may hold a Consumer field. " + + "All other consumer access must go through ConsumerManager. See #857."); + + /** + * Only ProducerWrapper should hold a raw Producer field. + * ProducerManager holds ProducerWrapper, not raw Producer. + */ + @ArchTest + static final ArchRule noRawProducerFieldsOutsideWrapper = + fields() + .that().haveRawType("org.apache.kafka.clients.producer.Producer") + .or().haveRawType("org.apache.kafka.clients.producer.KafkaProducer") + .should(beInAllowedClasses(new HashSet<>(Arrays.asList( + "io.confluent.parallelconsumer.internal.ProducerWrapper", + ParallelConsumerOptions.class.getName(), + ParallelConsumerOptions.class.getName() + "$ParallelConsumerOptionsBuilder" + )))) + .as("Only ProducerWrapper and ParallelConsumerOptions may hold a Producer field. " + + "All other producer access must go through ProducerWrapper/ProducerManager."); + + // Future: add rule that ConsumerManager is only constructed by PCModule. + // Requires DescribedPredicate API which is verbose — defer for now. + + private static ArchCondition beInAllowedClasses(Set allowedClassNames) { + return new ArchCondition<>("be declared in an allowed class") { + @Override + public void check(JavaField field, ConditionEvents events) { + String ownerName = field.getOwner().getName(); + if (!allowedClassNames.contains(ownerName)) { + events.add(SimpleConditionEvent.violated(field, + "Field " + field.getFullName() + " holds a Consumer/Producer reference but " + + ownerName + " is not in the allowed list: " + allowedClassNames)); + } + } + }; + } +} diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/metrics/PCMetricsTest859.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/metrics/PCMetricsTest859.java new file mode 100644 index 000000000..3fd9a780e --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/metrics/PCMetricsTest859.java @@ -0,0 +1,128 @@ +package io.confluent.parallelconsumer.metrics; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Set; + +import static com.google.common.truth.Truth.assertThat; + +/** + * Regression tests for #859 — PCMetrics memory leak from duplicate meter registrations. + *

+ * The bug: {@code registeredMeters} was an {@code ArrayList} that accumulated duplicate + * {@code Meter.Id} entries every time the same meter was registered. After 3 days in + * production with frequent offset commits, this consumed 96% of heap. + * + * @see #859 + */ +class PCMetricsTest859 { + + private SimpleMeterRegistry registry; + private PCMetrics pcMetrics; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + pcMetrics = new PCMetrics(registry, Collections.emptyList(), "test-instance"); + } + + @AfterEach + void tearDown() { + pcMetrics.close(); + } + + /** + * Core regression test: registering the same timer multiple times must not grow + * the tracking set beyond 1 entry. Before the fix, each call added a duplicate. + */ + @Test + void duplicateTimerRegistrationShouldNotGrowTrackingSet() { + // Register the same timer 100 times (simulating 100 rebalances) + for (int i = 0; i < 100; i++) { + pcMetrics.getTimerFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_TIME); + } + + Set registeredMeters = getRegisteredMeters(); + assertThat(registeredMeters).hasSize(1); + } + + /** + * Same test for counters with identical tags. + */ + @Test + void duplicateCounterRegistrationShouldNotGrowTrackingSet() { + Tag encoding = Tag.of("encoding", "RunLength"); + for (int i = 0; i < 100; i++) { + pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, encoding); + } + + Set registeredMeters = getRegisteredMeters(); + assertThat(registeredMeters).hasSize(1); + } + + /** + * Different tags should create separate entries (this is correct behaviour, not a leak). + */ + @Test + void differentTagsShouldCreateSeparateEntries() { + pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, Tag.of("encoding", "RunLength")); + pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, Tag.of("encoding", "BitSet")); + pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, Tag.of("encoding", "BitSetCompressed")); + + Set registeredMeters = getRegisteredMeters(); + assertThat(registeredMeters).hasSize(3); + } + + /** + * After close, the tracking set should be empty. + */ + @Test + void closeShouldClearAllRegisteredMeters() { + pcMetrics.getTimerFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_TIME); + pcMetrics.getCounterFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_USAGE, Tag.of("encoding", "RunLength")); + + Set registeredMeters = getRegisteredMeters(); + assertThat(registeredMeters).hasSize(2); + + pcMetrics.close(); + + assertThat(registeredMeters).isEmpty(); + } + + /** + * removeMetersByPrefixAndCommonTags should also clean the tracking set, + * not just the registry. Before the fix, it left stale Meter.Id references. + */ + @Test + void removeMetersByPrefixShouldCleanTrackingSet() { + pcMetrics.getTimerFromMetricDef(PCMetricsDef.OFFSETS_ENCODING_TIME); + + Set registeredMeters = getRegisteredMeters(); + assertThat(registeredMeters).hasSize(1); + + pcMetrics.removeMetersByPrefixAndCommonTags(PCMetricsDef.OFFSETS_ENCODING_TIME.getName()); + + assertThat(registeredMeters).isEmpty(); + } + + @SuppressWarnings("unchecked") + private Set getRegisteredMeters() { + try { + Field field = PCMetrics.class.getDeclaredField("registeredMeters"); + field.setAccessible(true); + return (Set) field.get(pcMetrics); + } catch (Exception e) { + throw new RuntimeException("Failed to access registeredMeters field", e); + } + } +} diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java index 102c67a55..5a8f057db 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java @@ -80,10 +80,9 @@ class WorkManagerOffsetMapCodecManagerTest { @Mock ConsumerRecord mockCr; - @BeforeEach - void setupMock() { - injectSucceededWorkAtOffset(highestSucceeded); - } + // Removed separate @BeforeEach setupMock() — it depended on state being initialized + // by setup() first, but JUnit 5 doesn't guarantee @BeforeEach ordering. + // The injectSucceededWorkAtOffset(highestSucceeded) call is now at the end of setup(). private void injectSucceededWorkAtOffset(long offset) { Mockito.doReturn(offset).when(mockCr).offset(); @@ -125,6 +124,10 @@ void setup() { wm = module.workManager(); wm.onPartitionsAssigned(UniLists.of(tp)); offsetCodecManager = new OffsetMapCodecManager<>(module); + + // Was a separate @BeforeEach (setupMock) but JUnit 5 doesn't guarantee ordering + // between @BeforeEach methods. Must run after state is initialized. + injectSucceededWorkAtOffset(highestSucceeded); } @BeforeAll diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index 20c45e35c..1d35b7591 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -32,9 +32,15 @@ public ModelUtils() { } public WorkContainer createWorkFor(long offset) { + return createWorkFor(offset, 0); + } + + public WorkContainer createWorkFor(long offset, long epoch) { ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr, module); Mockito.doReturn(offset).when(mockCr).offset(); + Mockito.doReturn(topic).when(mockCr).topic(); + Mockito.doReturn(0).when(mockCr).partition(); + WorkContainer workContainer = new WorkContainer<>(epoch, mockCr, module); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ShardManagerStaleContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ShardManagerStaleContainerTest.java new file mode 100644 index 000000000..29aee5fab --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ShardManagerStaleContainerTest.java @@ -0,0 +1,194 @@ +package io.confluent.parallelconsumer.state; + +/*- + * Copyright (C) 2020-2026 Confluent, Inc. and contributors + */ + +import io.confluent.parallelconsumer.internal.PCModuleTestEnv; +import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import pl.tlinkowski.unij.api.UniLists; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; + +/** + * Deterministic unit tests for stale container handling across rebalances. + *

+ * These tests construct the exact scenarios suspected in + * #857 + * without requiring a broker, so they run fast and fail deterministically. + * + * @see ShardManager + * @see ProcessingShard + * @see PartitionStateManager + */ +@Slf4j +class ShardManagerStaleContainerTest { + + ModelUtils mu = new ModelUtils(); + WorkManager wm; + ShardManager sm; + PartitionStateManager pm; + + String topic = "topic"; + TopicPartition tp = new TopicPartition(topic, 0); + + @BeforeEach + void setup() { + PCModuleTestEnv module = mu.getModule(); + wm = module.workManager(); + sm = wm.getSm(); + pm = wm.getPm(); + + // initial assignment at epoch 0 + wm.onPartitionsAssigned(UniLists.of(tp)); + } + + /** + * Core reproduction scenario for #857: after a revoke+reassign cycle, stale work containers + * from the old epoch should not block new work from being taken. + */ + @Test + void staleContainerShouldNotBlockNewWorkAfterRebalance() { + long initialEpoch = pm.getEpochOfPartition(tp); + + // Add work at the initial epoch + for (int i = 0; i < 5; i++) { + sm.addWorkContainer(initialEpoch, new ConsumerRecord<>(topic, 0, i, "key-" + i, "value")); + } + + // Verify we can take work (sanity check) + List> initialWork = sm.getWorkIfAvailable(10); + assertThat(initialWork).hasSize(5); + + // Simulate revoke → reassign (epoch goes from N to N+2) + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long newEpoch = pm.getEpochOfPartition(tp); + assertWithMessage("Epoch should have advanced past initial") + .that(newEpoch).isGreaterThan(initialEpoch); + + // Add new work at the new epoch + for (int i = 10; i < 15; i++) { + sm.addWorkContainer(newEpoch, new ConsumerRecord<>(topic, 0, i, "key-" + i, "value")); + } + + // Also inject a "late arriving" container with the OLD epoch — simulates a poll + // that was in-flight during the rebalance and arrived after reassignment + sm.addWorkContainer(initialEpoch, new ConsumerRecord<>(topic, 0, 99, "key-stale", "value")); + + // Now try to take work — the new epoch's containers should be returned, + // and the stale one should not block them + List> workAfterRebalance = sm.getWorkIfAvailable(10); + + assertWithMessage("Should be able to take new-epoch work after rebalance. " + + "If this fails with 0, stale containers are blocking the shard — this is bug #857.") + .that(workAfterRebalance).isNotEmpty(); + + // Verify the returned work is from the new epoch + for (var wc : workAfterRebalance) { + assertWithMessage("Returned work should be from new epoch, not stale") + .that(wc.getEpoch()).isEqualTo(newEpoch); + } + } + + /** + * Rapid successive rebalances (revoke→assign→revoke→assign) should clean up all stale + * containers and not leave any behind to block future work. + */ + @Test + void multipleRapidRebalancesShouldNotLeaveStaleContainers() { + long epoch0 = pm.getEpochOfPartition(tp); + + // Add work at epoch 0 + sm.addWorkContainer(epoch0, new ConsumerRecord<>(topic, 0, 0, "key-0", "value")); + sm.addWorkContainer(epoch0, new ConsumerRecord<>(topic, 0, 1, "key-1", "value")); + + // Rapid rebalance cycle 1 + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long epoch1 = pm.getEpochOfPartition(tp); + sm.addWorkContainer(epoch1, new ConsumerRecord<>(topic, 0, 2, "key-2", "value")); + + // Rapid rebalance cycle 2 + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long epoch2 = pm.getEpochOfPartition(tp); + sm.addWorkContainer(epoch2, new ConsumerRecord<>(topic, 0, 3, "key-3", "value")); + + // Rapid rebalance cycle 3 + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long finalEpoch = pm.getEpochOfPartition(tp); + assertThat(finalEpoch).isGreaterThan(epoch2); + + // Add work at the final epoch + sm.addWorkContainer(finalEpoch, new ConsumerRecord<>(topic, 0, 10, "key-fresh", "value")); + + // Explicitly run stale removal + long staleCount = sm.removeStaleContainers(); + log.info("Removed {} stale containers after rapid rebalances", staleCount); + + // Take work — should get the fresh container + List> work = sm.getWorkIfAvailable(10); + assertWithMessage("Should get fresh work after rapid rebalances") + .that(work).isNotEmpty(); + + for (var wc : work) { + assertWithMessage("All returned work should be from final epoch") + .that(wc.getEpoch()).isEqualTo(finalEpoch); + } + } + + /** + * Verify that the stale container removal actually removes containers from all prior epochs, + * not just the immediately previous one. + */ + @Test + void staleRemovalShouldCatchContainersFromAllPriorEpochs() { + long epoch0 = pm.getEpochOfPartition(tp); + sm.addWorkContainer(epoch0, new ConsumerRecord<>(topic, 0, 0, "key-e0", "value")); + + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long epoch1 = pm.getEpochOfPartition(tp); + sm.addWorkContainer(epoch1, new ConsumerRecord<>(topic, 0, 1, "key-e1", "value")); + + wm.onPartitionsRevoked(UniLists.of(tp)); + wm.onPartitionsAssigned(UniLists.of(tp)); + + long epoch2 = pm.getEpochOfPartition(tp); + + // Now inject containers from BOTH old epochs (simulating two separate late polls) + sm.addWorkContainer(epoch0, new ConsumerRecord<>(topic, 0, 90, "key-stale-e0", "value")); + sm.addWorkContainer(epoch1, new ConsumerRecord<>(topic, 0, 91, "key-stale-e1", "value")); + + // Add fresh work + sm.addWorkContainer(epoch2, new ConsumerRecord<>(topic, 0, 10, "key-fresh", "value")); + + // Take work — stale containers from both old epochs should not block + List> work = sm.getWorkIfAvailable(10); + + assertWithMessage("Fresh work should be available despite stale containers from multiple epochs") + .that(work).isNotEmpty(); + + for (var wc : work) { + assertWithMessage("No stale work should be returned") + .that(wc.getEpoch()).isEqualTo(epoch2); + } + } +} diff --git a/parallel-consumer-mutiny/pom.xml b/parallel-consumer-mutiny/pom.xml index 08fc730e0..d9bc78409 100644 --- a/parallel-consumer-mutiny/pom.xml +++ b/parallel-consumer-mutiny/pom.xml @@ -15,6 +15,11 @@ Confluent Parallel Consumer SmallRye Mutiny parallel-consumer-mutiny + + + 9 + + io.confluent.parallelconsumer diff --git a/pom.xml b/pom.xml index 78a81326a..5cb3de224 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 1.3.0 0.8 5.12.0 + 1.1.1 0.1.1 1.0.0 1.5.19 @@ -333,6 +334,12 @@ ${mockito.version} test + + com.tngtech.archunit + archunit-junit5 + ${archunit.version} + test + com.google.auto.service auto-service-annotations