Skip to content

Phase 2 step 0: Connection struct + tokio::sync::Mutex#3

Open
agaonker wants to merge 1 commit into
mainfrom
phase-2-step-0-conn-struct-and-tokio-mutex
Open

Phase 2 step 0: Connection struct + tokio::sync::Mutex#3
agaonker wants to merge 1 commit into
mainfrom
phase-2-step-0-conn-struct-and-tokio-mutex

Conversation

@agaonker

Copy link
Copy Markdown
Owner

Summary

  • Connection struct refactorhandle_connectionConnection { socket, buf, store, client_id } with per-connection monotonic client_id and structured #[tracing::instrument]. handle_connection kept as a thin wrapper, so listener.rs and tests/phase1.rs see no signature change.
  • tokio::sync::Mutex switchSharedStore = Arc<tokio::sync::Mutex<Store>>. dispatch is now async fn, acquires the lock exactly once per command, hands a &mut Store guard to handlers. Handlers stay pure sync fn (fully testable without the async runtime). PING / ECHO skip the lock entirely.
  • Combined into one PR for review ergonomics, per the CEO + eng plan discussion.

Architectural notes

  • The "never lock across .await" rule is now moot — tokio::sync::Mutex lets a task await while holding the guard. We still don't, but the foot-gun is gone.
  • Lock-once-per-dispatch is the pattern future MULTI/EXEC will reuse: EXEC will take the lock once and run N queued commands under one critical section.
  • Per-connection client_id now appears as a structured field on every log line in that connection's span — grep client_id=42 to follow one conversation.

Test plan

  • cargo build — clean (3 pre-existing dead-code warnings, unrelated)
  • cargo test — 60 unit + 11 phase1.rs integration tests, 0 failures
  • Per-handler unit tests rewritten to use Store::new() directly (no shared_store, no async)
  • Manual smoke: cargo run + redis-cli ping/set/get/lpush/lrange/hset/sadd against 127.0.0.1:6379

🤖 Generated with Claude Code

Two CEO-plan items combined into one PR (per user choice for review
ergonomics).

Connection struct refactor:
- handle_connection -> Connection { socket, buf, store, client_id }
- Per-connection monotonic client_id from AtomicU64
- #[tracing::instrument] adds structured client_id field to every log
  line in the connection's scope
- handle_connection kept as a thin wrapper so listener.rs and
  tests/phase1.rs see no signature change

tokio::sync::Mutex switch:
- SharedStore = Arc<tokio::sync::Mutex<Store>>
- dispatch is now async fn; acquires the lock exactly once per command
- PING and ECHO bypass the lock (zero contention on keepalives)
- Every handler now takes &mut Store (the locked guard) instead of
  &SharedStore. Handlers stay pure sync fn, fully testable in isolation
- Handler unit tests construct Store::new() directly; no shared store,
  no async runtime needed for unit testing
- "Never lock across .await" rule is now moot

Verified: cargo test passes 60 unit + 11 phase1 integration, 0 failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented May 17, 2026

Copy link
Copy Markdown

Warning

Rate limit exceeded

@agaonker has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 9 minutes and 43 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0bb40f71-ed85-4767-a203-b42c8e644355

📥 Commits

Reviewing files that changed from the base of the PR and between 892f232 and cbbf3b8.

📒 Files selected for processing (10)
  • redis-clone/src/command/dispatch.rs
  • redis-clone/src/command/handlers/del.rs
  • redis-clone/src/command/handlers/exists.rs
  • redis-clone/src/command/handlers/get.rs
  • redis-clone/src/command/handlers/hash.rs
  • redis-clone/src/command/handlers/list.rs
  • redis-clone/src/command/handlers/set.rs
  • redis-clone/src/command/handlers/set_cmd.rs
  • redis-clone/src/server/connection.rs
  • redis-clone/src/store/mod.rs
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch phase-2-step-0-conn-struct-and-tokio-mutex

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@agaonker agaonker left a comment

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PRISM review

Verdict: REQUEST_CHANGES · Risk: low · Type: refactor · Status: success
Recalled 5 past finding(s) from memory.

The refactor is generally clean and the new Connection struct is a reasonable step forward, but there are a few things to address before merging. The most important concern is that SharedStore is embedded directly in Connection, which will make the struct grow unwieldy as more shared state (pub/sub, replication, etc.) is added — consider wrapping all shared server state in a single ServerState or AppState handle instead. The two-phase match in dispatch (first match for lock-free commands, second match acquires the lock) is a maintainability trap: any new command added to the wrong arm will silently hold the lock unnecessarily, so unifying into a single match with per-command lock acquisition would be safer. There's also a meaningful gap in test coverage — the new client_id minting logic, the handle_connection wrapper, pipelined frames (multiple complete frames in one read), and the ParseOutcome::Err close path are all untested. Finally, the handle_connection free function and the public Connection::new/handle create two overlapping entry points with unclear ownership of client_id assignment; pick one pattern and make the other private or remove it.

12 finding(s) posted inline on the diff.

Other findings (3)

  • [MAJOR/test] redis-clone/src/server/connection.rs:None The Connection struct and its handle method have no unit or integration tests. The new struct introduces a client_id field, a NEXT_CLIENT_ID atomic counter, and restructures the read/dispatch/write loop — none of which are exercised by any test in the diff. In particular: (1) the client_id monotonic increment logic, (2) the handle_connection backwards-compat wrapper minting a fresh id, and (3) the full request/response cycle through the new Connection::handle are all untested.
  • [MINOR/test] redis-clone/src/server/connection.rs:None The Connection::handle loop now processes multiple frames per read (inner loop) before going back to read_buf. The pipelining path (multiple complete frames in one read) is not tested anywhere in the diff.
  • [SUGGESTION/test] redis-clone/src/server/connection.rs:None The ParseOutcome::Err branch in Connection::handle closes the connection after writing the error. This error-close path is not covered by any test in the diff.

