Replacing lock-step batching with semaphore-based sliding window concurrency across py-libp2p #1275
yashksaini-coder started this conversation in General
Semaphore-Based Concurrency in py-libp2p
I spent some time going through how py-libp2p handles concurrency at the DHT and stream management layers. Wanted to share what I found, what I've already fixed, and what's still open.
Background
This started from looking at how the `ALPHA` parameter works in the DHT code. `ALPHA` (default 3) is supposed to control how many queries run at the same time — you don't want to flood the network, but you also don't want to sit around waiting when there's capacity available. It turns out the codebase handles concurrency in two very different ways depending on where you look.
Semaphores already exist in the project for protocol negotiation and pubsub validation, so the pattern isn't new. It just wasn't applied at these higher-level coordination points.
What I Found: DHT Query Coordination
Where: `PeerRouting.find_closest_peers_network` (`libp2p/kad_dht/peer_routing.py`), `KadDHT.put_value` and `KadDHT.get_value` (`libp2p/kad_dht/kad_dht.py`)

How it worked before:

All three methods used the same pattern — pick `ALPHA` peers, start them all in a `trio.open_nursery()`, wait for the entire nursery to close, then start the next batch. The problem is simple: if two queries finish in 10ms and one takes 500ms, the freed-up slots sit idle for 490ms. On an iterative DHT lookup that runs up to 20 rounds, that wasted time adds up fast.
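To make the lock-step pattern concrete, here's a minimal sketch. py-libp2p uses trio, but the sketch uses stdlib asyncio so it runs anywhere; `query_peer` and the peer list are stand-ins, not the real code.

```python
# Sketch of lock-step batching: a whole batch must finish before the
# next one starts, so fast slots idle behind the slowest query.
import asyncio

ALPHA = 3  # max queries in flight per round (DHT concurrency parameter)

async def query_peer(peer_id: str, latency: float) -> str:
    await asyncio.sleep(latency)  # stand-in for a DHT network round-trip
    return peer_id

async def lockstep_lookup(peers: list[tuple[str, float]]) -> list[str]:
    results: list[str] = []
    for i in range(0, len(peers), ALPHA):
        batch = peers[i:i + ALPHA]
        # gather() plays the role of trio.open_nursery() here: nothing in
        # the next batch starts until the slowest query in this one returns.
        results += await asyncio.gather(*(query_peer(p, l) for p, l in batch))
    return results

peers = [(f"peer-{n}", 0.001 * n) for n in range(6)]
out = asyncio.run(lockstep_lookup(peers))
print(out)
```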
What I changed (PR for #1273):

Replaced all three with `trio.Semaphore(ALPHA)`. The moment any query completes and releases its slot, the next one starts. No idle capacity, no wasted time. Still respects the `ALPHA` limit.
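The sliding-window shape looks roughly like this (a sketch, not the actual PR code; asyncio stands in for trio — `trio.Semaphore` and a nursery have the same shape):

```python
# Sliding window: launch everything at once and let a semaphore of
# ALPHA permits, not batching, cap concurrency. As soon as one query
# releases its slot, the next waiting query starts.
import asyncio

ALPHA = 3

async def query_peer(peer_id, latency, sem, results):
    async with sem:                   # hold one of the ALPHA slots
        await asyncio.sleep(latency)  # stand-in for the network call
        results.append(peer_id)       # slot freed on exit; next query starts

async def sliding_window_lookup(peers):
    sem = asyncio.Semaphore(ALPHA)
    results = []
    await asyncio.gather(*(query_peer(p, l, sem, results) for p, l in peers))
    return results

peers = [(f"peer-{n}", 0.001 * (n % 3)) for n in range(6)]
out = asyncio.run(sliding_window_lookup(peers))
print(sorted(out))
```

Completion order now depends on latency rather than batch boundaries, which is exactly the point: a 10ms query no longer waits behind a 500ms one.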
For `find_closest_peers_network` specifically, I made sure the iterative refinement loop is preserved — each round re-sorts candidates with newly discovered closer peers before launching the next round. The sliding window only operates within a round, so we don't lose the Kademlia "query, learn, query closer" behavior.

For `get_value`, I added a `trio.Event` that stops launching new queries once quorum is reached, so we don't waste work after we have enough valid records.
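The quorum short-circuit can be sketched like this (illustrative names, not the actual `get_value` code; asyncio's `Event` stands in for `trio.Event`):

```python
# Quorum short-circuit: an Event flipped once enough valid records
# arrive, checked before each new query runs. Queries already in
# flight finish, but no new ones start after quorum.
import asyncio

ALPHA, QUORUM = 3, 2

async def fetch_record(peer_id, sem, done, results):
    if done.is_set():          # quorum already reached: don't even start
        return
    async with sem:
        if done.is_set():      # re-check after waiting for a slot
            return
        await asyncio.sleep(0.001)  # stand-in for the network fetch
        results.append(f"record-from-{peer_id}")
        if len(results) >= QUORUM:
            done.set()         # stop launching further queries

async def get_value_sketch(peers):
    sem, done, results = asyncio.Semaphore(ALPHA), asyncio.Event(), []
    await asyncio.gather(*(fetch_record(p, sem, done, results) for p in peers))
    return results

out = asyncio.run(get_value_sketch([f"peer-{n}" for n in range(10)]))
ok = QUORUM <= len(out) <= ALPHA
print(ok)
```

Note that up to `ALPHA` fetches may still complete (the in-flight ones), but the remaining peers are never queried.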
What's Still Open: Stream Management in Swarm
Where: `Swarm` (`libp2p/network/swarm.py`) via `ResourceManager` (`libp2p/rcmgr/manager.py`)

How it works now:

When you call `new_stream`, the `ResourceManager` checks a counter. If the stream count is at the limit, it returns `False` immediately, and the `Swarm` raises `SwarmException("Stream limit exceeded")`. No waiting, no backpressure — just instant failure.

Why this is a problem:
Under load bursts (lots of peers connecting at once, DHT lookups running in parallel, pubsub fan-out), short-lived streams close and free up slots within milliseconds. But if a new request hits the counter at the exact moment it's full, it fails — even though a slot would be available 5ms later.
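A minimal sketch of that fail-fast behavior (illustrative names, not the actual `ResourceManager` API): a plain counter check that raises the instant the limit is hit, even if a slot would free up milliseconds later.

```python
# Fail-fast limiter: no waiting, the caller gets an error even under a
# momentary burst that would clear almost immediately.
class StreamLimitError(Exception):
    pass

class FailFastLimiter:
    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0

    def open_stream(self) -> None:
        if self.active >= self.limit:
            raise StreamLimitError("Stream limit exceeded")
        self.active += 1

    def close_stream(self) -> None:
        self.active -= 1

limiter = FailFastLimiter(limit=2)
limiter.open_stream()
limiter.open_stream()
try:
    limiter.open_stream()   # third stream in the same instant: hard failure
    outcome = "opened"
except StreamLimitError:
    outcome = "rejected"
print(outcome)  # → rejected
```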
What a semaphore would do:

Instead of failing immediately, `new_stream` would `await semaphore.acquire()`. The caller naturally pauses until a slot opens up, then proceeds. Under normal load it's instant (no contention). Under burst load it provides backpressure instead of errors.

This is the same pattern go-libp2p uses for its resource limiter. And py-libp2p already uses semaphores this way for protocol negotiation (`BasicHost.new_stream` throttles multiselect with a semaphore of 5) and pubsub validation.

I haven't tackled this one yet because it touches more code paths (every stream open goes through the Swarm), but the pattern is straightforward and proven by the DHT change.
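The backpressure variant described above can be sketched as follows (asyncio stands in for trio; `trio.Semaphore` behaves the same way, and the stream names and timings are illustrative):

```python
# Backpressure limiter: new_stream awaits a semaphore permit instead of
# failing. With no contention, acquire() is immediate; under a burst,
# callers queue briefly instead of getting an error.
import asyncio

async def open_stream(sem, held_ms, log, name):
    async with sem:              # waits for a slot instead of raising
        log.append(f"{name} opened")
        await asyncio.sleep(held_ms / 1000)  # stand-in for stream lifetime
    # slot released here; a queued caller proceeds

async def burst():
    sem = asyncio.Semaphore(2)   # stream limit of 2 for the sketch
    log = []
    # Three callers arrive at once; the third simply waits ~5ms for a slot.
    await asyncio.gather(
        open_stream(sem, 5, log, "s1"),
        open_stream(sem, 5, log, "s2"),
        open_stream(sem, 5, log, "s3"),
    )
    return log

log_out = asyncio.run(burst())
print(log_out)  # all three succeed; none rejected
```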
Where Semaphores Already Exist in py-libp2p
For context, the project already uses semaphores for lower-level bottlenecks:
- `BasicHost.new_stream`
- `QUICConnection`
- `PubSub._validator_semaphore`

These all work well. The DHT and Swarm just hadn't been wired up the same way.
Summary
- DHT query coordination (`PeerRouting`, `KadDHT`): `trio.Semaphore(ALPHA)` sliding window — done
- Stream management (`Swarm`/`ResourceManager`): `trio.Semaphore` with backpressure — proposed

Happy to discuss the Swarm change further if there's interest. The DHT part is already merged (#1274), and the semaphore-based stream-concurrency change is ready for review at #1289.
CC: @seetadev @acul71 @pacrob