A Gatling extension for load testing Kafka applications, with a focus on Request-Reply (RPC) patterns, Quality of Service (QoS) measurement, and resilience testing.
This project is organized as a focused Open Source repository for Kafka load testing:
```
gatling-kafka-oss/
├── pom.xml                          # Project configuration
├── README.md                        # This file
└── src/
    ├── main/java/pl/perfluencer/
    │   ├── cache/                   # RequestStore implementations (InMemory default)
    │   ├── common/                  # Shared utilities (Check API)
    │   └── kafka/                   # Core Kafka logic
    └── test/                        # Integration tests and example simulations
```
```shell
# Build the project
mvn clean package -DskipTests

# Run all tests
mvn clean test
```

This framework is designed not just to generate load, but to act as a precise instrument for measuring the end-to-end quality of service of your event-driven architecture.
Unlike simple producer benchmarks, this extension measures the full round-trip time of a business transaction:
- Send: Gatling produces a request message.
- Process: Your system consumes, processes, and produces a response.
- Receive: Gatling consumes the response and matches it to the request.
- Measure: The reported latency is the true time the user waits, capturing producer delays, broker latency, consumer lag, and application processing time.
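The distinction above can be sketched in a few lines of plain Java. This is illustrative only (the class and method names are not part of the extension): it contrasts the full round-trip latency the extension reports with the send-to-ack latency a naive producer benchmark would report.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch, not the extension's API: end-to-end latency runs
// from the moment the request is produced to the moment the correlated
// response is consumed, not merely until the broker acknowledges the send.
public class RoundTripLatency {

    /** Time the user actually waits: send -> process -> receive. */
    static Duration endToEnd(Instant sendTime, Instant responseReceivedTime) {
        return Duration.between(sendTime, responseReceivedTime);
    }

    /** What a producer-only benchmark reports: send -> broker ack. */
    static Duration producerOnly(Instant sendTime, Instant brokerAckTime) {
        return Duration.between(sendTime, brokerAckTime);
    }

    public static void main(String[] args) {
        Instant send = Instant.parse("2024-01-01T00:00:00.000Z");
        Instant ack = send.plusMillis(5);        // broker ack after 5 ms
        Instant response = send.plusMillis(180); // correlated reply after 180 ms

        System.out.println("producer-only = " + producerOnly(send, ack).toMillis() + " ms");
        System.out.println("end-to-end    = " + endToEnd(send, response).toMillis() + " ms");
    }
}
```

The gap between the two numbers is exactly the consumer lag and application processing time that producer benchmarks miss.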
The extension protects against and measures various failure modes:
- Race Conditions: By using a distributed `RequestStore` (Redis/Postgres), it handles scenarios where a response arrives before the request is fully acknowledged or when multiple Gatling nodes are active.
- Assurance of Delivery:
  - At-Least-Once: Configurable producer `acks=all` ensures requests are durably persisted before being tracked.
  - Idempotency: Supports `enable.idempotence=true` to prevent duplicate requests from skewing test results.
- Data Integrity: The `MessageCheck` API verifies that the response payload strictly matches expectations, catching subtle data corruption or logic errors under load.
A critical design goal is to measure the impact of system failures:
- Broker Outages: When a broker fails, the extension tracks the exact impact on latency and throughput.
- Application Restarts: If your application crashes and restarts, the extension's `RequestStore` ensures that pending requests are not lost. When the application recovers and sends delayed responses, Gatling correctly matches them and reports the high latency, providing a true picture of the outage's impact on the user experience.
- Chaos Engineering: Perfect for validating system behavior during network partitions or component failures.
Here is a complete example using the fluent DSL:

```java
package pl.perfluencer.kafka.simulations;

import io.gatling.javaapi.core.ScenarioBuilder;
import io.gatling.javaapi.core.Simulation;
import pl.perfluencer.kafka.javaapi.KafkaDsl;
import pl.perfluencer.kafka.javaapi.KafkaProtocolBuilder;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.gatling.javaapi.core.CoreDsl.*;
import static pl.perfluencer.kafka.javaapi.Kafka.kafka;
import static pl.perfluencer.kafka.MessageCheck.*;

public class KafkaExampleSimulation extends Simulation {
    {
        // 1. Configure the Protocol
        KafkaProtocolBuilder protocol = KafkaDsl.kafka()
            .bootstrapServers("localhost:9092")
            .numProducers(4)
            .numConsumers(4);

        // 2. Build the Scenario with Fluent DSL
        ScenarioBuilder scn = scenario("Kafka Request-Reply Demo")
            .exec(
                kafka("Order Request")
                    .requestReply()
                    .requestTopic("orders-request")
                    .responseTopic("orders-response")
                    .key(session -> "order-" + session.userId())
                    .value("{\"orderId\":\"ORD-123\",\"action\":\"CREATE\"}")
                    .check(echoCheck())                      // Verify echo
                    .check(jsonPathEquals("$.status", "OK")) // JSON validation
                    .check(responseContains("SUCCESS"))      // Contains check
                    .timeout(10, TimeUnit.SECONDS)
            );

        // 3. Set up the Load Injection
        setUp(
            scn.injectOpen(constantUsersPerSec(10).during(Duration.ofSeconds(30)))
        ).protocols(protocol);
    }
}
```

- Request-Reply Support: Handle asynchronous request-reply flows where Gatling sends a request and waits for a correlated response on a different topic.
- Robust State Management: Use external stores (PostgreSQL, Redis) to track in-flight requests, enabling tests to survive Gatling node restarts or long-running async processes.
- Flexible Serialization: Support for String, ByteArray, Protobuf, Avro, and custom Object payloads.
- Message Validation: Powerful `MessageCheck` API to validate response payloads against the original request.
- Fire-and-Forget: High-performance mode for sending messages without waiting for broker acknowledgement.
- Multi-Protocol Execution: Support for registering multiple concurrent Kafka protocols inside a single simulation for multi-topic, multi-serde environments.
The Gatling Kafka extension supports defining multiple discrete KafkaProtocol instances within a single simulation. This is especially useful when your application interacts with different message formats (e.g., JSON and Avro) simultaneously.
To maximize performance and latency granularity at the transport layer, each protocol instance handles a single, discrete topic configuration.
If you need to load test multiple isolated topics (with independent connection pools, serializers, or Schema Registry settings), you must define separate Gatling protocols and explicitly map them to their corresponding scenarios using .protocols().
```java
// Protocol 1: JSON on Topic A
KafkaProtocolBuilder jsonProtocol = KafkaDsl.kafka()
    .producerProperties(Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));

// Protocol 2: Avro on Topic B
KafkaProtocolBuilder avroProtocol = KafkaDsl.kafka()
    .producerProperties(Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()));

// Scenario 1 binds to Protocol 1
ScenarioBuilder jsonScenario = scenario("JSON Flow")
    .exec(KafkaDsl.kafka("Send JSON").send().topic("topic-a").value("..."));

// Scenario 2 binds to Protocol 2
ScenarioBuilder avroScenario = scenario("Avro Flow")
    .exec(KafkaDsl.kafka("Send Avro").send().topic("topic-b").asAvro().value(...));

setUp(
    jsonScenario.injectOpen(atOnceUsers(10)).protocols(jsonProtocol),
    avroScenario.injectOpen(atOnceUsers(10)).protocols(avroProtocol)
);
```

- `KafkaProducerActor`: Handles sending messages to Kafka. It uses the standard Kafka Producer API and supports generic `Object` payloads.
- `KafkaConsumerThread`: Consumes messages from the response topic. It preserves metadata (headers, timestamp) and passes records to the processor.
- `RequestStore`: Persists in-flight request data (correlation ID, payload, timestamp). Implementations include `InMemoryRequestStore` (default), `RedisRequestStore` [ENTERPRISE ONLY], and `PostgresRequestStore` [ENTERPRISE ONLY].
- `MessageCheck`: Validates the received response against the stored request. Use fluent shortcuts like `MessageCheck.echoCheck()` or `MessageCheck.jsonPathEquals()`.
The `kafka` action allows you to send messages to a topic.
By default, the action waits for the Kafka broker to acknowledge the message (acks=all recommended).
```java
import static pl.perfluencer.kafka.javaapi.KafkaDsl.*;

// ... inside your scenario
.exec(
    kafka("Simple Send")
        .send()
        .topic("request_topic")
        .key("my-key")
        .value("my-value")
)
```

For maximum throughput where you don't need delivery guarantees or response tracking, use "Fire and Forget".
Note: In this mode, requests are NOT persisted to the Request Store (Redis/Postgres). This means you cannot track response times or verify replies for these messages.
```java
.exec(
    kafka("Fire Event")
        .send()
        .topic("events_topic")
        .key("key")
        .value("value")
)
```

Sometimes you need to consume messages from a topic without sending a request first (e.g., verifying a side-effect of another action).
You can use the consume action to read a message from a topic and save it to the session for validation.
```java
.exec(
    KafkaDsl.consume("Consume Event", "my_topic")
        .saveAs("myMessage") // Save the message value to the session
)
.exec(session -> {
    String message = session.getString("myMessage");
    if (!"expected_value".equals(message)) {
        throw new RuntimeException("Unexpected message: " + message);
    }
    return session;
})
```

You can send custom headers with your Kafka messages. This is useful for passing metadata, tracing information, or routing details.
```java
.exec(
    kafka("Send with Headers")
        .send()
        .topic("my_topic")
        .key("my_key")
        .value("my_value")
        .headers(Map.of(
            "X-Correlation-ID", session -> UUID.randomUUID().toString(),
            "X-Source", session -> "Gatling-Load-Test",
            "X-Timestamp", session -> String.valueOf(System.currentTimeMillis())
        ))
)
```

Gatling feeders allow you to drive your Kafka tests with external data sources (CSV files, databases, or programmatically generated data). This is essential for realistic load testing with varied, production-like payloads.
Create a CSV file at src/test/resources/transaction_data.csv:
```csv
accountId,amount,currency,customerName
ACC-001,1500.00,USD,John Smith
ACC-002,2500.50,EUR,Jane Doe
ACC-003,750.25,GBP,Bob Wilson
```

Use it in your simulation:
```java
import static io.gatling.javaapi.core.CoreDsl.*;

public class TransactionSimulation extends Simulation {
    {
        // Load CSV feeder - circular() wraps around when exhausted
        FeederBuilder<String> csvFeeder = csv("transaction_data.csv").circular();

        ScenarioBuilder scn = scenario("Transaction Processing")
            .feed(csvFeeder) // Inject data into each user's session
            .exec(
                KafkaDsl.kafka("Transaction Request")
                    .requestReply()
                    .requestTopic("transaction-requests")
                    .responseTopic("transaction-responses")
                    .key(session -> session.getString("accountId")) // Key from CSV
                    .value(session -> String.format(
                        "{\"accountId\":\"%s\",\"amount\":%s,\"currency\":\"%s\"}",
                        session.getString("accountId"),
                        session.getString("amount"),
                        session.getString("currency")
                    ))
                    .serializationType(String.class, String.class, SerializationType.STRING)
                    .checks(transactionChecks)
                    .timeout(10, TimeUnit.SECONDS)
            );

        setUp(scn.injectOpen(constantUsersPerSec(10).during(60)))
            .protocols(kafkaProtocol);
    }
}
```

Generate dynamic test data on-the-fly:
```java
FeederBuilder<Object> randomFeeder = Stream.of(
    Map.of(
        "randomAmount", () -> String.format("%.2f", Math.random() * 10000),
        "randomAccountId", () -> "ACC-" + UUID.randomUUID().toString().substring(0, 8),
        "timestamp", () -> String.valueOf(System.currentTimeMillis())
    )
).toFeeder();

ScenarioBuilder scn = scenario("Dynamic Transactions")
    .feed(randomFeeder)
    .exec(
        KafkaDsl.kafka("Payment Request")
            .requestReply()
            .requestTopic("payment-requests")
            .responseTopic("payment-responses")
            .key(session -> session.getString("randomAccountId"))
            .value(session -> String.format(
                "{\"accountId\":\"%s\",\"amount\":%s,\"ts\":%s}",
                session.getString("randomAccountId"),
                session.getString("randomAmount"),
                session.getString("timestamp")
            ))
            .serializationType(String.class, String.class, SerializationType.STRING)
            .checks(checks)
            .timeout(10, TimeUnit.SECONDS)
    );
```

| Strategy | Usage | Best For |
|---|---|---|
| `.queue()` | Each record used once | Unique transactions |
| `.circular()` | Wraps to start | Continuous load tests |
| `.random()` | Random selection | Varied data patterns |
| `.shuffle()` | Random order, once each | Unique + randomized |
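The strategies in the table reduce to different iteration orders over the same records. As a minimal sketch (the class below is illustrative, not part of Gatling), `.circular()` behaves like an iterator that wraps back to the first record when the data is exhausted:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the "circular" strategy: the feeder never runs
// out of data because it wraps to the first record after the last one.
public class CircularFeeder implements Iterator<Map<String, Object>> {
    private final List<Map<String, Object>> records;
    private int index = 0;

    public CircularFeeder(List<Map<String, Object>> records) {
        this.records = records;
    }

    @Override
    public boolean hasNext() {
        return !records.isEmpty(); // circular feeders never exhaust
    }

    @Override
    public Map<String, Object> next() {
        Map<String, Object> record = records.get(index);
        index = (index + 1) % records.size(); // wrap around
        return record;
    }
}
```

A `.queue()` strategy, by contrast, would simply stop at the end of the list, failing the test if more users arrive than records exist.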
This is the core feature of the extension. It allows you to test systems that consume a message, process it, and send a reply to another topic.
Flow:
- Gatling generates a unique `correlationId`.
- Gatling sends the request message (with `correlationId` in headers or payload) to the `request_topic`.
- Gatling stores the request details (key, payload, timestamp) in a Request Store.
- Your Application consumes the request, processes it, and sends a response to the `response_topic`.
- Crucial: The response MUST contain the same `correlationId` (usually in the header, configurable via `correlationHeaderName`).
- The Gatling Kafka Consumer reads the response.
- It looks up the original request in the Request Store using the `correlationId`.
- If found, it runs your Message Checks to validate the response against the request and records the transaction time in the Gatling report.
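The bookkeeping behind these steps can be sketched in a few lines of plain Java. This is a hypothetical illustration (the class and record names are not the extension's actual types): a request is stored under its `correlationId`, and a matching response removes the entry and yields the elapsed time.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of request-reply correlation, not the extension's API.
public class CorrelationFlow {
    record PendingRequest(String payload, long startNanos) {}

    private final Map<String, PendingRequest> store = new ConcurrentHashMap<>();

    /** Steps 1-3: generate an ID, record the request, return the ID to send. */
    String sendRequest(String payload) {
        String correlationId = UUID.randomUUID().toString();
        store.put(correlationId, new PendingRequest(payload, System.nanoTime()));
        return correlationId; // carried in headers or payload
    }

    /** Steps 6-8: look up the original request and report elapsed nanos. */
    Optional<Long> onResponse(String correlationId) {
        PendingRequest original = store.remove(correlationId);
        if (original == null) {
            return Optional.empty(); // unknown or already-expired request
        }
        return Optional.of(System.nanoTime() - original.startNanos);
    }
}
```

In the real extension the map is replaced by a pluggable `RequestStore`, which is what allows the lookup to survive Gatling node restarts.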
The extension supports generic Object payloads. You can pass any object as the message value, provided you configure the appropriate serializer in the KafkaProtocol.
Example with Custom Object:
```java
// In Protocol Configuration
.producerProperties(Map.of(
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyCustomSerializer.class.getName()
))

// In Scenario
.exec(
    KafkaDsl.kafka("Custom Payload")
        .requestReply()
        .requestTopic("req_topic")
        .responseTopic("res_topic")
        .key(session -> "key")
        .value(session -> new MyCustomObject("data")) // Pass Object directly
        .serializationType(byte[].class, byte[].class, SerializationType.BYTE_ARRAY) // Or custom enum
        .checks(checks)
        .timeout(10, TimeUnit.SECONDS)
)
```

You can fine-tune the internal behavior of the Request Stores (timeouts, batch sizes) using the `storeConfig` DSL. This applies to all store types (InMemory, Redis, Postgres).
```java
import java.time.Duration;

KafkaProtocolBuilder protocol = kafka()
    // ... other config ...
    .storeConfig(config -> config
        // Timeout check interval for ALL stores (default: 10s)
        .timeoutCheckInterval(Duration.ofSeconds(5))
        // Batch size for processing timeouts (Redis only for now)
        .timeoutBatchSize(2000)
    );
```

You must configure a backing store for in-flight requests.
Requires a `requests` table. Recommended for high reliability and audit logs.

```sql
CREATE TABLE requests (
    correlation_id UUID PRIMARY KEY,
    request_key TEXT,
    request_value_bytes BYTEA,
    serialization_type VARCHAR(50),
    transaction_name VARCHAR(255),
    scenario_name VARCHAR(255),
    start_time TIMESTAMP,
    timeout_time TIMESTAMP,
    expired BOOLEAN DEFAULT FALSE
);

-- Recommended indexes for performance
CREATE INDEX idx_requests_timeout ON requests(timeout_time) WHERE NOT expired;
CREATE INDEX idx_requests_scenario ON requests(scenario_name);
```

HikariCP Connection Pool Configuration:
```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/gatling");
config.setUsername("gatling");
config.setPassword("password");

// Connection pool sizing
config.setMaximumPoolSize(30);            // See sizing guide below
config.setMinimumIdle(5);                 // Keep warm connections
config.setConnectionTimeout(10000);       // 10s max wait
config.setIdleTimeout(600000);            // 10min idle timeout
config.setMaxLifetime(1800000);           // 30min max lifetime
config.setLeakDetectionThreshold(60000);  // Warn if connection held > 60s

// Performance optimizations
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");

HikariDataSource dataSource = new HikariDataSource(config);
```

PostgreSQL Connection Pool Sizing:
| Test Scale | Concurrent Writes | Recommended Pool Size | Postgres max_connections |
|---|---|---|---|
| Small | < 50/sec | 10-20 | 100 |
| Medium | 50-200/sec | 20-40 | 200 |
| Large | 200-500/sec | 40-80 | 300 |
| Very Large | 500+/sec | Consider sharding | 500+ |
Sizing Formula:
```
Pool Size = (Concurrent Requests) × (Query Duration in sec) + 5 (overhead)

Example: 100 concurrent × 0.05s query time + 5 = 10 connections
```
Important: PostgreSQL `max_connections` should be at least 2× the total pool size across all Gatling instances.
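The sizing formula above translates directly into code. A minimal sketch (helper names are illustrative, not part of the extension):

```java
// Illustrative sketch of the PostgreSQL pool sizing formula:
// poolSize = concurrentRequests * queryDurationSec + overhead
public class PoolSizing {
    static final int OVERHEAD = 5;

    static int poolSize(int concurrentRequests, double queryDurationSec) {
        // Round to the nearest whole connection before adding overhead
        return (int) Math.round(concurrentRequests * queryDurationSec) + OVERHEAD;
    }

    public static void main(String[] args) {
        // The example above: 100 concurrent * 0.05s query time + 5 = 10
        System.out.println(poolSize(100, 0.05));
    }
}
```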
High-performance distributed tracking, ideal for horizontally scaled Gatling clusters.
Connection Pool Configuration:
```java
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(50);            // Max connections (see sizing guide below)
poolConfig.setMaxIdle(20);             // Max idle connections
poolConfig.setMinIdle(5);              // Min idle connections for fast startup
poolConfig.setTestOnBorrow(true);      // Validate connections
poolConfig.setBlockWhenExhausted(true);
poolConfig.setMaxWaitMillis(3000);     // Wait max 3s for connection

JedisPool pool = new JedisPool(poolConfig, "localhost", 6379);
```

Redis Connection Pool Sizing Recommendations:
| Test Scale | Users/sec | Recommended `maxTotal` | Notes |
|---|---|---|---|
| Small (Dev) | < 10 | 10-20 | Default settings OK |
| Medium | 10-100 | 30-50 | Monitor connection waits |
| Large | 100-500 | 50-100 | Tune minIdle to 20-30 |
| Very Large | 500+ | 100-200 | Consider Redis clustering |
Sizing Formula:
```
maxTotal = (Peak Users/sec) × (Avg Request Duration in sec) × 1.5 (safety margin)

Example: 100 users/sec × 0.2s duration × 1.5 = 30 connections
```
Performance Tips:
- Monitor: Track "pool exhausted" errors in logs
- Redis Memory: Set `maxmemory` and use the `allkeys-lru` eviction policy
- Network: Ensure low latency to Redis (< 1ms ideal)
- Persistence: Disable RDB/AOF for pure cache use cases
For extremely high-throughput scenarios where the latency of synchronous DB/Redis writes becomes a bottleneck for the producer, the Enterprise edition provides a BufferedRequestStore with parallel background workers.
How it works:
- Write Buffer: Requests are instantly queued in memory (non-blocking).
- Parallel Flush: A pool of background workers flushes requests to the persistent layer (L2) in optimized batches.
- Retry Logic: The standard distributed retry mechanism handles the race condition where a fast response arrives before the request is flushed.
```java
import pl.perfluencer.cache.BufferedRequestStore;

// 1. Create the persistent L2 store (Postgres or Redis)
RequestStore l2Store = new PostgresRequestStore(dataSource);

// 2. Wrap it with the Buffered Store
//    parallelWorkers = 4 (default)
//    batchSize = 500 records
//    batchLatency = 50ms (max wait before flush)
RequestStore store = new BufferedRequestStore(l2Store, 500, 50);
```

Configuration & Tuning:
- `batchSize` (e.g., 500): Larger batches improve database throughput but increase memory usage.
- `batchTimeMillis` (e.g., 50ms): The "linger" time.
  - Relationship to Minimum Service Time: Ideally, set this lower than your application's fastest expected response time.
  - Valid responses arriving faster than this window will rely on the Retry Loop (wait and retry).
- `parallelism`: Number of concurrent writer threads. Set this based on your database connection pool size. `new BufferedRequestStore(l2Store, 500, 50, 8, 200)` -> 8 parallel workers.
Critical Safety features:
- Flush Warning: If a batch flush takes longer than the safe retry window (default 200ms), a `SLOW FLUSH DETECTED` warning is logged.
- Strict Error Handling: If the persistence layer fails (e.g., DB down), the store defaults to a Strict Stop policy, executing `System.exit(1)` to prevent invalid test results.
Stores requests in a local `ConcurrentHashMap`. Warning: Data is lost on restart. Perfect for development and debugging.
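The shape of such a store is easy to picture. The sketch below is illustrative only (field and method names are not the real `InMemoryRequestStore` implementation): entries live in a `ConcurrentHashMap`, and a periodic sweep drops entries whose timeout has passed, mirroring the `timeoutCheckInterval` setting described earlier.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch in the spirit of InMemoryRequestStore: state is
// held in process memory, so it is lost whenever the JVM restarts.
public class SimpleInMemoryStore {
    record Entry(String payload, long timeoutAtMillis) {}

    private final Map<String, Entry> inFlight = new ConcurrentHashMap<>();

    void put(String correlationId, String payload, long timeoutAtMillis) {
        inFlight.put(correlationId, new Entry(payload, timeoutAtMillis));
    }

    /** Consuming a response removes the matching entry, or null if absent. */
    Entry remove(String correlationId) {
        return inFlight.remove(correlationId);
    }

    /** Periodic sweep: drop entries whose timeout has passed, return count. */
    int expire(long nowMillis) {
        int before = inFlight.size();
        inFlight.entrySet().removeIf(e -> e.getValue().timeoutAtMillis <= nowMillis);
        return before - inFlight.size();
    }
}
```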
When using asynchronous storage (like the Buffered Request Store) or dealing with distributed system latency, a response might arrive before the request is fully persisted and visible to the consumer. To handle this, the framework implements an efficient retry mechanism.
You can configure the retry behavior in the protocol:
```java
KafkaProtocolBuilder protocol = kafka()
    // ... other config ...
    .retryBackoff(Duration.ofMillis(50)) // Wait 50ms before retrying a check
    .maxRetries(3);                      // Retry 3 times before failing
```

Default values:
- `retryBackoff`: 50ms
- `maxRetries`: 3
When to tune:
- Increase `retryBackoff` if you are seeing "Request not found" errors but the database is healthy (persistence lag > 50ms).
- Increase `maxRetries` if you have very high jitter in your L2 store write times.
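The retry loop itself is conceptually simple. A sketch, assuming a fixed backoff between attempts (the class and method names are illustrative, not the extension's internals):

```java
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative sketch of the retry mechanism: when a response arrives
// before its request is visible in the store, re-check a few times with
// a fixed backoff before reporting "Request not found".
public class LookupWithRetry {
    static <T> Optional<T> lookup(Supplier<Optional<T>> storeLookup,
                                  int maxRetries, long backoffMillis) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            Optional<T> found = storeLookup.get();
            if (found.isPresent()) {
                return found; // the request is now visible in the store
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(backoffMillis); // wait before retrying
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty(); // give up: report "Request not found"
    }
}
```

The total worst-case wait is `maxRetries × retryBackoff`, which is why the buffered store's flush latency must stay inside that window.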
Proper configuration is critical for high-performance Kafka load testing.
- `numProducers` (int):
  - Default: 1
  - Impact: Kafka producers are thread-safe and high-throughput. A single producer can often saturate a network link.
  - Best Practice: Start with 1. Increasing this splits the load into smaller batches across multiple producers, which can reduce throughput and compression ratio. Only increase if you observe thread contention on the producer.
- `linger.ms`:
  - Description: Time to wait for more records to arrive before sending a batch.
  - Impact: Higher values (e.g., 5-10ms) improve batching and throughput at the cost of slight latency.
  - Best Practice: Set to `5` or `10` for high throughput. Set to `0` for lowest latency.
- `batch.size`:
  - Description: Maximum size of a batch in bytes.
  - Best Practice: Increase to `65536` (64KB) or `131072` (128KB) for high throughput.
- `compression.type`:
  - Best Practice: Use `lz4` or `snappy` for a good balance of CPU usage and compression. `zstd` offers better compression but higher CPU cost.
- `acks`:
  - `all`: Highest durability (slowest).
  - `1`: Leader acknowledgement (faster).
  - `0`: Fire and forget (fastest, no guarantees).
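The advice above condenses into two property sets. The helper class below is illustrative (not part of the extension), but the keys are the standard Kafka producer configuration names:

```java
import java.util.Properties;

// Illustrative sketch: the tuning advice above as plain producer properties.
public class ProducerTuning {

    /** Throughput-oriented: batch aggressively, compress, leader-only acks. */
    static Properties highThroughput() {
        Properties props = new Properties();
        props.setProperty("linger.ms", "10");         // wait up to 10 ms to fill batches
        props.setProperty("batch.size", "131072");    // 128 KB batches
        props.setProperty("compression.type", "lz4"); // cheap CPU, good ratio
        props.setProperty("acks", "1");               // leader ack, faster than "all"
        return props;
    }

    /** Latency-oriented: send immediately, no lingering. */
    static Properties lowLatency() {
        Properties props = new Properties();
        props.setProperty("linger.ms", "0"); // send each record as it arrives
        props.setProperty("acks", "1");
        return props;
    }
}
```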
To ensure that messages are delivered exactly once per partition (preventing duplicates due to retries), enable idempotence. This is often a prerequisite for transactional producers but can be used independently.
```java
KafkaProtocolBuilder protocol = kafka()
    .producerProperties(Map.of(
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
        ProducerConfig.ACKS_CONFIG, "all", // Required for idempotence
        ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE)
    ));
```

To enable transactional support (Exactly-Once Semantics), configure a `transactionalId`.
```java
KafkaProtocolBuilder protocol = kafka()
    .bootstrapServers("localhost:9092")
    .transactionalId("my-transactional-id") // Enables transactions
    .numProducers(4); // Each producer gets a unique ID: my-transactional-id-0, my-transactional-id-1, etc.
```

Key Behaviors:
- Atomic Writes: Messages are written to the topic atomically. Consumers with `isolation.level=read_committed` will only see committed messages.
- Blocking Commit: The producer will block to commit the transaction after each message (or batch). This ensures data integrity but may impact throughput.
- Unique IDs: When `numProducers > 1`, the extension automatically appends a suffix (`-0`, `-1`, etc.) to the `transactionalId` to ensure each producer instance has a unique ID, preventing "Fenced Producer" errors.
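The suffixing rule is straightforward; a sketch (the class is illustrative, not the extension's code) of how base ID plus producer index yields the unique IDs mentioned above:

```java
// Illustrative sketch of the transactional-ID suffixing rule: with
// numProducers > 1, each producer instance gets "<base>-<index>" so no
// two instances share an ID (which would trigger "Fenced Producer").
public class TransactionalIds {
    static String[] forProducers(String baseId, int numProducers) {
        String[] ids = new String[numProducers];
        for (int i = 0; i < numProducers; i++) {
            ids[i] = baseId + "-" + i;
        }
        return ids;
    }
}
```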
- `numConsumers` (int):
  - Default: 1
  - Impact: Number of threads polling the response topic.
  - Best Practice: Set this equal to the number of partitions of your response topic. Having more consumers than partitions is wasteful (idle threads).
- `fetch.min.bytes`:
  - Best Practice: Increase (e.g., `1024`) to reduce the number of fetch requests if you can tolerate slight latency.
When consuming from topics that contain transactional messages, you must decide whether to see all messages (including aborted transactions) or only committed ones.
- `read_uncommitted` (default): Consumers see all messages, including those from aborted transactions.
- `read_committed`: Consumers only see messages from committed transactions. This is essential for Exactly-Once Semantics.
```java
KafkaProtocolBuilder protocol = kafka()
    .consumerProperties(Map.of(
        ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
    ));
```

A combined example tuning both producer and consumer:

```java
KafkaProtocolBuilder protocol = kafka()
    .bootstrapServers("localhost:9092")
    .numProducers(1)
    .numConsumers(4) // Matches response topic partitions
    .producerProperties(Map.of(
        ProducerConfig.ACKS_CONFIG, "1",            // Faster than "all"
        ProducerConfig.LINGER_MS_CONFIG, "10",      // Wait for batching
        ProducerConfig.BATCH_SIZE_CONFIG, "131072", // 128KB batches
        ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"
    ))
    .consumerProperties(Map.of(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
    ));
```

Validating that your system returns the correct data is just as important as measuring how fast it is. This section provides examples of how to verify response content.
The recommended approach is to use the Gatling-style fluent check chain: extractor → find strategy → validator. This API mirrors Gatling's HTTP checks, making it intuitive for experienced Gatling users.
```java
import static pl.perfluencer.common.checks.Checks.*;

// ... inside your scenario
.check(
    // JSONPath Extraction
    jsonPath("$.status").find().is("OK"),
    jsonPath("$.amount").ofDouble().gte(100.0),
    jsonPath("$.items[*]").count().is(3),
    jsonPath("$.items[*]").findAll().satisfies(items -> items.contains("Widget")),

    // Regex Extraction
    regex("orderId=([\\w-]+)").find(1).exists(),
    regex("amount=(\\d+)").find(1).transform(Integer::parseInt).gt(100),

    // XPath (XML) Extraction
    xpath("/response/status").find().is("OK"),
    xpath("/response/items/item").count().gt(0),

    // Substring and Body
    substring("SUCCESS").find().exists(),
    bodyString().transform(String::length).gt(10)
)
```

You can easily compare values extracted from the response against values extracted from the original request using the typed `field()` extractor or `MessageCheckBuilder`. This is crucial for verifying that the system processed the correct data.
```java
import static pl.perfluencer.common.checks.Checks.field;

// Verify that the response's 'echoId' matches the request's 'reqId'
.check(
    field(MyResponse.class).assertEquals(MyRequest::getReqId, MyResponse::getEchoId)
)

// Compare specific parsed values using the builder
.check(
    MessageCheckBuilder.strings()
        .named("Cross-Message Amount Check")
        .check((req, res) -> {
            double reqAmount = extractAmount(req);
            double resAmount = extractAmount(res);
            return reqAmount == resAmount
                ? Optional.empty()
                : Optional.of("Amount mismatch: Req=" + reqAmount + ", Res=" + resAmount);
        })
)
```

For absolute simplicity, you can still use the legacy static shortcuts, though the Fluent API is preferred for new tests:
```java
import static pl.perfluencer.common.checks.Checks.*;

// Echo check - verify response equals request exactly
var echoCheck = echoCheck();

// Basic string checks
var containsCheck = responseContains("SUCCESS");
var notEmptyCheck = responseNotEmpty();
```

| Method | Description |
|---|---|
| `echoCheck()` | Verify response equals request |
| `responseContains(text)` | Verify response contains substring |
| `responseEquals(value)` | Verify response equals value |
| `responseMatches(regex)` | Verify response matches pattern |
| `responseNotEmpty()` | Verify response is not null/empty |
For validation logic that goes beyond the shortcuts, use `MessageCheckBuilder`:

```java
import pl.perfluencer.common.checks.MessageCheckBuilder;

// Type-safe field comparison (great for Protobuf/POJOs)
var orderCheck = MessageCheckBuilder.forTypes(OrderRequest.class, OrderResponse.class)
    .named("Order ID Match")
    .verifyFieldEquals(OrderRequest::getOrderId, OrderResponse::getOrderIdEcho);

// Custom validation logic
var businessCheck = MessageCheckBuilder.strings()
    .named("Business Rules")
    .check((req, res) -> {
        if (res.contains("ERROR")) {
            return Optional.of("Response contains error: " + res);
        }
        if (res.length() < req.length()) {
            return Optional.of("Response shorter than request");
        }
        return Optional.empty(); // Success
    });
```

Convert checks to `MessageCheck` for use with Kafka or `MqMessageCheck` for MQ:
```java
// Kafka - static methods on MessageCheck
MessageCheck<String, String> check = MessageCheck.echoCheck();
MessageCheck<String, String> jsonCheck = MessageCheck.jsonPathEquals("$.id", "123");

// MQ - static methods on MqMessageCheck
MqMessageCheck<String, String> mqCheck = MqMessageCheck.responseContains("SUCCESS");

// Or convert from builder result
MessageCheck<String, String> custom = MessageCheck.from(
    MessageCheckBuilder.strings().named("My Check").verify(String::equals)
);
```

Checks defined in a `requestReply` block are strictly scoped to that specific request definition. They are not applied globally to all messages.
When you define multiple request types in the same simulation:
```java
// Request A: Expects "Status: OK"
kafka("CreateUser")
    .requestReply()
    .check(jsonPath("$.status").is("OK"))

// Request B: Expects "Status: CREATED"
kafka("CreateAccount")
    .requestReply()
    .check(jsonPath("$.status").is("CREATED"))
```

- A response correlated to the "CreateUser" request will only execute the checks defined for "CreateUser".
- A response correlated to the "CreateAccount" request will only execute the checks defined for "CreateAccount".
This isolation is handled automatically by the `RequestStore`. When a response arrives, the system looks up the original request by its `correlationId` and retrieves only the checks associated with that specific transaction type.
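A minimal sketch of this scoping rule (the class below is illustrative, not the extension's implementation): checks are keyed by `correlationId`, so a response can only ever run the checks of its own request.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Illustrative sketch of per-request check scoping: each response runs
// only the checks registered under its own correlationId.
public class ScopedChecks {
    private final Map<String, List<Predicate<String>>> checksById = new ConcurrentHashMap<>();

    void register(String correlationId, List<Predicate<String>> checks) {
        checksById.put(correlationId, checks);
    }

    /** Run only the checks registered for this response's correlationId. */
    boolean validate(String correlationId, String response) {
        List<Predicate<String>> checks = checksById.getOrDefault(correlationId, List.of());
        return checks.stream().allMatch(check -> check.test(response));
    }
}
```

Because the lookup key is the `correlationId`, a "CreateAccount" response can never accidentally be judged against the "CreateUser" checks, even when both flows share the same response topic.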
For complex scenarios requiring full control over serialization and validation logic.
When you need explicit control over serialization types (e.g., mixing Protobuf request with JSON response):
```java
List<MessageCheck<?, ?>> checks = List.of(new MessageCheck<>(
    "Content Check",
    String.class, SerializationType.STRING, // Request type
    String.class, SerializationType.STRING, // Response type
    (req, res) -> {
        if (!res.contains("\"status\":\"success\"")) {
            return Optional.of("Response missing success status");
        }
        return Optional.empty();
    }
));
```

For complex JSON structures where JSONPath isn't sufficient:
```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper mapper = new ObjectMapper();

List<MessageCheck<?, ?>> checks = List.of(new MessageCheck<>(
    "JSON Logic Check",
    String.class, SerializationType.STRING,
    String.class, SerializationType.STRING,
    (req, res) -> {
        try {
            JsonNode root = mapper.readTree(res);
            // Complex nested validation
            int userId = root.path("user").path("id").asInt();
            if (userId <= 0) {
                return Optional.of("Invalid User ID: " + userId);
            }
            // Array validation
            if (root.path("items").size() < 1) {
                return Optional.of("Items array is empty");
            }
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of("Failed to parse JSON: " + e.getMessage());
        }
    }
));
```

> **Important**
> Enterprise Feature: Session-Aware Message Checks require the Enterprise edition with external Request Stores (Redis, PostgreSQL).
For scenarios with large payloads where storing the full request is memory-intensive, use SessionAwareMessageCheck to validate responses against session variables instead of the full payload.
Key Benefits:
- Memory Efficient: Store only the fields you need (e.g., `accountId`, `amount`) instead of entire payloads
- Faster Lookups: Smaller session variable maps are quicker to serialize/deserialize
- Flexible Validation: Access any session variable in your check logic
```java
import pl.perfluencer.kafka.SessionAwareMessageCheck;
import pl.perfluencer.common.util.SerializationType;

import java.util.Map;
import java.util.Optional;

// Define a session-aware check
SessionAwareMessageCheck<String> accountCheck = new SessionAwareMessageCheck<>(
    "Account ID Check",
    String.class, SerializationType.STRING,
    (sessionVars, response) -> {
        String expectedAccountId = sessionVars.get("accountId");
        if (response.contains("\"accountId\":\"" + expectedAccountId + "\"")) {
            return Optional.empty(); // Success
        }
        return Optional.of("Account ID mismatch: expected " + expectedAccountId);
    });

// Check multiple session variables
SessionAwareMessageCheck<String> transactionCheck = new SessionAwareMessageCheck<>(
    "Transaction Validation",
    String.class, SerializationType.STRING,
    (sessionVars, response) -> {
        String accountId = sessionVars.get("accountId");
        String amount = sessionVars.get("amount");
        String currency = sessionVars.get("currency");
        if (!response.contains(accountId)) {
            return Optional.of("Account ID not found in response");
        }
        if (!response.contains(amount)) {
            return Optional.of("Amount not found in response");
        }
        if (!response.contains(currency)) {
            return Optional.of("Currency not found in response");
        }
        return Optional.empty();
    });
```

| Scenario | Use `MessageCheck` | Use `SessionAwareMessageCheck` |
|---|---|---|
| Small payloads (<1KB) | ✅ Recommended | Overkill |
| Large payloads (>10KB) | Memory intensive | ✅ Recommended |
| Need full request access | ✅ Required | Only session vars available |
| High-volume tests | May impact store performance | ✅ Optimized |
> **Tip**
> Best Practice: When using session-aware checks, store only the fields needed for validation. Common fields include: `accountId`, `transactionId`, `correlationId`, `amount`, `timestamp`.
When using SessionAwareMessageCheck, you need to ensure the relevant session variables are persisted in the RequestStore alongside the request. By default, no session variables are stored.
You can specify which variables to persist using the .storeSession() method in the DSL:
KafkaDsl.kafka("Request")
.requestReply()
.requestTopic("request_topic")
.responseTopic("response_topic")
// ... basic config ...
.storeSession("accountId", "amount", "transactionId") // Persist these keys
    .checks(...)

These variables will then be available in your `SessionAwareMessageCheck` logic.
For high-throughput tests or large payloads, storing the entire request body in the RequestStore (Redis/Postgres) can be a bottleneck and consume excessive storage.
If you are using Session-Aware Message Checks and do not need the full payload for verification (e.g., you only need the accountId session variable), you can disable payload storage:
KafkaDsl.kafka("Request")
.requestReply()
.requestTopic("request_topic")
.responseTopic("response_topic")
.key("key")
.value("payload")
.storeSession("accountId")
.skipPayloadStorage() // Do NOT store the request body in Redis/DB
    .checks(...)

This significantly reduces I/O and storage requirements while keeping the correlation and session data needed for validation.
Sometimes you need to extract a value from the response to use in a subsequent request (though standard Request-Reply handles the correlation ID automatically).
If you need to extract a value from a JSON response body to correlate (instead of using headers), use the JsonPathExtractor:
// Configure the protocol to look for the correlation ID in the JSON body
KafkaProtocolBuilder protocol = kafka()
.correlationExtractor(
// Extract the value at path $.meta.correlationId
new JsonPathExtractor("$.meta.correlationId")
)
// ... other config
    ;

The framework matches a response to a request using a Correlation ID. You can configure how this ID is extracted from the response message.
By default, the extension expects the Kafka Record Key of the response to match the Correlation ID of the request. This is the most common pattern in Kafka streams.
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
// No extra config needed; Key Matching is the default
    ;

If your application uses a specific header (e.g., `correlationId`, `traceId`) to propagate context, you can configure the extension to look there.
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.correlationHeaderName("correlationId") // Look for ID in this header
    ;

If the Correlation ID is embedded within the message payload (e.g., inside a JSON body), use `correlationExtractor` with a specific implementation such as `JsonPathExtractor`.
KafkaProtocolBuilder protocol = kafka()
.correlationExtractor(
new JsonPathExtractor("$.meta.correlationId") // Extract from JSON path
    );

By default, the extension uses the system clock to measure the end time of a transaction. However, you can configure it to use the timestamp from the Kafka message header instead. This is useful when you want to measure the time until the message was produced to the response topic, rather than when it was consumed by Gatling.
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.useTimestampHeader(true) // Use Kafka message timestamp as end time
    // ... other config
    ;

## Schema Registry Support (Avro & Protobuf)
The extension supports Avro and Protobuf serialization, integrated with Confluent Schema Registry.
### Dependencies
Add the following dependencies to your `pom.xml`:
```xml
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.6.0</version>
</dependency>
```

Ensure you have the Confluent repository configured:

```xml
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
```

KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.producerProperties(Map.of(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName(),
"schema.registry.url", "http://localhost:8081"
))
.consumerProperties(Map.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName(),
"schema.registry.url", "http://localhost:8081",
"specific.avro.reader", "true" // If using specific Avro classes
));
// In your scenario
.exec(
KafkaDsl.kafka("Avro Request")
.requestReply()
.requestTopic("avro-req")
.responseTopic("avro-res")
.key(session -> "key")
.value(session -> new User("Alice", 30)) // Specific Avro Record
.serializationType(String.class, User.class, SerializationType.AVRO) // Or BYTE_ARRAY if serializer handles it
.checks(checks)
.timeout(10, TimeUnit.SECONDS)
    )

For Protobuf:

KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.producerProperties(Map.of(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class.getName(),
"schema.registry.url", "http://localhost:8081"
))
.consumerProperties(Map.of(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class.getName(),
"schema.registry.url", "http://localhost:8081",
"specific.protobuf.value.type", Person.class.getName() // Required for specific type
));
// In your scenario
.exec(
KafkaDsl.kafka("Protobuf Request")
.requestReply()
.requestTopic("proto-req")
.responseTopic("proto-res")
.key(session -> "key")
.value(session -> Person.newBuilder().setName("Bob").build())
.serializationType(String.class, Person.class, SerializationType.PROTOBUF)
.checks(checks)
.timeout(10, TimeUnit.SECONDS)
    )

When your load injection profile finishes, there may still be requests "in-flight" (sent but not yet replied to). If the simulation stops immediately, these requests will be lost or not recorded, potentially skewing your results.
To ensure all requests have a chance to complete (or timeout), you should add a "cooldown" period at the end of your injection profile using nothingFor().
Recommendation: Set the cooldown duration to be at least as long as your request timeout.
setUp(
scn.injectOpen(
rampUsersPerSec(10).to(100).during(60),
constantUsersPerSec(100).during(300),
// Cooldown: Wait for in-flight requests to complete or timeout
nothingFor(Duration.ofSeconds(10))
)
).protocols(kafkaProtocol);

This project includes comprehensive integration tests using Testcontainers.
- Install Testcontainers Desktop (recommended) or configure a remote Docker daemon.
- Enable tests: remove `@Ignore` annotations from test classes in `src/test/java/pl/perfluencer/kafka/integration/`.
- Run: `mvn test`
Troubleshooting: If you encounter `Could not find a valid Docker environment`, ensure your Docker daemon is running and accessible. You may need to set the `DOCKER_HOST` environment variable or configure `src/test/resources/testcontainers.properties`.
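As a sketch, a minimal `testcontainers.properties` pointing at a non-default Docker daemon might look like the following. The host address and the reuse flag are example values for illustration; adjust them to your environment:

```properties
# src/test/resources/testcontainers.properties — example values only
# Point Testcontainers at a remote/non-default Docker daemon:
docker.host=tcp://127.0.0.1:2375
# Optionally reuse containers across runs to speed up local iteration:
testcontainers.reuse.enable=true
```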
- KafkaIntegrationTest: End-to-end producer-consumer flow.
- RedisIntegrationTest: `RedisRequestStore` operations [ENTERPRISE ONLY].
- PostgresIntegrationTest: `PostgresRequestStore` operations [ENTERPRISE ONLY].
- MockKafkaRequestReplyIntegrationTest: Uses `MockProducer` and `MockConsumer` for fast, dependency-free testing of the logic.
The easiest way to get started is using the provided Docker Compose environment, which includes Kafka (with TLS), Redis, PostgreSQL, and Kafka UI.
- Docker Desktop or Docker Engine + Docker Compose
- OpenSSL and Java keytool (for TLS certificates)
# 1. Generate TLS certificates (first time only)
chmod +x scripts/generate-tls-certs.sh
./scripts/generate-tls-certs.sh
# 2. Start all services
docker-compose up -d
# 3. Verify services are healthy
docker-compose ps
# 4. Create test topics
docker exec gatling-kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic request_topic \
--partitions 4 \
--replication-factor 1
docker exec gatling-kafka kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic response_topic \
--partitions 4 \
--replication-factor 1
# 5. Access Kafka UI (optional)
open http://localhost:8080

| Service | Port(s) | Description |
|---|---|---|
| Kafka | 9092 (PLAINTEXT), 9093 (TLS) | Message broker |
| Zookeeper | 2181 | Kafka coordination |
| Redis | 6379 | Request store (high performance) |
| PostgreSQL | 5432 | Request store (durable) |
| Kafka UI | 8080 | Web interface for Kafka |
The Docker Compose setup supports both plaintext (port 9092) and TLS (port 9093) connections.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Map;
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9093") // Use TLS port
.producerProperties(Map.of(
// TLS Configuration
"security.protocol", "SSL",
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "./tls-certs/kafka.client.truststore.jks",
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "kafkatest123",
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "", // Disable for localhost
// Performance settings
ProducerConfig.ACKS_CONFIG, "all",
ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"
))
.consumerProperties(Map.of(
// TLS Configuration
"security.protocol", "SSL",
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "./tls-certs/kafka.client.truststore.jks",
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "kafkatest123",
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""
));

> Security Note: The generated certificates use a test password (`kafkatest123`). For production, use a secrets management system (e.g., HashiCorp Vault, AWS Secrets Manager).
# Stop services
docker-compose down
# Remove all data (Redis cache, Postgres DB)
docker-compose down -v

The Gatling Kafka extension uses standard Apache Kafka clients, which automatically expose internal metrics via JMX (Java Management Extensions). You can monitor these metrics using tools like JConsole, VisualVM, or by attaching a Prometheus JMX Exporter.
Producer Metrics (`kafka.producer:type=producer-metrics,client-id=*`)

- `record-send-rate`: The average number of records sent per second.
- `record-error-rate`: The average number of record sends that resulted in errors per second.
- `compression-rate`: The average compression rate of record batches.
- `buffer-available-bytes`: The total amount of buffer memory that is available (not currently used for buffering records). If this drops to 0, the producer will block.
- `request-latency-avg`: The average request latency in ms.
Consumer Metrics (`kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*`)

- `records-consumed-rate`: The average number of records consumed per second.
- `records-lag-max`: The maximum lag in terms of number of records for any partition in this window. Critical for verifying whether consumers can keep up.
- `fetch-latency-avg`: The average time taken for a fetch request.
Consumer Fetch Metrics (`kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*`)

- `records-lag`: The latest lag of the consumer for a specific partition.
- JConsole / VisualVM: Connect to the Gatling JVM process and navigate to the MBeans tab.
- Prometheus JMX Exporter: Configure the agent to scrape the MBeans listed above and visualize them in Grafana.
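To illustrate what the JMX route looks like programmatically, the sketch below queries the local MBean server for Kafka client metrics from inside the load-generator JVM. The `JmxMetricsDump` class and its printed format are our own illustration, not part of the extension, and the Kafka MBean patterns only match while a producer or consumer is live in the process:

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxMetricsDump {

    // Print every readable attribute of MBeans matching an ObjectName pattern.
    static void dump(String pattern) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = server.queryNames(new ObjectName(pattern), null);
        for (ObjectName name : names) {
            for (MBeanAttributeInfo attr : server.getMBeanInfo(name).getAttributes()) {
                try {
                    System.out.println(name + " " + attr.getName() + " = "
                            + server.getAttribute(name, attr.getName()));
                } catch (Exception e) {
                    // Some attributes are not readable at runtime; skip them.
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // These patterns only match while Kafka clients run in this JVM.
        dump("kafka.producer:type=producer-metrics,client-id=*");
        dump("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*");
    }
}
```

Running this periodically (or wiring the same query into your own reporter) gives you a client-side view without deploying a separate exporter.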
You can inject internal Kafka client metrics (like consumer lag) directly into the Gatling simulation loop. This allows you to fail the test if the system is not healthy, even if response times are low.
Enable Metric Injection:
KafkaProtocolBuilder protocol = kafka()
// ... other config
    .metricInjectionInterval(Duration.ofSeconds(1)); // Inject metrics every second

Assert on Metrics:
Once enabled, metrics appear in the Gatling stats as pseudo-requests with the name "Kafka Metrics". You can use standard Gatling assertions on them.
setUp(scn.injectOpen(atOnceUsers(1)))
.protocols(protocol)
.assertions(
// Fail if max consumer lag exceeds 100 records
details("Kafka Metrics").max("kafka-consumer-lag-max").lt(100),
// Fail if we have any record errors
details("Kafka Metrics").max("kafka-producer-record-error-rate").is(0.0)
    );

Available Injected Metrics:
- `kafka-consumer-lag-max`
- `kafka-producer-record-error-rate`
- `kafka-producer-request-latency-avg`
- `kafka-consumer-fetch-latency-avg`
In addition to JMX metrics, the extension reports specific breakdown metrics to Gatling's StatsEngine for every transaction. This allows you to pinpoint latency sources.
| Metric Name Suffix | Description |
|---|---|
| (none) | The full End-to-End Latency. Includes: Send + Process + Store + Receive time. |
| `-send` | Send Duration: Time taken by the Kafka Producer to send the message and receive acknowledgement (if `waitForAck=true`). High values indicate network issues or broker congestion. |
| `-store` | Store Duration: Time taken to persist the request in the Request Store (Redis/Postgres). High values indicate database bottlenecks. |
Example:
If your request name is TransactionRequest:
- `TransactionRequest`: Total time (e.g., 150ms)
- `TransactionRequest-send`: Broker ack time (e.g., 20ms)
- `TransactionRequest-store`: Redis write time (e.g., 2ms)
You can use standard Gatling assertions on these breakdown metrics:
// Assert that Redis write times are fast
details("TransactionRequest-store").responseTime().percentile(99.0).lt(10)
// Assert that Broker ack times are reasonable
details("TransactionRequest-send").responseTime().percentile(99.0).lt(50)

Configuration: Detailed store latency metrics are disabled by default to minimize overhead. You can enable them if you need to debug request storage performance:
KafkaProtocolBuilder protocol = kafka()
// ...
    .measureStoreLatency(true); // Enable detailed store metrics

One of the most powerful features of this extension is its ability to measure the true impact of failures on your event-driven architecture. Unlike simple throughput tests, this framework tracks end-to-end latency and data integrity during outages, giving you real insight into user experience during incidents.
Most Kafka load tests only measure "happy path" performance:
- ✅ How many messages per second?
- ✅ What's the average latency?
- ❌ What happens when a broker dies?
- ❌ How long does recovery take?
- ❌ Are any messages lost or corrupted?
This framework answers those critical questions.
- Persistent Request Tracking: Using Redis/PostgreSQL, requests survive application crashes and restarts
- Timeout Detection: Automatically detects and reports requests that never complete due to application failures
- End-to-End Measurement: Captures the full impact of failures on user experience, including recovery time
- Data Validation: Verifies responses are correct, not just "a response arrived"
Test Objective: Measure the impact of a broker failure on end-to-end latency and error rates.
Setup:
// Configure with durable request store for failure resilience
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("broker1:9092,broker2:9092,broker3:9092")
.requestStore(postgresStore) // Survives test disruptions
.producerProperties(Map.of(
ProducerConfig.ACKS_CONFIG, "all",
ProducerConfig.RETRIES_CONFIG, "10",
ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500"
))
.consumerProperties(Map.of(
"session.timeout.ms", "30000",
"heartbeat.interval.ms", "3000"
));
ScenarioBuilder chaosScenario = scenario("Broker Outage Test")
.exec(
KafkaDsl.kafka("Outage Test")
.requestReply()
.requestTopic("request-topic")
.responseTopic("response-topic")
.key(session -> UUID.randomUUID().toString())
.value(session -> generateTestPayload())
.serializationType(String.class, String.class, SerializationType.STRING)
.checks(validationChecks)
.timeout(30, TimeUnit.SECONDS)
);
setUp(
chaosScenario.injectOpen(
constantUsersPerSec(50).during(120) // 2 minutes of steady load
)
).protocols(protocol);

Test Procedure:
- Start the test (50 requests/sec baseline)
- At T+30s: Kill a broker (e.g., `docker stop kafka-broker-2`)
- Observe: Latency spikes as clients reconnect, some requests may time out
- At T+60s: Restart the broker (`docker start kafka-broker-2`)
- Observe: Recovery time, pending requests completing
What to Measure:
- Latency Spike: How high does P95/P99 latency go during the outage?
- Error Rate: What percentage of requests fail or timeout?
- Recovery Time: How long until latency returns to baseline?
- Data Integrity: Do all successful responses still pass validation?
Expected Results (healthy system):
- Brief latency spike (5-15 seconds) during metadata refresh
- No data corruption (all checks pass)
- Automatic recovery when broker returns
Test Objective: Measure the impact of application deployment (rolling restart) on request-reply latency.
Setup:
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.requestStore(redisStore) // Track requests during app restart
.numConsumers(4) // Match your application's consumer count
.pollTimeout(Duration.ofSeconds(1));
ScenarioBuilder rebalanceScenario = scenario("Consumer Rebalance Test")
    .forever().on(
        exec(
            KafkaDsl.kafka("Rebalance Test")
                .requestReply()
                .requestTopic("orders-in")
                .responseTopic("orders-out")
                .key(session -> "order-" + session.userId())
                .value(session -> createOrder())
                .serializationType(String.class, String.class, SerializationType.JSON)
                .checks(orderValidationChecks)
                .timeout(45, TimeUnit.SECONDS) // Higher timeout for rebalance
        )
        .pace(Duration.ofSeconds(1)) // 1 request per second per user
    );
setUp(
rebalanceScenario.injectOpen(
rampUsers(20).during(30),
constantUsersPerSec(20).during(300), // 5 minutes steady state
nothingFor(Duration.ofSeconds(60)) // Cooldown for pending requests
)
).protocols(protocol);

Test Procedure:
- Start test with 20 concurrent users
- At T+60s: Restart one application instance (triggers rebalance)
- Observe: Consumer group pauses, partition reassignment
- At T+120s: Restart another application instance
- Observe: Additional rebalance
- At T+180s: Scale up application (add new instance)
- Observe: Final rebalance
What to Measure:
- Rebalance Duration: How long does the consumer group pause?
- Latency Impact: Do requests queue up and complete later with higher latency?
- Timeout Rate: Do any requests timeout during the rebalance?
- Message Loss: Are all requests eventually processed?
Gatling Assertions:
.assertions(
    global().responseTime().percentile(99.0).lt(10000), // P99 < 10s even during rebalance
    global().successfulRequests().percent().gt(99.0)    // 99% success rate
)

Test Objective: Test behavior when a partition leader fails and a new leader must be elected.
Setup:
// Target a specific partition for controlled chaos
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.requestStore(postgresStore)
.producerProperties(Map.of(
ProducerConfig.ACKS_CONFIG, "all", // Requires in-sync replicas
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1", // Strong ordering
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"
));
// Use consistent partitioning to target specific partition
ScenarioBuilder partitionChaos = scenario("Leader Election Test")
.exec(
KafkaDsl.kafka("Leader Election")
.requestReply()
.requestTopic("test-topic")
.responseTopic("response-topic")
.key(session -> "fixed-key-partition-0") // Always partition 0
.value(session -> "test-message-" + System.nanoTime())
.serializationType(String.class, String.class, SerializationType.STRING)
.checks(checks)
.timeout(20, TimeUnit.SECONDS)
    );

Test Procedure:
- Start test targeting partition 0
- Identify leader: `kafka-topics.sh --describe --topic test-topic`
- Kill leader broker: Stop the broker hosting partition 0's leader
- Observe: ISR shrinks, new leader elected from replicas
- Monitor: Request latency during election (typically 5-30 seconds)
Advanced Metrics Collection:
// Enable metric injection to track producer retry behavior
.metricInjectionInterval(Duration.ofMillis(500))
// Assert that retries happen but eventually succeed
.assertions(
details("Kafka Metrics").max("kafka-producer-record-error-rate").lt(0.01),
global().successfulRequests().percent().gt(95.0)
)

Test Objective: Verify that requests survive application crashes and are correctly measured when the app recovers.
Key Insight: This scenario showcases why PostgresRequestStore or RedisRequestStore is critical for resilience testing.
Setup:
// Use PostgresRequestStore - survives crashes
PostgresRequestStore persistentStore = new PostgresRequestStore(
dataSource,
"requests"
);
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("localhost:9092")
.requestStore(persistentStore)
.timeoutCheckInterval(Duration.ofSeconds(5));
ScenarioBuilder crashRecoveryScenario = scenario("Crash Recovery Test")
.exec(
KafkaDsl.kafka("Crash Recovery")
.requestReply()
.requestTopic("crash-test-in")
.responseTopic("crash-test-out")
.key(session -> UUID.randomUUID().toString())
.value(session -> createComplexTransaction())
.serializationType(String.class, String.class, SerializationType.PROTOBUF)
.checks(transactionChecks)
.timeout(120, TimeUnit.SECONDS) // Long timeout for crash recovery
    );

Test Procedure:
- T+0s: Start test, requests flowing normally
- T+30s: Force crash your application (`kill -9 <pid>`)
- Observe: Requests stored in Postgres, waiting for responses
- T+60s: Restart application
- Observe: Application processes queued messages, sends responses
- Result: Framework matches responses to requests, reports true latency including downtime
What This Proves:
- No Silent Failures: Every request is accounted for
- True User Impact: Latency includes the crash downtime (what users actually experience)
- Data Durability: Request metadata survives crashes
Sample Results:
Request sent at T+25s
Application crashes at T+30s (request in Postgres)
Application restarts at T+60s
Response received at T+65s
Reported latency: 40 seconds (TRUE user experience)
Test Objective: Test behavior when Gatling can reach Kafka but the application cannot (or vice versa).
Setup Using Toxiproxy:
# Setup Toxiproxy to introduce network failures
docker run -d --name toxiproxy \
-p 8474:8474 -p 9093:9093 \
ghcr.io/shopify/toxiproxy
# Create proxy for Kafka broker
toxiproxy-cli create kafka-proxy \
--listen 0.0.0.0:9093 \
  --upstream kafka:9092

Gatling Configuration:
KafkaProtocolBuilder protocol = kafka()
.bootstrapServers("toxiproxy:9093") // Through proxy
.requestStore(redisStore)
.producerProperties(Map.of(
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000",
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000"
));

Test Procedure:
- Start test with normal traffic
- Inject latency: `toxiproxy-cli toxic add -t latency -a latency=5000 kafka-proxy`
- Observe: Slow responses, potential timeouts
- Inject connection disruption: `toxiproxy-cli toxic add -t slow_close kafka-proxy`
- Observe: Connection failures, retry behavior
- Use Persistent Request Stores
  - `InMemoryRequestStore`: Development only
  - `RedisRequestStore`: Most chaos scenarios
  - `PostgresRequestStore`: Audit trail needed
- Set Realistic Timeouts
  - Too short: False failures during recovery
  - Too long: Hides real problems
  - Recommendation: 2-3x normal P99 latency
- Add Cooldown Periods
  - `nothingFor(Duration.ofSeconds(60))` // Let pending requests complete
- Monitor Both Sides
  - Gatling metrics (client view)
  - Kafka broker metrics (server view)
  - Application logs (processing view)
- Gradual Chaos Introduction
  - Start with one failure type
  - Baseline performance first
  - Document expected vs. actual behavior
- Automate Chaos Injection

  exec(session -> {
      if (session.userId() % 100 == 0) { // Trigger chaos every 100 iterations
          Runtime.getRuntime().exec("scripts/kill-random-broker.sh");
      }
      return session;
  })
After chaos testing, you should capture:
| Metric | Baseline | During Failure | Recovery Time |
|---|---|---|---|
| Avg Latency | 150ms | 8,500ms | 12 seconds |
| P99 Latency | 450ms | 35,000ms | 25 seconds |
| Error Rate | 0.01% | 12% | - |
| Throughput | 500/sec | 180/sec | 45 seconds |
| Validation Failures | 0 | 0 | - |
Key Finding: System degraded but no data corruption (validation passed for all completed requests).
If you are new to Gatling or Kafka, this framework might look a bit intimidating. Here is a guide to help you get started without getting overwhelmed.
Don't worry about Redis or Postgres yet. The framework uses an InMemoryRequestStore by default if you don't configure anything else.
- What it does: Keeps track of your requests in the memory of the running Java process.
- Limitation: If you stop the test, the data is gone. But for writing and debugging your first script, this is perfect.
The old constructor-based MessageCheck was verbose. Now you have fluent shortcuts:
// Old way (verbose)
new MessageCheck<>("Check", String.class, STRING, String.class, STRING, (req, res) -> ...);
// New way (one-liner!)
MessageCheck.echoCheck() // Request == Response
MessageCheck.responseContains("SUCCESS") // Response contains text
MessageCheck.jsonPathEquals("$.id", "123") // JSONPath check

Tip: Start with MessageCheck.echoCheck() or MessageCheck.jsonPathEquals(); they cover 90% of use cases!
- Use Request-Reply (Default) when you need to verify that your application actually processed the message correctly. This is for Quality.
- Use Fire-and-Forget (`waitForAck = false`) when you just want to see if your broker can handle 1 million messages per second. This is for Quantity.
The kafkaRequestReply method has many arguments. Use your IDE (IntelliJ/Eclipse) to help you:
- Cmd+P / Ctrl+P: Shows you which parameter comes next.
- Auto-Complete: Type `KafkaDsl.` and see what's available.
Kafka is designed to move massive amounts of data, but you have to choose between Efficiency (Throughput) and Speed (Latency).
- The Bus (High Throughput):
  - Scenario: You want to move 10,000 people.
  - Strategy: Wait for the bus to fill up (`linger.ms=10`).
  - Result: High efficiency, fewer network requests.
  - Trade-off: The first person on the bus has to wait for the last person before leaving.
- The Taxi (Low Latency):
  - Scenario: You want to move 1 person immediately.
  - Strategy: Leave right now (`linger.ms=0`).
  - Result: Lowest possible latency for that person.
  - Trade-off: Inefficient. You are sending a whole vehicle for just one person.
- The Fleet (Concurrency):
  - Scenario: You have 10,000 people who ALL want "Taxi" speed.
  - Problem: A single driver cannot drive 10,000 taxis at once.
  - Solution: You need more drivers (`numProducers`).
  - Rule of Thumb:
    - Standard: `numProducers(1)` is enough for 90% of cases (Bus mode).
    - Low Latency: If you force `linger.ms=0` (Taxi mode) AND have high volume, you MUST increase `numProducers` (e.g., 4-8) to handle the concurrency, otherwise requests will queue up waiting for the driver.
It is important to understand that the Kafka Broker writes Batches to disk, not individual messages.
- 1000 messages in 1 batch = 1 Disk Write (Efficient).
- 1000 messages in 1000 batches = 1000 Disk Writes (Heavy Load).
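The arithmetic above can be sketched as a back-of-the-envelope calculation. The `BatchingMath` helper below is purely illustrative and not part of the extension; it assumes a perfectly uniform arrival rate and uses a record-count cap as a stand-in for the `batch.size` byte limit:

```java
public class BatchingMath {

    // Rough estimate of broker write requests per second for a given
    // message rate and linger window. maxBatchRecords caps how many
    // records fit in one batch.
    static long brokerWritesPerSec(long msgsPerSec, long lingerMs, long maxBatchRecords) {
        if (lingerMs == 0) {
            return msgsPerSec; // "taxi mode": roughly one request per record
        }
        // Records accumulated per linger window, capped by batch capacity.
        long perBatch = Math.min(Math.max(1, msgsPerSec * lingerMs / 1000), maxBatchRecords);
        return Math.max(1, msgsPerSec / perBatch);
    }

    public static void main(String[] args) {
        // 10,000 msgs/sec with linger.ms=0: one write per record
        System.out.println(brokerWritesPerSec(10_000, 0, 1000));
        // 10,000 msgs/sec with linger.ms=10: batches of ~100 records
        System.out.println(brokerWritesPerSec(10_000, 10, 1000));
    }
}
```

Under these simplified assumptions, raising `linger.ms` from 0 to 10 collapses ~10,000 broker writes per second into ~100, which is exactly the efficiency-versus-latency trade-off the bus/taxi analogy describes.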
Your Goal: Unless you are specifically testing the Broker's limits (e.g., "Can it handle 50k IOPS?"), your goal is usually to measure the End-to-End Latency of your application.
- Don't obsess over "Messages Per Second" if it means destroying your latency.
- A realistic test often uses moderate batching (e.g., `linger.ms=5`) to simulate real-world traffic patterns where multiple users are active simultaneously.