Automated review by PRISM — verify before acting.

let response = match Command::try_from(value) {
Ok(cmd) => {
debug!(cmd = %cmd.name, "dispatching");
dispatch(cmd, &self.store).await

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BLOCKING/bug] In Connection::handle, after a ParseOutcome::Err, the code writes an error response and returns Ok(()). This is correct. However, there is a subtle issue: if self.socket.write_all(...) fails (returns an Err), the ? operator will propagate the error upward. But the outer loop only calls self.socket.read_buf once per outer iteration, then processes all available frames in the inner loop. If the inner loop's write_all fails mid-stream (e.g., client disconnected between frames), the error is propagated correctly via ?. This is fine — no bug.

"GET" => handle_get(&cmd.args, &mut store),
"DEL" => handle_del(&cmd.args, &mut store),
"EXISTS" => handle_exists(&cmd.args, &mut store),
"LPUSH" => handle_lpush(&cmd.args, &mut store),

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[BLOCKING/bug] In dispatch, the variable store (the MutexGuard) shadows the parameter store: &SharedStore with let mut store = store.lock().await. The guard is then passed as &mut store to handlers. This is correct Rust — the guard derefs to &mut Store. However, the guard is held for the entire second match block and is only dropped when dispatch returns. Since dispatch is now async, if any future .await point were added inside the second match (e.g., a future async handler), the tokio::sync::Mutex guard would be held across an await, which is a classic deadlock risk. Currently there are no await points inside the second match, so this is not a present bug, but the structure is fragile. This is a SUGGESTION-level concern for now.

}
}
}
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR/bug] The handle_connection free function mints a client_id via NEXT_CLIENT_ID.fetch_add(1, Ordering::Relaxed) and then constructs a Connection passing that id. However, Connection::new also accepts a client_id parameter from the caller — there is no validation or enforcement that the id is unique. Any caller of Connection::new directly can pass an arbitrary or duplicate client_id, silently producing misleading structured log output. This is a minor API design issue but not a correctness bug in the changed code paths.

/// (as `&mut Store`) to handlers that need it. Sync handlers run under the
/// lock; PING/ECHO bypass the store entirely. This keeps every handler as
/// a pure `fn` and concentrates all locking in this one spot.
pub async fn dispatch(cmd: Command, store: &SharedStore) -> RespValue {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR/quality] The two-match structure in dispatch (previously flagged as a SUGGESTION in prior reviews) is now more problematic: the first match uses return for PING/ECHO and a _ => {} no-op arm, then the second match acquires the lock. Any new store-free command (e.g., CLIENT, COMMAND, DEBUG) added to the second match will silently acquire the lock unnecessarily. The prior suggestion to unify into a single match or use a command metadata table remains unaddressed and is now a more concrete maintainability risk.

/// Per-connection state. Owns the socket and the read buffer; carries the
/// shared store handle and a monotonic `client_id` used for structured logging.
///
/// Future fields (added in later PRs): `txn_state`, `watched_keys`, `client_name`.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR/architecture] The Connection struct embeds SharedStore directly. This tightly couples the connection layer to the concrete store type. When persistence, replication, or a scripting engine is added, the connection will need access to more shared state (e.g., a pub/sub bus, a replication log). Embedding each piece of shared state directly in Connection will cause the struct to grow unboundedly and make it hard to test connection logic in isolation.

use crate::store::{wrong_type_error, Store, StoreValue};

pub fn handle_get(args: &[Vec<u8>], store: &SharedStore) -> RespValue {
pub fn handle_get(args: &[Vec<u8>], store: &mut Store) -> RespValue {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MINOR/security] The prior finding about read-only handlers (handle_get, handle_exists, handle_hget, handle_hexists, handle_hlen, handle_hkeys, handle_hvals, handle_smembers, handle_sismember, handle_scard, handle_llen, handle_lrange, handle_lindex) still taking &mut Store instead of &Store remains unaddressed. This is unnecessarily restrictive and will cause borrow-checker friction if two read-only handlers ever need to be called in sequence (e.g., in a MULTI/EXEC validator).

}
}
}
}

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MINOR/quality] The handle_connection free function is described as a "backwards-compatible entry point" but it is the only public entry point — Connection::new and Connection::handle are also pub, so callers can already construct a Connection directly with a custom client_id. The wrapper adds a layer of indirection that may become confusing once callers start managing IDs themselves (e.g., for a CLIENT ID command). Consider either making Connection fields/constructor private and keeping only handle_connection, or removing handle_connection and letting callers use Connection directly.

/// shared store handle and a monotonic `client_id` used for structured logging.
///
/// Future fields (added in later PRs): `txn_state`, `watched_keys`, `client_name`.
pub struct Connection {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MINOR/quality] The BytesMut initial capacity is a magic number 4096 duplicated from the old code without a named constant. This appears in Connection::new.

use crate::protocol::RespValue;

/// Shared, async-locked handle to the store. Tokio mutex (not std) so the
/// lock can be acquired across `.await` points without blocking the runtime

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SUGGESTION/quality] The shared_store() constructor function is still exported from store/mod.rs but is no longer used in any handler test (tests now call Store::new() directly). If shared_store() is only used at the server startup site, consider whether it still needs to be pub or can be inlined at its single call site.

};

pub fn dispatch(cmd: Command, store: &SharedStore) -> RespValue {
/// Route a parsed command to its handler.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SUGGESTION/quality] The dispatch doc-comment says "Sync handlers run under the lock" but dispatch is now async. The word "Sync" is misleading — it means the handlers themselves are synchronous functions, but a reader may interpret it as the function being synchronous. Clarify the wording.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant