Using `ConcurrentDictionary<String, SemaphoreSlim>` with thousands or even millions of entries to lock only on specific keys


Question

Is it reasonable to use a ConcurrentDictionary<String, SemaphoreSlim> with thousands or even millions of entries to lock only on specific keys? That is, something like:

private static readonly ConcurrentDictionary<String, SemaphoreSlim> _Locks = new();
...
var _Lock = _Locks.GetOrAdd(_Key, (_) => new SemaphoreSlim(1, 1));
await _Lock.WaitAsync();
try { ... } finally { _Lock.Release(); }

My main concerns would be:

  1. the sheer number of SemaphoreSlims that are potentially in play (thousands or even millions)
  2. (_) => new SemaphoreSlim(1, 1) potentially being called extra times, so that some SemaphoreSlims are allocated but ultimately never used.

Update with further context:

In reality, I probably only need to support between 1k and 10k entries.

I am trying to use the SemaphoreSlims to lock on updates to another ConcurrentDictionary that acts as a cache by the same key.

private static readonly ConcurrentDictionary<String, SemaphoreSlim> _Locks = new();
private static readonly ConcurrentDictionary<String, ImmutableType> _Cache = new();
...
ImmutableType _Value;
var _Lock = _Locks.GetOrAdd(_Key, (_) => new SemaphoreSlim(1, 1));
await _Lock.WaitAsync();
try
{
  if (!_Cache.TryGetValue(_Key, out _Value) || _Value.ExpirationTime < DateTime.UtcNow)
  {
    //do expensive operation to construct the _Value
    //possibly return from the method if we can't construct the _Value
    //(we can't use a Lazy Task - we are in the middle of a bi-directional gRPC call on the server side)
    _Cache[_Key] = _Value;
  }
} finally { _Lock.Release(); }

Note that the _Value type is an immutable class. We are just trying to avoid blocking callers for other keys while refreshing the cache for the key in question.

Also note that I am not worried about evicting stale entries. We refresh them as needed but never remove them.
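
On the second concern above, one commonly used option is to store a Lazy<SemaphoreSlim> rather than the semaphore itself. The following is only a sketch: the class name KeyedLocks and the counter SemaphoresCreated are made up for illustration. GetOrAdd may still invoke its factory more than once under contention, but each extra call only builds a Lazy wrapper that is discarded; the SemaphoreSlim is constructed solely through the one Lazy that ends up stored. Since a SemaphoreSlim is itself a small object, the gain is modest and matters mainly when the value is expensive to build.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class KeyedLocks
{
    // Hypothetical holder. Lazy<T> defaults to
    // LazyThreadSafetyMode.ExecutionAndPublication, so the inner
    // factory runs at most once per stored Lazy instance.
    private static readonly ConcurrentDictionary<string, Lazy<SemaphoreSlim>> _locks = new();

    // Instrumentation for this sketch only: counts constructed semaphores.
    public static int SemaphoresCreated;

    public static SemaphoreSlim Get(string key) =>
        _locks.GetOrAdd(key, _ => new Lazy<SemaphoreSlim>(() =>
        {
            Interlocked.Increment(ref SemaphoresCreated);
            return new SemaphoreSlim(1, 1);
        })).Value;
}
```

A caller would then write `var _Lock = KeyedLocks.Get(_Key);` and keep the existing WaitAsync / try / finally / Release pattern unchanged.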

Answer 1

Score: 1

Having a ConcurrentDictionary<K,V> with millions of idle SemaphoreSlims sitting around is certainly concerning. It might not be a big deal if you have abundant memory available, but if you are aiming at economical use of resources, it is possible to evict from the dictionary the SemaphoreSlims that are not actively used at the moment. It's not trivial, because you have to track how many workers are using each semaphore, but it's not rocket science either. You can find implementations in this question:

If you are worried about SemaphoreSlims being left undisposed, see this question:

Disposing IDisposable instances is the correct thing to do in principle, but practically the SemaphoreSlim.Dispose method is a no-op, unless you are using the rarely used AvailableWaitHandle property.
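
The reference-counting idea described above could be sketched roughly as follows. This is an illustration written for this answer, not one of the implementations from the linked question; the names RefCountedKeyedLock, Entry, Users, Count and Releaser are all hypothetical. Each entry counts its holders plus waiters, and the entry is removed from the dictionary when that count drops to zero.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class RefCountedKeyedLock<TKey> where TKey : notnull
{
    private sealed class Entry
    {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int Users; // holders plus waiters; guarded by lock (_entries)
    }

    private readonly Dictionary<TKey, Entry> _entries = new();

    // Number of keys currently tracked (exposed for inspection only).
    public int Count { get { lock (_entries) return _entries.Count; } }

    public async Task<IDisposable> LockAsync(TKey key)
    {
        Entry entry;
        lock (_entries)
        {
            if (!_entries.TryGetValue(key, out var existing))
            {
                existing = new Entry();
                _entries.Add(key, existing);
            }
            entry = existing;
            entry.Users++; // registered before waiting, so the entry cannot be evicted
        }
        await entry.Semaphore.WaitAsync().ConfigureAwait(false);
        return new Releaser(this, key, entry);
    }

    private void Release(TKey key, Entry entry)
    {
        entry.Semaphore.Release();
        lock (_entries)
        {
            // Evict only when nobody holds or waits on this key.
            if (--entry.Users == 0) _entries.Remove(key);
        }
    }

    private sealed class Releaser : IDisposable
    {
        private readonly RefCountedKeyedLock<TKey> _owner;
        private readonly TKey _key;
        private readonly Entry _entry;
        private int _disposed;

        public Releaser(RefCountedKeyedLock<TKey> owner, TKey key, Entry entry)
        {
            _owner = owner; _key = key; _entry = entry;
        }

        public void Dispose()
        {
            // Guard against double disposal releasing the semaphore twice.
            if (Interlocked.Exchange(ref _disposed, 1) == 0)
                _owner.Release(_key, _entry);
        }
    }
}
```

Usage would look like `using (await locks.LockAsync(key)) { ... }`. The trade-off is a single global lock around the bookkeeping, which is held only briefly and never across an await.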

Answer 2

Score: -1

This is a solution I have used for a long time to protect my web APIs from duplicate requests.

It is based on a single TaskCompletionSource, which is only used when the lock is actually contended.

One drawback is that all waiting tasks need to retry acquiring the lock on every release, even if they are waiting for a different key. It is a quick cycle for each task, but if the probability of contention is high, a better solution might be to use a separate TCS for each contended key.

(My implementation returns an IDisposable and uses a plain List<TKey> for the keys, but I simplified the code for this example.)

using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class KeyedSemaphore<TKey>
{
    // A concurrent dictionary locks on mutations and we only do mutations.
    // Use a normal collection inside a lock instead.
    private readonly HashSet<TKey> _keys;

    // Using the same TCS causes all waiting tasks,
    // independent of key, to wake up and retry on each release.
    private TaskCompletionSource<object> _tcs;

    public KeyedSemaphore(IEqualityComparer<TKey> comparer = null)
    {
        _keys = new HashSet<TKey>(comparer);
    }

    public async Task WaitAsync(TKey key)
    {
        while (true) // loops until the key is successfully added to _keys
        {
            Task task;

            lock (_keys)
            {
                if (_keys.Add(key))
                    return; // key was free: the caller now holds it

                // Key is taken: share one TCS among all current waiters.
                if (_tcs == null || _tcs.Task.IsCompleted)
                {
                    _tcs = new TaskCompletionSource<object>();
                }

                task = _tcs.Task;
            }

            await task.ConfigureAwait(false);
        }
    }

    public void Release(TKey key)
    {
        TaskCompletionSource<object> tcs;

        lock (_keys)
        {
            if (!_keys.Remove(key))
                return; // maybe an error instead

            tcs = _tcs; // read the shared field while holding the lock
        }

        // Complete outside the lock so waiter continuations never run while it is held.
        tcs?.TrySetResult(null);
    }
}

Usage:

private static readonly KeyedSemaphore<int> _keyedSemaphore = new();

public async Task<IActionResult> CreateOrderAsync(Order order)
{
    // This ensures multiple requests for creating the same order
    // will not create multiple orders
    await _keyedSemaphore.WaitAsync(order.Id);
    try
    {
        if (OrderExists(order.Id))
            return Conflict();
        ...
    }
    finally
    {
        _keyedSemaphore.Release(order.Id);
    }
}
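
The per-key alternative mentioned in this answer could look roughly like the sketch below. It is an illustration only, not the answer author's implementation; the name PerKeyQueueLock is hypothetical. Instead of one shared TCS, each contended key gets its own FIFO queue of waiters, and Release hands ownership directly to the next waiter for that key, so tasks waiting on other keys are never woken.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class PerKeyQueueLock<TKey> where TKey : notnull
{
    // A key being present means its lock is held. The value is that key's
    // waiter queue, left null until the key is actually contended.
    private readonly Dictionary<TKey, Queue<TaskCompletionSource<bool>>?> _held = new();

    public Task WaitAsync(TKey key)
    {
        lock (_held)
        {
            if (!_held.TryGetValue(key, out var waiters))
            {
                _held.Add(key, null); // uncontended: no queue or TCS allocated
                return Task.CompletedTask;
            }

            if (waiters == null)
                _held[key] = waiters = new Queue<TaskCompletionSource<bool>>();

            // Run continuations asynchronously so a waiter never resumes
            // inline on the releasing thread.
            var tcs = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }

    public void Release(TKey key)
    {
        TaskCompletionSource<bool>? next = null;

        lock (_held)
        {
            if (!_held.TryGetValue(key, out var waiters))
                return; // not held; arguably an error

            if (waiters != null && waiters.Count > 0)
                next = waiters.Dequeue(); // hand-off: the key stays marked as held
            else
                _held.Remove(key);        // nobody waiting: free the key
        }

        next?.SetResult(true);
    }
}
```

The call sites stay the same as in the usage example above (WaitAsync, then Release in a finally block). Note that this sketch has no cancellation or timeout support.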

huangapple
  • Posted on May 14, 2023 at 14:32:07
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76246149.html