diff --git a/core/.gitignore b/core/.gitignore deleted file mode 100644 index ba39d92495..0000000000 --- a/core/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -test-output/ -/target diff --git a/core/src/main/java/net/kuujo/copycat/resource/internal/PassiveState.java b/core/src/main/java/net/kuujo/copycat/resource/internal/PassiveState.java index 2203812ba5..3697862f05 100644 --- a/core/src/main/java/net/kuujo/copycat/resource/internal/PassiveState.java +++ b/core/src/main/java/net/kuujo/copycat/resource/internal/PassiveState.java @@ -151,7 +151,7 @@ private void recursiveSync(ReplicaInfo member, boolean requireEntries, Completab } } else { // If the request failed then record the member as INACTIVE. - LOGGER.debug("{} - Sync to {} failed", context.getLocalMember(), member); + LOGGER.warn("{} - Sync to {} failed: {}", context.getLocalMember(), member, error.getMessage()); future.completeExceptionally(error); } } diff --git a/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolClient.java b/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolClient.java index 592f89171d..3568985779 100644 --- a/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolClient.java +++ b/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolClient.java @@ -16,33 +16,24 @@ package net.kuujo.copycat.netty; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import net.kuujo.copycat.protocol.ProtocolClient; import net.kuujo.copycat.protocol.ProtocolException; -import net.kuujo.copycat.util.concurrent.NamedThreadFactory; import javax.net.ssl.SSLException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Netty TCP protocol client. @@ -50,57 +41,58 @@ * @author Jordan Halterman */ public class NettyTcpProtocolClient implements ProtocolClient { - private static final Logger LOGGER = LoggerFactory.getLogger(NettyTcpProtocolClient.class); private final String host; private final int port; private final NettyTcpProtocol protocol; - private final EventLoopGroup group; + private EventLoopGroup group; private Channel channel; - ChannelHandlerContext context; - private final Cache> responseFutures = CacheBuilder.newBuilder() - .maximumSize(10000) - .expireAfterWrite(2, TimeUnit.SECONDS) - .removalListener(new RemovalListener>() { - @Override - public void onRemoval(RemovalNotification> entry) { - if (entry.wasEvicted()) { - entry.getValue().completeExceptionally(new TimeoutException()); - } - } - }) - .build(); + private ChannelHandlerContext context; + + private final Map> responseFutures = new HashMap<>(1000); private long requestId; - private final ScheduledExecutorService connectionMonitor; + + private final ChannelInboundHandlerAdapter channelHandler = new SimpleChannelInboundHandler() { + @Override + public void channelActive(ChannelHandlerContext context) { + NettyTcpProtocolClient.this.context = context; + } + + @Override + protected void channelRead0(ChannelHandlerContext context, byte[] message) throws Exception { + ByteBuffer buffer = ByteBuffer.wrap(message); + long responseId = buffer.getLong(); + CompletableFuture responseFuture = responseFutures.remove(responseId); + if (responseFuture != null) { + responseFuture.complete(buffer.slice()); + } + } + }; public NettyTcpProtocolClient(String host, int port, NettyTcpProtocol protocol) { this.host = host; this.port = port; this.protocol = protocol; - this.group = new NioEventLoopGroup(protocol.getThreads()); - connectionMonitor = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("netty-tcp-connection-monitor-" + host + ":" + port + "-%d")); - connectionMonitor.scheduleWithFixedDelay((() -> connect()), 0, 100, TimeUnit.MILLISECONDS); } @Override public CompletableFuture write(ByteBuffer request) { final CompletableFuture future = new CompletableFuture<>(); if (channel != null) { - request.rewind(); long requestId = ++this.requestId; - ByteBuf requestBuffer = context.alloc().buffer(request.remaining() + 12); - requestBuffer.writeLong(requestId); - requestBuffer.writeBytes(request); - responseFutures.put(requestId, future); - channel.writeAndFlush(requestBuffer).addListener((channelFuture) -> { - if (!channelFuture.isSuccess()) { - future.completeExceptionally(new ProtocolException(channelFuture.cause())); - } - }); - } else { - future.completeExceptionally(new ProtocolException("Client not connected")); - } - return future; + ByteBuffer buffer = ByteBuffer.allocate(request.limit() + 8); + buffer.putLong(requestId); + buffer.put(request); + channel.writeAndFlush(buffer.array()).addListener((channelFuture) -> { + if (channelFuture.isSuccess()) { + responseFutures.put(requestId, future); + } else { + future.completeExceptionally(new ProtocolException(channelFuture.cause())); + } + }); + } else { + future.completeExceptionally(new ProtocolException("Client not connected")); + } + return future; } @Override @@ -123,6 +115,7 @@ public CompletableFuture connect() { sslContext = null; } + group = new NioEventLoopGroup(protocol.getThreads()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) @@ -133,13 +126,11 @@ protected void initChannel(SocketChannel channel) throws Exception { if (sslContext != null) { pipeline.addLast(sslContext.newHandler(channel.alloc(), host, port)); } - pipeline.addLast( - //new ObjectEncoder(), - //new ObjectDecoder(ClassResolvers.softCachingConcurrentResolver(getClass().getClassLoader())), - new MessageEncoder(), - new MessageDecoder(), - new TcpProtocolClientHandler(NettyTcpProtocolClient.this) - ); + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); + pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); + pipeline.addLast("bytesEncoder", new ByteArrayEncoder()); + pipeline.addLast("handler", channelHandler); } }); @@ -160,15 +151,12 @@ protected void initChannel(SocketChannel channel) throws Exception { bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, protocol.getConnectTimeout()); - bootstrap.connect(host, port).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (channelFuture.isSuccess()) { - channel = channelFuture.channel(); - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); - } + bootstrap.connect(host, port).addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + channel = channelFuture.channel(); + future.complete(null); + } else { + future.completeExceptionally(channelFuture.cause()); } }); return future; @@ -178,15 +166,12 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { public CompletableFuture close() { final CompletableFuture future = new CompletableFuture<>(); if (channel != null) { - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - channel = null; - if (channelFuture.isSuccess()) { - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); - } + channel.close().addListener(channelFuture -> { + group.shutdownGracefully(); + if (channelFuture.isSuccess()) { + future.complete(null); + } else { + future.completeExceptionally(channelFuture.cause()); } }); } else { @@ -195,35 +180,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { return future; } - /** - * Client response handler. - */ - private static class TcpProtocolClientHandler extends ChannelInboundHandlerAdapter { - private final NettyTcpProtocolClient client; - - private TcpProtocolClientHandler(NettyTcpProtocolClient client) { - this.client = client; - } - - public void channelActive(final ChannelHandlerContext context) { - client.context = context; - } - - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public void channelRead(final ChannelHandlerContext context, Object message) { - ByteBuf response = (ByteBuf) message; - long requestId = response.readLong(); - CompletableFuture responseFuture = client.responseFutures.getIfPresent(requestId); - if (responseFuture != null) { - responseFuture.complete(response.slice().nioBuffer()); - client.responseFutures.invalidate(requestId); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { - context.close(); - } + @Override + public String toString() { + return getClass().getSimpleName(); } + } diff --git a/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolServer.java b/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolServer.java index c35f168f5c..4e3bb7bbd3 100644 --- a/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolServer.java +++ b/netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolServer.java @@ -16,22 +16,21 @@ package net.kuujo.copycat.netty; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.serialization.ClassResolvers; -import io.netty.handler.codec.serialization.ObjectDecoder; -import io.netty.handler.codec.serialization.ObjectEncoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.bytes.ByteArrayDecoder; +import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.SelfSignedCertificate; import net.kuujo.copycat.protocol.ProtocolHandler; import net.kuujo.copycat.protocol.ProtocolServer; import javax.net.ssl.SSLException; - +import java.nio.ByteBuffer; import java.security.cert.CertificateException; import java.util.concurrent.CompletableFuture; @@ -45,6 +44,8 @@ public class NettyTcpProtocolServer implements ProtocolServer { private final int port; private final NettyTcpProtocol protocol; private ProtocolHandler handler; + private EventLoopGroup serverGroup; + private EventLoopGroup workerGroup; private Channel channel; public NettyTcpProtocolServer(String host, int port, NettyTcpProtocol protocol) { @@ -76,29 +77,27 @@ public CompletableFuture listen() { sslContext = null; } - final EventLoopGroup serverGroup = new NioEventLoopGroup(); - final EventLoopGroup workerGroup = new NioEventLoopGroup(protocol.getThreads()); + serverGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(protocol.getThreads()); final ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(serverGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - if (sslContext != null) { - pipeline.addLast(sslContext.newHandler(channel.alloc())); + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + if (sslContext != null) { + pipeline.addLast(sslContext.newHandler(channel.alloc())); + } + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); + pipeline.addLast("bytesDecoder", new ByteArrayDecoder()); + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); + pipeline.addLast("bytesEncoder", new ByteArrayEncoder()); + pipeline.addLast("handler", new ServerHandler()); } - pipeline.addLast( - //new ObjectEncoder(), - //new ObjectDecoder(ClassResolvers.softCachingConcurrentResolver(getClass().getClassLoader())), - new MessageEncoder(), - new MessageDecoder(), - new TcpProtocolServerHandler(NettyTcpProtocolServer.this) - ); - } - }) - .option(ChannelOption.SO_BACKLOG, 128); + }) + .option(ChannelOption.SO_BACKLOG, 128); if (protocol.getSendBufferSize() > -1) { bootstrap.option(ChannelOption.SO_SNDBUF, protocol.getSendBufferSize()); @@ -120,22 +119,16 @@ public void initChannel(SocketChannel channel) throws Exception { bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // Bind and start to accept incoming connections. - bootstrap.bind(host, port).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - workerGroup.shutdownGracefully(); - } - }); + bootstrap.bind(host, port).addListener((ChannelFutureListener) channelFuture -> { + channelFuture.channel().closeFuture().addListener(closeFuture -> { + workerGroup.shutdownGracefully(); + }); - if (channelFuture.isSuccess()) { - channel = channelFuture.channel(); - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); - } + if (channelFuture.isSuccess()) { + channel = channelFuture.channel(); + future.complete(null); + } else { + future.completeExceptionally(channelFuture.cause()); } }); return future; @@ -145,14 +138,13 @@ public void operationComplete(ChannelFuture future) throws Exception { public CompletableFuture close() { final CompletableFuture future = new CompletableFuture<>(); if (channel != null) { - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (channelFuture.isSuccess()) { - future.complete(null); - } else { - future.completeExceptionally(channelFuture.cause()); - } + channel.close().addListener(channelFuture -> { + workerGroup.shutdownGracefully(); + serverGroup.shutdownGracefully(); + if (channelFuture.isSuccess()) { + future.complete(null); + } else { + future.completeExceptionally(channelFuture.cause()); } }); } else { @@ -161,32 +153,27 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { return future; } - /** - * Server request handler. - */ - private static class TcpProtocolServerHandler extends ChannelInboundHandlerAdapter { - private final NettyTcpProtocolServer server; - - private TcpProtocolServerHandler(NettyTcpProtocolServer server) { - this.server = server; - } - + @Override + public String toString() { + return getClass().getSimpleName(); + } + + private class ServerHandler extends SimpleChannelInboundHandler { @Override - public void channelRead(final ChannelHandlerContext context, Object message) { - ByteBuf request = (ByteBuf) message; - long requestId = request.readLong(); - - if (server.handler != null) { - context.channel().eventLoop().submit(() -> server.handler.apply(request.slice().nioBuffer()).whenComplete((result, error) -> { - try { - ByteBuf responseBuffer = context.alloc().buffer(request.readableBytes() + 12); - responseBuffer.writeLong(requestId); - responseBuffer.writeBytes(result); - context.writeAndFlush(responseBuffer); - } finally { - request.release(); + protected void channelRead0(ChannelHandlerContext context, byte[] message) throws Exception { + if (handler != null) { + ByteBuffer buffer = ByteBuffer.wrap(message); + long requestId = buffer.getLong(); + handler.apply(buffer.slice()).whenComplete((result, error) -> { + if (error == null) { + context.channel().eventLoop().execute(() -> { + ByteBuffer response = ByteBuffer.allocate(result.limit() + 8); + response.putLong(requestId); + response.put(result); + context.writeAndFlush(response.array()); + }); } - })); + }); } } @@ -194,7 +181,6 @@ public void channelRead(final ChannelHandlerContext context, Object message) { public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { context.close(); } - } } diff --git a/test-tools/.gitignore b/test-tools/.gitignore deleted file mode 100644 index ba39d92495..0000000000 --- a/test-tools/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -test-output/ -/target diff --git a/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java b/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java index 55d16a6516..4101da9081 100644 --- a/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java +++ b/test-tools/src/main/java/net/kuujo/copycat/test/ProtocolTest.java @@ -192,6 +192,7 @@ public void testSendReceive() throws Throwable { client.connect().thenRunAsync(this::resume); await(5000); + expectResume(); client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); @@ -200,6 +201,7 @@ public void testSendReceive() throws Throwable { }); await(5000); + expectResume(); client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); @@ -208,6 +210,7 @@ public void testSendReceive() throws Throwable { }); await(5000); + expectResume(); client.write(ByteBuffer.wrap("Hello world!".getBytes())).thenAcceptAsync(buffer -> { byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); @@ -215,6 +218,14 @@ public void testSendReceive() throws Throwable { resume(); }); await(5000); + + expectResume(); + client.close().thenRunAsync(this::resume); + await(2500); + + expectResume(); + server.close().thenRunAsync(this::resume); + await(2500); } } diff --git a/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java b/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java index 3809c0b89c..e09fc26b57 100644 --- a/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java +++ b/test-tools/src/main/java/net/kuujo/copycat/test/TestCluster.java @@ -29,6 +29,7 @@ * @author Jordan Halterman */ public class TestCluster> { + private static int id; private final List activeResources; private final List passiveResources; @@ -102,10 +103,10 @@ public TestCluster build() { List activeResources = new ArrayList<>(activeMembers); - int i = 0; Set members = new HashSet<>(activeMembers); - while (i <= activeMembers) { - String uri = uriFactory.apply(++i); + int activeCount = activeMembers + id; + while (id <= activeCount) { + String uri = uriFactory.apply(id++); members.add(uri); } @@ -115,8 +116,9 @@ public TestCluster build() { } List passiveResources = new ArrayList<>(passiveMembers); - while (i <= passiveMembers + activeMembers) { - String member = uriFactory.apply(++i); + int passiveCount = passiveMembers + id; + while (id <= passiveCount) { + String member = uriFactory.apply(id++); ClusterConfig cluster = clusterFactory.apply(members).withLocalMember(member); passiveResources.add(resourceFactory.apply(cluster)); }