英文:
Iterate thru a HashMap and start a Thread for every Different Value
问题
我需要编写代码来处理一个HashMap(其键和值是两个不同的类),并为其中的每个项创建一个线程。为了执行线程,代码将从该HashMap中取出一个条目,并检查是否有一个具有相同值的线程。如果存在,它将跳过此条目,直到该线程完成执行。代码必须遍历此HashMap直到为空,在不违反上述条件的情况下进行。然而,我在实现这个逻辑时遇到了问题。
我基于处理器数量创建了一个Executor Service,并创建了一个正在执行的值列表(来自HashMap)。但我不知道在线程运行结束后如何从此列表中删除该值。
这是我的未完成和未经测试的代码:
int threads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<Boolean>> futures = new ArrayList<>();
List<UnidadeOrganizacionalView> dealershipsBeingExecuted = new ArrayList<>();
dealershipsBeingExecuted = Collections.synchronizedList(dealershipsBeingExecuted);
Iterator<Map.Entry<NotaResumoView, UnidadeOrganizacionalView>> it =
preparedNfs.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<NotaResumoView, UnidadeOrganizacionalView> pair = it.next();
if (dealershipsBeingExecuted.contains(pair.getValue())){
it.next();
}
dealershipsBeingExecuted.add(pair.getValue());
futures.add(executor.submit(
new ConfirmNfeProcessor(Integration, pair.getKey(), pair.getValue())););
it.remove();
}
英文:
I need to write code that processes a HashMap (with two different classes being its key and value) and creates a thread for each of its items. To execute a thread, the code will take an entry from that HashMap and check if there is a thread with the same value. If it exists, it will skip this entry until that thread finishes executing. The code must go through this HashMap until it is empty, without breaking the condition I mentioned above. However, I am having trouble implementing this logic.
I created an Executor Service based on the number of processors and a list of values (from HashMap) that are being executed. But I don't know how I can remove the value from this list after the thread has finished running.
Here's my unfinished and untested code:
int threads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<Boolean>> futures = new ArrayList<>();
List<UnidadeOrganizacionalView> dealershipsBeingExecuted = new ArrayList<>();
dealershipsBeingExecuted = Collections.synchronizedList(dealershipsBeingExecuted);
Iterator<Map.Entry<NotaResumoView, UnidadeOrganizacionalView>> it =
preparedNfs.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<NotaResumoView, UnidadeOrganizacionalView> pair = it.next();
if (dealershipsBeingExecuted.contains(pair.getValue())){
it.next();
}
dealershipsBeingExecuted.add(pair.getValue());
futures.add(executor.submit(
new ConfirmNfeProcessor(Integration, pair.getKey(), pair.getValue())););
it.remove();
}
答案1
得分: 2
我认为你遇到了一个XY问题。在我看来,你的实际需求是确保不同的线程同时运行相同经销商的 ConfirmNfeProcessor
任务,是这样吗?
与其使用具有 N 个工作线程的固定线程池,我建议使用一个包含 N 个线程“池”的数组,每个池只有一个线程。然后,对于映射中的每个 pair
,可以使用 pair.getValue().hashCode()
来选择哪个 N 个执行器应该执行该任务。
所有具有相同值("value" == "dealership",对吗?)的 pair
将被提交到同一个执行器,由于每个执行器是单线程的,它将确保从不同时为同一经销商执行两个任务。
英文:
I think you have a bit of an XY problem there. It looks to me as if your actual requirement is to ensure that no two threads run ConfirmNfeProcessor
tasks for the same dealership at the same time, is that right?
Instead of having one fixed thread pool with N worker threads, I would use an array of N thread "pools" that have one thread each. Then, for each pair
in the map, I would use pair.getValue().hashCode()
to choose which one of the N executors should perform the task.
Every pair
that has the same value ("value" == "dealership", right?) will be submitted to the same executor, and since each executor is single-threaded, it will be guaranteed to never perform two tasks for the same dealership at the same time.
答案2
得分: 2
最简单的实现方式是使用 CompletableFuture
,它允许您链接依赖动作,比如在异步评估完成后提交另一个作业。
例如:
Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();
for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
UnidadeOrganizacionalView value = e.getValue();
ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
pending.compute(value, (key,future) -> future == null?
CompletableFuture.supplyAsync(p, executor):
future.thenApplyAsync(b -> p.get(), executor));
}
// 如果您想等待所有作业完成:
CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0])).join();
它使用一个 Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>>
来记住已经提交的作业。
这在 compute
方法内部使用。当值没有关联的先前作业时,它将调用 CompletableFuture.supplyAsync(p, executor)
来创建一个新作业并记住它。否则,它将通过 thenApplyAsync
创建一个新作业,在先前作业完成后启动,并记住这个新作业。
这假设可以将类 ConfirmNfeProcessor
从实现 Callable<Boolean>
更改为实现 Supplier<Boolean>
。除了相关方法的名称之外,主要区别是 Supplier
不能抛出已检查异常。如果不可能进行这种更改,您需要适配器代码。
一个可能的解决方案是:
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
public static <R> CompletableFuture<R> thenCallAsync(
CompletableFuture<?> f, Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
f.whenCompleteAsync((value, t) -> {
if(t != null) cf.completeExceptionally(t);
else try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
用法示例如下:
Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();
for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
UnidadeOrganizacionalView value = e.getValue();
ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
pending.compute(value, (key,future) -> future == null?
callAsync(p, executor): thenCallAsync(future, p, executor));
}
英文:
The simplest way to achieve this, is to use CompletableFuture
, which allows you to chain dependent actions, like submitting another job, when the asynchronous evaluation has been completed.
E.g.
Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();
for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
UnidadeOrganizacionalView value = e.getValue();
ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
pending.compute(value, (key,future) -> future == null?
CompletableFuture.supplyAsync(p, executor):
future.thenApplyAsync(b -> p.get(), executor));
}
// if you want to wait for the completion of all job:
CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0])).join();
It uses a Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>>
to remember the already submitted jobs.
This is used within the compute
method. It will call CompletableFuture.supplyAsync(p, executor)
when there was no previous job associated with the value, to create a new one and remember it. Otherwise, it will create a new job via thenApplyAsync
, that will be started when the previous one has been completed, and remembers this new job.
This assumes that it is possible to change the class ConfirmNfeProcessor
from implementing Callable<Boolean>
to implementing Supplier<Boolean>
. The main difference, besides the name of the relevant method, is that a Supplier
can not throw checked exceptions. If such a change is not possible, you need adapter code.
One possibility would be:
public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
public static <R> CompletableFuture<R> thenCallAsync(
CompletableFuture<?> f, Callable<R> callable, Executor e) {
CompletableFuture<R> cf = new CompletableFuture<>();
f.whenCompleteAsync((value, t) -> {
if(t != null) cf.completeExceptionally(t);
else try { cf.complete(callable.call()); }
catch(Throwable ex) { cf.completeExceptionally(ex); }
}, e);
return cf;
}
to be used like
Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();
for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
UnidadeOrganizacionalView value = e.getValue();
ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
pending.compute(value, (key,future) -> future == null?
callAsync(p, executor): thenCallAsync(future, p, executor));
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论