遍历 HashMap 并为每个不同的值启动一个线程。

huangapple go评论86阅读模式
英文:

Iterate thru a HashMap and start a Thread for every Different Value

问题

我需要编写代码来处理一个HashMap(其键和值是两个不同的类),并为其中的每个项创建一个线程。为了执行线程,代码将从该HashMap中取出一个条目,并检查是否有一个具有相同值的线程。如果存在,它将跳过此条目,直到该线程完成执行。代码必须遍历此HashMap直到为空,在不违反上述条件的情况下进行。然而,我在实现这个逻辑时遇到了问题。

我基于处理器数量创建了一个Executor Service,并创建了一个正在执行的值列表(来自HashMap)。但我不知道在线程运行结束后如何从此列表中删除该值。

这是我的未完成和未经测试的代码:

int threads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<Boolean>> futures = new ArrayList<>();
List<UnidadeOrganizacionalView> dealershipsBeingExecuted = new ArrayList<>();
dealershipsBeingExecuted = Collections.synchronizedList(dealershipsBeingExecuted);

Iterator<Map.Entry<NotaResumoView, UnidadeOrganizacionalView>> it =
        preparedNfs.entrySet().iterator();
while (it.hasNext()) {
    Map.Entry<NotaResumoView, UnidadeOrganizacionalView> pair = it.next();
    if (dealershipsBeingExecuted.contains(pair.getValue())){
        it.next();
    }
    dealershipsBeingExecuted.add(pair.getValue());
    futures.add(executor.submit(
            new ConfirmNfeProcessor(Integration, pair.getKey(), pair.getValue())););
    it.remove();
}
英文:

I need to write code that processes a HashMap (with two different classes being its key and value) and creates a thread for each of its items. To execute a thread, the code will take an entry from that HashMap and check if there is a thread with the same value. If it exists, it will skip this entry until that thread finishes executing. The code must go through this HashMap until it is empty, without breaking the condition I mentioned above. However, I am having trouble implementing this logic.

I created an Executor Service based on the number of processors and a list of values (from HashMap) that are being executed. But I don't know how I can remove the value from this list after the thread has finished running.

Here's my unfinished and untested code:

    int threads = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    List&lt;Future&lt;Boolean&gt;&gt; futures = new ArrayList&lt;&gt;();
    List&lt;UnidadeOrganizacionalView&gt; dealershipsBeingExecuted = new ArrayList&lt;&gt;();
    dealershipsBeingExecuted = Collections.synchronizedList(dealershipsBeingExecuted);

    Iterator&lt;Map.Entry&lt;NotaResumoView, UnidadeOrganizacionalView&gt;&gt; it =
            preparedNfs.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry&lt;NotaResumoView, UnidadeOrganizacionalView&gt; pair = it.next();
        if (dealershipsBeingExecuted.contains(pair.getValue())){
            it.next();
        }
        dealershipsBeingExecuted.add(pair.getValue());
        futures.add(executor.submit(
                new ConfirmNfeProcessor(Integration, pair.getKey(), pair.getValue())););
        it.remove();
    }

答案1

得分: 2

我认为你遇到了一个XY问题。在我看来,你的实际需求是确保不同的线程同时运行相同经销商的 ConfirmNfeProcessor 任务,是这样吗?

与其使用具有 N 个工作线程的固定线程池,我建议使用一个包含 N 个线程“池”的数组,每个池只有一个线程。然后,对于映射中的每个 pair,可以使用 pair.getValue().hashCode() 来选择哪个 N 个执行器应该执行该任务。

所有具有相同值("value" == "dealership",对吗?)的 pair 将被提交到同一个执行器,由于每个执行器是单线程的,它将确保从不同时为同一经销商执行两个任务。

英文:

I think you have a bit of an XY problem there. It looks to me as if your actual requirement is to ensure that no two threads run ConfirmNfeProcessor tasks for the same dealership at the same time, is that right?

Instead of having one fixed thread pool with N worker threads, I would use an array of N thread "pools" that have one thread each. Then, for each pair in the map, I would use pair.getValue().hashCode() to choose which one of the N executors should perform the task.

Every pair that has the same value ("value" == "dealership", right?) will be submitted to the same executor, and since each executor is single-threaded, it will be guaranteed to never perform two tasks for the same dealership at the same time.

答案2

得分: 2

最简单的实现方式是使用 CompletableFuture,它允许您链接依赖动作,比如在异步评估完成后提交另一个作业。

例如:

Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();

for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
    UnidadeOrganizacionalView value = e.getValue();
    ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
    pending.compute(value, (key,future) -> future == null?
        CompletableFuture.supplyAsync(p, executor):
        future.thenApplyAsync(b -> p.get(), executor));
}

