// blob: 08d82dbe449ec42b558b020543ce3eeb50272e77 (header text that appears to be left over from a web repository viewer; not part of the program)
package li.cil.oc.server.component
import java.io.BufferedWriter
import java.io.FileNotFoundException
import java.io.IOException
import java.io.InputStream
import java.io.OutputStreamWriter
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.SocketChannel
import java.util
import java.util.UUID
import java.util.concurrent._
import li.cil.oc.Constants
import li.cil.oc.OpenComputers
import li.cil.oc.Settings
import li.cil.oc.api.Network
import li.cil.oc.api.driver.DeviceInfo
import li.cil.oc.api.driver.DeviceInfo.DeviceAttribute
import li.cil.oc.api.driver.DeviceInfo.DeviceClass
import li.cil.oc.api.machine.Arguments
import li.cil.oc.api.machine.Callback
import li.cil.oc.api.machine.Context
import li.cil.oc.api.network._
import li.cil.oc.api.prefab
import li.cil.oc.api.prefab.AbstractValue
import li.cil.oc.util.ThreadPoolFactory
import net.minecraft.server.MinecraftServer
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._
import scala.collection.mutable
/**
 * Component that lets its owning computer issue HTTP requests and open TCP
 * connections.
 *
 * The card binds to the first neighboring node whose host is a [[Context]];
 * only that computer may call `request` and `connect`. Every handle created
 * here is tracked in `connections` and closed when the owner disconnects or
 * when a `computer.started` / `computer.stopped` message from it arrives.
 */
class InternetCard extends prefab.ManagedEnvironment with DeviceInfo {
  override val node = Network.newNode(this, Visibility.Network).
    withComponent("internet", Visibility.Neighbors).
    create()

  /** The computer currently allowed to use this card, if any. */
  protected var owner: Option[Context] = None

  /** Requests and sockets opened through this card that have not been closed yet. */
  protected val connections = mutable.Set.empty[InternetCard.Closable]

  // ----------------------------------------------------------------------- //

  private final lazy val deviceInfo = Map(
    DeviceAttribute.Class -> DeviceClass.Communication,
    DeviceAttribute.Description -> "Internet modem",
    DeviceAttribute.Vendor -> Constants.DeviceInfo.DefaultVendor,
    DeviceAttribute.Product -> "SuperLink X-D4NK"
  )

  override def getDeviceInfo: util.Map[String, String] = deviceInfo

  // ----------------------------------------------------------------------- //

  @Callback(direct = true, doc = """function():boolean -- Returns whether HTTP requests can be made (config setting).""")
  def isHttpEnabled(context: Context, args: Arguments): Array[AnyRef] = result(Settings.get.httpEnabled)

  /**
   * Starts an HTTP request and returns its handle.
   *
   * Checks run in this order: caller is the owner, argument 0 is a string,
   * HTTP is enabled, the connection limit is not reached, custom headers are
   * permitted, and finally the address is a valid http(s) URL.
   *
   * @return the request handle, or `(Unit, reason)` when HTTP or custom
   *         headers are disabled in the settings.
   * @throws IOException if the connection limit has been reached.
   * @throws FileNotFoundException if the address is not a valid http(s) URL.
   */
  @Callback(doc = """function(url:string[, postData:string[, headers:table]]):userdata -- Starts an HTTP request. If this returns true, further results will be pushed using `http_response` signals.""")
  def request(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    checkOwner(context)
    val address = args.checkString(0)
    if (!Settings.get.httpEnabled) {
      result(Unit, "http requests are unavailable")
    }
    else {
      if (connections.size >= Settings.get.maxConnections) {
        throw new IOException("too many open connections")
      }
      val post = if (args.isString(1)) Option(args.checkString(1)) else None
      val headers = if (args.isTable(2)) args.checkTable(2).collect {
        case (key: String, value: AnyRef) => (key, value.toString)
      }.toMap
      else Map.empty[String, String]
      if (!Settings.get.httpHeadersEnabled && headers.nonEmpty) {
        result(Unit, "http request headers are unavailable")
      }
      else {
        val request = new InternetCard.HTTPRequest(this, checkAddress(address), post, headers)
        connections += request
        result(request)
      }
    }
  }

