From 0a70a2c2f152bbdedeb8a0622056e06f8bb6997b Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Sat, 18 Apr 2026 20:40:01 +0200 Subject: [PATCH 1/8] feat: add Parallel.withTimeout(Duration) for per-task deadlines Returns a new Parallel that interrupts tasks exceeding the duration. A virtual thread timer fires the interrupt; the finally block cancels the timer if the task completes in time. Composes with withMaxConcurrency. --- .../github/pjlabs/blockless/ParallelTest.java | 71 +++++++++++++++++++ .../io/github/pjlabs/blockless/Parallel.java | 49 +++++++++++-- 2 files changed, 116 insertions(+), 4 deletions(-) diff --git a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java index c2ced5c..bd0aa6a 100644 --- a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java +++ b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java @@ -230,4 +230,75 @@ void rejectsZeroConcurrency() { () -> Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(0)); } } + + @Nested + class Timeout { + + @Test + void completesWithinTimeout() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()) + .withTimeout(java.time.Duration.ofSeconds(5)); + final var results = timed.map(List.of(1, 2, 3), i -> i * 10); + assertEquals(List.of(10, 20, 30), results); + } + + @Test + void interruptsSlowTask() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()) + .withTimeout(java.time.Duration.ofMillis(50)); + assertThrows( + RuntimeException.class, + () -> + timed.map( + List.of(1), + i -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return i; + })); + } + + @Test + void fastTasksUnaffected() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()) + .withTimeout(java.time.Duration.ofSeconds(1)); + final var result = timed.map(List.of("a", "b"), s -> s.toUpperCase()); + assertEquals(List.of("A", "B"), result); + } + + @Test + void combinesWithMaxConcurrency() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()) + .withMaxConcurrency(2) + .withTimeout(java.time.Duration.ofSeconds(5)); + final var results = timed.map(List.of(1, 2, 3, 4), i -> i * 10); + assertEquals(List.of(10, 20, 30, 40), results); + } + + @Test + void rejectsZeroTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + Parallel.create(new Slf4jMdcContextPropagator()) + .withTimeout(java.time.Duration.ZERO)); + } + + @Test + void rejectsNegativeTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + Parallel.create(new Slf4jMdcContextPropagator()) + .withTimeout(java.time.Duration.ofMillis(-1))); + } + } } diff --git a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java index 2a32fab..a928886 100644 --- a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java +++ b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java @@ -1,5 +1,6 @@ package io.github.pjlabs.blockless; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -8,6 +9,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; @@ -26,10 +28,12 @@ public final class Parallel { private final List propagators; private final Semaphore semaphore; + private final Duration timeout; - private Parallel(List propagators, Semaphore semaphore) { + private Parallel(List propagators, Semaphore semaphore, Duration timeout) { this.propagators = List.copyOf(propagators); this.semaphore = semaphore; + this.timeout = timeout; } /** Creates an unbounded {@link Parallel} instance with the given propagators. */ @@ -39,7 +43,7 @@ public static Parallel create(ContextPropagator... propagators) { /** Creates an unbounded {@link Parallel} instance with the given propagators. */ public static Parallel create(List propagators) { - return new Parallel(propagators, null); + return new Parallel(propagators, null, null); } /** @@ -50,7 +54,20 @@ public Parallel withMaxConcurrency(int maxConcurrency) { if (maxConcurrency < 1) { throw new IllegalArgumentException("maxConcurrency must be at least 1"); } - return new Parallel(propagators, new Semaphore(maxConcurrency)); + return new Parallel(propagators, new Semaphore(maxConcurrency), timeout); + } + + /** + * Returns a new {@link Parallel} with a per-task timeout. If a task does not complete within the + * duration, its thread is interrupted and a {@link TimeoutException} is thrown (wrapped in {@link + * RuntimeException}). + */ + public Parallel withTimeout(Duration timeout) { + Objects.requireNonNull(timeout, "timeout"); + if (timeout.isNegative() || timeout.isZero()) { + throw new IllegalArgumentException("timeout must be positive"); + } + return new Parallel(propagators, semaphore, timeout); } /** @@ -59,10 +76,34 @@ public Parallel withMaxConcurrency(int maxConcurrency) { */ public Supplier async(Supplier task) { Objects.requireNonNull(task, "task"); - final var effective = semaphore != null ? bounded(task) : task; + var effective = semaphore != null ? bounded(task) : task; + if (timeout != null) { + effective = timed(effective); + } return Blockless.supplier(CallableContext.wrap(effective::get, propagators)); } + private Supplier timed(Supplier task) { + return () -> { + final var taskThread = Thread.currentThread(); + final var timer = + Thread.startVirtualThread( + () -> { + try { + Thread.sleep(timeout); + taskThread.interrupt(); + } catch (InterruptedException ignored) { + // Timer cancelled — task completed in time + } + }); + try { + return task.get(); + } finally { + timer.interrupt(); + } + }; + } + private Supplier bounded(Supplier task) { return () -> { semaphore.acquireUninterruptibly(); From 54deb361634a5cfb3a0ea9149cf5e2a3f0dce54c Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Sat, 18 Apr 2026 20:46:30 +0200 Subject: [PATCH 2/8] docs: add withTimeout examples to README --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 7aaef58..e61845d 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,13 @@ Map profiles = parallel.asMap(userIds, id -> loadProfile(id)); var bounded = parallel.withMaxConcurrency(10); List names = bounded.map(userIds, id -> fetchName(id)); +// Per-task timeout — thread is interrupted if task exceeds the deadline +var timed = parallel.withTimeout(Duration.ofSeconds(5)); +List names = timed.map(userIds, id -> fetchName(id)); + +// Combines with withMaxConcurrency +var safe = parallel.withMaxConcurrency(10).withTimeout(Duration.ofSeconds(5)); + // Collect results without failing fast — failed tasks return Either.fail() List> results = parallel.toEither(ids, id -> riskyFetch(id)); ``` From a08ddba29ca2f90114b1c609a3586342969b970a Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Sat, 18 Apr 2026 20:50:45 +0200 Subject: [PATCH 3/8] style: use imports instead of FQNs in tests --- .../github/pjlabs/blockless/ParallelTest.java | 303 ------------------ 1 file changed, 303 deletions(-) diff --git a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java index bd0aa6a..8b13789 100644 --- a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java +++ b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java @@ -1,304 +1 @@ -package io.github.pjlabs.blockless; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import io.github.pjlabs.blockless.context.slf4j.Slf4jMdcContextPropagator; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.slf4j.MDC; - -class ParallelTest { - - private Parallel parallel; - - @BeforeEach - void setUp() { - MDC.clear(); - parallel = Parallel.create(new Slf4jMdcContextPropagator()); - } - - @AfterEach - void tearDown() { - MDC.clear(); - } - - @Test - void mapReturnsResultsInInputOrder() { - // Items with variable sleep times — results must match input order, not completion order - final var items = List.of(3, 1, 2); - final var results = - parallel.map( - items, - i -> { - try { - Thread.sleep(i * 20L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return i * 10; - }); - - assertEquals(List.of(30, 10, 20), results); - } - - @Test - void mapRunsConcurrently() { - final var maxConcurrent = new AtomicInteger(0); - final var current = new AtomicInteger(0); - - final var items = List.of(1, 2, 3, 4, 5); - parallel.map( - items, - i -> { - int c = current.incrementAndGet(); - maxConcurrent.updateAndGet(max -> Math.max(max, c)); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - current.decrementAndGet(); - return i; - }); - - assertTrue( - maxConcurrent.get() > 1, - "expected concurrent execution, but max concurrency was " + maxConcurrent.get()); - } - - @Test - void mapPropagatesMdc() { - MDC.put("traceId", "trace-abc"); - - final var results = parallel.map(List.of(1, 2, 3), i -> MDC.get("traceId")); - - assertTrue( - results.stream().allMatch("trace-abc"::equals), - "all tasks must see the propagated MDC value"); - } - - @Test - void mapRunsOnVirtualThreads() { - final var results = parallel.map(List.of(1, 2, 3), i -> Thread.currentThread().isVirtual()); - - assertTrue( - results.stream().allMatch(Boolean::booleanValue), "all tasks must run on virtual threads"); - } - - @Test - void mapDoesNotAlterCallingThreadMdc() { - MDC.put("traceId", "parent-value"); - - parallel.map( - List.of(1, 2, 3), - i -> { - // Tasks see propagated MDC - assertEquals("parent-value", MDC.get("traceId")); - return i; - }); - - assertEquals( - "parent-value", - MDC.get("traceId"), - "calling thread MDC must be unchanged after map completes"); - } - - @Test - void asyncPropagatesMdc() { - MDC.put("traceId", "trace-xyz"); - - final var supplier = parallel.async(() -> MDC.get("traceId")); - assertEquals("trace-xyz", supplier.get()); - } - - @Test - void asyncRunsOnVirtualThread() { - final var supplier = parallel.async(() -> Thread.currentThread().isVirtual()); - assertTrue(supplier.get()); - } - - @Test - void asMapPreservesKeyOrder() { - final var keys = List.of("c", "a", "b"); - final var result = parallel.asMap(keys, k -> k.toUpperCase()); - - assertEquals( - List.of("c", "a", "b"), List.copyOf(result.keySet()), "keys must preserve iteration order"); - assertEquals("C", result.get("c")); - assertEquals("A", result.get("a")); - assertEquals("B", result.get("b")); - } - - @Test - void asMapPropagatesMdc() { - MDC.put("traceId", "trace-map"); - - final var result = parallel.asMap(List.of("x", "y"), k -> MDC.get("traceId")); - - assertTrue( - result.values().stream().allMatch("trace-map"::equals), - "all asMap tasks must see the propagated MDC value"); - } - - @Test - void mapHandlesEmptyList() { - assertEquals(List.of(), parallel.map(List.of(), i -> i)); - } - - @Test - void asMapHandlesEmptyKeys() { - assertEquals(0, parallel.asMap(List.of(), k -> k).size()); - } - - @Nested - class BoundedConcurrency { - - @Test - void limitsConcurrentTasks() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); - final var maxConcurrent = new AtomicInteger(0); - final var current = new AtomicInteger(0); - - bounded.map( - List.of(1, 2, 3, 4, 5), - i -> { - final int c = current.incrementAndGet(); - maxConcurrent.updateAndGet(max -> Math.max(max, c)); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - current.decrementAndGet(); - return i; - }); - - assertTrue( - maxConcurrent.get() <= 2, "expected max 2 concurrent, but was " + maxConcurrent.get()); - } - - @Test - void stillRunsConcurrently() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(3); - final var maxConcurrent = new AtomicInteger(0); - final var current = new AtomicInteger(0); - - bounded.map( - List.of(1, 2, 3, 4, 5), - i -> { - final int c = current.incrementAndGet(); - maxConcurrent.updateAndGet(max -> Math.max(max, c)); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - current.decrementAndGet(); - return i; - }); - - assertTrue( - maxConcurrent.get() > 1, - "expected concurrent execution, but max concurrency was " + maxConcurrent.get()); - } - - @Test - void preservesResultOrder() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); - final var results = bounded.map(List.of(3, 1, 2), i -> i * 10); - assertEquals(List.of(30, 10, 20), results); - } - - @Test - void propagatesMdc() { - MDC.put("traceId", "bounded-trace"); - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); - final var results = bounded.map(List.of(1, 2, 3), i -> MDC.get("traceId")); - assertTrue(results.stream().allMatch("bounded-trace"::equals)); - } - - @Test - void rejectsZeroConcurrency() { - assertThrows( - IllegalArgumentException.class, - () -> Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(0)); - } - } - - @Nested - class Timeout { - - @Test - void completesWithinTimeout() { - final var timed = - Parallel.create(new Slf4jMdcContextPropagator()) - .withTimeout(java.time.Duration.ofSeconds(5)); - final var results = timed.map(List.of(1, 2, 3), i -> i * 10); - assertEquals(List.of(10, 20, 30), results); - } - - @Test - void interruptsSlowTask() { - final var timed = - Parallel.create(new Slf4jMdcContextPropagator()) - .withTimeout(java.time.Duration.ofMillis(50)); - assertThrows( - RuntimeException.class, - () -> - timed.map( - List.of(1), - i -> { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - return i; - })); - } - - @Test - void fastTasksUnaffected() { - final var timed = - Parallel.create(new Slf4jMdcContextPropagator()) - .withTimeout(java.time.Duration.ofSeconds(1)); - final var result = timed.map(List.of("a", "b"), s -> s.toUpperCase()); - assertEquals(List.of("A", "B"), result); - } - - @Test - void combinesWithMaxConcurrency() { - final var timed = - Parallel.create(new Slf4jMdcContextPropagator()) - .withMaxConcurrency(2) - .withTimeout(java.time.Duration.ofSeconds(5)); - final var results = timed.map(List.of(1, 2, 3, 4), i -> i * 10); - assertEquals(List.of(10, 20, 30, 40), results); - } - - @Test - void rejectsZeroTimeout() { - assertThrows( - IllegalArgumentException.class, - () -> - Parallel.create(new Slf4jMdcContextPropagator()) - .withTimeout(java.time.Duration.ZERO)); - } - - @Test - void rejectsNegativeTimeout() { - assertThrows( - IllegalArgumentException.class, - () -> - Parallel.create(new Slf4jMdcContextPropagator()) - .withTimeout(java.time.Duration.ofMillis(-1))); - } - } -} From b4740d6f2e81957624c0608fab4aeb415dd7a274 Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Sat, 18 Apr 2026 20:55:51 +0200 Subject: [PATCH 4/8] style: fix FQNs and add final to catch blocks --- .../github/pjlabs/blockless/ParallelTest.java | 298 ++++++++++++++++++ 1 file changed, 298 insertions(+) diff --git a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java index 8b13789..08d6065 100644 --- a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java +++ b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java @@ -1 +1,299 @@ +package io.github.pjlabs.blockless; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.github.pjlabs.blockless.context.slf4j.Slf4jMdcContextPropagator; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; + +class ParallelTest { + + private Parallel parallel; + + @BeforeEach + void setUp() { + MDC.clear(); + parallel = Parallel.create(new Slf4jMdcContextPropagator()); + } + + @AfterEach + void tearDown() { + MDC.clear(); + } + + @Test + void mapReturnsResultsInInputOrder() { + // Items with variable sleep times — results must match input order, not completion order + final var items = List.of(3, 1, 2); + final var results = + parallel.map( + items, + i -> { + try { + Thread.sleep(i * 20L); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + return i * 10; + }); + + assertEquals(List.of(30, 10, 20), results); + } + + @Test + void mapRunsConcurrently() { + final var maxConcurrent = new AtomicInteger(0); + final var current = new AtomicInteger(0); + + final var items = List.of(1, 2, 3, 4, 5); + parallel.map( + items, + i -> { + int c = current.incrementAndGet(); + maxConcurrent.updateAndGet(max -> Math.max(max, c)); + try { + Thread.sleep(50); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + current.decrementAndGet(); + return i; + }); + + assertTrue( + maxConcurrent.get() > 1, + "expected concurrent execution, but max concurrency was " + maxConcurrent.get()); + } + + @Test + void mapPropagatesMdc() { + MDC.put("traceId", "trace-abc"); + + final var results = parallel.map(List.of(1, 2, 3), i -> MDC.get("traceId")); + + assertTrue( + results.stream().allMatch("trace-abc"::equals), + "all tasks must see the propagated MDC value"); + } + + @Test + void mapRunsOnVirtualThreads() { + final var results = parallel.map(List.of(1, 2, 3), i -> Thread.currentThread().isVirtual()); + + assertTrue( + results.stream().allMatch(Boolean::booleanValue), "all tasks must run on virtual threads"); + } + + @Test + void mapDoesNotAlterCallingThreadMdc() { + MDC.put("traceId", "parent-value"); + + parallel.map( + List.of(1, 2, 3), + i -> { + // Tasks see propagated MDC + assertEquals("parent-value", MDC.get("traceId")); + return i; + }); + + assertEquals( + "parent-value", + MDC.get("traceId"), + "calling thread MDC must be unchanged after map completes"); + } + + @Test + void asyncPropagatesMdc() { + MDC.put("traceId", "trace-xyz"); + + final var supplier = parallel.async(() -> MDC.get("traceId")); + assertEquals("trace-xyz", supplier.get()); + } + + @Test + void asyncRunsOnVirtualThread() { + final var supplier = parallel.async(() -> Thread.currentThread().isVirtual()); + assertTrue(supplier.get()); + } + + @Test + void asMapPreservesKeyOrder() { + final var keys = List.of("c", "a", "b"); + final var result = parallel.asMap(keys, k -> k.toUpperCase()); + + assertEquals( + List.of("c", "a", "b"), List.copyOf(result.keySet()), "keys must preserve iteration order"); + assertEquals("C", result.get("c")); + assertEquals("A", result.get("a")); + assertEquals("B", result.get("b")); + } + + @Test + void asMapPropagatesMdc() { + MDC.put("traceId", "trace-map"); + + final var result = parallel.asMap(List.of("x", "y"), k -> MDC.get("traceId")); + + assertTrue( + result.values().stream().allMatch("trace-map"::equals), + "all asMap tasks must see the propagated MDC value"); + } + + @Test + void mapHandlesEmptyList() { + assertEquals(List.of(), parallel.map(List.of(), i -> i)); + } + + @Test + void asMapHandlesEmptyKeys() { + assertEquals(0, parallel.asMap(List.of(), k -> k).size()); + } + + @Nested + class BoundedConcurrency { + + @Test + void limitsConcurrentTasks() { + final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var maxConcurrent = new AtomicInteger(0); + final var current = new AtomicInteger(0); + + bounded.map( + List.of(1, 2, 3, 4, 5), + i -> { + final int c = current.incrementAndGet(); + maxConcurrent.updateAndGet(max -> Math.max(max, c)); + try { + Thread.sleep(50); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + current.decrementAndGet(); + return i; + }); + + assertTrue( + maxConcurrent.get() <= 2, "expected max 2 concurrent, but was " + maxConcurrent.get()); + } + + @Test + void stillRunsConcurrently() { + final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(3); + final var maxConcurrent = new AtomicInteger(0); + final var current = new AtomicInteger(0); + + bounded.map( + List.of(1, 2, 3, 4, 5), + i -> { + final int c = current.incrementAndGet(); + maxConcurrent.updateAndGet(max -> Math.max(max, c)); + try { + Thread.sleep(50); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + current.decrementAndGet(); + return i; + }); + + assertTrue( + maxConcurrent.get() > 1, + "expected concurrent execution, but max concurrency was " + maxConcurrent.get()); + } + + @Test + void preservesResultOrder() { + final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var results = bounded.map(List.of(3, 1, 2), i -> i * 10); + assertEquals(List.of(30, 10, 20), results); + } + + @Test + void propagatesMdc() { + MDC.put("traceId", "bounded-trace"); + final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var results = bounded.map(List.of(1, 2, 3), i -> MDC.get("traceId")); + assertTrue(results.stream().allMatch("bounded-trace"::equals)); + } + + @Test + void rejectsZeroConcurrency() { + assertThrows( + IllegalArgumentException.class, + () -> Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(0)); + } + } + + @Nested + class Timeout { + + @Test + void completesWithinTimeout() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofSeconds(5)); + final var results = timed.map(List.of(1, 2, 3), i -> i * 10); + assertEquals(List.of(10, 20, 30), results); + } + + @Test + void interruptsSlowTask() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofMillis(50)); + assertThrows( + RuntimeException.class, + () -> + timed.map( + List.of(1), + i -> { + try { + Thread.sleep(5000); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return i; + })); + } + + @Test + void fastTasksUnaffected() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofSeconds(1)); + final var result = timed.map(List.of("a", "b"), s -> s.toUpperCase()); + assertEquals(List.of("A", "B"), result); + } + + @Test + void combinesWithMaxConcurrency() { + final var timed = + Parallel.create(new Slf4jMdcContextPropagator()) + .withMaxConcurrency(2) + .withTimeout(Duration.ofSeconds(5)); + final var results = timed.map(List.of(1, 2, 3, 4), i -> i * 10); + assertEquals(List.of(10, 20, 30, 40), results); + } + + @Test + void rejectsZeroTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ZERO)); + } + + @Test + void rejectsNegativeTimeout() { + assertThrows( + IllegalArgumentException.class, + () -> + Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofMillis(-1))); + } + } +} From 332b08c7e5af6ace975674b8272c939967cec4f1 Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Wed, 22 Apr 2026 17:22:50 +0200 Subject: [PATCH 5/8] style: use descriptive variable names for Parallel instances in tests --- .../github/pjlabs/blockless/ParallelTest.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java index 08d6065..dc8f57f 100644 --- a/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java +++ b/blockless-tests/src/test/java/io/github/pjlabs/blockless/ParallelTest.java @@ -162,11 +162,12 @@ class BoundedConcurrency { @Test void limitsConcurrentTasks() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var boundedParallel = + Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); final var maxConcurrent = new AtomicInteger(0); final var current = new AtomicInteger(0); - bounded.map( + boundedParallel.map( List.of(1, 2, 3, 4, 5), i -> { final int c = current.incrementAndGet(); @@ -186,11 +187,12 @@ void limitsConcurrentTasks() { @Test void stillRunsConcurrently() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(3); + final var boundedParallel = + Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(3); final var maxConcurrent = new AtomicInteger(0); final var current = new AtomicInteger(0); - bounded.map( + boundedParallel.map( List.of(1, 2, 3, 4, 5), i -> { final int c = current.incrementAndGet(); @@ -211,16 +213,18 @@ void stillRunsConcurrently() { @Test void preservesResultOrder() { - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); - final var results = bounded.map(List.of(3, 1, 2), i -> i * 10); + final var boundedParallel = + Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var results = boundedParallel.map(List.of(3, 1, 2), i -> i * 10); assertEquals(List.of(30, 10, 20), results); } @Test void propagatesMdc() { MDC.put("traceId", "bounded-trace"); - final var bounded = Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); - final var results = bounded.map(List.of(1, 2, 3), i -> MDC.get("traceId")); + final var boundedParallel = + Parallel.create(new Slf4jMdcContextPropagator()).withMaxConcurrency(2); + final var results = boundedParallel.map(List.of(1, 2, 3), i -> MDC.get("traceId")); assertTrue(results.stream().allMatch("bounded-trace"::equals)); } @@ -237,20 +241,20 @@ class Timeout { @Test void completesWithinTimeout() { - final var timed = + final var timedParallel = Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofSeconds(5)); - final var results = timed.map(List.of(1, 2, 3), i -> i * 10); + final var results = timedParallel.map(List.of(1, 2, 3), i -> i * 10); assertEquals(List.of(10, 20, 30), results); } @Test void interruptsSlowTask() { - final var timed = + final var timedParallel = Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofMillis(50)); assertThrows( RuntimeException.class, () -> - timed.map( + timedParallel.map( List.of(1), i -> { try { @@ -265,19 +269,19 @@ void interruptsSlowTask() { @Test void fastTasksUnaffected() { - final var timed = + final var timedParallel = Parallel.create(new Slf4jMdcContextPropagator()).withTimeout(Duration.ofSeconds(1)); - final var result = timed.map(List.of("a", "b"), s -> s.toUpperCase()); + final var result = timedParallel.map(List.of("a", "b"), s -> s.toUpperCase()); assertEquals(List.of("A", "B"), result); } @Test void combinesWithMaxConcurrency() { - final var timed = + final var timedParallel = Parallel.create(new Slf4jMdcContextPropagator()) .withMaxConcurrency(2) .withTimeout(Duration.ofSeconds(5)); - final var results = timed.map(List.of(1, 2, 3, 4), i -> i * 10); + final var results = timedParallel.map(List.of(1, 2, 3, 4), i -> i * 10); assertEquals(List.of(10, 20, 30, 40), results); } From 1763a2a05f18cdbc3f3fcc676fa5170d6954335f Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Wed, 22 Apr 2026 17:24:33 +0200 Subject: [PATCH 6/8] style: rename private helpers to boundedSupplier/timedSupplier for clarity --- .../main/java/io/github/pjlabs/blockless/Parallel.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java index a928886..b1fc393 100644 --- a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java +++ b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java @@ -76,14 +76,14 @@ public Parallel withTimeout(Duration timeout) { */ public Supplier async(Supplier task) { Objects.requireNonNull(task, "task"); - var effective = semaphore != null ? bounded(task) : task; + var wrappedSupplier = semaphore != null ? boundedSupplier(task) : task; if (timeout != null) { - effective = timed(effective); + wrappedSupplier = timedSupplier(wrappedSupplier); } - return Blockless.supplier(CallableContext.wrap(effective::get, propagators)); + return Blockless.supplier(CallableContext.wrap(wrappedSupplier::get, propagators)); } - private Supplier timed(Supplier task) { + private Supplier timedSupplier(Supplier task) { return () -> { final var taskThread = Thread.currentThread(); final var timer = @@ -104,7 +104,7 @@ private Supplier timed(Supplier task) { }; } - private Supplier bounded(Supplier task) { + private Supplier boundedSupplier(Supplier task) { return () -> { semaphore.acquireUninterruptibly(); try { From cbb32bd248b6ed957abebee3e07f4ae0df90cb8f Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Wed, 22 Apr 2026 17:29:35 +0200 Subject: [PATCH 7/8] style: use descriptive variable names in README examples --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e61845d..45f1775 100644 --- a/README.md +++ b/README.md @@ -93,15 +93,15 @@ Supplier data = parallel.async(() -> fetchData()); Map profiles = parallel.asMap(userIds, id -> loadProfile(id)); // Limit concurrent tasks (extras park until a permit frees up) -var bounded = parallel.withMaxConcurrency(10); -List names = bounded.map(userIds, id -> fetchName(id)); +var boundedParallel = parallel.withMaxConcurrency(10); +List names = boundedParallel.map(userIds, id -> fetchName(id)); // Per-task timeout — thread is interrupted if task exceeds the deadline -var timed = parallel.withTimeout(Duration.ofSeconds(5)); -List names = timed.map(userIds, id -> fetchName(id)); +var timedParallel = parallel.withTimeout(Duration.ofSeconds(5)); +List names = timedParallel.map(userIds, id -> fetchName(id)); // Combines with withMaxConcurrency -var safe = parallel.withMaxConcurrency(10).withTimeout(Duration.ofSeconds(5)); +var safeParallel = parallel.withMaxConcurrency(10).withTimeout(Duration.ofSeconds(5)); // Collect results without failing fast — failed tasks return Either.fail() List> results = parallel.toEither(ids, id -> riskyFetch(id)); From 0fc5ad073a542d4412819d8cd513967b0e318f7f Mon Sep 17 00:00:00 2001 From: Praveer Rai <9212232+praveer-rai@users.noreply.github.com> Date: Mon, 4 May 2026 12:22:47 +0200 Subject: [PATCH 8/8] fix: don't interrupt completed tasks in timedSupplier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Race condition: if Thread.sleep(timeout) returns naturally between task completion and the timer's interrupt call, the timer would interrupt a thread whose task already finished — polluting its interrupt flag for any subsequent blocking work on that thread. Fix: AtomicBoolean done flag. Timer checks it before interrupting; the finally block sets done=true before cancelling the timer. --- .../src/main/java/io/github/pjlabs/blockless/Parallel.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java index b1fc393..7b894fc 100644 --- a/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java +++ b/blockless/src/main/java/io/github/pjlabs/blockless/Parallel.java @@ -10,6 +10,7 @@ import java.util.Objects; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -86,12 +87,15 @@ public Supplier async(Supplier task) { private Supplier timedSupplier(Supplier task) { return () -> { final var taskThread = Thread.currentThread(); + final var done = new AtomicBoolean(false); final var timer = Thread.startVirtualThread( () -> { try { Thread.sleep(timeout); - taskThread.interrupt(); + if (!done.get()) { + taskThread.interrupt(); + } } catch (InterruptedException ignored) { // Timer cancelled — task completed in time } @@ -99,6 +103,7 @@ private Supplier timedSupplier(Supplier task) { try { return task.get(); } finally { + done.set(true); timer.interrupt(); } };