Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions src/com/nutrons/framework/commands/Command.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.nutrons.framework.commands;

import static com.nutrons.framework.util.FlowOperators.toFlow;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Publisher;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static com.nutrons.framework.util.FlowOperators.toFlow;
import org.reactivestreams.Publisher;

public class Command implements CommandWorkUnit {

Expand Down Expand Up @@ -71,23 +70,35 @@ public static Command parallel(Command... commands) {
return new Command(new ParallelCommand(commands));
}

public static Command fromSwitch(Publisher<? extends CommandWorkUnit> commandStream, boolean subcommandsSelfTerminate) {
return fromSwitch(commandStream, subcommandsSelfTerminate, true);
}

/**
* Creates a command that runs sequentially to another.
* Creates a command that runs commands in a stream.
*
* @param commandStream A flowable of commands
* @param commandStream A flowable of commands
* @param subcommandsSelfTerminate if true, commands in the stream will self terminate;
* @param subcommandsForceTerminate if true, commands in the stream will terminate the previous command.
* @retuns Second command after first is executed.
*/
public static Command fromSwitch(Publisher<? extends CommandWorkUnit> commandStream) {
return new Command(x -> Flowable.defer(() ->
Flowable.switchOnNext(Flowable.fromPublisher(commandStream).map(y -> y.execute(x))
.subscribeOn(Schedulers.io()))).scan((a, b) -> {
a.run();
return b;
}));
public static Command fromSwitch(Publisher<? extends CommandWorkUnit> commandStream,
boolean subcommandsSelfTerminate,
boolean subcommandsForceTerminate) {
return new Command(x -> Flowable.fromPublisher(commandStream)
.concatMap(y -> Flowable.<Terminator>just(FlattenedTerminator.from(y.execute(subcommandsSelfTerminate)))
.subscribeOn(Schedulers.io()))
.scan((a, b) -> {
if (subcommandsForceTerminate) {
a.run();
}
return b;
}).replay().autoConnect());
}

public Command addFinalTerminator(Terminator terminator) {
return Command.just(x -> this.source.execute(x).flatMap(y -> Flowable.<Terminator>just(y, terminator)).subscribeOn(Schedulers.io()));
return Command.just(x -> this.source.execute(x)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need on backpressure drop here?

.flatMap(y -> Flowable.<Terminator>just(y, terminator)).subscribeOn(Schedulers.io()));
}

/**
Expand Down Expand Up @@ -125,7 +136,7 @@ public Command terminable(Publisher<?> terminator) {
*/
public Command endsWhen(Publisher<?> terminator, boolean terminatesAtEnd) {
return Command.just(x -> {
Flowable<Terminator> sourceTerminator = this.execute(terminatesAtEnd);
Flowable<? extends Terminator> sourceTerminator = this.execute(terminatesAtEnd);
Terminator multi = FlattenedTerminator.from(sourceTerminator);
return Flowable.defer(() -> Flowable.<Terminator>never().takeUntil(terminator)
.mergeWith(Flowable.just(multi::run)));
Expand All @@ -137,7 +148,8 @@ public Command endsWhen(Publisher<?> terminator, boolean terminatesAtEnd) {
* will only complete once endCondition returns true.
*/
public Command until(Supplier<Boolean> endCondition) {
ConnectableFlowable<?> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x).onBackpressureDrop().publish();
ConnectableFlowable<?> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x)
.onBackpressureDrop().publish();
terminator.connect();
return this.terminable(terminator);
}
Expand All @@ -163,18 +175,24 @@ public Command delayFinish(long delay, TimeUnit unit) {
return this.terminable(Flowable.timer(delay, unit));
}

/**
* End and terminate this command only after the specified time has passed.
*/
public Command killAfter(long delay, TimeUnit unit) {
return Command.just(x -> {
Flowable<Terminator> terms = this.terminable(Flowable.timer(delay, unit)).execute(x);
Flowable<? extends Terminator> terms = this.terminable(Flowable.timer(delay, unit))
.execute(x);
return terms;
});
}

@Override
public Flowable<Terminator> execute(boolean selfTerminating) {
Flowable<Terminator> terms = source.execute(selfTerminating).subscribeOn(Schedulers.io());
public Flowable<? extends Terminator> execute(boolean selfTerminating) {
Flowable<? extends Terminator> terms = source.execute(selfTerminating)
.subscribeOn(Schedulers.io());
if (selfTerminating) {
terms.toList().subscribe(x -> Observable.fromIterable(x).blockingSubscribe(Terminator::run));
terms.toList().map(Observable::fromIterable).subscribeOn(Schedulers.io())
.subscribe(x -> x.subscribe(Terminator::run));
}
return terms;
}
Expand Down
2 changes: 1 addition & 1 deletion src/com/nutrons/framework/commands/CommandWorkUnit.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
import io.reactivex.Flowable;

public interface CommandWorkUnit {
Flowable<Terminator> execute(boolean selfTerminating);
Flowable<? extends Terminator> execute(boolean selfTerminating);
}
6 changes: 3 additions & 3 deletions src/com/nutrons/framework/commands/FlattenedTerminator.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class FlattenedTerminator implements Terminator {

private final AtomicBoolean lock;
private final ArrayList<Terminator> terminators;
private final Flowable<Terminator> terminatorStream;
private final Flowable<? extends Terminator> terminatorStream;

private FlattenedTerminator(Flowable<Terminator> terminators) {
private FlattenedTerminator(Flowable<? extends Terminator> terminators) {
this.lock = new AtomicBoolean(false);
this.terminators = new ArrayList<>();
this.terminatorStream = terminators;
Expand All @@ -29,7 +29,7 @@ private FlattenedTerminator(Flowable<Terminator> terminators) {
});
}

static FlattenedTerminator from(Flowable<Terminator> terminators) {
static FlattenedTerminator from(Flowable<? extends Terminator> terminators) {
return new FlattenedTerminator(terminators);
}

Expand Down
30 changes: 4 additions & 26 deletions test/com/nutrons/framework/test/MultiCommandTest.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.nutrons.framework.test;

import static com.nutrons.framework.commands.Command.parallel;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.TerminatorWrapper;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static com.nutrons.framework.commands.Command.parallel;
import static junit.framework.TestCase.assertTrue;

public class MultiCommandTest {
private Command delay;

Expand Down Expand Up @@ -41,24 +39,4 @@ public void testOneThenAnother() throws InterruptedException {
Thread.sleep(2000);
assertTrue(record[0] == 0);
}

@Test
public void testSwitch() throws InterruptedException {
int[] record = new int[1];
record[0] = 0;
Command inc = Command.just(x -> {
synchronized (record) {
record[0] += 1;
}
return Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] -= 1;
}
}));
});
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5))
.execute(true).blockingSubscribe();
Thread.sleep(2000);
assertTrue(record[0] == 0);
}
}
29 changes: 19 additions & 10 deletions test/com/nutrons/framework/test/TestCommand.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package com.nutrons.framework.test;

