Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/.gitignore

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void recursiveSync(ReplicaInfo member, boolean requireEntries, Completab
}
} else {
// If the request failed then record the member as INACTIVE.
LOGGER.debug("{} - Sync to {} failed", context.getLocalMember(), member);
LOGGER.warn("{} - Sync to {} failed: {}", context.getLocalMember(), member, error.getMessage());
future.completeExceptionally(error);
}
}
Expand Down
167 changes: 63 additions & 104 deletions netty/src/main/java/net/kuujo/copycat/netty/NettyTcpProtocolClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,91 +16,83 @@
package net.kuujo.copycat.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolException;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;

import javax.net.ssl.SSLException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Netty TCP protocol client.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class NettyTcpProtocolClient implements ProtocolClient {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyTcpProtocolClient.class);
private final String host;
private final int port;
private final NettyTcpProtocol protocol;
private final EventLoopGroup group;
private EventLoopGroup group;
private Channel channel;
ChannelHandlerContext context;
private final Cache<Object, CompletableFuture<ByteBuffer>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterWrite(2, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Object, CompletableFuture<ByteBuffer>>() {
@Override
public void onRemoval(RemovalNotification<Object, CompletableFuture<ByteBuffer>> entry) {
if (entry.wasEvicted()) {
entry.getValue().completeExceptionally(new TimeoutException());
}
}
})
.build();
private ChannelHandlerContext context;

private final Map<Object, CompletableFuture<ByteBuffer>> responseFutures = new HashMap<>(1000);
private long requestId;
private final ScheduledExecutorService connectionMonitor;

private final ChannelInboundHandlerAdapter channelHandler = new SimpleChannelInboundHandler<byte[]>() {
@Override
public void channelActive(ChannelHandlerContext context) {
NettyTcpProtocolClient.this.context = context;
}

@Override
protected void channelRead0(ChannelHandlerContext context, byte[] message) throws Exception {
ByteBuffer buffer = ByteBuffer.wrap(message);
long responseId = buffer.getLong();
CompletableFuture<ByteBuffer> responseFuture = responseFutures.remove(responseId);
if (responseFuture != null) {
responseFuture.complete(buffer.slice());
}
}
};

public NettyTcpProtocolClient(String host, int port, NettyTcpProtocol protocol) {
this.host = host;
this.port = port;
this.protocol = protocol;
this.group = new NioEventLoopGroup(protocol.getThreads());
connectionMonitor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("netty-tcp-connection-monitor-" + host + ":" + port + "-%d"));
connectionMonitor.scheduleWithFixedDelay((() -> connect()), 0, 100, TimeUnit.MILLISECONDS);
}

@Override
public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
final CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
if (channel != null) {
request.rewind();
long requestId = ++this.requestId;
ByteBuf requestBuffer = context.alloc().buffer(request.remaining() + 12);
requestBuffer.writeLong(requestId);
requestBuffer.writeBytes(request);
responseFutures.put(requestId, future);
channel.writeAndFlush(requestBuffer).addListener((channelFuture) -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(new ProtocolException(channelFuture.cause()));
}
});
} else {
future.completeExceptionally(new ProtocolException("Client not connected"));
}
return future;
ByteBuffer buffer = ByteBuffer.allocate(request.limit() + 8);
buffer.putLong(requestId);
buffer.put(request);
channel.writeAndFlush(buffer.array()).addListener((channelFuture) -> {
if (channelFuture.isSuccess()) {
responseFutures.put(requestId, future);
} else {
future.completeExceptionally(new ProtocolException(channelFuture.cause()));
}
});
} else {
future.completeExceptionally(new ProtocolException("Client not connected"));
}
return future;
}

@Override
Expand All @@ -123,6 +115,7 @@ public CompletableFuture<Void> connect() {
sslContext = null;
}

group = new NioEventLoopGroup(protocol.getThreads());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
Expand All @@ -133,13 +126,11 @@ protected void initChannel(SocketChannel channel) throws Exception {
if (sslContext != null) {
pipeline.addLast(sslContext.newHandler(channel.alloc(), host, port));
}
pipeline.addLast(
//new ObjectEncoder(),
//new ObjectDecoder(ClassResolvers.softCachingConcurrentResolver(getClass().getClassLoader())),
new MessageEncoder(),
new MessageDecoder(),
new TcpProtocolClientHandler(NettyTcpProtocolClient.this)
);
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
pipeline.addLast("bytesDecoder", new ByteArrayDecoder());
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("bytesEncoder", new ByteArrayEncoder());
pipeline.addLast("handler", channelHandler);
}
});

Expand All @@ -160,15 +151,12 @@ protected void initChannel(SocketChannel channel) throws Exception {
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, protocol.getConnectTimeout());

bootstrap.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
channel = channelFuture.channel();
future.complete(null);
} else {
future.completeExceptionally(channelFuture.cause());
}
bootstrap.connect(host, port).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
channel = channelFuture.channel();
future.complete(null);
} else {
future.completeExceptionally(channelFuture.cause());
}
});
return future;
Expand All @@ -178,15 +166,12 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
public CompletableFuture<Void> close() {
final CompletableFuture<Void> future = new CompletableFuture<>();
if (channel != null) {
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channel = null;
if (channelFuture.isSuccess()) {
future.complete(null);
} else {
future.completeExceptionally(channelFuture.cause());
}
channel.close().addListener(channelFuture -> {
group.shutdownGracefully();
if (channelFuture.isSuccess()) {
future.complete(null);
} else {
future.completeExceptionally(channelFuture.cause());
}
});
} else {
Expand All @@ -195,35 +180,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
return future;
}

/**
* Client response handler.
*/
private static class TcpProtocolClientHandler extends ChannelInboundHandlerAdapter {
private final NettyTcpProtocolClient client;

private TcpProtocolClientHandler(NettyTcpProtocolClient client) {
this.client = client;
}

public void channelActive(final ChannelHandlerContext context) {
client.context = context;
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public void channelRead(final ChannelHandlerContext context, Object message) {
ByteBuf response = (ByteBuf) message;
long requestId = response.readLong();
CompletableFuture responseFuture = client.responseFutures.getIfPresent(requestId);
if (responseFuture != null) {
responseFuture.complete(response.slice().nioBuffer());
client.responseFutures.invalidate(requestId);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
context.close();
}
@Override
public String toString() {
return getClass().getSimpleName();
}

}
Loading