一个线程无法正确地从列表中移除自身。为什么?

huangapple go评论72阅读模式
英文:

A thread can't properly remove itself from a List. Why?

问题

以下是您提供的代码的翻译:

当我尝试熟悉一个新的库或概念时我通常会做一个小的玩具项目利用这些库或概念。`java.util.concurrent` 也不例外我只是开始学习那个库所以我做了一些简单的应用程序其中一些工作正常但这个它应该教我如何熟悉 `Semaphore`)表现不佳下面是完整的代码它不是最小的但由于我不知道哪部分可以安全地剪裁除了 `River`、`speed` 之外但它的代码也不多),所以我决定将其全部包含在内

```java
public record River(short width) {
}
public interface Status {
    String description();
}
public abstract class StatusReporter extends Thread {
    protected Status status;
    public StatusReporter(String name) {
        super(name);
    }

    public void setAndReportStatus(Status status) {
        this.status = status;
        System.out.println(getName() + ": " + status.description());
    }
}
public class SharedSynchronizers {
    static final Lock FAIR_LOCK = new ReentrantLock(true);
    static final Condition CONDITION = FAIR_LOCK.newCondition();
}
public class Boat {
    @Getter(AccessLevel.PACKAGE)
    private final byte capacity;
    @Getter
    private final byte speed;
    private final List<Passenger> passengersOnBoard;
    @Getter(AccessLevel.PACKAGE)
    private final Semaphore semaphore;
    private final Condition condition = SharedSynchronizers.CONDITION;

    private Boat(Builder builder) {
        capacity = builder.capacity;
        speed = builder.speed;
        passengersOnBoard = builder.passengersOnBoard;
        semaphore = builder.semaphore;
    }

    public static Builder withCapacity(byte capacity) {
        return new Builder(capacity);
    }

    public static class Builder {
        private final byte capacity;
        private byte speed;
        private final List<Passenger> passengersOnBoard;
        private final Semaphore semaphore;

        private Builder(byte capacity) {
            this.capacity = capacity;
            this.passengersOnBoard = new ArrayList<>(capacity);
            this.semaphore = new Semaphore(capacity, true);
        }

        public Boat andSpeed(byte speed) {
            this.speed = speed;
            return new Boat(this);
        }
    }

    void addPassenger(Passenger passenger) {
        try {
            while (semaphore.availablePermits() < 1) {
                condition.await(); // so that a thread releases a lock if it can't acquire a permit
            }
            semaphore.acquire();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        passengersOnBoard.add(passenger);
    }

    void removePassenger(Passenger passenger) {
        passengersOnBoard.remove(passenger);
    }

    boolean hasPassengers() {
        return !passengersOnBoard.isEmpty();
    }

    boolean isFull() {
        return passengersOnBoard.size() == capacity;
    }
}
public class Passenger extends StatusReporter {
    @Setter(AccessLevel.PACKAGE)
    private BoatKeeper boatKeeper;
    private static final short EMBARK_DISEMBARK_TIME = 500;
    private final Lock lock = SharedSynchronizers.FAIR_LOCK;
    private final Condition condition = SharedSynchronizers.CONDITION;

    public Passenger(String name) {
        super(name);
    }

    @Override
    public void run() {
        tryToGetOnBoat();
        stayOnBoat();
        disembark();
    }

    private void tryToGetOnBoat() {
        setAndReportStatus(PassengerStatus.GOING_TO_EMBARK);
        try {
            Thread.sleep(EMBARK_DISEMBARK_TIME);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        lock.lock();
        boatKeeper.getBoat().addPassenger(this);
        boatKeeper.getPassengersToFerry().remove(this);
        setAndReportStatus(PassengerStatus.EMBARKED_THE_BOAT);
        condition.signalAll();
        lock.unlock();
    }

    private void stayOnBoat() {
        try {
            lock.lock();
            while (!boatKeeper.isRowing()) {
                condition.await();
            }
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        setAndReportStatus(PassengerStatus.CROSSING_THE_RIVER);
    }

    private void disembark() {
        try {
            lock.lock();
            while (!boatKeeper.isDisembarking()) {
                condition.await();
            }
            lock.unlock();
            Thread.sleep(EMBARK_DISEMBARK_TIME);
            boatKeeper.getBoat().removePassenger(this);
            lock.lock();
            condition.signalAll();
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        setAndReportStatus(PassengerStatus.ON_THE_OTHER_BANK);
    }

    @RequiredArgsConstructor
    enum PassengerStatus implements Status {
        GOING_TO_EMBARK("going to embark the boat..."),
        EMBARKED_THE_BOAT("embarked the boat!"),
        WAITING_FOR_OTHER_PASSENGERS("waiting for other passengers to join..."),
        CROSSING_THE_RIVER("crossing the river..."),
        ON_THE_OTHER_BANK("on the other bank!");

        private final String description;

        @Override
        public String description() {
            return description;
        }
    }
}
public class BoatKeeper extends StatusReporter {
    @Getter(AccessLevel.PACKAGE)
    private final Boat boat;
    private int timeToCrossRiver;
    @Getter(AccessLevel.PACKAGE)
    private final List<Passenger> passengersToFerry = new ArrayList<>();
    private final Lock lock = SharedSynchronizers.FAIR_LOCK;
    private final Condition condition = SharedSynchronizers.CONDITION;

    public BoatKeeper(Boat boat) {
        super("Boat keeper");
        this.boat = boat;
    }

    @Override
    public void run() {
        performFerrying();
    }

    public BoatKeeper ferry(Passenger... passengers) {
        Collections.addAll(passengersToFerry, passengers);
        return this;
    }

    public void acrossRiver(River river) {
        timeToCrossRiver = river.width() / boat.getSpeed();
        start();
    }

    private void performFerrying() {
        invitePassengersToEmbark();
        while (hasFerryingToDo()) {
            waitTillBoatReadyToGo();
            crossRiver();
            disembarkPassengers();
            rowBack();
        }
        setAndReportStatus(BoatKeeperStatus.DONE);
    }

    private void invitePassengersToEmbark() {
        passengersToFerry.forEach(p -> {
            p.setBoatKeeper(this);
            p.start();
        });
    }

    private void waitTillBoatReadyToGo() {


<details>
<summary>英文:</summary>

When I try to get accustomed with a new library or concept, I typically do a small toy project that utilizes those libraries or concepts. `java.util.concurrent` is no different. I only start to learn that library so I made a few simple apps. Some of them work, but this one (it was supposed to teach me to get comfortable with `Semaphore`) misbehaves. Here&#39;s the complete code. It&#39;s not minimal, but since I don&#39;t know which part I can safely cut (except for `River`, `speed`, but it&#39;s not much code anyway), I decided to include it in its entirety
```java
public record River(short width) {
}
public interface Status {
    String description();
}
public abstract class StatusReporter extends Thread {
    protected Status status;
    public StatusReporter(String name) {
        super(name);
    }

