Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 56 additions & 7 deletions src/main/kotlin/ch/kuon/phoenix/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,20 @@ class Channel internal constructor(
}
}

private var state = State.CLOSED
/**
* Current channel state (read-only for consumers)
*/
var state = State.CLOSED
internal set

/**
* Optional callback invoked on every state transition.
*
* Fires inside the socket's synchronized block, so avoid
* long-running work in this callback.
*/
var onStateChange: ((State) -> Unit)? = null

private var bindings =
listOf<Triple<String, Ref, (Message?, Ref?, Ref?) -> Unit>>()
private var bindingRef: Ref = 0
Expand Down Expand Up @@ -351,6 +364,7 @@ class Channel internal constructor(
joinPush.receive("ok") {
synchronized(socket) {
state = State.JOINED
onStateChange?.invoke(state)
rejoinTimer.reset()
pushBuffer.forEach { evt ->
evt.send()
Expand All @@ -361,6 +375,7 @@ class Channel internal constructor(
joinPush.receive("error") {
synchronized(socket) {
state = State.ERRORED
onStateChange?.invoke(state)
if (socket.isConnected()) {
rejoinTimer.scheduleTimeout()
}
Expand All @@ -372,6 +387,7 @@ class Channel internal constructor(
rejoinTimer.reset()
socket.log("CHANNEL", "Closed", this)
state = State.CLOSED
onStateChange?.invoke(state)
socket.remove(this)
}
}
Expand All @@ -383,6 +399,7 @@ class Channel internal constructor(
joinPush.reset()
}
state = State.ERRORED
onStateChange?.invoke(state)
if (socket.isConnected()) {
rejoinTimer.scheduleTimeout()
}
Expand All @@ -395,6 +412,7 @@ class Channel internal constructor(
val leavePush = Push(this, Event.LEAVE.event, JSONObject(), timeout)
leavePush.send()
state = State.ERRORED
onStateChange?.invoke(state)
joinPush.reset()
if (socket.isConnected()) {
rejoinTimer.scheduleTimeout()
Expand Down Expand Up @@ -558,7 +576,10 @@ class Channel internal constructor(
off(listOf(ref))
}

internal fun canPush(): Boolean {
/**
* Returns true if the channel can accept pushes (socket connected and joined)
*/
fun canPush(): Boolean {
return socket.isConnected() && isJoined()
}

Expand Down Expand Up @@ -603,6 +624,7 @@ class Channel internal constructor(
joinPush.cancelTimeout()

state = State.LEAVING
onStateChange?.invoke(state)

val onClose: (JSONObject) -> Unit = { _ ->
socket.log("CHANNEL", "Leaving", this)
Expand Down Expand Up @@ -646,6 +668,7 @@ class Channel internal constructor(

internal fun sendJoin(timeout: Int) {
state = State.JOINING
onStateChange?.invoke(state)
joinPush.resend(timeout)
}

Expand Down Expand Up @@ -681,23 +704,49 @@ class Channel internal constructor(
return "chan_reply_" + ref.toString()
}

internal fun isClosed() : Boolean {
/**
* Returns true if the channel is in CLOSED state
*/
fun isClosed() : Boolean {
return state == State.CLOSED
}

internal fun isErrored() : Boolean {
/**
* Returns true if the channel is in ERRORED state
*/
fun isErrored() : Boolean {
return state == State.ERRORED
}

internal fun isJoined() : Boolean {
/**
* Returns true if the channel is in JOINED state
*/
fun isJoined() : Boolean {
return state == State.JOINED
}

internal fun isJoining() : Boolean {
/**
* Returns true if the channel is in JOINING state
*/
fun isJoining() : Boolean {
return state == State.JOINING
}

internal fun isLeaving() : Boolean {
/**
* Returns true if the channel is in LEAVING state
*/
fun isLeaving() : Boolean {
return state == State.LEAVING
}

/**
* Returns true if the channel is stale and should be discarded.
*
* A stale channel is one that is CLOSED, ERRORED, or LEAVING.
* Consumers should create a new channel instance instead of
* reusing a stale one.
*/
fun isStale() : Boolean {
return state == State.CLOSED || state == State.ERRORED || state == State.LEAVING
}
}
141 changes: 141 additions & 0 deletions src/test/kotlin/ch/kuon/phoenix/LibraryTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlin.test.*
import com.github.openjson.JSONObject
import com.github.openjson.JSONArray
import net.jodah.concurrentunit.Waiter
import java.util.concurrent.CopyOnWriteArrayList

val url = "ws://localhost:4444/socket"

Expand Down Expand Up @@ -301,6 +302,146 @@ class LibraryTest {
Thread.sleep(500)
}

@Test fun testChannelStateAfterJoin() {
val waiter = Waiter()
val sd = Socket(url)
sd.connect()

val chan = sd.channel(
"mock:lobby",
JSONObject(hashMapOf("auth" to "secret"))
)

chan
.join()
.receive("ok") { _ ->
waiter.assertEquals(Channel.State.JOINED, chan.state)
waiter.assertTrue(chan.isJoined())
waiter.assertFalse(chan.isStale())
waiter.assertTrue(chan.canPush())
waiter.assertFalse(chan.isClosed())
waiter.assertFalse(chan.isErrored())
waiter.resume()
}
.receive("error") { _ ->
waiter.fail("Should not fail join with auth")
}
waiter.await(10000)
sd.disconnect()
}

@Test fun testChannelStateAfterJoinError() {
val waiter = Waiter()
val sd = Socket(url)
sd.connect()

val chan = sd.channel("mock:lobby")

chan
.join()
.receive("ok") { _ ->
waiter.fail("Should fail join without auth")
}
.receive("error") { _ ->
waiter.assertEquals(Channel.State.ERRORED, chan.state)
waiter.assertTrue(chan.isErrored())
waiter.assertTrue(chan.isStale())
waiter.assertFalse(chan.canPush())
waiter.assertFalse(chan.isJoined())
waiter.resume()
}
waiter.await(10000)
sd.disconnect()
}

@Test fun testChannelStateAfterLeave() {
val waiter = Waiter()
val sd = Socket(url)
sd.connect()

val chan = sd.channel(
"mock:lobby",
JSONObject(hashMapOf("auth" to "secret"))
)

chan
.join()
.receive("ok") { _ ->
waiter.resume()
}
waiter.await(10000)

chan.leave()
.receive("ok") { _ ->
waiter.assertTrue(chan.isClosed())
waiter.assertTrue(chan.isStale())
waiter.assertFalse(chan.canPush())
waiter.resume()
}
waiter.await(10000)
sd.disconnect()
}

@Test fun testChannelStateInitial() {
val sd = Socket(url)
sd.connect()

val chan = sd.channel("mock:lobby")

assertEquals(Channel.State.CLOSED, chan.state)
assertTrue(chan.isClosed())
assertTrue(chan.isStale())
assertFalse(chan.canPush())

sd.disconnect()
}

@Test fun testOnStateChangeCallback() {
val waiter = Waiter()
val sd = Socket(url)
sd.connect()

val chan = sd.channel(
"mock:lobby",
JSONObject(hashMapOf("auth" to "secret"))
)

val states = CopyOnWriteArrayList<Channel.State>()
chan.onStateChange = { newState ->
states.add(newState)
}

chan
.join()
.receive("ok") { _ ->
waiter.resume()
}
waiter.await(10000)

chan.leave()
.receive("ok") { _ ->
waiter.resume()
}
waiter.await(10000)

Thread.sleep(500)

assertTrue(states.contains(Channel.State.JOINING), "Should contain JOINING")
assertTrue(states.contains(Channel.State.JOINED), "Should contain JOINED")
assertTrue(states.contains(Channel.State.LEAVING), "Should contain LEAVING")
assertTrue(states.contains(Channel.State.CLOSED), "Should contain CLOSED")

val joiningIdx = states.indexOf(Channel.State.JOINING)
val joinedIdx = states.indexOf(Channel.State.JOINED)
val leavingIdx = states.indexOf(Channel.State.LEAVING)
val closedIdx = states.indexOf(Channel.State.CLOSED)
assertTrue(joiningIdx < joinedIdx, "JOINING before JOINED")
assertTrue(joinedIdx < leavingIdx, "JOINED before LEAVING")
assertTrue(leavingIdx < closedIdx, "LEAVING before CLOSED")

sd.disconnect()
}

@Test fun testTimer() {
val waiter = Waiter()

Expand Down