blob: 1571cae03537acc1a1ef0e7374dac1a18995c94f [file] [log] [blame] [raw]
package org.rivoreo.nettychannels;
import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelException;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.Channel;
import io.netty.channel.FileRegion;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.EventLoop;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.SocketUtils;
import jnr.unixsocket.UnixSocketChannel;
import jnr.unixsocket.UnixSocket;
import jnr.unixsocket.UnixSocketAddress;
import java.nio.channels.SelectionKey;
import java.net.SocketException;
import java.net.SocketAddress;
import java.util.Map;
/**
 * {@link SocketChannelConfig} implementation used by {@link JNRUnixDomainSocketChannel}.
 *
 * <p>Buffer sizes, linger and traffic class are forwarded to the wrapped
 * {@link UnixSocket}; every {@link SocketException} raised by the socket is
 * rethrown as an unchecked {@link ChannelException}. Reuse-address and
 * keep-alive are accepted but ignored, and the TCP_NODELAY accessors always
 * throw {@link UnsupportedOperationException}.
 *
 * <p>Besides the options handled by {@link DefaultChannelConfig}, this class
 * understands {@code SO_RCVBUF}, {@code SO_SNDBUF}, {@code SO_LINGER} and
 * {@code ALLOW_HALF_CLOSURE}.
 */
