ReentrantLock 线程随机终止。

huangapple go评论78阅读模式
英文:

ReentrantLock threads terminating randomly

问题

I have been working on a school assignment which is about multithreading in Java. One of the tasks that I am stuck on is that we need to create multiple threads in different groups, and once there are 4 threads in each group, only then they can be released to work in unison, otherwise they have to be put on hold/waiting. For example:

  • Thread a,b,c joins group 7, they are all put on hold/waiting.
  • Thread d joins group 7, all four threads (a,b,c,d) are signaled to be terminated.
  • Thread e,f,g,h,i joins group 8, in this case e,f,g,h will be signalled to be terminated while thread i is put on waiting.
  • Thread j joins group 7, it is put on for waiting.

That is the general task which I'm done with. The task I am working on requires us to release the INITIAL first 4 threads of a group, and the rest should wait until 4 of the previous threads have called finished().

For example, 3 threads join group 65, they are put on wait. Another thread joins group 65 and all 4 threads are released together. Now 4 threads are working (terminated). Now thread e,f,g,h,i,j,k,l join group 65. All of them are put to wait until e,f,g,h have called finished() method.

Here is what I have done so far:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

    public void waitForThreadsInGroup(int groupId) {
        monitor.lock();

        if (!groupThreadCount.containsKey(groupId))
            groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

        ConditionWrapper condWrapper = groupThreadCount.get(groupId);
        condWrapper.setValue(condWrapper.getValue() + 1);

        if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
        {
            condWrapper.getCondition().signalAll();
            condWrapper.setInitialStatus(false);

            System.out.println("Terminating group: " + groupId + " FROM INITIAL STATE: " + ++count);
        } else {
            System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
            try { condWrapper.getCondition().await(); }
            catch (InterruptedException e) { e.printStackTrace(); }

        }

        monitor.unlock();
    }

    public void finished(int groupId) {
        monitor.lock();
        ConditionWrapper condWrapper = groupThreadCount.get(groupId);

        if(!condWrapper.getInitialStatus())
        {
            condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
            System.out.println("Group: " + groupId + " FINISHED COUNT: " + condWrapper.getFinishedCount());
            if(condWrapper.getFinishedCount() == 4)
            {
                condWrapper.setFinishedCount(0);
                condWrapper.getCondition().signalAll();
                System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
            }
        }
        monitor.unlock();
    }
}

ExtrinsicSyncTest.java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

The problem I am having is that I am able to release the first four threads of every group, but somehow, somewhere 2 threads are being terminated randomly and I cannot figure out what is going on. For example, with 22 threads test case above divided into two groups, only 8 threads should be terminated while the rest of them wait.

But here 10 threads are being terminated instead. I do not understand what is going on. I have stripped the code down to bare minimum as best as I could.

英文:

I have been working on a school assignment which is about multithreading in Java. One of the tasks that I am stuck on is that we need to create multiple threads in different groups, and once there are 4 threads in each group, only then they can be released to work in unison, otherwise they have to be put on hold/waiting. For example:

  • Thread a,b,c joins group 7, they are all put on hold/waiting.
  • Thread d joins group 7, all four threads (a,b,c,d) are signaled to be terminated.
  • Thread e,f,g,h,i joins group 8, in this case e,f,g,h will be signalled to be terminated while thread i is put on waiting.
  • Thread j joins group 7, it is put on for waiting.

That is the general task which I'm done with. The task I am working on requires us to release the INITIAL first 4 threads of a group, and the rest should wait until 4 of the previous threads have called finished().

For example, 3 threads join group 65, they are put on wait. Another thread joins group 65 and all 4 threads are released together. Now 4 threads are working (terminated). Now thread e,f,g,h,i,j,k,l join group 65. All of them are put to wait until e,f,g,h have called finished() method.

