Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
package com.vaadin.testbench.unit;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.example.base.signals.SignalsView;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import com.vaadin.flow.signals.SignalEnvironment;

@ViewPackages(packages = "com.example.base.signals")
@Timeout(10)
public class SignalsTest extends UIUnitTest {
Expand Down Expand Up @@ -67,6 +70,30 @@ void attachedComponent_triggerSignalFromNonUIThreadThroughComponentEffect_effect
Assertions.assertEquals("Counter: 10", counterTester.getText());
}

@Test
void effectDispatcher_routesToTestQueue_notServiceThreadPool()
throws InterruptedException {
// On the test thread, both VaadinServiceEnvironment (from
// MockVaadin) and TestSignalEnvironment are active. The default
// effect dispatcher must resolve to TestSignalEnvironment's queue
// (via registerFirst) so that runPendingSignalsTasks() can drive
// effect execution deterministically. Without registerFirst,
// VaadinServiceEnvironment's thread pool would be used instead,
// making effects run asynchronously outside the test's control.
navigate(SignalsView.class);

var latch = new CountDownLatch(1);
SignalEnvironment.getDefaultEffectDispatcher()
.execute(latch::countDown);

Assertions.assertFalse(latch.await(50, TimeUnit.MILLISECONDS),
"Task should be queued, not executed immediately on a "
+ "thread pool");
runPendingSignalsTasks();
Assertions.assertTrue(latch.await(0, TimeUnit.MILLISECONDS),
"Task should have executed after draining the test queue");
}

@Test
void attachedComponent_slowEffect_effectEvaluatedAsynchronously() {
var view = navigate(SignalsView.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
* <p>
* How it works:
* <ul>
* <li>{@link #getResultNotifier()} and {@link #getEffectDispatcher()} return
* executors that enqueue tasks into an internal queue.</li>
* <li>{@link #getEffectDispatcher()} returns an executor that enqueues tasks
* into an internal queue. {@link #getResultNotifier()} returns {@code null} so
* that result notifications fall through to the next environment or run
* immediately.</li>
* <li>Tests call {@link #runPendingTasks(long, TimeUnit)} to dequeue and run
* all pending tasks on the calling thread.</li>
* <li>If the current thread holds a {@link VaadinSession} lock, the lock is
Expand Down Expand Up @@ -72,7 +74,7 @@ private TestSignalEnvironment() {
*/
static TestSignalEnvironment register() {
TestSignalEnvironment environment = new TestSignalEnvironment();
environment.cleanup = SignalEnvironment.register(environment);
environment.cleanup = SignalEnvironment.registerFirst(environment);
return environment;
}

Expand Down Expand Up @@ -102,79 +104,95 @@ protected boolean isActive() {

@Override
protected Executor getResultNotifier() {
return createTaskEnqueueExecutor();
// Return null so result notifications fall through to the next
// environment (e.g. VaadinServiceEnvironment) or to the immediate
// executor. This keeps result processing synchronous on the calling
// thread, which is important for deterministic test behavior when
// signal operations are triggered on the test thread.
return null;
}

@Override
protected Executor getEffectDispatcher() {
return createTaskEnqueueExecutor();
}

private Executor createTaskEnqueueExecutor() {
return tasks::offer;
}

/**
* Executes all pending tasks from the queue, waiting for the first task to
* arrive if necessary. Once the first task is processed, all remaining
* tasks are processed immediately without additional waiting.
* Executes pending tasks from the queue, continuously polling for new tasks
* until the timeout expires with no new task arriving.
*
* <p>
* If a {@link VaadinSession} lock is held by the current thread, it is
* temporarily released during the wait for the first task, allowing
* background threads to acquire the lock and enqueue tasks. The lock is
* automatically reacquired before this method returns.
* temporarily released while polling for tasks, allowing background threads
* to acquire the lock and enqueue tasks. The lock is reacquired before
* running each task and released again before the next poll.
*
* <p>
* If the current thread is interrupted while waiting for tasks, the method
* restores the interrupt status and fails with an {@link AssertionError}.
*
* @param maxWaitTime
* the maximum time to wait for the first task to arrive in the
* the maximum time to wait for the next task to arrive in the
* given time unit. If &lt;= 0, returns immediately if no tasks
* are available.
* @param unit
* the time unit of the timeout value
* @return {@code true} if any pending Signals tasks were processed.
*/
boolean runPendingTasks(long maxWaitTime, TimeUnit unit) {
long waitMillis = unit.toMillis(maxWaitTime);
long deadlineMillis = System.currentTimeMillis()
+ unit.toMillis(maxWaitTime);
VaadinSession session = VaadinSession.getCurrent();
boolean hadLock = false;
if (session != null && session.hasLock()) {
hadLock = true;
session.unlock();
}
try {
// Wait for the first task with the specified timeout
Runnable task;
try {
task = waitMillis > 0
? tasks.poll(waitMillis, TimeUnit.MILLISECONDS)
: tasks.poll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(
"Thread interrupted while waiting for pending Signals tasks");
}
if (task == null) {
LoggerFactory.getLogger(TestSignalEnvironment.class).debug(
"No pending Signals tasks found after waiting for {} {}",
maxWaitTime, unit);
return false;
}
while (task != null) {
task.run();
// Process remaining tasks immediately, do not wait for
// additional tasks to be enqueued
task = tasks.poll();
boolean processedAny = false;
while (true) {
long remainingMillis = deadlineMillis
- System.currentTimeMillis();
Runnable task;
try {
task = remainingMillis > 0
? tasks.poll(remainingMillis, TimeUnit.MILLISECONDS)
: tasks.poll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(
"Thread interrupted while waiting for pending Signals tasks");
}
if (task == null) {
if (!processedAny) {
LoggerFactory.getLogger(TestSignalEnvironment.class)
.debug("No pending Signals tasks found after waiting for {} {}",
maxWaitTime, unit);
}
break;
}
// Re-acquire the session lock before running the task so
// that DOM operations (which assert the lock is held) work
// correctly when the effect runs directly on the test
// thread instead of going through ui.access().
if (hadLock) {
session.lock();
}
try {
task.run();
} finally {
if (hadLock) {
session.unlock();
}
}
processedAny = true;
}
return processedAny;
} finally {
if (hadLock) {
session.lock();
}
}
return true;
}

}
Loading