用户级别的锁定方法,使用synchronizedMap和ReentrantLock。

huangapple go评论73阅读模式
英文:

User level Locking Approach With synchronizedMap & ReentrantLock

问题

我想实现如下代码中所示的基于用户的锁定。

如果在用户 "1234"(java.lang.String 类的标识符)上正在进行某些进程,或者用户正在被某些进程使用,其他进程应等待该进程完成。听起来很简单,我尝试使用 `ReentrantLock` 使其工作,但我陷入了永久的等待状态和死锁中,尽管我几乎要使用 `synchronized` 块使其工作,但我希望使用 `ReentrantLock` 来实现这一点。

以下是我的代码和日志。

如果需要相同的线程转储,请告诉我。

我在 macOS 上使用 Java 8。

(以下为代码和日志的翻译,不包含原文中的问题和回答部分)

//Research.java 文件
public static final int PERIOD = 10;
public static final int END_INCLUSIVE = 23;
public static final int N_THREADS = 8;

public static void main(String[] args) {
  final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS);
  IntStream.rangeClosed(1, END_INCLUSIVE).forEach(value -> executorService.submit(() -> {
    final String user = value % 2 == 0 ? "1234" : "5678";
    process(value + "process", user);
  }));
  executorService.shutdown();
}

private static void process(String tag, String user) {
  System.out.println("waiting tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
  AccountLock.getInstance().lock(user);
  System.out.println("in tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
  sleep(tag, PERIOD);
  AccountLock.getInstance().unlock(user);
  System.out.println("out tag=" + tag + ",user=" + user + ",TH=" + Thread.currentThread().getName());
}

private static void sleep(String tag, long s) {
  boolean interrupt = false;
  try {
    TimeUnit.SECONDS.sleep(s);
  } catch (InterruptedException e) {
    interrupt = true;
    e.printStackTrace();
  } finally {
    if (interrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * AccountLock
 */
final class AccountLock {
  private static final Map<String, Lock> LOCK_MAP = Collections.synchronizedMap(new HashMap<>());
  private static volatile AccountLock INSTANCE;

  private AccountLock() {
  }

  public static AccountLock getInstance() {
    if (INSTANCE == null) {
      synchronized (AccountLock.class) {
        if (INSTANCE == null) {
          INSTANCE = new AccountLock();
        }
      }
    }
    return INSTANCE;
  }

  public void lock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.lock();
      return lock;
    });
    LOCK_MAP.computeIfAbsent(user, s -> {
      final ReentrantLock lock = new ReentrantLock(true);
      lock.lock();
      return lock;
    });
  }

  public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
      lock.unlock();
      return null;
    });
  }
}
//日志
//waiting tag=2process,user=1234,TH=pool-1-thread-2
//waiting tag=3process,user=5678,TH=pool-1-thread-3
//waiting tag=1process,user=5678,TH=pool-1-thread-1
//waiting tag=4process,user=1234,TH=pool-1-thread-4
//waiting tag=5process,user=5678,TH=pool-1-thread-5
//in tag=3process,user=5678,TH=pool-1-thread-3
//in tag=4process,user=1234,TH=pool-1-thread-4
//in tag=5process,user=5678,TH=pool-1-thread-5
//waiting tag=6process,user=1234,TH=pool-1-thread-6
//waiting tag=7process,user=5678,TH=pool-1-thread-7
//waiting tag=8process,user=1234,TH=pool-1-thread-8
英文:

I want to achieve user based locking as shown in below code.

If on user "1234" (java.lang.String.class identifier) some process is happening, or user is being used by some process, other processes should wait for that process to complete. Simple as that it sounds, I tried making it work using ReenterrentLock but I get stuck in perpetual waiting state and deadlock, though I was about to make it work using synchronised block, I wanted to achieve this with ReenterrentLock.

Below is my code & logs.

Let me know what I am doing wrong.

