From e2fcd54babb438bf3792b4b8299b588d13b4ee73 Mon Sep 17 00:00:00 2001 From: William Yang Date: Wed, 23 Apr 2025 22:15:18 +0200 Subject: [PATCH 1/9] docs: draft 0030-emqx_linear_channel_registry.md --- active/0030-emqx_linear_channel_registry.md | 234 ++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 active/0030-emqx_linear_channel_registry.md diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md new file mode 100644 index 0000000..ad2764e --- /dev/null +++ b/active/0030-emqx_linear_channel_registry.md @@ -0,0 +1,234 @@ +# Linear Channel Registry + +## Changelog + +* 2025-04-20: @qzhuyan First draft + +## Abstract + +For a new incoming MQTT connection, the node may look for an existing channel matching the same clientid +within the cluster for taking further actions such as taking over the session, discarding the session, +or creating a new session. + +This channel registration info is a cluster wide global state maintained by the underlying database. +It is replicated asynchronously (eventually consistent), and it leads to problems when replication is +lagging or the old channel process is unresponsive. + +In this EIP, we propose another channel registration subsystem called LCR ("Linear Channel Registry") that +expands the channel prop with version (e.g., trpt_connected_at timestamp when transport connects) so that the +yong and old channels could be determined to minimize the issues caused by race conditions during session takeover +while the client is reconnecting, especially in a massive volume. + +## Motivation + +For the current implementation of channel registry (emqx_cm_registry), it has eventual consistency. + +The channel registration gets a dirty write on the core node from an RPC call. +The registration then gets replicated among the core nodes asynchronously (dirty). +The core nodes then replicate the registration to the replicant nodes. 
+When looking for existing channels while handling the new connection, EMQX does a lookup from the local copy.
+
+This is a design for high availability and high performance, under the assumption that replication is finished
+before the next client connection. But when replication starts lagging due to various reasons, the following issues pop up:
+
+- A node does not see that the channel exists. It creates a new session instead of taking over the existing one.
+
+- A new channel could take over a session from the wrong channel due to the wrong order of receiving replications.
+
+- The older channel with an unresponsive connection kicks the latest channel, causing the client to get disconnected unexpectedly.
+
+- Wrong Session Present flag in MQTT.CONNACK is returned to the client.
+
+- The deregistration may overrun the registration, leaving a registration unreleased.
+
+- When a channel process starts to take over or create a new session, it runs the operation in an ekka locker transaction which uses
+a cluster-wide, domain-specific fine lock, ekka_locker. By involving all the core nodes, it protects the operation from being executed
+in parallel but not the registration data, leading to unexpected behaviors.
+
+- The ekka_locker is error-prone in that it is very easy to leave a lock unreleased in the cluster, such that when the channel process gets
+killed in the middle of the processing of session takeover, the deadlock detection needs 30s to detect and then remove the lock.
+This means the broker will be unavailable to that client for that period, and the retries from the client will just create more
+load (TLS handshake, client AUTH) on the cluster.
+
+We need to explore other options to ensure correctness for mission-critical use cases, with some performance loss as a trade-off.
+
+## Design
+
+### Reasoning
+
+Based on real-world scenarios, we have the following assumptions:
+
+- Client to brokers latency is higher than the latency between EMQX nodes.
+ +- Clients do not reconnect within millisecond delays, so it is unlikely to have two channels race for the same clientid for session operations. + +Therefore, we could use the transport connected timestamp (trpt_connected_at) in ms as the version of the channel. + +@NOTE, we could use other timestamp too such as client provided timestamps embeded in MQTT user props. + +Based on the current EMQX implementation, we have the following facts: + +- Most of the time, replication is not lagging, so it is efficient to read from local to check if the self channel is the latest + or already outdated (there exists a newer one). + +- Removing the ekka locker transaction enables more than one channel to begin takeover of the session ({takeover, 'begin'}), but still, + only one can finish ({takeover, 'end'}). + +- It is no harm if the current channel finished the 'begin' phase for a non-latest channel. + +- For correctness, it should be okay to retry 'begin' takeover of another newly discovered latest channel. + +- Combined with local dirty reads and transactional updates, we could balance correctness and performance. + +- The channel of the latest connection from the client is preferred to wait and retry takeover of the session instead of getting a + negative MQTT.CONNACK immediately. + +- Session registration is a bag table; multiple channels of the same client could co-exist. New design could follow this. + + +Based on the above, with versioned channels, channels from the same client become comparable, and EMQX could find the most recent channel +or check if the current processing connection is outdated. + +If it is outdated, it returns MQTT.CONNACK with a negative reason code. +Otherwise, it tries to begin takeover of the most recent channel. + +Whatever the takeover success or not (created new session @TODO expand this), it tries to do a transactional update with the version of the +most recent channel (`ver_local_max`). 
In this transaction, it reads the real max version (`ver_real_max`) and compares it with the `ver_local_max`. + +If they match, it should add itself to the registration table. + +If unmatched and `curr_vsn` < `ver_real_max`, it should abort the transaction and then return MQTT.CONNACK with a negative reason code. + +If unmatched AND `curr_vsn` > `ver_real_max` > `ver_local_max`, it should abort the transaction and then restart this procedure to takeover +the channel has `ver_real_max`. + +It shouldn't happen that `curr_vsn` > `ver_real_max` AND `ver_real_max` < `ver_local_max`, but for correctness, it should abort the transaction +and then return MQTT.CONNACK with a negative reason code. + +It continues to finish the 2nd phase, the `{takeover, 'end'}` part, so the old channel will deregister itself from the registry. + + +### Designing lcr_channel + +`lcr_channel` of LCR represents the EMQX channel that provides a global view and maintain a global state within the cluster. + +Erlang +-record(lcr_channel, { + id :: lcr_channel_id(), + pid :: pid() | undefined, + vsn :: integer() | undefined +}). + +`lcr_channel` is bag table using the value of clientid as the key (id). + +`#lcr_channel.pid` is the channel process pid, global unique, and contains embedded info of the node. +`#lcr_channel.vsn` is used to compare or sort the channels. + +@TODO If we could prove that there max has two generations, we could convert it to ordered_set. + +For write, it is done transactionally. A Mria transaction with sharding is executed on one of the core nodes (strong consistency among core nodes), +and it will get replicated to the replicant nodes asynchronously (eventual consistency). + +For read, a node reads from local storage. + +For delete, it is done asynchronously (dirty), but it will never overrun the writes. + +There is no need for updates in core or replicant. @TODO Do we need to prove that it is written once? + +### lcr_channel life cycle + +lcr_channel is written when: + +1. 
the session is created AND after updating the local ETS for other tables of the channel. + +1. the session takeover 'begin' is finished AND after updating the local ETS for other tables of the channel. + + +lcr_channel can be read at any time. + + +For deletion, there are the following triggers: + +1. the channel process exits gracefully, where in the terminating phase, it deregisters itself from LCR. + +1. node down/Mnesia down event: + + The living node may remove the channel belonging to the down node. It must be done only once within + the cluster successfully. + +## Backwards Compatibility + +`LCR` should be disabled by default with a feature switch. + +### LCR Disabled + +When `LCR` is disabled, there should be no backwards compatibility issue, and the EMQX cluster should work as it does in the old version. + +### LCR Enabled within the cluster + +Once `LCR` is enabled, `LCR` can co-exist with `emqx_cm_registry` with a feature switch. + +READs from `LCR` storage only. WRITES go to both if `emqx_cm_registry` is also enabled (global), but DO NOT use `ekka_locker:trans`. + + +### LCR partially Enabled within the cluster + +During a rolling upgrade, the EMQX cluster could run with `LCR` partially enabled due to the mixed versions of EMQX nodes. + +To be compatible with old bpapi calls from the old version of EMQX, there are two problems to solve: + +1. In the RPC call, no channel version is missing. +2. `emqx_cm_registry` is the only provider, and it has no channel vsn. + +Thus, we define the fallback method here for Node evacuation and rolling upgrade: + +For newer EMQX handling the call from older EMQX, the channel version uses the value of the timestamp: `disconnected_at`. + +Newer EMQX MUST NOT disable the `emqx_cm_registry`; as part of the standard implementation, the writes will go to both storage. + +For newer EMQX calling the older EMQX, the newer EMQX should write to the `LCR` once the bpapi call is successful. 
The newer EMQX MUST not update the `emqx_cm_registry` storage as it is part of the call handling on the older EMQX. + + +### LCR runtime enabling + +UNSUPPORTED for now. + +### LCR runtime disabling + +Supported, but we don't know the side effects. Leave it as @TODO. + + +## Document Changes + +If there is any document change, give a brief description of it here. + +## Testing Suggestions + +### Functional test + +1. The existing common test suites should be expanded with a new group `lcr_on`, `lcr_off` + + - takeover suite + - cm suite + + Add a new cluster test suite for testing core + replicant roles covering known issues in previous chapters + +2. Rolling upgrade should be tested with the `LCR` switch on. + +## Performance test + +Performance tests should be performed against a 3 cores + 3 replicants cluster. + +Compare the performance with `LCR` on/off with the following scenarios: + +1. initial connections +2. reconnections +3. all clients connect with the same `clientid`. + +## Declined Alternatives + +Here goes which alternatives were discussed but considered worse than the current. +It's to help people understand how we reached the current state and also to +prevent going through the discussion again when an old alternative is brought +up again in the future. + From 0f76fac87bbe39f70eeb87d6f897f86a6d0d89a0 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 24 Apr 2025 14:47:57 +0200 Subject: [PATCH 2/9] add mermaid chart --- active/0030-emqx_linear_channel_registry.md | 45 +++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md index ad2764e..c6bf57a 100644 --- a/active/0030-emqx_linear_channel_registry.md +++ b/active/0030-emqx_linear_channel_registry.md @@ -107,6 +107,51 @@ and then return MQTT.CONNACK with a negative reason code. 
It continues to finish the 2nd phase, the `{takeover, 'end'}` part, so the old channel will deregister itself from the registry. +```mermaid +--- +config: + theme: redux +--- +flowchart TD + A(["NewConnection"]) --> B{"clearSession?"}; + + B -- YES --> D["OpenSession"]; + D --> E{"ReadMaxVsn"}; + E -- FOUND --> F{"TakeoverBeginSuccess"}; + + E -- NOTFOUND --> C; + F -- YES --> Transaction + + F -- NO --> C; + B -- NO --> C["NewSession"]; + + H["ReadLatestVsn"] + H --> H1{"LatestVsn>CurrVsn"} + H1 -- YES --> K + H1 -- NO --> I + C --> Transaction + H --> I{"LocalMax==LatestVsn?"} + I -- NO --> K["Abort"] + I -- YES --> J["commit"] + J --> L1["SUCCESS"] + K --> M{Retriable?} + M --NO--> L2["FAIL"] + M --YES--> F + + subgraph Transaction + H + H1 + I + J + K + end + + subgraph LocalDirtyAsync + E + F + C + end +``` ### Designing lcr_channel From 473e1829efc94d63dfbca232ff09a99d38021287 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 24 Apr 2025 17:22:12 +0200 Subject: [PATCH 3/9] updates --- active/0030-emqx_linear_channel_registry.md | 99 +++++++++++++++------ 1 file changed, 73 insertions(+), 26 deletions(-) diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md index c6bf57a..67039ad 100644 --- a/active/0030-emqx_linear_channel_registry.md +++ b/active/0030-emqx_linear_channel_registry.md @@ -62,7 +62,7 @@ Based on real-world scenarios, we have the following assumptions: - Clients do not reconnect within millisecond delays, so it is unlikely to have two channels race for the same clientid for session operations. -Therefore, we could use the transport connected timestamp (trpt_connected_at) in ms as the version of the channel. +Therefore, we could use the transport connected timestamp (`trpt_connected_at`) in ms as the version of the channel. @NOTE, we could use other timestamp too such as client provided timestamps embeded in MQTT user props. 
@@ -85,27 +85,38 @@ Based on the current EMQX implementation, we have the following facts: - Session registration is a bag table; multiple channels of the same client could co-exist. New design could follow this. +Based on the above, with versioning, channels from the same client become comparable, and EMQX could find the most recent channel or check if the current connection it is processing is outdated. -Based on the above, with versioned channels, channels from the same client become comparable, and EMQX could find the most recent channel -or check if the current processing connection is outdated. +We use three versions during the processing: -If it is outdated, it returns MQTT.CONNACK with a negative reason code. -Otherwise, it tries to begin takeover of the most recent channel. +- `ver_LocalMax` -Whatever the takeover success or not (created new session @TODO expand this), it tries to do a transactional update with the version of the -most recent channel (`ver_local_max`). In this transaction, it reads the real max version (`ver_real_max`) and compares it with the `ver_local_max`. + Max version of existing `channel`s from **local async dirty read**. + + Value is `undefined` if and only if no matching channel is found. -If they match, it should add itself to the registration table. +- `ver_RealMax` -If unmatched and `curr_vsn` < `ver_real_max`, it should abort the transaction and then return MQTT.CONNACK with a negative reason code. + Max version of existing `channel`s from **transactional read**. + + Value is `undefined` if and only if no matching channel is found. -If unmatched AND `curr_vsn` > `ver_real_max` > `ver_local_max`, it should abort the transaction and then restart this procedure to takeover -the channel has `ver_real_max`. +- `ver_curr` + + `channel` Version from the execution process stack. 
+ +With actions below: -It shouldn't happen that `curr_vsn` > `ver_real_max` AND `ver_real_max` < `ver_local_max`, but for correctness, it should abort the transaction -and then return MQTT.CONNACK with a negative reason code. +IF `ver_curr` < `ver_LocalMax`, drops the processing early, returns negtive CONNACK. __HAPPY FAIL__ + +ELSEIF `ver_curr` < `ver_RealMax`, drops the processing late, returns negtive CONNACK. __EXPENSIVE FAIL__ + +ELSEIF `ver_RealMax` > `ver_LocalMax`, restart the processing with `ver_RealMax` with limited number of retries. __MOST EXPENSIVE PATH__ + +ELSEIF `ver_RealMax` == `ver_LocalMax`, write with `ver_curr` and continue with the processing. __HAPPY ENDING__ + +It is very unlikely to happen that `ver_curr` > `ver_RealMax` AND `ver_RealMax` < `ver_LocalMax`, but for correctness, it should abort the transaction and then return MQTT.CONNACK with a negative reason code then log with INFO message. -It continues to finish the 2nd phase, the `{takeover, 'end'}` part, so the old channel will deregister itself from the registry. 
```mermaid --- @@ -114,31 +125,44 @@ config: --- flowchart TD A(["NewConnection"]) --> B{"clearSession?"}; - + B -- YES --> D["OpenSession"]; - D --> E{"ReadMaxVsn"}; - E -- FOUND --> F{"TakeoverBeginSuccess"}; + D --> E{"ReadLocalMax"}; + E -- FOUND --> E1{"LocalMax>curr_ver?"} + F0@{shape: lean-r, label: "vsn"} + E1 -- "`NO + ver=LocalMax`" --> F0 + F0 --> F{"TakeoverBeginSuccess"}; + E1 -- YES --> L2 E -- NOTFOUND --> C; - F -- YES --> Transaction + F -- "`YES + vsn=LocalMax`" --> Transaction F -- NO --> C; B -- NO --> C["NewSession"]; - H["ReadLatestVsn"] - H --> H1{"LatestVsn>CurrVsn"} + H0@{shape: lean-r, label: "vsn"} + H0 --> H["ReadRealMax"] + H --> H1{"RealMax>CurrVsn"} H1 -- YES --> K H1 -- NO --> I - C --> Transaction - H --> I{"LocalMax==LatestVsn?"} + C --"vsn=undef"--> Transaction + H --> I{"LocalMax==RealMax?"} I -- NO --> K["Abort"] I -- YES --> J["commit"] + J --> L1["SUCCESS"] K --> M{Retriable?} M --NO--> L2["FAIL"] - M --YES--> F + M --"`YES + vsn=RealMax`"--> F0 + + L2 --> Z1["Connack"] + L1 --> Z0["Continue"] subgraph Transaction + H0 H H1 I @@ -148,12 +172,18 @@ flowchart TD subgraph LocalDirtyAsync E + E1 + F0 F C end + + ``` -### Designing lcr_channel +The transaction to run is micro and abortive, it only reads and writes the same key, only one lock is taken so it is unlikely to get restarted by mria/mnesia. + +### record #lcr_channel{} `lcr_channel` of LCR represents the EMQX channel that provides a global view and maintain a global state within the cluster. @@ -180,7 +210,7 @@ For delete, it is done asynchronously (dirty), but it will never overrun the wri There is no need for updates in core or replicant. @TODO Do we need to prove that it is written once? -### lcr_channel life cycle +### lcr_channel lifecycle lcr_channel is written when: @@ -188,19 +218,36 @@ lcr_channel is written when: 1. the session takeover 'begin' is finished AND after updating the local ETS for other tables of the channel. - lcr_channel can be read at any time. 
For deletion, there are the following triggers: 1. the channel process exits gracefully, where in the terminating phase, it deregisters itself from LCR. + triggers: + - transport close + - taken over (`{takeover, 'end'}`) + - discarded + - kicked 1. node down/Mnesia down event: The living node may remove the channel belonging to the down node. It must be done only once within the cluster successfully. + For replicant node down, its upstream core node should clean the channels for it. + For core node down, @TODO + + +### Drawbacks + +- Compare to the `ekka_locker`, using transaction will stress the `mnesia_locker` which is single point of bottleneck. + + But `mnesia_locker` is more robust than `ekka_locker`. + +- "registration history function" will not be supported as we don't want large data set. + It must be reimplemented. + ## Backwards Compatibility `LCR` should be disabled by default with a feature switch. From 677f140aeb176e3a12b9b0a4974fbb0eb1f33b38 Mon Sep 17 00:00:00 2001 From: William Yang Date: Thu, 24 Apr 2025 19:05:59 +0200 Subject: [PATCH 4/9] update mermaid --- active/0030-emqx_linear_channel_registry.md | 97 ++++++++++----------- 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md index 67039ad..dd9a42c 100644 --- a/active/0030-emqx_linear_channel_registry.md +++ b/active/0030-emqx_linear_channel_registry.md @@ -121,64 +121,59 @@ It is very unlikely to happen that `ver_curr` > `ver_RealMax` AND `ver_RealMax` ```mermaid --- config: - theme: redux + theme: redux --- flowchart TD - A(["NewConnection"]) --> B{"clearSession?"}; - - B -- YES --> D["OpenSession"]; - D --> E{"ReadLocalMax"}; - E -- FOUND --> E1{"LocalMax>curr_ver?"} - F0@{shape: lean-r, label: "vsn"} - E1 -- "`NO - ver=LocalMax`" --> F0 - F0 --> F{"TakeoverBeginSuccess"}; - E1 -- YES --> L2 - - E -- NOTFOUND --> C; - F -- "`YES - vsn=LocalMax`" --> Transaction - - F -- NO --> C; - B -- NO 
--> C["NewSession"];
-
-    H0@{shape: lean-r, label: "vsn"}
-    H0 --> H["ReadRealMax"]
-    H --> H1{"RealMax>CurrVsn"}
-    H1 -- YES --> K
-    H1 -- NO --> I
-    C --"vsn=undef"--> Transaction
-    H --> I{"LocalMax==RealMax?"}
-    I -- NO --> K["Abort"]
-    I -- YES --> J["commit"]
-
-    J --> L1["SUCCESS"]
-    K --> M{Retriable?}
-    M --NO--> L2["FAIL"]
-    M --"`YES
-    vsn=RealMax`"--> F0
-
-    L2 --> Z1["Connack"]
-    L1 --> Z0["Continue"]
-
-    subgraph Transaction
-    H0
-    H
-    H1
-    I
-    J
-    K
-    end
-
-    subgraph LocalDirtyAsync
-    E
-    E1
-    F0
-    F
-    C
-    end
-
+    NewConn-->ReadLocalMax
+    ReadLocalMax-->C0{LocalMax>Curr?}
+    C0--TRUE-->Fail
+    C0--FALSE-->C1{CleanStart}
+    VsnDirty --->Takeover
+    C1 --FALSE--> VsnDirty@{shape: lean-r, label: "vsn"}
+
+    Takeover--Fail-->NewSession
+
+    C1 --TRUE--> NewSession
+
+    ReadRealMax-->C2{RealMax>Curr?}
+
+    Takeover--Success---->Transaction
+    NewSession-->Transaction
+    C2--FALSE-->C3{LocalMax==RealMax}
+    C2--TRUE-->Abort
+
+    C3--True-->Commit
+    C3--False-->Abort
+
+    Abort-->C4{Retryable?};
+    C4--YES-->VsnDirty
+    C4--NO-->Fail
+
+    Commit-->TakoverContinue
+
+    Fail-->NegativeConnack
+    TakoverContinue-..->PositiveConnak
+
+    subgraph LocalAsyncDirty
+    direction TB
+    ReadLocalMax
+    C0
+    C1
+    VsnDirty
+    Takeover
+    NewSession
+    end
+
+    subgraph Transaction
+    direction TB
+    ReadRealMax
+    C2
+    C3
+    Abort
+    Commit
+    end
 
 ```
 
 The transaction to run is micro and abortive, it only reads and writes the same key, only one lock is taken so it is unlikely to get restarted by mria/mnesia.
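The version-comparison ladder that the Design section defines (IF/ELSEIF over `ver_curr`, `ver_LocalMax`, and `ver_RealMax`) and that the flowchart above encodes can be sketched as follows. This is an illustrative Python sketch of the decision logic only — not EMQX's Erlang implementation; the function name and action labels are assumptions introduced for illustration:

```python
# Illustrative sketch (not EMQX source) of the EIP's version-comparison ladder.
# None plays the role of Erlang's `undefined`: no matching channel was found.

def decide(ver_curr, ver_local_max, ver_real_max):
    """Map the three versions onto the outcomes named in the EIP."""
    if ver_local_max is not None and ver_curr < ver_local_max:
        return "happy_fail"       # drop early, negative CONNACK (cheap local dirty read)
    elif ver_real_max is not None and ver_curr < ver_real_max:
        return "expensive_fail"   # drop late, negative CONNACK (inside the transaction)
    elif (ver_real_max or 0) > (ver_local_max or 0):
        return "retry"            # restart the procedure with ver_RealMax, bounded retries
    elif ver_real_max == ver_local_max:
        return "commit"           # write ver_curr and continue the processing
    else:
        return "abort"            # unlikely corner case: abort, negative CONNACK, log INFO
```

The first branch is what keeps the common path cheap: a stale reconnect is rejected from the local dirty read before any Mria transaction is started, and only the `retry` branch pays the transactional round trip more than once.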
From 0eeb14001f282775a68d49e0ba22fd323bb2c05f Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 28 Apr 2025 10:42:28 +0200 Subject: [PATCH 5/9] chore: fix typos --- active/0030-emqx_linear_channel_registry.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md index dd9a42c..4088e2e 100644 --- a/active/0030-emqx_linear_channel_registry.md +++ b/active/0030-emqx_linear_channel_registry.md @@ -16,7 +16,7 @@ lagging or the old channel process is unresponsive. In this EIP, we propose another channel registration subsystem called LCR ("Linear Channel Registry") that expands the channel prop with version (e.g., trpt_connected_at timestamp when transport connects) so that the -yong and old channels could be determined to minimize the issues caused by race conditions during session takeover +young and old channels could be determined to minimize the issues caused by race conditions during session takeover while the client is reconnecting, especially in a massive volume. ## Motivation @@ -107,9 +107,9 @@ We use three versions during the processing: With actions below: -IF `ver_curr` < `ver_LocalMax`, drops the processing early, returns negtive CONNACK. __HAPPY FAIL__ +IF `ver_curr` < `ver_LocalMax`, drops the processing early, returns negative CONNACK. __HAPPY FAIL__ -ELSEIF `ver_curr` < `ver_RealMax`, drops the processing late, returns negtive CONNACK. __EXPENSIVE FAIL__ +ELSEIF `ver_curr` < `ver_RealMax`, drops the processing late, returns negative CONNACK. __EXPENSIVE FAIL__ ELSEIF `ver_RealMax` > `ver_LocalMax`, restart the processing with `ver_RealMax` with limited number of retries. 
__MOST EXPENSIVE PATH__

@@ -152,10 +152,10 @@ flowchart TD
     C4--YES-->VsnDirty
     C4--NO-->Fail
 
-    Commit-->TakoverContinue
+    Commit-->TakeoverContinue
 
     Fail-->NegativeConnack
-    TakoverContinue-..->PositiveConnak
+    TakeoverContinue-..->PositiveConnack
 
     subgraph LocalAsyncDirty
     direction TB

From 38340f7f7eaca0f140222b996acdb2529368341d Mon Sep 17 00:00:00 2001
From: William Yang
Date: Tue, 20 May 2025 16:00:13 +0200
Subject: [PATCH 6/9] rename LCR to LSR and add more notes for the tests

---
 active/0030-emqx_linear_channel_registry.md | 137 +++++++++++++-------
 1 file changed, 87 insertions(+), 50 deletions(-)

diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_channel_registry.md
index 4088e2e..1b19b44 100644
--- a/active/0030-emqx_linear_channel_registry.md
+++ b/active/0030-emqx_linear_channel_registry.md
@@ -1,21 +1,22 @@
-# Linear Channel Registry
+# Linear Session Registry (LSR)
 
 ## Changelog
 
 * 2025-04-20: @qzhuyan First draft
+* 2025-05-20: @qzhuyan rename to LSR, add more notes for the tests
 
 ## Abstract
 
-For a new incoming MQTT connection, the node may look for an existing channel matching the same clientid
+For a new incoming MQTT connection, the node may look for an existing session matching the same clientid
 within the cluster for taking further actions such as taking over the session, discarding the session,
 or creating a new session.
 
-This channel registration info is a cluster wide global state maintained by the underlying database.
+This session registration info is a cluster-wide global state maintained by the underlying database.
 It is replicated asynchronously (eventually consistent), and it leads to problems when replication is
 lagging or the old channel process is unresponsive.
-In this EIP, we propose another channel registration subsystem called LCR ("Linear Channel Registry") that
-expands the channel prop with version (e.g., trpt_connected_at timestamp when transport connects) so that the
+In this EIP, we propose another session registration subsystem called LSR ("Linear Session Registry") that
+extends the channel properties with a version (e.g., the `transport_started_at` timestamp taken when the transport connects) so that the
 young and old channels could be determined to minimize the issues caused by race conditions during session takeover
 while the client is reconnecting, especially in a massive volume.
 
@@ -23,6 +24,11 @@
 ## Motivation
 
 For the current implementation of channel registry (emqx_cm_registry), it has eventual consistency.
 
+@NOTE: The term 'channel registry' is used in the current EMQX code base because a session binds to channels (one session
+may have multiple channels), but it is now suggested to use the term 'session registry'.
+
+In this doc, we use both: `channel registry` means the old `emqx_cm_registry`, while `session registry` means the LSR.
+
 The channel registration gets a dirty write on the core node from an RPC call.
 The registration then gets replicated among the core nodes asynchronously (dirty).
 The core nodes then replicate the registration to the replicant nodes.
@@ -31,7 +37,7 @@ When looking for existing channels while handling the new connection, EMQX does a lookup from the local copy.
 
 This is a design for high availability and high performance, under the assumption that replication is finished
 before the next client connection. But when replication starts lagging due to various reasons, the following issues pop up:
 
-- A node does not see that the channel exists. It creates a new session instead of taking over the existing one.
+- A node does not see that the session exists. It creates a new session instead of taking over the existing one.
 - A new channel could take over a session from the wrong channel due to the wrong order of receiving replications.
 
 - The older channel with an unresponsive connection kicks the latest channel, causing the client to get disconnected unexpectedly.
 
 - Wrong Session Present flag in MQTT.CONNACK is returned to the client.
 
+  If the handling node doesn't know about the existence of the session on the other node, it will open a new session and return Session Present = false
+  to the client. The client will get confused if it just disconnected from the other node and reconnected to the handling node.
+
+  Furthermore, it also forks the session into two; whether the next reconnect will take over this session or the other session
+  becomes uncertain.
+
 - The deregistration may overrun the registration, leaving a registration unreleased.
 
 - When a channel process starts to take over or create a new session, it runs the operation in an ekka locker transaction which uses
 a cluster-wide, domain-specific fine lock, ekka_locker. By involving all the core nodes, it protects the operation from being executed
-in parallel but not the registration data, leading to unexpected behaviors.
+in parallel, but the registration data itself is not 'thread safe', leading to unexpected behaviors.
 
 - The ekka_locker is error-prone in that it is very easy to leave a lock unreleased in the cluster, such that when the channel process gets
 killed in the middle of the processing of session takeover, the deadlock detection needs 30s to detect and then remove the lock.
 This means the broker will be unavailable to that client for that period, and the retries from the client will just create more
 load (TLS handshake, client AUTH) on the cluster.
@@ -58,11 +70,14 @@
 Based on real-world scenarios, we have the following assumptions:
 
-- Client to brokers latency is higher than the latency between EMQX nodes.
+- Client-to-broker latency exceeds the time synchronization difference between EMQX nodes.
+
+  Client-to-broker: 10ms+
+  EMQX nodes time diff: < 2ms, which requires that the NTP server in use is correctly configured.
 
 - Clients do not reconnect within millisecond delays, so it is unlikely to have two channels race for the same clientid for session operations.
 
-Therefore, we could use the transport connected timestamp (`trpt_connected_at`) in ms as the version of the channel.
+Therefore, we could use the transport connected timestamp (`transport_connected_at`) in ms as the version of the channel.
 
 @NOTE, we could use other timestamp too such as client provided timestamps embeded in MQTT user props.
 
@@ -85,7 +100,8 @@ Based on the current EMQX implementation, we have the following facts:
 - Session registration is a bag table; multiple channels of the same client could co-exist. New design could follow this.
 
-Based on the above, with versioning, channels from the same client become comparable, and EMQX could find the most recent channel or check if the current connection it is processing is outdated.
+Based on the above, with versioning, channels from the same client become comparable, and EMQX could find the most recent channel and check if the current connection it is processing is outdated,
+so that it communicates with the client with the latest view.
 
 We use three versions during the processing:
 
@@ -97,7 +113,7 @@ We use three versions during the processing:
 - `ver_RealMax`
 
-  Max version of existing `channel`s from **transactional read**.
+  Max version of existing `channel`s from a **transactional read** from the cluster.
 
   Value is `undefined` if and only if no matching channel is found.
 
@@ -115,7 +131,8 @@ ELSEIF `ver_RealMax` > `ver_LocalMax`, restart the processing with `ver_RealMax`
 ELSEIF `ver_RealMax` == `ver_LocalMax`, write with `ver_curr` and continue with the processing. __HAPPY ENDING__
 
-It is very unlikely to happen that `ver_curr` > `ver_RealMax` AND `ver_RealMax` < `ver_LocalMax`, but for correctness, it should abort the transaction and then return MQTT.CONNACK with a negative reason code then log with INFO message.
+It is very unlikely to happen that `ver_curr` > `ver_RealMax` AND `ver_RealMax` < `ver_LocalMax`, but for correctness, it should abort the transaction +and then return MQTT.CONNACK with a negative reason code then log with INFO message. ```mermaid @@ -178,23 +195,22 @@ flowchart TD The transaction to run is micro and abortive, it only reads and writes the same key, only one lock is taken so it is unlikely to get restarted by mria/mnesia. -### record #lcr_channel{} +### record #lsr_channel{} -`lcr_channel` of LCR represents the EMQX channel that provides a global view and maintain a global state within the cluster. +`lsr_channel` of LSR represents the EMQX channels that provides a global view and maintain a global state within the cluster. -Erlang --record(lcr_channel, { - id :: lcr_channel_id(), +``` Erlang +-record(lsr_channel, { + id :: lsr_session_id(), pid :: pid() | undefined, vsn :: integer() | undefined }). +``` -`lcr_channel` is bag table using the value of clientid as the key (id). - -`#lcr_channel.pid` is the channel process pid, global unique, and contains embedded info of the node. -`#lcr_channel.vsn` is used to compare or sort the channels. +`lsr_channel` is bag table using the value of sessionid/clientid as the key (id). -@TODO If we could prove that there max has two generations, we could convert it to ordered_set. +`#lsr_channel.pid` is the channel process pid, global unique, and contains embedded info of the node. +`#lsr_channel.vsn` is used to compare or sort the channels. For write, it is done transactionally. A Mria transaction with sharding is executed on one of the core nodes (strong consistency among core nodes), and it will get replicated to the replicant nodes asynchronously (eventual consistency). @@ -205,20 +221,24 @@ For delete, it is done asynchronously (dirty), but it will never overrun the wri There is no need for updates in core or replicant. @TODO Do we need to prove that it is written once? 
-### lcr_channel lifecycle
+### #chan_conn{}
+
+The `#chan_conn{}` in the node-local ets table `CHAN_CONN_TAB` will be expanded with a new field `vsn` for local vsn tracking.

-lcr_channel is written when:
+### lsr_channel lifecycle
+
+`lsr_channel` is written when:

1. the session is created AND after updating the local ETS for other tables of the channel.

1. the session takeover 'begin' is finished AND after updating the local ETS for other tables of the channel.

-lcr_channel can be read at any time.
+`lsr_channel` can be read at any time.

For deletion, there are the following triggers:

-1. the channel process exits gracefully, where in the terminating phase, it deregisters itself from LCR.
+1. the channel process exits gracefully, where in the terminating phase, it deregisters itself from LSR in a dirty async context.
   triggers:
   - transport close
   - taken over (`{takeover, 'end'}`)

@@ -227,12 +247,13 @@ For deletion, there are the following triggers:

1. node down/Mnesia down event:

-   The living node may remove the channel belonging to the down node. It must be done only once within
+   One living node may remove the channel belonging to the down node. It must be done only once within
   the cluster successfully.

-   For replicant node down, its upstream core node should clean the channels for it.
-   For core node down, @TODO
+   For replicant node down, a phashed core node will be assigned to clean the channels of the down node.
+   For core node down, channels should be cleaned up when the core is restarted, or cleaned up via a maintenance API/CLI if the user is
+   not able to bring that core node back online.

### Drawbacks

@@ -241,30 +262,39 @@ For deletion, there are the following triggers:

  But `mnesia_locker` is more robust than `ekka_locker`.

- "registration history function" will not be supported as we don't want large data set.
-  It must be reimplemented.
+
+  It must be reimplemented for LSR.
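The "phashed core node" assignment for node-down cleanup can be sketched as a deterministic hash over the core list, so every node computes the same owner and the cleanup runs exactly once. This is a hypothetical Python sketch; the real code would likely use Erlang's `phash2` over the core-node list:

```python
import hashlib


def cleanup_core(down_replicant, core_nodes):
    """Pick the core node responsible for cleaning up the channels of a
    down replicant.  The choice depends only on the down node's name and
    the sorted core list, so all nodes agree without coordination."""
    cores = sorted(core_nodes)
    digest = int(hashlib.sha256(down_replicant.encode()).hexdigest(), 16)
    return cores[digest % len(cores)]
```

Because the result is independent of the order in which each node lists its peers, no extra locking is needed to satisfy the "done only once" requirement (as long as the membership view is consistent).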
+ +### Other dependencies + +- `Mria` may offer a transaction API with a restricted number of retries. ## Backwards Compatibility -`LCR` should be disabled by default with a feature switch. +`LSR` should be disabled by default with a feature switch. -### LCR Disabled +### LSR Disabled -When `LCR` is disabled, there should be no backwards compatibility issue, and the EMQX cluster should work as it does in the old version. +When `LSR` is disabled, there should be no backwards compatibility issue, and the EMQX cluster should work as it does in the old version. -### LCR Enabled within the cluster +### LSR Enabled within the cluster -Once `LCR` is enabled, `LCR` can co-exist with `emqx_cm_registry` with a feature switch. +Once `LSR` is enabled, `LSR` can co-exist with `emqx_cm_registry` with a feature switch. -READs from `LCR` storage only. WRITES go to both if `emqx_cm_registry` is also enabled (global), but DO NOT use `ekka_locker:trans`. +READs from `LSR` storage only. WRITES go to both if `emqx_cm_registry` is also enabled (global), but DO NOT use `ekka_locker:trans`. -### LCR partially Enabled within the cluster +### LSR 'Partially Enabled' within the cluster -During a rolling upgrade, the EMQX cluster could run with `LCR` partially enabled due to the mixed versions of EMQX nodes. +During a rolling upgrade, the EMQX cluster could run with `LSR` partially enabled due to the mixed versions of EMQX nodes. -To be compatible with old bpapi calls from the old version of EMQX, there are two problems to solve: +To be compatible with old bpapi calls from the old version of EMQX, -1. In the RPC call, no channel version is missing. +For newer EMQX, all registrations (`emqx_cm:register_channel/1`) will be updated to the `LSR` table only. + +and there are two problems to solve: + +1. In the RPC call, channel version is missing. 2. `emqx_cm_registry` is the only provider, and it has no channel vsn. 
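During the mixed-version window, registrations coming from pre-LSR nodes carry no channel version. One way to keep channels comparable is to sort a missing version lowest, so a versioned channel always wins. A hypothetical sketch (not the proposed bpapi format):

```python
def vsn_key(entry):
    """Sort key for (pid, vsn) channel entries where vsn may be None
    (registration came from an old node without LSR).  A missing version
    sorts below every real version."""
    _pid, vsn = entry
    return (vsn is not None, 0 if vsn is None else vsn)


def latest_channel(entries):
    # Entries with a real vsn beat version-less ones; among versioned
    # entries the largest transport-connected timestamp wins.
    return max(entries, key=vsn_key)
```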
Thus, we define the fallback method here for Node evacuation and rolling upgrade:

@@ -273,14 +303,15 @@ For newer EMQX handling the call from older EMQX, the channel version uses the v

Newer EMQX MUST NOT disable the `emqx_cm_registry`; as part of the standard implementation, the writes will go to both storage.

-For newer EMQX calling the older EMQX, the newer EMQX should write to the `LCR` once the bpapi call is successful. The newer EMQX MUST not update the `emqx_cm_registry` storage as it is part of the call handling on the older EMQX.
+For newer EMQX calling the older EMQX, the newer EMQX should write to the `LSR` once the bpapi call is successful.
+The newer EMQX MUST NOT update the `emqx_cm_registry` storage as it is part of the call handling on the older EMQX.

-### LCR runtime enabling
+### LSR runtime enabling

UNSUPPORTED for now.

-### LCR runtime disabling
+### LSR runtime disabling

Supported, but we don't know the side effects. Leave it as @TODO.

@@ -293,24 +324,30 @@ If there is any document change, give a brief description of it here.

### Functional test

-1. The existing common test suites should be expanded with a new group `lcr_on`, `lcr_off`
+1. The existing common test suites should be expanded with a new group `lsr_on`, `lsr_off`
   - takeover suite
   - cm suite

Add a new cluster test suite for testing core + replicant roles covering known issues in previous chapters

-2. Rolling upgrade should be tested with the `LCR` switch on.
+2. Rolling upgrade should be tested with the `LSR` switch on.

## Performance test

Performance tests should be performed against a 3 cores + 3 replicants cluster.

-Compare the performance with `LCR` on/off with the following scenarios:
-
-1. initial connections
-2. reconnections
-3. all clients connect with the same `clientid`.
+Compare the performance with `LSR` on/off with the following scenarios:
+
+1. Initial connections
+1. Disconnect and leave no persistent session.
+1. Disconnect and leave a persistent session.
+1. 
Session takeover with existing connection
+1. Session takeover without existing connection
+1. Session discard with existing connection
+1. Session discard without existing connection
+1. While having a large number of persistent sessions, kill replicants and see if the cores stand.
+1. Real-world reconnect: two groups of clients take over/discard each other's sessions with auto-reconnect on.

## Declined Alternatives

From b7bd6117d11d7553d2d63f87dfaa2e3c2b4b9736 Mon Sep 17 00:00:00 2001
From: William Yang
Date: Tue, 20 May 2025 16:04:38 +0200
Subject: [PATCH 7/9] rename lcr -> lsr

---
 ...r_channel_registry.md => 0030-emqx_linear_session_registry.md} | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename active/{0030-emqx_linear_channel_registry.md => 0030-emqx_linear_session_registry.md} (100%)

diff --git a/active/0030-emqx_linear_channel_registry.md b/active/0030-emqx_linear_session_registry.md
similarity index 100%
rename from active/0030-emqx_linear_channel_registry.md
rename to active/0030-emqx_linear_session_registry.md

From 2a1654659df3f15845c9cc138e95fb08f3e61762 Mon Sep 17 00:00:00 2001
From: William Yang
Date: Thu, 5 Jun 2025 15:42:39 +0200
Subject: [PATCH 8/9] doc(LSR): TLA spec

---
 active/0030-assets/LSR.tla | 204 +++++++++++++++++++++++++++++++++++++
 1 file changed, 204 insertions(+)
 create mode 100644 active/0030-assets/LSR.tla

diff --git a/active/0030-assets/LSR.tla b/active/0030-assets/LSR.tla
new file mode 100644
index 0000000..9e61198
--- /dev/null
+++ b/active/0030-assets/LSR.tla
@@ -0,0 +1,204 @@
+-------------------------------- MODULE LSR --------------------------------
+EXTENDS Sequences, Naturals, TLC
+VARIABLES CStores, (* Core storages *)
+          RStores, (* Replicant storages *)
+          RQueue,  (* Replicant storages async queue *)
+          CQueue,  (* Core storages async queue *)
+          Chans    (* channels *)
+CONSTANT Cores, Replicants, CHs, NONE
+
+CHStates == {NONE, "registered", "aborted", "unregistered", "closed"}
+
+TypeInvariant ==
+  /\ Chans \in [CHs -> [loc: 
Replicants \union {NONE}, \* location of the channel + state: CHStates, \* channel state + retry: {3}, \* left of takeover retries + pre: CHs \union {NONE}]] \* predecessor from best of the knowledge + +----------------------------------------------------------------------------- + +(* helpers *) + +Max(S) == + CHOOSE x \in S : \A y \in S : x >= y + +MaxOrNone(S) == + IF S = {} THEN NONE ELSE Max(S) + +MaxRVsn(node) == MaxOrNone(RStores[node]) + +MaxCVsn(node) == MaxOrNone(CStores["c1"]) \* impl core node selection + + +IsMaxInStore(v, store) == \A i \in store : i < v + +DoREnqueue(log) == + RQueue' = [R \in Replicants |-> Append(RQueue[R], log)] + +DequeueRQueue(r) == + /\ RQueue[r] /= <<>> + /\ LET h == Head(RQueue[r]) IN + RStores' = [ RStores EXCEPT ![r] = RStores[r] \union {h.key} ] + /\ RQueue' = [RQueue EXCEPT ![r] = Tail(RQueue[r])] + /\ UNCHANGED <> + + +insert_job(ch) == [op |-> "insert", key |-> ch] +del_job(ch) == [op |-> "del", key |-> ch] + + +DO_TX_COMMIT(c, max_vsn) == + /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Registered" ] + /\ CStores' = [ node \in DOMAIN CStores |-> {c} \union CStores[node]] + /\ RQueue' = [ node \in DOMAIN RQueue |-> Append(RQueue[node], insert_job(c))] + /\ UNCHANGED <> + +DO_DIRTY_UNREG(c) == + /\ CQueue' = [ node \in DOMAIN CQueue |-> Append(CQueue[node], del_job(c))] + /\ UNCHANGED <> + + +NOOP(c) == TRUE +----------------------------------------------------------------------------- + +Init == + /\ CStores = [ c \in Cores |-> {}] + /\ RStores = [ r \in Replicants |-> {}] + /\ CQueue = [ c \in Cores |-> <<>>] + /\ RQueue = [ r \in Replicants |-> <<>>] + /\ Chans = [ch \in CHs |-> [loc |-> NONE, state |-> NONE, pre |-> NONE, retry |-> 3]] + + +(* CH connect *) +NextCHConnect(c) == + \E r \in Replicants: + /\ Chans[c].loc = NONE + /\ Chans' = [Chans EXCEPT ![c].loc = r] + /\ UNCHANGED <> + +\* Finshed Stage one, New session +DoNewSession(c) == + /\ Chans' = [ Chans EXCEPT ![c].pre = NONE, ![c].state = 
"NewSession" ] + +\* @TODO takeover session involving other ch +DoTakeoverSession(c) == TRUE + +DoTakeoverSessionTX(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN + /\ max_vsn /= NONE + /\ IF max_vsn = ch.pre + THEN /\ DO_TX_COMMIT(c, max_vsn) + ELSE IF max_vsn < c + THEN /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "RetryTakeover" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + ELSE + /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Abort" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + +NextCHDirtyReadMax(c) == LET ch == Chans[c] IN + /\ ch.loc /= NONE /\ ch.state = NONE + /\ Chans' = [ Chans EXCEPT ![c].pre = MaxRVsn(ch.loc), ![c].state = "DirtyReadMax" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + +NextCHNewSession(c) == LET ch == Chans[c] IN + /\ (ch.state = "DirtyReadMax" /\ ch.pre = NONE) \/ ch.state = "TakeoverFailed" + /\ DoNewSession(c) + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + +NextCHTakeoverSessionSuccess(c) == LET ch == Chans[c] IN + /\ ch.state = "DirtyReadMax" + /\ ch.pre /= NONE + /\ DoTakeoverSession(c) + /\ Chans' = [ Chans EXCEPT ![c].state = "TakeoverStarted" ] + /\ UNCHANGED <> + +NextCHTakeoverSessionFail(c) == LET ch == Chans[c] IN + /\ ch.state = "DirtyReadMax" + /\ ch.pre /= NONE + /\ Chans' = [ Chans EXCEPT ![c].pre = NONE, ![c].state = "TakeoverFailed" ] + /\ UNCHANGED <> + +NextCHTakeoverTX(c) == LET ch == Chans[c] IN + /\ ch.state = "TakeoverStarted" + /\ ch.pre /= NONE \* @TODO assert this pre is not NONE when takeover + /\ DoTakeoverSessionTX(c) + +NextCHNewTX_NoExisting(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN + /\ ch.state = "NewSession" + /\ ch.pre = NONE + /\ max_vsn = NONE + /\ DO_TX_COMMIT(c, max_vsn) + +NextCHNewTX_AlreadyExisting(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN + /\ ch.state = "NewSession" + /\ ch.pre = NONE + /\ max_vsn /= NONE + /\ IF max_vsn < c + THEN /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = 
"RetryTakeover" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + ELSE + /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Abort" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + + +\* AssertNoAbort pre > cur + + +NextCHRetry(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN + /\ ch.state = "RetryTakeover" + /\ IF ch.retry > 0 + THEN + /\ Chans' = [ Chans EXCEPT ![c].state = "DirtyReadMax", ![c].retry = Chans[c].retry - 1] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue >> + ELSE + /\ Chans' = [ Chans EXCEPT ![c].state = "Abort" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue >> + + + +NextCHAbort(c) == LET ch == Chans[c] IN + /\ ch.state = "Abort" + /\ Chans' = [ Chans EXCEPT ![c].state = "offline" ] + /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + +NextCHNewTX(c) == \/ NextCHNewTX_NoExisting(c) + \/ NextCHNewTX_AlreadyExisting(c) + +NextCH == \E c \in CHs: + \/ NextCHConnect(c) \* @TODO move to init + \/ NextCHDirtyReadMax(c) \* Step 1: read local max + \/ NextCHNewSession(c) \* Step 2a: New session + \/ NextCHNewTX(c) \* Step 3a: New session commit + \/ NextCHTakeoverSessionSuccess(c) \* Step 2b: Takeover Session success + \/ NextCHTakeoverTX(c) \* Step 3b: Takeover Session commit + \/ NextCHTakeoverSessionFail(c) \* Step 2b: Takeover Session fail + \/ NextCHRetry(c) \* Step 4a: Maybe retry + \/ NextCHAbort(c) \* Step 4b: Maybe Abort + + +(* Next Actions of replications *) +NextR == \E r \in Replicants: DequeueRQueue(r) + +Next == NextCH \/ NextR +Spec == Init /\ [][Next]_<> + /\ WF_<>(\E c \in CHs: NextCHConnect(c)) + /\ WF_<>(\E r \in Replicants: DequeueRQueue(r)) + /\ WF_<>(\E c \in CHs: NextCH) + + +----------------------------------------------------------------------------- +(***** Invariants and Property *****) +----------------------------------------------------------------------------- + +eventuallyNoChanNone == <>~(\E ch \in CHs: Chans[ch].loc = NONE) +eventuallyMaxWin == <>(\E ch \in CHs: LET max == 
Len(Chans) IN + Chans[max].state = "Registered") + +testRQueueAlwaysEmpty == \A r \in Replicants: RStores[r] = {} + +testAbortWontHappen == [] (~ \E c \in CHs: ENABLED NextCHAbort(c)) + + +============================================================================= +\* Modification History +\* Last modified Thu Jun 05 15:37:18 CEST 2025 by ezhuwya +\* Created Wed Jun 04 13:38:58 CEST 2025 by ezhuwya From 6a196343c1517bd58196e27a57ddecf18a3fd46f Mon Sep 17 00:00:00 2001 From: William Yang Date: Tue, 10 Jun 2025 18:17:24 +0200 Subject: [PATCH 9/9] LSR: update formal spec. --- active/0030-assets/LSR.tla | 165 +++++++++++++------- active/0030-emqx_linear_session_registry.md | 3 + 2 files changed, 108 insertions(+), 60 deletions(-) diff --git a/active/0030-assets/LSR.tla b/active/0030-assets/LSR.tla index 9e61198..dfa2b07 100644 --- a/active/0030-assets/LSR.tla +++ b/active/0030-assets/LSR.tla @@ -1,5 +1,5 @@ -------------------------------- MODULE LSR -------------------------------- -EXTENDS Sequences, Naturals, TLC +EXTENDS Sequences, Naturals, TLC, FiniteSets VARIABLES CStores, (* Core storages *) RStores, (* Replicant storages *) RQueue, (* Replicant storages async queue *) @@ -7,29 +7,31 @@ VARIABLES CStores, (* Core storages *) Chans (* channels *) CONSTANT Cores, Replicants, CHs, NONE -CHStates == {NONE, "registered", "aborted", "unregistered", "closed"} +CHStates == {NONE, "DirtyReadMax", "NewSession", "TakeoverStarted", "Tookover", "TakeoverFailed", "RetryTakeover", "Registered", "Abort", "Terminating", "Offline", "Owned", "PartlyOwned"} TypeInvariant == /\ Chans \in [CHs -> [loc: Replicants \union {NONE}, \* location of the channel state: CHStates, \* channel state - retry: {3}, \* left of takeover retries + retry: (0..3), \* left of takeover retries pre: CHs \union {NONE}]] \* predecessor from best of the knowledge + /\ CStores \in [ Cores -> SUBSET CHs ] + /\ RStores \in [ Replicants -> SUBSET CHs ] + /\ RQueue \in [ Replicants -> Seq([ op: {"insert", 
"del"} , key: CHs]) ] + /\ CQueue \in [ Cores -> Seq([ op: {"insert", "del"} , key: CHs]) ] ----------------------------------------------------------------------------- - (* helpers *) Max(S) == - CHOOSE x \in S : \A y \in S : x >= y + CHOOSE x \in S : \A y \in S \ {x} : x > y MaxOrNone(S) == IF S = {} THEN NONE ELSE Max(S) MaxRVsn(node) == MaxOrNone(RStores[node]) -MaxCVsn(node) == MaxOrNone(CStores["c1"]) \* impl core node selection +MaxCVsn(node) == MaxOrNone(CStores["c1"]) \* @TODO impl core node selection - IsMaxInStore(v, store) == \A i \in store : i < v DoREnqueue(log) == @@ -38,17 +40,31 @@ DoREnqueue(log) == DequeueRQueue(r) == /\ RQueue[r] /= <<>> /\ LET h == Head(RQueue[r]) IN - RStores' = [ RStores EXCEPT ![r] = RStores[r] \union {h.key} ] + IF h.op = "insert" THEN + RStores' = [ RStores EXCEPT ![r] = RStores[r] \union {h.key} ] + ELSE + RStores' = [ RStores EXCEPT ![r] = RStores[r] \ {h.key} ] /\ RQueue' = [RQueue EXCEPT ![r] = Tail(RQueue[r])] /\ UNCHANGED <> +DequeueCQueue(n) == + /\ CQueue[n] /= <<>> + /\ LET h == Head(CQueue[n]) IN + IF h.op = "insert" THEN + CStores' = [ CStores EXCEPT ![n] = CStores[n] \union {h.key} ] + ELSE + CStores' = [ CStores EXCEPT ![n] = CStores[n] \ {h.key} ] + /\ CQueue' = [CQueue EXCEPT ![n] = Tail(CQueue[n])] + /\ UNCHANGED <> + + insert_job(ch) == [op |-> "insert", key |-> ch] del_job(ch) == [op |-> "del", key |-> ch] DO_TX_COMMIT(c, max_vsn) == - /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Registered" ] + /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Registered"] /\ CStores' = [ node \in DOMAIN CStores |-> {c} \union CStores[node]] /\ RQueue' = [ node \in DOMAIN RQueue |-> Append(RQueue[node], insert_job(c))] /\ UNCHANGED <> @@ -57,31 +73,24 @@ DO_DIRTY_UNREG(c) == /\ CQueue' = [ node \in DOMAIN CQueue |-> Append(CQueue[node], del_job(c))] /\ UNCHANGED <> - -NOOP(c) == TRUE ----------------------------------------------------------------------------- +(* Init State *) Init == /\ 
CStores = [ c \in Cores |-> {}] /\ RStores = [ r \in Replicants |-> {}] /\ CQueue = [ c \in Cores |-> <<>>] /\ RQueue = [ r \in Replicants |-> <<>>] - /\ Chans = [ch \in CHs |-> [loc |-> NONE, state |-> NONE, pre |-> NONE, retry |-> 3]] - - -(* CH connect *) -NextCHConnect(c) == - \E r \in Replicants: - /\ Chans[c].loc = NONE - /\ Chans' = [Chans EXCEPT ![c].loc = r] - /\ UNCHANGED <> - -\* Finshed Stage one, New session -DoNewSession(c) == - /\ Chans' = [ Chans EXCEPT ![c].pre = NONE, ![c].state = "NewSession" ] - -\* @TODO takeover session involving other ch -DoTakeoverSession(c) == TRUE + /\ Chans \in [CHs -> [loc: Replicants, state: {NONE}, pre: {NONE}, retry: {3}]] + +----------------------------------------------------------------------------- +(* Dos *) + + + +DoTakeoverSession(c) == LET ch == Chans[c] IN \* Takeover with dirty data, risk to takeover the wrong session + /\ Chans' = [Chans EXCEPT ![ch.pre].state = "Tookover", ![c].state = "TakeoverStarted"] + /\ UNCHANGED <> DoTakeoverSessionTX(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN /\ max_vsn /= NONE @@ -93,43 +102,44 @@ DoTakeoverSessionTX(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN ELSE /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Abort" ] /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> - -NextCHDirtyReadMax(c) == LET ch == Chans[c] IN + +----------------------------------------------------------------------------- +(* Actions *) +NextCHDirtyReadMax(c) == LET ch == Chans[c] IN \* Dirty read local max /\ ch.loc /= NONE /\ ch.state = NONE /\ Chans' = [ Chans EXCEPT ![c].pre = MaxRVsn(ch.loc), ![c].state = "DirtyReadMax" ] /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> -NextCHNewSession(c) == LET ch == Chans[c] IN +NextCHNewSession(c) == LET ch == Chans[c] IN \* Start new session /\ (ch.state = "DirtyReadMax" /\ ch.pre = NONE) \/ ch.state = "TakeoverFailed" - /\ DoNewSession(c) + /\ Chans' = [ Chans EXCEPT ![c].pre = NONE, ![c].state = "NewSession" ] /\ 
UNCHANGED << CStores, RStores, CQueue, RQueue>> -NextCHTakeoverSessionSuccess(c) == LET ch == Chans[c] IN - /\ ch.state = "DirtyReadMax" +NextCHTakeoverSessionSuccess(c) == LET ch == Chans[c] IN + /\ (ch.state = "DirtyReadMax" \/ ch.state = "RetryTakeover") /\ ch.pre /= NONE + /\ ch.pre < c /\ DoTakeoverSession(c) - /\ Chans' = [ Chans EXCEPT ![c].state = "TakeoverStarted" ] - /\ UNCHANGED <> NextCHTakeoverSessionFail(c) == LET ch == Chans[c] IN - /\ ch.state = "DirtyReadMax" + /\ (ch.state = "DirtyReadMax" \/ ch.state = "RetryTakeover") /\ ch.pre /= NONE /\ Chans' = [ Chans EXCEPT ![c].pre = NONE, ![c].state = "TakeoverFailed" ] /\ UNCHANGED <> NextCHTakeoverTX(c) == LET ch == Chans[c] IN /\ ch.state = "TakeoverStarted" - /\ ch.pre /= NONE \* @TODO assert this pre is not NONE when takeover + /\ ch.pre /= NONE /\ DoTakeoverSessionTX(c) NextCHNewTX_NoExisting(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN - /\ ch.state = "NewSession" + /\ ch.state = "DirtyReadMax" /\ ch.pre = NONE /\ max_vsn = NONE /\ DO_TX_COMMIT(c, max_vsn) NextCHNewTX_AlreadyExisting(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN - /\ ch.state = "NewSession" + /\ ch.state = "DirtyReadMax" /\ ch.pre = NONE /\ max_vsn /= NONE /\ IF max_vsn < c @@ -138,10 +148,21 @@ NextCHNewTX_AlreadyExisting(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) ELSE /\ Chans' = [ Chans EXCEPT ![c].pre = max_vsn, ![c].state = "Abort" ] /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> - - -\* AssertNoAbort pre > cur - + +NextCHTakeoverEndSuccess(c) == LET ch == Chans[c] IN + /\ ch.state = "Registered" + /\ ch.pre /= NONE + /\ Chans[ch.pre].state = "Tookover" + /\ Chans' = [ Chans EXCEPT ![ch.pre].state = "Terminating", ![c].state = "Owned"] + /\ UNCHANGED <> + + +NextCHTakeoverEndFail(c) == LET ch == Chans[c] IN + /\ ch.state = "Registered" + /\ ch.pre /= NONE + /\ Chans[ch.pre].state = "Tookover" + /\ Chans' = [ Chans EXCEPT ![ch.pre].state = "Terminating", ![c].state = "PartlyOwned"] + /\ UNCHANGED 
<> NextCHRetry(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN /\ ch.state = "RetryTakeover" @@ -153,19 +174,24 @@ NextCHRetry(c) == LET ch == Chans[c] max_vsn == MaxCVsn(ch.loc) IN /\ Chans' = [ Chans EXCEPT ![c].state = "Abort" ] /\ UNCHANGED << CStores, RStores, CQueue, RQueue >> - - NextCHAbort(c) == LET ch == Chans[c] IN /\ ch.state = "Abort" - /\ Chans' = [ Chans EXCEPT ![c].state = "offline" ] + /\ Chans' = [ Chans EXCEPT ![c].state = "Offline" ] /\ UNCHANGED << CStores, RStores, CQueue, RQueue>> + + +NextCHTerminating(c) == LET ch == Chans[c] IN + /\ ch.state = "Terminating" + /\ CQueue' = [ node \in DOMAIN CQueue |-> Append(CQueue[node], del_job(c) )] + /\ Chans' = [ Chans EXCEPT ![c].state = "Offline" ] + /\ UNCHANGED <> + NextCHNewTX(c) == \/ NextCHNewTX_NoExisting(c) \/ NextCHNewTX_AlreadyExisting(c) NextCH == \E c \in CHs: - \/ NextCHConnect(c) \* @TODO move to init - \/ NextCHDirtyReadMax(c) \* Step 1: read local max + \/ NextCHDirtyReadMax(c) \* Step 1: Dirty read max from local \/ NextCHNewSession(c) \* Step 2a: New session \/ NextCHNewTX(c) \* Step 3a: New session commit \/ NextCHTakeoverSessionSuccess(c) \* Step 2b: Takeover Session success @@ -173,32 +199,51 @@ NextCH == \E c \in CHs: \/ NextCHTakeoverSessionFail(c) \* Step 2b: Takeover Session fail \/ NextCHRetry(c) \* Step 4a: Maybe retry \/ NextCHAbort(c) \* Step 4b: Maybe Abort - + \/ NextCHTakeoverEndSuccess(c) \* Step 5a: Takeover end + \/ NextCHTakeoverEndFail(c) \* Step 5b: Takeover end failed + \/ NextCHTerminating(c) \* Step 6: Terminating (* Next Actions of replications *) -NextR == \E r \in Replicants: DequeueRQueue(r) +NextR == \/ (\E node \in Replicants: DequeueRQueue(node)) + \/ (\E node \in Cores: DequeueCQueue(node)) Next == NextCH \/ NextR Spec == Init /\ [][Next]_<> - /\ WF_<>(\E c \in CHs: NextCHConnect(c)) /\ WF_<>(\E r \in Replicants: DequeueRQueue(r)) - /\ WF_<>(\E c \in CHs: NextCH) - - + /\ WF_<>(\E n \in Cores: DequeueCQueue(n)) + /\ WF_<>(\E c \in CHs: 
NextCHTakeoverEndSuccess(c)) + /\ WF_<>(\E c \in CHs: NextCHTerminating(c)) + /\ WF_<>(\E c \in CHs: NextCHNewSession(c)) + /\ WF_<>(\E c \in CHs: NextCHDirtyReadMax(c)) + ----------------------------------------------------------------------------- (***** Invariants and Property *****) ----------------------------------------------------------------------------- -eventuallyNoChanNone == <>~(\E ch \in CHs: Chans[ch].loc = NONE) -eventuallyMaxWin == <>(\E ch \in CHs: LET max == Len(Chans) IN - Chans[max].state = "Registered") +\* New ch cannot be takenover by old ch. +assertOldNeverWin == ~\E ch \in CHs: Chans[ch].pre /= NONE /\ Chans[ch].pre > ch /\ Chans[ch].state = "Registered" + +\* No double registration +assertNoDouble == ~\E ch1, ch2 \in CHs: Chans[ch1].state = "Registered" /\ Chans[ch2].state = "Registered" /\ ch1 /= ch2 + +\* Never takeover own session +assertNotMe == ~ \E ch \in CHs: ch = Chans[ch].pre + +\* No takeover NONE +assertNotTakeoverNone == ~ \E ch \in CHs: Chans[ch].state = "TakeoverStarted" /\ Chans[ch].pre = NONE + + +\* Property: if max chan is registered, it remains registered. +eventuallyRegistered == <>[][\E ch \in CHs: ch = Max(CHs) /\ Chans[ch].state = "Owned" /\ \A o \in CHs \ {ch}: Chans[o].state = "Offline"]_<> -testRQueueAlwaysEmpty == \A r \in Replicants: RStores[r] = {} -testAbortWontHappen == [] (~ \E c \in CHs: ENABLED NextCHAbort(c)) +\* State Predicates that ensures check coverage +\* below are the ones should be violated. 
+testRStoresAlwaysEmpty == \A r \in Replicants: RStores[r] = {}
+testAbortWontHappen == [] (\E c \in CHs: ENABLED NextCHAbort(c))

=============================================================================
\* Modification History
-\* Last modified Thu Jun 05 15:37:18 CEST 2025 by ezhuwya
+\* Last modified Tue Jun 10 18:10:46 CEST 2025 by ezhuwya
\* Created Wed Jun 04 13:38:58 CEST 2025 by ezhuwya

diff --git a/active/0030-emqx_linear_session_registry.md b/active/0030-emqx_linear_session_registry.md
index 1b19b44..e3fdea7 100644
--- a/active/0030-emqx_linear_session_registry.md
+++ b/active/0030-emqx_linear_session_registry.md
@@ -195,6 +195,9 @@ flowchart TD

The transaction to run is micro and abortive, it only reads and writes the same key, only one lock is taken so it is unlikely to get restarted by mria/mnesia.

+
+See [Formal Specification in TLA](0030-assets/LSR.tla)
+
### record #lsr_channel{}

`lsr_channel` of LSR represents an EMQX channel, providing a global view and maintaining a global state within the cluster.