This guide covers the architecture, patterns, and best practices for developing the hensu-server module.
- Architecture Overview
- Local Development
- Server Initialization
- Package Structure
- Multi-Tenancy
- REST API Development
- SSE Streaming
- MCP Integration
- Testing
- Distributed Recovery (Leasing)
- Pause / Resume Protocol
- GraalVM Native Image
- Configuration
- Best Practices
- See Also
The server module extends hensu-core with HTTP capabilities. Core infrastructure is initialized
via HensuFactory.builder() - never by constructing components directly.
flowchart TD
subgraph server["hensu-server"]
direction TB
subgraph api["api/ (REST + SSE)"]
direction LR
wr(["WorkflowResource"]) ~~~ er(["ExecutionResource"]) ~~~ eer(["ExecutionEventResource"])
end
subgraph svc["service/"]
direction LR
ws(["WorkflowService"])
end
subgraph mid[""]
direction LR
streaming(["streaming/\n(SSE Events)"]) ~~~ mcp(["mcp/\n(MCP Split-Pipe)"])
end
subgraph infra[""]
direction LR
action(["action/\nServerActionExecutor"]) ~~~ config(["config/\nEnvironmentProducer\nServerConfiguration"]) ~~~ tenant(["tenant/\nTenantContext\n(ScopedValue)"])
end
api --> svc --> mid --> infra
end
subgraph core["hensu-core"]
direction LR
we(["WorkflowExecutor"]) ~~~ ar(["AgentRegistry"]) ~~~ pp(["PlanPipeline"]) ~~~ shr(["StepHandlerRegistry"]) ~~~ tr(["ToolRegistry"])
end
server --> core
style server fill:#2c2c2e, stroke:#3a3a3c, color:#ebebf5, stroke-width:1px
style api fill:#3a3a3c, stroke:#48484a, color:#ebebf5, stroke-width:1px
style svc fill:#3a3a3c, stroke:#48484a, color:#ebebf5, stroke-width:1px
style mid fill:#3a3a3c, stroke:#48484a, color:#ebebf5, stroke-width:1px
style infra fill:#3a3a3c, stroke:#48484a, color:#ebebf5, stroke-width:1px
style core fill:#2c2c2e, stroke:#3a3a3c, color:#ebebf5, stroke-width:1px
style wr fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style er fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style eer fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style ws fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style streaming fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style mcp fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style action fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style config fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style tenant fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style we fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style ar fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style pp fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style shr fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
style tr fill:#2c2c2e, stroke:#48484a, color:#ebebf5, stroke-width:1px
linkStyle default stroke:#0A84FF, stroke-width:1px
- HTTP request arrives at REST resource (
api/) - Tenant ID extracted from the JWT
tenant_idclaim viaRequestTenantResolver TenantContextestablished for the request scope- Service layer processes business logic
- Core engine executes workflow
- Events broadcast via SSE to subscribed clients
- Docker (
docker-compose up -d) openssl(keypair generation)
1. Configure environment
cp .env.example .envEdit .env — set HENSU_DB_PASSWORD and verify HENSU_JWT_PUBLIC_KEY path.
.env is gitignored; never commit it.
2. Generate your JWT keypair
Keys are personal per-developer. There is no shared dev key — every developer generates their own.
mkdir -p dev/keys
# Private key — never commit
openssl genrsa -out dev/keys/privateKey.pem 2048
# Public key — used by the server to verify tokens
openssl rsa -in dev/keys/privateKey.pem -pubout -out dev/keys/publicKey.pemBoth files land in dev/keys/ (gitignored). Set HENSU_JWT_PUBLIC_KEY=file:/absolute/path/to/repo/dev/keys/publicKey.pem in .env.
3. Start PostgreSQL
docker-compose up -dFlyway runs V1__create_schema automatically on server startup. No manual DB setup needed.
4. Run the server
./gradlew :hensu-server:quarkusDevThe %dev profile reads DB credentials from your environment (sourced from .env by your shell
or IDE). Quarkus Dev Services is disabled — the docker-compose container is used instead.
5. Generate a dev JWT token
TOKEN=$(bash dev/gen-jwt.sh)Token is valid for 1 hour. Pass it as a Bearer header:
curl -H "Authorization: Bearer $TOKEN" http://localhost:8080/api/v1/workflows
hensu push my-workflow --token "$TOKEN"| Profile | Database | Auth | Use case |
|---|---|---|---|
%dev |
docker-compose PostgreSQL | JWT required | Local development |
%inmem |
In-memory (no DB) | Disabled | Integration tests |
%prod |
Env var HENSU_DB_URL |
JWT required | Production |
The server MUST use HensuFactory.builder() to create core infrastructure.
This is wired through CDI in three classes:
Creates the HensuEnvironment singleton via HensuFactory. Conditionally wires
JDBC repositories when a DataSource is available (default), or falls back to in-memory
when the inmem profile disables the datasource:
@Produces
@ApplicationScoped
public HensuEnvironment hensuEnvironment() {
Properties properties = extractHensuProperties();
HensuFactory.Builder factoryBuilder = HensuFactory.builder()
.loadCredentials(properties)
.agentProviders(List.of(new LangChain4jProvider()))
.actionExecutor(actionExecutor) // ServerActionExecutor (send-action dispatcher)
.planObservers(planObservers.stream().toList())
.planResponseParser(new JacksonPlanResponseParser(objectMapper));
// Conditional persistence: JDBC when DataSource available, in-memory otherwise
boolean dsActive = config.getOptionalValue("quarkus.datasource.active", Boolean.class)
.orElse(true);
if (dsActive && dataSourceInstance.isResolvable()) {
DataSource ds = dataSourceInstance.get();
factoryBuilder
.workflowRepository(new JdbcWorkflowRepository(ds))
.workflowStateRepository(new JdbcWorkflowStateRepository(ds, objectMapper, leaseManager.getServerNodeId()));
}
hensuEnvironment = factoryBuilder.build();
registerGenericHandlers();
return hensuEnvironment;
}Delegates HensuEnvironment components for CDI injection and produces server-specific beans.
Repository instances are created by HensuEnvironmentProducer (JDBC or in-memory depending on
profile) and exposed here as CDI beans via delegation:
// Core components delegated from HensuEnvironment
@Produces @Singleton
public WorkflowExecutor workflowExecutor(HensuEnvironment env) {
return env.getWorkflowExecutor();
}
@Produces @Singleton
public AgentRegistry agentRegistry(HensuEnvironment env) {
return env.getAgentRegistry();
}
// Repositories — created by HensuFactory, delegated for CDI injection
@Produces @Singleton
public WorkflowRepository workflowRepository(HensuEnvironment env) {
return env.getWorkflowRepository();
}
@Produces @Singleton
public WorkflowStateRepository workflowStateRepository(HensuEnvironment env) {
return env.getWorkflowStateRepository();
}
// Shared ObjectMapper with Hensu serialization support
@Produces @Singleton
public ObjectMapper objectMapper() {
return WorkflowSerializer.createMapper();
}Note: Use
@Singleton(not@ApplicationScoped) only for@Producesdelegate methods like these.@ApplicationScopedcreates a CDI client proxy that breaksinstanceofchecks against the concrete type returned (e.g.,InMemoryWorkflowStateRepositoryused in test cleanup). Regular CDI beans that are not produced via@Produces— service classes, scheduled jobs, handlers — should use@ApplicationScoped.
Server-specific ActionExecutor that routes Action.Send to registered handlers (such as McpSidecar) and rejects Action.Execute (local command execution):
@Override
public ActionResult execute(Action action, Map<String, Object> context) {
return switch (action) {
case Action.Send send -> executeSend(send, context);
case Action.Execute exec -> ActionResult.failure(
"Server mode does not support local command execution");
};
}- NEVER create
WorkflowExecutor,AgentRegistry, or other core components directly - NEVER create a
StubAgentmanually —HensuFactoryhas stub mode built-in - NEVER create repository instances directly in server producers — delegate from
HensuEnvironment - NEVER support local command execution in server mode
io.hensu.server/
├── action/ # Server-specific action execution
│ └── ServerActionExecutor # Send-action dispatcher (rejects Action.Execute)
│
├── api/ # HTTP endpoints (REST + SSE)
│ ├── WorkflowResource # Workflow definition management (push/pull/delete/list)
│ ├── ExecutionResource # Execution runtime (start/resume/status/plan)
│ ├── ExecutionEventResource # Execution monitoring SSE
│ ├── McpGatewayResource # MCP split-pipe SSE/POST
│ ├── ExecutionStartRequest # Request DTO for POST /executions
│ ├── ResumeRequest # Request DTO for POST /executions/{id}/resume
│ ├── ResumeResponse # Response DTO for POST /executions/{id}/resume
│ ├── PushWorkflowResponse # Response DTO for POST /workflows
│ ├── WorkflowSummary # Response DTO for GET /workflows list entries
│ ├── GatewayStatusResponse # Response DTO for GET /mcp/status
│ └── ClientStatusResponse # Response DTO for GET /mcp/clients/{id}/status
│
├── validation/ # Input validation (Bean Validation)
│ ├── InputValidator # Shared validation predicates (safe-ID, dangerous chars, size)
│ ├── ValidId # Custom identifier constraint annotation
│ ├── ValidIdValidator # Regex-based validator implementation
│ ├── ValidMessage # Custom constraint for raw message body strings
│ ├── ValidMessageValidator # Size-limit + control-character validator
│ ├── ValidWorkflow # Custom constraint for Workflow request bodies
│ ├── ValidWorkflowValidator # Deep-validates workflow object graph
│ ├── LogSanitizer # Strips CR/LF for log injection prevention
│ ├── IllegalArgumentExceptionMapper # Maps IllegalArgumentException → 400
│ └── ConstraintViolationExceptionMapper # Global 400 error mapper
│
├── config/ # CDI configuration
│ ├── HensuEnvironmentProducer # HensuFactory → HensuEnvironment
│ ├── CoreModelNativeConfig # @RegisterForReflection — Hensu domain model
│ ├── LangChain4jNativeConfig # @RegisterForReflection — JDK HTTP transport
│ ├── LangChain4jAnthropicNativeConfig # @RegisterForReflection — Anthropic DTOs
│ ├── LangChain4jGeminiNativeConfig # @RegisterForReflection — Gemini DTOs
│ ├── ExecutionEventNativeConfig # @RegisterForReflection — SSE event sealed subtypes
│ ├── ServerBootstrap # Startup registrations
│ └── ServerConfiguration # CDI delegation + server beans
│
├── dev/ # Dev-only handlers (excluded from prod image)
│ └── SleepHandler # Simulates long-running node for crash-recovery tests
│
├── execution/ # Server-side execution listeners
│ ├── LoggingExecutionListener # Logs plan/step lifecycle events
│ └── CompositeExecutionListener # Combines multiple ExecutionListeners
│
├── mcp/ # MCP protocol implementation
│ ├── JsonRpc # JSON-RPC 2.0 message helper
│ ├── McpSessionManager # SSE session management
│ ├── McpConnection # Connection interface
│ ├── McpConnectionFactory # Factory for per-tenant connections
│ ├── McpConnectionPool # Connection pooling
│ ├── McpException # Checked MCP protocol errors
│ ├── McpSidecar # ActionHandler for MCP tools
│ ├── McpToolDiscovery # Runtime tool schema discovery + cache
│ ├── SseMcpConnection # SSE-based connection impl
│ └── TenantToolRegistry # Merges base + tenant MCP tools (MCP precedence)
│
├── security/ # JWT + tenant resolution + error mapping
│ ├── GlobalExceptionMapper # Global @Provider — normalizes errors to JSON
│ └── RequestTenantResolver # Extracts tenant_id claim from JWT
│
├── persistence/ # PostgreSQL persistence (plain JDBC)
│ ├── JdbcWorkflowRepository # Workflow definitions (JSONB)
│ ├── JdbcWorkflowStateRepository # Execution state snapshots (JSONB + lease columns)
│ ├── ExecutionLeaseManager # Distributed lease management (@ApplicationScoped)
│ ├── WorkflowPushLock # Cluster-wide push mutex (pg_advisory_xact_lock + JVM fallback)
│ ├── JdbcSupport # JDBC helper (queryList, update)
│ └── PersistenceException # Unchecked wrapper for SQLException
│
├── workflow/ # Business logic layer
│ ├── WorkflowService # Facade over registry + execution + query services
│ ├── WorkflowRegistryService # Push pipeline: WorkflowPushLock + SubWorkflowGraphValidator
│ ├── WorkflowExecutionService # Start/resume orchestration
│ ├── ExecutionQueryService # Read-side: status, plan, output, paused list
│ ├── ExecutionStateService # Snapshot load/save with split-brain guard
│ ├── ExecutionResultHandler # Shared ExecutionResult → snapshot + SSE dispatch
│ ├── WorkflowContextUtil # Filters internal (_-prefixed) keys from context
│ ├── ExecutionHeartbeatJob # Periodic heartbeat emission (@Scheduled)
│ ├── WorkflowRecoveryJob # Orphaned execution sweeper (@Scheduled)
│ ├── ExecutionStartResult / ExecutionOutput / ExecutionSummary / PlanInfo # DTOs
│ ├── ExecutionStatus # DTO for execution status (with correlationId)
│ └── {Execution,Workflow}{NotFound,Execution}Exception # Domain exceptions
│
├── streaming/ # Execution event streaming
│ ├── ExecutionEvent # Event DTOs (sealed interface)
│ └── ExecutionEventBroadcaster # PlanObserver + broadcaster
│
├── review/ # Server-side review handling
│ └── InteractiveReviewHandler # @ApplicationScoped default ReviewHandler for plan reviews
│
└── tenant/ # Multi-tenancy
├── TenantContext # ScopedValue-based context
├── TenantAware # Marker interface
└── TenantResolutionInterceptor
Uses Java 25 ScopedValue for thread-safe tenant isolation:
// In REST resource or interceptor
TenantInfo tenant = TenantInfo.withMcp(tenantId, mcpEndpoint);
TenantContext.runAs(tenant, () -> {
// All code in this scope sees the tenant
TenantInfo current = TenantContext.current();
// Core engine, MCP calls, DB queries are tenant-scoped
workflowExecutor.execute(workflow, context);
});- Inject or access
TenantContext.current()where needed - Use tenant ID for data isolation (DB queries, caches)
- Route MCP calls to tenant-specific endpoints
@ApplicationScoped
public class TenantAwareRepository {
public List<Workflow> findAll() {
String tenantId = TenantContext.current().tenantId();
return db.query("SELECT * FROM workflows WHERE tenant_id = ?", tenantId);
}
}The REST API is split into two distinct resources:
-
WorkflowResource (
/api/v1/workflows) - Workflow definition management (CLI integration)- Push, pull, delete, list workflow definitions
- Uses
WorkflowRepositorydirectly
-
ExecutionResource (
/api/v1/executions) - Execution runtime (client integration)- Start, resume, status, plan, result
- Uses
WorkflowServicefor business logic
Workflow deletion is a soft-delete. DELETE /workflows/{workflowId} sets a deleted_at
timestamp rather than removing the row – hard-delete would violate FK constraints from
execution_states that reference the workflow. All queries filter on deleted_at IS NULL,
so soft-deleted workflows are invisible to the application.
Do not mix definition management with execution in the same resource.
@Path("/api/v1/myresource")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class MyResource {
private final MyService service;
@Inject
public MyResource(MyService service) {
this.service = service;
}
@Inject RequestTenantResolver tenantResolver;
@GET
@Path("/{id}")
public Response get(@PathParam("id") String id) {
String tenantId = tenantResolver.tenantId();
// Business logic via service layer
MyEntity entity = service.findById(tenantId, id);
return Response.ok(entity).build();
}
}| Status | Usage |
|---|---|
| 200 OK | Successful GET, PUT, POST with body |
| 201 Created | Resource created (workflow push - new) |
| 202 Accepted | Async operation started (execution start) |
| 204 No Content | Successful DELETE |
| 400 Bad Request | Invalid input, missing headers |
| 404 Not Found | Resource not found |
| 500 Internal Server Error | Unexpected errors |
The server uses Bean Validation (Hibernate Validator via Quarkus) to enforce input constraints declaratively on REST endpoint parameters and request DTOs.
| Class | Role |
|---|---|
InputValidator |
Shared predicates: safe-ID pattern, dangerous-char detection, size limit |
@ValidId |
Custom constraint for path/query identifiers |
ValidIdValidator |
Validates against [a-zA-Z0-9][a-zA-Z0-9._-]{0,254} |
@ValidMessage |
Custom constraint for raw String message bodies |
ValidMessageValidator |
Checks non-blank, UTF-8 byte size limit, and dangerous control chars |
@ValidWorkflow |
Custom constraint for full Workflow request bodies |
ValidWorkflowValidator |
Deep-validates the entire workflow object graph (IDs + free text) |
LogSanitizer |
Strips CR/LF from values before logging (defense-in-depth) |
ConstraintViolationExceptionMapper |
Global @Provider — translates violations into standardized 400 JSON |
All classes live in io.hensu.server.validation.
Apply @ValidId to every path parameter and query parameter that accepts a user-provided
identifier (workflowId, executionId, clientId, etc.). The constraint rejects null,
blank, and malformed strings — preventing path traversal, injection, and overly long IDs
at the API boundary.
Valid identifiers:
- Start with an alphanumeric character (
a-z,A-Z,0-9) - Contain only alphanumeric characters, dots (
.), hyphens (-), and underscores (_) - Are 1–255 characters long
// Path parameter — @ValidId validates the raw string
@GET
@Path("/{workflowId}")
public Response get(@PathParam("workflowId") @ValidId String workflowId) {
// workflowId is guaranteed safe here
}
// Request body DTO — @Valid triggers nested validation, @NotNull rejects missing body
@POST
public Response create(@Valid @NotNull CreateRequest request) { ... }
// DTO record with field-level constraints
public record CreateRequest(
@NotBlank(message = "workflowId is required") @ValidId String workflowId) {}Validation is triggered automatically by the JAX-RS pipeline — no manual checks needed.
Apply @ValidMessage to raw String body parameters that receive free-text content (e.g., MCP
messages, chat inputs). The constraint enforces three checks:
- Not null or blank — rejects missing bodies
- UTF-8 byte size — must not exceed
maxBytes(default 1 MB) - No dangerous control characters — rejects U+0000–U+0008, U+000B, U+000C, U+000E–U+001F, U+007F. TAB, LF, and CR are permitted since they are legitimate in free text.
Each failing condition produces a distinct violation message:
| Condition | Violation message |
|---|---|
| Null or blank | Message body is required |
| Exceeds byte limit | Message exceeds maximum allowed size |
| Control characters | Message contains illegal control characters |
// Default 1 MB limit
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Uni<Response> receive(@ValidMessage String body) { ... }
// Custom size limit (64 KB)
@POST
public Uni<Response> receive(@ValidMessage(maxBytes = 65_536) String body) { ... }ConstraintViolationExceptionMapper catches all ConstraintViolationExceptions and returns
a standardized JSON response consistent with the GlobalExceptionMapper format:
{"error": "workflowId: must be a valid identifier (alphanumeric, dots, hyphens, underscores; 1-255 chars)", "status": 400}Multiple violations are joined with ; .
When a Workflow object is submitted via POST /api/v1/workflows, the @ValidWorkflow constraint
triggers ValidWorkflowValidator, which deep-validates the entire object graph:
- Identifier fields (workflow ID, node IDs, agent IDs, branch IDs, rubric keys, etc.) must match
the safe-ID pattern
[a-zA-Z0-9][a-zA-Z0-9._-]{0,254} - Free-text fields (prompts, instructions, rubric content, metadata) are scanned for dangerous control characters (U+0000–U+0008, U+000B, U+000C, U+000E–U+001F, U+007F). Tabs, newlines, and carriage returns are permitted since they are legitimate in prompt text.
The validator walks all node types via pattern matching (StandardNode, ParallelNode,
SubWorkflowNode, ForkNode, JoinNode, GenericNode, EndNode) and validates type-specific
fields (e.g., branch prompts, input/output mappings, await targets).
@POST
public Response push(@ValidWorkflow Workflow workflow) {
// workflow is guaranteed safe here — all IDs and text fields validated
}ValidWorkflowValidator checks the incoming workflow in isolation. Cross-workflow
consistency — cycles and dangling sub-workflow references — is a second pass handled by
WorkflowRegistryService before the row is written:
// WorkflowRegistryService.push()
pushLock.withLock(tenantId, workflowId, () -> {
SubWorkflowGraphValidator.validate(
workflow,
id -> id.equals(workflow.getId())
? Optional.of(workflow) // incoming overrides repo
: repository.findById(tenantId, id)); // lazy resolution
repository.save(tenantId, workflow);
});Key properties:
- Cluster-wide mutex —
WorkflowPushLockwraps the save inpg_advisory_xact_lock(JVMReentrantLockfallback whenquarkus.datasource.active=false). Without this, two concurrent pushes on different nodes could each observe a clean graph and together introduce a cycle. - Post-push view — the resolver short-circuits to the incoming workflow for its own id, so the DFS sees the graph as it will exist after the push — no intermediate write is needed, and a push that would fix an existing cycle validates correctly.
- Single DFS —
SubWorkflowGraphValidator.validate(Workflow, Function)uses onegloballyVisitedset for both cycle detection and dangling-ref detection. - Tenant-scoped resolution —
repository.findById(tenantId, id)never looks up workflows from another tenant, so a sub-workflow target that exists under a different tenant is rejected as dangling. - Runtime bounds — even if validation passes, recursion is capped at
SubWorkflowNodeExecutor.MAX_DEPTH = 16(tracked via_sub_workflow_depth) and_tenant_idis propagated into the child context, so a malicious deep chain cannot exhaust the stack and a child cannot escape its tenant boundary.
See the hensu-core Developer Guide — SubWorkflowGraphValidator checks
for the core-side algorithm and the CLI-only validate(Collection<Workflow>) overload.
LogSanitizer.sanitize() strips CR/LF characters from user-derived values before they reach log
output, preventing log injection attacks. Apply it at every log call site that includes
user-controlled input:
LOG.infov("Processing workflow: id={0}", LogSanitizer.sanitize(workflowId));- Add
@ValidIdto all path/query params accepting identifiers - Add
@ValidMessageto rawStringbody params receiving free-text content - Add
@ValidWorkflowtoWorkflowbody parameters (or@Valid @NotNullfor other DTOs) - Add field-level constraints (
@NotBlank,@ValidId,@Size, etc.) to DTO records - Use
LogSanitizer.sanitize()when logging any user-provided string - Write a test in
InputValidationIntegrationTestcovering the new constraints
See hensu-server/src/test/java/io/hensu/server/integration/InputValidationIntegrationTest.java for
comprehensive examples.
- ExecutionEventResource - One-way event streaming for monitoring
- McpGatewayResource - Split-pipe bidirectional communication
- Add record to
ExecutionEventsealed interface:
public sealed interface ExecutionEvent {
// ... existing events ...
/// My new event type.
record MyNewEvent(
String executionId,
String customField,
Instant timestamp) implements ExecutionEvent {
@Override
public String type() {
return "my.new.event";
}
public static MyNewEvent now(String executionId, String customField) {
return new MyNewEvent(executionId, customField, Instant.now());
}
}
}- Update
ExecutionEventBroadcaster.convertEvent()if mapping fromPlanEvent:
private ExecutionEvent convertEvent(String executionId, PlanEvent event) {
return switch (event) {
// ... existing cases ...
case PlanEvent.MyNewPlanEvent e -> ExecutionEvent.MyNewEvent.now(
executionId, e.customField());
};
}- Publish directly where needed:
broadcaster.publish(executionId, ExecutionEvent.MyNewEvent.now(executionId, "value"));ExecutionEventBroadcaster uses a ScopedValue — not ThreadLocal — to carry the current execution ID into
PlanObserver callbacks. This is mandatory for correctness with virtual threads (Project Loom).
Wrap execution blocks with runAs():
// WorkflowService — correct pattern
eventBroadcaster.runAs(executionId, () -> {
TenantContext.runAs(tenantId, () -> {
workflowExecutor.executeFrom(workflow, snapshot);
});
return null;
});Do not call broadcaster.setCurrentExecution() — that method no longer exists. ScopedValue is structurally
scoped: the binding is automatically released when runAs() returns, even on exception paths.
If you need to route PlanEvent callbacks from a background thread (e.g. an async agent) to the right execution,
call broadcaster.registerPlan(planId, executionId) before execution starts. The broadcaster will prefer the
plan→execution map over the ScopedValue when both are present.
The execution.completed SSE event now carries the final workflow output:
{
"type": "execution.completed",
"executionId": "exec-123",
"workflowId": "order-processing",
"success": true,
"finalNodeId": "end",
"output": {
"summary": "Order validated",
"items": 3
},
"timestamp": "2024-01-01T12:00:00Z"
}output contains the public workflow context — all keys not prefixed with _. Internal routing keys
(_tenant_id, _execution_id, _last_output, etc.) are stripped before publishing.
output may be empty {} if the workflow produced no public context keys.
For fan-out to multiple subscribers:
@ApplicationScoped
public class MyBroadcaster {
private final Map<String, BroadcastProcessor<MyEvent>> processors =
new ConcurrentHashMap<>();
public Multi<MyEvent> subscribe(String channel) {
BroadcastProcessor<MyEvent> processor = processors.computeIfAbsent(
channel, k -> BroadcastProcessor.create());
return processor;
}
public void publish(String channel, MyEvent event) {
BroadcastProcessor<MyEvent> processor = processors.get(channel);
if (processor != null) {
processor.onNext(event);
}
}
public void complete(String channel) {
BroadcastProcessor<MyEvent> processor = processors.remove(channel);
if (processor != null) {
processor.onComplete();
}
}
}After execution completes, clients can fetch the output via REST instead of (or in addition to) consuming the SSE stream:
GET /api/v1/executions/{executionId}/result
Authorization: Bearer <jwt>
Response (200 OK):
{
"executionId": "exec-123",
"workflowId": "order-processing",
"status": "COMPLETED",
"output": {
"summary": "Order validated",
"items": 3
}
}status is COMPLETED when currentNodeId is null in the snapshot, PAUSED otherwise. Internal context keys
(prefixed with _) are filtered the same way as in the SSE event. Returns 404 if the execution ID does not
exist for the requesting tenant.
Downstream (SSE): Server → Client
- JSON-RPC requests pushed via EventSource
- Client receives tool call requests
Upstream (HTTP POST): Client → Server
- JSON-RPC responses sent via POST /mcp/message
- Server correlates by request ID
Via DSL action — send the tool name directly; ServerActionExecutor automatically
falls back to MCP for any unrecognized handler:
node("process") {
action {
send("read_file", mapOf("path" to "/data/input.json"))
}
}The manual MCP envelope (send("mcp", mapOf("tool" to "read_file", ...))) is also accepted
for explicitness, but unnecessary — the fallback path in ServerActionExecutor.executeSend()
wraps the call automatically when no direct handler is registered for the tool name.
Via direct connection:
McpConnection conn = connectionPool.getForTenant(tenantId);
Map<String, Object> result = conn.callTool("search", Map.of("query", "test"));MCP tools are discovered at runtime — no server code changes are required to support new tools.
- Discovery & caching:
McpToolDiscoveryfetches the tool schema from the tenant's MCP server and caches it per endpoint. On the first call, schemas are fetched; subsequent calls are served from cache. - Precedence:
TenantToolRegistrymerges base (built-in) tools with the tenant's MCP tools. On naming collisions, the MCP tool takes precedence — tenants can override built-in tools with their own MCP implementations. - No server changes:
McpSidecar.execute()resolves tool names dynamically from the JSON-RPC payload. Adding a new tool on the MCP server side is sufficient; noMcpSidecarupdate is needed.
class MyResourceTest {
private MyService service;
private MyResource resource;
@BeforeEach
void setUp() {
service = mock(MyService.class);
resource = new MyResource(service);
}
@Test
void shouldReturnEntityWhenFound() {
when(service.findById("tenant-1", "id-1"))
.thenReturn(new MyEntity("id-1", "data"));
Response response = resource.get("id-1", "tenant-1");
assertThat(response.getStatus()).isEqualTo(200);
verify(service).findById("tenant-1", "id-1");
}
@Test
void shouldReturn403WhenNoTenantContext() {
// RequestTenantResolver throws ForbiddenException when no JWT tenant_id claim
assertThatThrownBy(() -> resource.get("id-1"))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining("No tenant context");
}
}@QuarkusTest
class MyResourceIT {
@Test
void shouldPushWorkflow() {
given()
.auth().preemptive().oauth2("test-token")
.contentType(ContentType.JSON)
.body(workflowJson)
.when()
.post("/api/v1/workflows")
.then()
.statusCode(201)
.body("id", notNullValue())
.body("created", equalTo(true));
}
@Test
void shouldStartExecution() {
given()
.auth().preemptive().oauth2("test-token")
.contentType(ContentType.JSON)
.body(Map.of("workflowId", "my-workflow", "context", Map.of()))
.when()
.post("/api/v1/executions")
.then()
.statusCode(202)
.body("executionId", notNullValue());
}
}Full-stack integration tests exercise the workflow engine end-to-end within a bootstrapped Quarkus context, using the Stub Agent System to intercept all model requests. Behavior tests use the inmem profile (no Docker required); repository tests use Testcontainers PostgreSQL.
| Class | Role |
|---|---|
IntegrationTestBase |
Abstract base: CDI injection, state cleanup, helper methods (@TestProfile(InMemoryTestProfile.class)) |
InMemoryTestProfile |
Quarkus test profile activating inmem — disables PostgreSQL and Flyway |
TestActionHandler |
Records action payloads for plan/action dispatch assertions |
TestReviewHandler |
Scriptable review decisions (approve, backtrack, reject) |
TestValidatorHandler |
Generic node handler for "validator" type nodes |
TestPauseHandler |
Generic node handler that pauses on first call, succeeds on next |
All infrastructure lives in io.hensu.server.integration (package-private).
Every integration test extends IntegrationTestBase, which provides:
- CDI-injected beans:
workflowRepository,workflowStateRepository,workflowService,agentRegistry,hensuEnvironment - Per-test cleanup (
@BeforeEach): clears stub responses, workflow repository, and workflow state repository - Helper methods:
| Method | Description |
|---|---|
loadWorkflow(resourceName) |
Loads JSON fixtures from /workflows/ |
pushAndExecute(workflow, context) |
Saves workflow + executes under TEST_TENANT |
pushAndExecuteWithMcp(workflow, ctx, endpoint) |
Executes with MCP-enabled tenant context |
registerStub(key, response) |
Programmatic stub registration by node ID or agent ID |
registerStub(scenario, key, response) |
Scenario-specific stub registration |
@QuarkusTest
class MyWorkflowIntegrationTest extends IntegrationTestBase {
@Test
void shouldExecuteWorkflow() {
// 1. Load workflow fixture
Workflow workflow = loadWorkflow("my-workflow.json");
// 2. Register stub responses by node ID
registerStub("research", "Research findings about the topic");
registerStub("draft", "Article draft based on research");
// 3. Execute
ExecutionStartResult result = pushAndExecute(
workflow, Map.of("topic", "AI"));
// 4. Assert on final snapshot
List<HensuSnapshot> snapshots = workflowStateRepository
.findByWorkflowId(TEST_TENANT, result.workflowId());
assertThat(snapshots).isNotEmpty();
HensuSnapshot snapshot = snapshots.getLast();
assertThat(snapshot.checkpointReason()).isEqualTo("completed");
assertThat(snapshot.context())
.containsEntry("research", "Research findings about the topic");
}
}Register stubs by node ID (e.g., "research", "draft"), not by agent ID. The StandardNodeExecutor propagates the current node ID into the execution context, so StubResponseRegistry resolves node-ID-based stubs first. See Stub Agent System — Response Resolution Order for the full lookup chain.
TestReviewHandler is a CDI @Alternative (priority 1) that replaces the default AUTO_APPROVE handler. Enqueue decisions before execution — they are consumed in FIFO order:
@Inject TestReviewHandler testReviewHandler;
@BeforeEach
void resetReviewHandler() {
testReviewHandler.reset();
}
@Test
void shouldBacktrackAndRetry() {
Workflow workflow = loadWorkflow("review-workflow.json");
registerStub("research", "Research");
registerStub("draft", "Content");
// First review: backtrack to "research" node
testReviewHandler.enqueueDecision(
new ReviewDecision.Backtrack("research", null, "Needs more detail"));
// Second review: approve
testReviewHandler.enqueueDecision(new ReviewDecision.Approve());
ExecutionStartResult result = pushAndExecute(
workflow, Map.of("topic", "test"));
HensuSnapshot snapshot = /* ... get last snapshot ... */;
List<BacktrackEvent> backtracks = snapshot.history().getBacktracks();
assertThat(backtracks).isNotEmpty();
assertThat(backtracks.getFirst().getFrom()).isEqualTo("draft");
assertThat(backtracks.getFirst().getTo()).isEqualTo("research");
}When the queue is empty, TestReviewHandler falls back to a configurable default (approve by default). Use setDefaultDecision() to change the fallback.
TestActionHandler records all payloads dispatched to the "test-tool" handler ID:
@Inject TestActionHandler testActionHandler;
@Inject ActionExecutor actionExecutor;
@BeforeEach
void resetActionHandler() {
testActionHandler.reset();
actionExecutor.registerHandler(testActionHandler);
}
@Test
void shouldDispatchPlanSteps() {
Workflow workflow = loadWorkflow("plan-static.json");
registerStub("execute", "Plan execution complete");
pushAndExecute(workflow, Map.of("task", "test"));
List<Map<String, Object>> payloads = testActionHandler.getReceivedPayloads();
assertThat(payloads).hasSize(2);
assertThat(payloads.getFirst()).containsEntry("action", "search");
assertThat(payloads.get(1)).containsEntry("action", "process");
}Place workflow definitions in src/test/resources/workflows/. Use model: "stub" for all agents:
{
"id": "my-workflow",
"version": "1.0.0",
"startNode": "process",
"agents": {
"writer": { "id": "writer", "role": "writer", "model": "stub", "temperature": 0.7 }
},
"nodes": {
"process": {
"id": "process",
"nodeType": "STANDARD",
"agentId": "writer",
"prompt": "Write about {topic}",
"transitionRules": [{ "type": "success", "targetNode": "done" }]
},
"done": { "id": "done", "nodeType": "END", "status": "SUCCESS" }
}
}Rubrics are parsed at build time and stored directly on workflow nodes. Workflow JSON fixtures
include inline rubric content in the "rubric" field, which the deserializer parses into typed
Rubric objects at load time. No separate registration step is needed — just call
loadWorkflow("fixture.json") and the rubric is ready.
JDBC repository tests live in io.hensu.server.persistence and use Testcontainers PostgreSQL (no Quarkus context). They extend JdbcRepositoryTestBase which provides:
- A shared PostgreSQL container per test class
- Flyway migration (same
V1__create_persistence_tables.sqlused in production) - Pre-configured
DataSourceandObjectMapper
class JdbcWorkflowRepositoryTest extends JdbcRepositoryTestBase {
private JdbcWorkflowRepository repo;
@BeforeEach
void setUp() {
repo = new JdbcWorkflowRepository(dataSource);
repo.deleteAllForTenant(TENANT);
}
@Test
void saveAndFindById_roundTrip() {
Workflow workflow = buildWorkflow("wf-1");
repo.save(TENANT, workflow);
Optional<Workflow> found = repo.findById(TENANT, "wf-1");
assertThat(found).isPresent();
assertThat(found.get().getId()).isEqualTo("wf-1");
}
}These tests require Docker for Testcontainers. Run them separately:
./gradlew :hensu-server:test --tests "*.persistence.*"In production multi-instance deployments, each server node holds a lease on the executions it is currently running. When a node crashes, surviving nodes detect the stale lease and atomically claim the orphaned execution for re-execution.
| Class | Package | Role |
|---|---|---|
ExecutionLeaseManager |
persistence/ |
@ApplicationScoped CDI bean; owns lease SQL, generates server_node_id, exposes updateHeartbeats() and claimStaleExecutions() |
ExecutionHeartbeatJob |
service/ |
@Scheduled — runs every ${hensu.lease.heartbeat-interval:30s}; calls leaseManager.updateHeartbeats() |
WorkflowRecoveryJob |
service/ |
@Scheduled — runs every ${hensu.lease.recovery-interval:60s}; claims stale executions and calls workflowService.resumeExecution() for each |
The JdbcWorkflowStateRepository.save() method sets or clears lease columns based on
checkpointReason:
| Reason | server_node_id |
last_heartbeat_at |
|---|---|---|
"checkpoint" |
set to this node's ID | set to NOW() |
"paused" / "completed" / "failed" / "rejected" |
NULL |
NULL |
This means findPaused() (which filters WHERE server_node_id IS NULL) only returns
human-review checkpoints — active running executions are never surfaced as paused.
# Node identity — auto-generated UUID on startup if left blank
hensu.node.id=
# Heartbeat interval — how often active leases are renewed
hensu.lease.heartbeat-interval=30s
# Recovery sweep interval — how often the sweeper runs
hensu.lease.recovery-interval=60s
# Stale threshold — executions older than this are claimed by the sweeper
hensu.lease.stale-threshold=90sThe inmem test profile disables the scheduler entirely:
%inmem.quarkus.scheduler.enabled=falseExecutionLeaseManager.isActive() returns false when quarkus.datasource.active=false.
All lease operations are no-ops; WorkflowRecoveryJob guards with if (!leaseManager.isActive()) return;.
Lease behavior is tested in io.hensu.server.persistence.ExecutionLeaseTest (Testcontainers
PostgreSQL, no Quarkus context). Key properties covered:
- Orphaned row (stale heartbeat) is claimed by the sweeper node
- Row with a fresh heartbeat is never claimed (live execution safety)
updateHeartbeats()only touches rows owned by the calling node (crashed-node isolation)
Use the SleepHandler (node type "sleep", package io.hensu.server.dev) to simulate
long-running active executions for manual recovery testing. It blocks the execution thread
for durationSeconds (default 30 s), giving you a window to kill -9 the server process.
Upon restart, WorkflowRecoveryJob detects the stale lease and resumes the orphaned execution
on the surviving node.
{ "id": "long-task", "nodeType": "GENERIC", "handlerType": "sleep",
"config": { "durationSeconds": 60 } }When a post-processor needs out-of-band input (e.g. human review), the execution pauses without re-running the agent on resume. The protocol has four moving parts:
ExecutionPhase (sealed, in hensu-core) tracks where the execution stopped:
| Variant | Meaning |
|---|---|
Initial |
Normal forward execution – no pause in progress. |
Awaiting |
Paused inside the post-pipeline. Carries nodeId, processorId, cachedResult, correlationId, and requestedAt. |
Terminal |
Execution reached a final state (completed / failed / rejected). |
The phase is persisted as JSONB in the phase column (V2__add_execution_phase.sql).
JdbcWorkflowStateRepository reads and writes it via ExecutionPhaseSerializer /
ExecutionPhaseDeserializer (registered in HensuJacksonModule).
A post-processor signals a pause by returning SuspendForExternal(correlationId). The pipeline
saves the current position and the cached node result, then returns control to WorkflowExecutor,
which sets the phase to Awaiting and checkpoints the state.
ResumeInput (sealed, in hensu-core) carries the caller's intent when resuming:
| Variant | Purpose |
|---|---|
ApplyReview |
Wraps a ReviewDecision (Approve / Reject / Backtrack). |
ApplyContextEdits |
Merges key-value edits into execution context. |
None |
Plain resume with no additional input. |
ResumeInput is transient – it lives on HensuState but is never persisted or snapshotted.
- REST layer –
ExecutionResourcereceives aResumeRequestand callstoResumeInput()to map it to the core sealed type. - Service layer –
ExecutionStateService.resumeExecution()restores theHensuSnapshot, converts it to aHensuState, setsstate.setResumeInput(resumeInput), and callsexecuteFrom(). - Executor –
WorkflowExecutor.executeLoop()reads the phase. ForAwaitingit re-enters the post-pipeline at the named processor viaProcessorPipeline.executePostFrom(). - Post-processor – the processor (e.g.
ReviewPostProcessor) checksstate.getResumeInput(). If anApplyReviewis present it consumes the decision directly instead of callingrequestReview()again (which would returnPendingand loop).
The SSE stream is closed when an execution pauses. Paused executions may wait days or weeks
for human review – holding an HTTP connection open is wasteful and will be killed by idle
timeouts long before the review arrives. Clients must re-subscribe to
GET /executions/{executionId}/events after submitting a POST /executions/{executionId}/resume
to receive post-resume events.
{
"decision": "approve | reject | backtrack",
"reason": "optional – required for reject/backtrack",
"targetStep": "optional – node ID for backtrack",
"contextEdits": { "key": "value" }
}Omitting decision with non-empty contextEdits produces ApplyContextEdits. An empty or null
body produces ResumeInput.None.
The server is deployed as a GraalVM native image via Quarkus. All server code — and any dependency it pulls in — must be native-image safe. See the hensu-core Developer Guide for the foundational rules (no reflection, no classpath scanning, no dynamic proxies, no runtime bytecode generation). This section covers server-specific concerns.
./gradlew hensu-server:build -Dquarkus.native.enabled=true -Dquarkus.package.type=nativeThe output binary is at hensu-server/build/hensu-server-*-runner:
QUARKUS_PROFILE=inmem ./hensu-server/build/hensu-server-*-runnerPrerequisites:
- GraalVM JDK 25+ with
native-imageinstalled - Linux:
glibc-devel,zlib-devel,gcc(or equivalent) - macOS: Xcode Command Line Tools
The build takes several minutes. For day-to-day development, use JVM mode
(./gradlew hensu-server:quarkusDev) and reserve native builds for release verification.
Quarkus performs heavy build-time processing that relaxes some raw GraalVM constraints:
| Feature | Raw GraalVM | With Quarkus |
|---|---|---|
CDI injection (@Inject) |
Requires reflection config | Works — Quarkus resolves beans at build time (ArC) |
@ConfigProperty |
Requires reflection config | Works — processed at build time |
JAX-RS resources (@Path, @GET) |
Requires reflection config | Works — REST layer is build-time wired |
Jackson @JsonProperty on DTOs |
Requires reflection config | Works — quarkus-jackson registers metadata |
ServiceLoader |
Fails at runtime | Works — Quarkus scans META-INF/services at build time |
| LangChain4j AI services (CDI) | Requires reflection config | Works — quarkus-langchain4j extensions register CDI beans |
| LangChain4j AI services (programmatic) | Requires reflection config | Manual — extensions only wire CDI; programmatic ChatModel builders bypass build-time scan |
Key insight: Within Quarkus-managed code, standard annotations and CDI work normally. The constraints only bite when you introduce code that Quarkus doesn't know about — custom reflection, third-party libraries without Quarkus extensions, or hensu-core internals that bypass the framework.
When adding a new library to hensu-server:
-
Check if a Quarkus extension exists. Search extensions catalog first. Extensions provide build-time metadata, so you get native-image support automatically.
-
If an extension exists, add the Quarkus extension (not the raw library):
// build.gradle.kts implementation("io.quarkus:quarkus-my-library") // Quarkus extension // NOT: implementation("org.example:my-library") // raw library
-
If no extension exists, you must verify native-image compatibility:
- Run
./gradlew hensu-server:build -Dquarkus.native.enabled=true -Dquarkus.package.type=native - Test the binary:
./hensu-server/build/hensu-server-*-runner - If it fails with
ClassNotFoundExceptionorNoSuchMethodException, add reflection configuration:// src/main/resources/reflect-config.json [ { "name": "com.example.SomeClass", "allDeclaredConstructors": true, "allPublicMethods": true } ]
- Run
-
Pin the version to match Quarkus BOM. If the library is managed by the Quarkus BOM (e.g., Jackson, Vert.x), do not override the version. Mismatched versions cause subtle native-image failures.
CDI producers in ServerConfiguration are native-image safe because Quarkus processes them at build time. Follow these patterns:
// SAFE — delegates from HensuEnvironment
@Produces @Singleton
public WorkflowExecutor workflowExecutor(HensuEnvironment env) {
return env.getWorkflowExecutor();
}
// SAFE — delegates repository created by HensuFactory
@Produces @Singleton
public WorkflowRepository workflowRepository(HensuEnvironment env) {
return env.getWorkflowRepository();
}
// UNSAFE — dynamic class loading in a producer
@Produces @Singleton
public Object dynamicBean() {
return Class.forName(config.getClassName()).newInstance(); // fails in native
}hensu-core domain classes are deliberately free of Quarkus annotations. When Jackson needs to access them reflectively at runtime (private constructors, builder setters, build() methods), GraalVM static analysis cannot trace the call sites. The fix lives entirely in hensu-server.
Registrations are split across five dedicated classes to keep each concern isolated:
| Class | Covers |
|---|---|
CoreModelNativeConfig |
Hensu domain model — Jackson mixin/builder targets, treeToValue types, embedded records |
LangChain4jNativeConfig |
Shared JDK HTTP transport — ServiceLoader-instantiated client factory/builder/client |
LangChain4jAnthropicNativeConfig |
Anthropic API request/response DTOs and enums |
LangChain4jGeminiNativeConfig |
Google AI Gemini API request/response DTOs and enums |
ExecutionEventNativeConfig |
SSE event DTOs – ExecutionEvent sealed subtypes for native serialization |
Five patterns require registration here (matching CoreModelNativeConfig javadoc §1–§5):
-
@JsonPOJOBuildermixin targets — Jackson instantiates the builder via its private no-arg constructor, calls each setter, then callsbuild(). GraalVM cannot trace these calls through the generic mixin machinery. -
treeToValuedelegation — When a custom deserializer callsmapper.treeToValue(node, SomeClass.class), Jackson uses POJO reflection forSomeClass. Simple records (primitives, strings, enums only) should be fixed by switching to manualJsonNodeextraction instead. Register only types where manual extraction is impractical (e.g., nestedDurationfields). -
Manual deser, default Jackson ser — Types deserialized manually in
NodeDeserializer(directJsonNodeextraction) but serialized via Jackson's defaultBeanSerializerinWorkflowSerializer.toJson(). The deserialization path needs no reflection, but the serialization path reads record component accessors reflectively. Current types:Branch,ConsensusConfig,ConsensusStrategy,ConsensusResult,ConsensusResult.Vote,ConsensusResult.VoteType,ScoreCondition,DoubleRange. -
Simple immutable types with custom deser but default ser —
WorkflowStateSchemaandStateVariableDeclarationuse a custom deserializer (WorkflowStateSchemaDeserializer) that extracts fields manually. However, Jackson's default serializer readsgetVariables()reflectively, so both classes must be registered. -
Record types embedded in builder classes — When a
recordis a field inside a mixin-registered builder type, Jackson reaches it via its canonical constructor and component accessors. GraalVM cannot trace those calls statically. Register the record and every nested record transitively. No mixin or custom deserializer is needed — registration alone is sufficient. Current types:ReviewConfig,HensuSnapshot,PlanSnapshothierarchy.
When to add vs. fix: if the class is a simple record with no Duration/nested-complex fields, fix the deserializer. If it contains Duration or deeply nested types, add it here. For records embedded in builder types, always register them. For types in pattern 3, registration is needed only because the serialization path uses default Jackson. See hensu-serialization Developer Guide for the full rule.
LangChain4jProvider creates all ChatModel instances programmatically via builders, outside CDI. This bypasses the Quarkiverse build-time scan that would normally wire reflection metadata. Two additional patterns arise:
-
ServiceLoaderHTTP transport (LangChain4jNativeConfig) — The JDK HTTP client is resolved at runtime viaServiceLoader. GraalVM cannot traceServiceLoaderstatically. Three classes must be registered (factory, builder, client), and the service file must also be bundled viaquarkus.native.resources.includesinapplication.properties. -
Static
ObjectMapperin provider SDKs (LangChain4jAnthropicNativeConfig,LangChain4jGeminiNativeConfig) — Each provider SDK owns a staticObjectMapperinstance independent of the Quarkus-managed one. Quarkus's build-time Jackson configuration never reaches it. All request-path DTOs (Java → JSON) and response-path DTOs (JSON → Java) must be registered explicitly, including enums. Builder inner classes are omitted — these SDKs use direct field access, not the builder pattern, for Jackson binding.
GraalVM's static analysis only sees resource paths known at build time. Any class that loads resources via a dynamically constructed path — e.g., getResourceAsStream("/stubs/" + scenario + "/" + key + ".txt") — will silently receive null at runtime in the native image, because the files were never embedded.
Fix: declare the affected path patterns in application.properties:
# Bundle all stub response files into the native image.
# StubResponseRegistry builds paths like /stubs/{scenario}/{key}.txt at runtime;
# GraalVM cannot trace these dynamically — explicit inclusion is required.
quarkus.native.resources.includes=stubs/**Rule: any directory whose contents are loaded via a runtime-computed path needs a corresponding quarkus.native.resources.includes entry. Add it next to the relevant property comment, not at the bottom of the file.
Mutiny reactive types are safe. Uni, Multi, BroadcastProcessor all work in native image — Quarkus handles their registration.
MCP JSON-RPC uses explicit Jackson. The JsonRpc class uses ObjectMapper directly with readTree/writeValueAsString — no reflection-based deserialization. This is intentionally safe for native image.
# Full native build (slow — run before releases, not on every change)
./gradlew hensu-server:build -Dquarkus.native.enabled=true -Dquarkus.package.type=native
# Quick JVM-mode test (catches most issues except native-specific ones)
./gradlew hensu-server:test
# Native integration tests
./gradlew hensu-server:test -Dquarkus.test.native-image-profile=true| Pattern | Safe | Notes |
|---|---|---|
@Inject / @Produces |
Yes | Quarkus ArC — build-time CDI |
@ConfigProperty |
Yes | Build-time processed |
| Quarkus extensions | Yes | Provide native metadata |
| Raw third-party libs | Maybe | Need reflect-config.json if reflective |
ObjectMapper.readTree() |
Yes | No reflection — tree-model parsing |
new ObjectMapper().readValue(json, MyClass.class) |
Maybe | Needs entry in CoreModelNativeConfig unless Quarkus-managed |
mapper.treeToValue(node, SimpleRecord.class) |
No | Fix the deserializer — extract fields manually from JsonNode |
getResourceAsStream("/path/" + dynamic + ".txt") |
No | Add pattern to quarkus.native.resources.includes |
@RegisterForReflection on hensu-core classes |
No | Keep Quarkus annotations out of core — register in CoreModelNativeConfig |
quarkus-jackson for mixin-pattern types |
No | Extension only scans direct annotations; mixins are runtime events |
LangChain4j ChatModel created via builder |
No | Programmatic creation bypasses CDI scan — register DTOs in LangChain4j*NativeConfig |
Mutiny Uni/Multi |
Yes | Quarkus-managed |
# HTTP Server
quarkus.http.port=8080
quarkus.http.host=0.0.0.0
# MCP Configuration
hensu.mcp.connection-timeout=30s
hensu.mcp.read-timeout=60s
hensu.mcp.pool-size=10
# Planning Configuration
hensu.planning.default-max-steps=10
hensu.planning.default-max-replans=3
hensu.planning.default-timeout=5m
# PostgreSQL (Dev Services auto-starts a container in dev/test mode)
quarkus.datasource.db-kind=postgresql
%prod.quarkus.datasource.username=hensu
%prod.quarkus.datasource.password=hensu
%prod.quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/hensu
# Flyway schema migrations
quarkus.flyway.migrate-at-start=true
quarkus.flyway.schemas=runtime
# In-memory profile (no PostgreSQL)
%inmem.quarkus.datasource.active=false
%inmem.quarkus.datasource.devservices.enabled=false
%inmem.quarkus.flyway.migrate-at-start=false
# Distributed recovery leasing
hensu.node.id=
hensu.lease.heartbeat-interval=30s
hensu.lease.recovery-interval=60s
hensu.lease.stale-threshold=90s
%inmem.quarkus.scheduler.enabled=false
# Credentials — loaded by HensuEnvironmentProducer from hensu.credentials.* prefix
# hensu.credentials.ANTHROPIC_API_KEY=sk-ant-...
# hensu.credentials.GOOGLE_API_KEY=AIza...
# Default tenant (used when JWT tenant_id claim is absent, e.g. dev mode)
# hensu.tenant.default=default
# Verbose execution logging (enables LoggingExecutionListener)
hensu.verbose.enabled=false
# Logging
quarkus.log.category."io.hensu".level=DEBUG@ApplicationScoped
public class MyComponent {
@ConfigProperty(name = "hensu.mcp.connection-timeout", defaultValue = "30s")
Duration connectionTimeout;
@ConfigProperty(name = "hensu.planning.default-max-steps", defaultValue = "10")
int maxSteps;
}- Inject
RequestTenantResolverfor tenant identity (resolved from JWTtenant_idclaim) - Use
HensuFactory.builder()for core infrastructure (never construct directly) - Keep workflow definition management separate from execution operations
- Use service layer for business logic, resources for HTTP concerns
- Prefer constructor injection over field injection
- Use sealed interfaces for event types (exhaustive pattern matching)
- Clean up SSE subscriptions on disconnect
- Log at appropriate levels (INFO for requests, DEBUG for details)
- Don't bypass
HensuFactoryby constructing core components directly - Don't create
StubAgentmanually (use built-in stub mode) - Don't create repository instances directly in server producers — delegate from
HensuEnvironment - Don't support local command execution in server mode
- Don't put business logic in REST resources
- Don't expose internal exceptions to clients
- Don't block on I/O in reactive streams (use virtual threads or async)
- Don't mix definition management with execution in same REST resource
- README.md - Module overview and quick start
- Unified Architecture - Architecture decisions and vision
- hensu-core Developer Guide - Core engine documentation
- hensu-serialization Developer Guide - Jackson patterns,
treeToValuerule, native image implications - DSL Reference - Workflow DSL syntax