Java: how do I manage threads that read from sockets (WebSocket)?

Question

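I have a WebSocket server.

My server creates a new thread to handle each new connection. The thread stays alive until the WebSocket disconnects.

My problem: for 1_000_000 connections I need 1_000_000 threads. How can I handle many WebSockets with a single thread, without waiting?

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPerConnectionServer {

    // One pool thread per connection: this is the design I want to get away from.
    private final ExecutorService executor = new ThreadPoolExecutor(
            1_000_000, 1_000_000, 7, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    void serve(int port) throws IOException {
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                Socket client = server.accept();
                executor.execute(() -> {
                    try {
                        // This is simplified; the original is a complete WebSocket implementation.
                        client.getInputStream().read();
                    } catch (IOException e) {
                        // The connection failed; nothing more to do in this sketch.
                    }
                });
            }
        }
    }
}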

Answer 1

Score: 1

Your concept is wrong. You should not start a new thread every few milliseconds, because that will slow your system down a lot. You also cannot simply hold 1 million connections open at the same time: with default settings, no normal operating system allows that.

Instead, normal web servers run a bounded number of threads (for example, 100 on an average server), and those threads process the incoming requests one after another.
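
As a rough illustration of the bounded pool described above, here is a minimal sketch using java.util.concurrent. The class name BoundedPoolDemo, the pool size of 4, and the trivial task body are assumptions made for this example; a real server would run its request handling inside each task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    /** Runs `requests` short tasks on at most `poolSize` threads; returns how many distinct threads ran them. */
    static int distinctThreadsUsed(int poolSize, int requests) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < requests; i++) {
            // Each task stands in for handling one incoming request (hypothetical workload).
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, but finish the queued ones
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads used: " + distinctThreadsUsed(4, 1_000));
    }
}
```

However many requests are queued, the number of threads never exceeds the pool size; extra requests simply wait in the pool's queue until a thread is free.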

Answer 2

Score: 1

Think of it this way: you keep a map of sockets, and every time a message arrives at the server you are handed the message together with the socket it belongs to.

The operating system kernel (Linux, Windows, Unix, macOS, ...) does this work for you.

So you can handle a million connections in just one thread.

These are called non-blocking sockets, which means they never block your thread on a read, a write, or any other operation such as accept.

Java has a package for this: java.nio.*

How does it work?

  • A thread to handle I/O operations
  • A Selector to report which sockets are ready and for which type of operation
  • A ByteBuffer to handle reads and writes, instead of the socket streams used with blocking sockets

> You can also use multiple threads and selectors (each selector has its own thread).
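
To see the three pieces above working together without any network setup, here is a minimal sketch that uses java.nio.channels.Pipe in place of a socket. The class name NonBlockingReadDemo and its two helper methods are made up for this illustration; Pipe, Selector and ByteBuffer are standard java.nio classes. It is only a sketch of the idea, not a server.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {

    /** Reads from a non-blocking channel that has no data yet; returns the byte count. */
    static int readWhenEmpty() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            // Nothing has been written, so the read returns at once instead of waiting.
            return source.read(ByteBuffer.allocate(16));
        }
    }

    /** Writes a message, lets the selector report readiness, then reads the message back. */
    static String readWhenReady(String message) throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
            selector.select(); // waits only until the source channel is readable
            ByteBuffer buffer = ByteBuffer.allocate(64); // assumes a short message
            source.read(buffer);
            buffer.flip(); // switch the buffer from being filled to being read
            return StandardCharsets.UTF_8.decode(buffer).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes read with no data: " + readWhenEmpty());
        System.out.println("message after select: " + readWhenReady("hello"));
    }
}
```

readWhenEmpty shows that a non-blocking read comes back immediately when nothing has arrived, and readWhenReady shows the selector reporting the channel once data is available. With sockets the pattern is the same, only with many channels registered on the one selector.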

Look at this example:

NoneBlockingServer.java:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class NoneBlockingServer {

    public static void main(String[] args) throws Exception {
        runServer("localhost", 5050);
    }

    private static void runServer(String host, int port) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(host, port));
        serverSocketChannel.configureBlocking(false); // make the server socket non-blocking
        // Register with the selector for the ACCEPT operation.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // Waits until at least one socket is ready for I/O,
            // or another thread calls selector.wakeup().
            int readyCount = selector.select();
            if (readyCount == 0) {
                // Probably selector.wakeup() was called; do any cross-thread work here.
                continue;
            }

            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove(); // remove the key from the current selection

                if (key.isValid() && key.isReadable()) {
                    // The socket has data to read (or the peer closed the connection).
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // Allocated per read for brevity; a real server would probably
                    // keep one buffer per connection instead.
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    int read = socketChannel.read(buffer);
                    if (read < 0) {
                        socketChannel.close(); // closing also cancels the key
                        System.out.println("CONNECTION CLOSED");
                        continue; // skip the remaining checks for this key
                    }
                    buffer.flip(); // switch the buffer from being filled to being read
                    String msg = StandardCharsets.UTF_8.decode(buffer).toString();
                    System.out.println("MESSAGE : " + msg);
                    // Attach the reply, then also ask for WRITE readiness so it can be sent.
                    key.attach(ByteBuffer.wrap("Hello Client !".getBytes(StandardCharsets.UTF_8)));
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }

                if (key.isValid() && key.isWritable()) {
                    // The socket has room in its send buffer.
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    // The attachment carries per-connection state; here, the pending reply.
                    ByteBuffer dataToWrite = (ByteBuffer) key.attachment();
                    socketChannel.write(dataToWrite); // may write only part of the buffer
                    if (!dataToWrite.hasRemaining()) {
                        key.interestOps(SelectionKey.OP_READ); // reply sent; only read from now on
                    }
                }

                if (key.isValid() && key.isAcceptable()) {
                    // Only server channels become acceptable.
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = server.accept();
                    if (socketChannel != null) { // null when no connection is pending
                        socketChannel.configureBlocking(false);
                        // Several operations can be combined with |, for example
                        // SelectionKey.OP_READ | SelectionKey.OP_WRITE, and changed
                        // later with key.interestOps(int ops).
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("NEW CONNECTION");
                    }
                }
                // There is one more readiness type, key.isConnectable(); look it up.
            }
        }
    }
}
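
The buffer.flip() call in the server is easy to get wrong, so here is a small sketch of what it does. The class name ByteBufferFlipDemo and its methods are invented for this example, and the text is assumed to fit in the 100-byte buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ByteBufferFlipDemo {

    /** Puts text into a buffer, flips it, and reads the same bytes back out. */
    static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(100);
        buffer.put(text.getBytes(StandardCharsets.UTF_8)); // position moves past the data
        buffer.flip(); // limit = old position, position = 0: ready to be read
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        buffer.clear(); // position = 0, limit = capacity: ready to be filled again
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Returns how many bytes are readable before the flip and after it. */
    static int[] remainingBeforeAndAfterFlip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(100);
        buffer.put(text.getBytes(StandardCharsets.UTF_8));
        int before = buffer.remaining(); // free space left, not the data just written
        buffer.flip();
        int after = buffer.remaining(); // exactly the number of bytes written
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello Server"));
        int[] r = remainingBeforeAndAfterFlip("abc");
        System.out.println("before flip: " + r[0] + ", after flip: " + r[1]);
    }
}
```

Without the flip, remaining() describes the unused space after the data, so reading at that point would return the empty tail of the buffer rather than the message.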

And here is BlockingClient.java:

import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class BlockingClient {

    // Uses an ordinary blocking socket.
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("localhost", 5050));
            socket.getOutputStream().write("Hello Server".getBytes(StandardCharsets.UTF_8));
            byte[] buffer = new byte[100];
            int len = socket.getInputStream().read(buffer);
            if (len > 0) { // -1 means the server closed the connection without replying
                System.out.println(new String(buffer, 0, len, StandardCharsets.UTF_8));
            }
        }
    }
}

In this example the blocking client sends a "Hello Server" message to the non-blocking server, and the server responds with a "Hello Client !" message.

Just run it!

Good luck

Answer 3

Score: -5

A tempting thought is "don't use Java". Java has terrible green thread support, whereas Go and Erlang are built on green threads, so they do this very well.

The Java way seems to be worker pools. You create an Executor pool (see java.util.concurrent), decide how many workers you want for a given number of connections, then pass connections to the workers via a queue. Each worker then has to iterate over its set of connections, deciding whether to process each one or yield.

The number of live TCP connections between one client address and one server port is hard capped at about 2^16 (65_536, the number of available ports), but the system is unlikely to be performant with that many connections anyway. Most systems can't maintain performance for more than ~200 persistent connections. If you can far exceed that number, I would assume your system isn't doing enough per connection to really justify the use of WebSockets, but I'm only guessing.

huangapple
  • Posted on March 15, 2020 at 21:46:46
  • When reposting, please keep the link to this article: https://go.coder-hub.com/60693565.html