Best way to handle many clients (with threads?)

Question

So here is my question. When my server has over 20 clients, it also has 20 threads, and my desktop with a Ryzen CPU hits 100% usage at about 30 threads. I'd like to handle a large number of clients with one server, but the CPU is simply overloaded. My approach is very simple, but there must be a better way, because I have seen many good Java servers. I don't know what I am doing wrong, though. Below I share my code to show how I do it in principle.

while(this.isRunning()) {
    ServerSocket server = new ServerSocket(8081);
    Socket s = server.accept();
    new Thread(new WorkerRunnable(s)).start();
    //now here if e.g. over 25 users connect there are 25 threads. CPU is at 100%. Is there a better way to handle this?
}

The worker runnable identifies the clients. After that, they enter a chat room; it is like a group chat, for example.

Edit: Relevant parts of my unfinished code, which is still very much a work in progress:

private boolean state;
private ServerSocket socket;

@Override
public void run() {
    while(this.isRunning()==true) {
        try {
            if(this.socket==null) this.socket = new ServerSocket(this.getPort());
            Socket connection = this.socket.accept();
            IntroductionSession session = new IntroductionSession(this, connection);
            new Thread(session).start();
            //register timeout task for 3 secs and handle it async
            System.out.println(ManagementFactory.getThreadMXBean().getThreadCount());
            //this.handleIncomingConnection(connection);
        } catch(Exception e) {
            e.printStackTrace();
            //System.exit(1);
        }
    }
}

private class IntroductionSession implements Runnable {
    private boolean alive = true;

    private BaseServer server;
    private Socket socket;
    private boolean introduced = false;

    public IntroductionSession(BaseServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    private void interrupt() {
        System.out.println("Not mroe alive");
        this.alive = false;
    }

    private void killConnection() {
        this.killConnection("no_reason");
    }

    private void killConnection(String reason) {
        try {
            if(this.from_client!=null) this.from_client.close();
            if(this.to_client!=null) this.to_client.close();
            this.socket.close();
            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Kicked connection, cause it didn't introduce itself");
                    break;
                case "unknown_type":
                    System.out.println("Kicked unknown connection-type.");
                    break;
                case "no_reason":
                default:
                    //ignore
                    break;
            }
        } catch (IOException e) {
            switch(reason) {
                case "didnt_introduce":
                    System.out.println("Error at kicking connection, which didn't introduce itself");
                    break;
                case "unknown_type":
                    System.out.println("Error at kicking unknown connection-type.");
                    break;
                case "no_reason":
                default:
                    System.out.println("Error occured at kicking connection");
                    break;
            }
            e.printStackTrace();
        }
    }

    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    @Override
    public void run() {
        while(this.alive==true) {
            try {
                if(this.to_client==null) {
                    this.to_client = new ObjectOutputStream(this.socket.getOutputStream());
                    //this.to_client.flush();
                }
                if(this.from_client==null) this.from_client = new ObjectInputStream(this.socket.getInputStream());
                //Time runs now, if socket is inactive its getting kicked
                new Timer().schedule(new java.util.TimerTask() {
                    @Override
                    public void run() {
                        if(IntroductionSession.this.introduced==false) {
                            IntroductionSession.this.killConnection("didnt_introduce");
                            Thread.currentThread().interrupt();
                            IntroductionSession.this.interrupt();
                        }
                    }
                }, 5000);
                Object obj = this.from_client.readObject();
                while(obj!=null) {
                    if(obj instanceof IntroductionPacket) {
                        IntroductionPacket pk = (IntroductionPacket) obj;
                        introduced = true;
                        if(isCompatible(pk)==false) {
                            try {
                                this.to_client.writeObject(new DifferentVersionKickPacket(BaseServer.version));
                                this.to_client.close();
                                this.from_client.close();
                                IntroductionSession.this.socket.close();
                                System.out.println("Kicked socket, which uses another version.");
                            } catch(Exception e) {
                                Thread.currentThread().interrupt();
                                //ignore
                                System.out.println("Error at kicking incompatible socket.");
                                e.printStackTrace();
                            }
                        } else {
                            this.server.handleIncomingConnection(this.socket, this.from_client, this.to_client);
                        }
                        Thread.currentThread().interrupt();
                    }
                }
            } catch(StreamCorruptedException e) {
                //unknown client-type = kick
                this.killConnection("unknown_type");
            } catch (IOException|ClassNotFoundException e) {
                e.printStackTrace();
                this.killConnection("no_reason");
            }/* catch(SocketException e) {
            }*/
        }
        Thread.currentThread().interrupt();
    }
}

The extending class, which is the actual server:

@Override
public void handleIncomingConnection(Socket connection, ObjectInputStream from_client, ObjectOutputStream to_client) {
    new AuthenticationSession(connection, from_client, to_client).run();
}

private class AuthenticationSession implements Runnable {
    private Socket socket;
    private ObjectInputStream from_client;
    private ObjectOutputStream to_client;

    public AuthenticationSession(Socket socket, ObjectInputStream from_client, ObjectOutputStream to_client) {
        this.socket = socket;
        this.to_client = to_client;
        this.from_client = from_client;
    }

    //TODO: Implement app id for access tokens
    @Override
    public void run() {
        try {
            while(this.socket.isConnected()==true) {
                /*ObjectOutputStream to_client = new ObjectOutputStream(socket.getOutputStream()); //maybe cause problems, do it later if it does
                ObjectInputStream from_client = new ObjectInputStream(socket.getInputStream());*/
                Object object = from_client.readObject();
                while(object!=null) {
                    if(object instanceof RegisterPacket) {
                        RegisterPacket regPacket = (RegisterPacket) object;
                        System.out.println("Username:" + regPacket + ", password: " + regPacket.password + ", APP-ID: " + regPacket.appId);
                    } else {
                        System.out.println("IP " + this.socket.getInetAddress().getHostAddress() + ":" + this.socket.getPort() + " tried to send an unknown packet.");
                        this.socket.close();
                    }
                }
            }
        }/* catch(EOFException eofe) {
            //unexpected disconnect
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
        catch(Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        /*catch(Exception e) {
            //e.printStackTrace();
            Thread.currentThread().interrupt();
        }*/
    }
}

Please excuse the poor formatting and the things I tried in the hope of fixing it; the tasks still don't die, for whatever reason.

Answer 1

Score: 0

Since you are building a chat application, consider a single-threaded event loop.

You can keep a map from a String (client ID) to a Socket (client socket).

Map<String, Socket> clientSockets;

Your server thread accepts new client sockets and simply puts them into the map above. A second thread runs the event loop: whenever data is available on any client socket's InputStream, it sends that data to all other client sockets (group chat). This loop runs indefinitely, with a sleep interval between iterations.
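The event loop described above could be sketched roughly as follows. Note that this is not exactly what the answer proposes: instead of polling each InputStream with a sleep in between, the sketch swaps in a java.nio Selector, which blocks until some socket is ready and therefore uses no CPU while idle. The class name ChatEventLoop, the port() helper, and the decision to forward raw bytes (rather than the serialized packets from the question) are assumptions made for illustration only.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical single-threaded chat relay: one thread accepts, reads, and broadcasts.
class ChatEventLoop implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    ChatEventLoop(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port)); // port 0 picks a free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until a channel is ready; no busy waiting
                if (!selector.isOpen()) break;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) accept();
                    else if (key.isReadable()) readAndBroadcast(key, buffer);
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // Selector was closed or failed: the loop simply ends.
        }
    }

    private void accept() throws IOException {
        SocketChannel client = server.accept();
        if (client == null) return;
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
    }

    private void readAndBroadcast(SelectionKey key, ByteBuffer buffer) {
        SocketChannel sender = (SocketChannel) key.channel();
        buffer.clear();
        int n;
        try {
            n = sender.read(buffer);
        } catch (IOException e) {
            n = -1; // treat a read error like a disconnect
        }
        if (n < 0) {
            closeQuietly(sender); // closing the channel also cancels its key
            return;
        }
        buffer.flip();
        for (SelectionKey other : selector.keys()) {
            if (other == key || !other.isValid()
                    || !(other.channel() instanceof SocketChannel)) continue;
            SocketChannel target = (SocketChannel) other.channel();
            try {
                // Simplification: a non-blocking write may be partial; a real
                // server would queue the remainder and wait for OP_WRITE.
                target.write(buffer.duplicate());
            } catch (IOException e) {
                closeQuietly(target);
            }
        }
    }

    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful to do here
        }
    }

    @Override
    public void close() throws IOException {
        selector.close(); // wakes up a blocked select()
        server.close();
    }
}
```

Under these assumptions, the server could be started with `new Thread(new ChatEventLoop(8081)).start();`. Since ObjectInputStream is a blocking API, using this style with the question's packet classes would presumably require framing and deserializing the bytes manually.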

Answer 2

Score: 0

Generally, in production-grade server code, we don't create sockets and handle requests directly. Working with low-level sockets, closing connections, and preventing leaks is a nightmare. Instead, we rely on production-grade frameworks such as the Spring Framework or the Play Framework.

My question is: why aren't you using a server-side framework such as the ones listed above?

  1. If you're wondering how these frameworks handle thousands of concurrent requests, look into design patterns such as the thread pool. These frameworks abstract away the complexity and manage the thread pool for you.

  2. If the clients don't need an immediate response, you could also look into introducing a message queue such as Kafka. The server picks messages from the queue one by one and processes them. Bear in mind, however, that this is asynchronous and may not meet your requirements.

  3. If you're not restricted to a single server, you could look into deploying your server code to Azure Virtual Machine Scale Sets (VMSS) or AWS Auto Scaling groups. Based on the CPU load rules you configure, the system will autoscale and dynamically manage resources for you.

I would suggest reading up on system design principles related to servers to reinforce your understanding.

> Don't reinvent the wheel.
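As a rough illustration of the thread pool pattern from item 1, here is a minimal sketch using plain java.util.concurrent rather than a framework. The class name PooledServer, the line-based echo handler, and the pool size parameter are all hypothetical and stand in for the question's packet handling. The sketch also creates the ServerSocket once, outside the accept loop, and performs the blocking read inside the loop condition. Threads blocked on a read consume no CPU, whereas the `while(obj!=null)` loops in the question never reassign the variable and therefore appear to spin, which may be the actual cause of the 100% CPU load.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical server that bounds the number of worker threads with a fixed pool.
class PooledServer implements Runnable, AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    PooledServer(int port, int maxThreads) throws IOException {
        this.serverSocket = new ServerSocket(port); // created once, not per iteration
        this.pool = Executors.newFixedThreadPool(maxThreads);
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void run() {
        while (!serverSocket.isClosed()) {
            try {
                Socket client = serverSocket.accept(); // blocks without using CPU
                pool.execute(() -> handle(client));    // reuse pooled threads
            } catch (IOException | RejectedExecutionException e) {
                if (!serverSocket.isClosed()) e.printStackTrace();
            }
        }
    }

    // Placeholder handler (assumption): echoes each received line back to the client.
    private void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            // The read happens in the loop condition, so every iteration blocks
            // until new data arrives instead of spinning on a stale value.
            while ((line = in.readLine()) != null) {
                out.println("echo: " + line);
            }
        } catch (IOException e) {
            // Client disconnected or I/O failed; the socket is closed by try-with-resources.
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close(); // makes accept() throw, ending the accept loop
        pool.shutdownNow();
    }
}
```

With a fixed pool, a long-lived chat connection occupies one worker for its whole lifetime, so the pool size effectively caps the number of simultaneous clients; that trade-off is one reason frameworks tend to combine pools with non-blocking I/O.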

huangapple
  • Published on April 6, 2020 at 15:01:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/61054507.html