英文:
A thread can't properly remove itself from a List. Why?
问题
以下是您提供的代码的翻译:
当我尝试熟悉一个新的库或概念时,我通常会做一个小的玩具项目,利用这些库或概念。`java.util.concurrent` 也不例外。我只是开始学习那个库,所以我做了一些简单的应用程序。其中一些工作正常,但这个(它应该教我如何熟悉 `Semaphore`)表现不佳。下面是完整的代码。它不是最小的,但由于我不知道哪部分可以安全地剪裁(除了 `River`、`speed` 之外,但它的代码也不多),所以我决定将其全部包含在内。
```java
public record River(short width) {
}
public interface Status {
String description();
}
public abstract class StatusReporter extends Thread {
protected Status status;
public StatusReporter(String name) {
super(name);
}
public void setAndReportStatus(Status status) {
this.status = status;
System.out.println(getName() + ": " + status.description());
}
}
public class SharedSynchronizers {
static final Lock FAIR_LOCK = new ReentrantLock(true);
static final Condition CONDITION = FAIR_LOCK.newCondition();
}
public class Boat {
@Getter(AccessLevel.PACKAGE)
private final byte capacity;
@Getter
private final byte speed;
private final List<Passenger> passengersOnBoard;
@Getter(AccessLevel.PACKAGE)
private final Semaphore semaphore;
private final Condition condition = SharedSynchronizers.CONDITION;
private Boat(Builder builder) {
capacity = builder.capacity;
speed = builder.speed;
passengersOnBoard = builder.passengersOnBoard;
semaphore = builder.semaphore;
}
public static Builder withCapacity(byte capacity) {
return new Builder(capacity);
}
public static class Builder {
private final byte capacity;
private byte speed;
private final List<Passenger> passengersOnBoard;
private final Semaphore semaphore;
private Builder(byte capacity) {
this.capacity = capacity;
this.passengersOnBoard = new ArrayList<>(capacity);
this.semaphore = new Semaphore(capacity, true);
}
public Boat andSpeed(byte speed) {
this.speed = speed;
return new Boat(this);
}
}
void addPassenger(Passenger passenger) {
try {
while (semaphore.availablePermits() < 1) {
condition.await(); // so that a thread releases a lock if it can't acquire a permit
}
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
passengersOnBoard.add(passenger);
}
void removePassenger(Passenger passenger) {
passengersOnBoard.remove(passenger);
}
boolean hasPassengers() {
return !passengersOnBoard.isEmpty();
}
boolean isFull() {
return passengersOnBoard.size() == capacity;
}
}
public class Passenger extends StatusReporter {
@Setter(AccessLevel.PACKAGE)
private BoatKeeper boatKeeper;
private static final short EMBARK_DISEMBARK_TIME = 500;
private final Lock lock = SharedSynchronizers.FAIR_LOCK;
private final Condition condition = SharedSynchronizers.CONDITION;
public Passenger(String name) {
super(name);
}
@Override
public void run() {
tryToGetOnBoat();
stayOnBoat();
disembark();
}
private void tryToGetOnBoat() {
setAndReportStatus(PassengerStatus.GOING_TO_EMBARK);
try {
Thread.sleep(EMBARK_DISEMBARK_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.lock();
boatKeeper.getBoat().addPassenger(this);
boatKeeper.getPassengersToFerry().remove(this);
setAndReportStatus(PassengerStatus.EMBARKED_THE_BOAT);
condition.signalAll();
lock.unlock();
}
private void stayOnBoat() {
try {
lock.lock();
while (!boatKeeper.isRowing()) {
condition.await();
}
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
setAndReportStatus(PassengerStatus.CROSSING_THE_RIVER);
}
private void disembark() {
try {
lock.lock();
while (!boatKeeper.isDisembarking()) {
condition.await();
}
lock.unlock();
Thread.sleep(EMBARK_DISEMBARK_TIME);
boatKeeper.getBoat().removePassenger(this);
lock.lock();
condition.signalAll();
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
setAndReportStatus(PassengerStatus.ON_THE_OTHER_BANK);
}
@RequiredArgsConstructor
enum PassengerStatus implements Status {
GOING_TO_EMBARK("going to embark the boat..."),
EMBARKED_THE_BOAT("embarked the boat!"),
WAITING_FOR_OTHER_PASSENGERS("waiting for other passengers to join..."),
CROSSING_THE_RIVER("crossing the river..."),
ON_THE_OTHER_BANK("on the other bank!");
private final String description;
@Override
public String description() {
return description;
}
}
}
public class BoatKeeper extends StatusReporter {
@Getter(AccessLevel.PACKAGE)
private final Boat boat;
private int timeToCrossRiver;
@Getter(AccessLevel.PACKAGE)
private final List<Passenger> passengersToFerry = new ArrayList<>();
private final Lock lock = SharedSynchronizers.FAIR_LOCK;
private final Condition condition = SharedSynchronizers.CONDITION;
public BoatKeeper(Boat boat) {
super("Boat keeper");
this.boat = boat;
}
@Override
public void run() {
performFerrying();
}
public BoatKeeper ferry(Passenger... passengers) {
Collections.addAll(passengersToFerry, passengers);
return this;
}
public void acrossRiver(River river) {
timeToCrossRiver = river.width() / boat.getSpeed();
start();
}
private void performFerrying() {
invitePassengersToEmbark();
while (hasFerryingToDo()) {
waitTillBoatReadyToGo();
crossRiver();
disembarkPassengers();
rowBack();
}
setAndReportStatus(BoatKeeperStatus.DONE);
}
private void invitePassengersToEmbark() {
passengersToFerry.forEach(p -> {
p.setBoatKeeper(this);
p.start();
});
}
private void waitTillBoatReadyToGo() {
<details>
<summary>英文:</summary>
When I try to get accustomed with a new library or concept, I typically do a small toy project that utilizes those libraries or concepts. `java.util.concurrent` is no different. I only start to learn that library so I made a few simple apps. Some of them work, but this one (it was supposed to teach me to get comfortable with `Semaphore`) misbehaves. Here's the complete code. It's not minimal, but since I don't know which part I can safely cut (except for `River`, `speed`, but it's not much code anyway), I decided to include it in its entirety
```java
public record River(short width) {
}
public interface Status {
String description();
}
public abstract class StatusReporter extends Thread {
protected Status status;
public StatusReporter(String name) {
super(name);
}
public void setAndReportStatus(Status status) {
this.status = status;
System.out.println(getName() + ": " + status.description());
}
}
public class SharedSynchronizers {
static final Lock FAIR_LOCK = new ReentrantLock(true);
static final Condition CONDITION = FAIR_LOCK.newCondition();
}
public class Boat {
@Getter(AccessLevel.PACKAGE)
private final byte capacity;
@Getter
private final byte speed;
private final List<Passenger> passengersOnBoard;
@Getter(AccessLevel.PACKAGE)
private final Semaphore semaphore;
private final Condition condition = SharedSynchronizers.CONDITION;
private Boat(Builder builder) {
capacity = builder.capacity;
speed = builder.speed;
passengersOnBoard = builder.passengersOnBoard;
semaphore = builder.semaphore;
}
public static Builder withCapacity(byte capacity) {
return new Builder(capacity);
}
public static class Builder {
private final byte capacity;
private byte speed;
private final List<Passenger> passengersOnBoard;
private final Semaphore semaphore;
private Builder(byte capacity) {
this.capacity = capacity;
this.passengersOnBoard = new ArrayList<>(capacity);
this.semaphore = new Semaphore(capacity, true);
}
public Boat andSpeed(byte speed) {
this.speed = speed;
return new Boat(this);
}
}
void addPassenger(Passenger passenger) {
try {
while (semaphore.availablePermits() < 1) {
condition.await(); // so that a thread releases a lock if it can't acquire a permit
}
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
passengersOnBoard.add(passenger);
}
void removePassenger(Passenger passenger) {
passengersOnBoard.remove(passenger);
}
boolean hasPassengers() {
return !passengersOnBoard.isEmpty();
}
boolean isFull() {
return passengersOnBoard.size() == capacity;
}
}
public class Passenger extends StatusReporter {
@Setter(AccessLevel.PACKAGE)
private BoatKeeper boatKeeper;
private static final short EMBARK_DISEMBARK_TIME = 500;
private final Lock lock = SharedSynchronizers.FAIR_LOCK;
private final Condition condition = SharedSynchronizers.CONDITION;
public Passenger(String name) {
super(name);
}
@Override
public void run() {
tryToGetOnBoat();
stayOnBoat();
disembark();
}
private void tryToGetOnBoat() {
setAndReportStatus(PassengerStatus.GOING_TO_EMBARK);
try {
Thread.sleep(EMBARK_DISEMBARK_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.lock();
boatKeeper.getBoat().addPassenger(this);
boatKeeper.getPassengersToFerry().remove(this);
setAndReportStatus(PassengerStatus.EMBARKED_THE_BOAT);
condition.signalAll();
lock.unlock();
}
private void stayOnBoat() {
try {
lock.lock();
while (!boatKeeper.isRowing()) {
condition.await();
}
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
setAndReportStatus(PassengerStatus.CROSSING_THE_RIVER);
}
private void disembark() {
try {
lock.lock();
while (!boatKeeper.isDisembarking()) {
condition.await();
}
lock.unlock();
Thread.sleep(EMBARK_DISEMBARK_TIME);
boatKeeper.getBoat().removePassenger(this);
lock.lock();
condition.signalAll();
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
setAndReportStatus(PassengerStatus.ON_THE_OTHER_BANK);
}
@RequiredArgsConstructor
enum PassengerStatus implements Status {
GOING_TO_EMBARK("going to embark the boat..."),
EMBARKED_THE_BOAT("embarked the boat!"),
WAITING_FOR_OTHER_PASSENGERS("waiting for other passengers to join..."),
CROSSING_THE_RIVER("crossing the river..."),
ON_THE_OTHER_BANK("on the other bank!");
private final String description;
@Override
public String description() {
return description;
}
}
}
public class BoatKeeper extends StatusReporter {
@Getter(AccessLevel.PACKAGE)
private final Boat boat;
private int timeToCrossRiver;
@Getter(AccessLevel.PACKAGE)
private final List<Passenger> passengersToFerry = new ArrayList<>();
private final Lock lock = SharedSynchronizers.FAIR_LOCK;
private final Condition condition = SharedSynchronizers.CONDITION;
public BoatKeeper(Boat boat) {
super("Boat keeper");
this.boat = boat;
}
@Override
public void run() {
performFerrying();
}
public BoatKeeper ferry(Passenger... passengers) {
Collections.addAll(passengersToFerry, passengers);
return this;
}
public void acrossRiver(River river) {
timeToCrossRiver = river.width() / boat.getSpeed();
start();
}
private void performFerrying() {
invitePassengersToEmbark();
while (hasFerryingToDo()) {
waitTillBoatReadyToGo();
crossRiver();
disembarkPassengers();
rowBack();
}
setAndReportStatus(BoatKeeperStatus.DONE);
}
private void invitePassengersToEmbark() {
passengersToFerry.forEach(p -> {
p.setBoatKeeper(this);
p.start();
});
}
private void waitTillBoatReadyToGo() {
if (!isBoardingComplete()) {
setAndReportStatus(BoatKeeperStatus.WAITING_FOR_PASSENGERS_TO_EMBARK);
}
try {
lock.lock();
while (!isBoardingComplete()) {
condition.await();
}
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void crossRiver() {
setAndReportStatus(BoatKeeperStatus.FERRYING_PASSENGERS);
lock.lock();
condition.signalAll();
lock.unlock();
try {
Thread.sleep(timeToCrossRiver);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void disembarkPassengers() {
setAndReportStatus(BoatKeeperStatus.DISEMBARKING_PASSENGERS);
lock.lock();
condition.signalAll();
try {
while (boat.hasPassengers()) {
condition.await();
}
lock.unlock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void rowBack() {
setAndReportStatus(BoatKeeperStatus.RETURNING);
try {
Thread.sleep(timeToCrossRiver);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.lock();
/*
doesn't matter if the boat keeper releases more permits than
the number of passengers it ferried: if it's less than the boat's
max capacity, then it was the last trip anyway
*/
boat.getSemaphore().release(boat.getCapacity());
condition.signalAll();
lock.unlock();
}
private boolean hasFerryingToDo() {
return !passengersToFerry.isEmpty() || boat.hasPassengers();
}
private boolean isBoardingComplete() {
return boat.isFull() ||
(boat.hasPassengers() && passengersToFerry.isEmpty());
}
boolean isRowing() {
return status == BoatKeeperStatus.FERRYING_PASSENGERS;
}
boolean isDisembarking() {
return status == BoatKeeperStatus.DISEMBARKING_PASSENGERS;
}
@RequiredArgsConstructor
enum BoatKeeperStatus implements Status {
WAITING_FOR_PASSENGERS_TO_EMBARK("waiting for passengers to embark..."),
FERRYING_PASSENGERS("ferrying passengers across the river..."),
DISEMBARKING_PASSENGERS("disembarking passengers..."),
RETURNING("returning..."),
DONE("all passengers are ferried!");
private final String description;
@Override
public String description() {
return description;
}
}
}
Client code:
public class App {
public static void main(String[] args) {
River river = new River((short) 2800);
Boat tinyBoat = Boat.withCapacity((byte) 2).andSpeed((byte) 10);
BoatKeeper boatKeeper = new BoatKeeper(tinyBoat);
Passenger[] passengers = {
new Passenger("Frank"), new Passenger("Liz"), new Passenger("Gabriel")
};
boatKeeper.ferry(passengers).acrossRiver(river);
}
}
Behavior varies. Whether you run it with or without the IntelliJ IDEA debugger seems to have some effect, but it's not a fail-proof predictor either
Scenario #1: Ghost passenger (typical without the debugger, unless it's attached after the freeze)
The passenger threads disembark and finish, but the boat still has one null
element making the boat keeper wait indefinitely and not row back.
Boat keeper: waiting for passengers to embark...
Frank: going to embark the boat...
Liz: going to embark the boat...
Gabriel: going to embark the boat...
Frank: embarked the boat!
Liz: embarked the boat!
Boat keeper: ferrying passengers across the river...
Liz: crossing the river...
Frank: crossing the river...
Boat keeper: disembarking passengers...
Liz: on the other bank!
Frank: on the other bank!
// freezes!
screenshot showing that there's some null passenger on the boat
Scenario #2: Everything is fine (typically with the debugger, rarely without)
No issues observed
Gabriel: going to embark the boat...
Liz: going to embark the boat...
Frank: going to embark the boat...
Boat keeper: waiting for passengers to embark...
Frank: embarked the boat!
Liz: embarked the boat!
Boat keeper: ferrying passengers across the river...
Frank: crossing the river...
Liz: crossing the river...
Boat keeper: disembarking passengers...
Frank: on the other bank!
Liz: on the other bank!
Boat keeper: returning...
Gabriel: embarked the boat!
Boat keeper: ferrying passengers across the river...
Gabriel: crossing the river...
Boat keeper: disembarking passengers...
Gabriel: on the other bank!
Boat keeper: returning...
Boat keeper: all passengers are ferried!
Process finished with exit code 0
Since I can't reproduce the issue while debugging the program, I'm completely at a loss. Why do those freezes happen? How do I fix it so that I get the second scenario all the time?
I debugged thoroughly, spent a lot of hours trying to pinpoint the issue. I don't see any other choice but to ask for your help
答案1
得分: 0
注: 这不是我的回答,而是Louis Wasserman在评论中提供的答案(显然,他不想将其发布为答案)。
乍一看,这似乎是一个标准的线程安全问题,由尝试在没有任何锁定的情况下并发修改非线程安全集合引起的。对诸如ArrayList的修改需要在锁定下进行。
–Louis Wasserman
确实,将disembark()
方法的这部分更改为:
lock.lock();
boatKeeper.getBoat().removePassenger(this);
condition.signalAll();
lock.unlock();
有所帮助。谢谢!
英文:
Note. It's not my answer but rather the answer Louis Wasserman gave in the comments (apparently, he doesn't want to post it as an answer).
> At a glance, this looks like a standard thread safety issue caused by trying to concurrently modify a non-thread-safe collection without any sort of locking. Modifications to things like ArrayLists need to be done under a lock.
>
> –Louis Wasserman
Indeed, changing this part of disembark()
method
boatKeeper.getBoat().removePassenger(this);
lock.lock();
condition.signalAll();
lock.unlock();
to this
lock.lock();
boatKeeper.getBoat().removePassenger(this);
condition.signalAll();
lock.unlock();
helped. Thank you!
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论