  @Callback(direct = true, doc = """function():boolean -- Returns whether TCP connections can be made (config setting).""")
  def isTcpEnabled(context: Context, args: Arguments): Array[AnyRef] = result(Settings.get.tcpEnabled)

  /**
   * Opens a TCP connection and returns its handle.
   *
   * @return the socket handle, or `(Unit, reason)` when TCP is disabled in
   *         the settings.
   * @throws IOException if the connection limit has been reached.
   * @throws IllegalArgumentException if no host and port can be derived from
   *                                  the arguments.
   */
  @Callback(doc = """function(address:string[, port:number]):userdata -- Opens a new TCP connection. Returns the handle of the connection.""")
  def connect(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    checkOwner(context)
    val address = args.checkString(0)
    val port = args.optInteger(1, -1)
    if (!Settings.get.tcpEnabled) {
      result(Unit, "tcp connections are unavailable")
    }
    else {
      if (connections.size >= Settings.get.maxConnections) {
        throw new IOException("too many open connections")
      }
      val uri = checkUri(address, port)
      val socket = new InternetCard.TCPSocket(this, uri, port)
      connections += socket
      result(socket)
    }
  }

  /** Rejects callers other than the owning computer (or any caller while there is no owner). */
  private def checkOwner(context: Context): Unit = {
    if (owner.forall(current => context.node != current.node)) {
      throw new IllegalArgumentException("can only be used by the owning computer")
    }
  }

  /**
   * Closes and forgets every tracked connection.
   *
   * Each `close()` removes the connection from `connections`, so the loop
   * runs over a snapshot; iterating the mutable set while it shrinks could
   * skip entries, which `clear()` would then drop without closing them.
   */
  private def closeAllConnections(): Unit = this.synchronized {
    connections.toList.foreach(_.close())
    connections.clear()
  }

  // ----------------------------------------------------------------------- //

  /** Adopts the first neighboring node hosted by a [[Context]] as the owner. */
  override def onConnect(node: Node): Unit = {
    super.onConnect(node)
    if (owner.isEmpty) node.host match {
      case context: Context if node.isNeighborOf(this.node) => owner = Some(context)
      case _ =>
    }
  }

  /** Drops the owner and closes all connections when this card or its owner leaves the network. */
  override def onDisconnect(node: Node): Unit = this.synchronized {
    super.onDisconnect(node)
    val ownerLeft = owner.exists(current => node == this.node || (node.host match {
      case context: Context => context == current
      case _ => false
    }))
    if (ownerLeft) {
      owner = None
      closeAllConnections()
    }
  }

  /** Closes all connections when the owning computer reports that it started or stopped. */
  override def onMessage(message: Message): Unit = this.synchronized {
    super.onMessage(message)
    message.data match {
      case Array() if (message.name == "computer.stopped" || message.name == "computer.started") &&
        owner.exists(current => message.source.address == current.node.address) =>
        closeAllConnections()
      case _ =>
    }
  }

  // ----------------------------------------------------------------------- //

  /**
   * Turns a user supplied address into a URI that has a host and a port.
   *
   * The address is first taken as a full URI; if that yields no host (or no
   * port from either the URI or `port`), it is retried as a bare
   * `host[:port]` behind a dummy `oc://` scheme, appending `port` when the
   * address itself carries none.
   *
   * @throws IllegalArgumentException if neither interpretation yields a host
   *                                  and a positive port. Syntax errors in
   *                                  the address are reported this way too.
   */
  private def checkUri(address: String, port: Int): URI = {
    def parse(text: String): Option[URI] =
      try Some(new URI(text)) catch {
        case _: URISyntaxException => None
      }

    val asGiven = parse(address).filter(uri => uri.getHost != null && (uri.getPort > 0 || port > 0))
    val resolved = asGiven.orElse {
      parse("oc://" + address).filter(_.getHost != null).flatMap { simple =>
        if (simple.getPort > 0) Some(simple)
        else if (port > 0) parse(simple.toString + ":" + port)
        else None
      }
    }
    resolved.getOrElse(throw new IllegalArgumentException("address could not be parsed or no valid port given"))
  }

