Using `ConcurrentDictionary<String, SemaphoreSlim>` with thousands or even millions of entries to lock only on specific keys

huangapple go评论93阅读模式

Using `ConcurrentDictionary<String, SemaphoreSlim>` with thousands or even millions of entries to lock only on specific keys



是否合理使用 ConcurrentDictionary&lt;String, SemaphoreSlim&gt; 来锁定成千上万甚至数百万个条目,只锁定特定的键?也就是说,类似于:

  1. private static readonly ConcurrentDictionary&lt;String, SemaphoreSlim&gt; _Locks = new();
  2. ...
  3. var _Lock = _Locks.GetOrAdd(_Key, (_) =&gt; new SemaphoreSlim(1, 1));
  4. await _Lock.WaitAsync();
  5. try { ... } finally { _Lock.Release() }


  1. 潜在的 SemaphoreSlim 的数量(成千上万甚至数百万)
  2. 可能会多次调用 (_) =&gt; new SemaphoreSlim(1, 1),从而分配了 SemaphoreSlim,但最终未被使用。



我试图使用 SemaphoreSlim 锁定另一个按相同键作为缓存的 ConcurrentDictionary 的更新。

  1. private static readonly ConcurrentDictionary&lt;String, SemaphoreSlim&gt;
  2. _Locks = new();
  3. private static readonly ConcurrentDictionary&lt;String, ImmutableType&gt; _Cache = new();
  4. ...
  5. var _Value;
  6. var _Lock = _Locks.GetOrAdd(_Key, (_) =&gt; new SemaphoreSlim(1, 1));
  7. await _Lock.WaitAsync();
  8. try
  9. {
  10. if(!_Cache.TryGetValue(_Key, out _Value) || _Value.ExpirationTime &lt; DateTime.UtcNow)
  11. {
  12. //执行昂贵的操作以构造 _Value
  13. //如果无法构造 _Value,可能会从方法中返回
  14. //(我们不能使用 Lazy Task - 我们在服务器端的双向 gRPC 调用中)
  15. _Cache[_Key] = _Value;
  16. }
  17. } finally { _Lock.Release() }

请注意,_Value 类型是不可变类,我们只是试图在刷新特定键的缓存时避免阻塞其他调用者。



Is it reasonable to use a ConcurrentDictionary&lt;String, SemaphoreSlim&gt; with thousands or even millions of entries to lock only on specific keys? That is, something like

  1. private static readonly ConcurrentDictionary&lt;String, SemaphoreSlim&gt; _Locks = new();
  2. ...
  3. var _Lock = _Locks.GetOrAdd(_Key, (_) =&gt; new SemaphoreSlim(1, 1));
  4. await _Lock.WaitAsync();
  5. try { ... } finally { _Lock.Release() }

My main concerns would be:

  1. the sheer number of SemaphoreSlims that are potentially in play (thousands or even millions)
  2. (_) =&gt; new SemaphoreSlim(1, 1) potentially being called extra times such that there are SemaphoreSlims that are allocated but ultimately never used.

Update with further context:

I reality, I probably only need to support between 1k - 10k entries.

I am trying to use the SemaphoreSlims to lock on updates to another ConcurrentDictionary that acts as a cache by the same key.

  1. private static readonly ConcurrentDictionary&lt;String, SemaphoreSlim&gt;
  2. _Locks = new();
  3. private static readonly ConcurrentDictionary&lt;String, ImmutableType&gt; _Cache = new();
  4. ...
  5. var _Value;
  6. var _Lock = _Locks.GetOrAdd(_Key, (_) =&gt; new SemaphoreSlim(1, 1));
  7. await _Lock.WaitAsync();
  8. try
  9. {
  10. if(!_Cache.TryGetValue(_Key, out _Value) || _Value.ExpirationTime &lt; DateTime.UtcNow)
  11. {
  12. //do expensive operation to construct the _Value
  13. //possibly return from the method if we can&#39;t construct the _Value
  14. //(we can&#39;t use a Lazy Task - we are in the middle of a bi-direction gRPC call on the server side)
  15. _Cache[_Key] = _Value;
  16. }
  17. } finally { _Lock.Release() }

Note that the _Value type is an immutable class, we are just trying to avoid blocking other callers for other keys while refreshing our cache for the key in question.

Also note that I am not worried about evicting stale entries. We refresh them as needed but never remove them.


得分: 1






Having a ConcurrentDictionary&lt;K,V&gt; with millions of idle SemaphoreSlims sitting around is certainly concerning. It might not be a big deal if you have abundant memory available, but if you are aiming at economic use of resources it is possible to evict from the dictionary the SemaphoreSlims that are not actively used at the moment. It's not trivial because you have to track how many workers are using each semaphore, but it's not rocket science either. You can find implementations in this question:

