| package org.rivoreo.nettychannels; |
| |
| import io.netty.channel.nio.AbstractNioByteChannel; |
| import io.netty.channel.socket.DuplexChannel; |
| import io.netty.channel.socket.SocketChannelConfig; |
| import io.netty.channel.DefaultChannelConfig; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.ChannelException; |
| import io.netty.channel.RecvByteBufAllocator; |
| import io.netty.channel.MessageSizeEstimator; |
| import io.netty.channel.ChannelMetadata; |
| import io.netty.channel.Channel; |
| import io.netty.channel.FileRegion; |
| import io.netty.channel.WriteBufferWaterMark; |
| import io.netty.channel.EventLoop; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelPromise; |
| import io.netty.buffer.ByteBufAllocator; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.internal.SocketUtils; |
| import jnr.unixsocket.UnixSocketChannel; |
| import jnr.unixsocket.UnixSocket; |
| import jnr.unixsocket.UnixSocketAddress; |
| import java.nio.channels.SelectionKey; |
| import java.net.SocketException; |
| import java.net.SocketAddress; |
| import java.util.Map; |
| |
| final class JNRUnixDomainSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { |
| public JNRUnixDomainSocketChannelConfig(JNRUnixDomainSocketChannel channel, UnixSocket socket) { |
| super(channel); |
| this.socket = socket; |
| } |
| |
| protected UnixSocket socket; |
| private boolean is_half_closure_allowed; |
| |
| @Override |
| public Map<ChannelOption<?>, Object> getOptions() { |
| return getOptions(super.getOptions(), ChannelOption.SO_RCVBUF, |
| ChannelOption.SO_SNDBUF, ChannelOption.SO_LINGER); |
| } |
| |
| @Override |
| public <T> T getOption(ChannelOption<T> option) { |
| if(option == ChannelOption.SO_RCVBUF) return (T)Integer.valueOf(getReceiveBufferSize()); |
| if(option == ChannelOption.SO_SNDBUF) return (T)Integer.valueOf(getSendBufferSize()); |
| if(option == ChannelOption.SO_LINGER) return (T)Integer.valueOf(getSoLinger()); |
| return super.getOption(option); |
| } |
| |
| @Override |
| public <T> boolean setOption(ChannelOption<T> option, T value) { |
| if(option == ChannelOption.SO_RCVBUF) setReceiveBufferSize(((Integer)value).intValue()); |
| else if(option == ChannelOption.SO_SNDBUF) setSendBufferSize(((Integer)value).intValue()); |
| else if(option == ChannelOption.SO_LINGER) setSoLinger(((Integer)value).intValue()); |
| else return super.setOption(option, value); |
| return true; |
| } |
| |
| @Override |
| public boolean isReuseAddress() { |
| return false; |
| } |
| |
| @Override |
| public SocketChannelConfig setReuseAddress(boolean value) { |
| // Ignore |
| return this; |
| } |
| |
| @Override |
| public int getReceiveBufferSize() { |
| try { |
| return socket.getReceiveBufferSize(); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| } |
| |
| @Override |
| public int getSendBufferSize() { |
| try { |
| return socket.getSendBufferSize(); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| } |
| |
| @Override |
| public int getSoLinger() { |
| try { |
| return socket.getSoLinger(); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| } |
| |
| @Override |
| public boolean isTcpNoDelay() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public SocketChannelConfig setTcpNoDelay(boolean value) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public int getTrafficClass() { |
| try { |
| return socket.getTrafficClass(); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| } |
| |
| @Override |
| public boolean isKeepAlive() { |
| return false; |
| } |
| |
| @Override |
| public SocketChannelConfig setKeepAlive(boolean value) { |
| // Ignore |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setPerformancePreferences(int connect_time, int latency, int bandwidth) { |
| socket.setPerformancePreferences(connect_time, latency, bandwidth); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setReceiveBufferSize(int size) { |
| try { |
| socket.setReceiveBufferSize(size); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setSendBufferSize(int size) { |
| try { |
| socket.setSendBufferSize(size); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setSoLinger(int linger) { |
| try { |
| if(linger < 0) socket.setSoLinger(false, 0); |
| else socket.setSoLinger(true, linger); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setTrafficClass(int traffic_class) { |
| try { |
| socket.setTrafficClass(traffic_class); |
| } catch(SocketException e) { |
| throw new ChannelException(e); |
| } |
| return this; |
| } |
| |
| @Override |
| public boolean isAllowHalfClosure() { |
| return is_half_closure_allowed; |
| } |
| |
| @Override |
| public SocketChannelConfig setAllowHalfClosure(boolean value) { |
| is_half_closure_allowed = value; |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setConnectTimeoutMillis(int timeout) { |
| super.setConnectTimeoutMillis(timeout); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setMaxMessagesPerRead(int value) { |
| super.setMaxMessagesPerRead(value); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setWriteSpinCount(int value) { |
| super.setWriteSpinCount(value); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setAllocator(ByteBufAllocator allocator) { |
| super.setAllocator(allocator); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { |
| super.setRecvByteBufAllocator(allocator); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setAutoRead(boolean value) { |
| super.setAutoRead(value); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setAutoClose(boolean value) { |
| super.setAutoClose(value); |
| return this; |
| } |
| |
| @Override |
| protected void autoReadCleared() { |
| ((JNRUnixDomainSocketChannel)channel).set_read_pending(false); |
| } |
| |
| @Override |
| public SocketChannelConfig setWriteBufferLowWaterMark(int low_water_mark) { |
| super.setWriteBufferLowWaterMark(low_water_mark); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setWriteBufferHighWaterMark(int high_water_mark) { |
| super.setWriteBufferHighWaterMark(high_water_mark); |
| return this; |
| } |
| |
| @Override |
| public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { |
| super.setMessageSizeEstimator(estimator); |
| return this; |
| } |
| |
| // #if NETTY41 |
| @Override |
| public SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark water_mark) { |
| super.setWriteBufferWaterMark(water_mark); |
| return this; |
| } |
| // #endif |
| } |
| |
| public class JNRUnixDomainSocketChannel extends AbstractNioByteChannel implements DuplexChannel { |
| public JNRUnixDomainSocketChannel(Channel parent, UnixSocketChannel channel) { |
| super(parent, channel); |
| config = new JNRUnixDomainSocketChannelConfig(this, channel.socket()); |
| } |
| |
| public JNRUnixDomainSocketChannel(UnixSocketChannel channel) { |
| this(null, channel); |
| } |
| |
| private static final ChannelMetadata METADATA = new ChannelMetadata(false); |
| |
| private JNRUnixDomainSocketChannelConfig config; |
| |
| public void set_read_pending(boolean value) { |
| setReadPending(value); |
| } |
| |
| @Override |
| public JNRUnixDomainSocketChannel parent() { |
| return (JNRUnixDomainSocketChannel)super.parent(); |
| } |
| |
| @Override |
| public ChannelMetadata metadata() { |
| return METADATA; |
| } |
| |
| @Override |
| public JNRUnixDomainSocketChannelConfig config() { |
| return config; |
| } |
| |
| @Override |
| protected UnixSocketChannel javaChannel() { |
| return (UnixSocketChannel)super.javaChannel(); |
| } |
| |
| @Override |
| public boolean isActive() { |
| UnixSocketChannel channel = javaChannel(); |
| return channel.isOpen() && channel.isConnected(); |
| } |
| |
| @Override |
| public boolean isInputShutdown() { |
| //return super.isInputShutdown(); |
| return javaChannel().socket().isInputShutdown() || !isActive(); |
| } |
| |
| @Override |
| public boolean isOutputShutdown() { |
| return javaChannel().socket().isOutputShutdown() || !isActive(); |
| } |
| |
| @Override |
| public UnixSocketAddress localAddress() { |
| return (UnixSocketAddress)super.localAddress(); |
| } |
| |
| @Override |
| public UnixSocketAddress remoteAddress() { |
| return (UnixSocketAddress)super.remoteAddress(); |
| } |
| |
| @Override |
| protected SocketAddress localAddress0() { |
| return javaChannel().socket().getLocalSocketAddress(); |
| } |
| |
| @Override |
| protected SocketAddress remoteAddress0() { |
| return javaChannel().socket().getRemoteSocketAddress(); |
| } |
| |
| @Override |
| protected void doBind(SocketAddress local_address) throws Exception { |
| SocketUtils.bind(javaChannel(), local_address); |
| } |
| |
| @Override |
| protected boolean doConnect(SocketAddress remote_address, SocketAddress local_address) throws Exception { |
| if(local_address != null) doBind(local_address); |
| if(SocketUtils.connect(javaChannel(), remote_address)) try { |
| selectionKey().interestOps(SelectionKey.OP_CONNECT); |
| return true; |
| } catch(Throwable throwable) { |
| doClose(); |
| throw throwable; |
| } |
| return false; |
| } |
| |
| @Override |
| protected void doFinishConnect() throws Exception { |
| if(!javaChannel().finishConnect()) throw new Exception("finishConnect failed"); |
| } |
| |
| @Override |
| protected void doDisconnect() throws Exception { |
| doClose(); |
| } |
| |
| @Override |
| protected void doClose() throws Exception { |
| super.doClose(); |
| javaChannel().close(); |
| } |
| |
| @Override |
| protected int doReadBytes(ByteBuf buf) throws Exception { |
| return buf.writeBytes(javaChannel(), buf.writableBytes()); |
| } |
| |
| @Override |
| protected int doWriteBytes(ByteBuf buf) throws Exception { |
| int len = buf.readableBytes(); |
| return buf.readBytes(javaChannel(), len); |
| } |
| |
| @Override |
| protected long doWriteFileRegion(FileRegion region) throws Exception { |
| final long position = region.transfered(); |
| return region.transferTo(javaChannel(), position); |
| } |
| |
| /* |
| @Override |
| protected void doWrite(ChannelOutboundBuffer buffer) throws Exception { |
| int len; |
| while((len = buffer.size()) != 0) { |
| } |
| clearOpWrite(); |
| } |
| */ |
| |
| // #if NETTY41 |
| private void shutdown_input_now(final ChannelPromise promise) { |
| try { |
| javaChannel().shutdownInput(); |
| promise.setSuccess(); |
| } catch(Exception e) { |
| promise.setFailure(e); |
| } |
| } |
| |
| @Override |
| public ChannelFuture shutdownInput(final ChannelPromise promise) { |
| EventLoop loop = eventLoop(); |
| if(loop.inEventLoop()) { |
| shutdown_input_now(promise); |
| } else { |
| loop.execute(new Runnable() { |
| public void run() { |
| shutdown_input_now(promise); |
| } |
| }); |
| } |
| return promise; |
| } |
| |
| @Override |
| public ChannelFuture shutdownInput() { |
| return shutdownInput(newPromise()); |
| } |
| |
| private void shutdown_output_now(final ChannelPromise promise) { |
| try { |
| javaChannel().shutdownOutput(); |
| promise.setSuccess(); |
| } catch(Exception e) { |
| promise.setFailure(e); |
| } |
| } |
| |
| @Override |
| public ChannelFuture shutdownOutput(final ChannelPromise promise) { |
| final EventLoop loop = eventLoop(); |
| if(loop.inEventLoop()) { |
| shutdown_output_now(promise); |
| //((AbstractUnsafe)unsafe()).shutdownOutput(promise); |
| } else { |
| loop.execute(new Runnable() { |
| public void run() { |
| shutdown_output_now(promise); |
| //((AbstractUnsafe)unsafe()).shutdownOutput(promise); |
| } |
| }); |
| } |
| return promise; |
| } |
| |
| @Override |
| public ChannelFuture shutdownOutput() { |
| return shutdownOutput(newPromise()); |
| } |
| |
| private void shutdown_done(ChannelFuture shutdown_output_future, ChannelFuture shutdown_input_future, ChannelPromise promise) { |
| Throwable shutdown_output_cause = shutdown_output_future.cause(); |
| if(shutdown_output_cause != null) { |
| promise.setFailure(shutdown_output_cause); |
| return; |
| } |
| Throwable shutdown_input_cause = shutdown_input_future.cause(); |
| if(shutdown_input_cause != null) { |
| promise.setFailure(shutdown_input_cause); |
| return; |
| } |
| promise.setSuccess(); |
| } |
| |
| private void shutdown_output_done(final ChannelFuture shutdown_output_future, final ChannelPromise promise) { |
| ChannelFuture shutdown_input_future = shutdownInput(); |
| if(shutdown_input_future.isDone()) { |
| shutdown_done(shutdown_output_future, shutdown_input_future, promise); |
| } else { |
| shutdown_input_future.addListener(new ChannelFutureListener() { |
| public void operationComplete(ChannelFuture shutdown_input_future) throws Exception { |
| shutdown_done(shutdown_output_future, shutdown_input_future, promise); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public ChannelFuture shutdown(final ChannelPromise promise) { |
| ChannelFuture shutdown_output_future = shutdownOutput(); |
| if(shutdown_output_future.isDone()) { |
| shutdown_output_done(shutdown_output_future, promise); |
| } else { |
| shutdown_output_future.addListener(new ChannelFutureListener() { |
| public void operationComplete(final ChannelFuture shutdown_output_future) throws Exception { |
| shutdown_output_done(shutdown_output_future, promise); |
| } |
| }); |
| } |
| return promise; |
| } |
| |
| @Override |
| public ChannelFuture shutdown() { |
| return shutdown(newPromise()); |
| } |
| |
| public boolean isShutdown() { |
| UnixSocket socket = javaChannel().socket(); |
| return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive(); |
| } |
| // #endif |
| } |