Concurrent IO on single core Java 11

huangapple go评论99阅读模式
英文:

Concurrent IO on single core Java 11

问题

使用诸如Erlang等具有轻量级并发进程的语言之后,我发现很难理解如何将这种概念转化为Java。考虑到我使用的是单核机器,是否有一种方法可以执行多个并发的IO绑定操作(例如http)?

我所找到的方法是使用ExecutorServiceCompletableFuture。我遇到的问题是它们都基于线程池。默认的线程池使用核心数 - 1,而在我使用的单核机器上,几乎没有并发能力。解决方案是否应该是提供一个具有更多线程的自定义Executor?或者在Java中,针对单核机器的IO绑定并发是否有更具特色的方法?

我正在一个只有一个核心的AWS Lambda上运行这段代码。

英文:

After using languages like Erlang and others, that have lightweight concurrent processes, I find it hard to understand how this translates into Java. Given I use a single core machine, is there a way to perform multiple concurrent IO bound operations (http)?

What I have found is the following ExecutorService and CompletableFuture. The issue I have is that they are based on a threadpool. The default threadpool uses core# - 1, which, on a single core machine, that I am using, has NO concurrency. Would the solution be to just provide a custom Executor with a higher number of threads? or is there a more idiomatic way for IO bound concurrency on single core machines in Java?

I am running this code on a AWS Lambda with a single core.

答案1

得分: 2

"默认的线程池使用核心数 - 1,对于我正在使用的单核机器来说,并发性为零。" - 为什么?一个并发程序完全可以在单核机器上运行。这与并行无关。

当Java线程在等待I/O时,内核的调度程序会将它移到等待队列,然后会运行其他需要CPU时间的线程。因此,您可以创建具有任意数量线程的线程池,调度程序将处理并发。即使在单核机器上,这也能正常工作。

唯一的限制是您将创建的线程数量。线程的默认堆栈大小在 512K1M 之间变化。因此,这在一定程度上不会很好地扩展,而且在某些时候,线程会耗尽。在我的系统上,我可以创建大约5000个线程。像Go这样的语言通过在有限数量的内核线程上多路复用多个 goroutine 来管理这个问题。这需要由Go运行时进行调度。

如果您想缓解这个问题,您应该考虑使用 NIO。我编写了一个简单的程序,您可以用它来找出您实际上可以支持多少并发连接。在导入部分之后,这个程序应该可以直接运行:

public class ConcurrentBlockingServer {

  private ExecutorService pool = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
    try {
      bs.listen();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void listen() throws IOException {
    int connectionId = 0;
    ServerSocket ss = new ServerSocket(8080);
    while (true) {
      Socket s = ss.accept(); // 阻塞调用,永远不会返回null
      System.out.println("连接: " + (++connectionId));
      process(s);
    }
  }

  private void process(Socket s) {
    Runnable task =
        () -> {
          try (InputStream is = s.getInputStream();
              OutputStream os = s.getOutputStream()) {
            int data;
            // -1表示EOF,.read()是阻塞的
            while (-1 != (data = is.read())) {
              os.write(flipCase(data));
              os.flush();
            }
          } catch (IOException e) {
            e.printStackTrace();
          }
        };
    pool.submit(task);
  }

  private int flipCase(int input) {
    if (input >= 65 && input <= 90) {
      return input + 32;
    } else if (input >= 97 && input <= 122) {
      return input - 32;
    } else {
      return input;
    }
  }
}

运行此程序,查看您可以建立多少个连接。

public class RogueClient {

  private static long noClients = 9000;