  /**
   * Parses `address` as a URL and accepts only the http and https protocols.
   *
   * @throws FileNotFoundException if the address is malformed or uses another protocol.
   */
  private def checkAddress(address: String): URL = {
    val url = try new URL(address) catch {
      case _: MalformedURLException => throw new FileNotFoundException("invalid address")
    }
    if (!url.getProtocol.matches("^https?$")) {
      throw new FileNotFoundException("unsupported protocol")
    }
    url
  }
}
object InternetCard {
// Worker pool created through ThreadPoolFactory under the name "Internet",
// sized by Settings.get.internetThreads. The handles defined in this object
// submit their address resolution, HTTP request and response reading tasks to it.
private val threadPool = ThreadPoolFactory.create("Internet", Settings.get.internetThreads)
/** A connection handle that an InternetCard keeps track of and can shut down. */
trait Closable {
// Closes the handle; implementations in this file also remove themselves
// from the owning card's connection set.
def close(): Unit
}
/**
 * Background thread that watches TCP channels and runs a callback once a
 * channel becomes readable.
 *
 * Notifications are one-shot: after a channel's callback has run, the channel
 * is no longer watched until it is handed to `add` again. This is implemented
 * by replacing the selector after every round in which a callback ran and
 * re-registering only the channels that have not been notified yet.
 */
object TCPNotifier extends Thread {
  // Swapped by the notifier thread and read by add() from other threads.
  @volatile private var selector = Selector.open()

  // Channels waiting to be registered; registration happens on the notifier thread only.
  private val toAccept = new ConcurrentLinkedQueue[(SocketChannel, () => Unit)]

  override def run(): Unit = {
    import scala.collection.JavaConverters._

    while (true) {
      try {
        // Register everything queued since the last round. Entries whose
        // channel is missing or already closed have nothing left to watch.
        Iterator.continually(toAccept.poll).takeWhile(_ != null).foreach {
          case (channel, action) =>
            if (channel != null && channel.isOpen) {
              channel.register(selector, SelectionKey.OP_READ, action)
            }
        }

        selector.select()

        // Keys of channels closed in the meantime are cancelled; asking those
        // for readiness would throw, so only valid keys are considered.
        val readableKeys = mutable.HashSet[SelectionKey]()
        selector.selectedKeys.asScala.filter(key => key.isValid && key.isReadable).foreach(key => {
          key.attachment.asInstanceOf[() => Unit].apply()
          readableKeys += key
        })

        if (readableKeys.nonEmpty) {
          val newSelector = Selector.open()
          // Carry over every registration that was not notified this round.
          // This must walk the full key set: the selected-key set only holds
          // the channels that were just notified, so using it here would drop
          // all other waiting channels.
          selector.keys.asScala.filter(key => key.isValid && !readableKeys.contains(key)).foreach(key => {
            try key.channel.register(newSelector, SelectionKey.OP_READ, key.attachment)
            catch {
              // Closed between the validity check and the registration; skip it.
              case _: java.nio.channels.ClosedChannelException =>
            }
          })
          selector.close()
          selector = newSelector
        }
      } catch {
        case e: IOException =>
          OpenComputers.log.error("Error in TCP selector loop.", e)
      }
    }
  }

  /**
   * Queues a channel to be watched and wakes the notifier thread so that it
   * picks the entry up.
   *
   * @param e the channel together with the callback to run once it is readable.
   */
  def add(e: (SocketChannel, () => Unit)): Unit = {
    toAccept.offer(e)
    // NOTE(review): if the selector is swapped between the offer above and
    // this wakeup, the entry may wait until the next wakeup — confirm whether
    // that window matters in practice.
    selector.wakeup()
  }
}

TCPNotifier.start()
/**
 * Handle for one non-blocking TCP connection opened through an InternetCard.
 *
 * The host name is resolved and the connect is started on the shared thread
 * pool; `finishConnect`, `read` and `write` poll that progress. After
 * `finishConnect` and after each successful `read` the channel is handed to
 * [[TCPNotifier]], which sends an `internet_ready` signal carrying this
 * socket's id once data can be read.
 */
class TCPSocket extends AbstractValue with Closable {
  def this(owner: InternetCard, uri: URI, port: Int) {
    this()
    this.owner = Some(owner)
    channel = SocketChannel.open()
    channel.configureBlocking(false)
    address = threadPool.submit(new AddressResolver(uri, port))
  }

  // The card this socket belongs to; None once the socket has been closed.
  private var owner: Option[InternetCard] = None

  // Pending or finished address resolution; null once closed.
  private var address: Future[InetAddress] = null