If you are worried about SemaphoreSlims being left undisposed, see this question:

Disposing IDisposable instances is the correct thing to do in principle, but practically the SemaphoreSlim.Dispose method is a no-op, unless you are using the rarely used AvailableWaitHandle property.


得分: -1

这是我长时间以来用于保护我的Web API免受重复请求的解决方案。




  1. public sealed class KeyedSemaphore&lt;TKey&gt;
  2. {
  3. // 并发字典在变异时会锁定,我们只进行变异。
  4. // 在锁内使用普通集合
  5. private readonly HashSet&lt;TKey&gt; _keys;
  6. // 使用相同的tcs会导致所有等待任务,
  7. // 无论键如何,都会在每次释放时自旋
  8. private TaskCompletionSource&lt;object&gt; _tcs;
  9. public KeyedSemaphore(IEqualityComparer&lt;TKey&gt; comparer = null)
  10. {
  11. _keys = new HashSet&lt;TKey&gt;(comparer);
  12. }
  13. public async Task WaitAsync(TKey key)
  14. {
  15. while (true) // 这将循环直到成功将键添加到_keys
  16. {
  17. Task task;
  18. lock (_keys)
  19. {
  20. if (_keys.Add(key))
  21. return;
  22. if (_tcs == null || _tcs.Task.IsCompleted)
  23. {
  24. _tcs = new TaskCompletionSource&lt;object&gt;();
  25. }
  26. task = _tcs.Task;
  27. }
  28. await task.ConfigureAwait(false);
  29. }
  30. }
  31. public void Release(TKey key)
  32. {
  33. lock (_keys)
  34. {
  35. if (!_keys.Remove(key))
  36. return; // 可能是错误
  37. }
  38. _tcs?.TrySetResult(null);
  39. }
  40. }


  1. private static readonly KeyedSemaphore&lt;int&gt; _keyedSemaphore = new();
  2. public async Task CreateOrderAsync(Order order)
  3. {
  4. // 这确保了多个请求创建相同的订单时不会创建多个订单
  5. await _keyedSemaphore.WaitAsync(order.Id)
  6. try
  7. {
  8. if (OrderExists(order.Id))
  9. return Conflict();
  10. ...
  11. }
  12. finally {
  13. _keyedSemaphore.Release(order.Id);
  14. }
  15. }

This is a solution I have used a long time for protecting my webapi:s from duplicates.

It is based on a single TaskCompletionSource which is only used when the lock is actually contended.

One drawback is that all waiting tasks need to reacquire the lock on release even if they are waiting for a different key. It is a quick cycle for each task but if probability of contention is high a better solution might be to use a separate TCS for each contended key.

(my implementation returns an IDisposable and is using a plain List<TKey> for the keys but I simplified the code for this example)

  1. public sealed class KeyedSemaphore&lt;TKey&gt;
  2. {
  3. // A concurrent dictionary locks on mutations and we only do mutations.
  4. // Use a normal collection inside lock instead
  5. private readonly HashSet&lt;TKey&gt; _keys;
  6. // Using the same tcs will cause all waiting tasks,
  7. // independent of key, to spin on each release
  8. private TaskCompletionSource&lt;object&gt; _tcs;
  9. public KeyedSemaphore(IEqualityComparer&lt;TKey&gt; comparer = null)
  10. {
  11. _keys = new HashSet&lt;TKey&gt;(comparer);
  12. }
  13. public async Task WaitAsync(TKey key)
  14. {
  15. while (true) // this will loop until key is successfully added to _keys
  16. {
  17. Task task;
  18. lock (_keys)
  19. {
  20. if (_keys.Add(key))
  21. return;
  22. if (_tcs == null || _tcs.Task.IsCompleted)
  23. {
  24. _tcs = new TaskCompletionSource&lt;object&gt;();
  25. }
  26. task = _tcs.Task;
  27. }
  28. await task.ConfigureAwait(false);
  29. }
  30. }
  31. public void Release(TKey key)
  32. {
  33. lock (_keys)
  34. {
  35. if (!_keys.Remove(key))
  36. return; // maybe an error instead
  37. }
  38. _tcs?.TrySetResult(null);
  39. }
  40. }


  1. private static readonly KeyedSemaphore&lt;int&gt; _keyedSemaphore = new();
  2. public async Task CreateOrderAsync(Order order)
  3. {
  4. // This ensures multiple requests for creating same order
  5. // will not create multiple orders
  6. await _keyedSemaphore.WaitAsync(order.Id)
  7. try
  8. {
  9. if (OrderExists(order.Id)
  10. return Conflict();
  11. ...
  12. }
  13. finally {
  14. _keyedSemaphore.Release(order.Id);
  15. }
  16. }

  • 本文由 发表于 2023年5月14日 14:32:07
  • 转载请务必保留本文链接:



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
