Skip to content

Commit 312b38b

Browse files
fix size calculation in bulk ingester (#1167) (#1172)
Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
1 parent 27f0801 commit 312b38b

2 files changed

Lines changed: 51 additions & 34 deletions

File tree

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class BulkIngester<Context> implements AutoCloseable {
7373
private BackoffPolicy backoffPolicy;
7474

7575
// Current state
76-
private List<RetryableBulkOperation<Context>> operations = new ArrayList<>();
76+
private List<IngesterOperation<Context>> operations = new ArrayList<>();
7777
private long currentSize;
7878
private int requestsInFlightCount;
7979
private volatile boolean isClosed = false;
@@ -190,7 +190,7 @@ public Duration flushInterval() {
190190
* The number of operations that have been buffered, waiting to be sent.
191191
*/
192192
public int pendingOperations() {
193-
List<RetryableBulkOperation<Context>> operations = this.operations;
193+
List<IngesterOperation<Context>> operations = this.operations;
194194
return operations == null ? 0 : operations.size();
195195
}
196196

@@ -296,26 +296,27 @@ private void failsafeFlush() {
296296
}
297297

298298
public void flush() {
299-
List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();
299+
List<IngesterOperation<Context>> sentRequests = new ArrayList<>();
300300
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
301301
() -> {
302302
// May happen on manual and periodic flushes
303303
return !operations.isEmpty() && operations.stream()
304-
.anyMatch(RetryableBulkOperation::isSendable);
304+
.anyMatch(IngesterOperation::isSendable);
305305
},
306306
() -> {
307307
// Selecting operations that can be sent immediately,
308308
// Dividing actual operations from contexts
309309
List<BulkOperation> immediateOps = new ArrayList<>();
310310
List<Context> contexts = new ArrayList<>();
311311

312-
for(Iterator<RetryableBulkOperation<Context>> it = operations.iterator(); it.hasNext();){
313-
RetryableBulkOperation<Context> op = it.next();
312+
for(Iterator<IngesterOperation<Context>> it = operations.iterator(); it.hasNext();){
313+
IngesterOperation<Context> op = it.next();
314314
if (op.isSendable()) {
315315
immediateOps.add(op.operation());
316316
contexts.add(op.context());
317317

318318
sentRequests.add(op);
319+
currentSize -= op.size();
319320
it.remove();
320321
}
321322
}
@@ -324,7 +325,6 @@ public void flush() {
324325
BulkRequest request = newRequest().operations(immediateOps).build();
325326

326327
// Prepare for next round
327-
currentSize = operations.size();
328328
addCondition.signalIfReady();
329329

330330
long id = sendRequestCondition.invocations();
@@ -362,7 +362,7 @@ public void flush() {
362362
// Partial success, retrying failed requests if policy allows it
363363
// Keeping list of retryable requests/responses, to exclude them for calling
364364
// listener later
365-
List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
365+
List<IngesterOperation<Context>> retryableReq = new ArrayList<>();
366366
List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
367367
List<BulkResponseItem> retryableResp = new ArrayList<>();
368368

@@ -381,7 +381,7 @@ public void flush() {
381381
// Creating partial BulkRequest
382382
List<BulkOperation> partialOps = new ArrayList<>();
383383
List<Context> partialCtx = new ArrayList<>();
384-
for (RetryableBulkOperation<Context> op : sentRequests) {
384+
for (IngesterOperation<Context> op : sentRequests) {
385385
partialOps.add(op.operation());
386386
partialCtx.add(op.context());
387387
}
@@ -428,16 +428,17 @@ public void flush() {
428428
}
429429

430430
private void selectingRetries(int index, BulkResponseItem bulkItemResponse,
431-
List<RetryableBulkOperation<Context>> sentRequests,
431+
List<IngesterOperation<Context>> sentRequests,
432432
List<BulkResponseItem> retryableResp,
433-
List<RetryableBulkOperation<Context>> retryableReq,
433+
List<IngesterOperation<Context>> retryableReq,
434434
List<RetryableBulkOperation<Context>> refires) {
435435

436436
// Getting original failed, requests and keeping successful ones to send to the listener
437-
RetryableBulkOperation<Context> original = sentRequests.get(index);
437+
IngesterOperation<Context> original = sentRequests.get(index);
438438
if (original.canRetry()) {
439439
retryableResp.add(bulkItemResponse);
440-
Iterator<Long> retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator());
440+
RetryableBulkOperation<Context> repeatableOp = original.repeatableOperation();
441+
Iterator<Long> retryTimes = Optional.ofNullable(repeatableOp.retries()).orElse(backoffPolicy.iterator());
441442
RetryableBulkOperation<Context> refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes);
442443
retryableReq.add(original);
443444
refires.add(refire);
@@ -517,10 +518,10 @@ private void addRetry(RetryableBulkOperation<Context> repeatableOp) {
517518
}
518519

519520
private void innerAdd(RetryableBulkOperation<Context> repeatableOp) {
520-
IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());
521+
IngesterOperation<Context> ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());
521522

522523
addCondition.whenReady(() -> {
523-
operations.add(ingestOp.repeatableOperation());
524+
operations.add(ingestOp);
524525
currentSize += ingestOp.size();
525526

526527
if (!canAddOperation()) {

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@
3535
/**
3636
* A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size).
3737
*/
38-
class IngesterOperation {
39-
private final RetryableBulkOperation repeatableOp;
38+
class IngesterOperation<Context> {
39+
private final RetryableBulkOperation<Context> repeatableOp;
4040
private final long size;
4141

42-
IngesterOperation(RetryableBulkOperation repeatableOp, long size) {
42+
IngesterOperation(RetryableBulkOperation<Context> repeatableOp, long size) {
4343
this.repeatableOp = repeatableOp;
4444
this.size = size;
4545
}
4646

47-
public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
47+
public static <Context> IngesterOperation<Context> of(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
4848
switch (repeatableOp.operation()._kind()) {
4949
case Create:
5050
return createOperation(repeatableOp, mapper);
@@ -59,17 +59,33 @@ public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMap
5959
}
6060
}
6161

62-
public RetryableBulkOperation repeatableOperation() {
62+
public RetryableBulkOperation<Context> repeatableOperation() {
6363
return this.repeatableOp;
6464
}
6565

6666
public long size() {
6767
return this.size;
6868
}
6969

70-
private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
70+
public BulkOperation operation() {
71+
return repeatableOp.operation();
72+
}
73+
74+
public Context context() {
75+
return repeatableOp.context();
76+
}
77+
78+
public boolean isSendable() {
79+
return repeatableOp.isSendable();
80+
}
81+
82+
public boolean canRetry() {
83+
return repeatableOp.canRetry();
84+
}
85+
86+
private static <Context> IngesterOperation<Context> createOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
7187
CreateOperation<?> create = repeatableOp.operation().create();
72-
RetryableBulkOperation newOperation;
88+
RetryableBulkOperation<Context> newOperation;
7389

7490
long size = basePropertiesSize(create);
7591

@@ -80,18 +96,18 @@ private static IngesterOperation createOperation(RetryableBulkOperation repeatab
8096
} else {
8197
BinaryData binaryDoc = BinaryData.of(create.document(), mapper);
8298
size += binaryDoc.size();
83-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> {
99+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.create(idx -> {
84100
copyCreateProperties(create, idx);
85101
return idx.document(binaryDoc);
86102
})),repeatableOp.context(),repeatableOp.retries());
87103
}
88104

89-
return new IngesterOperation(newOperation, size);
105+
return new IngesterOperation<>(newOperation, size);
90106
}
91107

92-
private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
108+
private static <Context> IngesterOperation<Context> indexOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
93109
IndexOperation<?> index = repeatableOp.operation().index();
94-
RetryableBulkOperation newOperation;
110+
RetryableBulkOperation<Context> newOperation;
95111

96112
long size = basePropertiesSize(index);
97113

@@ -102,18 +118,18 @@ private static IngesterOperation indexOperation(RetryableBulkOperation repeatabl
102118
} else {
103119
BinaryData binaryDoc = BinaryData.of(index.document(), mapper);
104120
size += binaryDoc.size();
105-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> {
121+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.index(idx -> {
106122
copyIndexProperties(index, idx);
107123
return idx.document(binaryDoc);
108124
})),repeatableOp.context(),repeatableOp.retries());
109125
}
110126

111-
return new IngesterOperation(newOperation, size);
127+
return new IngesterOperation<>(newOperation, size);
112128
}
113129

114-
private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
130+
private static <Context> IngesterOperation<Context> updateOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
115131
UpdateOperation<?, ?> update = repeatableOp.operation().update();
116-
RetryableBulkOperation newOperation;
132+
RetryableBulkOperation<Context> newOperation;
117133

118134
long size = basePropertiesSize(update) +
119135
size("retry_on_conflict", update.retryOnConflict()) +
@@ -126,7 +142,7 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
126142
} else {
127143
BinaryData action = BinaryData.of(update.action(), mapper);
128144
size += action.size();
129-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> {
145+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.update(u -> {
130146
copyBaseProperties(update, u);
131147
return u
132148
.binaryAction(action)
@@ -135,12 +151,12 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
135151
})),repeatableOp.context(),repeatableOp.retries());
136152
}
137153

138-
return new IngesterOperation(newOperation, size);
154+
return new IngesterOperation<>(newOperation, size);
139155
}
140156

141-
private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) {
157+
private static <Context> IngesterOperation<Context> deleteOperation(RetryableBulkOperation<Context> repeatableOp) {
142158
DeleteOperation delete = repeatableOp.operation().delete();
143-
return new IngesterOperation(repeatableOp, basePropertiesSize(delete));
159+
return new IngesterOperation<>(repeatableOp, basePropertiesSize(delete));
144160
}
145161

146162

0 commit comments

Comments
 (0)