  // The underlying channel; null once closed.
  private var channel: SocketChannel = null

  // Set once the resolver task has finished without an error.
  private var isAddressResolved = false

  private val id = UUID.randomUUID()

  /**
   * Asks the notifier to signal the owner when the channel becomes readable.
   *
   * Works on a snapshot of the channel because `close()` nulls the field:
   * handing a missing or closed channel to the notifier would make its
   * thread fail, and the callback must not dereference the field later.
   */
  private def setupSelector(): Unit = {
    val watched = this.synchronized(channel)
    if (watched != null && watched.isOpen) {
      TCPNotifier.add((watched, () => {
        owner match {
          case Some(internetCard) =>
            internetCard.node.sendToVisible("computer.signal", "internet_ready", id.toString)
          case _ =>
            watched.close()
        }
      }))
    }
  }

  @Callback(doc = """function():boolean -- Ensures a socket is connected. Errors if the connection failed.""")
  def finishConnect(context: Context, args: Arguments): Array[AnyRef] = {
    val r = this.synchronized(result(checkConnected()))
    setupSelector()
    r
  }

  /**
   * Reads up to `n` bytes without blocking.
   *
   * `n` is the first argument, as stated in the callback's doc string, and is
   * clamped to `[0, Settings.get.maxReadBuffer]`.
   *
   * @return the bytes read (possibly none), an empty array while the socket
   *         is still connecting, or `Unit` once the stream has ended.
   */
  @Callback(doc = """function([n:number]):string -- Tries to read data from the socket stream. Returns the read byte array.""")
  def read(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    val n = math.min(Settings.get.maxReadBuffer, math.max(0, args.optInteger(0, Int.MaxValue)))
    if (checkConnected()) {
      val buffer = ByteBuffer.allocate(n)
      val read = channel.read(buffer)
      if (read == -1) result(Unit)
      else {
        // Re-arm the one-shot readiness notification for the next chunk.
        setupSelector()
        result(buffer.array.view(0, read).toArray)
      }
    }
    else result(Array.empty[Byte])
  }

  @Callback(doc = """function(data:string):number -- Tries to write data to the socket stream. Returns the number of bytes written.""")
  def write(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    if (checkConnected()) {
      val value = args.checkByteArray(0)
      result(channel.write(ByteBuffer.wrap(value)))
    }
    else result(0)
  }

  @Callback(direct = true, doc = """function() -- Closes an open socket stream.""")
  def close(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    close()
    null
  }

  @Callback(direct = true, doc = """function():string -- Returns connection ID.""")
  def id(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    result(id.toString)
  }

  override def dispose(context: Context): Unit = {
    super.dispose(context)
    close()
  }

  /** Cancels the resolver, closes the channel and detaches from the card; does nothing when already closed. */
  override def close(): Unit = {
    owner.foreach(card => {
      card.connections.remove(this)
      address.cancel(true)
      channel.close()
      owner = None
      address = null
      channel = null
    })
  }

  /**
   * Advances the connection state and reports whether the socket is usable.
   *
   * Any failure while resolving or connecting closes the socket and yields
   * `false`; later calls then fail with "connection lost".
   *
   * @return true once the channel has finished connecting.
   * @throws IOException if the socket has already been closed.
   */
  private def checkConnected() = {
    if (owner.isEmpty) throw new IOException("connection lost")
    try {
      if (isAddressResolved) channel.finishConnect()
      else if (address.isCancelled) {
        // I don't think this can ever happen, Justin Case.
        channel.close()
        throw new IOException("bad connection descriptor")
      }
      else if (address.isDone) {
        // Check for errors.
        try address.get catch {
          case e: ExecutionException => throw e.getCause
        }
        isAddressResolved = true
        false
      }
      else false
    }
    catch {
      case scala.util.control.NonFatal(_) =>
        close()
        false
    }
  }

