java并发 – 在特定任务之间保持顺序

huangapple go评论80阅读模式
英文:

java concurrency - keep order between certain tasks

问题

private ExecutorService executorService = Executors.newFixedThreadPool(10);
private ConcurrentHashMap<String, BlockingQueue<String>> userMessageQueues = new ConcurrentHashMap<>();

public void onMessage(String user, String message) {
    userMessageQueues.computeIfAbsent(user, key -> new LinkedBlockingQueue<>()).offer(message);
    
    if (!executorService.isShutdown()) {
        executorService.execute(() -> processMessagesForUser(user));
    }
}

private void processMessagesForUser(String user) {
    BlockingQueue<String> messageQueue = userMessageQueues.get(user);
    
    while (!messageQueue.isEmpty()) {
        String message = messageQueue.poll();
        if (message != null) {
            processMessage(message);
        }
    }
}

public void shutdown() {
    executorService.shutdown();
}

private void processMessage(String message) {
    // Process the message logic here
}

请注意,上述代码是基于您的要求编写的,用于按顺序执行每个用户的消息,同时允许并行处理不同用户的消息。代码创建一个线程池 executorService 来处理消息。每个用户都有一个关联的消息队列,其中消息会被顺序地添加。在 onMessage 方法中,消息被添加到相应用户的消息队列中,然后检查线程池是否正在关闭,如果没有,则会提交一个任务来处理该用户的消息。processMessagesForUser 方法从消息队列中取出消息并处理,确保按照顺序执行。最后,shutdown 方法用于关闭线程池。

请注意,代码可能需要根据您的实际应用进行调整和优化。

英文:

My app receives messages from several users. Messages per user must be executed in order, but otherwise messages can be executed in parallel. How to implement such logic?

E.g. messages arrive in this order: u1:m1 u1:m2 u2:m1 u1:m3 u2:m2. Execution should be parallel like this:

  • thread1: u1:m1 u1:m2 u1:m3
  • thread2: u2:m1 u2:m2

Number of users can be huge, therefore I can't just create a single threaded executor per user.

private ExcutorService executorService = newFixedThreadPool(10);

public void onMessage(String user, String message) {
  // TODO schedule tasks per user in order
  executorService.schedule(() -&gt; processMessage(message));
}

</details>


# 答案1
**得分**: 3

你可以这样做:

            // 创建执行器
            int 线程数量 = 10;
            ExecutorService[] 执行器们 = new ExecutorService[线程数量];
            for (int i = 0; i < 线程数量; i++) {
                执行器们[i] = Executors.newSingleThreadExecutor();
            }

然后在你的方法中,使用特定的执行器来处理用户的消息,利用哈希码和取模的方式:

    public void onMessage(String 用户, String 消息) {
        // 同一个用户将始终使用相同的执行器,哈希码会将负载均匀地分布在执行器之间
        int 要使用的执行器 = Math.abs(用户.hashCode()) % 线程数量;
        ExecutorService 执行器服务 = 执行器们[要使用的执行器];
        执行器服务.execute(() -> 处理消息(消息));
    }

<details>
<summary>英文:</summary>

You can do it like this:

            //CREATE EXECUTORS
            int numberOfThreads = 10;
            ExecutorService[] executors = new ExecutorService[numberOfThreads];
            for (int i = 0; i &lt; numberOfThreads; i++) {
                executors[i] = Executors.newSingleThreadExecutor();
            }

Then in your method use a specific executor for the user with the help of hashCode and modulo:

    public void onMessage(String user, String message) {
        //same user will always get the same executor, hashCode will evenly distribute the load among the executors
        int executorToUse = Math.abs(user.hashCode()) % numberOfThreads; 
        ExecutorService executorService = executors[executorToUse];
        executorService.execute(() -&gt; processMessage(message));
    }



</details>



# 答案2
**得分**: 0

