The Scroll API provides efficient streaming access to large result sets from Elasticsearch using Akka Streams. It automatically selects the optimal scrolling strategy based on your Elasticsearch version and on whether aggregations are present.
Key Features:
- Automatic strategy selection (PIT + search_after, search_after, or classic scroll)
- Akka Streams integration for reactive data processing
- Type-safe result conversion with automatic deserialization
- Built-in metrics tracking (throughput, batches, duration)
- Automatic error handling with retry logic
- Memory-efficient streaming for large datasets
- Configurable batch sizes and limits
Dependencies:
- Requires `SearchApi` for query execution
- Requires Akka Streams for reactive streaming
- Requires `ElasticConversion` for result parsing
- Core Concepts
- Scroll Strategies
- Configuration
- Basic Usage
- Typed Scrolling
- Metrics and Monitoring
- Error Handling
- Performance Tuning
- Advanced Patterns
- Testing
- Best Practices
The API automatically selects the best strategy based on your query and the Elasticsearch version:
Strategy Selection Matrix:
| ES Version | Aggregations | Strategy |
|---|---|---|
| 7.10+ | No | PIT + search_after (recommended) |
| 7.10+ | Yes | Classic scroll |
| < 7.10 | No | search_after |
| < 7.10 | Yes | Classic scroll |
sealed trait ScrollStrategy
// Point In Time + search_after (ES 7.10+, best performance)
case object UsePIT extends ScrollStrategy
// search_after only (efficient, no server state)
case object UseSearchAfter extends ScrollStrategy
// Classic scroll (supports aggregations)
case object UseScroll extends ScrollStrategy

Strategy Comparison:
| Strategy | Server State | Aggregations | Deep Pagination | Timeout Issues | Performance |
|---|---|---|---|---|---|
| PIT + search_after | Minimal | ❌ No | ✅ Excellent | ❌ None | ⭐⭐⭐⭐⭐ |
| search_after | None | ❌ No | ✅ Good | ❌ None | ⭐⭐⭐⭐ |
| Classic scroll | Yes | ✅ Yes | ⚠️ Limited | ⚠️ Possible | ⭐⭐⭐ |
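The selection rule behind the matrix above can be sketched as a small function. This is illustrative only: the API makes this choice internally, and the version/aggregation parameters shown here are assumptions for the example (the strategy objects are redeclared so the snippet is self-contained):

```scala
sealed trait ScrollStrategy
case object UsePIT extends ScrollStrategy
case object UseSearchAfter extends ScrollStrategy
case object UseScroll extends ScrollStrategy

// Mirrors the matrix: aggregations always force classic scroll;
// otherwise PIT + search_after on ES 7.10+, plain search_after before that.
def selectStrategy(major: Int, minor: Int, hasAggregations: Boolean): ScrollStrategy = {
  val supportsPit = major > 7 || (major == 7 && minor >= 10)
  if (hasAggregations) UseScroll
  else if (supportsPit) UsePIT
  else UseSearchAfter
}
```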
// Basic scroll source (returns Map with metrics)
def scroll(
sql: SQLQuery,
config: ScrollConfig = ScrollConfig()
)(implicit system: ActorSystem): Source[(Map[String, Any], ScrollMetrics), NotUsed]
// Typed scroll source (automatic deserialization)
def scrollAsUnchecked[T](
sql: SQLQuery,
config: ScrollConfig = ScrollConfig()
)(implicit
system: ActorSystem,
m: Manifest[T],
formats: Formats
): Source[(T, ScrollMetrics), NotUsed]

Best for: ES 7.10+, large result sets, no aggregations
Advantages:
- ✅ Consistent snapshot across pagination
- ✅ No scroll timeout issues
- ✅ Better resource usage
- ✅ Automatic cleanup
- ✅ Suitable for deep pagination
Limitations:
- ❌ Not supported with aggregations
- ❌ Requires ES 7.10+
// Automatically used for ES 7.10+ without aggregations
val query = SQLQuery(
query = """
SELECT id, name, price
FROM products
WHERE category = 'electronics'
ORDER BY price DESC
"""
)
// Will use PIT + search_after strategy
client.scroll(query).runWith(Sink.seq)

Best for: ES < 7.10, large result sets, no aggregations
Advantages:
- ✅ No server-side state
- ✅ Efficient pagination
- ✅ No timeout issues
- ✅ Good for deep pagination
Limitations:
- ❌ Not supported with aggregations
- ❌ Requires sort fields
- ⚠️ No consistent snapshot (data can change between pages)
// Automatically used for ES < 7.10 without aggregations
val query = SQLQuery(
query = """
SELECT id, name, price
FROM products
WHERE category = 'electronics'
ORDER BY created_at DESC, id ASC
"""
)
// Will use search_after strategy
client.scroll(query).runWith(Sink.seq)

Best for: Queries with aggregations, consistent snapshots required
Advantages:
- ✅ Supports aggregations
- ✅ Consistent snapshot
- ✅ Works on all ES versions
- ✅ Automatic cleanup
Limitations:
- ⚠️ Server-side state (scroll context)
- ⚠️ Subject to scroll timeout
- ⚠️ Higher resource usage
- ⚠️ Limited deep pagination
// Automatically used when aggregations are present
val query = SQLQuery(
query = """
SELECT
category,
COUNT(*) as total,
AVG(price) as avg_price
FROM products
GROUP BY category
"""
)
// Will use classic scroll strategy
client.scroll(query).runWith(Sink.seq)

case class ScrollConfig(
// Batch size (documents per request)
scrollSize: Int = 1000,
// Keep-alive time for scroll context
keepAlive: String = "1m",
// Maximum documents to retrieve (None = unlimited)
maxDocuments: Option[Long] = None,
// Prefer search_after over classic scroll
preferSearchAfter: Boolean = true,
// Log progress every N batches
logEvery: Int = 10,
// Initial metrics
metrics: ScrollMetrics = ScrollMetrics(),
// Retry configuration
retryConfig: RetryConfig = RetryConfig()
)

Configuration Options:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `scrollSize` | `Int` | `1000` | Number of documents per batch |
| `keepAlive` | `String` | `"1m"` | Scroll context timeout (classic scroll only) |
| `maxDocuments` | `Option[Long]` | `None` | Maximum documents to retrieve |
| `preferSearchAfter` | `Boolean` | `true` | Prefer search_after when available |
| `logEvery` | `Int` | `10` | Log progress every N batches |
| `metrics` | `ScrollMetrics` | `ScrollMetrics()` | Initial metrics state |
| `retryConfig` | `RetryConfig` | `RetryConfig()` | Retry configuration for failed requests |
case class ScrollMetrics(
totalDocuments: Long = 0,
totalBatches: Int = 0,
startTime: Long = System.currentTimeMillis(),
endTime: Option[Long] = None
) {
// Calculate duration in milliseconds
def duration: Long = endTime.getOrElse(System.currentTimeMillis()) - startTime
// Calculate throughput (documents per second)
def documentsPerSecond: Double = {
val durationSec = duration / 1000.0
if (durationSec > 0) totalDocuments / durationSec else 0.0
}
// Mark as complete
def complete: ScrollMetrics = copy(endTime = Some(System.currentTimeMillis()))
}

Metrics Fields:
| Field | Type | Description |
|---|---|---|
| `totalDocuments` | `Long` | Total documents retrieved |
| `totalBatches` | `Int` | Total batches processed |
| `startTime` | `Long` | Start timestamp (milliseconds) |
| `endTime` | `Option[Long]` | End timestamp (milliseconds) |
| `duration` | `Long` | Total duration (milliseconds) |
| `documentsPerSecond` | `Double` | Throughput rate (documents per second) |
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system: ActorSystem = ActorSystem("scroll-example")
// Simple SQL query
val query = SQLQuery(
query = """
SELECT id, name, price, category
FROM products
WHERE price > 100
ORDER BY price DESC
"""
)
// Scroll through results
client.scroll(query).runWith(Sink.foreach { case (doc, metrics) =>
println(s"Document: $doc")
println(s"Progress: ${metrics.totalDocuments} docs, ${metrics.documentsPerSecond} docs/sec")
})

// Collect all documents into a sequence
val allDocs: Future[Seq[(Map[String, Any], ScrollMetrics)]] =
client.scroll(query).runWith(Sink.seq)
allDocs.foreach { results =>
println(s"Retrieved ${results.size} documents")
// Get final metrics
val finalMetrics = results.lastOption.map(_._2)
finalMetrics.foreach { m =>
println(s"Total time: ${m.duration}ms")
println(s"Throughput: ${m.documentsPerSecond} docs/sec")
}
// Process documents
results.foreach { case (doc, _) =>
println(s"ID: ${doc.get("id")}, Name: ${doc.get("name")}")
}
}

// Limit to first 5000 documents
val config = ScrollConfig(
scrollSize = 500,
maxDocuments = Some(5000)
)
client.scroll(query, config).runWith(Sink.foreach { case (doc, metrics) =>
println(s"Document ${metrics.totalDocuments}: ${doc.get("name")}")
})

// Process in batches of 2000
val config = ScrollConfig(scrollSize = 2000)
client.scroll(query, config)
.grouped(2000)
.runWith(Sink.foreach { batch =>
println(s"Processing batch of ${batch.size} documents")
// Batch processing logic
processBatch(batch.map(_._1))
})

case class Product(
id: String,
name: String,
price: Double,
category: String,
stock: Int
)
implicit val formats: Formats = DefaultFormats
val query = SQLQuery(
query = """
SELECT id, name, price, category, stock
FROM products
WHERE category = 'electronics'
"""
)
// Scroll with automatic type conversion
client.scrollAsUnchecked[Product](query).runWith(Sink.foreach { case (product, metrics) =>
println(s"Product: ${product.name} - $${product.price}")
println(s"Progress: ${metrics.totalDocuments} products")
})

// Collect all products
val allProducts: Future[Seq[Product]] =
client.scrollAsUnchecked[Product](query)
.map(_._1) // Extract product, discard metrics
.runWith(Sink.seq)
allProducts.foreach { products =>
println(s"Retrieved ${products.size} products")
val totalValue = products.map(_.price).sum
println(f"Total inventory value: $$${totalValue}%,.2f")
}

// Filter expensive products during streaming
client.scrollAsUnchecked[Product](query)
.filter { case (product, _) => product.price > 500 }
.map(_._1) // Extract product
.runWith(Sink.seq)
.foreach { expensiveProducts =>
println(s"Found ${expensiveProducts.size} expensive products")
expensiveProducts.foreach { p =>
println(s" ${p.name}: $${p.price}")
}
}

case class ProductSummary(name: String, value: Double)
client.scrollAsUnchecked[Product](query)
.map { case (product, _) =>
ProductSummary(
name = product.name,
value = product.price * product.stock
)
}
.runWith(Sink.seq)
.foreach { summaries =>
val totalValue = summaries.map(_.value).sum
println(f"Total inventory value: $$${totalValue}%,.2f")
}

val query =
"""
SELECT id, name, price, category, stock
FROM products
WHERE category = 'electronics'
"""
client.scrollAs[Product](query)
.map { case (product, _) =>
ProductSummary(
name = product.name,
value = product.price * product.stock
)
}
.runWith(Sink.seq)
.foreach { summaries =>
val totalValue = summaries.map(_.value).sum
println(f"Total inventory value: $$${totalValue}%,.2f")
}
📖 Full SQL Validation Documentation
val config = ScrollConfig(
scrollSize = 1000,
logEvery = 5 // Log every 5 batches
)
client.scroll(query, config).runWith(Sink.foreach { case (doc, metrics) =>
// Metrics are automatically updated
if (metrics.totalBatches % 5 == 0) {
println(s"Progress Report:")
println(s" Documents: ${metrics.totalDocuments}")
println(s" Batches: ${metrics.totalBatches}")
println(s" Duration: ${metrics.duration}ms")
println(s" Throughput: ${metrics.documentsPerSecond} docs/sec")
}
})
// Output:
// Progress Report:
// Documents: 5000
// Batches: 5
// Duration: 2345ms
// Throughput: 2132.2 docs/sec

client.scroll(query)
.runWith(Sink.last)
.foreach { case (_, finalMetrics) =>
val completed = finalMetrics.complete
println("Scroll Completed!")
println(s" Total Documents: ${completed.totalDocuments}")
println(s" Total Batches: ${completed.totalBatches}")
println(s" Total Duration: ${completed.duration}ms")
println(s" Average Throughput: ${completed.documentsPerSecond} docs/sec")
}

case class CustomMetrics(
scrollMetrics: ScrollMetrics,
processedCount: Long = 0,
errorCount: Long = 0,
skippedCount: Long = 0
)
client.scroll(query)
.scan(CustomMetrics(ScrollMetrics())) { case (custom, (doc, scrollMetrics)) =>
// Update custom metrics
val processed = if (processDocument(doc)) {
custom.copy(
scrollMetrics = scrollMetrics,
processedCount = custom.processedCount + 1
)
} else {
custom.copy(
scrollMetrics = scrollMetrics,
skippedCount = custom.skippedCount + 1
)
}
processed
}
.runWith(Sink.last)
.foreach { finalCustom =>
println(s"Processed: ${finalCustom.processedCount}")
println(s"Skipped: ${finalCustom.skippedCount}")
println(s"Errors: ${finalCustom.errorCount}")
}
def processDocument(doc: Map[String, Any]): Boolean = {
// Processing logic
true
}

The API automatically handles:
- ✅ Network timeouts (with retry)
- ✅ Expired scroll contexts
- ✅ Elasticsearch errors
- ✅ Connection issues
// Automatic error handling is built-in
client.scroll(query).runWith(Sink.seq).recover {
case ex: Exception =>
logger.error("Scroll failed", ex)
Seq.empty
}

import akka.stream.Supervision
// Define custom recovery strategy
implicit val decider: Supervision.Decider = {
case _: java.net.SocketTimeoutException =>
logger.warn("Timeout, resuming...")
Supervision.Resume
case _: org.elasticsearch.ElasticsearchException =>
logger.error("ES error, stopping...")
Supervision.Stop
case ex =>
logger.error(s"Unexpected error: ${ex.getMessage}")
Supervision.Stop
}
// Apply supervision strategy
client.scroll(query)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)

import akka.stream.scaladsl.RetryFlow
import scala.concurrent.duration._
import akka.stream.scaladsl.Flow
import scala.util.{Failure, Try}

// Add retry logic for failed batches.
// RetryFlow.withBackoff wraps a Flow and re-feeds failed elements.
val processFlow =
  Flow[(Map[String, Any], ScrollMetrics)]
    .map { case (doc, _) => Try(processDocument(doc)) }

client.scroll(query)
  .via(RetryFlow.withBackoff(
    minBackoff = 1.second,
    maxBackoff = 10.seconds,
    randomFactor = 0.2,
    maxRetries = 3,
    flow = processFlow
  ) {
    case (in, Failure(_)) => Some(in) // retry the element on failure
    case _ => None                    // success: do not retry
  })
.runWith(Sink.seq)

client.scroll(query)
.recover {
case ex: java.net.SocketTimeoutException =>
logger.error("Network timeout", ex)
throw ex
case ex: org.elasticsearch.ElasticsearchException =>
logger.error(s"Elasticsearch error: ${ex.getMessage}", ex)
throw ex
case ex: Exception =>
logger.error("Unexpected error during scroll", ex)
throw ex
}
.runWith(Sink.seq)
.recover {
case ex =>
logger.error("Failed to complete scroll", ex)
Seq.empty
}

def scrollWithFallback(query: SQLQuery): Future[Seq[Map[String, Any]]] = {
client.scroll(query)
.map(_._1) // Extract documents
.runWith(Sink.seq)
.recoverWith {
case ex: Exception =>
logger.warn(s"Scroll failed, trying regular search: ${ex.getMessage}")
// Fallback to regular search
client.search(query).map {
case ElasticSuccess(results) => results
case ElasticFailure(error) =>
logger.error(s"Fallback also failed: ${error.message}")
Seq.empty
}
}
}

// Small documents (< 1KB each)
val smallDocConfig = ScrollConfig(scrollSize = 5000)
// Medium documents (1-10KB each)
val mediumDocConfig = ScrollConfig(scrollSize = 1000)
// Large documents (> 10KB each)
val largeDocConfig = ScrollConfig(scrollSize = 100)
// Choose based on document size
val config = if (avgDocSize < 1024) smallDocConfig
else if (avgDocSize < 10240) mediumDocConfig
else largeDocConfig
client.scroll(query, config).runWith(Sink.seq)

// Process batches in parallel
client.scroll(query)
.grouped(1000)
.mapAsync(parallelism = 4) { batch =>
Future {
// Parallel batch processing
processBatchInParallel(batch.map(_._1))
}
}
.runWith(Sink.ignore)

// Stream to file to avoid memory issues
import java.io.PrintWriter
val writer = new PrintWriter("results.json")
client.scroll(query)
.map { case (doc, _) =>
// Convert to JSON string
compact(render(Extraction.decompose(doc)))
}
.runWith(Sink.foreach { json =>
writer.println(json)
})
.onComplete { _ =>
writer.close()
println("Results written to file")
}

// Add buffer to handle backpressure
client.scroll(query)
.buffer(100, OverflowStrategy.backpressure)
.mapAsync(parallelism = 2) { case (doc, _) =>
// Slow processing
processDocumentAsync(doc)
}
.runWith(Sink.seq)

import scala.concurrent.duration._
// Throttle to 100 documents per second
client.scroll(query)
.throttle(100, 1.second)
.runWith(Sink.foreach { case (doc, metrics) =>
println(s"Processing: ${doc.get("id")}")
})

// Process in batches with commit points
client.scroll(query)
.grouped(1000)
.mapAsync(1) { batch =>
for {
_ <- processBatch(batch.map(_._1))
_ <- commitBatch(batch.size)
} yield batch.size
}
.runWith(Sink.fold(0)(_ + _))
.foreach { total =>
println(s"Processed and committed $total documents")
}
def processBatch(docs: Seq[Map[String, Any]]): Future[Unit] = {
// Batch processing logic
Future.successful(())
}
def commitBatch(size: Int): Future[Unit] = {
// Commit logic (e.g., database transaction)
Future.successful(())
}

case class RawProduct(id: String, name: String, price: Double)
case class EnrichedProduct(id: String, name: String, price: Double, category: String, tags: Seq[String])
client.scrollAsUnchecked[RawProduct](query)
.mapAsync(parallelism = 4) { case (raw, _) =>
// Enrich each product
enrichProduct(raw)
}
.filter(_.tags.nonEmpty) // Filter enriched products
.grouped(100)
.mapAsync(1) { batch =>
// Bulk index enriched products
bulkIndexProducts(batch)
}
.runWith(Sink.ignore)
def enrichProduct(raw: RawProduct): Future[EnrichedProduct] = {
// Enrichment logic (e.g., external API call)
Future.successful(
EnrichedProduct(
raw.id,
raw.name,
raw.price,
"electronics",
Seq("popular", "sale")
)
)
}
def bulkIndexProducts(products: Seq[EnrichedProduct]): Future[Unit] = {
// Bulk indexing logic
Future.successful(())
}

import akka.stream.scaladsl.Broadcast
// Fan-out to multiple sinks
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = client.scroll(query).map(_._1)
val broadcast = builder.add(Broadcast[Map[String, Any]](3))
// Sink 1: Write to file
val fileSink = Sink.foreach[Map[String, Any]] { doc =>
writeToFile(doc)
}
// Sink 2: Index to another ES
val indexSink = Sink.foreach[Map[String, Any]] { doc =>
indexToElasticsearch(doc)
}
// Sink 3: Send to Kafka
val kafkaSink = Sink.foreach[Map[String, Any]] { doc =>
sendToKafka(doc)
}
source ~> broadcast
broadcast ~> fileSink
broadcast ~> indexSink
broadcast ~> kafkaSink
ClosedShape
})
graph.run()
def writeToFile(doc: Map[String, Any]): Unit = { /* ... */ }
def indexToElasticsearch(doc: Map[String, Any]): Unit = { /* ... */ }
def sendToKafka(doc: Map[String, Any]): Unit = { /* ... */ }

case class Statistics(
count: Long = 0,
sum: Double = 0.0,
min: Double = Double.MaxValue,
max: Double = Double.MinValue
) {
def avg: Double = if (count > 0) sum / count else 0.0
def update(value: Double): Statistics = Statistics(
count = count + 1,
sum = sum + value,
min = math.min(min, value),
max = math.max(max, value)
)
}
client.scrollAsUnchecked[Product](query)
.map(_._1.price) // Extract prices
.fold(Statistics())(_ update _)
.runWith(Sink.head)
.foreach { stats =>
println(s"Price Statistics:")
println(f" Count: ${stats.count}")
println(f" Average: $$${stats.avg}%.2f")
println(f" Min: $$${stats.min}%.2f")
println(f" Max: $$${stats.max}%.2f")
}

client.scrollAsUnchecked[Product](query)
.mapAsync(parallelism = 4) { case (product, _) =>
product.category match {
case "electronics" => processElectronics(product)
case "clothing" => processClothing(product)
case "books" => processBooks(product)
case _ => processGeneric(product)
}
}
.runWith(Sink.ignore)
def processElectronics(p: Product): Future[Unit] = Future.successful(())
def processClothing(p: Product): Future[Unit] = Future.successful(())
def processBooks(p: Product): Future[Unit] = Future.successful(())
def processGeneric(p: Product): Future[Unit] = Future.successful(())

import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import akka.stream.scaladsl.Sink
class ScrollApiSpec extends AsyncFlatSpec with Matchers {
implicit val system: ActorSystem = ActorSystem("test")
"ScrollApi" should "scroll through all documents" in {
val testIndex = "test-scroll"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 100).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i, "value": ${i * 10}}""")
})
_ <- client.refreshAsync(testIndex)
// Test
query = SQLQuery(query = s"SELECT * FROM $testIndex")
results <- client.scroll(query).map(_._1).runWith(Sink.seq)
// Assertions
_ = {
results should have size 100
results.map(_("id").toString.toInt).sorted shouldBe (1 to 100)
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}
}

"ScrollApi" should "scroll with type conversion" in {
case class TestDoc(id: Int, value: Int)
implicit val formats: Formats = DefaultFormats
val testIndex = "test-typed-scroll"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 50).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i, "value": ${i * 10}}""")
})
_ <- client.refreshAsync(testIndex)
// Test
query = SQLQuery(query = s"SELECT id, value FROM $testIndex")
results <- client.scrollAsUnchecked[TestDoc](query).map(_._1).runWith(Sink.seq)
// Assertions
_ = {
results should have size 50
results.map(_.id).sorted shouldBe (1 to 50)
results.foreach { doc =>
doc.value shouldBe doc.id * 10
}
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

"ScrollApi" should "track metrics correctly" in {
val testIndex = "test-metrics"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 1000).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i}""")
})
_ <- client.refreshAsync(testIndex)
// Test
query = SQLQuery(query = s"SELECT * FROM $testIndex")
config = ScrollConfig(scrollSize = 100)
lastMetrics <- client.scroll(query, config)
.map(_._2)
.runWith(Sink.last)
// Assertions
_ = {
val finalMetrics = lastMetrics.complete
finalMetrics.totalDocuments shouldBe 1000
finalMetrics.totalBatches shouldBe 10
finalMetrics.duration should be > 0L
finalMetrics.documentsPerSecond should be > 0.0
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

"ScrollApi" should "respect maxDocuments limit" in {
val testIndex = "test-limit"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 1000).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i}""")
})
_ <- client.refreshAsync(testIndex)
// Test with limit
query = SQLQuery(query = s"SELECT * FROM $testIndex")
config = ScrollConfig(
scrollSize = 100,
maxDocuments = Some(250)
)
results <- client.scroll(query, config).map(_._1).runWith(Sink.seq)
// Assertions
_ = {
results.size shouldBe 250
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

"ScrollApi" should "handle errors gracefully" in {
val testIndex = "test-error-recovery"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 100).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i}""")
})
_ <- client.refreshAsync(testIndex)
// Test with error handling
query = SQLQuery(query = s"SELECT * FROM $testIndex")
result <- client.scroll(query)
.map(_._1)
.runWith(Sink.seq)
.recover {
case ex: Exception =>
// Should recover from errors
Seq.empty
}
// Assertions
_ = {
result should not be empty
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

"ScrollApi" should "handle empty index" in {
val testIndex = "test-empty"
for {
// Setup empty index
_ <- client.createIndexAsync(testIndex)
_ <- client.refreshAsync(testIndex)
// Test
query = SQLQuery(query = s"SELECT * FROM $testIndex")
results <- client.scroll(query).map(_._1).runWith(Sink.seq)
// Assertions
_ = {
results shouldBe empty
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

"ScrollApi" should "process documents in batches" in {
val testIndex = "test-batches"
for {
// Setup
_ <- client.createIndexAsync(testIndex)
_ <- Future.sequence((1 to 500).map { i =>
client.indexAsync(testIndex, i.toString, s"""{"id": $i}""")
})
_ <- client.refreshAsync(testIndex)
// Test
query = SQLQuery(query = s"SELECT * FROM $testIndex")
config = ScrollConfig(scrollSize = 100)
batches <- client.scroll(query, config)
.map(_._1)
.grouped(100)
.runWith(Sink.seq)
// Assertions
_ = {
batches should have size 5
batches.foreach { batch =>
batch should have size 100
}
}
// Cleanup
_ <- client.deleteIndexAsync(testIndex)
} yield succeed
}

// ❌ BAD: Too small batch size (too many requests)
val badConfig = ScrollConfig(scrollSize = 10)
// ❌ BAD: Too large batch size (memory issues)
val tooBigConfig = ScrollConfig(scrollSize = 50000)
// ✅ GOOD: Reasonable batch size based on document size
val goodConfig = ScrollConfig(
scrollSize = if (avgDocumentSize < 1024) 5000
else if (avgDocumentSize < 10240) 1000
else 100
)
client.scroll(query, goodConfig).runWith(Sink.seq)

// ❌ BAD: No limit (could consume all memory)
client.scroll(query).runWith(Sink.seq)
// ✅ GOOD: Set reasonable limit
val config = ScrollConfig(
scrollSize = 1000,
maxDocuments = Some(100000) // Safety limit
)
client.scroll(query, config).runWith(Sink.seq)

case class Product(id: String, name: String, price: Double)
// ❌ BAD: Manual type conversion
client.scroll(query).map { case (doc, _) =>
Product(
doc("id").toString,
doc("name").toString,
doc("price").toString.toDouble
)
}.runWith(Sink.seq)
// ✅ GOOD: Automatic type conversion
implicit val formats: Formats = DefaultFormats
client.scrollAsUnchecked[Product](query)
.map(_._1)
.runWith(Sink.seq)

// ✅ GOOD: Add buffer for backpressure handling
client.scroll(query)
.buffer(100, OverflowStrategy.backpressure)
.mapAsync(parallelism = 4) { case (doc, _) =>
// Slow async processing
processDocumentAsync(doc)
}
.runWith(Sink.ignore)

// ✅ GOOD: Log progress regularly
val config = ScrollConfig(
scrollSize = 1000,
logEvery = 10 // Log every 10 batches
)
client.scroll(query, config)
.runWith(Sink.foreach { case (doc, metrics) =>
// Metrics are automatically logged
// Custom processing here
processDocument(doc)
})

// ✅ GOOD: Ensure proper cleanup
val scrollFuture = client.scroll(query).runWith(Sink.seq)
scrollFuture.onComplete {
case Success(results) =>
logger.info(s"Scroll completed: ${results.size} documents")
// Cleanup is automatic
case Failure(ex) =>
logger.error("Scroll failed", ex)
// Cleanup is automatic even on failure
}

// ✅ GOOD: Let the API choose the best strategy
val query = SQLQuery(
query = """
SELECT id, name, price
FROM products
WHERE category = 'electronics'
ORDER BY price DESC
"""
)
// Automatically uses:
// - PIT + search_after for ES 7.10+ (best performance)
// - search_after for ES < 7.10
// - Classic scroll for aggregations
client.scroll(query).runWith(Sink.seq)

// ✅ GOOD: Stream to file instead of collecting in memory
import java.io.{BufferedWriter, FileWriter}
val writer = new BufferedWriter(new FileWriter("results.jsonl"))
client.scroll(query)
.map { case (doc, _) =>
compact(render(Extraction.decompose(doc)))
}
.runWith(Sink.foreach { json =>
writer.write(json)
writer.newLine()
})
.onComplete { _ =>
writer.close()
logger.info("Results written to file")
}

// ✅ GOOD: Comprehensive error handling
implicit val decider: Supervision.Decider = {
case _: java.net.SocketTimeoutException =>
logger.warn("Network timeout, resuming...")
Supervision.Resume
case ex: org.elasticsearch.ElasticsearchException =>
logger.error(s"ES error: ${ex.getMessage}")
Supervision.Stop
case ex =>
logger.error(s"Unexpected error: ${ex.getMessage}", ex)
Supervision.Stop
}
client.scroll(query)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.recover {
case ex: Exception =>
logger.error("Failed to complete scroll", ex)
Seq.empty
}

// ❌ BAD: Select all fields (wastes bandwidth)
val badQuery = SQLQuery(query = "SELECT * FROM products")
// ✅ GOOD: Select only needed fields
val goodQuery = SQLQuery(
query = """
SELECT id, name, price
FROM products
WHERE category = 'electronics'
AND price > 100
ORDER BY price DESC
"""
)
client.scroll(goodQuery).runWith(Sink.seq)

// ✅ GOOD: Balance parallelism with resources
val parallelism = Runtime.getRuntime.availableProcessors()
client.scroll(query)
.mapAsync(parallelism) { case (doc, _) =>
// Process documents in parallel
processDocumentAsync(doc)
}
.runWith(Sink.ignore)

// ✅ GOOD: Test with different scenarios
class ScrollBehaviorSpec extends AsyncFlatSpec with Matchers {
"Scroll" should "work with small datasets" in {
testScroll(documentCount = 100)
}
it should "work with large datasets" in {
testScroll(documentCount = 10000)
}
it should "work with empty results" in {
testScroll(documentCount = 0)
}
it should "respect maxDocuments limit" in {
testScrollWithLimit(
documentCount = 1000,
maxDocuments = 500
)
}
def testScroll(documentCount: Int): Future[Assertion] = {
// Test implementation
Future.successful(succeed)
}
def testScrollWithLimit(
documentCount: Int,
maxDocuments: Int
): Future[Assertion] = {
// Test implementation
Future.successful(succeed)
}
}

def exportToFile(
query: SQLQuery,
outputPath: String
): Future[Long] = {
val writer = new PrintWriter(new FileWriter(outputPath))
client.scroll(query)
.map { case (doc, _) =>
compact(render(Extraction.decompose(doc)))
}
.runWith(Sink.fold(0L) { (count, json) =>
writer.println(json)
count + 1
})
.andThen {
case _ => writer.close()
}
}
// Usage
exportToFile(
SQLQuery(query = "SELECT * FROM products"),
"products.jsonl"
).foreach { count =>
println(s"Exported $count documents")
}

def bulkReindex(
sourceQuery: SQLQuery,
targetIndex: String,
batchSize: Int = 1000
): Future[Long] = {
client.scroll(sourceQuery)
.map(_._1) // Extract documents
.grouped(batchSize)
.mapAsync(1) { batch =>
// Bulk index to target
val bulkRequest = batch.map { doc =>
s"""{"index":{"_index":"$targetIndex"}}
|${compact(render(Extraction.decompose(doc)))}
|""".stripMargin
}.mkString
client.bulkAsync(bulkRequest).map(_ => batch.size)
}
.runWith(Sink.fold(0L)(_ + _))
}
// Usage
bulkReindex(
SQLQuery(query = "SELECT * FROM old_products"),
"new_products"
).foreach { count =>
println(s"Reindexed $count documents")
}

case class ValidationResult(
valid: Long,
invalid: Long,
errors: Seq[String]
)
def validateData(query: SQLQuery): Future[ValidationResult] = {
client.scrollAsUnchecked[Product](query)
.map(_._1)
.runWith(Sink.fold(ValidationResult(0, 0, Seq.empty)) { (result, product) =>
if (isValid(product)) {
result.copy(valid = result.valid + 1)
} else {
result.copy(
invalid = result.invalid + 1,
errors = result.errors :+ s"Invalid product: ${product.id}"
)
}
})
}
def isValid(product: Product): Boolean = {
product.price > 0 && product.name.nonEmpty
}
// Usage
validateData(SQLQuery(query = "SELECT * FROM products")).foreach { result =>
println(s"Valid: ${result.valid}")
println(s"Invalid: ${result.invalid}")
if (result.errors.nonEmpty) {
println("Errors:")
result.errors.take(10).foreach(println)
}
}

case class CategoryStats(
category: String,
count: Long,
totalValue: Double,
avgPrice: Double
)
def aggregateByCategory(query: SQLQuery): Future[Map[String, CategoryStats]] = {
client.scrollAsUnchecked[Product](query)
.map(_._1)
.runWith(Sink.fold(Map.empty[String, CategoryStats]) { (stats, product) =>
val current = stats.getOrElse(
product.category,
CategoryStats(product.category, 0, 0.0, 0.0)
)
val updated = CategoryStats(
category = product.category,
count = current.count + 1,
totalValue = current.totalValue + product.price,
avgPrice = (current.totalValue + product.price) / (current.count + 1)
)
stats + (product.category -> updated)
})
}
// Usage
aggregateByCategory(SQLQuery(query = "SELECT * FROM products")).foreach { stats =>
println("Category Statistics:")
stats.values.foreach { s =>
println(s" ${s.category}:")
println(f" Count: ${s.count}")
println(f" Total Value: $$${s.totalValue}%,.2f")
println(f" Avg Price: $$${s.avgPrice}%.2f")
}
}

case class RawOrder(id: String, customerId: String, total: Double, items: Seq[String])
case class EnrichedOrder(
id: String,
customerId: String,
customerName: String,
total: Double,
itemCount: Int,
category: String
)
def transformOrders(query: SQLQuery): Future[Seq[EnrichedOrder]] = {
client.scrollAsUnchecked[RawOrder](query)
.map(_._1)
.mapAsync(parallelism = 4) { order =>
// Enrich with customer data
fetchCustomerName(order.customerId).map { customerName =>
EnrichedOrder(
id = order.id,
customerId = order.customerId,
customerName = customerName,
total = order.total,
itemCount = order.items.size,
category = categorizeOrder(order)
)
}
}
.filter(_.itemCount > 0) // Filter empty orders
.runWith(Sink.seq)
}
def fetchCustomerName(customerId: String): Future[String] = {
// Fetch from database or cache
Future.successful(s"Customer $customerId")
}
def categorizeOrder(order: RawOrder): String = {
if (order.total > 1000) "premium"
else if (order.total > 100) "standard"
else "basic"
}
// Usage
transformOrders(SQLQuery(query = "SELECT * FROM orders")).foreach { enriched =>
println(s"Transformed ${enriched.size} orders")
}

The Scroll API provides:
✅ Automatic strategy selection for optimal performance
✅ Akka Streams integration for reactive processing
✅ Type-safe scrolling with automatic deserialization
✅ Built-in metrics tracking for monitoring
✅ Automatic error handling with retry logic
✅ Memory-efficient streaming for large datasets
✅ Flexible configuration for different use cases
Key Features by Strategy:
| Feature | PIT + search_after | search_after | Classic Scroll |
|---|---|---|---|
| ES Version | 7.10+ | All | All |
| Aggregations | ❌ | ❌ | ✅ |
| Consistent Snapshot | ✅ | ❌ | ✅ |
| Deep Pagination | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Performance | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Resource Usage | Low | Low | Medium |
| Timeout Issues | ❌ | ❌ | ⚠️ |
When to Use:
- PIT + search_after: ES 7.10+, large datasets, no aggregations (recommended)
- search_after: ES < 7.10, large datasets, no aggregations
- Classic scroll: Any version with aggregations, or when consistent snapshot is required
Best Practices:
- ✅ Choose appropriate batch size based on document size
- ✅ Always set `maxDocuments` for safety
- ✅ Use typed scrolling when possible
- ✅ Handle backpressure with buffers
- ✅ Monitor progress with metrics
- ✅ Implement proper error handling
- ✅ Stream to file for large result sets
- ✅ Use parallel processing wisely
- ✅ Optimize queries (select only needed fields)
- ✅ Test with different scenarios
Performance Tips:
- 📊 Small documents (< 1KB): batch size 5000
- 📊 Medium documents (1-10KB): batch size 1000
- 📊 Large documents (> 10KB): batch size 100
- 🚀 Use parallelism = number of CPU cores
- 💾 Stream to file for > 100K documents
- ⏱️ Add throttling for rate-limited operations
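The batch-size tips above can be captured in a small helper. This is an illustrative sketch, not part of the API; `avgDocBytes` is an assumed input you would estimate from your own index:

```scala
// Hypothetical helper encoding the batch-size tips above.
def suggestedScrollSize(avgDocBytes: Long): Int =
  if (avgDocBytes < 1024) 5000       // small documents (< 1KB)
  else if (avgDocBytes < 10240) 1000 // medium documents (1-10KB)
  else 100                           // large documents (> 10KB)
```

For example, an index averaging roughly 2 KB per document would scroll in batches of 1000 (`ScrollConfig(scrollSize = suggestedScrollSize(2048))`).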