diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java index 6dc4d6890..f2ec527a7 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java @@ -59,8 +59,17 @@ public class ProcessingShard { public void addWorkContainer(WorkContainer wc) { long key = wc.offset(); - if (entries.containsKey(key)) { - log.debug("Entry for {} already exists in shard queue, dropping record", wc); + WorkContainer existing = entries.get(key); + if (existing != null) { + // Check if the existing entry is stale and should be replaced + if (isWorkContainerStale(existing)) { + log.debug("Replacing stale entry (epoch {}) for offset {} with fresh one (epoch {})", + existing.getEpoch(), key, wc.getEpoch()); + entries.put(key, wc); + // availableWorkContainerCnt stays the same since we're replacing, not adding + } else { + log.debug("Entry for {} already exists in shard queue, dropping record", wc); + } } else { entries.put(key, wc); availableWorkContainerCnt.incrementAndGet();