Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions smr/src/main/java/com/codeheadsystems/smr/Context.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
package com.codeheadsystems.smr;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* You can have many contexts for a single state machine. And the
* state machine manages this one context.
*/
@FunctionalInterface
public interface Context {

/**
* Reference atomic reference.
* State state.
*
* @return the atomic reference
* @return the state
*/
AtomicReference<State> reference();
State state();

/**
* State state.
* Sets the current state to the new state, returning the existing state, if any.
*
* @return the state
* @param newState for the context.
* @return the old state
*/
default State state() {
return reference().get();
}

Optional<State> setState(State newState);
/**
* You can extend this to generate your own context easily enough.
*/
Expand All @@ -33,20 +31,25 @@ abstract class Impl implements Context {
/**
* The State.
*/
protected final AtomicReference<State> state;
private final AtomicReference<State> reference;

/**
* Instantiates a new .
*
* @param initialState the initial state
*/
public Impl(State initialState) {
this.state = new AtomicReference<>(initialState);
this.reference = new AtomicReference<>(initialState);
}

@Override
public State state() {
return reference.get();
}

@Override
public AtomicReference<State> reference() {
return state;
public Optional<State> setState(State newState) {
return Optional.ofNullable(reference.getAndSet(newState));
}

}
Expand Down
15 changes: 2 additions & 13 deletions smr/src/main/java/com/codeheadsystems/smr/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,6 @@ void disable(State state,
Phase phase,
Consumer<Callback> contextConsumer);

/**
* Wrapper to use the event unaware method in case you want to make decisions based on the
* event. This is not recommended as it breaks the state machine pattern, but you do you.
*
* @param context that holds onto the current state.
* @param currentState expected current state.
* @param newState new state to call.
* @param event that caused the transition.
*/
default void handleTransitionEvent(Context context, State currentState, State newState, Event event) {
handleTransitionEvent(context, currentState, newState);
}

/**
* Macro method that handles the full state change and callback execution.
*
Expand All @@ -51,6 +38,8 @@ default void handleTransitionEvent(Context context, State currentState, State ne
*/
void handleTransitionEvent(Context context, State currentState, State newState);

// ---- Below here exist for decorators ----

/**
* Does the state change.
*
Expand Down
11 changes: 1 addition & 10 deletions smr/src/main/java/com/codeheadsystems/smr/StateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,6 @@ public static StateMachine.Builder builder() {
return new StateMachine.Builder();
}

/**
* The current state of the state machine.
*
* @return the current state.
*/
public State state() {
return state.get();
}

/**
* Get the states that are valid for the current state machine.
*
Expand Down Expand Up @@ -133,7 +124,7 @@ public State dispatch(final Event event) {
final Optional<State> optionalNewState = definition.forEvent(currentState, event);
if (optionalNewState.isPresent()) {
final State newState = optionalNewState.get();
dispatcher.handleTransitionEvent(this, currentState, newState, event);
dispatcher.handleTransitionEvent(this, currentState, newState);
return newState;
} else {
log.warn("No transition for event {} from state {}", event, currentState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.codeheadsystems.smr.dispatcher;

import com.codeheadsystems.smr.Callback;
import com.codeheadsystems.smr.State;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type asynchronous dispatcher.
*/
public class AsynchronousDispatcher extends BaseDispatcher {

protected static final Logger log = LoggerFactory.getLogger(AsynchronousDispatcher.class);
private final Executor executor;

public static Builder builder() {
return new Builder();
}

/**
* Instantiates a new Synchronous dispatcher.
*
* @param states the states
* @param executor
*/
private AsynchronousDispatcher(final Set<State> states, final Executor executor) {
super(states);
this.executor = executor;
log.info("SynchronousDispatcher()");
}

// A synchronized execution of the callbacks. Basic.
@Override
protected void executeCallbacks(final Set<Consumer<Callback>> phasedCallbacks,
final Callback callback) {
phasedCallbacks.stream()
.map(callbackConsumer ->
CompletableFuture.runAsync(() -> executeCallback(callbackConsumer, callback), executor))
.forEach(CompletableFuture::join);
}

public static class Builder {

private Executor executor;
private Set<State> states;

public Builder withExecutor(Executor executor) {
this.executor = executor;
return this;
}

public Builder withStates(Set<State> states) {
this.states = states;
return this;
}

public AsynchronousDispatcher build() {
if (states == null) {
throw new IllegalArgumentException("states must not be null");
}
Executor localExecutor = executor;
if (localExecutor == null) {
localExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
return new AsynchronousDispatcher(states, localExecutor);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.codeheadsystems.smr.dispatcher;

import com.codeheadsystems.smr.Callback;
import com.codeheadsystems.smr.Context;
import com.codeheadsystems.smr.Dispatcher;
import com.codeheadsystems.smr.ImmutableCallback;
import com.codeheadsystems.smr.Phase;
import com.codeheadsystems.smr.State;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDispatcher implements Dispatcher {
protected static final Logger log = LoggerFactory.getLogger(BaseDispatcher.class);
protected final Map<State, Set<Consumer<Callback>>[]> callbackMap;

public BaseDispatcher(final Set<State> states) {
log.info("BaseDispatcher()");
this.callbackMap = states.stream()
.collect(HashMap::new, (map, state) -> map.put(state, buildList()), HashMap::putAll);
}

@Override
public void enable(final State state,
final Phase phase,
final Consumer<Callback> contextConsumer) {
log.trace("enable({}, {}, {})", state, phase, contextConsumer);
callbackMap.get(state)[phase.ordinal()].add(contextConsumer);
}

@Override
public void disable(final State state,
final Phase phase,
final Consumer<Callback> contextConsumer) {
log.trace("disable({}, {}, {})", state, phase, contextConsumer);
callbackMap.get(state)[phase.ordinal()].remove(contextConsumer);
}

/**
* TODO: This method needs to be handled with care. Need to consider if we want to 1) back out of events if
* things failed, 2) keep it simple but incomplete, 3) allow for various implementations. (Most likely).
*
* @param context that has state being changed.
* @param currentState the from state.
* @param newState the too state.
*/
@Override
public void handleTransitionEvent(final Context context,
final State currentState,
final State newState) {
log.trace("handleTransitionEvent({}, {}, {})", context, currentState, newState);
dispatchCallbacks(context, currentState, Phase.EXIT);
final State previousState = changeState(context, currentState, newState);
if (!previousState.equals(currentState)) {
log.warn("handleTransitionEvent:state: {} != {}", previousState, currentState);
}
dispatchCallbacks(context, newState, Phase.ENTER);
}

@Override
public State changeState(final Context context, final State currentState, final State newState) {
return context.setState(newState).orElse(null);
}

@Override
public void dispatchCallbacks(final Context context,
final State currentState,
final Phase phase) {
log.trace("dispatchCallbacks({}, {}, {})", context, currentState, phase);
final Set<Consumer<Callback>>[] callbacks = callbackMap.get(currentState);
final Set<Consumer<Callback>> phasedCallbacks = callbacks[phase.ordinal()];
if (phasedCallbacks.isEmpty()) {
return;
}
final Callback callback = ImmutableCallback.builder()
.context(context)
.state(currentState)
.phase(phase)
.build();
executeCallbacks(phasedCallbacks, callback);
}

// Implement this for any logic you need.
abstract protected void executeCallbacks(Set<Consumer<Callback>> phasedCallbacks, final Callback callback);

@Override
public void executeCallback(final Consumer<Callback> consumer,
final Callback callback) {
try {
consumer.accept(callback);
} catch (RuntimeException e) {
log.error("dispatchCallbacks:error: {}", consumer, e);
}
}

@SuppressWarnings("unchecked")
protected Set<Consumer<Callback>>[] buildList() {
return Arrays.stream(Phase.values())
.map(event -> new HashSet<Consumer<Callback>>()).toArray(Set[]::new);
}
}
Loading
Loading