  // This has to be an explicit internal class instead of an anonymous one
  // because the scala compiler breaks otherwise. Yay for compiler bugs.
  /** Resolves the host, applies the host filter lists and starts the non-blocking connect. */
  private class AddressResolver(val uri: URI, val port: Int) extends Callable[InetAddress] {
    override def call(): InetAddress = {
      val resolved = InetAddress.getByName(uri.getHost)
      checkLists(resolved, uri.getHost)
      // A port given in the URI wins over the separately passed one.
      val address = new InetSocketAddress(resolved, if (uri.getPort != -1) uri.getPort else port)
      channel.connect(address)
      resolved
    }
  }
}
/**
 * Applies the configured host filter lists to a resolved address.
 *
 * A list that is empty is not applied. Otherwise the address has to match
 * at least one whitelist rule and must not match any blacklist rule.
 *
 * @param inetAddress the resolved address of the host.
 * @param host        the host name as it was given by the user.
 * @throws FileNotFoundException if the address is rejected by either list.
 */
def checkLists(inetAddress: InetAddress, host: String): Unit = {
  val allowRules = Settings.get.httpHostWhitelist
  val passesWhitelist = allowRules.length == 0 || allowRules.exists(rule => rule(inetAddress, host))
  if (!passesWhitelist) {
    throw new FileNotFoundException("address is not whitelisted")
  }

  val denyRules = Settings.get.httpHostBlacklist
  val hitsBlacklist = denyRules.length != 0 && denyRules.exists(rule => rule(inetAddress, host))
  if (hitsBlacklist) {
    throw new FileNotFoundException("address is blacklisted")
  }
}
/**
 * Handle for one HTTP(S) request started through an InternetCard.
 *
 * The request is sent on the shared thread pool. Once a response is
 * available, its body is pulled from the connection in chunks by short
 * reader tasks on the same pool and buffered in `queue`, from where `read`
 * serves it without blocking.
 */
class HTTPRequest extends AbstractValue with Closable {
  def this(owner: InternetCard, url: URL, post: Option[String], headers: Map[String, String]) {
    this()
    this.owner = Some(owner)
    this.stream = threadPool.submit(new RequestSender(url, post, headers))
  }

  // The card this request belongs to; None once the request has been closed.
  private var owner: Option[InternetCard] = None

  // Response code, message and header fields; set by the sender task.
  private var response: Option[(Int, String, AnyRef)] = None

  // The running or finished request; yields the response body. Null once closed.
  private var stream: Future[InputStream] = null

  // Body bytes fetched by reader tasks and not yet handed out by read().
  private val queue = new ConcurrentLinkedQueue[Byte]()

  // The most recently submitted reader task, if any.
  private var reader: Future[_] = null

  // Written by reader tasks on pool threads and read by callbacks elsewhere.
  @volatile private var eof = false

  @Callback(doc = """function():boolean -- Ensures a response is available. Errors if the connection failed.""")
  def finishConnect(context: Context, args: Arguments): Array[AnyRef] = this.synchronized(result(checkResponse()))

  @Callback(direct = true, doc = """function():number, string, table -- Get response code, message and headers.""")
  def response(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    response match {
      case Some((code, message, headers)) => result(code, message, headers)
      case _ => result(Unit)
    }
  }

  /**
   * Hands out up to `n` buffered body bytes without blocking.
   *
   * `n` is the first argument, as stated in the callback's doc string, and is
   * clamped to `[0, Settings.get.maxReadBuffer]`. When nothing is buffered, a
   * reader task is started so that a later call can return data.
   *
   * @return the bytes read (possibly none), an empty array while no response
   *         is available yet, or `Unit` once the body has been fully consumed.
   */
  @Callback(doc = """function([n:number]):string -- Tries to read data from the response. Returns the read byte array.""")
  def read(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    val n = math.min(Settings.get.maxReadBuffer, math.max(0, args.optInteger(0, Int.MaxValue)))
    if (checkResponse()) {
      if (eof && queue.isEmpty) result(Unit)
      else {
        val buffer = ByteBuffer.allocate(n)
        var read = 0
        while (!queue.isEmpty && read < n) {
          buffer.put(queue.poll())
          read += 1
        }
        if (read == 0) {
          readMore()
        }
        result(buffer.array.view(0, read).toArray)
      }
    }
    else result(Array.empty[Byte])
  }

  @Callback(direct = true, doc = """function() -- Closes an open socket stream.""")
  def close(context: Context, args: Arguments): Array[AnyRef] = this.synchronized {
    close()
    null
  }

  override def dispose(context: Context): Unit = {
    super.dispose(context)
    close()
  }