// 如果您想等待所有作业完成:
CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0])).join();

它使用一个 Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> 来记住已经提交的作业。

这在 compute 方法内部使用。当值没有关联的先前作业时,它将调用 CompletableFuture.supplyAsync(p, executor) 来创建一个新作业并记住它。否则,它将通过 thenApplyAsync 创建一个新作业,在先前作业完成后启动,并记住这个新作业。

这假设可以将类 ConfirmNfeProcessor 从实现 Callable<Boolean> 更改为实现 Supplier<Boolean>。除了相关方法的名称之外,主要区别是 Supplier 不能抛出已检查异常。如果不可能进行这种更改,您需要适配器代码。

一个可能的解决方案是:

public static <R> CompletableFuture<R> callAsync(Callable<R> callable, Executor e) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}
public static <R> CompletableFuture<R> thenCallAsync(
                  CompletableFuture<?> f, Callable<R> callable, Executor e) {
    CompletableFuture<R> cf = new CompletableFuture<>();
    f.whenCompleteAsync((value, t) -> {
        if(t != null) cf.completeExceptionally(t);
        else try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}

用法示例如下:

Map<UnidadeOrganizacionalView, CompletableFuture<Boolean>> pending = new HashMap<>();

for(Map.Entry<NotaResumoView, UnidadeOrganizacionalView> e: preparedNfs.entrySet()) {
    UnidadeOrganizacionalView value = e.getValue();
    ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
    pending.compute(value, (key,future) -> future == null?
        callAsync(p, executor): thenCallAsync(future, p, executor));
}
英文:

The simplest way to achieve this, is to use CompletableFuture, which allows you to chain dependent actions, like submitting another job, when the asynchronous evaluation has been completed.

E.g.

Map&lt;UnidadeOrganizacionalView, CompletableFuture&lt;Boolean&gt;&gt; pending = new HashMap&lt;&gt;();

for(Map.Entry&lt;NotaResumoView, UnidadeOrganizacionalView&gt; e: preparedNfs.entrySet()) {
    UnidadeOrganizacionalView value = e.getValue();
    ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
    pending.compute(value, (key,future) -&gt; future == null?
        CompletableFuture.supplyAsync(p, executor):
        future.thenApplyAsync(b -&gt; p.get(), executor));
}

// if you want to wait for the completion of all job:
CompletableFuture.allOf(pending.values().toArray(new CompletableFuture&lt;?&gt;[0])).join();

It uses a Map&lt;UnidadeOrganizacionalView, CompletableFuture&lt;Boolean&gt;&gt; to remember the already submitted jobs.

This is used within the compute method. It will call CompletableFuture.supplyAsync(p, executor) when there was no previous job associated with the value, to create a new one and remember it. Otherwise, it will create a new job via thenApplyAsync, that will be started when the previous one has been completed, and remembers this new job.

This assumes that it is possible to change the class ConfirmNfeProcessor from implementing Callable&lt;Boolean&gt; to implementing Supplier&lt;Boolean&gt;. The main difference, besides the name of the relevant method, is that a Supplier can not throw checked exceptions. If such a change is not possible, you need adapter code.

One possibility would be:

public static &lt;R&gt; CompletableFuture&lt;R&gt; callAsync(Callable&lt;R&gt; callable, Executor e) {
    CompletableFuture&lt;R&gt; cf = new CompletableFuture&lt;&gt;();
    CompletableFuture.runAsync(() -&gt; {
        try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}
public static &lt;R&gt; CompletableFuture&lt;R&gt; thenCallAsync(
                  CompletableFuture&lt;?&gt; f, Callable&lt;R&gt; callable, Executor e) {
    CompletableFuture&lt;R&gt; cf = new CompletableFuture&lt;&gt;();
    f.whenCompleteAsync((value, t) -&gt; {
        if(t != null) cf.completeExceptionally(t);
        else try { cf.complete(callable.call()); }
        catch(Throwable ex) { cf.completeExceptionally(ex); }
    }, e);
    return cf;
}

to be used like

Map&lt;UnidadeOrganizacionalView, CompletableFuture&lt;Boolean&gt;&gt; pending = new HashMap&lt;&gt;();

for(Map.Entry&lt;NotaResumoView, UnidadeOrganizacionalView&gt; e: preparedNfs.entrySet()) {
    UnidadeOrganizacionalView value = e.getValue();
    ConfirmNfeProcessor p = new ConfirmNfeProcessor(Integration, e.getKey(), value);
    pending.compute(value, (key,future) -&gt; future == null?
        callAsync(p, executor): thenCallAsync(future, p, executor));
}

huangapple
  • 本文由 发表于 2020年9月1日 04:47:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/63677973.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定