How to show the current throughput of n threads?
Question
Imagine having n threads that all query a SOAP endpoint to fetch some data and store it on the file system. What's the best way to display the throughput of these threads over the last second?
I came up with the following idea:
All 32 threads (n in this case) share an AtomicInteger, which each thread increments after it has written one file to the file system. Another thread (#33) exists only for logging: it reads this AtomicInteger, divides it by the n threads, estimates the ETA from that metric (total number of entries left / throughput), resets the AtomicInteger to zero, sleeps for one second, and keeps looping until all threads have finished their work.
However, this has the following problems: the locking of the AtomicInteger will cost some performance (although I'd consider it negligible), and the output will never cover exactly the last second, since there will be some variance in locking times.
Can anyone think of a more elegant solution to this problem? I guess I might be overthinking this completely and Java already has some solution for it.
Even if there might be a 3rd-party solution or package for this, I'd rather understand how to do this properly than just use a package.
The threads are spawned by an ExecutorService and all of them are of the Runnable kind.
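For illustration, the idea described above could be sketched roughly as follows. This is only a minimal sketch under stated assumptions: the class ThroughputMeter and all of its method names are hypothetical and do not come from the post. It uses getAndSet(0) so that reading and resetting happen in one atomic step, and it divides by the time that actually elapsed instead of assuming exactly one second, which sidesteps the variance of Thread.sleep. It reports the total throughput; dividing by n would give the per-thread average.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; all names are illustrative assumptions.
class ThroughputMeter {
    private final AtomicInteger written = new AtomicInteger();
    // Only touched by the single logging thread.
    private long lastSampleNanos;

    ThroughputMeter(long startNanos) {
        this.lastSampleNanos = startNanos;
    }

    // Called by every worker thread after it has written one file.
    void fileWritten() {
        written.incrementAndGet();
    }

    // Called by the logging thread, e.g. with System.nanoTime().
    // Reads and resets the counter atomically, then normalises the count
    // by the time that really passed since the previous sample.
    double sampleFilesPerSecond(long nowNanos) {
        int count = written.getAndSet(0);
        long elapsed = nowNanos - lastSampleNanos;
        lastSampleNanos = nowNanos;
        return elapsed <= 0 ? 0.0 : count * 1_000_000_000.0 / elapsed;
    }

    // ETA in seconds = entries left / throughput; -1 means "unknown".
    static long etaSeconds(long entriesLeft, double filesPerSecond) {
        return filesPerSecond <= 0 ? -1 : (long) Math.ceil(entriesLeft / filesPerSecond);
    }
}
```

The logging thread would call sampleFilesPerSecond(System.nanoTime()) once per loop iteration, after its sleep.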
Answer 1
Score: 1
The easiest way is to update the counter only every nth time, for example only every 100 calls, if the AtomicInteger turns out to be a bottleneck. I would suggest using an AtomicInteger first and moving to a more complicated solution only when you see that there is a problem.
There are libraries for sampling statistics, for example https://github.com/HdrHistogram/HdrHistogram, but since you want to measure only one endpoint, this does not fit your needs.
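The "update only every nth time" idea could look something like the sketch below. It assumes that each worker owns one instance of a hypothetical BatchedCounter; the class name, BATCH_SIZE, and the method names are illustrative and not part of any library.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-worker helper; names and BATCH_SIZE are assumptions.
class BatchedCounter {
    static final int BATCH_SIZE = 100;

    private final AtomicInteger shared;
    // Touched only by the owning worker thread, so a plain int is enough.
    private int pending;

    BatchedCounter(AtomicInteger shared) {
        this.shared = shared;
    }

    // Called by the owning worker after each file; touches the shared
    // counter only once per BATCH_SIZE files.
    void fileWritten() {
        if (++pending == BATCH_SIZE) {
            flush();
        }
    }

    // Publishes whatever is still pending; call it once more when the
    // worker finishes so that the final total is exact.
    void flush() {
        if (pending > 0) {
            shared.addAndGet(pending);
            pending = 0;
        }
    }
}
```

The trade-off is that the displayed value can lag behind by up to BATCH_SIZE - 1 files per worker, so with slow requests a per-second display becomes coarse. That is one more reason to start with the plain AtomicInteger.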
Answer 2
Score: 1
I think you have a very customized requirement regarding metrics collection. I doubt that there are any metrics collection libraries that do what you are expecting. Maybe check out Micrometer's GaugeCounter.
Problem Root:
You are trying to read and write the same resource concurrently from n+1 workers. The use case allows you to use a variation of the Scatter/Gather pattern, where one worker aggregates the work of the n workers and each of the n workers reads and writes only a resource that it owns itself.
Assumption:
I am assuming that you have references to all those threads in a list or array and that you don't have more than Integer.MAX_VALUE files.
Solution:
You can give each worker its own volatile int counter as a field (composition) instead of passing an AtomicInteger in from the outside. These counters increase monotonically and are never modified or reset by the computation thread.
You can keep a local copy of the int values in the computation thread and subtract this local copy from the new values to obtain the number of files processed in the last second, then update the local copy with the new values. (This is equivalent to resetting the counter to 0 for each thread.) Alternatively, if that is enough for you, you can keep track of just the sum of the counters instead of storing a counter for each thread.
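    import java.util.List;

    class Computation implements Runnable {
        private final List<Worker> workerList;
        // Counter values seen on the previous pass, one slot per worker.
        private final int[] localCounters;
        // volatile so that a stop() call from another thread is seen by the loop.
        private volatile boolean finished;

        public Computation(List<Worker> workerList) {
            this.workerList = workerList;
            this.localCounters = new int[workerList.size()];
        }

        @Override
        public void run() {
            while (!finished) {
                int filesLastInterval = 0;
                for (int i = 0; i < workerList.size(); i++) {
                    int current = workerList.get(i).getCounter();
                    // The difference to the previous reading is this worker's
                    // output since the last pass; storing the new reading
                    // takes the place of resetting the counter.
                    filesLastInterval += current - localCounters[i];
                    localCounters[i] = current;
                }
                computation(filesLastInterval);
                pause();
            }
        }

        private void computation(int filesLastInterval) {
            // do your computation (throughput, ETA, logging) with filesLastInterval
            // per-worker totals are available in this.localCounters[x]
        }

        // Call this once all workers are done to end the loop.
        public void stop() {
            finished = true;
        }

        private void pause() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and end the loop instead of swallowing it.
                Thread.currentThread().interrupt();
                finished = true;
            }
        }
    }

    class Worker implements Runnable {
        // Written only by this worker's thread, read by the computation thread.
        private volatile int counter;

        @Override
        public void run() {
            // do your thing, then after each file has been written:
            counter++; // not atomic in general, but safe with a single writer
        }

        public int getCounter() {
            return counter;
        }
    }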
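The alternative mentioned above, tracking only the sum of the counters, might be sketched like this. SumTracker and its method name are hypothetical; the int array stands in for the values returned by each worker's getCounter().

```java
// Hypothetical helper; names are illustrative assumptions.
class SumTracker {
    // Sum of all worker counters seen on the previous call.
    private long previousTotal;

    // counters holds the current, monotonically increasing value of every
    // worker. Returns how many files were processed since the previous call.
    long filesSinceLastCall(int[] counters) {
        long total = 0;
        for (int c : counters) {
            total += c;
        }
        long delta = total - previousTotal;
        previousTotal = total;
        return delta;
    }
}
```

Only one long has to be remembered between passes, at the cost of losing the per-worker breakdown.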