diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java index 68f4232daa..281ffdc08e 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java @@ -73,7 +73,7 @@ public class BulkIngester implements AutoCloseable { private BackoffPolicy backoffPolicy; // Current state - private List> operations = new ArrayList<>(); + private List> operations = new ArrayList<>(); private long currentSize; private int requestsInFlightCount; private volatile boolean isClosed = false; @@ -190,7 +190,7 @@ public Duration flushInterval() { * The number of operations that have been buffered, waiting to be sent. */ public int pendingOperations() { - List> operations = this.operations; + List> operations = this.operations; return operations == null ? 0 : operations.size(); } @@ -296,12 +296,12 @@ private void failsafeFlush() { } public void flush() { - List> sentRequests = new ArrayList<>(); + List> sentRequests = new ArrayList<>(); RequestExecution exec = sendRequestCondition.whenReadyIf( () -> { // May happen on manual and periodic flushes return !operations.isEmpty() && operations.stream() - .anyMatch(RetryableBulkOperation::isSendable); + .anyMatch(IngesterOperation::isSendable); }, () -> { // Selecting operations that can be sent immediately, @@ -309,13 +309,14 @@ public void flush() { List immediateOps = new ArrayList<>(); List contexts = new ArrayList<>(); - for(Iterator> it = operations.iterator(); it.hasNext();){ - RetryableBulkOperation op = it.next(); + for(Iterator> it = operations.iterator(); it.hasNext();){ + IngesterOperation op = it.next(); if (op.isSendable()) { immediateOps.add(op.operation()); contexts.add(op.context()); sentRequests.add(op); + currentSize -= op.size(); it.remove(); } } @@ -324,7 +325,6 @@ public void flush() { BulkRequest request = newRequest().operations(immediateOps).build(); // Prepare for next round - currentSize = operations.size(); addCondition.signalIfReady(); long id = sendRequestCondition.invocations(); @@ -362,7 +362,7 @@ public void flush() { // Partial success, retrying failed requests if policy allows it // Keeping list of retryable requests/responses, to exclude them for calling // listener later - List> retryableReq = new ArrayList<>(); + List> retryableReq = new ArrayList<>(); List> refires = new ArrayList<>(); List retryableResp = new ArrayList<>(); @@ -381,7 +381,7 @@ public void flush() { // Creating partial BulkRequest List partialOps = new ArrayList<>(); List partialCtx = new ArrayList<>(); - for (RetryableBulkOperation op : sentRequests) { + for (IngesterOperation op : sentRequests) { partialOps.add(op.operation()); partialCtx.add(op.context()); } @@ -428,16 +428,17 @@ public void flush() { } private void selectingRetries(int index, BulkResponseItem bulkItemResponse, - List> sentRequests, + List> sentRequests, List retryableResp, - List> retryableReq, + List> retryableReq, List> refires) { // Getting original failed, requests and keeping successful ones to send to the listener - RetryableBulkOperation original = sentRequests.get(index); + IngesterOperation original = sentRequests.get(index); if (original.canRetry()) { retryableResp.add(bulkItemResponse); - Iterator retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator()); + RetryableBulkOperation repeatableOp = original.repeatableOperation(); + Iterator retryTimes = Optional.ofNullable(repeatableOp.retries()).orElse(backoffPolicy.iterator()); RetryableBulkOperation refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes); retryableReq.add(original); refires.add(refire); @@ -517,10 +518,10 @@ private void addRetry(RetryableBulkOperation repeatableOp) { } private void innerAdd(RetryableBulkOperation repeatableOp) { - IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); + IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper()); addCondition.whenReady(() -> { - operations.add(ingestOp.repeatableOperation()); + operations.add(ingestOp); currentSize += ingestOp.size(); if (!canAddOperation()) { diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index 5467cc1124..36d51642ca 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -35,16 +35,16 @@ /** * A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size). */ -class IngesterOperation { - private final RetryableBulkOperation repeatableOp; +class IngesterOperation { + private final RetryableBulkOperation repeatableOp; private final long size; - IngesterOperation(RetryableBulkOperation repeatableOp, long size) { + IngesterOperation(RetryableBulkOperation repeatableOp, long size) { this.repeatableOp = repeatableOp; this.size = size; } - public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { switch (repeatableOp.operation()._kind()) { case Create: return createOperation(repeatableOp, mapper); @@ -59,7 +59,7 @@ public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMap } } - public RetryableBulkOperation repeatableOperation() { + public RetryableBulkOperation repeatableOperation() { return this.repeatableOp; } @@ -67,9 +67,25 @@ public long size() { return this.size; } - private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + public BulkOperation operation() { + return repeatableOp.operation(); + } + + public Context context() { + return repeatableOp.context(); + } + + public boolean isSendable() { + return repeatableOp.isSendable(); + } + + public boolean canRetry() { + return repeatableOp.canRetry(); + } + + private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { CreateOperation create = repeatableOp.operation().create(); - RetryableBulkOperation newOperation; + RetryableBulkOperation newOperation; long size = basePropertiesSize(create); @@ -80,18 +96,18 @@ private static IngesterOperation createOperation(RetryableBulkOperation repeatab } else { BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); - newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> { + newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.create(idx -> { copyCreateProperties(create, idx); return idx.document(binaryDoc); })),repeatableOp.context(),repeatableOp.retries()); } - return new IngesterOperation(newOperation, size); + return new IngesterOperation<>(newOperation, size); } - private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { IndexOperation index = repeatableOp.operation().index(); - RetryableBulkOperation newOperation; + RetryableBulkOperation newOperation; long size = basePropertiesSize(index); @@ -102,18 +118,18 @@ private static IngesterOperation indexOperation(RetryableBulkOperation repeatabl } else { BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); - newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> { + newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.index(idx -> { copyIndexProperties(index, idx); return idx.document(binaryDoc); })),repeatableOp.context(),repeatableOp.retries()); } - return new IngesterOperation(newOperation, size); + return new IngesterOperation<>(newOperation, size); } - private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { + private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) { UpdateOperation update = repeatableOp.operation().update(); - RetryableBulkOperation newOperation; + RetryableBulkOperation newOperation; long size = basePropertiesSize(update) + size("retry_on_conflict", update.retryOnConflict()) + @@ -126,7 +142,7 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab } else { BinaryData action = BinaryData.of(update.action(), mapper); size += action.size(); - newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> { + newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.update(u -> { copyBaseProperties(update, u); return u .binaryAction(action) @@ -135,12 +151,12 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab })),repeatableOp.context(),repeatableOp.retries()); } - return new IngesterOperation(newOperation, size); + return new IngesterOperation<>(newOperation, size); } - private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) { + private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) { DeleteOperation delete = repeatableOp.operation().delete(); - return new IngesterOperation(repeatableOp, basePropertiesSize(delete)); + return new IngesterOperation<>(repeatableOp, basePropertiesSize(delete)); }