    public void setAndReportStatus(Status status) {
        this.status = status;
        System.out.println(getName() + &quot;: &quot; + status.description());
    }
}
public class SharedSynchronizers {
    static final Lock FAIR_LOCK = new ReentrantLock(true);
    static final Condition CONDITION = FAIR_LOCK.newCondition();
}
public class Boat {
    @Getter(AccessLevel.PACKAGE)
    private final byte capacity;
    @Getter
    private final byte speed;
    private final List&lt;Passenger&gt; passengersOnBoard;
    @Getter(AccessLevel.PACKAGE)
    private final Semaphore semaphore;
    private final Condition condition = SharedSynchronizers.CONDITION;

    private Boat(Builder builder) {
        capacity = builder.capacity;
        speed = builder.speed;
        passengersOnBoard = builder.passengersOnBoard;
        semaphore = builder.semaphore;
    }

    public static Builder withCapacity(byte capacity) {
        return new Builder(capacity);
    }

    public static class Builder {
        private final byte capacity;
        private byte speed;
        private final List&lt;Passenger&gt; passengersOnBoard;
        private final Semaphore semaphore;

        private Builder(byte capacity) {
            this.capacity = capacity;
            this.passengersOnBoard = new ArrayList&lt;&gt;(capacity);
            this.semaphore = new Semaphore(capacity, true);
        }

        public Boat andSpeed(byte speed) {
            this.speed = speed;
            return new Boat(this);
        }
    }

    void addPassenger(Passenger passenger) {
        try {
            while (semaphore.availablePermits() &lt; 1) {
                condition.await(); // so that a thread releases a lock if it can&#39;t acquire a permit
            }
            semaphore.acquire();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        passengersOnBoard.add(passenger);
    }

    void removePassenger(Passenger passenger) {
        passengersOnBoard.remove(passenger);
    }

    boolean hasPassengers() {
        return !passengersOnBoard.isEmpty();
    }

    boolean isFull() {
        return passengersOnBoard.size() == capacity;
    }
}
public class Passenger extends StatusReporter {
    @Setter(AccessLevel.PACKAGE)
    private BoatKeeper boatKeeper;
    private static final short EMBARK_DISEMBARK_TIME = 500;
    private final Lock lock = SharedSynchronizers.FAIR_LOCK;
    private final Condition condition = SharedSynchronizers.CONDITION;

