Java fair ReentrantLock isn't fair across multiple iterations

Question

I am trying to write a test that demonstrates the difference between fair and unfair ReentrantLocks.
The test uses a ThreadPoolExecutor and runs multiple iterations, each of which has the following steps:

  1. Create a fair lock to test and a semaphore with 1 permit to control when the lock is released.
  2. Acquire the semaphore.
  3. Submit a task that acquires the lock and then waits for the semaphore to be released.
  4. Submit multiple "enumerated" tasks, each of which tries to acquire the lock and then updates the shared AtomicInteger state.
  5. Release the semaphore and wait for all tasks to finish.

So for the fair lock, the final value of the shared state has to be equal to the index of the last task.
But the test fails in about 50% of all executions.

My code looks like this:

    @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        int iterationsCount = 100;
        int waitingThreadsCount = 5;

        ReentrantLock lock = new ReentrantLock(true);
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .sequential()
                    .mapToObj(j -> executor.submit(() -> {
                        try {
                            lock.lock();
                            System.out.println("Acquiring lock for j=" + j);
                            return sharedState.updateAndGet((k) -> j);
                        } finally {
                            lock.unlock();
                        }
                    }))
                    .toList();
            unlockingSemaphore.release();
            lockingFuture.get();
            futures.forEach(f -> {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            executor.shutdown();
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        Assertions.assertThat(wasAnyThreadUnfair).isFalse();
    }

Question:

What is wrong with this test? What should I change so that it passes in 100% of executions?

Answer 1

Score: 3

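The problem was the order in which the tasks acquire the lock. The order of submission is not guaranteed to match the order in which the tasks start, and a fair lock grants access in the order threads reach lock(), not in the order the tasks were submitted.
So I used the Awaitility library to wait until each task is blocked on acquiring the lock before submitting the next one. Unfortunately, the test for the unfair lock no longer passes, but that is a separate problem.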

    @Test
    void should_be_fair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(true), 100, 5)).isFalse();
    }

    @Test
    // TODO: no longer passes
    void should_be_unfair() throws InterruptedException, ExecutionException {
        Assertions.assertThat(wasAnyThreadUnfair(new ReentrantLock(false), 100, 5)).isTrue();
    }

    private boolean wasAnyThreadUnfair(ReentrantLock lock, int iterationsCount, int waitingThreadsCount)
            throws InterruptedException {
        Semaphore unlockingSemaphore = new Semaphore(1);
        boolean wasAnyThreadUnfair = false;

        for (int i = 0; i < iterationsCount; i++) {
            // Take the only permit so the locking task below blocks while holding the lock.
            unlockingSemaphore.acquire();
            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(waitingThreadsCount + 1);
            // This task holds the lock until the test thread releases the semaphore.
            Future<?> lockingFuture = executor.submit(() -> {
                try {
                    lock.lock();
                    unlockingSemaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    unlockingSemaphore.release();
                    lock.unlock();
                }
            });
            AtomicInteger sharedState = new AtomicInteger();
            List<Future<Integer>> futures = IntStream.rangeClosed(1, waitingThreadsCount)
                    .mapToObj(j -> {
                        Future<Integer> submitted = executor.submit(() -> {
                            try {
                                lock.lock();
                                System.out.println("Acquiring lock for j=" + j);
                                return sharedState.updateAndGet((k) -> j);
                            } finally {
                                lock.unlock();
                            }
                        });
                        // Wait until task j is queued on the lock before submitting task j + 1.
                        await().atMost(150, TimeUnit.MILLISECONDS).until(() -> lock.getQueueLength() == j);
                        return submitted;
                    })
                    .toList();
            // Let the locking task finish so the queued tasks can acquire the lock in turn.
            unlockingSemaphore.release();
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
            System.out.println("Ended " + i + "-th cycle with the last index=" + sharedState.get());
            if (sharedState.get() != waitingThreadsCount) {
                wasAnyThreadUnfair = true;
                break;
            }
        }

        return wasAnyThreadUnfair;
    }
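
For readers who would rather not depend on Awaitility, the same idea (make sure each waiter is already queued on the lock before the next one starts) can be sketched with plain threads and ReentrantLock.hasQueuedThread. This is only a minimal sketch under my own assumptions: the class name FairOrderSketch and the helper acquisitionOrder are hypothetical and not part of the original test, and it polls hasQueuedThread rather than getQueueLength.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FairOrderSketch {

    // Hypothetical helper: returns the order in which the waiting threads obtained the lock.
    static List<Integer> acquisitionOrder(boolean fair, int waiters) {
        ReentrantLock lock = new ReentrantLock(fair);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        lock.lock(); // the calling thread holds the lock while the waiters line up
        try {
            for (int j = 1; j <= waiters; j++) {
                final int index = j;
                Thread t = new Thread(() -> {
                    lock.lock();
                    try {
                        order.add(index); // record who got the lock
                    } finally {
                        lock.unlock();
                    }
                });
                threads.add(t);
                t.start();
                // Do not start the next thread until this one is in the lock's wait queue.
                while (!lock.hasQueuedThread(t)) {
                    Thread.onSpinWait();
                }
            }
        } finally {
            lock.unlock(); // let the queued threads proceed
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println("fair:   " + acquisitionOrder(true, 5));
        System.out.println("unfair: " + acquisitionOrder(false, 5));
    }
}
```

With the fair lock the recorded order should match the start order, because every thread is queued before the next one is started. As far as I understand the OpenJDK implementation, a non-fair lock also serves threads that are already queued in FIFO order, and barging only happens when another thread calls lock() at the moment the lock becomes free. If that holds, it would explain why should_be_unfair stops detecting unfairness once every waiter is queued first, but I have not verified it beyond this sketch.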

huangapple
  • Posted on July 13, 2023 at 23:01:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76680854.html