diff --git a/.idea/FRamework.iml b/.idea/FRamework.iml index 8696830..0a84278 100644 --- a/.idea/FRamework.iml +++ b/.idea/FRamework.iml @@ -12,5 +12,14 @@ + + + + + + + + + \ No newline at end of file diff --git a/FRamework.iml b/FRamework.iml new file mode 100644 index 0000000..5f4ccd1 --- /dev/null +++ b/FRamework.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/docs/google_checks.xml b/docs/google_checks.xml index 5c47285..20b86ba 100644 --- a/docs/google_checks.xml +++ b/docs/google_checks.xml @@ -108,32 +108,32 @@ value="Type name ''{0}'' must match pattern ''{1}''."/> - + - + - + - + - + @@ -221,7 +221,7 @@ - + diff --git a/ivy.xml b/ivy.xml index de7ca05..232299b 100644 --- a/ivy.xml +++ b/ivy.xml @@ -13,5 +13,6 @@ + diff --git a/src/com/nutrons/framework/Robot.java b/src/com/nutrons/framework/Robot.java index 6423377..1d6c3d5 100644 --- a/src/com/nutrons/framework/Robot.java +++ b/src/com/nutrons/framework/Robot.java @@ -3,14 +3,13 @@ import static com.nutrons.framework.util.CompMode.AUTO; import static com.nutrons.framework.util.CompMode.TELE; import static com.nutrons.framework.util.CompMode.TEST; -import static io.reactivex.Flowable.combineLatest; import com.nutrons.framework.commands.Command; import com.nutrons.framework.util.CompMode; import com.nutrons.framework.util.FlowOperators; import edu.wpi.first.wpilibj.SampleRobot; import io.reactivex.Flowable; -import io.reactivex.schedulers.Schedulers; +import java.util.concurrent.TimeUnit; public abstract class Robot extends SampleRobot { @@ -62,9 +61,9 @@ public final Flowable enabledStream() { */ public final Flowable competitionStream() { return Flowable.merge( - FlowOperators.toFlow(this::isAutonomous).filter(x -> x).map((x) -> AUTO), + FlowOperators.toFlow(this::isAutonomous, 50, TimeUnit.MILLISECONDS).filter(x -> x).map((x) -> AUTO), FlowOperators.toFlow(this::isOperatorControl).filter(x -> x).map((x) -> TELE), - FlowOperators.toFlow(this::isTest).filter(x -> x).map((x) -> TEST)).distinctUntilChanged(); + FlowOperators.toFlow(this::isTest).filter(x -> x).map((x) -> TEST)).distinctUntilChanged().replay(1).autoConnect(); // filter(x -> x) will filter all false values from the stream. } @@ -140,4 +139,4 @@ public final boolean isOperatorControl() { public final boolean isNewDataAvailable() { return super.isNewDataAvailable(); } -} \ No newline at end of file +} diff --git a/src/com/nutrons/framework/StreamManager.java b/src/com/nutrons/framework/StreamManager.java index 8c91302..dfb7d55 100644 --- a/src/com/nutrons/framework/StreamManager.java +++ b/src/com/nutrons/framework/StreamManager.java @@ -1,5 +1,9 @@ package com.nutrons.framework; +import static com.nutrons.framework.util.CompMode.AUTO; +import static com.nutrons.framework.util.CompMode.TELE; +import static io.reactivex.Flowable.combineLatest; + import com.nutrons.framework.commands.Command; import com.nutrons.framework.util.CompMode; import io.reactivex.Flowable; @@ -10,9 +14,7 @@ import java.util.List; import java.util.function.Supplier; -import static com.nutrons.framework.util.CompMode.AUTO; -import static com.nutrons.framework.util.CompMode.TELE; -import static io.reactivex.Flowable.combineLatest; + /** * This class sets up the I/O factories and initializes subsystems. @@ -38,7 +40,7 @@ public StreamManager(Robot robot) { * Subclasses should register subsystems in their constructor. * * @param enabled a Flowable of booleans, representing changes in the enabled state of the robot - * @param mode a Flowable of CompModes, representing changes in the competition game mode + * @param mode a Flowable of CompModes, representing changes in the competition game mode */ public StreamManager(Flowable enabled, Flowable mode) { this.subsystems = new ArrayList<>(); @@ -50,7 +52,8 @@ public StreamManager(Flowable enabled, Flowable mode) { * Called to by the bootstrapper to subscribe subsystem streams, * and start the competition loop. */ - public final void startCompetition(Supplier autoSupplier, Supplier teleopSupplier) { + public final void startCompetition(Supplier autoSupplier, + Supplier teleopSupplier) { Observable.fromIterable(this.subsystems).blockingSubscribe(x -> { System.out.println("registering " + x.getClass().getName()); x.registerSubscriptions(); @@ -59,7 +62,7 @@ public final void startCompetition(Supplier autoSupplier, Supplier x && y.compareTo(AUTO) == 0) + combineLatest(this.enabled, this.mode, (x, y) -> x && y.compareTo(AUTO) == 0).onBackpressureDrop() .subscribeOn(Schedulers.io()) .filter(x -> x) .map((a) -> { @@ -70,7 +73,7 @@ public final void startCompetition(Supplier autoSupplier, Supplier x && y.compareTo(TELE) == 0) + combineLatest(this.enabled, this.mode, (x, y) -> x && y.compareTo(TELE) == 0).onBackpressureDrop() .subscribeOn(Schedulers.io()) .filter(x -> x) .map((a) -> { diff --git a/src/com/nutrons/framework/commands/Command.java b/src/com/nutrons/framework/commands/Command.java index 38b98dc..7ddbb0b 100644 --- a/src/com/nutrons/framework/commands/Command.java +++ b/src/com/nutrons/framework/commands/Command.java @@ -1,16 +1,15 @@ package com.nutrons.framework.commands; +import static com.nutrons.framework.util.FlowOperators.toFlow; + import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.disposables.Disposable; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.schedulers.Schedulers; -import org.reactivestreams.Publisher; - import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - -import static com.nutrons.framework.util.FlowOperators.toFlow; +import org.reactivestreams.Publisher; public class Command implements CommandWorkUnit { @@ -71,23 +70,48 @@ public static Command parallel(Command... commands) { return new Command(new ParallelCommand(commands)); } + public static Command fromSwitch(Publisher commandStream, boolean subcommandsSelfTerminate) { + return fromSwitch(commandStream, subcommandsSelfTerminate, true); + } + /** - * Creates a command that runs sequentially to another. + * Creates a command that runs commands in a stream. * - * @param commandStream A flowable of commands + * @param commandStream A flowable of commands + * @param subcommandsSelfTerminate if true, commands in the stream will self terminate; + * @param subcommandsForceTerminate if true, commands in the stream will terminate the previous command. * @retuns Second command after first is executed. */ - public static Command fromSwitch(Publisher commandStream) { - return new Command(x -> Flowable.defer(() -> - Flowable.switchOnNext(Flowable.fromPublisher(commandStream).map(y -> y.execute(x)) - .subscribeOn(Schedulers.io()))).scan((a, b) -> { - a.run(); - return b; - })); + public static Command fromSwitch(Publisher commandStream, + boolean subcommandsSelfTerminate, + boolean subcommandsForceTerminate) { + return new Command(x -> Flowable.fromPublisher(commandStream) + .concatMap(y -> Flowable.just(FlattenedTerminator.from(y.execute(subcommandsSelfTerminate))) + .subscribeOn(Schedulers.trampoline())) + .scan((a, b) -> { + if (subcommandsForceTerminate) { + a.run(); + } + return b; + })); + } + + public static Command defer(Supplier supplier) { + return Command.just(x -> { + Command actual = supplier.get(); + return actual.execute(x); + }); } + /** + * Adds a command that will terminate the current command. + * + * @param terminator Terminator command you wish to add. + * @return teriminatable command. + */ public Command addFinalTerminator(Terminator terminator) { - return Command.just(x -> this.source.execute(x).flatMap(y -> Flowable.just(y, terminator)).subscribeOn(Schedulers.io())); + return Command.just(x -> this.source.execute(x) + .flatMap(y -> Flowable.just(y, terminator))); } /** @@ -95,7 +119,7 @@ public Command addFinalTerminator(Terminator terminator) { */ public Command startable(Publisher starter) { return new Command(new SerialCommand( - Command.just(x -> Flowable.defer(() -> Flowable.never().takeUntil(starter))), + Command.just(x -> Flowable.never().takeUntil(starter)), this)); } @@ -104,8 +128,7 @@ public Command startable(Publisher starter) { * will delay the execution of all actions until startCondition returns true. */ public Command when(Supplier startCondition) { - Flowable ignition = Flowable - .defer(() -> emptyPulse.map(x -> startCondition.get()).filter(x -> x).onBackpressureDrop()); + Flowable ignition = emptyPulse.map(x -> startCondition.get()).filter(x -> x).onBackpressureDrop(); return this.startable(ignition); } @@ -120,15 +143,15 @@ public Command terminable(Publisher terminator) { /** * Copies this command into one which will end when terminator emits an item. * - * @param terminatesAtEnd if true, the command will terminate only when the end is reached, - * if false, the command may terminate before the end is reached. + * @param terminatesAtEnd if true, the command will terminate only when the end is reached, if + * false, the command may terminate before the end is reached. */ public Command endsWhen(Publisher terminator, boolean terminatesAtEnd) { return Command.just(x -> { - Flowable sourceTerminator = this.execute(terminatesAtEnd); + Flowable sourceTerminator = this.execute(terminatesAtEnd); Terminator multi = FlattenedTerminator.from(sourceTerminator); - return Flowable.defer(() -> Flowable.never().takeUntil(terminator) - .mergeWith(Flowable.just(multi::run))); + return Flowable.never().takeUntil(terminator) + .mergeWith(Flowable.just(multi)); }); } @@ -137,7 +160,8 @@ public Command endsWhen(Publisher terminator, boolean terminatesAtEnd) { * will only complete once endCondition returns true. */ public Command until(Supplier endCondition) { - ConnectableFlowable terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x).onBackpressureDrop().publish(); + ConnectableFlowable terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x) + .onBackpressureDrop().publish(); terminator.connect(); return this.terminable(terminator); } @@ -153,7 +177,7 @@ public Command then(Command next) { * Copies this command into one which will delay execution for a period of time. */ public Command delayStart(long delay, TimeUnit unit) { - return this.startable(Flowable.timer(delay, unit)); + return this.startable(Flowable.timer(delay, unit).onBackpressureBuffer()); } /** @@ -163,18 +187,31 @@ public Command delayFinish(long delay, TimeUnit unit) { return this.terminable(Flowable.timer(delay, unit)); } + /** + * End and terminate this command only after the specified time has passed. + * Kills a command after a given time and unit + * + * @param delay a number in relation to a unit 1000 = 1 ms if unit is ms. + * @param unit unit you wish to count in. + * @return a command that will terminate after a given time. + */ public Command killAfter(long delay, TimeUnit unit) { return Command.just(x -> { - Flowable terms = this.terminable(Flowable.timer(delay, unit)).execute(x); + Flowable terms = this.terminable(Flowable.timer(delay, unit)) + .execute(x); return terms; }); } @Override - public Flowable execute(boolean selfTerminating) { - Flowable terms = source.execute(selfTerminating).subscribeOn(Schedulers.io()); + public Flowable execute(boolean selfTerminating) { + Flowable terms = source.execute(selfTerminating) + .subscribeOn(Schedulers.trampoline()); if (selfTerminating) { - terms.toList().subscribe(x -> Observable.fromIterable(x).blockingSubscribe(Terminator::run)); + terms.toList().map(Observable::fromIterable).subscribeOn(Schedulers.trampoline()) + .subscribe(x -> x.subscribe(Terminator::run)); + } else { + terms.subscribe(); } return terms; } diff --git a/src/com/nutrons/framework/commands/CommandWorkUnit.java b/src/com/nutrons/framework/commands/CommandWorkUnit.java index 8458be0..34c83b0 100644 --- a/src/com/nutrons/framework/commands/CommandWorkUnit.java +++ b/src/com/nutrons/framework/commands/CommandWorkUnit.java @@ -3,5 +3,5 @@ import io.reactivex.Flowable; public interface CommandWorkUnit { - Flowable execute(boolean selfTerminating); + Flowable execute(boolean selfTerminating); } diff --git a/src/com/nutrons/framework/commands/FlattenedTerminator.java b/src/com/nutrons/framework/commands/FlattenedTerminator.java index 33779b4..04aa03c 100644 --- a/src/com/nutrons/framework/commands/FlattenedTerminator.java +++ b/src/com/nutrons/framework/commands/FlattenedTerminator.java @@ -10,13 +10,13 @@ class FlattenedTerminator implements Terminator { private final AtomicBoolean lock; private final ArrayList terminators; - private final Flowable terminatorStream; + private final Flowable terminatorStream; - private FlattenedTerminator(Flowable terminators) { + private FlattenedTerminator(Flowable terminators) { this.lock = new AtomicBoolean(false); this.terminators = new ArrayList<>(); this.terminatorStream = terminators; - terminators.subscribeOn(Schedulers.io()).subscribe(x -> { + terminators.subscribe(x -> { if (!lock.get()) { synchronized (lock) { if (!lock.get()) { @@ -29,13 +29,13 @@ private FlattenedTerminator(Flowable terminators) { }); } - static FlattenedTerminator from(Flowable terminators) { + static FlattenedTerminator from(Flowable terminators) { return new FlattenedTerminator(terminators); } public Flowable toSingle() { return Flowable.just(this).mergeWith(terminatorStream.ignoreElements().toFlowable()) - .subscribeOn(Schedulers.io()); + .subscribeOn(Schedulers.trampoline()); } @Override diff --git a/src/com/nutrons/framework/commands/ParallelCommand.java b/src/com/nutrons/framework/commands/ParallelCommand.java index df227ad..3b5b021 100644 --- a/src/com/nutrons/framework/commands/ParallelCommand.java +++ b/src/com/nutrons/framework/commands/ParallelCommand.java @@ -1,20 +1,24 @@ package com.nutrons.framework.commands; import io.reactivex.Flowable; -import io.reactivex.schedulers.Schedulers; public class ParallelCommand implements CommandWorkUnit { + private final Flowable commands; ParallelCommand(CommandWorkUnit... commands) { - this.commands = Flowable.fromArray(commands).map(Command::just); + this.commands = Flowable.fromArray(commands).concatMap(x -> { + if (x instanceof ParallelCommand) { + return ((ParallelCommand) x).commands; + } + return Flowable.just(Command.just(x)); + }); } @Override public Flowable execute(boolean selfTerminating) { - Flowable terminators = this.commands - .flatMap(x -> x.execute(selfTerminating).subscribeOn(Schedulers.io())) - .subscribeOn(Schedulers.io()).publish().autoConnect(); + Flowable terminators = this.commands + .flatMap(x -> x.execute(selfTerminating)).replay().autoConnect(); return Flowable.just(FlattenedTerminator.from(terminators)) .mergeWith(terminators.ignoreElements().toFlowable()); } diff --git a/src/com/nutrons/framework/commands/SerialCommand.java b/src/com/nutrons/framework/commands/SerialCommand.java index 0914471..488d641 100644 --- a/src/com/nutrons/framework/commands/SerialCommand.java +++ b/src/com/nutrons/framework/commands/SerialCommand.java @@ -1,20 +1,24 @@ package com.nutrons.framework.commands; import io.reactivex.Flowable; -import io.reactivex.schedulers.Schedulers; public class SerialCommand implements CommandWorkUnit { + private final Flowable commands; SerialCommand(CommandWorkUnit... commands) { - this.commands = Flowable.fromArray(commands).map(Command::just); + this.commands = Flowable.fromArray(commands).concatMap(x -> { + if (x instanceof SerialCommand) { + return ((SerialCommand) x).commands; + } + return Flowable.just(Command.just(x)); + }); } @Override public Flowable execute(boolean selfTerminating) { - Flowable terminators = this.commands - .concatMap(x -> x.execute(selfTerminating).subscribeOn(Schedulers.io())) - .subscribeOn(Schedulers.io()).publish().autoConnect(); + Flowable terminators = this.commands + .concatMap(x -> x.execute(selfTerminating)).replay().autoConnect(); return Flowable.just(FlattenedTerminator.from(terminators)) .mergeWith(terminators.ignoreElements().toFlowable()); } diff --git a/src/com/nutrons/framework/controllers/ControllerEvent.java b/src/com/nutrons/framework/controllers/ControllerEvent.java index ccaf9af..5ac1a94 100644 --- a/src/com/nutrons/framework/controllers/ControllerEvent.java +++ b/src/com/nutrons/framework/controllers/ControllerEvent.java @@ -11,4 +11,5 @@ default void actOn(VirtualSpeedController controller) { throw new EventUnimplementedException( controller.getClass().toString(), this.getClass().toString()); } + } diff --git a/src/com/nutrons/framework/controllers/Events.java b/src/com/nutrons/framework/controllers/Events.java index 32a2718..65b5464 100644 --- a/src/com/nutrons/framework/controllers/Events.java +++ b/src/com/nutrons/framework/controllers/Events.java @@ -26,6 +26,11 @@ public void actOn(Talon talon) { pid.actOn(talon); setpointEvent.actOn(talon); } + @Override + public void actOn(VirtualSpeedController controller) { + pid.actOn(controller); + setpointEvent.actOn(controller); + } }; } @@ -35,6 +40,7 @@ public static ControllerEvent pid(double pval, double ival, double dval, double /** * Combines actions that are preformed on the talon. + * * @return combined event */ public static ControllerEvent combine(ControllerEvent... events) { @@ -45,6 +51,12 @@ public void actOn(Talon talon) { e.actOn(talon); } } + @Override + public void actOn(VirtualSpeedController controller) { + for (ControllerEvent e : events) { + e.actOn(controller); + } + } }; } @@ -94,4 +106,5 @@ public static ControllerEvent setOutputVoltage(double min, double max) { public static ControllerEvent resetPosition(double position) { return new ResetPositionEvent(position); } + } diff --git a/src/com/nutrons/framework/controllers/FollowEvent.java b/src/com/nutrons/framework/controllers/FollowEvent.java index a46dc09..7d717de 100644 --- a/src/com/nutrons/framework/controllers/FollowEvent.java +++ b/src/com/nutrons/framework/controllers/FollowEvent.java @@ -6,6 +6,7 @@ public class FollowEvent implements ControllerEvent { /** * Sets a LoopSpeedController to follow another LoopSpeedController + * * @param targetToFollow is a LoopSpeedController which will be followed. */ public FollowEvent(LoopSpeedController targetToFollow) { diff --git a/src/com/nutrons/framework/controllers/LoopModeEvent.java b/src/com/nutrons/framework/controllers/LoopModeEvent.java index 82dc206..ce8cefb 100644 --- a/src/com/nutrons/framework/controllers/LoopModeEvent.java +++ b/src/com/nutrons/framework/controllers/LoopModeEvent.java @@ -16,4 +16,8 @@ public LoopModeEvent(ControlMode mode) { public void actOn(Talon talon) { talon.changeControlMode(mode); } + + public void actOn(VirtualSpeedController controller) { + controller.changeControlMode(mode); + } } diff --git a/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java b/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java index b7c3c8b..cddab84 100644 --- a/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java +++ b/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java @@ -27,4 +27,9 @@ public void actOn(Talon talon) { talon.setD(dval); talon.setF(fval); } + + @Override + public void actOn(VirtualSpeedController controller) { + controller.setLoopProperties(pval, ival, dval, fval); + } } diff --git a/src/com/nutrons/framework/controllers/LoopSpeedController.java b/src/com/nutrons/framework/controllers/LoopSpeedController.java index c4a6b38..5e1d16c 100644 --- a/src/com/nutrons/framework/controllers/LoopSpeedController.java +++ b/src/com/nutrons/framework/controllers/LoopSpeedController.java @@ -8,6 +8,8 @@ */ public abstract class LoopSpeedController implements Consumer { + public abstract double getCurrent(); + public abstract Flowable feedback(); @Override @@ -46,4 +48,6 @@ public void setReversedSensor(boolean flipped) { public abstract boolean revLimitSwitchClosed(); public abstract double position(); + + public abstract void setVoltageRampRate(double v); } diff --git a/src/com/nutrons/framework/controllers/ResetPositionEvent.java b/src/com/nutrons/framework/controllers/ResetPositionEvent.java index e426607..69f35b3 100644 --- a/src/com/nutrons/framework/controllers/ResetPositionEvent.java +++ b/src/com/nutrons/framework/controllers/ResetPositionEvent.java @@ -16,4 +16,8 @@ public ResetPositionEvent(double position) { public void actOn(Talon talon) { talon.resetPositionTo(position); } + + public void actOn(VirtualSpeedController controller) { + controller.resetPositionTo(position); + } } diff --git a/src/com/nutrons/framework/controllers/RevServo.java b/src/com/nutrons/framework/controllers/RevServo.java new file mode 100644 index 0000000..ef40826 --- /dev/null +++ b/src/com/nutrons/framework/controllers/RevServo.java @@ -0,0 +1,44 @@ +package com.nutrons.framework.controllers; + +import edu.wpi.first.wpilibj.Servo; + +/** + * The RevServo controlle used for the programming and running of Rev Smart Servos. + */ +public class RevServo extends ServoController { + + private final Servo servo; + + /** + * Makes a RevServo object at the given port. + * + * @param port Port of RevServo + */ + public RevServo(int port) { + servo = new Servo(port); + } + + @Override + public void accept(ServoCommand servoCommand) { + servoCommand.actOn(this); + } + + /** + * Sets the Servo to a given angle ranging from -90 to 90 left and right reflectively. + * + * @param angle given angle to turn to. + */ + public void setAngle(double angle) { + this.servo.setAngle(angle); + } + + /** + * Set the motor to a position given a value 0.0 is full left 1.0 is full right. + * + * @param value The value to turn the servo to. + */ + public void set(double value) { + this.servo.set(value); + } + +} diff --git a/src/com/nutrons/framework/controllers/RunAtPowerEvent.java b/src/com/nutrons/framework/controllers/RunAtPowerEvent.java index c460cac..f4cb29e 100644 --- a/src/com/nutrons/framework/controllers/RunAtPowerEvent.java +++ b/src/com/nutrons/framework/controllers/RunAtPowerEvent.java @@ -26,4 +26,9 @@ public void actOn(Talon talon) { } talon.set(power); } + + public void actOn(VirtualSpeedController controller) { + controller.setControlMode(ControlMode.MANUAL); + controller.setRawOutput(power); + } } diff --git a/src/com/nutrons/framework/controllers/ServoCommand.java b/src/com/nutrons/framework/controllers/ServoCommand.java new file mode 100644 index 0000000..5b1e4b2 --- /dev/null +++ b/src/com/nutrons/framework/controllers/ServoCommand.java @@ -0,0 +1,10 @@ +package com.nutrons.framework.controllers; + +public interface ServoCommand { + + default void actOn(RevServo servo) { + throw new EventUnimplementedException( + servo.getClass().toString(), this.getClass().toString()); + } + +} diff --git a/src/com/nutrons/framework/controllers/ServoController.java b/src/com/nutrons/framework/controllers/ServoController.java new file mode 100644 index 0000000..e079faa --- /dev/null +++ b/src/com/nutrons/framework/controllers/ServoController.java @@ -0,0 +1,17 @@ +package com.nutrons.framework.controllers; + +import io.reactivex.functions.Consumer; + +public abstract class ServoController implements Consumer { + + @Override + public abstract void accept(ServoCommand servoCommand); + + public void setAngle(double angle) { + this.accept(ServoInstr.setAngle(angle)); + } + + public void set(double value) { + this.accept(ServoInstr.set(value)); + } +} diff --git a/src/com/nutrons/framework/controllers/ServoInstr.java b/src/com/nutrons/framework/controllers/ServoInstr.java new file mode 100644 index 0000000..3c34377 --- /dev/null +++ b/src/com/nutrons/framework/controllers/ServoInstr.java @@ -0,0 +1,13 @@ +package com.nutrons.framework.controllers; + +public class ServoInstr { + + public static ServoCommand setAngle(double angle) { + return new SetAngleCommand(angle); + } + + public static ServoCommand set(double value) { + return new SetCommand(value); + } + +} diff --git a/src/com/nutrons/framework/controllers/SetAngleCommand.java b/src/com/nutrons/framework/controllers/SetAngleCommand.java new file mode 100644 index 0000000..38206a8 --- /dev/null +++ b/src/com/nutrons/framework/controllers/SetAngleCommand.java @@ -0,0 +1,31 @@ +package com.nutrons.framework.controllers; + +public class SetAngleCommand implements ServoCommand { + + private final double angle; + + /** + * An event which sets the servo to turn to a certain angle. + * + * @param angle the angle to turn the servo to -90 (left) to 90 (right). + */ + public SetAngleCommand(double angle) { + this.angle = angle; + } + + public double angle() { + return this.angle; + + } + + @Override + public void actOn(RevServo servo) { + if (Math.abs(angle) > 90.0) { + throw new IllegalArgumentException( + "Angle greater than 90 degrees is not supported for Servos"); + } + servo.setAngle(angle); + + } + +} diff --git a/src/com/nutrons/framework/controllers/SetCommand.java b/src/com/nutrons/framework/controllers/SetCommand.java new file mode 100644 index 0000000..8a893eb --- /dev/null +++ b/src/com/nutrons/framework/controllers/SetCommand.java @@ -0,0 +1,33 @@ +package com.nutrons.framework.controllers; + +public class SetCommand implements ServoCommand { + + private final double value; + + /** + * An event which sets the servo to turn to a certain angle. + * + * @param value the input value to give the servo -1 is full left 1 is full right. + */ + public SetCommand(double value) { + this.value = value; + + } + + public double value() { + return this.value; + + } + + @Override + public void actOn(RevServo servo) { + if (Math.abs(value) > 1.0) { + throw new IllegalArgumentException( + "Value input greater than 1.0 is not supported for this servo"); + + } + servo.set(value); + + } + +} diff --git a/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java b/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java index c58e215..c1d0aad 100644 --- a/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java +++ b/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java @@ -12,4 +12,9 @@ public SetReversedSensorEvent(boolean flipped) { public void actOn(Talon talon) { talon.reverseSensor(flipped); } + + @Override + public void actOn(VirtualSpeedController controller) { + controller.reverseSensor(true); + } } diff --git a/src/com/nutrons/framework/controllers/SetpointEvent.java b/src/com/nutrons/framework/controllers/SetpointEvent.java index 469028c..63086e8 100644 --- a/src/com/nutrons/framework/controllers/SetpointEvent.java +++ b/src/com/nutrons/framework/controllers/SetpointEvent.java @@ -12,4 +12,9 @@ class SetpointEvent implements ControllerEvent { public void actOn(Talon talon) { talon.changeSetpoint(setpoint); } + + @Override + public void actOn(VirtualSpeedController controller) { + controller.changeSetpoint(setpoint); + } } diff --git a/src/com/nutrons/framework/controllers/Talon.java b/src/com/nutrons/framework/controllers/Talon.java index 5babd71..e177d4c 100644 --- a/src/com/nutrons/framework/controllers/Talon.java +++ b/src/com/nutrons/framework/controllers/Talon.java @@ -1,11 +1,12 @@ package com.nutrons.framework.controllers; +import static com.nutrons.framework.util.FlowOperators.toFlow; + import com.ctre.CANTalon; +import edu.wpi.first.wpilibj.smartdashboard.SmartDashboard; import io.reactivex.Flowable; - import java.security.InvalidParameterException; -import static com.nutrons.framework.util.FlowOperators.toFlow; public class Talon extends LoopSpeedController { @@ -42,6 +43,7 @@ public Talon(int port, Talon toFollow) { } void set(double value) { + SmartDashboard.putNumber("talon " + this.id(), value); this.talon.set(value); } @@ -69,6 +71,10 @@ void changeSetpoint(double setpoint) { } + public double getCurrent() { + return this.talon.getOutputCurrent(); + } + void setP(double pval) { this.talon.setP(pval); } @@ -133,7 +139,7 @@ void resetPositionTo(double position) { @Override - public boolean fwdLimitSwitchClosed(){ + public boolean fwdLimitSwitchClosed() { return this.talon.isFwdLimitSwitchClosed(); } @@ -145,4 +151,9 @@ public boolean revLimitSwitchClosed() { public double getClosedLoopError() { return this.talon.getClosedLoopError(); } + + @Override + public void setVoltageRampRate(double v){ + this.talon.setVoltageRampRate(v); + } } diff --git a/src/com/nutrons/framework/controllers/VirtualSpeedController.java b/src/com/nutrons/framework/controllers/VirtualSpeedController.java index 730738e..ca88ae0 100644 --- a/src/com/nutrons/framework/controllers/VirtualSpeedController.java +++ b/src/com/nutrons/framework/controllers/VirtualSpeedController.java @@ -1,9 +1,63 @@ package com.nutrons.framework.controllers; import io.reactivex.Flowable; +import io.reactivex.flowables.ConnectableFlowable; +import io.reactivex.processors.PublishProcessor; public class VirtualSpeedController extends LoopSpeedController { + private final PublishProcessor outputDirection; + private final ConnectableFlowable outputDirectionReplay; + private final PublishProcessor pidProperties; + private final ConnectableFlowable pidPropertiesReplay; + private final Flowable position; + private final Flowable speed; + private final PublishProcessor setpoint; + private final ConnectableFlowable setpointReplay; + private final PublishProcessor rawOutput; + private final ConnectableFlowable rawOutputReplay; + private final PublishProcessor mode; + private final ConnectableFlowable modeReplay; + private final PublishProcessor sensorDirection; + private final ConnectableFlowable sensorDirectionRelay; + private volatile double lastSpeed; + private volatile double lastPosition; + + public VirtualSpeedController(Flowable position, Flowable speed) { + this.outputDirection = PublishProcessor.create(); + this.outputDirectionReplay = this.outputDirection.distinctUntilChanged().replay(1); + this.pidProperties = PublishProcessor.create(); + this.pidPropertiesReplay = this.pidProperties.distinctUntilChanged().replay(1); + this.setpoint = PublishProcessor.create(); + this.setpointReplay = this.setpoint.distinctUntilChanged().replay(1); + this.rawOutput = PublishProcessor.create(); + this.rawOutputReplay = this.rawOutput.distinctUntilChanged().replay(1); + this.mode = PublishProcessor.create(); + this.modeReplay = this.mode.distinctUntilChanged().replay(1); + this.sensorDirection = PublishProcessor.create(); + this.sensorDirectionRelay = this.sensorDirection.distinctUntilChanged().replay(1); + + this.modeReplay.connect(); + this.rawOutputReplay.connect(); + this.outputDirectionReplay.connect(); + this.pidPropertiesReplay.connect(); + this.setpointReplay.connect(); + + this.position = position; + this.speed = speed; + this.position.subscribe(x -> this.lastPosition = x); + this.speed.subscribe(x -> this.lastSpeed = x); + } + + public VirtualSpeedController() { + this(Flowable.just(0.0), Flowable.just(0.0)); + } + + @Override + public double getCurrent() { + return 0; + } + @Override public Flowable feedback() { return Flowable.empty(); @@ -14,9 +68,13 @@ public void accept(ControllerEvent controllerEvent) { controllerEvent.actOn(this); } + public Flowable outputDirection() { + return outputDirectionReplay; + } + @Override public void setOutputFlipped(boolean flipped) { - + outputDirection.onNext(flipped); } @Override @@ -31,10 +89,59 @@ public boolean revLimitSwitchClosed() { @Override public double position() { - return 0; + return lastPosition; + } + + @Override + public void setVoltageRampRate(double v) { + } public double speed() { - return 0; + return lastSpeed; + } + + void changeControlMode(ControlMode mode) { + this.mode.onNext(mode); + } + + public Flowable mode() { + return modeReplay; + } + + void setRawOutput(double output) { + this.rawOutput.onNext(output); + } + + public Flowable rawOutput() { + return rawOutputReplay; + } + + void setLoopProperties(double p, double i, double d, double f) { + pidProperties.onNext(new Double[] {p, i, d, f}); + } + + public Flowable loopProperties() { + return pidPropertiesReplay; + } + + void changeSetpoint(double setpoint) { + this.setpoint.onNext(setpoint); + } + + public Flowable setpoint() { + return setpointReplay; + } + + public void reverseSensor(boolean b) { + this.sensorDirection.onNext(b); + } + + public Flowable sensorDirection() { + return this.sensorDirectionRelay; + } + + public void resetPositionTo(double position) { + } } diff --git a/src/com/nutrons/framework/inputs/RadioBox.java b/src/com/nutrons/framework/inputs/RadioBox.java new file mode 100644 index 0000000..fdca643 --- /dev/null +++ b/src/com/nutrons/framework/inputs/RadioBox.java @@ -0,0 +1,43 @@ +package com.nutrons.framework.inputs; + +import static com.nutrons.framework.util.FlowOperators.toFlow; + +import com.nutrons.framework.Subsystem; +import edu.wpi.first.wpilibj.smartdashboard.SendableChooser; +import edu.wpi.first.wpilibj.smartdashboard.SmartDashboard; +import io.reactivex.Flowable; +import io.reactivex.Observable; +import java.util.HashMap; +import java.util.Map; + +public class RadioBox implements Subsystem { + private final SendableChooser chooser; + private final Map options; + private final String name; + private Map.Entry defaultOption; + + public RadioBox(String name, Map options, String defaultOption) { + this.name = name; + this.chooser = new SendableChooser<>(); + this.options = new HashMap<>(); + Observable> optionStream = Observable.fromIterable(options.entrySet()); + optionStream.filter(x -> x.getKey().equals(defaultOption)) + .blockingSubscribe(x -> this.defaultOption = x); + optionStream.filter(x -> !x.getKey().equals(defaultOption)) + .blockingSubscribe(x -> this.options.put(x.getKey(), x.getValue())); + } + + @Override + public void registerSubscriptions() { + if (defaultOption != null) { + chooser.addDefault(defaultOption.getKey(), defaultOption.getValue()); + } + Observable.fromIterable(options.entrySet()).blockingSubscribe(x -> + chooser.addObject(x.getKey(), x.getValue())); + SmartDashboard.putData(name, chooser); + } + + public Flowable selected() { + return toFlow(chooser::getSelected).distinctUntilChanged(); + } +} diff --git a/src/com/nutrons/framework/inputs/Serial.java b/src/com/nutrons/framework/inputs/Serial.java index e872c5e..ed03b17 100644 --- a/src/com/nutrons/framework/inputs/Serial.java +++ b/src/com/nutrons/framework/inputs/Serial.java @@ -1,13 +1,12 @@ package com.nutrons.framework.inputs; -import static com.nutrons.framework.util.FlowOperators.toFlow; - import com.nutrons.framework.Subsystem; import com.nutrons.framework.util.IntervalCache; import edu.wpi.first.wpilibj.SerialPort; import io.reactivex.Flowable; import io.reactivex.processors.PublishProcessor; import java.util.function.Supplier; +import static com.nutrons.framework.util.FlowOperators.toFlowFast; /** * A wrapper around WPI's SerialPort class which provides @@ -41,7 +40,7 @@ public Serial(int bufferSize, int packetLength) { * @param bufferSize represents how many bytes to cache unread before clearing buffer * @param packetLength represents the length of each read from the buffer * @param terminationCharacter allows users to set a custom termination character - default is the - * newline character '\n' + * newline character '\n' */ public Serial(SerialPort.Port port, int bufferSize, @@ -56,7 +55,7 @@ public Serial(SerialPort.Port port, * @param bufferSize represents how many bytes to cache unread before clearing buffer * @param packetLength represents the length of each read from the buffer * @param terminationCharacter allows users to set a custom termination character - default is the - * newline character '\n' + * newline character '\n' * @param baudrate See Symbol Rate */ public Serial(int baudrate, @@ -80,7 +79,7 @@ public Serial(int baudrate, } return serial.read(packetLength); }; - this.dataStream = toFlow(new IntervalCache(100, supplier)) + this.dataStream = toFlowFast(new IntervalCache(100, supplier)) .filter(x -> x.length == packetLength); } catch (RuntimeException ex) { System.out.println(ex.getMessage()); diff --git a/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java b/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java index 6675229..44f3b5c 100644 --- a/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java +++ b/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java @@ -24,6 +24,7 @@ public WpiSmartDashboard() { /** * Takes a double and passes it to the Smart Dashboard. + * * @param key decimal passed to the Smartdash. */ public Consumer getTextFieldDouble(String key) { @@ -39,7 +40,8 @@ public Consumer getTextFieldDouble(String key) { /** * Prints string to the Smart Dashborad. - * @param key A specific String. + * + * @param key A specific String. */ public Consumer getTextFieldString(String key) { if (!stringFields.containsKey(key)) { @@ -54,6 +56,7 @@ public Consumer getTextFieldString(String key) { /** * Prints Output of a Boolean to the Smart Dashboard. + * * @param key boolean statement. */ public Consumer getTextFieldBoolean(String key) { diff --git a/src/com/nutrons/framework/util/FlowOperators.java b/src/com/nutrons/framework/util/FlowOperators.java index 20ed960..1c85a8a 100644 --- a/src/com/nutrons/framework/util/FlowOperators.java +++ b/src/com/nutrons/framework/util/FlowOperators.java @@ -1,18 +1,19 @@ package com.nutrons.framework.util; +import static java.lang.Math.abs; +import static java.lang.Math.max; +import static java.lang.Math.min; + import io.reactivex.Flowable; import io.reactivex.FlowableTransformer; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; +import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers; - import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import static java.lang.Math.abs; -import static java.lang.Math.max; -import static java.lang.Math.min; public class FlowOperators { @@ -24,9 +25,10 @@ public class FlowOperators { */ public static Flowable toFlow(Supplier supplier, long ignored, TimeUnit unit) { - return Flowable.interval(ignored, unit).subscribeOn(Schedulers.io()) - .map(x -> supplier.get()).onBackpressureDrop().observeOn(Schedulers.computation()) - .onBackpressureDrop(); + ConnectableFlowable toRet = Flowable.interval(ignored, unit, Schedulers.io()) + .map(x -> supplier.get()).onBackpressureDrop().observeOn(Schedulers.computation()).publish(); + toRet.connect(); + return toRet; } /** @@ -38,6 +40,10 @@ public static Flowable toFlow(Supplier supplier) { return toFlow(supplier, 100, TimeUnit.MILLISECONDS); } + public static Flowable toFlowFast(Supplier supplier) { + return toFlow(supplier, 50, TimeUnit.MILLISECONDS); + } + /** * Modifies motor input to prevent motors running at very small values * @@ -51,12 +57,25 @@ public static Flowable deadband(Flowable input) { /** * Creates a function that will return the input value, unless that value is within the range * specified by minimum and maximum. If so, the value will be changed to remap. + * 0 if in deadband else f(x-d) */ public static Function deadbandMap(double minimum, double maximum, double remap) { return bandMap(minimum, maximum, x -> remap); } + public static Function deadbandAssign(double deadzone, double rampRate) { + return (Double x) -> { + if (x <= -(deadzone / 2)) { + return -Math.pow(Math.abs(x + deadzone / 2) / (1 - deadzone / 2), rampRate); + } else if (x >= (deadzone / 2)) { + return Math.pow(x - deadzone / 2, rampRate) / (1 - deadzone / 2); + } else { + return 0.0; + } + }; + } + /** * Creates a function that will return the input value, unless that value is within the range * specified by minimum and maximum. If so, the value will be passed through the remap function. @@ -86,20 +105,37 @@ public static FlowableTransformer pidLoop(double proportional, .map(last -> last.stream().reduce(0.0, (x, y) -> x - y)) .map(x -> x * derivative); Flowable output = Flowable.combineLatest(errorP, errorI, errorD, - (p, i, d) -> p + i + d); + (p, i, d) -> p + i + d).publish().autoConnect(); return output; }; } + public static FlowableTransformer pdLoop(double proportional, + double derivative) { + return error -> { + Flowable errorP = error.map(x -> x * proportional); + Flowable errorD = error.buffer(2, 1) + .map(last -> last.stream().reduce(0.0, (x, y) -> x - y)) + .map(x -> x * derivative); + return Flowable.combineLatest(errorP, errorD, (p, d) -> p + d).publish().autoConnect(); + }; + } + public static Function limitWithin(double minimum, double maximum) { return x -> max(min(x, maximum), minimum); } - public static T printId(T t) { - System.out.println(t); - return t; + public static T printId(T flowables) { + System.out.println(flowables); + return flowables; } + /** + * Combines disposable given to this method in order to return them all at once. + * + * @param disposables the disposables you desire to combine + * @return combined disposables. + */ public static Disposable combineDisposable(Disposable... disposables) { CompositeDisposable cd = new CompositeDisposable(); Flowable.fromArray(disposables).blockingSubscribe(cd::add); diff --git a/test/com/nutrons/framework/test/MultiCommandTest.java b/test/com/nutrons/framework/test/MultiCommandTest.java index 510e164..520dd37 100644 --- a/test/com/nutrons/framework/test/MultiCommandTest.java +++ b/test/com/nutrons/framework/test/MultiCommandTest.java @@ -1,16 +1,14 @@ package com.nutrons.framework.test; +import static com.nutrons.framework.commands.Command.parallel; +import static junit.framework.TestCase.assertTrue; + import com.nutrons.framework.commands.Command; -import com.nutrons.framework.commands.TerminatorWrapper; import io.reactivex.Flowable; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static com.nutrons.framework.commands.Command.parallel; -import static junit.framework.TestCase.assertTrue; - public class MultiCommandTest { private Command delay; @@ -41,24 +39,4 @@ public void testOneThenAnother() throws InterruptedException { Thread.sleep(2000); assertTrue(record[0] == 0); } - - @Test - public void testSwitch() throws InterruptedException { - int[] record = new int[1]; - record[0] = 0; - Command inc = Command.just(x -> { - synchronized (record) { - record[0] += 1; - } - return Flowable.just(new TerminatorWrapper(() -> { - synchronized (record) { - record[0] -= 1; - } - })); - }); - Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5)) - .execute(true).blockingSubscribe(); - Thread.sleep(2000); - assertTrue(record[0] == 0); - } } diff --git a/test/com/nutrons/framework/test/TestCommand.java b/test/com/nutrons/framework/test/TestCommand.java index bc33a49..c55d8bd 100644 --- a/test/com/nutrons/framework/test/TestCommand.java +++ b/test/com/nutrons/framework/test/TestCommand.java @@ -1,31 +1,29 @@ package com.nutrons.framework.test; +import static com.nutrons.framework.commands.Command.parallel; +import static com.nutrons.framework.commands.Command.serial; +import static junit.framework.TestCase.assertTrue; + import com.nutrons.framework.commands.Command; import com.nutrons.framework.commands.Terminator; import io.reactivex.Flowable; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static com.nutrons.framework.commands.Command.parallel; -import static com.nutrons.framework.commands.Command.serial; -import static junit.framework.TestCase.assertTrue; - public class TestCommand { private Command delay; - static void waitForCommand(Flowable commandExecution) { + static void waitForCommand(Flowable commandExecution) { commandExecution.blockingSubscribe(); } /** * Delays commands until test starts. */ - @Before public void setupCommands() { delay = Command.fromAction(() -> { @@ -33,6 +31,18 @@ public void setupCommands() { }).killAfter(1000, TimeUnit.MILLISECONDS); } + @Test + public void fromAction() throws InterruptedException { + int[] record = new int[1]; + Command.fromAction(() -> { + synchronized (record) { + record[0] = 1; + } + }).execute(true); + Thread.sleep(1000); + assertTrue(record[0] == 1); + } + @Test public void single() { final Integer[] arr = new Integer[1]; @@ -72,7 +82,7 @@ public void inParallelTimed() { public void testTerminable() throws InterruptedException { final long start = System.currentTimeMillis(); PublishProcessor pp = PublishProcessor.create(); - Flowable td = serial(delay, delay, delay, delay) + final Flowable td = serial(delay, delay, delay, delay) .terminable(pp).execute(true); Thread.sleep(3000); pp.onNext(new Object()); @@ -86,7 +96,8 @@ public void testUntil() throws InterruptedException { int[] record = new int[2]; assertTrue(record[0] == 0); long start = System.currentTimeMillis(); - Flowable td = Command.fromAction(() -> record[0] = 1).until(() -> record[1] == 1).execute(true); + Flowable td = Command.fromAction(() -> record[0] = 1) + .until(() -> record[1] == 1).execute(true); Flowable.timer(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(x -> record[1] = 1); waitForCommand(td); assertTrue(System.currentTimeMillis() - 1000 > start); @@ -109,7 +120,7 @@ public void testStartable() { public void testWhen() throws InterruptedException { int[] record = new int[2]; assertTrue(record[0] == 0); - final Flowable td = Command.fromAction(() -> record[0] = 1) + final Flowable td = Command.fromAction(() -> record[0] = 1) .when(() -> record[1] == 1) .execute(true); Thread.sleep(1000); @@ -141,4 +152,13 @@ public void killAfter() throws InterruptedException { Thread.sleep(4000); assertTrue(record[0] == 1); } + + @Test + public void notSelfTerminating() throws InterruptedException { + Command doesntTermrinate = Command.just(x -> + Flowable.just(() -> assertTrue(false)) + .mergeWith(Flowable.never())); + doesntTermrinate.execute(true); + Thread.sleep(2000); + } } diff --git a/test/com/nutrons/framework/test/TestConnectables.java b/test/com/nutrons/framework/test/TestConnectables.java index 0de262d..e83ba6e 100644 --- a/test/com/nutrons/framework/test/TestConnectables.java +++ b/test/com/nutrons/framework/test/TestConnectables.java @@ -27,4 +27,18 @@ public void testConnectableDiscards() throws InterruptedException { flow.take(1).subscribeOn(Schedulers.io()).subscribe(x -> assertTrue(x != 0)); Thread.sleep(1000); } + + @Test + public void testConnectableWaitsToRetrieve() throws InterruptedException { + Runnable avoid = () -> assertTrue(false); + Runnable wanted = () -> {}; + ConnectableFlowable source = Flowable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.io()).map(x -> avoid) + .take(5).concatWith(Flowable.timer(3, TimeUnit.SECONDS).map(x -> wanted)).publish(); + source.connect(); + ConnectableFlowable listener = source.take(1).publish(); + listener.subscribe(Runnable::run); + Thread.sleep(6000); + listener.connect(); + Thread.sleep(1000); + } } diff --git a/test/com/nutrons/framework/test/TestSwitchCommand.java b/test/com/nutrons/framework/test/TestSwitchCommand.java new file mode 100644 index 0000000..22658af --- /dev/null +++ b/test/com/nutrons/framework/test/TestSwitchCommand.java @@ -0,0 +1,94 @@ +package com.nutrons.framework.test; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; + +import com.nutrons.framework.commands.Command; +import com.nutrons.framework.commands.TerminatorWrapper; +import io.reactivex.Flowable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +public class TestSwitchCommand { + + @Test + public void testSwitchStart() throws InterruptedException { + List list = new ArrayList<>(); + Flowable commandStream = Flowable.interval(100, TimeUnit.MILLISECONDS) + .map(x -> putNumber(list, x)).take(5); + Command.fromSwitch(commandStream, true).execute(true); + Thread.sleep(300); + assertFalse(list.contains(4)); + Thread.sleep(500); + for (long i = 0; i < 5; i++) { + assertTrue(list.contains(i)); + } + } + + private Command putNumber(List list, long number) { + return Command.fromAction(() -> list.add(number)); + } + + + @Test + public void testSwitchStartAndStop() throws InterruptedException { + int[] record = new int[1]; + record[0] = 0; + Command inc = Command.just(x -> { + synchronized (record) { + record[0] += 1; + } + return Flowable.just(new TerminatorWrapper(() -> { + synchronized (record) { + record[0] -= 1; + } + })); + }); + Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true) + .execute(true).blockingSubscribe(); + Thread.sleep(2000); + assertTrue(record[0] == 0); + } + + @Test + public void testSwitchTerminateRealtime() throws InterruptedException { + int[] record = new int[1]; + long start = System.currentTimeMillis(); + record[0] = 0; + Command inc = Command.just(x -> Flowable.just(new TerminatorWrapper(() -> { + synchronized (record) { + record[0] += 1; + } + }))); + Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true) + .execute(true); + assertTrue(System.currentTimeMillis() - start < 1000); + Thread.sleep(2000); + assertTrue(record[0] < 5); + assertTrue(record[0] > 0); + } + + @Test + public void testSwitchNotTerminating() throws InterruptedException { + Command doesntFinish = Command.just(x -> Flowable.just(() -> assertTrue(false))); + doesntFinish.execute(false); + Thread.sleep(1000); + Command justOne = Command.fromSwitch(Flowable.never() + .mergeWith(Flowable.just(doesntFinish)), false); + justOne.execute(false); + Thread.sleep(1000); + justOne.execute(true); + Thread.sleep(1000); + } + + @Test(expected = RuntimeException.class) + public void testSwitchTerminatesOnNext() { + Command doesntFinish = Command.just(x -> Flowable.just(() -> { + throw new RuntimeException(); + })); + Command two = Command.fromSwitch(Flowable.just(doesntFinish, doesntFinish), false); + two.execute(false).blockingSubscribe(); + } +}