import static com.nutrons.framework.commands.Command.parallel;
import static com.nutrons.framework.commands.Command.serial;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.Terminator;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static com.nutrons.framework.commands.Command.parallel;
import static com.nutrons.framework.commands.Command.serial;
import static junit.framework.TestCase.assertTrue;

public class TestCommand {

private Command delay;

static void waitForCommand(Flowable<Terminator> commandExecution) {
static void waitForCommand(Flowable<? extends Terminator> commandExecution) {
commandExecution.blockingSubscribe();
}

Expand Down Expand Up @@ -72,7 +71,7 @@ public void inParallelTimed() {
public void testTerminable() throws InterruptedException {
final long start = System.currentTimeMillis();
PublishProcessor pp = PublishProcessor.create();
Flowable<Terminator> td = serial(delay, delay, delay, delay)
final Flowable<? extends Terminator> td = serial(delay, delay, delay, delay)
.terminable(pp).execute(true);
Thread.sleep(3000);
pp.onNext(new Object());
Expand All @@ -86,7 +85,8 @@ public void testUntil() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
long start = System.currentTimeMillis();
Flowable<Terminator> td = Command.fromAction(() -> record[0] = 1).until(() -> record[1] == 1).execute(true);
Flowable<? extends Terminator> td = Command.fromAction(() -> record[0] = 1)
.until(() -> record[1] == 1).execute(true);
Flowable.timer(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(x -> record[1] = 1);
waitForCommand(td);
assertTrue(System.currentTimeMillis() - 1000 > start);
Expand All @@ -109,7 +109,7 @@ public void testStartable() {
public void testWhen() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
final Flowable<Terminator> td = Command.fromAction(() -> record[0] = 1)
final Flowable<? extends Terminator> td = Command.fromAction(() -> record[0] = 1)
.when(() -> record[1] == 1)
.execute(true);
Thread.sleep(1000);
Expand Down Expand Up @@ -141,4 +141,13 @@ public void killAfter() throws InterruptedException {
Thread.sleep(4000);
assertTrue(record[0] == 1);
}

@Test
public void notSelfTerminating() throws InterruptedException {
Command doesntTermrinate = Command.just(x ->
Flowable.<Terminator>just(() -> assertTrue(false))
.mergeWith(Flowable.never()));
doesntTermrinate.execute(true);
Thread.sleep(2000);
}
}
94 changes: 94 additions & 0 deletions test/com/nutrons/framework/test/TestSwitchCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.nutrons.framework.test;

import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;

import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.TerminatorWrapper;
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class TestSwitchCommand {

@Test
public void testSwitchStart() throws InterruptedException {
List<Long> list = new ArrayList<>();
Flowable<Command> commandStream = Flowable.interval(100, TimeUnit.MILLISECONDS)
.map(x -> putNumber(list, x)).take(5);
Command.fromSwitch(commandStream, true).execute(true);
Thread.sleep(300);
assertFalse(list.contains(4));
Thread.sleep(500);
for (long i = 0; i < 5; i++) {
assertTrue(list.contains(i));
}
}

private Command putNumber(List<Long> list, long number) {
return Command.fromAction(() -> list.add(number));
}


@Test
public void testSwitchStartAndStop() throws InterruptedException {
int[] record = new int[1];
record[0] = 0;
Command inc = Command.just(x -> {
synchronized (record) {
record[0] += 1;
}
return Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] -= 1;
}
}));
});
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true)
.execute(true).blockingSubscribe();
Thread.sleep(2000);
assertTrue(record[0] == 0);
}