    public Passenger(String name) {
        super(name);
    }

    @Override
    public void run() {
        tryToGetOnBoat();
        stayOnBoat();
        disembark();
    }

    private void tryToGetOnBoat() {
        setAndReportStatus(PassengerStatus.GOING_TO_EMBARK);
        try {
            Thread.sleep(EMBARK_DISEMBARK_TIME);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        lock.lock();
        boatKeeper.getBoat().addPassenger(this);
        boatKeeper.getPassengersToFerry().remove(this);
        setAndReportStatus(PassengerStatus.EMBARKED_THE_BOAT);
        condition.signalAll();
        lock.unlock();
    }

    private void stayOnBoat() {
        try {
            lock.lock();
            while (!boatKeeper.isRowing()) {
                condition.await();
            }
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        setAndReportStatus(PassengerStatus.CROSSING_THE_RIVER);
    }

    private void disembark() {
        try {
            lock.lock();
            while (!boatKeeper.isDisembarking()) {
                condition.await();
            }
            lock.unlock();
            Thread.sleep(EMBARK_DISEMBARK_TIME);
            boatKeeper.getBoat().removePassenger(this);
            lock.lock();
            condition.signalAll();
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        setAndReportStatus(PassengerStatus.ON_THE_OTHER_BANK);
    }

    @RequiredArgsConstructor
    enum PassengerStatus implements Status {
        GOING_TO_EMBARK(&quot;going to embark the boat...&quot;),
        EMBARKED_THE_BOAT(&quot;embarked the boat!&quot;),
        WAITING_FOR_OTHER_PASSENGERS(&quot;waiting for other passengers to join...&quot;),
        CROSSING_THE_RIVER(&quot;crossing the river...&quot;),
        ON_THE_OTHER_BANK(&quot;on the other bank!&quot;);

        private final String description;

        @Override
        public String description() {
            return description;
        }
    }
}
public class BoatKeeper extends StatusReporter {
    @Getter(AccessLevel.PACKAGE)
    private final Boat boat;
    private int timeToCrossRiver;
    @Getter(AccessLevel.PACKAGE)
    private final List&lt;Passenger&gt; passengersToFerry = new ArrayList&lt;&gt;();
    private final Lock lock = SharedSynchronizers.FAIR_LOCK;
    private final Condition condition = SharedSynchronizers.CONDITION;

    public BoatKeeper(Boat boat) {
        super(&quot;Boat keeper&quot;);
        this.boat = boat;
    }

    @Override
    public void run() {
        performFerrying();
    }

    public BoatKeeper ferry(Passenger... passengers) {
        Collections.addAll(passengersToFerry, passengers);
        return this;
    }

    public void acrossRiver(River river) {
        timeToCrossRiver = river.width() / boat.getSpeed();
        start();
    }

    private void performFerrying() {
        invitePassengersToEmbark();
        while (hasFerryingToDo()) {
            waitTillBoatReadyToGo();
            crossRiver();
            disembarkPassengers();
            rowBack();
        }
        setAndReportStatus(BoatKeeperStatus.DONE);
    }

    private void invitePassengersToEmbark() {
        passengersToFerry.forEach(p -&gt; {
            p.setBoatKeeper(this);
            p.start();
        });
    }

    private void waitTillBoatReadyToGo() {
        if (!isBoardingComplete()) {
            setAndReportStatus(BoatKeeperStatus.WAITING_FOR_PASSENGERS_TO_EMBARK);
        }
        try {
            lock.lock();
            while (!isBoardingComplete()) {
                condition.await();
            }
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void crossRiver() {
        setAndReportStatus(BoatKeeperStatus.FERRYING_PASSENGERS);
        lock.lock();
        condition.signalAll();
        lock.unlock();
        try {
            Thread.sleep(timeToCrossRiver);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void disembarkPassengers() {
        setAndReportStatus(BoatKeeperStatus.DISEMBARKING_PASSENGERS);
        lock.lock();
        condition.signalAll();
        try {
            while (boat.hasPassengers()) {
                condition.await();
            }
            lock.unlock();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void rowBack() {
        setAndReportStatus(BoatKeeperStatus.RETURNING);
        try {
            Thread.sleep(timeToCrossRiver);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        lock.lock();
        /*
        doesn&#39;t matter if the boat keeper releases more permits than 
        the number of passengers it ferried: if it&#39;s less than the boat&#39;s
        max capacity, then it was the last trip anyway
        */
        boat.getSemaphore().release(boat.getCapacity());
        condition.signalAll();
        lock.unlock();
    }

    private boolean hasFerryingToDo() {
        return !passengersToFerry.isEmpty() || boat.hasPassengers();
    }

    private boolean isBoardingComplete() {
        return boat.isFull() ||
                (boat.hasPassengers() &amp;&amp; passengersToFerry.isEmpty());
    }

    boolean isRowing() {
        return status == BoatKeeperStatus.FERRYING_PASSENGERS;
    }

    boolean isDisembarking() {
        return status == BoatKeeperStatus.DISEMBARKING_PASSENGERS;
    }

    @RequiredArgsConstructor
    enum BoatKeeperStatus implements Status {
        WAITING_FOR_PASSENGERS_TO_EMBARK(&quot;waiting for passengers to embark...&quot;),
        FERRYING_PASSENGERS(&quot;ferrying passengers across the river...&quot;),
        DISEMBARKING_PASSENGERS(&quot;disembarking passengers...&quot;),
        RETURNING(&quot;returning...&quot;),
        DONE(&quot;all passengers are ferried!&quot;);
        private final String description;

        @Override
        public String description() {
            return description;
        }
    }
}

Client code:

public class App {
    public static void main(String[] args) {
        River river = new River((short) 2800);
        Boat tinyBoat = Boat.withCapacity((byte) 2).andSpeed((byte) 10);
        BoatKeeper boatKeeper = new BoatKeeper(tinyBoat);
        Passenger[] passengers = {
                new Passenger(&quot;Frank&quot;), new Passenger(&quot;Liz&quot;), new Passenger(&quot;Gabriel&quot;)
        };
        boatKeeper.ferry(passengers).acrossRiver(river);
    }
}

Behavior varies. Whether you run it with or without the IntelliJ IDEA debugger seems to have some effect, but it's not a fail-proof predictor either

Scenario #1: Ghost passenger (typical without the debugger, unless it's attached after the freeze)

The passenger threads disembark and finish, but the boat still has one null element making the boat keeper wait indefinitely and not row back.

Boat keeper: waiting for passengers to embark...
Frank: going to embark the boat...
Liz: going to embark the boat...
Gabriel: going to embark the boat...
Frank: embarked the boat!
Liz: embarked the boat!
Boat keeper: ferrying passengers across the river...
Liz: crossing the river...
Frank: crossing the river...
Boat keeper: disembarking passengers...
Liz: on the other bank!
Frank: on the other bank!
// freezes!

screenshot showing that there's some null passenger on the boat

Scenario #2: Everything is fine (typically with the debugger, rarely without)

No issues observed

Gabriel: going to embark the boat...
Liz: going to embark the boat...
Frank: going to embark the boat...
Boat keeper: waiting for passengers to embark...
Frank: embarked the boat!
Liz: embarked the boat!
Boat keeper: ferrying passengers across the river...
Frank: crossing the river...
Liz: crossing the river...
Boat keeper: disembarking passengers...
Frank: on the other bank!
Liz: on the other bank!
Boat keeper: returning...
Gabriel: embarked the boat!
Boat keeper: ferrying passengers across the river...
Gabriel: crossing the river...
Boat keeper: disembarking passengers...
Gabriel: on the other bank!
Boat keeper: returning...
Boat keeper: all passengers are ferried!
Process finished with exit code 0

Since I can't reproduce the issue while debugging the program, I'm completely at a loss. Why do those freezes happen? How do I fix it so that I get the second scenario all the time?

I debugged thoroughly, spent a lot of hours trying to pinpoint the issue. I don't see any other choice but to ask for your help

答案1

得分: 0

注: 这不是我的回答,而是Louis Wasserman在评论中提供的答案(显然,他不想将其发布为答案)。

乍一看,这似乎是一个标准的线程安全问题,由尝试在没有任何锁定的情况下并发修改非线程安全集合引起的。对诸如ArrayList的修改需要在锁定下进行。

–Louis Wasserman

确实,将disembark()方法的这部分更改为:

lock.lock();
boatKeeper.getBoat().removePassenger(this);
condition.signalAll();
lock.unlock();

有所帮助。谢谢!

英文:

Note. It's not my answer but rather the answer Louis Wasserman gave in the comments (apparently, he doesn't want to post it as an answer).

> At a glance, this looks like a standard thread safety issue caused by trying to concurrently modify a non-thread-safe collection without any sort of locking. Modifications to things like ArrayLists need to be done under a lock.
>
> –Louis Wasserman

Indeed, changing this part of disembark() method

            boatKeeper.getBoat().removePassenger(this);
            lock.lock();
            condition.signalAll();
            lock.unlock();

to this

            lock.lock();
            boatKeeper.getBoat().removePassenger(this);
            condition.signalAll();
            lock.unlock();

helped. Thank you!

huangapple
  • 本文由 发表于 2023年7月7日 03:41:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/76632091.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定