//Research.java file
public static final int PERIOD = 10;
public static final int END_INCLUSIVE = 23;
public static final int N_THREADS = 8;
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS);
IntStream.rangeClosed(1, END_INCLUSIVE).forEach(value -&gt; executorService.submit(() -&gt; {
final String user = value % 2 == 0 ? &quot;1234&quot; : &quot;5678&quot;;
process(value + &quot;process&quot;, user);
}));
executorService.shutdown();
}
private static void process(String tag, String user) {
System.out.println(&quot;waiting tag=&quot; + tag + &quot;,user=&quot; + user + &quot;,TH=&quot; + Thread.currentThread().getName());
AccountLock.getInstance().lock(user);
System.out.println(&quot;in tag=&quot; + tag + &quot;,user=&quot; + user + &quot;,TH=&quot; + Thread.currentThread().getName());
sleep(tag, PERIOD);
AccountLock.getInstance().unlock(user);
System.out.println(&quot;out tag=&quot; + tag + &quot;,user=&quot; + user + &quot;,TH=&quot; + Thread.currentThread().getName());
}
private static void sleep(String tag, long s) {
boolean interrupt = false;
try {
TimeUnit.SECONDS.sleep(s);
} catch (InterruptedException e) {
interrupt = true;
e.printStackTrace();
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}
/**
* AccountLock
*/
final class AccountLock {
private static final Map&lt;String, Lock&gt; LOCK_MAP = Collections.synchronizedMap(new HashMap&lt;&gt;());
private static volatile AccountLock INSTANCE;
private AccountLock() {
}
public static AccountLock getInstance() {
if (INSTANCE == null) {
synchronized (AccountLock.class) {
if (INSTANCE == null) {
INSTANCE = new AccountLock();
}
}
}
return INSTANCE;
}
public void lock(String user) {
LOCK_MAP.computeIfPresent(user, (s, lock) -&gt; {
lock.lock();
return lock;
});
LOCK_MAP.computeIfAbsent(user, s -&gt; {
final ReentrantLock lock = new ReentrantLock(true);
lock.lock();
return lock;
});
}
public void unlock(String user) {
LOCK_MAP.computeIfPresent(user, (s, lock) -&gt; {
lock.unlock();
return null;
});
}
}
//logs
//waiting tag=2process,user=1234,TH=pool-1-thread-2
//waiting tag=3process,user=5678,TH=pool-1-thread-3
//waiting tag=1process,user=5678,TH=pool-1-thread-1
//waiting tag=4process,user=1234,TH=pool-1-thread-4
//waiting tag=5process,user=5678,TH=pool-1-thread-5
//in tag=3process,user=5678,TH=pool-1-thread-3
//in tag=4process,user=1234,TH=pool-1-thread-4
//in tag=5process,user=5678,TH=pool-1-thread-5
//waiting tag=6process,user=1234,TH=pool-1-thread-6
//waiting tag=7process,user=5678,TH=pool-1-thread-7
//waiting tag=8process,user=1234,TH=pool-1-thread-8

Let me know if you need thread dumps for the same.

I am using java8 on mac

答案1

得分: 1

死锁源于你的地图和锁之间的交互。

更精确地说:你使用了一个 Collections.synchronized 地图,实际上在原始地图实例的每个(或多个)方法上都设置了互斥。

这意味着,例如,只要有人在 computeIfAbsent(或 computeIfPresent)调用内部,那么地图实例上就不能调用其他方法。

到目前为止没问题。

除了这一点, computeIfxx 调用内部,你还执行了另一个锁定操作,即获取锁实例。

拥有两个不同的锁,并且不维护严格的锁获取顺序,是造成死锁的完美条件,你在这里演示了这一点。

一个示例时间线:

  1. 线程 T1 进入地图(获取锁 M),创建锁 L1 并获取它。T1 持有 M 和 L1。
  2. 线程 T1 退出 map.computeXX,释放 M。T1 只持有 L1。
  3. 线程 T2 进入地图(获取 M)。T1 持有 L1,T2 持有 M。
  4. 线程 T2 尝试获取 L1(与 T1 相同的锁),被 T1 阻塞。T1 持有 L1,T2 持有 M 并等待 L1。你看出来了吗?
  5. 线程 T1 完成。它想要释放 L1,所以试图进入地图,这意味着尝试获取 M,而 M 被 T2 持有。T1 持有 L1,等待 M。T2 持有 M,等待 L1。游戏结束。

解决方案:诱人的做法是不要同时创建锁并获取它,例如在被锁定的方法内部不要获取锁。让我们试试(只是为了说明一点,你会看到有多困难)。

如果打破这种嵌套的锁定模式,当线程已经拥有另一个锁时,就不会出现锁定/解锁调用,从而防止任何死锁模式(单个可重入锁不会导致死锁)。

这意味着:将你的 AccountLock 类从锁定实例更改为锁定工厂实例。或者如果你想集成内容,可以采用不同的方法。

将锁获取移出地图的方法调用后,我已将其从地图的互斥体中移除,并消除了死锁可能性。

在纠正之后,我创建了另一个错误。

在解锁方面,你使用了 computeIfPresent,它返回 null。这意味着一旦完成对锁的使用,你打算从地图中将其删除。这一切都很好,但在解锁情况下会创建竞争条件。

  1. T1 为用户 1 创建锁 L1,将其放入地图并锁定它。
  2. T2 需要相同的 L1,从地图获取它,并等待其可用性。
  3. T1 完成,释放 L1(T2 尚不知道此情况),并将其从地图中删除。
  4. T3 进入并需要用户 1 的锁,但 L1 已从地图中消失,因此它创建了 L2 并获取了它。
  5. T2 发现 L1 可用,获取它并工作。
  6. 此时,T2 和 T3 都可以并发访问用户 1,这是不好的
  7. T2 先完成,转到地图实例并释放用户 1 的锁,即 L2,这是一个它从未首次获取的锁。IllegalMonitorStateException。

这没有简单的解决方法。你必须确保希望访问用户的并发线程具有一致的锁实例(在这里的问题是 T1 释放了一个锁,而 T2 拥有它,但尚未锁定它)。这在原始代码中不会发生——因为地图的互斥体,但该互斥体会造成死锁。

那么该怎么办呢?
你可以永远不要从地图中删除键。

public void unlock(String user) {
    LOCK_MAP.computeIfPresent(user, (s, lock) -> {
        lock.unlock();
        return lock;
    });

但这样一来,就永远不会从地图中释放键,地图会无限增长——可以添加一个线程安全的计数器来检查在释放它们之前是否正在“被获取”。这会导致另一个错误吗?

总之,这里仍然有一些需要加强的地方:

  • 锁释放应该在 finally 块中执行。
  • 你创建的锁实例数量没有上限。你可能希望避免这种情况(这是真正困难的部分)。
  • 在地图级别拥有互斥体有点浪费。ConcurrentHashMap 可以提供更精细的锁机制。
  • 谁知道还有什么错误。

结论

尝试查看此其他问题中提供的解决方案是否有助于实现你的目标。使用受尊敬的库实现可能更安全。

并发编程很难。别人已经为你做过了。

英文:

The deadlock spawns from the interaction of your map, and the locks.

More precisely : you use a Collections.syncrhonized map, which in effect, puts a mutual exclusion on every (or so) method of the orignal map instance.

That means, for example, that as long as someone is inside a computeIfAbsent (or computeIfPresent) call, then no other method can be called on the map instance.

Up to this point no problem.

Except that, inside the computeIfxx calls, you do another locking operation, by acquiring lock instances.

Having two different locks AND not maintaining a strict lock acquisition order is the perfect recipe to deadlock, which you demonstrate here.

A sample chronology :

  1. Thread T1 enters the Map (acquires the lock M), creates a Lock L1 and acquires it. T1 holds M and L1.
  2. Thread T1 exists the map.computeXX, releasing M. T1 holds L1 (only).
  3. Thread T2 enters the map (acquires M). T1 holds L1, T2 holds M.
  4. Thread T2 tries to acquire L1 (the same one as T1), is blocked by T1. T1 holds L1, T2 holds M and waits for L1. Do you see it coming ?
  5. Thread T1 is done. It wants to release L1, so tries to enter the map, which means trying to acquire M, which is held by T2. T1 holds L1, waits for M. T2 holds M, waits for L1. Game over.

The solution : it is tempting to not try to create the lock and take it at the same time, e.g. do not acquire the lock when inside a locked method. Let's try (just to make a point, you'll see how hard it gets).

If you break this embedded lock pattern, you'll never have lock/unlock calls when your threads already have another lock, preventing any deadlock pattern (deadlocks can not occur with a single reentrant lock).

Which means : change your AccountLock class from a locking instance to a lock factory instance. Or if you want to integrate stuff, do it differently.

lock(String user) {
LOCKS.computeIfAbsent(user, u -&gt; new ReentrantLock()).lock();
}

by moving the lock acquisition outside the map's method call, i've removed it from the map's mutex, and I eliminated the deadlock possibility.

With that corrected, I created another bug.

On the unlock side, you use computeIfPresent which returns null. That means that once done with a lock, you intend to remove it from the map. This is all well, but it creates a race condition in the unlock case.

  1. T1 creates lock L1 for user 1, puts it in the map and locks it.
  2. T2 wants the same L1, gets it from the map, and waits for its availability
  3. T1 finishes, releases L1 (T2 does not yet know about it), and removes it from the map
  4. T3 comes in and wants a lock for user 1, but L1 is gone from the map, so it creates L2 and acquires it.
  5. T2 sees that L1 is free, acquires it and works
  6. At this point, both T2 and T3 have concurrent access to user 1, which is bad
  7. T2 finished first, goes to the map instance and frees the lock for user 1 which is L2, a lock it never acquired in the first place. IllegalMonitorStateException.

There is no easy way out of this one. You have to make sure that concurrent threads wanting to access a user have a consistant lock instance (here the problem is that T1 releases a lock while T2 has a hold on it, but has not locked it yet). This could not happen with your original code - because of the map's mutex, but that mutex created deadlocks.

So what to do ?
You could never remove keys from the map.

public void unlock(String user) {
LOCK_MAP.computeIfPresent(user, (s, lock) -&gt; {
lock.unlock();
return lock;
});

But then nobody ever releases keys from the map, which grows indefinitely - one could add a thread-safe counter to check that locks are not "being acquired" before releasing them. Will that create in turn another bug ?

So that being said, there are still a few things to strengthen here :

  • Lock release should be performed in finally blocks
  • The number of lock instances you create is unbounded. You might want to avoid that (this is the real difficult part)
  • Having a mutex at the map's level is kind of a waste. A ConcurrentHashMap would provide much finer grained lock mechanism
  • Who knows what bug is left here.

Conclusion

Try and see if any solution provided in this other question helps you achieve your goal. It might be safer to use a respected library's implementation.

Concurrent programming is hard. Others have done it for you.

huangapple
  • 本文由 发表于 2020年9月9日 22:37:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/63813962.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定