Here is what I have done so far:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;
public class ExtrinsicSync {
private HashMap&lt;Integer, ConditionWrapper&gt; groupThreadCount;
private ReentrantLock monitor;
private int count = 0;
ExtrinsicSync() {
groupThreadCount = new HashMap&lt;&gt;();
monitor = new ReentrantLock();
}
@Override
public void waitForThreadsInGroup(int groupId) {
monitor.lock();
if (!groupThreadCount.containsKey(groupId))
groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
condWrapper.setValue(condWrapper.getValue() + 1);
if(condWrapper.getValue() == 4 &amp;&amp; condWrapper.getInitialStatus())
{
condWrapper.getCondition().signalAll();
condWrapper.setInitialStatus(false);
System.out.println(&quot;Terminating group: &quot; + groupId + &quot;FROM INITIAL STATE: &quot; + ++count);
} else {
System.out.println(&quot;Putting thread from group: &quot; + groupId + &quot; on wait: &quot; + ++waitcount);
try { condWrapper.getCondition().await(); }
catch (InterruptedException e) { e.printStackTrace(); }
}
monitor.unlock();
}
@Override
public void finished(int groupId) {
monitor.lock();
ConditionWrapper condWrapper = groupThreadCount.get(groupId);
if(!condWrapper.getInitialStatus())
{
condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
System.out.println(&quot;Group: &quot; + groupId + &quot;FINISHED COUNT: &quot; + condWrapper.getFinishedCount());
if(condWrapper.getFinishedCount() == 4)
{
condWrapper.setFinishedCount(0);
condWrapper.getCondition().signalAll();
System.out.println(&quot;Terminating threads for group: &quot; + groupId + &quot;: &quot; + ++count);
}
}
monitor.unlock();
}

ExtrinsicSyncTest.java:

import org.junit.Test;
import java.util.EnumMap;
class TestTask1 implements Runnable{
final int group;
final ExtrinsicSync s1;
TestTask1(int group, ExtrinsicSync s1)
{
this.group = group;
this.s1 = s1;
}
public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}
public class ExtrinsicSyncTest {
@Test
public void testPhaseThreethreads() {
int nThreads = 22;
Thread t[] = new Thread[nThreads];
final ExtrinsicSync s1 = new ExtrinsicSync();
for(int i = 0; i &lt; nThreads/2; i++)
(t[i] = new Thread(new TestTask1(66, s1))).start();
for(int i = nThreads/2; i &lt; nThreads; i++)
(t[i] = new Thread(new TestTask1(70, s1))).start();
for (Thread ti : t)
{
try { ti.join(100); }
catch (Exception e) { System.out.println(e); }
}
EnumMap&lt;Thread.State, Integer&gt; threadsInThisState = new EnumMap&lt;&gt;(Thread.State.class);
for (Thread.State s : Thread.State.values())
threadsInThisState.put(s, 0);
for (Thread ti : t)
{
Thread.State state = ti.getState();
int n = threadsInThisState.get(state);
threadsInThisState.put(state, n + 1);
}
System.out.println(&quot;threadsInThisState: &quot; + threadsInThisState.toString() );
}
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;
public class ConditionWrapper {
private Condition cond;
private Integer value;
private Integer finishedCount;
private boolean initialThreads;
public ConditionWrapper(Condition condition)
{
this.cond = condition;
this.value = 0;
this.finishedCount = 0;
this.initialThreads = true;
}
// Returns the condition object of current request
public Condition getCondition()
{
return this.cond;
}
// Gets the current counter of threads waiting in this queue.
public Integer getValue()
{
return this.value;
}
// Sets the given value. Used for resetting the counter.
public void setValue(int value) { this.value = value; }
// Sets the counter to help keep track of threads which called finished() method
public void setFinishedCount(int count) { this.finishedCount = count; }
// Gets the finished count.
public Integer getFinishedCount() { return this.finishedCount; }
// This flag is to identify initial threads of a group
public boolean getInitialStatus() { return initialThreads; }
public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

The problem I am having is that I am able to release the first four threads of every group, but somehow, somewhere 2 threads are being terminated randomly and I cannot figure out what is going on. For example, with 22 threads test case above divided into two groups, only 8 threads should be terminated while the rest of them wait.

But here 10 threads are being terminated instead. I do not understand what is going on. I have stripped the code down to bare minimum as best as I could.

答案1

得分: 2

问题在于对于非初始线程(getInitialStatus == false),您没有通知其他线程,但当达到四个线程时仍然终止了它们。所以情况是这样的:

  1. 前三个线程增加计数并等待。
  2. 第四个线程达到计数为4,并将initial设置为false,通知所有其他线程并将计数设置为零。
  3. 接下来的三个线程将计数增加一。
  4. 这8个线程达到计数为4并被终止。由于getInitialStatus == false,此线程不会通知其他线程。

因此,4 * 2个线程 + 2个线程被终止。正好是您在测试中看到的计数。

以下是一种可能的实现方式:

  1. 在每个线程或任务中使用一个标志canExecute。
  2. 使用方法calculateState来计算当前状态,并在允许执行线程时将标志设置为true。
  3. 将所有等待的线程存储在列表或类似的数据结构中。

因此,您的任务将如下所示:

任务
boolean canExecute

然后,waitForThreadsInGroup方法如下所示:

waitForThreadsInGroup
monitor.lock();
将任务添加到列表
计算任务状态
condition.notifyAll
while( ! task.canExecute )
{
condition.await.
}
monitor.unlock();

finish方法类似:

  finish
monitor.lock();
减少完成计数
计算任务状态
condition.notifyAll
monitor.unlock();

还有calculateTaskState方法:

calculateTaskState
if( finishCount == 0)
{
if( taskList.size >= 4  )
{
将此列表中的4个任务设置为可以执行,并将它们从列表中移除。
}
}

所以关键是将逻辑分成三个步骤:

  1. 动作,例如减少完成计数
  2. 计算新状态。并为每个线程决定是否允许执行
  3. 等待线程。每个线程都需要等待自己的标志。
英文:

The problem is that for the not initial threads (getInitialStatus==false) you do not signal the other threads but you still terminate them when you reached four of them. So this is what happens:

  1. first three threads increase the count and wait
  2. the fourth thread reaches count == 4 and sets initial = false and signals all the other threads and sets the count to zero
  3. the next three threads increase the count by one
  4. the 8 threads reaches count == 4 and gets terminated. Since getInitialStatus==false this thread does not notify the other threads.

so 4*2 threads + 2 threads get terminated. Exactly the count you have seen in your tests.


Here is a potential way to implement this:

  1. use a flag canExecute in each thread or task
  2. use a method calculateState to calculate the current state and set the flag to true if a thread is allowed to execute.
  3. store all threads which are waiting in a list or something similar

So your task would look like this:

Task
boolean canExeute

The method waitForThreadsInGroup then lookslike this:

waitForThreadsInGroup
monitor.lock();
add task to list
calculateTaskState
condition.notifyAll
while( ! task.canExcecute )
{
condition.await.
}
monitor.unlock();

The finish method looks similar:

  finish
monitor.lock();
decrement finish count
calculateTaskState
condition.notifyAll
monitor.unlock();

And calculateTaskState

calculateTaskState
if( finishCount == 0)
{
if( taskList.size &gt;= 4  )
{
set 4 tasks in this list to can execute and remove them from the list
}
}

So the trick is to separate the logic into three steps:

  1. the action, for example reducing the finish count
  2. the calculation of the new state. And deciding for each thread if it is allowed to execute
  3. And the waiting of the threads. Each thread needs to wait on its own flag

huangapple
  • 本文由 发表于 2020年4月6日 14:57:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/61054474.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定