防止线程阻塞排队的线程

huangapple go评论79阅读模式
英文:

Prevent thread blocking queued threads

问题

我有一个任务,会使用不同的值多次运行。我想防止它同时执行两个相同任务(基于字符串值)。下面是字符串的示例。这些值将会变化,但为了简单起见,我在示例中包含了这些值。我通过ExecutorService提交这些任务。任务会运行,但第二个"hi"会阻止其他任务运行。所以有4/5个任务同时运行。一旦第一个"hi"释放锁定,第五个任务继续运行,其他任务也正常继续。有没有办法防止这种类型的任务阻塞,以便其他3个任务在它之前运行,这样在实际上有5个任务同时运行之前不会排队。

任务的提交:

executor.submit(new Task("hi"));
executor.submit(new Task("h"));
executor.submit(new Task("u"));
executor.submit(new Task("y"));
executor.submit(new Task("hi"));
executor.submit(new Task("p"));
executor.submit(new Task("o"));
executor.submit(new Task("bb"));

任务很简单,它只是打印出字符串:

Lock l = getLock(x);
try {
    l.lock();

    System.out.println(x);

    try {
        Thread.sleep(5000);
    } catch (InterruptedException ex) {
        Logger.getLogger(Task.class.getName()).log(Level.SEVERE, null, ex);
    }

} finally {
    l.unlock();
}

我已更新帖子,以便更清楚地理解...

英文:

I have a task that will run many times with different values. I'd like to prevent it from executing 2 of the same tasks (Based on the string value) at the same time. Below is an example of the strings. These values will change, but for simplicity I have included these values below in the example. I submit these tasks via an ExecutorService The tasks run, but the 2nd hi blocks the other tasks from running. So 4/5 tasks run concurrently. Once the lock is released from the first hi the 5th tasks continues and the other tasks continue fine. Is there a way to prevent this type of blocking of the task so that the other 3 tasks can run before it so there is no queuing until there is actually 5 tasks running concurrently.

Submission of the tasks:

executor.submit(new Task("hi"));
executor.submit(new Task("h"));
executor.submit(new Task("u"));
executor.submit(new Task("y"));
executor.submit(new Task("hi"));
executor.submit(new Task("p"));
executor.submit(new Task("o"));
executor.submit(new Task("bb"));

The Task is simple. It just prints out the string:

Lock l = getLock(x);
try {
l.lock();

System.out.println(x);

try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
Logger.getLogger(Task.class.getName()).log(Level.SEVERE, null, ex);
}

} finally {
l.unlock();

}

I've updated the post to allow for things to be more clearly understood...

答案1

得分: 3

为了避免阻塞线程,您必须确保在另一个线程之前甚至不运行该操作。例如,您可以使用CompletableFuture来链式安排一个操作,以在前一个操作完成后调度:

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for(int i = 0; i < 5; i++) submit("one", task("one"), es);
    for(int i = 0; i < 5; i++) submit("two", task("two"), es);
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(26));
    es.shutdown();
}

static Runnable task(String x) {
    return () -> {
        System.out.println(x);
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    };
}

static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
    = new ConcurrentHashMap<>();

static final void submit(String key, Runnable task, Executor e) {
    CompletableFuture<Void> job = MAP.compute(key,
        (k, previous) -> previous != null?
            previous.thenRunAsync(task, e): CompletableFuture.runAsync(task, e));
    job.whenComplete((v,t) -> MAP.remove(key, job));
}

ConcurrentHashMap 允许我们处理以下情况作为原子更新:

  • 如果关键字没有先前的未来存在,只需安排操作,创建未来

  • 如果已经存在先前的未来,链式安排操作,以在前一个完成后调度;依赖操作将成为新的未来

  • 如果作业已完成,则两个参数的 remove(key, job) 将仅在它仍然是当前作业时将其移除

main 方法中的示例演示了如何使用两个线程的线程池来运行两个独立的操作,永远不会阻塞线程。

英文:

To avoid blocking a thread, you have to ensure that the action doesn’t even run before the other. For example, you can use a CompletableFuture to chain an action, to be scheduled when the previous has been completed:

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    for(int i = 0; i < 5; i++) submit("one", task("one"), es);
    for(int i = 0; i < 5; i++) submit("two", task("two"), es);
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(26));
    es.shutdown();
}

static Runnable task(String x) {
    return () -> {
        System.out.println(x);
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    };
}

static final ConcurrentHashMap<String, CompletableFuture<Void>> MAP
    = new ConcurrentHashMap<>();

static final void submit(String key, Runnable task, Executor e) {
    CompletableFuture<Void> job = MAP.compute(key,
        (k, previous) -> previous != null?
            previous.thenRunAsync(task, e): CompletableFuture.runAsync(task, e));
    job.whenComplete((v,t) -> MAP.remove(key, job));
}

The ConcurrentHashMap allows us to handle the cases as atomic updates

  • If no previous future exists for a key, just schedule the action, creating the future

  • If a previous future exists, chain the action, to be scheduled when the previous completed; the dependent action becomes the new future

  • If a job completed, the two-arg remove(key, job) will remove it if and only if it is still the current job

The example in the main method demonstrates how two independent actions can run with a thread pool of two threads, never blocking at thread.

huangapple
  • 本文由 发表于 2023年1月9日 17:59:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/75055627.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定