A workshop for learning the Producer-Consumer pattern and async I/O in .NET C# V10, using a simulated OCPP 1.6 charging station backend.
The Server project handles OCPP messages from charging stations over WebSockets. It works, but has two scalability issues:
- Thread per connection — Every WebSocket connection spawns a dedicated thread that blocks while waiting for data. With 1000 stations connected, that's 1000 threads sitting idle most of the time.
- Single consumer — All incoming messages funnel through one processing thread. Even with 16 CPU cores, only one message is handled at a time.
Your task: fix both issues by applying the changes found in ServerSolution/.
# Terminal 1: Start the server
dotnet run --project Server
# Terminal 2: Start 10 charging stations
dotnet run --project ChargingStations 10Watch the server output — messages are processed sequentially on a single thread, with noticeable delays stacking up.
100 connections = 100 blocked threads (wasteful)
│
▼
[ BlockingCollection queue ]
│
▼
1 worker thread (bottleneck)
100 connections = ~few ThreadPool threads (efficient)
│
▼
[ BlockingCollection queue ]
│
▼
N worker threads (parallel processing)
What it solves: The single-consumer bottleneck. BlockingCollection<T> is already thread-safe — multiple threads can call GetConsumingEnumerable() simultaneously. The runtime distributes items across consumers automatically.
Changes in Server/Program.cs:
- Add a worker count based on available CPU cores:
int WorkerCount = Environment.ProcessorCount;- Replace the single processing thread with a loop that spawns N workers:
var workerThreads = new Thread[WorkerCount];
for (int i = 0; i < WorkerCount; i++)
{
var workerId = i + 1;
workerThreads[i] = new Thread(() => ProcessMessages(messageQueue, workerId))
{
IsBackground = true,
Name = $"OCPP-Worker-{workerId}"
};
workerThreads[i].Start();
}- Update
ProcessMessagesto accept and pass through aworkerId:
static void ProcessMessages(BlockingCollection<QueuedMessage> queue, int workerId)- Pass
workerIdtoOcppMessageHandler.HandleMessage:
var response = OcppMessageHandler.HandleMessage(item.StationId, item.Message, workerId);Changes in Server/OcppMessageHandler.cs:
- Add
workerIdparameter toHandleMessageand include it in the log output:
public static string? HandleMessage(string stationId, string json, int workerId)
{
// ...
Log($"[{stationId}] {action} (Worker {workerId})");
// ...
}Verify: Run the server again with 10 stations. You should now see messages being processed in parallel by different workers, with timestamps overlapping instead of sequential.
What it solves: Thread waste on I/O-bound waiting. Each WebSocket connection currently blocks a dedicated thread while waiting for data. With async I/O, the thread returns to the ThreadPool during the wait — one ThreadPool can serve thousands of connections.
Changes in Server/Program.cs:
- Replace the blocking accept loop with an async method call:
// Before:
while (true)
{
var context = listener.GetContext();
// ...
}
// After:
await AcceptConnectionsAsync(listener, messageQueue, connections);- Create the
AcceptConnectionsAsyncmethod usingGetContextAsyncandAcceptWebSocketAsync:
static async Task AcceptConnectionsAsync(
HttpListener listener,
BlockingCollection<QueuedMessage> queue,
ConcurrentDictionary<string, WebSocket> connections)
{
while (true)
{
var context = await listener.GetContextAsync();
if (!context.Request.IsWebSocketRequest)
{
context.Response.StatusCode = 400;
context.Response.Close();
continue;
}
var stationId = context.Request.Url?.AbsolutePath.TrimStart('/') ?? "unknown";
var wsContext = await context.AcceptWebSocketAsync("ocpp1.6");
var webSocket = wsContext.WebSocket;
connections[stationId] = webSocket;
_ = ReceiveMessagesAsync(stationId, webSocket, queue, connections);
OcppMessageHandler.Log($"[CONNECT] Station '{stationId}' connected");
}
}- Convert
ReceiveMessagestoReceiveMessagesAsync— replace all.GetAwaiter().GetResult()calls withawait:
static async Task ReceiveMessagesAsync(
string stationId,
WebSocket webSocket,
BlockingCollection<QueuedMessage> queue,
ConcurrentDictionary<string, WebSocket> connections)
{
var buffer = new byte[4096];
try
{
while (webSocket.State == WebSocketState.Open)
{
var result = await webSocket.ReceiveAsync(buffer, CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
break;
}
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
queue.Add(new QueuedMessage(stationId, message, webSocket));
}
}
catch { }
finally
{
connections.TryRemove(stationId, out _);
OcppMessageHandler.Log($"[DISCONNECT] Station '{stationId}' disconnected");
}
}Note the fire-and-forget pattern (_ = ReceiveMessagesAsync(...)) — each connection runs independently without a dedicated thread.
Verify: Run the server with 10+ stations. The behavior should be the same as Step 1, but now using far fewer OS threads. The ThreadPool manages waiting efficiently.
| Concept | What | Why |
|---|---|---|
BlockingCollection<T> |
Thread-safe queue | Multiple consumers can call GetConsumingEnumerable() concurrently |
Environment.ProcessorCount |
Number of CPU cores | Optimal worker count for CPU-bound work |
async/await |
Non-blocking I/O | Thread returns to pool while waiting for data |
Fire-and-forget (_ = ...) |
Start async task without awaiting | Each connection runs independently |
.GetAwaiter().GetResult() |
Sync-over-async (anti-pattern) | Blocks the thread — exactly what we're removing |
| Project | Description |
|---|---|
| Server | Your starting point — thread-per-connection, single consumer |
| ServerSolution | The completed solution for reference |
| ChargingStations | Simulated OCPP 1.6 charging stations (test client) |