  public static void main(String[] args) {
    for (int i = 0; i < noClients; i++) {
      try {
        new Socket("localhost", 8080);
        System.out.println("连接编号: " + i);
      } catch (IOException e) {
        System.err.println("异常: " + e.getMessage() + ",对于连接: " + i);
      }
    }
    try {
      Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

编辑:线程池的大小应该取决于您程序的性质。如果是I/O绑定的任务,可以创建许多线程。但是对于CPU绑定的程序,线程数应该等于核心数。

英文:

"The default threadpool uses core# - 1, which, on a single core machine, that I am using, has NO concurrency." - Why? A concurrent program can very well run on a single core machine. It has nothing to do with parallelism.

When a Java thread is waiting for I/O, the kernel's scheduler will move it to the wait queue, and some other thread that requires CPU time will be run. So you can create a thread pool with as many threads as you want, and the scheduler will take care of the concurrency. And this will work fine even on a single core machine.

The only limit here is the number of threads you will create. The default stack size of a thread varies b/w 512K to 1M. So this does not scale very well, and at some point, you'll run out of threads. On my system, I could create around 5k of them. Languages like Go manage this by multiplexing multiple goroutines on a limited number of kernel threads. This requires scheduling by the Go runtime.

If you want to alleviate this, you should look into NIO. I wrote a quick program that you can use to find out how many concurrent connections you can actually support this way. This should run as-is after the imports:

public class ConcurrentBlockingServer {

  private ExecutorService pool = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    ConcurrentBlockingServer bs = new ConcurrentBlockingServer();
    try {
      bs.listen();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  private void listen() throws IOException {
    int connectionId = 0;
    ServerSocket ss = new ServerSocket(8080);
    while (true) {
      Socket s = ss.accept(); // blocking call, never null
      System.out.println(&quot;Connection: &quot; + (++connectionId));
      process(s);
    }
  }

  private void process(Socket s) {
    Runnable task =
        () -&gt; {
          try (InputStream is = s.getInputStream();
              OutputStream os = s.getOutputStream()) {
            int data;
            // -1 is EOF, .read() is blocking
            while (-1 != (data = is.read())) {
              os.write(flipCase(data));
              os.flush();
            }
          } catch (IOException e) {
            e.printStackTrace();
          }
        };
    pool.submit(task);
  }

  private int flipCase(int input) {
    if (input &gt;= 65 &amp;&amp; input &lt;= 90) {
      return input + 32;
    } else if (input &gt;= 97 &amp;&amp; input &lt;= 122) {
      return input - 32;
    } else {
      return input;
    }
  }
}

Run this program and see how many connections you could make.

public class RogueClient {

  private static long noClients = 9000;

  public static void main(String[] args) {
    for (int i = 0; i &lt; noClients; i++) {
      try {
        new Socket(&quot;localhost&quot;, 8080);
        System.out.println(&quot;Connection No: &quot; + i);
      } catch (IOException e) {
        System.err.println(&quot;Exception: &quot; + e.getMessage() + &quot;, for connection: &quot; + i);
      }
    }
    try {
      Thread.sleep(Integer.MAX_VALUE);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

Edit: The pool size should depend upon the nature of your program. If it's an I/O bound task, you could go ahead and create many threads. But for CPU bound programs, the number of threads should be equal to the number of cores.

答案2

得分: 0

线程是Java中基本(也是唯一的)并发机制。ExecutorService具有使用任意数量线程创建线程池的方法;此外,CompletableFuture并不局限于ForkJoinPool.commonPool。提供线程数量等于核心数的方法只是方便处理CPU绑定任务。

英文:

Threads are the basic (and only) concurrency mechanism in Java. ExecutorService has methods for creating thread pools with an arbitrary number of threads; also CompletableFutures are not limited to ForkJoinPool.commonPool. The ones provided with the number of threads = number of cores are just convenience methods for CPU-bound tasks.

答案3

得分: 0

针对I/O绑定的工作,您可以像您提到的那样分配更多线程,当它们正在执行I/O操作时,它们会被阻塞,因此其他线程仍然可以利用CPU。

您还可以查看Project Reactor,它可以在不想手动处理线程调度的情况下非常方便,特别是使用此特定调度程序:https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--

英文:

For IO bound work you could allocate more threads as you've mentioned, while they're doing the IO they are blocked, so others can still utilize the CPU.

You could also have a look at Project Reactor which can come handy if you don't want to manually take care of threading with this particular scheduler: https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--

huangapple
  • 本文由 发表于 2020年10月13日 02:05:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/64323108.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定