blob: c586b5d22e962a4014f15644983b1ad875eeb1a6 [file] [log] [blame] [raw]
/* JNR Unix domain socket integration for Netty 4.1
Copyright 2015-2020 Rivoreo
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
package org.rivoreo.nettychannels;
//import io.netty.channel.AbstractServerChannel;
import io.netty.channel.nio.AbstractNioMessageChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.ServerChannel;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.buffer.ByteBufAllocator;
import jnr.unixsocket.UnixServerSocketChannel;
import jnr.unixsocket.UnixServerSocket;
import jnr.unixsocket.UnixSocketChannel;
import jnr.unixsocket.UnixSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Map;
import java.util.List;
import java.lang.reflect.Field;
class ServerJNRUnixDomainSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig {
public ServerJNRUnixDomainSocketChannelConfig(ServerJNRUnixDomainSocketChannel channel, UnixServerSocket socket) {
super(channel);
this.socket = socket;
}
private UnixServerSocket socket;
private int backlog;
@Override
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(super.getOptions(), ChannelOption.SO_BACKLOG);
}
@Override
public <T> T getOption(ChannelOption<T> option) {
//if(option == ChannelOption.SO_RCVBUF) return (T)Integer.valueOf(getReceiveBufferSize());
if(option == ChannelOption.SO_BACKLOG) return (T)Integer.valueOf(getBacklog());
return super.getOption(option);
}
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
//if(option == ChannelOption.SO_RCVBUF) setReceiveBufferSize(value);
if(option == ChannelOption.SO_BACKLOG) setBacklog(((Integer)value).intValue());
else return super.setOption(option, value);
return true;
}
@Override
public boolean isReuseAddress() {
return false;
}
@Override
public ServerSocketChannelConfig setReuseAddress(boolean value) {
// Ignore
return this;
}
@Override
public int getReceiveBufferSize() {
throw new UnsupportedOperationException();
}
@Override
public ServerSocketChannelConfig setReceiveBufferSize(int size) {
// Ignore
return this;
}
@Override
public ServerSocketChannelConfig setPerformancePreferences(int connect_time, int latency, int bandwidth) {
//Ignore
return this;
}
@Override
public int getBacklog() {
return backlog;
}
public ServerJNRUnixDomainSocketChannelConfig setBacklog(int backlog) {
if(backlog < 0) throw new IllegalArgumentException(String.format("backlog is negative: %d", backlog));
this.backlog = backlog;
return this;
}
@Override
public ServerSocketChannelConfig setConnectTimeoutMillis(int timeout) {
super.setConnectTimeoutMillis(timeout);
return this;
}
@Override
public ServerSocketChannelConfig setMaxMessagesPerRead(int value) {
super.setMaxMessagesPerRead(value);
return this;
}
@Override
public ServerSocketChannelConfig setWriteSpinCount(int value) {
super.setWriteSpinCount(value);
return this;
}
@Override
public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
super.setAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
super.setRecvByteBufAllocator(allocator);
return this;
}
@Override
public ServerSocketChannelConfig setAutoRead(boolean value) {
super.setAutoRead(value);
return this;
}
@Override
protected void autoReadCleared() {
((ServerJNRUnixDomainSocketChannel)channel).set_read_pending(false);
}
@Override
public ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark water_mark) {
super.setWriteBufferWaterMark(water_mark);
return this;
}
@Override
public ServerSocketChannelConfig setWriteBufferLowWaterMark(int low_water_mark) {
super.setWriteBufferLowWaterMark(low_water_mark);
return this;
}
@Override
public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
// #if NETTY41
@Override
public ServerSocketChannelConfig setWriteBufferHighWaterMark(int high_water_mark) {
super.setWriteBufferHighWaterMark(high_water_mark);
return this;
}
// #endif
}
//public class ServerJNRUnixDomainSocketChannel extends AbstractServerChannel {
public class ServerJNRUnixDomainSocketChannel extends AbstractNioMessageChannel implements ServerChannel {
public ServerJNRUnixDomainSocketChannel(UnixServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new ServerJNRUnixDomainSocketChannelConfig(this, channel.socket());
try {
jnr_unix_domain_server_socket_local_address_field = UnixServerSocket.class.getDeclaredField("localAddress");
jnr_unix_domain_server_socket_local_address_field.setAccessible(true);
} catch(ReflectiveOperationException e) {
e.printStackTrace();
}
}
private static UnixServerSocketChannel create_server_socket_channel() {
try {
return UnixServerSocketChannel.open();
} catch(IOException e) {
throw new ChannelException("Failed to open Unix domain server socket channel", e);
}
}
public ServerJNRUnixDomainSocketChannel() {
this(create_server_socket_channel());
}
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
//private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private ServerJNRUnixDomainSocketChannelConfig config;
private Field jnr_unix_domain_server_socket_local_address_field;
@Override
public UnixSocketAddress localAddress() {
return (UnixSocketAddress)super.localAddress();
}
@Override
public UnixSocketAddress remoteAddress() {
return null;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
public ServerSocketChannelConfig config() {
return config;
}
public void set_read_pending(boolean value) {
setReadPending(value);
}
@Override
protected UnixServerSocketChannel javaChannel() {
return (UnixServerSocketChannel)super.javaChannel();
}
@Override
public boolean isActive() {
if(jnr_unix_domain_server_socket_local_address_field != null) try {
Object o = jnr_unix_domain_server_socket_local_address_field.get(javaChannel().socket());
return o != null;
} catch(IllegalAccessException e) {
e.printStackTrace();
}
return false;
}
@Override
protected SocketAddress localAddress0() {
if(jnr_unix_domain_server_socket_local_address_field == null) return null;
try {
return (SocketAddress)jnr_unix_domain_server_socket_local_address_field.get(javaChannel());
} catch(IllegalAccessException e) {
e.printStackTrace();
return null;
}
}
@Override
protected void doBind(SocketAddress local_address) throws Exception {
javaChannel().socket().bind(local_address, config.getBacklog());
}
@Override
protected void doClose() throws Exception {
javaChannel().close();
}
@Override
protected int doReadMessages(List<Object> buffer) throws Exception {
UnixSocketChannel c_channel = javaChannel().accept();
if(c_channel == null) return 0;
try {
buffer.add(new JNRUnixDomainSocketChannel(this, c_channel));
return 1;
} catch(Exception e) {
//logger.warn("Failed to create a new channel from an accepted socket", e);
e.printStackTrace();
try {
c_channel.close();
} catch(Exception ee) {
//logger.warn("Failed to close client socket channel", ee);
ee.printStackTrace();
}
}
return 0;
}
@Override
protected boolean doConnect(SocketAddress remote_address, SocketAddress local_address) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFinishConnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer buffer) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected final Object filterOutboundMessage(Object msg) throws Exception {
throw new UnsupportedOperationException();
}
}