以下是翻译好的内容:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class GroupingBlockingQueue<G, E> {

    private static final int UNBOUNDED = -1;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();

    private final Condition notEmpty = lock.newCondition();

    private final Map<G, List<E>> map = new LinkedHashMap<>();

    private final int bound;

    public GroupingBlockingQueue() {
        this(UNBOUNDED);
    }

    public GroupingBlockingQueue(int bound) {
        this.bound = bound;
    }

    public void put(G group, E element) throws InterruptedException {
        lock.lock();
        try {
            if (bound > 0) {
                while (bound == map.keySet().size()) {
                    notFull.await();
                }
            }
            map.computeIfAbsent(group, k -> new ArrayList<>()).add(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Item<G, E> take() throws InterruptedException {
        lock.lock();
        try {
            Set<G> keys = map.keySet();
            while (keys.isEmpty()) {
                notEmpty.await();
            }
            G group = keys.iterator().next();
            List<E> elements = map.remove(group);
            notFull.signal();
            return new Item<>(group, elements);
        } finally {
            lock.unlock();
        }
    }

    public static class Item<G, E> {
        private final G group;
        private final List<E> elements;

        public Item(G group, List<E> elements) {
            this.group = group;
            this.elements = elements;
        }

        public G getGroup() {
            return group;
        }

        public List<E> getElements() {
            return elements;
        }
    }
}
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.Executors.newFixedThreadPool;

public class UserMessageProcessor implements AutoCloseable {

    private GroupingBlockingQueue<String, String> queue = new GroupingBlockingQueue<>();
    private ExecutorService executorService;

    public UserMessageProcessor(int threadCount) {
        executorService = newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            executorService.submit((Runnable) this::processMessages);
        }
    }

    private void processMessages() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                GroupingBlockingQueue.Item<String, String> item = queue.take();
                processMessages(item.getGroup(), item.getElements());
            }
        } catch (InterruptedException ignore) {}
    }

    private void processMessages(String user, List<String> messages) throws InterruptedException {
        for (String message : messages) {
            System.out.println(String.format("[%s] %s:%s", Thread.currentThread().getName(), user, message));
            Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        }
    }

    public void onMessage(String user, String message) throws InterruptedException {
        queue.put(user, message);
    }

    @Override
    public void close() {
        try {
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignore) {
        }
        executorService.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        try (UserMessageProcessor processor = new UserMessageProcessor(10)) {
            processor.onMessage("u1", "m1");
            processor.onMessage("u2", "m1");
            processor.onMessage("u1", "m2");
            processor.onMessage("u2", "m2");
            processor.onMessage("u1", "m3");
        }
    }
}
英文:

You can decouple message producer and processor by kind of grouping queue:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class GroupingBlockingQueue&lt;G, E&gt; {
private static final int UNBOUNDED = -1;
private final Lock lock = new ReentrantLock();
private final Condition notFull  = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Map&lt;G, List&lt;E&gt;&gt; map = new LinkedHashMap&lt;&gt;();
private final int bound;
public GroupingBlockingQueue() {
this(UNBOUNDED);
}
public GroupingBlockingQueue(int bound) {
this.bound = bound;
}
public void put(G group, E element) throws InterruptedException {
lock.lock();
try {
if (bound &gt; 0) {
while (bound == map.keySet().size()) {
notFull.await();
}
}
map.computeIfAbsent(group, k -&gt; new ArrayList&lt;&gt;()).add(element);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Item&lt;G, E&gt; take() throws InterruptedException {
lock.lock();
try {
Set&lt;G&gt; keys = map.keySet();
while (keys.isEmpty()) {
notEmpty.await();
}
G group = keys.iterator().next();
List&lt;E&gt; elements = map.remove(group);
notFull.signal();
return new Item&lt;&gt;(group, elements);
} finally {
lock.unlock();
}
}
public static class Item&lt;G, E&gt; {
private final G group;
private final List&lt;E&gt; elements;
public Item(G group, List&lt;E&gt; elements) {
this.group = group;
this.elements = elements;
}
public G getGroup() {
return group;
}
public List&lt;E&gt; getElements() {
return elements;
}
}
}

Then message processor may look like:

package ru.dkovalev;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class UserMessageProcessor implements AutoCloseable {
private GroupingBlockingQueue&lt;String, String&gt; queue = new GroupingBlockingQueue&lt;&gt;();
private ExecutorService executorService;
public UserMessageProcessor(int threadCount) {
executorService = newFixedThreadPool(threadCount);
for (int i = 0; i &lt; threadCount; i++) {
executorService.submit((Runnable) this::processMessages);
}
}
private void processMessages() {
try {
while (!Thread.currentThread().isInterrupted()) {
GroupingBlockingQueue.Item&lt;String, String&gt; item = queue.take();
processMessages(item.getGroup(), item.getElements());
}
} catch (InterruptedException ignore) {}
}
private void processMessages(String user, List&lt;String&gt; messages) throws InterruptedException {
for (String message : messages) {
System.out.println(String.format(&quot;[%s] %s:%s&quot;, Thread.currentThread().getName(), user, message));
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
}
public void onMessage(String user, String message) throws InterruptedException {
queue.put(user, message);
}
@Override
public void close() {
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
}
executorService.shutdown();
}
public static void main(String[] args) throws InterruptedException {
try (UserMessageProcessor processor = new UserMessageProcessor(10)) {
processor.onMessage(&quot;u1&quot;, &quot;m1&quot;);
processor.onMessage(&quot;u2&quot;, &quot;m1&quot;);
processor.onMessage(&quot;u1&quot;, &quot;m2&quot;);
processor.onMessage(&quot;u2&quot;, &quot;m2&quot;);
processor.onMessage(&quot;u1&quot;, &quot;m3&quot;);
}
}
}

huangapple
  • 本文由 发表于 2020年5月5日 19:27:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/61612044.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定