From 97b2011e31774038bb732523c2c69d7afed3cbfa Mon Sep 17 00:00:00 2001 From: Helt Date: Tue, 2 Jul 2024 05:33:17 +0800 Subject: [PATCH 1/3] Backport of apache/iceberg#9402 to openhouse-1.5.2. Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (#9402) (cherry picked from commit d3cb1b696a1631a9cca4619f93252cd0a985fbfc) --- .../apache/iceberg/util/ParallelIterable.java | 6 ++ .../iceberg/util/TestParallelIterable.java | 61 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 108757b415..d7221e7d45 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -67,6 +67,12 @@ private ParallelIterator( try (Closeable ignored = (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { for (T item : iterable) { + // exit manually because `ConcurrentLinkedQueue` can't be + // interrupted + if (closed) { + return; + } + queue.add(item); } } catch (IOException e) { diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 68685614d3..af9c6ec521 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -72,6 +73,66 @@ public CloseableIterator iterator() { .untilAsserted(() -> assertThat(queue).isEmpty()); } + @Test + public void closeMoreDataParallelIteratorWithoutCompleteIteration() + throws IOException, IllegalAccessException, NoSuchFieldException { + ExecutorService executor = Executors.newFixedThreadPool(1); + Iterator integerIterator = + new Iterator() { + private int number = 1; + + @Override + public boolean hasNext() { + if (number > 1000) { + return false; + } + + number++; + return true; + } + + @Override + public Integer next() { + try { + // sleep to control number generate rate + Thread.sleep(10); + } catch (InterruptedException e) { + // Sleep interrupted, we ignore it! + } + return number; + } + }; + Iterable> transform = + Iterables.transform( + Lists.newArrayList(1), + item -> + new CloseableIterable() { + @Override + public void close() {} + + @Override + public CloseableIterator iterator() { + return CloseableIterator.withClose(integerIterator); + } + }); + + ParallelIterable parallelIterable = new ParallelIterable<>(transform, executor); + CloseableIterator iterator = parallelIterable.iterator(); + Field queueField = iterator.getClass().getDeclaredField("queue"); + queueField.setAccessible(true); + ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue) queueField.get(iterator); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isNotNull(); + Awaitility.await("Queue is populated") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> queueHasElements(iterator, queue)); + iterator.close(); + Awaitility.await("Queue is cleared") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty()); + } + private void queueHasElements(CloseableIterator iterator, Queue queue) { assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); From 6631a9e260608bdfc23ab1c5cfd686840f1ce77e Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Fri, 26 Jul 2024 21:50:25 +0530 Subject: [PATCH 2/3] Backport of apache/iceberg#10787 to openhouse-1.5.2. Core: Limit ParallelIterable memory consumption by yielding in tasks (backport #10691) (#10787) ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for processing input iterables. This defaults to 2 * # CPU cores. When one or some of the input iterables are considerable in size and the ParallelIterable consumer is not quick enough, this could result in unbounded allocation inside `ParallelIterator.queue`. This commit bounds the queue. When queue is full, the tasks yield and get removed from the executor. They are resumed when consumer catches up. (cherry picked from commit 7831a8dfc3a2de546ca069f4fc1e7afd03777554) Co-authored-by: Piotr Findeisen (cherry picked from commit e18a2fe10214f5f3ffa0a317a28af8b2a619817a) --- .../apache/iceberg/util/ParallelIterable.java | 222 +++++++++++++----- .../iceberg/util/TestParallelIterable.java | 48 ++++ 2 files changed, 215 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d7221e7d45..6486bd7fd4 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -20,84 +20,117 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import org.apache.iceberg.exceptions.RuntimeIOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.io.Closer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParallelIterable extends CloseableGroup implements CloseableIterable { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class); + + // Logic behind default value: ParallelIterable is often used for file planning. + // Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory. + private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000; + private final Iterable> iterables; private final ExecutorService workerPool; + // Bound for number of items in the queue to limit memory consumption + // even in the case when input iterables are large. + private final int approximateMaxQueueSize; + public ParallelIterable(Iterable> iterables, ExecutorService workerPool) { - this.iterables = iterables; - this.workerPool = workerPool; + this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE); + } + + public ParallelIterable( + Iterable> iterables, + ExecutorService workerPool, + int approximateMaxQueueSize) { + this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null"); + this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null"); + this.approximateMaxQueueSize = approximateMaxQueueSize; } @Override public CloseableIterator iterator() { - ParallelIterator iter = new ParallelIterator<>(iterables, workerPool); + ParallelIterator iter = + new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize); addCloseable(iter); return iter; } private static class ParallelIterator implements CloseableIterator { - private final Iterator tasks; + private final Iterator> tasks; + private final Deque> yieldedTasks = new ArrayDeque<>(); private final ExecutorService workerPool; - private final Future[] taskFutures; + private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private volatile boolean closed = false; + private final int maxQueueSize; + private final AtomicBoolean closed = new AtomicBoolean(false); private ParallelIterator( - Iterable> iterables, ExecutorService workerPool) { + Iterable> iterables, ExecutorService workerPool, int maxQueueSize) { this.tasks = Iterables.transform( - iterables, - iterable -> - (Runnable) - () -> { - try (Closeable ignored = - (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { - for (T item : iterable) { - // exit manually because `ConcurrentLinkedQueue` can't be - // interrupted - if (closed) { - return; - } - - queue.add(item); - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close iterable"); - } - }) + iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize)) .iterator(); this.workerPool = workerPool; + this.maxQueueSize = maxQueueSize; // submit 2 tasks per worker at a time - this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; } @Override public void close() { // close first, avoid new task submit - this.closed = true; + this.closed.set(true); - // cancel background tasks - for (Future taskFuture : taskFutures) { - if (taskFuture != null && !taskFuture.isDone()) { - taskFuture.cancel(true); + try (Closer closer = Closer.create()) { + synchronized (this) { + yieldedTasks.forEach(closer::register); + yieldedTasks.clear(); } + + // cancel background tasks and close continuations if any + for (CompletableFuture>> taskFuture : taskFutures) { + if (taskFuture != null) { + taskFuture.cancel(true); + taskFuture.thenAccept( + continuation -> { + if (continuation.isPresent()) { + try { + continuation.get().close(); + } catch (IOException e) { + LOG.error("Task close failed", e); + } + } + }); + } + } + + // clean queue + this.queue.clear(); + } catch (IOException e) { + throw new UncheckedIOException("Close failed", e); } - // clean queue - this.queue.clear(); } /** @@ -107,15 +140,17 @@ public void close() { * * @return true if there are pending tasks, false otherwise */ - private boolean checkTasks() { + private synchronized boolean checkTasks() { + Preconditions.checkState(!closed.get(), "Already closed"); boolean hasRunningTask = false; for (int i = 0; i < taskFutures.length; i += 1) { if (taskFutures[i] == null || taskFutures[i].isDone()) { if (taskFutures[i] != null) { - // check for task failure and re-throw any exception + // check for task failure and re-throw any exception. Enqueue continuation if any. try { - taskFutures[i].get(); + Optional> continuation = taskFutures[i].get(); + continuation.ifPresent(yieldedTasks::addLast); } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -136,30 +171,33 @@ private boolean checkTasks() { } } - return !closed && (tasks.hasNext() || hasRunningTask); + return !closed.get() && (tasks.hasNext() || hasRunningTask); } - private Future submitNextTask() { - if (!closed && tasks.hasNext()) { - return workerPool.submit(tasks.next()); + private CompletableFuture>> submitNextTask() { + if (!closed.get()) { + if (!yieldedTasks.isEmpty()) { + return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool); + } else if (tasks.hasNext()) { + return CompletableFuture.supplyAsync(tasks.next(), workerPool); + } } return null; } @Override public synchronized boolean hasNext() { - Preconditions.checkState(!closed, "Already closed"); - - // if the consumer is processing records more slowly than the producers, then this check will - // prevent tasks from being submitted. while the producers are running, this will always - // return here before running checkTasks. when enough of the tasks are finished that the - // consumer catches up, then lots of new tasks will be submitted at once. this behavior is - // okay because it ensures that records are not stacking up waiting to be consumed and taking - // up memory. - // - // consumers that process results quickly will periodically exhaust the queue and submit new - // tasks when checkTasks runs. fast consumers should not be delayed. - if (!queue.isEmpty()) { + Preconditions.checkState(!closed.get(), "Already closed"); + + // If the consumer is processing records more slowly than the producers, the producers will + // eventually fill the queue and yield, returning continuations. Continuations and new tasks + // are started by checkTasks(). The check here prevents us from restarting continuations or + // starting new tasks too early (when queue is almost full) or too late (when queue is already + // emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on + // scheduling). Restarting too late would mean the consumer may need to wait for the tasks + // to produce new items. A consumer slower than producers shouldn't need to wait. + int queueLowWaterMark = maxQueueSize / 2; + if (queue.size() > queueLowWaterMark) { return true; } @@ -192,4 +230,78 @@ public synchronized T next() { return queue.poll(); } } + + private static class Task implements Supplier>>, Closeable { + private final Iterable input; + private final ConcurrentLinkedQueue queue; + private final AtomicBoolean closed; + private final int approximateMaxQueueSize; + + private Iterator iterator = null; + + Task( + Iterable input, + ConcurrentLinkedQueue queue, + AtomicBoolean closed, + int approximateMaxQueueSize) { + this.input = Preconditions.checkNotNull(input, "input cannot be null"); + this.queue = Preconditions.checkNotNull(queue, "queue cannot be null"); + this.closed = Preconditions.checkNotNull(closed, "closed cannot be null"); + this.approximateMaxQueueSize = approximateMaxQueueSize; + } + + @Override + public Optional> get() { + try { + if (iterator == null) { + iterator = input.iterator(); + } + + while (iterator.hasNext()) { + if (queue.size() >= approximateMaxQueueSize) { + // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work. + return Optional.of(this); + } + + T next = iterator.next(); + if (closed.get()) { + break; + } + + queue.add(next); + } + } catch (Throwable e) { + try { + close(); + } catch (IOException closeException) { + // self-suppression is not permitted + // (e and closeException to be the same is unlikely, but possible) + if (closeException != e) { + e.addSuppressed(closeException); + } + } + + throw e; + } + + try { + close(); + } catch (IOException e) { + throw new UncheckedIOException("Close failed", e); + } + + // The task is complete. Returning empty means there is no continuation that should be + // executed. + return Optional.empty(); + } + + @Override + public void close() throws IOException { + iterator = null; + if (input instanceof Closeable) { + ((Closeable) input).close(); + } + } + } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index af9c6ec521..4910732f6e 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -24,15 +24,22 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Multiset; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @@ -133,6 +140,47 @@ public CloseableIterator iterator() { .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty()); } + @Test + public void limitQueueSize() throws IOException, IllegalAccessException, NoSuchFieldException { + + List> iterables = + ImmutableList.of( + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator()); + + Multiset expectedValues = + IntStream.range(0, 100) + .boxed() + .flatMap(i -> Stream.of(i, i, i)) + .collect(ImmutableMultiset.toImmutableMultiset()); + + int maxQueueSize = 20; + ExecutorService executor = Executors.newCachedThreadPool(); + ParallelIterable parallelIterable = + new ParallelIterable<>(iterables, executor, maxQueueSize); + CloseableIterator iterator = parallelIterable.iterator(); + Field queueField = iterator.getClass().getDeclaredField("queue"); + queueField.setAccessible(true); + ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue) queueField.get(iterator); + + Multiset actualValues = HashMultiset.create(); + + while (iterator.hasNext()) { + assertThat(queue) + .as("iterator internal queue") + .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size()); + actualValues.add(iterator.next()); + } + + assertThat(actualValues) + .as("multiset of values returned by the iterator") + .isEqualTo(expectedValues); + + iterator.close(); + executor.shutdownNow(); + } + private void queueHasElements(CloseableIterator iterator, Queue queue) { assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); From 4e3efcde2e47f19a0ed915e8ac7c94813dedb02e Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 22 Aug 2024 00:05:17 +0200 Subject: [PATCH 3/3] Backport of apache/iceberg#10979 to openhouse-1.5.2. Drop ParallelIterable's queue low water mark (#10979) As part of the change in commit 7831a8dfc3a2de546ca069f4fc1e7afd03777554, queue low water mark was introduced. However, it resulted in increased number of manifests being read when planning LIMIT queries in Trino Iceberg connector. To avoid increased I/O, back out the change for now. (cherry picked from commit ed53c6d326cb7efef2d41e26ef001e1d7b17fd78) --- .../java/org/apache/iceberg/util/ParallelIterable.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 6486bd7fd4..17c45b6348 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -192,12 +192,9 @@ public synchronized boolean hasNext() { // If the consumer is processing records more slowly than the producers, the producers will // eventually fill the queue and yield, returning continuations. Continuations and new tasks // are started by checkTasks(). The check here prevents us from restarting continuations or - // starting new tasks too early (when queue is almost full) or too late (when queue is already - // emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on - // scheduling). Restarting too late would mean the consumer may need to wait for the tasks - // to produce new items. A consumer slower than producers shouldn't need to wait. - int queueLowWaterMark = maxQueueSize / 2; - if (queue.size() > queueLowWaterMark) { + // starting new tasks before the queue is emptied. Restarting too early would lead to tasks + // yielding very quickly (CPU waste on scheduling). + if (!queue.isEmpty()) { return true; }