Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class BulkIngester<Context> implements AutoCloseable {
private BackoffPolicy backoffPolicy;

// Current state
private List<RetryableBulkOperation<Context>> operations = new ArrayList<>();
private List<IngesterOperation<Context>> operations = new ArrayList<>();
private long currentSize;
private int requestsInFlightCount;
private volatile boolean isClosed = false;
Expand Down Expand Up @@ -190,7 +190,7 @@ public Duration flushInterval() {
* The number of operations that have been buffered, waiting to be sent.
*/
public int pendingOperations() {
List<RetryableBulkOperation<Context>> operations = this.operations;
List<IngesterOperation<Context>> operations = this.operations;
return operations == null ? 0 : operations.size();
}

Expand Down Expand Up @@ -296,26 +296,27 @@ private void failsafeFlush() {
}

public void flush() {
List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();
List<IngesterOperation<Context>> sentRequests = new ArrayList<>();
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
() -> {
// May happen on manual and periodic flushes
return !operations.isEmpty() && operations.stream()
.anyMatch(RetryableBulkOperation::isSendable);
.anyMatch(IngesterOperation::isSendable);
},
() -> {
// Selecting operations that can be sent immediately,
// Dividing actual operations from contexts
List<BulkOperation> immediateOps = new ArrayList<>();
List<Context> contexts = new ArrayList<>();

for(Iterator<RetryableBulkOperation<Context>> it = operations.iterator(); it.hasNext();){
RetryableBulkOperation<Context> op = it.next();
for(Iterator<IngesterOperation<Context>> it = operations.iterator(); it.hasNext();){
IngesterOperation<Context> op = it.next();
if (op.isSendable()) {
immediateOps.add(op.operation());
contexts.add(op.context());

sentRequests.add(op);
currentSize -= op.size();
it.remove();
}
}
Expand All @@ -324,7 +325,6 @@ public void flush() {
BulkRequest request = newRequest().operations(immediateOps).build();

// Prepare for next round
currentSize = operations.size();
addCondition.signalIfReady();

long id = sendRequestCondition.invocations();
Expand Down Expand Up @@ -362,7 +362,7 @@ public void flush() {
// Partial success, retrying failed requests if policy allows it
// Keeping list of retryable requests/responses, to exclude them for calling
// listener later
List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
List<IngesterOperation<Context>> retryableReq = new ArrayList<>();
List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
List<BulkResponseItem> retryableResp = new ArrayList<>();

Expand All @@ -381,7 +381,7 @@ public void flush() {
// Creating partial BulkRequest
List<BulkOperation> partialOps = new ArrayList<>();
List<Context> partialCtx = new ArrayList<>();
for (RetryableBulkOperation<Context> op : sentRequests) {
for (IngesterOperation<Context> op : sentRequests) {
partialOps.add(op.operation());
partialCtx.add(op.context());
}
Expand Down Expand Up @@ -428,16 +428,17 @@ public void flush() {
}

private void selectingRetries(int index, BulkResponseItem bulkItemResponse,
List<RetryableBulkOperation<Context>> sentRequests,
List<IngesterOperation<Context>> sentRequests,
List<BulkResponseItem> retryableResp,
List<RetryableBulkOperation<Context>> retryableReq,
List<IngesterOperation<Context>> retryableReq,
List<RetryableBulkOperation<Context>> refires) {

// Getting original failed, requests and keeping successful ones to send to the listener
RetryableBulkOperation<Context> original = sentRequests.get(index);
IngesterOperation<Context> original = sentRequests.get(index);
if (original.canRetry()) {
retryableResp.add(bulkItemResponse);
Iterator<Long> retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator());
RetryableBulkOperation<Context> repeatableOp = original.repeatableOperation();
Iterator<Long> retryTimes = Optional.ofNullable(repeatableOp.retries()).orElse(backoffPolicy.iterator());
RetryableBulkOperation<Context> refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes);
retryableReq.add(original);
refires.add(refire);
Expand Down Expand Up @@ -517,10 +518,10 @@ private void addRetry(RetryableBulkOperation<Context> repeatableOp) {
}

private void innerAdd(RetryableBulkOperation<Context> repeatableOp) {
IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());
IngesterOperation<Context> ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());

addCondition.whenReady(() -> {
operations.add(ingestOp.repeatableOperation());
operations.add(ingestOp);
currentSize += ingestOp.size();

if (!canAddOperation()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@
/**
* A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size).
*/
class IngesterOperation {
private final RetryableBulkOperation repeatableOp;
class IngesterOperation<Context> {
private final RetryableBulkOperation<Context> repeatableOp;
private final long size;

IngesterOperation(RetryableBulkOperation repeatableOp, long size) {
IngesterOperation(RetryableBulkOperation<Context> repeatableOp, long size) {
this.repeatableOp = repeatableOp;
this.size = size;
}

public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
public static <Context> IngesterOperation<Context> of(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
switch (repeatableOp.operation()._kind()) {
case Create:
return createOperation(repeatableOp, mapper);
Expand All @@ -59,17 +59,33 @@ public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMap
}
}

public RetryableBulkOperation repeatableOperation() {
public RetryableBulkOperation<Context> repeatableOperation() {
return this.repeatableOp;
}

public long size() {
return this.size;
}

private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
public BulkOperation operation() {
return repeatableOp.operation();
}

public Context context() {
return repeatableOp.context();
}

public boolean isSendable() {
return repeatableOp.isSendable();
}

public boolean canRetry() {
return repeatableOp.canRetry();
}

private static <Context> IngesterOperation<Context> createOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
CreateOperation<?> create = repeatableOp.operation().create();
RetryableBulkOperation newOperation;
RetryableBulkOperation<Context> newOperation;

long size = basePropertiesSize(create);

Expand All @@ -80,18 +96,18 @@ private static IngesterOperation createOperation(RetryableBulkOperation repeatab
} else {
BinaryData binaryDoc = BinaryData.of(create.document(), mapper);
size += binaryDoc.size();
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> {
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.create(idx -> {
copyCreateProperties(create, idx);
return idx.document(binaryDoc);
})),repeatableOp.context(),repeatableOp.retries());
}

return new IngesterOperation(newOperation, size);
return new IngesterOperation<>(newOperation, size);
}

private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
private static <Context> IngesterOperation<Context> indexOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
IndexOperation<?> index = repeatableOp.operation().index();
RetryableBulkOperation newOperation;
RetryableBulkOperation<Context> newOperation;

long size = basePropertiesSize(index);

Expand All @@ -102,18 +118,18 @@ private static IngesterOperation indexOperation(RetryableBulkOperation repeatabl
} else {
BinaryData binaryDoc = BinaryData.of(index.document(), mapper);
size += binaryDoc.size();
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> {
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.index(idx -> {
copyIndexProperties(index, idx);
return idx.document(binaryDoc);
})),repeatableOp.context(),repeatableOp.retries());
}

return new IngesterOperation(newOperation, size);
return new IngesterOperation<>(newOperation, size);
}

private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
private static <Context> IngesterOperation<Context> updateOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
UpdateOperation<?, ?> update = repeatableOp.operation().update();
RetryableBulkOperation newOperation;
RetryableBulkOperation<Context> newOperation;

long size = basePropertiesSize(update) +
size("retry_on_conflict", update.retryOnConflict()) +
Expand All @@ -126,7 +142,7 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
} else {
BinaryData action = BinaryData.of(update.action(), mapper);
size += action.size();
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> {
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.update(u -> {
copyBaseProperties(update, u);
return u
.binaryAction(action)
Expand All @@ -135,12 +151,12 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
})),repeatableOp.context(),repeatableOp.retries());
}

return new IngesterOperation(newOperation, size);
return new IngesterOperation<>(newOperation, size);
}

private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) {
private static <Context> IngesterOperation<Context> deleteOperation(RetryableBulkOperation<Context> repeatableOp) {
DeleteOperation delete = repeatableOp.operation().delete();
return new IngesterOperation(repeatableOp, basePropertiesSize(delete));
return new IngesterOperation<>(repeatableOp, basePropertiesSize(delete));
}


Expand Down