diff --git a/.idea/FRamework.iml b/.idea/FRamework.iml
index 8696830..0a84278 100644
--- a/.idea/FRamework.iml
+++ b/.idea/FRamework.iml
@@ -12,5 +12,14 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/FRamework.iml b/FRamework.iml
new file mode 100644
index 0000000..5f4ccd1
--- /dev/null
+++ b/FRamework.iml
@@ -0,0 +1,15 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/docs/google_checks.xml b/docs/google_checks.xml
index 5c47285..20b86ba 100644
--- a/docs/google_checks.xml
+++ b/docs/google_checks.xml
@@ -108,32 +108,32 @@
value="Type name ''{0}'' must match pattern ''{1}''."/>
-
+
-
+
-
+
-
+
-
+
@@ -221,7 +221,7 @@
-
+
diff --git a/ivy.xml b/ivy.xml
index de7ca05..232299b 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -13,5 +13,6 @@
+
diff --git a/src/com/nutrons/framework/Robot.java b/src/com/nutrons/framework/Robot.java
index 6423377..1d6c3d5 100644
--- a/src/com/nutrons/framework/Robot.java
+++ b/src/com/nutrons/framework/Robot.java
@@ -3,14 +3,13 @@
import static com.nutrons.framework.util.CompMode.AUTO;
import static com.nutrons.framework.util.CompMode.TELE;
import static com.nutrons.framework.util.CompMode.TEST;
-import static io.reactivex.Flowable.combineLatest;
import com.nutrons.framework.commands.Command;
import com.nutrons.framework.util.CompMode;
import com.nutrons.framework.util.FlowOperators;
import edu.wpi.first.wpilibj.SampleRobot;
import io.reactivex.Flowable;
-import io.reactivex.schedulers.Schedulers;
+import java.util.concurrent.TimeUnit;
public abstract class Robot extends SampleRobot {
@@ -62,9 +61,9 @@ public final Flowable enabledStream() {
*/
public final Flowable competitionStream() {
return Flowable.merge(
- FlowOperators.toFlow(this::isAutonomous).filter(x -> x).map((x) -> AUTO),
+ FlowOperators.toFlow(this::isAutonomous, 50, TimeUnit.MILLISECONDS).filter(x -> x).map((x) -> AUTO),
FlowOperators.toFlow(this::isOperatorControl).filter(x -> x).map((x) -> TELE),
- FlowOperators.toFlow(this::isTest).filter(x -> x).map((x) -> TEST)).distinctUntilChanged();
+ FlowOperators.toFlow(this::isTest).filter(x -> x).map((x) -> TEST)).distinctUntilChanged().replay(1).autoConnect();
// filter(x -> x) will filter all false values from the stream.
}
@@ -140,4 +139,4 @@ public final boolean isOperatorControl() {
public final boolean isNewDataAvailable() {
return super.isNewDataAvailable();
}
-}
\ No newline at end of file
+}
diff --git a/src/com/nutrons/framework/StreamManager.java b/src/com/nutrons/framework/StreamManager.java
index 8c91302..dfb7d55 100644
--- a/src/com/nutrons/framework/StreamManager.java
+++ b/src/com/nutrons/framework/StreamManager.java
@@ -1,5 +1,9 @@
package com.nutrons.framework;
+import static com.nutrons.framework.util.CompMode.AUTO;
+import static com.nutrons.framework.util.CompMode.TELE;
+import static io.reactivex.Flowable.combineLatest;
+
import com.nutrons.framework.commands.Command;
import com.nutrons.framework.util.CompMode;
import io.reactivex.Flowable;
@@ -10,9 +14,7 @@
import java.util.List;
import java.util.function.Supplier;
-import static com.nutrons.framework.util.CompMode.AUTO;
-import static com.nutrons.framework.util.CompMode.TELE;
-import static io.reactivex.Flowable.combineLatest;
+
/**
* This class sets up the I/O factories and initializes subsystems.
@@ -38,7 +40,7 @@ public StreamManager(Robot robot) {
* Subclasses should register subsystems in their constructor.
*
* @param enabled a Flowable of booleans, representing changes in the enabled state of the robot
- * @param mode a Flowable of CompModes, representing changes in the competition game mode
+ * @param mode a Flowable of CompModes, representing changes in the competition game mode
*/
public StreamManager(Flowable enabled, Flowable mode) {
this.subsystems = new ArrayList<>();
@@ -50,7 +52,8 @@ public StreamManager(Flowable enabled, Flowable mode) {
* Called to by the bootstrapper to subscribe subsystem streams,
* and start the competition loop.
*/
- public final void startCompetition(Supplier autoSupplier, Supplier teleopSupplier) {
+ public final void startCompetition(Supplier autoSupplier,
+ Supplier teleopSupplier) {
Observable.fromIterable(this.subsystems).blockingSubscribe(x -> {
System.out.println("registering " + x.getClass().getName());
x.registerSubscriptions();
@@ -59,7 +62,7 @@ public final void startCompetition(Supplier autoSupplier, Supplier x && y.compareTo(AUTO) == 0)
+ combineLatest(this.enabled, this.mode, (x, y) -> x && y.compareTo(AUTO) == 0).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.filter(x -> x)
.map((a) -> {
@@ -70,7 +73,7 @@ public final void startCompetition(Supplier autoSupplier, Supplier x && y.compareTo(TELE) == 0)
+ combineLatest(this.enabled, this.mode, (x, y) -> x && y.compareTo(TELE) == 0).onBackpressureDrop()
.subscribeOn(Schedulers.io())
.filter(x -> x)
.map((a) -> {
diff --git a/src/com/nutrons/framework/commands/Command.java b/src/com/nutrons/framework/commands/Command.java
index 38b98dc..7ddbb0b 100644
--- a/src/com/nutrons/framework/commands/Command.java
+++ b/src/com/nutrons/framework/commands/Command.java
@@ -1,16 +1,15 @@
package com.nutrons.framework.commands;
+import static com.nutrons.framework.util.FlowOperators.toFlow;
+
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
-import org.reactivestreams.Publisher;
-
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-
-import static com.nutrons.framework.util.FlowOperators.toFlow;
+import org.reactivestreams.Publisher;
public class Command implements CommandWorkUnit {
@@ -71,23 +70,48 @@ public static Command parallel(Command... commands) {
return new Command(new ParallelCommand(commands));
}
+ public static Command fromSwitch(Publisher extends CommandWorkUnit> commandStream, boolean subcommandsSelfTerminate) {
+ return fromSwitch(commandStream, subcommandsSelfTerminate, true);
+ }
+
/**
- * Creates a command that runs sequentially to another.
+ * Creates a command that runs commands in a stream.
*
- * @param commandStream A flowable of commands
+ * @param commandStream A flowable of commands
+ * @param subcommandsSelfTerminate if true, commands in the stream will self terminate;
+ * @param subcommandsForceTerminate if true, commands in the stream will terminate the previous command.
* @retuns Second command after first is executed.
*/
- public static Command fromSwitch(Publisher extends CommandWorkUnit> commandStream) {
- return new Command(x -> Flowable.defer(() ->
- Flowable.switchOnNext(Flowable.fromPublisher(commandStream).map(y -> y.execute(x))
- .subscribeOn(Schedulers.io()))).scan((a, b) -> {
- a.run();
- return b;
- }));
+ public static Command fromSwitch(Publisher extends CommandWorkUnit> commandStream,
+ boolean subcommandsSelfTerminate,
+ boolean subcommandsForceTerminate) {
+ return new Command(x -> Flowable.fromPublisher(commandStream)
+ .concatMap(y -> Flowable.just(FlattenedTerminator.from(y.execute(subcommandsSelfTerminate)))
+ .subscribeOn(Schedulers.trampoline()))
+ .scan((a, b) -> {
+ if (subcommandsForceTerminate) {
+ a.run();
+ }
+ return b;
+ }));
+ }
+
+ public static Command defer(Supplier supplier) {
+ return Command.just(x -> {
+ Command actual = supplier.get();
+ return actual.execute(x);
+ });
}
+ /**
+ * Adds a command that will terminate the current command.
+ *
+ * @param terminator Terminator command you wish to add.
+ * @return teriminatable command.
+ */
public Command addFinalTerminator(Terminator terminator) {
- return Command.just(x -> this.source.execute(x).flatMap(y -> Flowable.just(y, terminator)).subscribeOn(Schedulers.io()));
+ return Command.just(x -> this.source.execute(x)
+ .flatMap(y -> Flowable.just(y, terminator)));
}
/**
@@ -95,7 +119,7 @@ public Command addFinalTerminator(Terminator terminator) {
*/
public Command startable(Publisher> starter) {
return new Command(new SerialCommand(
- Command.just(x -> Flowable.defer(() -> Flowable.never().takeUntil(starter))),
+ Command.just(x -> Flowable.never().takeUntil(starter)),
this));
}
@@ -104,8 +128,7 @@ public Command startable(Publisher> starter) {
* will delay the execution of all actions until startCondition returns true.
*/
public Command when(Supplier startCondition) {
- Flowable ignition = Flowable
- .defer(() -> emptyPulse.map(x -> startCondition.get()).filter(x -> x).onBackpressureDrop());
+ Flowable ignition = emptyPulse.map(x -> startCondition.get()).filter(x -> x).onBackpressureDrop();
return this.startable(ignition);
}
@@ -120,15 +143,15 @@ public Command terminable(Publisher> terminator) {
/**
* Copies this command into one which will end when terminator emits an item.
*
- * @param terminatesAtEnd if true, the command will terminate only when the end is reached,
- * if false, the command may terminate before the end is reached.
+ * @param terminatesAtEnd if true, the command will terminate only when the end is reached, if
+ * false, the command may terminate before the end is reached.
*/
public Command endsWhen(Publisher> terminator, boolean terminatesAtEnd) {
return Command.just(x -> {
- Flowable sourceTerminator = this.execute(terminatesAtEnd);
+ Flowable extends Terminator> sourceTerminator = this.execute(terminatesAtEnd);
Terminator multi = FlattenedTerminator.from(sourceTerminator);
- return Flowable.defer(() -> Flowable.never().takeUntil(terminator)
- .mergeWith(Flowable.just(multi::run)));
+ return Flowable.never().takeUntil(terminator)
+ .mergeWith(Flowable.just(multi));
});
}
@@ -137,7 +160,8 @@ public Command endsWhen(Publisher> terminator, boolean terminatesAtEnd) {
* will only complete once endCondition returns true.
*/
public Command until(Supplier endCondition) {
- ConnectableFlowable> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x).onBackpressureDrop().publish();
+ ConnectableFlowable> terminator = emptyPulse.map(x -> endCondition.get()).filter(x -> x)
+ .onBackpressureDrop().publish();
terminator.connect();
return this.terminable(terminator);
}
@@ -153,7 +177,7 @@ public Command then(Command next) {
* Copies this command into one which will delay execution for a period of time.
*/
public Command delayStart(long delay, TimeUnit unit) {
- return this.startable(Flowable.timer(delay, unit));
+ return this.startable(Flowable.timer(delay, unit).onBackpressureBuffer());
}
/**
@@ -163,18 +187,31 @@ public Command delayFinish(long delay, TimeUnit unit) {
return this.terminable(Flowable.timer(delay, unit));
}
+ /**
+ * End and terminate this command only after the specified time has passed.
+ * Kills a command after a given time and unit
+ *
+ * @param delay a number in relation to a unit 1000 = 1 ms if unit is ms.
+ * @param unit unit you wish to count in.
+ * @return a command that will terminate after a given time.
+ */
public Command killAfter(long delay, TimeUnit unit) {
return Command.just(x -> {
- Flowable terms = this.terminable(Flowable.timer(delay, unit)).execute(x);
+ Flowable extends Terminator> terms = this.terminable(Flowable.timer(delay, unit))
+ .execute(x);
return terms;
});
}
@Override
- public Flowable execute(boolean selfTerminating) {
- Flowable terms = source.execute(selfTerminating).subscribeOn(Schedulers.io());
+ public Flowable extends Terminator> execute(boolean selfTerminating) {
+ Flowable extends Terminator> terms = source.execute(selfTerminating)
+ .subscribeOn(Schedulers.trampoline());
if (selfTerminating) {
- terms.toList().subscribe(x -> Observable.fromIterable(x).blockingSubscribe(Terminator::run));
+ terms.toList().map(Observable::fromIterable).subscribeOn(Schedulers.trampoline())
+ .subscribe(x -> x.subscribe(Terminator::run));
+ } else {
+ terms.subscribe();
}
return terms;
}
diff --git a/src/com/nutrons/framework/commands/CommandWorkUnit.java b/src/com/nutrons/framework/commands/CommandWorkUnit.java
index 8458be0..34c83b0 100644
--- a/src/com/nutrons/framework/commands/CommandWorkUnit.java
+++ b/src/com/nutrons/framework/commands/CommandWorkUnit.java
@@ -3,5 +3,5 @@
import io.reactivex.Flowable;
public interface CommandWorkUnit {
- Flowable execute(boolean selfTerminating);
+ Flowable extends Terminator> execute(boolean selfTerminating);
}
diff --git a/src/com/nutrons/framework/commands/FlattenedTerminator.java b/src/com/nutrons/framework/commands/FlattenedTerminator.java
index 33779b4..04aa03c 100644
--- a/src/com/nutrons/framework/commands/FlattenedTerminator.java
+++ b/src/com/nutrons/framework/commands/FlattenedTerminator.java
@@ -10,13 +10,13 @@ class FlattenedTerminator implements Terminator {
private final AtomicBoolean lock;
private final ArrayList terminators;
- private final Flowable terminatorStream;
+ private final Flowable extends Terminator> terminatorStream;
- private FlattenedTerminator(Flowable terminators) {
+ private FlattenedTerminator(Flowable extends Terminator> terminators) {
this.lock = new AtomicBoolean(false);
this.terminators = new ArrayList<>();
this.terminatorStream = terminators;
- terminators.subscribeOn(Schedulers.io()).subscribe(x -> {
+ terminators.subscribe(x -> {
if (!lock.get()) {
synchronized (lock) {
if (!lock.get()) {
@@ -29,13 +29,13 @@ private FlattenedTerminator(Flowable terminators) {
});
}
- static FlattenedTerminator from(Flowable terminators) {
+ static FlattenedTerminator from(Flowable extends Terminator> terminators) {
return new FlattenedTerminator(terminators);
}
public Flowable extends Terminator> toSingle() {
return Flowable.just(this).mergeWith(terminatorStream.ignoreElements().toFlowable())
- .subscribeOn(Schedulers.io());
+ .subscribeOn(Schedulers.trampoline());
}
@Override
diff --git a/src/com/nutrons/framework/commands/ParallelCommand.java b/src/com/nutrons/framework/commands/ParallelCommand.java
index df227ad..3b5b021 100644
--- a/src/com/nutrons/framework/commands/ParallelCommand.java
+++ b/src/com/nutrons/framework/commands/ParallelCommand.java
@@ -1,20 +1,24 @@
package com.nutrons.framework.commands;
import io.reactivex.Flowable;
-import io.reactivex.schedulers.Schedulers;
public class ParallelCommand implements CommandWorkUnit {
+
private final Flowable commands;
ParallelCommand(CommandWorkUnit... commands) {
- this.commands = Flowable.fromArray(commands).map(Command::just);
+ this.commands = Flowable.fromArray(commands).concatMap(x -> {
+ if (x instanceof ParallelCommand) {
+ return ((ParallelCommand) x).commands;
+ }
+ return Flowable.just(Command.just(x));
+ });
}
@Override
public Flowable execute(boolean selfTerminating) {
- Flowable terminators = this.commands
- .flatMap(x -> x.execute(selfTerminating).subscribeOn(Schedulers.io()))
- .subscribeOn(Schedulers.io()).publish().autoConnect();
+ Flowable extends Terminator> terminators = this.commands
+ .flatMap(x -> x.execute(selfTerminating)).replay().autoConnect();
return Flowable.just(FlattenedTerminator.from(terminators))
.mergeWith(terminators.ignoreElements().toFlowable());
}
diff --git a/src/com/nutrons/framework/commands/SerialCommand.java b/src/com/nutrons/framework/commands/SerialCommand.java
index 0914471..488d641 100644
--- a/src/com/nutrons/framework/commands/SerialCommand.java
+++ b/src/com/nutrons/framework/commands/SerialCommand.java
@@ -1,20 +1,24 @@
package com.nutrons.framework.commands;
import io.reactivex.Flowable;
-import io.reactivex.schedulers.Schedulers;
public class SerialCommand implements CommandWorkUnit {
+
private final Flowable commands;
SerialCommand(CommandWorkUnit... commands) {
- this.commands = Flowable.fromArray(commands).map(Command::just);
+ this.commands = Flowable.fromArray(commands).concatMap(x -> {
+ if (x instanceof SerialCommand) {
+ return ((SerialCommand) x).commands;
+ }
+ return Flowable.just(Command.just(x));
+ });
}
@Override
public Flowable execute(boolean selfTerminating) {
- Flowable terminators = this.commands
- .concatMap(x -> x.execute(selfTerminating).subscribeOn(Schedulers.io()))
- .subscribeOn(Schedulers.io()).publish().autoConnect();
+ Flowable extends Terminator> terminators = this.commands
+ .concatMap(x -> x.execute(selfTerminating)).replay().autoConnect();
return Flowable.just(FlattenedTerminator.from(terminators))
.mergeWith(terminators.ignoreElements().toFlowable());
}
diff --git a/src/com/nutrons/framework/controllers/ControllerEvent.java b/src/com/nutrons/framework/controllers/ControllerEvent.java
index ccaf9af..5ac1a94 100644
--- a/src/com/nutrons/framework/controllers/ControllerEvent.java
+++ b/src/com/nutrons/framework/controllers/ControllerEvent.java
@@ -11,4 +11,5 @@ default void actOn(VirtualSpeedController controller) {
throw new EventUnimplementedException(
controller.getClass().toString(), this.getClass().toString());
}
+
}
diff --git a/src/com/nutrons/framework/controllers/Events.java b/src/com/nutrons/framework/controllers/Events.java
index 32a2718..65b5464 100644
--- a/src/com/nutrons/framework/controllers/Events.java
+++ b/src/com/nutrons/framework/controllers/Events.java
@@ -26,6 +26,11 @@ public void actOn(Talon talon) {
pid.actOn(talon);
setpointEvent.actOn(talon);
}
+ @Override
+ public void actOn(VirtualSpeedController controller) {
+ pid.actOn(controller);
+ setpointEvent.actOn(controller);
+ }
};
}
@@ -35,6 +40,7 @@ public static ControllerEvent pid(double pval, double ival, double dval, double
/**
* Combines actions that are preformed on the talon.
+ *
* @return combined event
*/
public static ControllerEvent combine(ControllerEvent... events) {
@@ -45,6 +51,12 @@ public void actOn(Talon talon) {
e.actOn(talon);
}
}
+ @Override
+ public void actOn(VirtualSpeedController controller) {
+ for (ControllerEvent e : events) {
+ e.actOn(controller);
+ }
+ }
};
}
@@ -94,4 +106,5 @@ public static ControllerEvent setOutputVoltage(double min, double max) {
public static ControllerEvent resetPosition(double position) {
return new ResetPositionEvent(position);
}
+
}
diff --git a/src/com/nutrons/framework/controllers/FollowEvent.java b/src/com/nutrons/framework/controllers/FollowEvent.java
index a46dc09..7d717de 100644
--- a/src/com/nutrons/framework/controllers/FollowEvent.java
+++ b/src/com/nutrons/framework/controllers/FollowEvent.java
@@ -6,6 +6,7 @@ public class FollowEvent implements ControllerEvent {
/**
* Sets a LoopSpeedController to follow another LoopSpeedController
+ *
* @param targetToFollow is a LoopSpeedController which will be followed.
*/
public FollowEvent(LoopSpeedController targetToFollow) {
diff --git a/src/com/nutrons/framework/controllers/LoopModeEvent.java b/src/com/nutrons/framework/controllers/LoopModeEvent.java
index 82dc206..ce8cefb 100644
--- a/src/com/nutrons/framework/controllers/LoopModeEvent.java
+++ b/src/com/nutrons/framework/controllers/LoopModeEvent.java
@@ -16,4 +16,8 @@ public LoopModeEvent(ControlMode mode) {
public void actOn(Talon talon) {
talon.changeControlMode(mode);
}
+
+ public void actOn(VirtualSpeedController controller) {
+ controller.changeControlMode(mode);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java b/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java
index b7c3c8b..cddab84 100644
--- a/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java
+++ b/src/com/nutrons/framework/controllers/LoopPropertiesEvent.java
@@ -27,4 +27,9 @@ public void actOn(Talon talon) {
talon.setD(dval);
talon.setF(fval);
}
+
+ @Override
+ public void actOn(VirtualSpeedController controller) {
+ controller.setLoopProperties(pval, ival, dval, fval);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/LoopSpeedController.java b/src/com/nutrons/framework/controllers/LoopSpeedController.java
index c4a6b38..5e1d16c 100644
--- a/src/com/nutrons/framework/controllers/LoopSpeedController.java
+++ b/src/com/nutrons/framework/controllers/LoopSpeedController.java
@@ -8,6 +8,8 @@
*/
public abstract class LoopSpeedController implements Consumer {
+ public abstract double getCurrent();
+
public abstract Flowable feedback();
@Override
@@ -46,4 +48,6 @@ public void setReversedSensor(boolean flipped) {
public abstract boolean revLimitSwitchClosed();
public abstract double position();
+
+ public abstract void setVoltageRampRate(double v);
}
diff --git a/src/com/nutrons/framework/controllers/ResetPositionEvent.java b/src/com/nutrons/framework/controllers/ResetPositionEvent.java
index e426607..69f35b3 100644
--- a/src/com/nutrons/framework/controllers/ResetPositionEvent.java
+++ b/src/com/nutrons/framework/controllers/ResetPositionEvent.java
@@ -16,4 +16,8 @@ public ResetPositionEvent(double position) {
public void actOn(Talon talon) {
talon.resetPositionTo(position);
}
+
+ public void actOn(VirtualSpeedController controller) {
+ controller.resetPositionTo(position);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/RevServo.java b/src/com/nutrons/framework/controllers/RevServo.java
new file mode 100644
index 0000000..ef40826
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/RevServo.java
@@ -0,0 +1,44 @@
+package com.nutrons.framework.controllers;
+
+import edu.wpi.first.wpilibj.Servo;
+
+/**
+ * The RevServo controlle used for the programming and running of Rev Smart Servos.
+ */
+public class RevServo extends ServoController {
+
+ private final Servo servo;
+
+ /**
+ * Makes a RevServo object at the given port.
+ *
+ * @param port Port of RevServo
+ */
+ public RevServo(int port) {
+ servo = new Servo(port);
+ }
+
+ @Override
+ public void accept(ServoCommand servoCommand) {
+ servoCommand.actOn(this);
+ }
+
+ /**
+ * Sets the Servo to a given angle ranging from -90 to 90 left and right reflectively.
+ *
+ * @param angle given angle to turn to.
+ */
+ public void setAngle(double angle) {
+ this.servo.setAngle(angle);
+ }
+
+ /**
+ * Set the motor to a position given a value 0.0 is full left 1.0 is full right.
+ *
+ * @param value The value to turn the servo to.
+ */
+ public void set(double value) {
+ this.servo.set(value);
+ }
+
+}
diff --git a/src/com/nutrons/framework/controllers/RunAtPowerEvent.java b/src/com/nutrons/framework/controllers/RunAtPowerEvent.java
index c460cac..f4cb29e 100644
--- a/src/com/nutrons/framework/controllers/RunAtPowerEvent.java
+++ b/src/com/nutrons/framework/controllers/RunAtPowerEvent.java
@@ -26,4 +26,9 @@ public void actOn(Talon talon) {
}
talon.set(power);
}
+
+ public void actOn(VirtualSpeedController controller) {
+ controller.setControlMode(ControlMode.MANUAL);
+ controller.setRawOutput(power);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/ServoCommand.java b/src/com/nutrons/framework/controllers/ServoCommand.java
new file mode 100644
index 0000000..5b1e4b2
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/ServoCommand.java
@@ -0,0 +1,10 @@
+package com.nutrons.framework.controllers;
+
+public interface ServoCommand {
+
+ default void actOn(RevServo servo) {
+ throw new EventUnimplementedException(
+ servo.getClass().toString(), this.getClass().toString());
+ }
+
+}
diff --git a/src/com/nutrons/framework/controllers/ServoController.java b/src/com/nutrons/framework/controllers/ServoController.java
new file mode 100644
index 0000000..e079faa
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/ServoController.java
@@ -0,0 +1,17 @@
+package com.nutrons.framework.controllers;
+
+import io.reactivex.functions.Consumer;
+
+public abstract class ServoController implements Consumer {
+
+ @Override
+ public abstract void accept(ServoCommand servoCommand);
+
+ public void setAngle(double angle) {
+ this.accept(ServoInstr.setAngle(angle));
+ }
+
+ public void set(double value) {
+ this.accept(ServoInstr.set(value));
+ }
+}
diff --git a/src/com/nutrons/framework/controllers/ServoInstr.java b/src/com/nutrons/framework/controllers/ServoInstr.java
new file mode 100644
index 0000000..3c34377
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/ServoInstr.java
@@ -0,0 +1,13 @@
+package com.nutrons.framework.controllers;
+
+public class ServoInstr {
+
+ public static ServoCommand setAngle(double angle) {
+ return new SetAngleCommand(angle);
+ }
+
+ public static ServoCommand set(double value) {
+ return new SetCommand(value);
+ }
+
+}
diff --git a/src/com/nutrons/framework/controllers/SetAngleCommand.java b/src/com/nutrons/framework/controllers/SetAngleCommand.java
new file mode 100644
index 0000000..38206a8
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/SetAngleCommand.java
@@ -0,0 +1,31 @@
+package com.nutrons.framework.controllers;
+
+public class SetAngleCommand implements ServoCommand {
+
+ private final double angle;
+
+ /**
+ * An event which sets the servo to turn to a certain angle.
+ *
+ * @param angle the angle to turn the servo to -90 (left) to 90 (right).
+ */
+ public SetAngleCommand(double angle) {
+ this.angle = angle;
+ }
+
+ public double angle() {
+ return this.angle;
+
+ }
+
+ @Override
+ public void actOn(RevServo servo) {
+ if (Math.abs(angle) > 90.0) {
+ throw new IllegalArgumentException(
+ "Angle greater than 90 degrees is not supported for Servos");
+ }
+ servo.setAngle(angle);
+
+ }
+
+}
diff --git a/src/com/nutrons/framework/controllers/SetCommand.java b/src/com/nutrons/framework/controllers/SetCommand.java
new file mode 100644
index 0000000..8a893eb
--- /dev/null
+++ b/src/com/nutrons/framework/controllers/SetCommand.java
@@ -0,0 +1,33 @@
+package com.nutrons.framework.controllers;
+
+public class SetCommand implements ServoCommand {
+
+ private final double value;
+
+ /**
+ * An event which sets the servo to turn to a certain angle.
+ *
+ * @param value the input value to give the servo -1 is full left 1 is full right.
+ */
+ public SetCommand(double value) {
+ this.value = value;
+
+ }
+
+ public double value() {
+ return this.value;
+
+ }
+
+ @Override
+ public void actOn(RevServo servo) {
+ if (Math.abs(value) > 1.0) {
+ throw new IllegalArgumentException(
+ "Value input greater than 1.0 is not supported for this servo");
+
+ }
+ servo.set(value);
+
+ }
+
+}
diff --git a/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java b/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java
index c58e215..c1d0aad 100644
--- a/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java
+++ b/src/com/nutrons/framework/controllers/SetReversedSensorEvent.java
@@ -12,4 +12,9 @@ public SetReversedSensorEvent(boolean flipped) {
public void actOn(Talon talon) {
talon.reverseSensor(flipped);
}
+
+ @Override
+ public void actOn(VirtualSpeedController controller) {
+ controller.reverseSensor(true);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/SetpointEvent.java b/src/com/nutrons/framework/controllers/SetpointEvent.java
index 469028c..63086e8 100644
--- a/src/com/nutrons/framework/controllers/SetpointEvent.java
+++ b/src/com/nutrons/framework/controllers/SetpointEvent.java
@@ -12,4 +12,9 @@ class SetpointEvent implements ControllerEvent {
public void actOn(Talon talon) {
talon.changeSetpoint(setpoint);
}
+
+ @Override
+ public void actOn(VirtualSpeedController controller) {
+ controller.changeSetpoint(setpoint);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/Talon.java b/src/com/nutrons/framework/controllers/Talon.java
index 5babd71..e177d4c 100644
--- a/src/com/nutrons/framework/controllers/Talon.java
+++ b/src/com/nutrons/framework/controllers/Talon.java
@@ -1,11 +1,12 @@
package com.nutrons.framework.controllers;
+import static com.nutrons.framework.util.FlowOperators.toFlow;
+
import com.ctre.CANTalon;
+import edu.wpi.first.wpilibj.smartdashboard.SmartDashboard;
import io.reactivex.Flowable;
-
import java.security.InvalidParameterException;
-import static com.nutrons.framework.util.FlowOperators.toFlow;
public class Talon extends LoopSpeedController {
@@ -42,6 +43,7 @@ public Talon(int port, Talon toFollow) {
}
void set(double value) {
+ SmartDashboard.putNumber("talon " + this.id(), value);
this.talon.set(value);
}
@@ -69,6 +71,10 @@ void changeSetpoint(double setpoint) {
}
+ public double getCurrent() {
+ return this.talon.getOutputCurrent();
+ }
+
void setP(double pval) {
this.talon.setP(pval);
}
@@ -133,7 +139,7 @@ void resetPositionTo(double position) {
@Override
- public boolean fwdLimitSwitchClosed(){
+ public boolean fwdLimitSwitchClosed() {
return this.talon.isFwdLimitSwitchClosed();
}
@@ -145,4 +151,9 @@ public boolean revLimitSwitchClosed() {
public double getClosedLoopError() {
return this.talon.getClosedLoopError();
}
+
+ @Override
+ public void setVoltageRampRate(double v){
+ this.talon.setVoltageRampRate(v);
+ }
}
diff --git a/src/com/nutrons/framework/controllers/VirtualSpeedController.java b/src/com/nutrons/framework/controllers/VirtualSpeedController.java
index 730738e..ca88ae0 100644
--- a/src/com/nutrons/framework/controllers/VirtualSpeedController.java
+++ b/src/com/nutrons/framework/controllers/VirtualSpeedController.java
@@ -1,9 +1,63 @@
package com.nutrons.framework.controllers;
import io.reactivex.Flowable;
+import io.reactivex.flowables.ConnectableFlowable;
+import io.reactivex.processors.PublishProcessor;
public class VirtualSpeedController extends LoopSpeedController {
+ private final PublishProcessor outputDirection;
+ private final ConnectableFlowable outputDirectionReplay;
+ private final PublishProcessor pidProperties;
+ private final ConnectableFlowable pidPropertiesReplay;
+ private final Flowable position;
+ private final Flowable speed;
+ private final PublishProcessor setpoint;
+ private final ConnectableFlowable setpointReplay;
+ private final PublishProcessor rawOutput;
+ private final ConnectableFlowable rawOutputReplay;
+ private final PublishProcessor mode;
+ private final ConnectableFlowable modeReplay;
+ private final PublishProcessor sensorDirection;
+ private final ConnectableFlowable sensorDirectionRelay;
+ private volatile double lastSpeed;
+ private volatile double lastPosition;
+
+ public VirtualSpeedController(Flowable position, Flowable speed) {
+ this.outputDirection = PublishProcessor.create();
+ this.outputDirectionReplay = this.outputDirection.distinctUntilChanged().replay(1);
+ this.pidProperties = PublishProcessor.create();
+ this.pidPropertiesReplay = this.pidProperties.distinctUntilChanged().replay(1);
+ this.setpoint = PublishProcessor.create();
+ this.setpointReplay = this.setpoint.distinctUntilChanged().replay(1);
+ this.rawOutput = PublishProcessor.create();
+ this.rawOutputReplay = this.rawOutput.distinctUntilChanged().replay(1);
+ this.mode = PublishProcessor.create();
+ this.modeReplay = this.mode.distinctUntilChanged().replay(1);
+ this.sensorDirection = PublishProcessor.create();
+ this.sensorDirectionRelay = this.sensorDirection.distinctUntilChanged().replay(1);
+
+ this.modeReplay.connect();
+ this.rawOutputReplay.connect();
+ this.outputDirectionReplay.connect();
+ this.pidPropertiesReplay.connect();
+ this.setpointReplay.connect();
+
+ this.position = position;
+ this.speed = speed;
+ this.position.subscribe(x -> this.lastPosition = x);
+ this.speed.subscribe(x -> this.lastSpeed = x);
+ }
+
+ public VirtualSpeedController() {
+ this(Flowable.just(0.0), Flowable.just(0.0));
+ }
+
+ @Override
+ public double getCurrent() {
+ return 0;
+ }
+
@Override
public Flowable feedback() {
return Flowable.empty();
@@ -14,9 +68,13 @@ public void accept(ControllerEvent controllerEvent) {
controllerEvent.actOn(this);
}
+ public Flowable outputDirection() {
+ return outputDirectionReplay;
+ }
+
@Override
public void setOutputFlipped(boolean flipped) {
-
+ outputDirection.onNext(flipped);
}
@Override
@@ -31,10 +89,59 @@ public boolean revLimitSwitchClosed() {
@Override
public double position() {
- return 0;
+ return lastPosition;
+ }
+
+ @Override
+ public void setVoltageRampRate(double v) {
+
}
public double speed() {
- return 0;
+ return lastSpeed;
+ }
+
+ void changeControlMode(ControlMode mode) {
+ this.mode.onNext(mode);
+ }
+
+ public Flowable mode() {
+ return modeReplay;
+ }
+
+ void setRawOutput(double output) {
+ this.rawOutput.onNext(output);
+ }
+
+ public Flowable rawOutput() {
+ return rawOutputReplay;
+ }
+
+ void setLoopProperties(double p, double i, double d, double f) {
+ pidProperties.onNext(new Double[] {p, i, d, f});
+ }
+
+ public Flowable loopProperties() {
+ return pidPropertiesReplay;
+ }
+
+ void changeSetpoint(double setpoint) {
+ this.setpoint.onNext(setpoint);
+ }
+
+ public Flowable setpoint() {
+ return setpointReplay;
+ }
+
+ public void reverseSensor(boolean b) {
+ this.sensorDirection.onNext(b);
+ }
+
+ public Flowable sensorDirection() {
+ return this.sensorDirectionRelay;
+ }
+
+ public void resetPositionTo(double position) {
+
}
}
diff --git a/src/com/nutrons/framework/inputs/RadioBox.java b/src/com/nutrons/framework/inputs/RadioBox.java
new file mode 100644
index 0000000..fdca643
--- /dev/null
+++ b/src/com/nutrons/framework/inputs/RadioBox.java
@@ -0,0 +1,43 @@
+package com.nutrons.framework.inputs;
+
+import static com.nutrons.framework.util.FlowOperators.toFlow;
+
+import com.nutrons.framework.Subsystem;
+import edu.wpi.first.wpilibj.smartdashboard.SendableChooser;
+import edu.wpi.first.wpilibj.smartdashboard.SmartDashboard;
+import io.reactivex.Flowable;
+import io.reactivex.Observable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RadioBox implements Subsystem {
+ private final SendableChooser chooser;
+ private final Map options;
+ private final String name;
+ private Map.Entry defaultOption;
+
+ public RadioBox(String name, Map options, String defaultOption) {
+ this.name = name;
+ this.chooser = new SendableChooser<>();
+ this.options = new HashMap<>();
+ Observable> optionStream = Observable.fromIterable(options.entrySet());
+ optionStream.filter(x -> x.getKey().equals(defaultOption))
+ .blockingSubscribe(x -> this.defaultOption = x);
+ optionStream.filter(x -> !x.getKey().equals(defaultOption))
+ .blockingSubscribe(x -> this.options.put(x.getKey(), x.getValue()));
+ }
+
+ @Override
+ public void registerSubscriptions() {
+ if (defaultOption != null) {
+ chooser.addDefault(defaultOption.getKey(), defaultOption.getValue());
+ }
+ Observable.fromIterable(options.entrySet()).blockingSubscribe(x ->
+ chooser.addObject(x.getKey(), x.getValue()));
+ SmartDashboard.putData(name, chooser);
+ }
+
+ public Flowable selected() {
+ return toFlow(chooser::getSelected).distinctUntilChanged();
+ }
+}
diff --git a/src/com/nutrons/framework/inputs/Serial.java b/src/com/nutrons/framework/inputs/Serial.java
index e872c5e..ed03b17 100644
--- a/src/com/nutrons/framework/inputs/Serial.java
+++ b/src/com/nutrons/framework/inputs/Serial.java
@@ -1,13 +1,12 @@
package com.nutrons.framework.inputs;
-import static com.nutrons.framework.util.FlowOperators.toFlow;
-
import com.nutrons.framework.Subsystem;
import com.nutrons.framework.util.IntervalCache;
import edu.wpi.first.wpilibj.SerialPort;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import java.util.function.Supplier;
+import static com.nutrons.framework.util.FlowOperators.toFlowFast;
/**
* A wrapper around WPI's SerialPort class which provides
@@ -41,7 +40,7 @@ public Serial(int bufferSize, int packetLength) {
* @param bufferSize represents how many bytes to cache unread before clearing buffer
* @param packetLength represents the length of each read from the buffer
* @param terminationCharacter allows users to set a custom termination character - default is the
- * newline character '\n'
+ * newline character '\n'
*/
public Serial(SerialPort.Port port,
int bufferSize,
@@ -56,7 +55,7 @@ public Serial(SerialPort.Port port,
* @param bufferSize represents how many bytes to cache unread before clearing buffer
* @param packetLength represents the length of each read from the buffer
* @param terminationCharacter allows users to set a custom termination character - default is the
- * newline character '\n'
+ * newline character '\n'
* @param baudrate See Symbol Rate
*/
public Serial(int baudrate,
@@ -80,7 +79,7 @@ public Serial(int baudrate,
}
return serial.read(packetLength);
};
- this.dataStream = toFlow(new IntervalCache(100, supplier))
+ this.dataStream = toFlowFast(new IntervalCache(100, supplier))
.filter(x -> x.length == packetLength);
} catch (RuntimeException ex) {
System.out.println(ex.getMessage());
diff --git a/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java b/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java
index 6675229..44f3b5c 100644
--- a/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java
+++ b/src/com/nutrons/framework/subsystems/WpiSmartDashboard.java
@@ -24,6 +24,7 @@ public WpiSmartDashboard() {
/**
* Takes a double and passes it to the Smart Dashboard.
+ *
* @param key decimal passed to the Smartdash.
*/
public Consumer getTextFieldDouble(String key) {
@@ -39,7 +40,8 @@ public Consumer getTextFieldDouble(String key) {
/**
* Prints string to the Smart Dashborad.
- * @param key A specific String.
+ *
+ * @param key A specific String.
*/
public Consumer getTextFieldString(String key) {
if (!stringFields.containsKey(key)) {
@@ -54,6 +56,7 @@ public Consumer getTextFieldString(String key) {
/**
* Prints Output of a Boolean to the Smart Dashboard.
+ *
* @param key boolean statement.
*/
public Consumer getTextFieldBoolean(String key) {
diff --git a/src/com/nutrons/framework/util/FlowOperators.java b/src/com/nutrons/framework/util/FlowOperators.java
index 20ed960..1c85a8a 100644
--- a/src/com/nutrons/framework/util/FlowOperators.java
+++ b/src/com/nutrons/framework/util/FlowOperators.java
@@ -1,18 +1,19 @@
package com.nutrons.framework.util;
+import static java.lang.Math.abs;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
+import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
-
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import static java.lang.Math.abs;
-import static java.lang.Math.max;
-import static java.lang.Math.min;
public class FlowOperators {
@@ -24,9 +25,10 @@ public class FlowOperators {
*/
public static Flowable toFlow(Supplier supplier,
long ignored, TimeUnit unit) {
- return Flowable.interval(ignored, unit).subscribeOn(Schedulers.io())
- .map(x -> supplier.get()).onBackpressureDrop().observeOn(Schedulers.computation())
- .onBackpressureDrop();
+ ConnectableFlowable toRet = Flowable.interval(ignored, unit, Schedulers.io())
+ .map(x -> supplier.get()).onBackpressureDrop().observeOn(Schedulers.computation()).publish();
+ toRet.connect();
+ return toRet;
}
/**
@@ -38,6 +40,10 @@ public static Flowable toFlow(Supplier supplier) {
return toFlow(supplier, 100, TimeUnit.MILLISECONDS);
}
+ public static Flowable toFlowFast(Supplier supplier) {
+ return toFlow(supplier, 50, TimeUnit.MILLISECONDS);
+ }
+
/**
* Modifies motor input to prevent motors running at very small values
*
@@ -51,12 +57,25 @@ public static Flowable deadband(Flowable input) {
/**
* Creates a function that will return the input value, unless that value is within the range
* specified by minimum and maximum. If so, the value will be changed to remap.
+ * 0 if in deadband else f(x-d)
*/
public static Function deadbandMap(double minimum, double maximum,
double remap) {
return bandMap(minimum, maximum, x -> remap);
}
+ public static Function deadbandAssign(double deadzone, double rampRate) {
+ return (Double x) -> {
+ if (x <= -(deadzone / 2)) {
+ return -Math.pow(Math.abs(x + deadzone / 2) / (1 - deadzone / 2), rampRate);
+ } else if (x >= (deadzone / 2)) {
+ return Math.pow(x - deadzone / 2, rampRate) / (1 - deadzone / 2);
+ } else {
+ return 0.0;
+ }
+ };
+ }
+
/**
* Creates a function that will return the input value, unless that value is within the range
* specified by minimum and maximum. If so, the value will be passed through the remap function.
@@ -86,20 +105,37 @@ public static FlowableTransformer pidLoop(double proportional,
.map(last -> last.stream().reduce(0.0, (x, y) -> x - y))
.map(x -> x * derivative);
Flowable output = Flowable.combineLatest(errorP, errorI, errorD,
- (p, i, d) -> p + i + d);
+ (p, i, d) -> p + i + d).publish().autoConnect();
return output;
};
}
+ public static FlowableTransformer pdLoop(double proportional,
+ double derivative) {
+ return error -> {
+ Flowable errorP = error.map(x -> x * proportional);
+ Flowable errorD = error.buffer(2, 1)
+ .map(last -> last.stream().reduce(0.0, (x, y) -> x - y))
+ .map(x -> x * derivative);
+ return Flowable.combineLatest(errorP, errorD, (p, d) -> p + d).publish().autoConnect();
+ };
+ }
+
public static Function limitWithin(double minimum, double maximum) {
return x -> max(min(x, maximum), minimum);
}
- public static T printId(T t) {
- System.out.println(t);
- return t;
+ public static T printId(T flowables) {
+ System.out.println(flowables);
+ return flowables;
}
+ /**
+ * Combines disposable given to this method in order to return them all at once.
+ *
+ * @param disposables the disposables you desire to combine
+ * @return combined disposables.
+ */
public static Disposable combineDisposable(Disposable... disposables) {
CompositeDisposable cd = new CompositeDisposable();
Flowable.fromArray(disposables).blockingSubscribe(cd::add);
diff --git a/test/com/nutrons/framework/test/MultiCommandTest.java b/test/com/nutrons/framework/test/MultiCommandTest.java
index 510e164..520dd37 100644
--- a/test/com/nutrons/framework/test/MultiCommandTest.java
+++ b/test/com/nutrons/framework/test/MultiCommandTest.java
@@ -1,16 +1,14 @@
package com.nutrons.framework.test;
+import static com.nutrons.framework.commands.Command.parallel;
+import static junit.framework.TestCase.assertTrue;
+
import com.nutrons.framework.commands.Command;
-import com.nutrons.framework.commands.TerminatorWrapper;
import io.reactivex.Flowable;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
-import static com.nutrons.framework.commands.Command.parallel;
-import static junit.framework.TestCase.assertTrue;
-
public class MultiCommandTest {
private Command delay;
@@ -41,24 +39,4 @@ public void testOneThenAnother() throws InterruptedException {
Thread.sleep(2000);
assertTrue(record[0] == 0);
}
-
- @Test
- public void testSwitch() throws InterruptedException {
- int[] record = new int[1];
- record[0] = 0;
- Command inc = Command.just(x -> {
- synchronized (record) {
- record[0] += 1;
- }
- return Flowable.just(new TerminatorWrapper(() -> {
- synchronized (record) {
- record[0] -= 1;
- }
- }));
- });
- Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5))
- .execute(true).blockingSubscribe();
- Thread.sleep(2000);
- assertTrue(record[0] == 0);
- }
}
diff --git a/test/com/nutrons/framework/test/TestCommand.java b/test/com/nutrons/framework/test/TestCommand.java
index bc33a49..c55d8bd 100644
--- a/test/com/nutrons/framework/test/TestCommand.java
+++ b/test/com/nutrons/framework/test/TestCommand.java
@@ -1,31 +1,29 @@
package com.nutrons.framework.test;
+import static com.nutrons.framework.commands.Command.parallel;
+import static com.nutrons.framework.commands.Command.serial;
+import static junit.framework.TestCase.assertTrue;
+
import com.nutrons.framework.commands.Command;
import com.nutrons.framework.commands.Terminator;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-
-import static com.nutrons.framework.commands.Command.parallel;
-import static com.nutrons.framework.commands.Command.serial;
-import static junit.framework.TestCase.assertTrue;
-
public class TestCommand {
private Command delay;
- static void waitForCommand(Flowable commandExecution) {
+ static void waitForCommand(Flowable extends Terminator> commandExecution) {
commandExecution.blockingSubscribe();
}
/**
* Delays commands until test starts.
*/
-
@Before
public void setupCommands() {
delay = Command.fromAction(() -> {
@@ -33,6 +31,18 @@ public void setupCommands() {
}).killAfter(1000, TimeUnit.MILLISECONDS);
}
+ @Test
+ public void fromAction() throws InterruptedException {
+ int[] record = new int[1];
+ Command.fromAction(() -> {
+ synchronized (record) {
+ record[0] = 1;
+ }
+ }).execute(true);
+ Thread.sleep(1000);
+ assertTrue(record[0] == 1);
+ }
+
@Test
public void single() {
final Integer[] arr = new Integer[1];
@@ -72,7 +82,7 @@ public void inParallelTimed() {
public void testTerminable() throws InterruptedException {
final long start = System.currentTimeMillis();
PublishProcessor pp = PublishProcessor.create();
- Flowable td = serial(delay, delay, delay, delay)
+ final Flowable extends Terminator> td = serial(delay, delay, delay, delay)
.terminable(pp).execute(true);
Thread.sleep(3000);
pp.onNext(new Object());
@@ -86,7 +96,8 @@ public void testUntil() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
long start = System.currentTimeMillis();
- Flowable td = Command.fromAction(() -> record[0] = 1).until(() -> record[1] == 1).execute(true);
+ Flowable extends Terminator> td = Command.fromAction(() -> record[0] = 1)
+ .until(() -> record[1] == 1).execute(true);
Flowable.timer(1, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(x -> record[1] = 1);
waitForCommand(td);
assertTrue(System.currentTimeMillis() - 1000 > start);
@@ -109,7 +120,7 @@ public void testStartable() {
public void testWhen() throws InterruptedException {
int[] record = new int[2];
assertTrue(record[0] == 0);
- final Flowable td = Command.fromAction(() -> record[0] = 1)
+ final Flowable extends Terminator> td = Command.fromAction(() -> record[0] = 1)
.when(() -> record[1] == 1)
.execute(true);
Thread.sleep(1000);
@@ -141,4 +152,13 @@ public void killAfter() throws InterruptedException {
Thread.sleep(4000);
assertTrue(record[0] == 1);
}
+
+ @Test
+ public void notSelfTerminating() throws InterruptedException {
+ Command doesntTermrinate = Command.just(x ->
+ Flowable.just(() -> assertTrue(false))
+ .mergeWith(Flowable.never()));
+ doesntTermrinate.execute(true);
+ Thread.sleep(2000);
+ }
}
diff --git a/test/com/nutrons/framework/test/TestConnectables.java b/test/com/nutrons/framework/test/TestConnectables.java
index 0de262d..e83ba6e 100644
--- a/test/com/nutrons/framework/test/TestConnectables.java
+++ b/test/com/nutrons/framework/test/TestConnectables.java
@@ -27,4 +27,18 @@ public void testConnectableDiscards() throws InterruptedException {
flow.take(1).subscribeOn(Schedulers.io()).subscribe(x -> assertTrue(x != 0));
Thread.sleep(1000);
}
+
+ @Test
+ public void testConnectableWaitsToRetrieve() throws InterruptedException {
+ Runnable avoid = () -> assertTrue(false);
+ Runnable wanted = () -> {};
+ ConnectableFlowable source = Flowable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.io()).map(x -> avoid)
+ .take(5).concatWith(Flowable.timer(3, TimeUnit.SECONDS).map(x -> wanted)).publish();
+ source.connect();
+ ConnectableFlowable listener = source.take(1).publish();
+ listener.subscribe(Runnable::run);
+ Thread.sleep(6000);
+ listener.connect();
+ Thread.sleep(1000);
+ }
}
diff --git a/test/com/nutrons/framework/test/TestSwitchCommand.java b/test/com/nutrons/framework/test/TestSwitchCommand.java
new file mode 100644
index 0000000..22658af
--- /dev/null
+++ b/test/com/nutrons/framework/test/TestSwitchCommand.java
@@ -0,0 +1,94 @@
+package com.nutrons.framework.test;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+
+import com.nutrons.framework.commands.Command;
+import com.nutrons.framework.commands.TerminatorWrapper;
+import io.reactivex.Flowable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+public class TestSwitchCommand {
+
+ @Test
+ public void testSwitchStart() throws InterruptedException {
+ List list = new ArrayList<>();
+ Flowable commandStream = Flowable.interval(100, TimeUnit.MILLISECONDS)
+ .map(x -> putNumber(list, x)).take(5);
+ Command.fromSwitch(commandStream, true).execute(true);
+ Thread.sleep(300);
+ assertFalse(list.contains(4));
+ Thread.sleep(500);
+ for (long i = 0; i < 5; i++) {
+ assertTrue(list.contains(i));
+ }
+ }
+
+ private Command putNumber(List list, long number) {
+ return Command.fromAction(() -> list.add(number));
+ }
+
+
+ @Test
+ public void testSwitchStartAndStop() throws InterruptedException {
+ int[] record = new int[1];
+ record[0] = 0;
+ Command inc = Command.just(x -> {
+ synchronized (record) {
+ record[0] += 1;
+ }
+ return Flowable.just(new TerminatorWrapper(() -> {
+ synchronized (record) {
+ record[0] -= 1;
+ }
+ }));
+ });
+ Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true)
+ .execute(true).blockingSubscribe();
+ Thread.sleep(2000);
+ assertTrue(record[0] == 0);
+ }
+
+ @Test
+ public void testSwitchTerminateRealtime() throws InterruptedException {
+ int[] record = new int[1];
+ long start = System.currentTimeMillis();
+ record[0] = 0;
+ Command inc = Command.just(x -> Flowable.just(new TerminatorWrapper(() -> {
+ synchronized (record) {
+ record[0] += 1;
+ }
+ })));
+ Command.fromSwitch(Flowable.interval(1, TimeUnit.SECONDS).map(x -> inc).take(5), true)
+ .execute(true);
+ assertTrue(System.currentTimeMillis() - start < 1000);
+ Thread.sleep(2000);
+ assertTrue(record[0] < 5);
+ assertTrue(record[0] > 0);
+ }
+
+ @Test
+ public void testSwitchNotTerminating() throws InterruptedException {
+ Command doesntFinish = Command.just(x -> Flowable.just(() -> assertTrue(false)));
+ doesntFinish.execute(false);
+ Thread.sleep(1000);
+ Command justOne = Command.fromSwitch(Flowable.never()
+ .mergeWith(Flowable.just(doesntFinish)), false);
+ justOne.execute(false);
+ Thread.sleep(1000);
+ justOne.execute(true);
+ Thread.sleep(1000);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testSwitchTerminatesOnNext() {
+ Command doesntFinish = Command.just(x -> Flowable.just(() -> {
+ throw new RuntimeException();
+ }));
+ Command two = Command.fromSwitch(Flowable.just(doesntFinish, doesntFinish), false);
+ two.execute(false).blockingSubscribe();
+ }
+}