@Test
public void testSwitchTerminateRealtime() throws InterruptedException {
int[] record = new int[1];
long start = System.currentTimeMillis();
record[0] = 0;
Command inc = Command.just(x -> Flowable.just(new TerminatorWrapper(() -> {
synchronized (record) {
record[0] += 1;
}
})));
Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true)
.execute(true);
assertTrue(System.currentTimeMillis() - start < 1000);
Thread.sleep(2000);
assertTrue(record[0] < 5);
assertTrue(record[0] > 0);
}

@Test
public void testSwitchNotTerminating() throws InterruptedException {
Command doesntFinish = Command.just(x -> Flowable.just(() -> assertTrue(false)));
doesntFinish.execute(false);
Thread.sleep(1000);
Command justOne = Command.fromSwitch(Flowable.<Command>never()
.mergeWith(Flowable.just(doesntFinish)), false);
justOne.execute(false);
Thread.sleep(1000);
justOne.execute(true);
Thread.sleep(1000);
}

@Test(expected = RuntimeException.class)
public void testSwitchTerminatesOnNext() {
Command doesntFinish = Command.just(x -> Flowable.just(() -> {
throw new RuntimeException();
}));
Command two = Command.fromSwitch(Flowable.just(doesntFinish, doesntFinish), false);
two.execute(false).blockingSubscribe();
}
}