Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ public PayloadPipelineBuilder<I> registerStep(StepAssembler<? extends Indexable,
return this;
}

@SuppressWarnings("unchecked")
public PayloadPipelineBuilder<I> insertStep(StepAssembler<? extends Indexable, I> builder, int index)
{
this.steps.add(index, (StepAssembler<Indexable, I>) builder);
return this;
}

public PayloadPipelineBuilder<I> registerStepAssemblers(List<? extends StepAssembler<? extends Indexable, I>> assemblers)
{
for (StepAssembler<? extends Indexable, I> assembler : assemblers)
Expand Down Expand Up @@ -247,6 +254,12 @@ public PayloadPipelineBuilder<I> registerSink(SinkAssembler assembler)
return this;
}

public PayloadPipelineBuilder<I> insertSink(SinkAssembler assembler, int index)
{
this.sinks.add(index, assembler);
return this;
}

public PayloadPipelineBuilder<I> registerSinks(List<? extends Sink> sinks)
{
for (Sink sink : sinks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ public SimplePipelineBuilder<I> registerSink(SinkAssembler assembler)
return this;
}

public SimplePipelineBuilder<I> insertSink(SinkAssembler assembler, int index)
{
this.sinks.add(index, assembler);
return this;
}

public SimplePipelineBuilder<I> registerSinks(List<? extends Sink> sinks)
{
for (Sink sink : sinks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ public void test__simple()
.setDefaultStepErrorHandler(stepErrorHandler)
.setDefaultSinkErrorHandler(sinkErrorHandler)
.setDefaultEvaluator(resultEvaluator)
.registerStep(step1)
.registerStep(b -> b.step(step2))
.registerSteps(List.of(step3, step4))
.insertStep(b -> b.step(step1), 0)
.registerStepAssemblers(List.of(
b -> b.step(step5),
b -> b.step(step6)
))
.registerSink(sink)
.registerSinks(List.of(sink, sink))
.insertSink(b -> b.sink(sink), 0)
.registerObserver(stepCounter)
.registerOnCloseHandler(closeHandler)
);
Expand Down Expand Up @@ -98,8 +98,8 @@ public void test__payload()
(payload, results, context) -> new TestResult("2"),
(PayloadStep) (payload, results, context) -> new TestResult("3")
))
.registerSink(sink)
.registerSinks(List.of(sink, sink))
.insertSink(b -> b.sink(sink), 0)
.registerObserver(stepCounter)
.registerOnCloseHandler(closeHandler)
);
Expand Down Expand Up @@ -131,20 +131,22 @@ public void test__payload__withAssemblers()
.setDefaultStepErrorHandler(stepErrorHandler)
.setDefaultSinkErrorHandler(sinkErrorHandler)
.setDefaultEvaluator(resultEvaluator)
.registerStep(b -> b.withId("step-builder").step(step1))
.registerStepAssemblers(List.of(
b -> b.step(step2),
b -> b.step(step3)
))
.registerSink(b -> b
.withId("sink-builder")
.sink(sink)
.setAsync(true)
)
.insertStep(b -> b.withId("step-builder").step(step1), 0)
.registerSinkAssemblers(List.of(
b -> b.sink(sink),
b -> b.sink(sink).setAsync(true)
))
.insertSink(
b -> b
.withId("sink-builder")
.sink(sink)
.setAsync(true),
0
)
.registerOnCloseHandler(closeHandler)
);

Expand Down
Loading