Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions voxels-common/src/main/java/common/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,24 @@ public void send(Packet packet) {

synchronized (buffer) {
try {
buffer.writeInt(packet.getId());
packet.write(buffer);
out.flush();
writePacket(packet);
flushOutput();
} catch (Exception e) {
System.err.println("[Network] Failed to send packet " + packet.getId());
close();
}
}
}

protected void writePacket(Packet packet) throws IOException {
buffer.writeInt(packet.getId());
packet.write(buffer);
}

protected void flushOutput() throws IOException {
out.flush();
}

/**
* Handover point for received packets. Implementations should typically queue the packet for the
* main thread.
Expand Down
56 changes: 56 additions & 0 deletions voxels-server/src/main/java/server/network/ServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
*/
public class ServerConnection extends Connection {

private static final int MAX_OUTBOUND_BATCH_SIZE = 64;
private static final long WRITER_POLL_TIMEOUT_MS = 10L;

private volatile ServerPlayer player; // Initialized as null until the player officially joins
private final GameServer server;
private final ServerPacketDispatcher packetDispatcher;
private final LinkedBlockingQueue<Packet> outboundQueue = new LinkedBlockingQueue<>();
private final Thread writerThread;

private final Queue<Packet> incomingPackets = new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -62,6 +67,56 @@ public int getQueueSize() {
return incomingPackets.size();
}

@Override
public void send(Packet packet) {
if (!running || packet == null) return;
outboundQueue.offer(packet);
}

private void writeLoop() {
List<Packet> batch = new ArrayList<>(MAX_OUTBOUND_BATCH_SIZE);

try {
while (running || !outboundQueue.isEmpty()) {
Packet first = outboundQueue.poll(WRITER_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (first == null) {
continue;
}

batch.add(first);
outboundQueue.drainTo(batch, MAX_OUTBOUND_BATCH_SIZE - 1);
writeBatch(batch);
batch.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (!batch.isEmpty()) {
try {
writeBatch(batch);
} catch (Exception ignored) {
}
}
}
}

private void writeBatch(List<Packet> batch) {
if (batch.isEmpty() || buffer == null) return;

synchronized (buffer) {
try {
for (Packet packet : batch) {
writePacket(packet);
}
flushOutput();
} catch (Exception e) {
Packet failedPacket = batch.get(0);
System.err.println("[Network] Failed to send packet " + failedPacket.getId());
close();
}
}
}

/**
* Closes the connection and cleans up resources. Unregisters the connection from the
* PlayerManager to trigger leave events.
Expand All @@ -70,6 +125,7 @@ public int getQueueSize() {
public void close() {
// Close sockets and set running = false via base class
super.close();
writerThread.interrupt();

// Notify manager to handle player logout/cleanup
server.getPlayerManager().removeConnection(this);
Expand Down
Loading