如何按顺序处理特定键的事件

huangapple go评论68阅读模式
英文:

How to process events in order for a specific key

问题

我没有为我的问题找到一个合适的问题标题。

我有一个监听器接收事件,每个事件都有一个对象标识。因为某些操作需要一些时间来处理,我从一个同步处理(其中我逐个处理每个事件,锁定监听器)切换到了一个固定线程池执行程序(有10个线程)。

问题是,对于特定的对象标识,我仍希望按照事件发出的顺序处理事件。

问题是,对象标识是随机的,我不能为每个对象标识预先声明一个线程。

如果我想简化一下:假设我有一个线程数组,每个具有标识为0的事件都将通过索引为0的线程处理,对于每个具有标识为1的事件,它们将通过索引为1的线程处理。

如果标识在范围内,比如从0到10,那么它能够很好地工作,但这不适用于我的情况,它们可以是任何整数值。

英文:

I didn't find a good question title for my problem.

I have a listener that receives events, each event has an object id. Because some actions take some time to process, I switch to a synchronous processing (where I treat each event one at a time, locking the listener) to a fixed thread pool executor (with 10 threads).

The problem is that for a specific object id, I still want to process events in the same order the events are emitted.

The problem is that the object id is random, I can't pre-declare a thread for each object id.

If I want to simplify it: let's say I have an array of threads, each event with id 0 would be treated via thread at index 0, and for each event with 1 they would be treated via thread at index 1.

It works well if id is within a range, like 0 to 10, but this is not my case, they can have any int value.

答案1

得分: 2

要处理对象ID的事件,您需要确保它进入同一线程进行处理。为了做到这一点,您可以计算对象ID的哈希值,然后对线程数取模,这将确保相同的对象ID由相同的线程或相同的线程池进行处理。

int objectIdHash = objectId.hashCode();

return (objectIdHash % totalThreads);

现在,要存储用于处理的线程或线程池,您可以使用并发HashMap。

// 定义线程池
private Map<Integer, ExecutorService> threadPool = new ConcurrentHashMap<Integer, ExecutorService>();

// 使用上述方法获取线程ID
int threadId = getJobThreadPoolId(objectId, totalThreads);

// 从线程池中获取线程并提交事件
threadPool.get(threadId).submit(event);
英文:

To process the events of an object ID you need to make sure that it is coming to same thread for processing. To do that what you can do is you can calculate the hash of the object ID and then mod it by the number of threads you have which will make sure the same object ID is processed by same thread or by same threadpool.

int objectIdHash = objectId.hashCode();
	
return (objectIdHash % totalThreads);

Now to store the threads or threadpool for processing you can use the concurrent HashMap.

// Define the thread pool
private Map&lt;Integer, ExecutorService&gt; threadPool = new ConcurrentHashMap&lt;Integer, ExecutorService&gt;();

// Get the thread id using the above
int threadId = getJobThreadPoolId(objectId, totalThreads);

// Get the thread from the threadpool and submit
threadpool.get(threadId).submit(event);

huangapple
  • 本文由 发表于 2020年9月10日 23:20:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/63832815.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定