  /**
   * Cancels outstanding work, releases the response stream and detaches
   * from the card; does nothing when already closed.
   */
  override def close(): Unit = {
    owner.foreach(card => {
      card.connections.remove(this)
      val pending = stream
      // cancel() returns false when the task had already completed, in which
      // case there may be an open response stream that has to be closed.
      val alreadyFinished = !pending.cancel(true)
      if (reader != null) {
        reader.cancel(true)
      }
      if (alreadyFinished) {
        releaseStream(pending)
      }
      owner = None
      stream = null
      reader = null
    })
  }

  /**
   * Closes the response stream of a finished request on the thread pool.
   *
   * This is done off the calling thread because closing may have to wait for
   * a read that is still in progress on the same stream.
   */
  private def releaseStream(finished: Future[InputStream]): Unit = {
    try {
      threadPool.submit(new Runnable {
        override def run(): Unit = {
          // get throws if the request failed or was cancelled; there is no stream to close then.
          try finished.get.close()
          catch {
            case scala.util.control.NonFatal(_) =>
          }
        }
      })
      ()
    }
    catch {
      // The pool no longer accepts tasks; nothing more can be done here.
      case _: RejectedExecutionException =>
    }
  }

  /**
   * Reports whether a response is available, starting the first reader task
   * once it is.
   *
   * @throws IOException if the request has been closed; a failure of the
   *                     request itself is rethrown as thrown by the sender task.
   */
  private def checkResponse() = this.synchronized {
    if (owner.isEmpty) throw new IOException("connection lost")
    if (stream.isDone) {
      if (reader == null) {
        // Check for errors.
        try stream.get catch {
          case e: ExecutionException => throw e.getCause
        }
        readMore()
      }
      true
    }
    else false
  }

  /** Starts a reader task for the next chunk unless one is still running or the body has ended. */
  private def readMore(): Unit = {
    if (reader == null || reader.isCancelled || reader.isDone) {
      if (!eof) {
        // Captured here because close() nulls the field while the task may still be queued.
        val source = stream
        reader = threadPool.submit(new Runnable {
          override def run(): Unit = {
            val buffer = new Array[Byte](Settings.get.maxReadBuffer)
            val input = source.get
            val count = input.read(buffer)
            if (count < 0) {
              eof = true
              // Nothing more will be read, so the stream can be released right away.
              input.close()
            }
            for (i <- 0 until count) {
              queue.add(buffer(i))
            }
          }
        })
      }
    }
  }

  // This one doesn't (see comment in TCP socket), but I like to keep it consistent.
  /**
   * Sends the request and yields the stream of the response body.
   *
   * Failures are reported as an IOException carrying the original message.
   */
  private class RequestSender(val url: URL, val post: Option[String], val headers: Map[String, String]) extends Callable[InputStream] {
    override def call(): InputStream = try {
      checkLists(InetAddress.getByName(url.getHost), url.getHost)
      val proxy = Option(MinecraftServer.getServer.getServerProxy).getOrElse(java.net.Proxy.NO_PROXY)
      url.openConnection(proxy) match {
        case http: HttpURLConnection => try {
          http.setDoInput(true)
          headers.foreach(Function.tupled(http.setRequestProperty))
          post match {
            case Some(body) =>
              http.setRequestMethod("POST")
              http.setDoOutput(true)
              // NOTE(review): the read timeout is only applied to POST
              // requests; GET requests run without one — confirm this is intended.
              http.setReadTimeout(Settings.get.httpTimeout)
              // NOTE(review): the writer uses the platform default charset —
              // confirm that matches how the post data string was decoded.
              val out = new BufferedWriter(new OutputStreamWriter(http.getOutputStream))
              try out.write(body)
              finally out.close()
            case None =>
              http.setRequestMethod("GET")
              http.setDoOutput(false)
          }
          val input = http.getInputStream
          HTTPRequest.this.synchronized {
            response = Some((http.getResponseCode, http.getResponseMessage, http.getHeaderFields))
          }
          input
        }
        catch {
          // Release the connection before passing the failure on.
          case t: Throwable =>
            http.disconnect()
            throw t
        }
        case _ => throw new IOException("unexpected connection type")
      }
    }
    catch {
      case e: UnknownHostException =>
        throw new IOException("unknown host: " + Option(e.getMessage).getOrElse(e.toString), e)
      case scala.util.control.NonFatal(e) =>
        throw new IOException(Option(e.getMessage).getOrElse(e.toString), e)
    }
  }
}
}