Why is the following Java NIO API so slow when compared to Netty?

Question

I have the following implementation, written in Scala, that uses the Java NIO API to create a simple web server.

package zion

import java.net._
import java.nio.ByteBuffer
import java.nio.channels._

object NHello {

  import java.nio.CharBuffer
  import java.nio.charset.Charset

  def helloWorldBytes: ByteBuffer = Charset
    .forName("ISO-8859-1")
    .newEncoder
    .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

  def httpResponse(content: String): String = {
    val rn = "\r\n"
    List(
      "HTTP/1.1 200 OK",
      "Content-Type: text/html",
      "Connection: Keep-Alive",
      s"Content-Length: ${content.length()}",
      rn + content
    ).mkString(rn)
  }

  def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Server Socket Channel
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // Selector
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        if (key.isAcceptable) {
          val channel = serverSocketChannel.accept()
          channel.write(helloWorldBytes)
          channel.close()
        }

      }
      iterator.remove()
    }

    sys.addShutdownHook({
      println("Shutting down...")
      serverSocketChannel.close()
    })

    println("Exiting...")
  }
}

Using wrk, I get a few thousand requests per second.

wrk -t12 -c100 -d10s http://127.0.0.1:8080

This seems slow compared to Netty, which gives me at least 10 to 15 times the throughput. Given that Netty is also built on top of NIO, what am I doing wrong?

Am I missing some obvious performance optimizations?

Answer 1

Score: 0

After some further searching and analysis, I finally figured out the problems in the code above.

def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        if (key.isAcceptable) {
          val channel = serverSocketChannel.accept()
          // 1. Blocking Write
          channel.write(helloWorldBytes)
          // 2. Blocking Close
          channel.close()
        }

      }
      iterator.remove()
    }

    sys.addShutdownHook({
      println("Shutting down...")
      serverSocketChannel.close()
    })

    println("Exiting...")
  }
}

The main problems were:

1. Blocking write
The channel returned by accept() is in blocking mode by default, even though the server channel itself is non-blocking. Because the write call blocks until the bytes have been written to the socket, the loop cannot accept any further connections in the meantime. Those connections sit idle, which hurts the throughput of the web server.

2. Blocking close
The close call also blocks and takes time to complete. Until the connection is closed, no new connections are accepted and no already-accepted connections get a response.
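
The blocking-mode point can be checked in isolation. The following is a minimal sketch, not part of the original answer: the object name `AcceptedChannelMode` and the `probe` helper are made up for illustration, and it binds to an OS-chosen loopback port. It shows that a channel accepted from a non-blocking server channel still reports blocking mode until `configureBlocking(false)` is called on it.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{ServerSocketChannel, SocketChannel}

object AcceptedChannelMode {

  // Returns (isBlocking right after accept, isBlocking after configureBlocking(false)).
  def probe(): (Boolean, Boolean) = {
    val server = ServerSocketChannel.open()
    // Port 0 asks the OS for any free port; loopback keeps the probe local.
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    server.configureBlocking(false) // same setup as the server in the question
    val client = SocketChannel.open(server.getLocalAddress)
    try {
      // A non-blocking accept() may return null until the connection is queued.
      var accepted = server.accept()
      while (accepted == null) {
        Thread.sleep(1)
        accepted = server.accept()
      }
      try {
        val before = accepted.isBlocking
        accepted.configureBlocking(false)
        (before, accepted.isBlocking)
      } finally accepted.close()
    } finally {
      client.close()
      server.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = probe()
    println(s"blocking after accept: $before, after configureBlocking(false): $after")
  }
}
```

Calling `probe()` should yield `(true, false)`: the accepted channel starts out blocking regardless of the server channel's mode.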

There is another problem with closing the connection: creating a new connection is expensive, and tools like wrk do not close the connection after a single request; they reuse it. Closing it on the server after every request is therefore a performance killer in its own right and skews the benchmark.
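
For comparison with the asynchronous version, here is a rough sketch of how the original selector loop could address these points while staying with `Selector`. It is an illustration under stated assumptions rather than the answer's code: the names `SelectorHello` and `serve` are hypothetical, any bytes read are treated as one complete request, and the small response is assumed to fit into the socket send buffer in a single write.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.charset.StandardCharsets

object SelectorHello {
  private val body = "NHello World\n"

  // Encode the response once; every write works on a duplicate of this buffer.
  private val response: ByteBuffer = ByteBuffer.wrap(
    ("HTTP/1.1 200 OK\r\n" +
      "Content-Type: text/html\r\n" +
      "Connection: keep-alive\r\n" +
      "Content-Length: " + body.length + "\r\n\r\n" +
      body).getBytes(StandardCharsets.ISO_8859_1)
  )

  // Runs the event loop on an already bound server channel.
  def serve(server: ServerSocketChannel): Unit = {
    server.configureBlocking(false)
    val selector = Selector.open()
    server.register(selector, SelectionKey.OP_ACCEPT)
    val readBuffer = ByteBuffer.allocate(2048)

    while (selector.isOpen) {
      selector.select()
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove() // remove each key as it is handled, inside the inner loop
        if (key.isValid && key.isAcceptable) {
          val channel = server.accept()
          if (channel != null) {
            // Accepted channels start in blocking mode; switch before registering.
            channel.configureBlocking(false)
            channel.register(selector, SelectionKey.OP_READ)
          }
        } else if (key.isValid && key.isReadable) {
          val channel = key.channel().asInstanceOf[SocketChannel]
          readBuffer.clear()
          val n =
            try channel.read(readBuffer)
            catch { case _: IOException => -1 }
          if (n < 0) channel.close() // peer closed the connection (or it failed)
          // Assumption: the response fits in one non-blocking write.
          else if (n > 0) channel.write(response.duplicate())
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(8080))
    serve(server)
  }
}
```

The connection stays open after each response, so a keep-alive client such as wrk can send many requests over it. A production server would additionally parse requests and handle partial writes by registering for `OP_WRITE`.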

Here is an alternative, "highly performant" implementation:

package zion

import java.io.IOException
import java.net.InetSocketAddress
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.Charset
import java.nio.channels.{
  AsynchronousChannelGroup,
  AsynchronousServerSocketChannel,
  AsynchronousSocketChannel,
  CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}

/**
  * This is potentially as fast as it can get using NIO APIs.
  */
object HelloAsyncNIO {
  // Create a thread pool for the socket channel
  // It would probably be better to have only one thread for events.
  // That pool could be shared between the SocketServer and, in future, SocketClients.
  private val group =
    AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))

  // Socket to accept connections
  private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)

  // Port to be used to connect
  private val PORT = 8081

  // Flag to handle logging
  private val ENABLE_LOGGING = false

  /**
    * Contains utilities to manage read/write on the socket channels
    */
  object NIOBuffer {
    def helloWorldBytes: ByteBuffer = Charset
      .forName("ISO-8859-1")
      .newEncoder
      .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

    def httpResponse(content: String): String = {
      val rn = "\r\n"
      List(
        "HTTP/1.1 200 OK",
        "Content-Type: text/html",
        "Connection: Keep-Alive",
        s"Content-Length: ${content.length()}",
        rn + content
      ).mkString(rn)
    }
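    // encode() already returns a ByteBuffer, so no ByteBuffer.wrap is needed.
    // Each write uses duplicate(), which leaves this buffer's position untouched.
    private val writeByteBuffer = helloWorldBytes
    // 2 KiB scratch buffer; its contents are shared across connections and never inspected.
    private val readByteBuffer  = ByteBuffer.allocateDirect(1024 * 2)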
    def read(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
      socket.read(readByteBuffer.duplicate(), socket, h)
    def write(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
      socket.write(writeByteBuffer.duplicate(), socket, h)
  }

  // Generic async completion handler
  case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
    override def completed(result: V, attachment: A): Unit =
      cb(result, attachment)
    override def failed(cause: Throwable, attachment: A): Unit = {
      cause match {
        case e: IOException => log(e.getMessage)
        case _              => cause.printStackTrace()
      }
    }
  }

  // Logging utility
  def log(input: Any*): Unit = {
    if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
  }

  private val onAccept
      : Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
    Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
      (socket, server) => {
        log("\nACCEPT")

        // Accept new connections immediately
        server.accept(serverSocketChannel, onAccept)

        // Read from the current socket
        NIOBuffer.read(socket)(onRead)
      }
    )

  private val onRead: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("READ", bytes)

      // EOF, meaning connection can be closed
      if (bytes == -1) socket.close()

      // Some data was read and now we can respond back
      else if (bytes > 0) NIOBuffer.write(socket)(onWrite)

    })

  private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("WRITE", bytes)

      // Read from the socket
      NIOBuffer.read(socket)(onRead)
    })

  def main(args: Array[String]): Unit = {

    // Setup socket channel
    serverSocketChannel.bind(new InetSocketAddress(PORT))
    serverSocketChannel.accept(serverSocketChannel, onAccept)

    // Making the main thread wait
    group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}
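
The implementation above hands `duplicate()` views of a single pre-encoded buffer to every write. The small sketch below (the object name `DuplicateDemo` and its helper are made up for illustration, and an array copy stands in for a channel write) shows why that is safe to reuse: a duplicate shares the bytes but keeps its own position and limit.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DuplicateDemo {

  // Returns (bytes remaining in the shared buffer, bytes remaining in the drained view).
  def remainingAfterDrain(): (Int, Int) = {
    val shared = ByteBuffer.wrap("NHello World\n".getBytes(StandardCharsets.ISO_8859_1))
    val view   = shared.duplicate()

    // Stand-in for a channel write: consume every byte of the view.
    val sink = new Array[Byte](view.remaining())
    view.get(sink)

    (shared.remaining(), view.remaining())
  }

  def main(args: Array[String]): Unit = {
    val (sharedLeft, viewLeft) = remainingAfterDrain()
    println(s"shared: $sharedLeft bytes left, view: $viewLeft bytes left")
  }
}
```

Draining the view leaves the shared buffer with all 13 bytes still readable, so the response never has to be re-encoded per request.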

Answer 2

Score: -2

A lot. Basically, you are using NIO in a "blocking" way. See:

https://bedrin.livejournal.com/204307.html
https://crunchify.com/java-nio-non-blocking-io-with-server-client-example-java-nio-bytebuffer-and-channels-selector-java-nio-vs-io/

Posted by huangapple on May 4, 2020, 23:07:00
When reposting, please keep the link to this article: https://go.coder-hub.com/61595399.html