final class JNRUnixDomainSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
	/**
	 * @param channel the channel this configuration belongs to
	 * @param socket the socket that socket-level options are read from and written to
	 */
	public JNRUnixDomainSocketChannelConfig(JNRUnixDomainSocketChannel channel, UnixSocket socket) {
		super(channel);
		this.socket = socket;
	}

	/** Socket that backs the socket-level options. */
	protected UnixSocket socket;

	/** Half-closure flag; volatile because it may be read and written by different threads. */
	private volatile boolean is_half_closure_allowed;

	/**
	 * Returns the options of the super class plus the options handled by this class.
	 */
	@Override
	public Map<ChannelOption<?>, Object> getOptions() {
		return getOptions(super.getOptions(), ChannelOption.SO_RCVBUF,
			ChannelOption.SO_SNDBUF, ChannelOption.SO_LINGER,
			ChannelOption.ALLOW_HALF_CLOSURE);
	}

	/**
	 * Returns the current value of {@code option}, delegating to the super
	 * class for options not handled here.
	 */
	@Override
	@SuppressWarnings("unchecked") // Each cast is guarded by an identity check on the option constant
	public <T> T getOption(ChannelOption<T> option) {
		if(option == ChannelOption.SO_RCVBUF) return (T)Integer.valueOf(getReceiveBufferSize());
		if(option == ChannelOption.SO_SNDBUF) return (T)Integer.valueOf(getSendBufferSize());
		if(option == ChannelOption.SO_LINGER) return (T)Integer.valueOf(getSoLinger());
		if(option == ChannelOption.ALLOW_HALF_CLOSURE) return (T)Boolean.valueOf(isAllowHalfClosure());
		return super.getOption(option);
	}

	/**
	 * Applies {@code value} to {@code option}.
	 *
	 * @return {@code true} if the option is handled by this class, otherwise
	 *         whatever the super class reports
	 */
	@Override
	public <T> boolean setOption(ChannelOption<T> option, T value) {
		// Reject invalid values (such as null) before any unboxing below
		validate(option, value);
		if(option == ChannelOption.SO_RCVBUF) setReceiveBufferSize(((Integer)value).intValue());
		else if(option == ChannelOption.SO_SNDBUF) setSendBufferSize(((Integer)value).intValue());
		else if(option == ChannelOption.SO_LINGER) setSoLinger(((Integer)value).intValue());
		else if(option == ChannelOption.ALLOW_HALF_CLOSURE) setAllowHalfClosure(((Boolean)value).booleanValue());
		else return super.setOption(option, value);
		return true;
	}

	/** Always reports {@code false}; the setting is not tracked. */
	@Override
	public boolean isReuseAddress() {
		return false;
	}

	/** Accepted for interface compatibility; the value is ignored. */
	@Override
	public SocketChannelConfig setReuseAddress(boolean value) {
		// Ignore
		return this;
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public int getReceiveBufferSize() {
		try {
			return socket.getReceiveBufferSize();
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public int getSendBufferSize() {
		try {
			return socket.getSendBufferSize();
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public int getSoLinger() {
		try {
			return socket.getSoLinger();
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
	}

	/** Not supported by this configuration; always throws. */
	@Override
	public boolean isTcpNoDelay() {
		throw new UnsupportedOperationException();
	}

	/** Not supported by this configuration; always throws. */
	@Override
	public SocketChannelConfig setTcpNoDelay(boolean value) {
		throw new UnsupportedOperationException();
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public int getTrafficClass() {
		try {
			return socket.getTrafficClass();
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
	}

	/** Always reports {@code false}; the setting is not tracked. */
	@Override
	public boolean isKeepAlive() {
		return false;
	}

	/** Accepted for interface compatibility; the value is ignored. */
	@Override
	public SocketChannelConfig setKeepAlive(boolean value) {
		// Ignore
		return this;
	}

	/** Forwards the preferences to the wrapped socket. */
	@Override
	public SocketChannelConfig setPerformancePreferences(int connect_time, int latency, int bandwidth) {
		socket.setPerformancePreferences(connect_time, latency, bandwidth);
		return this;
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public SocketChannelConfig setReceiveBufferSize(int size) {
		try {
			socket.setReceiveBufferSize(size);
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
		return this;
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public SocketChannelConfig setSendBufferSize(int size) {
		try {
			socket.setSendBufferSize(size);
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
		return this;
	}

	/**
	 * Enables lingering with the given timeout, or disables it when
	 * {@code linger} is negative.
	 *
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public SocketChannelConfig setSoLinger(int linger) {
		try {
			if(linger < 0) socket.setSoLinger(false, 0);
			else socket.setSoLinger(true, linger);
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
		return this;
	}

	/**
	 * @throws ChannelException if the socket reports a {@link SocketException}
	 */
	@Override
	public SocketChannelConfig setTrafficClass(int traffic_class) {
		try {
			socket.setTrafficClass(traffic_class);
		} catch(SocketException e) {
			throw new ChannelException(e);
		}
		return this;
	}

	@Override
	public boolean isAllowHalfClosure() {
		return is_half_closure_allowed;
	}

	@Override
	public SocketChannelConfig setAllowHalfClosure(boolean value) {
		is_half_closure_allowed = value;
		return this;
	}

	// The remaining setters only narrow the return type to SocketChannelConfig
	// so that calls can be chained; the work is done by the super class.

	@Override
	public SocketChannelConfig setConnectTimeoutMillis(int timeout) {
		super.setConnectTimeoutMillis(timeout);
		return this;
	}

	@Override
	public SocketChannelConfig setMaxMessagesPerRead(int value) {
		super.setMaxMessagesPerRead(value);
		return this;
	}

	@Override
	public SocketChannelConfig setWriteSpinCount(int value) {
		super.setWriteSpinCount(value);
		return this;
	}

	@Override
	public SocketChannelConfig setAllocator(ByteBufAllocator allocator) {
		super.setAllocator(allocator);
		return this;
	}

	@Override
	public SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
		super.setRecvByteBufAllocator(allocator);
		return this;
	}

	@Override
	public SocketChannelConfig setAutoRead(boolean value) {
		super.setAutoRead(value);
		return this;
	}

	@Override
	public SocketChannelConfig setAutoClose(boolean value) {
		super.setAutoClose(value);
		return this;
	}

	/** Clears the read-pending flag of the owning channel. */
	@Override
	protected void autoReadCleared() {
		((JNRUnixDomainSocketChannel)channel).set_read_pending(false);
	}

	@Override
	public SocketChannelConfig setWriteBufferLowWaterMark(int low_water_mark) {
		super.setWriteBufferLowWaterMark(low_water_mark);
		return this;
	}

	@Override
	public SocketChannelConfig setWriteBufferHighWaterMark(int high_water_mark) {
		super.setWriteBufferHighWaterMark(high_water_mark);
		return this;
	}

	@Override
	public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
		super.setMessageSizeEstimator(estimator);
		return this;
	}

// #if NETTY41
	@Override
	public SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark water_mark) {
		super.setWriteBufferWaterMark(water_mark);
		return this;
	}
// #endif
}
/**
 * Netty NIO byte channel that moves data over a jnr-unixsocket
 * {@link UnixSocketChannel}.
 *
 * <p>Socket-level options are exposed through
 * {@link JNRUnixDomainSocketChannelConfig}. Disconnecting is implemented as
 * closing the channel.
 */
public class JNRUnixDomainSocketChannel extends AbstractNioByteChannel implements DuplexChannel {
	/**
	 * @param parent the channel that created this one, or {@code null}
	 * @param channel the underlying socket channel to wrap
	 */
	public JNRUnixDomainSocketChannel(Channel parent, UnixSocketChannel channel) {
		super(parent, channel);
		config = new JNRUnixDomainSocketChannelConfig(this, channel.socket());
	}

	/**
	 * Wraps {@code channel} without a parent channel.
	 */
	public JNRUnixDomainSocketChannel(UnixSocketChannel channel) {
		this(null, channel);
	}

	// false: this transport has no disconnect operation separate from close
	private static final ChannelMetadata METADATA = new ChannelMetadata(false);

	private final JNRUnixDomainSocketChannelConfig config;

	/**
	 * Gives the configuration class access to the protected
	 * {@code setReadPending} of the super class.
	 */
	public void set_read_pending(boolean value) {
		setReadPending(value);
	}

	// NOTE(review): the constructor accepts any Channel as parent, so this cast
	// fails with ClassCastException when the parent is of another type (for
	// example a server channel) -- confirm how accepted channels are created.
	@Override
	public JNRUnixDomainSocketChannel parent() {
		return (JNRUnixDomainSocketChannel)super.parent();
	}

	@Override
	public ChannelMetadata metadata() {
		return METADATA;
	}

	@Override
	public JNRUnixDomainSocketChannelConfig config() {
		return config;
	}

	@Override
	protected UnixSocketChannel javaChannel() {
		return (UnixSocketChannel)super.javaChannel();
	}

	/**
	 * @return {@code true} while the underlying channel is both open and connected
	 */
	@Override
	public boolean isActive() {
		UnixSocketChannel channel = javaChannel();
		return channel.isOpen() && channel.isConnected();
	}

	/**
	 * @return {@code true} if the socket input is shut down or the channel is not active
	 */
	@Override
	public boolean isInputShutdown() {
		//return super.isInputShutdown();
		return javaChannel().socket().isInputShutdown() || !isActive();
	}

	/**
	 * @return {@code true} if the socket output is shut down or the channel is not active
	 */
	@Override
	public boolean isOutputShutdown() {
		return javaChannel().socket().isOutputShutdown() || !isActive();
	}

	@Override
	public UnixSocketAddress localAddress() {
		return (UnixSocketAddress)super.localAddress();
	}

	@Override
	public UnixSocketAddress remoteAddress() {
		return (UnixSocketAddress)super.remoteAddress();
	}

	@Override
	protected SocketAddress localAddress0() {
		return javaChannel().socket().getLocalSocketAddress();
	}

	@Override
	protected SocketAddress remoteAddress0() {
		return javaChannel().socket().getRemoteSocketAddress();
	}

	@Override
	protected void doBind(SocketAddress local_address) throws Exception {
		SocketUtils.bind(javaChannel(), local_address);
	}

	/**
	 * Starts connecting to {@code remote_address}, binding to
	 * {@code local_address} first when one is given.
	 *
	 * <p>If the connection cannot be completed immediately, interest in
	 * {@link SelectionKey#OP_CONNECT} is registered so that the event loop
	 * can finish the connection later through {@link #doFinishConnect()}.
	 * The channel is closed if the connect attempt or the registration fails.
	 *
	 * @return {@code true} if the connection was established immediately,
	 *         {@code false} if it is still in progress
	 */
	@Override
	protected boolean doConnect(SocketAddress remote_address, SocketAddress local_address) throws Exception {
		if(local_address != null) doBind(local_address);
		boolean success = false;
		try {
			boolean connected = SocketUtils.connect(javaChannel(), remote_address);
			// Only a pending connection needs a wake-up on completion
			if(!connected) selectionKey().interestOps(SelectionKey.OP_CONNECT);
			success = true;
			return connected;
		} finally {
			if(!success) doClose();
		}
	}

	@Override
	protected void doFinishConnect() throws Exception {
		if(!javaChannel().finishConnect()) throw new Exception("finishConnect failed");
	}

	/** Disconnecting is the same as closing for this channel. */
	@Override
	protected void doDisconnect() throws Exception {
		doClose();
	}

	@Override
	protected void doClose() throws Exception {
		super.doClose();
		javaChannel().close();
	}

	/**
	 * Reads from the socket into all writable space of {@code buf}.
	 *
	 * @return the value reported by {@code ByteBuf.writeBytes}
	 */
	@Override
	protected int doReadBytes(ByteBuf buf) throws Exception {
		return buf.writeBytes(javaChannel(), buf.writableBytes());
	}

	/**
	 * Offers all readable bytes of {@code buf} to the socket.
	 *
	 * @return the value reported by {@code ByteBuf.readBytes}
	 */
	@Override
	protected int doWriteBytes(ByteBuf buf) throws Exception {
		int len = buf.readableBytes();
		return buf.readBytes(javaChannel(), len);
	}

	/**
	 * Continues transferring {@code region} to the socket from the position
	 * already transferred.
	 */
	@Override
	protected long doWriteFileRegion(FileRegion region) throws Exception {
		final long position = region.transfered();
		return region.transferTo(javaChannel(), position);
	}

/*
@Override
protected void doWrite(ChannelOutboundBuffer buffer) throws Exception {
int len;
while((len = buffer.size()) != 0) {
}
clearOpWrite();
}
*/

// #if NETTY41
	/**
	 * Shuts down the input side of the socket and completes {@code promise}
	 * with the outcome.
	 */
	private void shutdown_input_now(final ChannelPromise promise) {
		// Only a failure of the shutdown itself is reported through the promise
		try {
			javaChannel().shutdownInput();
		} catch(Exception e) {
			promise.setFailure(e);
			return;
		}
		promise.setSuccess();
	}

	/**
	 * Shuts down the input side on the event loop of this channel.
	 *
	 * @return {@code promise}
	 */
	@Override
	public ChannelFuture shutdownInput(final ChannelPromise promise) {
		EventLoop loop = eventLoop();
		if(loop.inEventLoop()) {
			shutdown_input_now(promise);
		} else {
			loop.execute(new Runnable() {
				public void run() {
					shutdown_input_now(promise);
				}
			});
		}
		return promise;
	}

	@Override
	public ChannelFuture shutdownInput() {
		return shutdownInput(newPromise());
	}

	/**
	 * Shuts down the output side of the socket and completes {@code promise}
	 * with the outcome.
	 */
	private void shutdown_output_now(final ChannelPromise promise) {
		// Only a failure of the shutdown itself is reported through the promise
		try {
			javaChannel().shutdownOutput();
		} catch(Exception e) {
			promise.setFailure(e);
			return;
		}
		promise.setSuccess();
	}

	/**
	 * Shuts down the output side on the event loop of this channel.
	 *
	 * @return {@code promise}
	 */
	@Override
	public ChannelFuture shutdownOutput(final ChannelPromise promise) {
		final EventLoop loop = eventLoop();
		if(loop.inEventLoop()) {
			shutdown_output_now(promise);
			//((AbstractUnsafe)unsafe()).shutdownOutput(promise);
		} else {
			loop.execute(new Runnable() {
				public void run() {
					shutdown_output_now(promise);
					//((AbstractUnsafe)unsafe()).shutdownOutput(promise);
				}
			});
		}
		return promise;
	}

	@Override
	public ChannelFuture shutdownOutput() {
		return shutdownOutput(newPromise());
	}

	/**
	 * Completes {@code promise} once both directions have been processed:
	 * fails with the output cause if there is one, otherwise with the input
	 * cause if there is one, otherwise succeeds.
	 */
	private void shutdown_done(ChannelFuture shutdown_output_future, ChannelFuture shutdown_input_future, ChannelPromise promise) {
		Throwable shutdown_output_cause = shutdown_output_future.cause();
		if(shutdown_output_cause != null) {
			promise.setFailure(shutdown_output_cause);
			return;
		}
		Throwable shutdown_input_cause = shutdown_input_future.cause();
		if(shutdown_input_cause != null) {
			promise.setFailure(shutdown_input_cause);
			return;
		}
		promise.setSuccess();
	}

	/**
	 * Second step of {@link #shutdown(ChannelPromise)}: shuts down the input
	 * side and reports the combined result when that has finished.
	 */
	private void shutdown_output_done(final ChannelFuture shutdown_output_future, final ChannelPromise promise) {
		ChannelFuture shutdown_input_future = shutdownInput();
		if(shutdown_input_future.isDone()) {
			shutdown_done(shutdown_output_future, shutdown_input_future, promise);
		} else {
			shutdown_input_future.addListener(new ChannelFutureListener() {
				public void operationComplete(ChannelFuture future) throws Exception {
					shutdown_done(shutdown_output_future, future, promise);
				}
			});
		}
	}

	/**
	 * Shuts down the output side and then the input side. The input side is
	 * shut down even if shutting down the output side failed.
	 *
	 * @return {@code promise}
	 */
	@Override
	public ChannelFuture shutdown(final ChannelPromise promise) {
		ChannelFuture shutdown_output_future = shutdownOutput();
		if(shutdown_output_future.isDone()) {
			shutdown_output_done(shutdown_output_future, promise);
		} else {
			shutdown_output_future.addListener(new ChannelFutureListener() {
				public void operationComplete(final ChannelFuture future) throws Exception {
					shutdown_output_done(future, promise);
				}
			});
		}
		return promise;
	}

	@Override
	public ChannelFuture shutdown() {
		return shutdown(newPromise());
	}

	/**
	 * @return {@code true} if both directions of the socket are shut down or
	 *         the channel is not active
	 */
	public boolean isShutdown() {
		UnixSocket socket = javaChannel().socket();
		return (socket.isInputShutdown() && socket.isOutputShutdown()) || !isActive();
	}
// #endif
}