From 51da724af1d6773f522e734189718a47235a66dd Mon Sep 17 00:00:00 2001 From: mohamed00736 Mohamed Abdelhakim Hacine Date: Fri, 10 Apr 2026 00:30:07 +0100 Subject: [PATCH] Expose Channel state, add isStale() and onStateChange callback - Make Channel.state public with internal set (read-only for consumers) - Make isClosed(), isErrored(), isJoined(), isJoining(), isLeaving(), canPush() public (were internal) - Add isStale() convenience method: true when CLOSED, ERRORED, or LEAVING - Add onStateChange callback that fires on every state transition - Add 5 tests covering state after join, join error, leave, initial state, and onStateChange ordering --- src/main/kotlin/ch/kuon/phoenix/Channel.kt | 63 +++++++- .../kotlin/ch/kuon/phoenix/LibraryTest.kt | 141 ++++++++++++++++++ 2 files changed, 197 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/ch/kuon/phoenix/Channel.kt b/src/main/kotlin/ch/kuon/phoenix/Channel.kt index fb8af48..3bdb7b3 100644 --- a/src/main/kotlin/ch/kuon/phoenix/Channel.kt +++ b/src/main/kotlin/ch/kuon/phoenix/Channel.kt @@ -316,7 +316,20 @@ class Channel internal constructor( } } - private var state = State.CLOSED + /** + * Current channel state (read-only for consumers) + */ + var state = State.CLOSED + internal set + + /** + * Optional callback invoked on every state transition. + * + * Fires inside the socket's synchronized block, so avoid + * long-running work in this callback. + */ + var onStateChange: ((State) -> Unit)? = null + private var bindings = listOf Unit>>() private var bindingRef: Ref = 0 @@ -351,6 +364,7 @@ class Channel internal constructor( joinPush.receive("ok") { synchronized(socket) { state = State.JOINED + onStateChange?.invoke(state) rejoinTimer.reset() pushBuffer.forEach { evt -> evt.send() @@ -361,6 +375,7 @@ class Channel internal constructor( joinPush.receive("error") { synchronized(socket) { state = State.ERRORED + onStateChange?.invoke(state) if (socket.isConnected()) { rejoinTimer.scheduleTimeout() } @@ -372,6 +387,7 @@ class Channel internal constructor( rejoinTimer.reset() socket.log("CHANNEL", "Closed", this) state = State.CLOSED + onStateChange?.invoke(state) socket.remove(this) } } @@ -383,6 +399,7 @@ class Channel internal constructor( joinPush.reset() } state = State.ERRORED + onStateChange?.invoke(state) if (socket.isConnected()) { rejoinTimer.scheduleTimeout() } @@ -395,6 +412,7 @@ class Channel internal constructor( val leavePush = Push(this, Event.LEAVE.event, JSONObject(), timeout) leavePush.send() state = State.ERRORED + onStateChange?.invoke(state) joinPush.reset() if (socket.isConnected()) { rejoinTimer.scheduleTimeout() @@ -558,7 +576,10 @@ class Channel internal constructor( off(listOf(ref)) } - internal fun canPush(): Boolean { + /** + * Returns true if the channel can accept pushes (socket connected and joined) + */ + fun canPush(): Boolean { return socket.isConnected() && isJoined() } @@ -603,6 +624,7 @@ class Channel internal constructor( joinPush.cancelTimeout() state = State.LEAVING + onStateChange?.invoke(state) val onClose: (JSONObject) -> Unit = { _ -> socket.log("CHANNEL", "Leaving", this) @@ -646,6 +668,7 @@ class Channel internal constructor( internal fun sendJoin(timeout: Int) { state = State.JOINING + onStateChange?.invoke(state) joinPush.resend(timeout) } @@ -681,23 +704,49 @@ class Channel internal constructor( return "chan_reply_" + ref.toString() } - internal fun isClosed() : Boolean { + /** + * Returns true if the channel is in CLOSED state + */ + fun isClosed() : Boolean { return state == State.CLOSED } - internal fun isErrored() : Boolean { + /** + * Returns true if the channel is in ERRORED state + */ + fun isErrored() : Boolean { return state == State.ERRORED } - internal fun isJoined() : Boolean { + /** + * Returns true if the channel is in JOINED state + */ + fun isJoined() : Boolean { return state == State.JOINED } - internal fun isJoining() : Boolean { + /** + * Returns true if the channel is in JOINING state + */ + fun isJoining() : Boolean { return state == State.JOINING } - internal fun isLeaving() : Boolean { + /** + * Returns true if the channel is in LEAVING state + */ + fun isLeaving() : Boolean { return state == State.LEAVING } + + /** + * Returns true if the channel is stale and should be discarded. + * + * A stale channel is one that is CLOSED, ERRORED, or LEAVING. + * Consumers should create a new channel instance instead of + * reusing a stale one. + */ + fun isStale() : Boolean { + return state == State.CLOSED || state == State.ERRORED || state == State.LEAVING + } } diff --git a/src/test/kotlin/ch/kuon/phoenix/LibraryTest.kt b/src/test/kotlin/ch/kuon/phoenix/LibraryTest.kt index e825ad2..9299dc5 100644 --- a/src/test/kotlin/ch/kuon/phoenix/LibraryTest.kt +++ b/src/test/kotlin/ch/kuon/phoenix/LibraryTest.kt @@ -4,6 +4,7 @@ import kotlin.test.* import com.github.openjson.JSONObject import com.github.openjson.JSONArray import net.jodah.concurrentunit.Waiter +import java.util.concurrent.CopyOnWriteArrayList val url = "ws://localhost:4444/socket" @@ -301,6 +302,146 @@ class LibraryTest { Thread.sleep(500) } + @Test fun testChannelStateAfterJoin() { + val waiter = Waiter() + val sd = Socket(url) + sd.connect() + + val chan = sd.channel( + "mock:lobby", + JSONObject(hashMapOf("auth" to "secret")) + ) + + chan + .join() + .receive("ok") { _ -> + waiter.assertEquals(Channel.State.JOINED, chan.state) + waiter.assertTrue(chan.isJoined()) + waiter.assertFalse(chan.isStale()) + waiter.assertTrue(chan.canPush()) + waiter.assertFalse(chan.isClosed()) + waiter.assertFalse(chan.isErrored()) + waiter.resume() + } + .receive("error") { _ -> + waiter.fail("Should not fail join with auth") + } + waiter.await(10000) + sd.disconnect() + } + + @Test fun testChannelStateAfterJoinError() { + val waiter = Waiter() + val sd = Socket(url) + sd.connect() + + val chan = sd.channel("mock:lobby") + + chan + .join() + .receive("ok") { _ -> + waiter.fail("Should fail join without auth") + } + .receive("error") { _ -> + waiter.assertEquals(Channel.State.ERRORED, chan.state) + waiter.assertTrue(chan.isErrored()) + waiter.assertTrue(chan.isStale()) + waiter.assertFalse(chan.canPush()) + waiter.assertFalse(chan.isJoined()) + waiter.resume() + } + waiter.await(10000) + sd.disconnect() + } + + @Test fun testChannelStateAfterLeave() { + val waiter = Waiter() + val sd = Socket(url) + sd.connect() + + val chan = sd.channel( + "mock:lobby", + JSONObject(hashMapOf("auth" to "secret")) + ) + + chan + .join() + .receive("ok") { _ -> + waiter.resume() + } + waiter.await(10000) + + chan.leave() + .receive("ok") { _ -> + waiter.assertTrue(chan.isClosed()) + waiter.assertTrue(chan.isStale()) + waiter.assertFalse(chan.canPush()) + waiter.resume() + } + waiter.await(10000) + sd.disconnect() + } + + @Test fun testChannelStateInitial() { + val sd = Socket(url) + sd.connect() + + val chan = sd.channel("mock:lobby") + + assertEquals(Channel.State.CLOSED, chan.state) + assertTrue(chan.isClosed()) + assertTrue(chan.isStale()) + assertFalse(chan.canPush()) + + sd.disconnect() + } + + @Test fun testOnStateChangeCallback() { + val waiter = Waiter() + val sd = Socket(url) + sd.connect() + + val chan = sd.channel( + "mock:lobby", + JSONObject(hashMapOf("auth" to "secret")) + ) + + val states = CopyOnWriteArrayList() + chan.onStateChange = { newState -> + states.add(newState) + } + + chan + .join() + .receive("ok") { _ -> + waiter.resume() + } + waiter.await(10000) + + chan.leave() + .receive("ok") { _ -> + waiter.resume() + } + waiter.await(10000) + + Thread.sleep(500) + + assertTrue(states.contains(Channel.State.JOINING), "Should contain JOINING") + assertTrue(states.contains(Channel.State.JOINED), "Should contain JOINED") + assertTrue(states.contains(Channel.State.LEAVING), "Should contain LEAVING") + assertTrue(states.contains(Channel.State.CLOSED), "Should contain CLOSED") + + val joiningIdx = states.indexOf(Channel.State.JOINING) + val joinedIdx = states.indexOf(Channel.State.JOINED) + val leavingIdx = states.indexOf(Channel.State.LEAVING) + val closedIdx = states.indexOf(Channel.State.CLOSED) + assertTrue(joiningIdx < joinedIdx, "JOINING before JOINED") + assertTrue(joinedIdx < leavingIdx, "JOINED before LEAVING") + assertTrue(leavingIdx < closedIdx, "LEAVING before CLOSED") + + sd.disconnect() + } + @Test fun testTimer() { val waiter = Waiter()