What causes UnrecoverableTimeoutException and how should I fix or avoid it?
Question
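This may be caused by multithreaded access to the Chronicle Queue. You deployed the same JAR file on several servers, but only one of them shows this error while the others work normally. This is the part of your code where the problem shows up: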
this.APPENDER.writeBytes(marshallable); // <--error thrown here
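The error is thrown at this line. The stack trace shows it is raised inside writeBytes() while the appender tries to acquire the queue's write lock (TableStoreWriteLock.lock() on ./chronicle/roll/metadata.cq4t). A problem would arise if several threads wrote through the same APPENDER instance at the same time, because a single appender is not meant to be shared between threads. Separate appenders on the same queue, by contrast, are coordinated through that write lock.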
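The message says the writer waited 15000 ms for the queue's write lock and gave up, and "Lock was held by me" reads as the lock being recorded as held by this same process rather than by another one. The analogy below uses only the JDK: a plain ReentrantLock stands in for Chronicle's TableStoreWriteLock, which works differently (it lives in the queue's metadata file). OrphanedLockDemo is a hypothetical name. The sketch shows why waiting longer does not help once a holder disappears without unlocking.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// JDK-only analogy, not Chronicle's TableStoreWriteLock: a lock whose holder
// goes away without unlocking makes every later timed wait fail.
final class OrphanedLockDemo {

    /** Returns true if a timed tryLock gives up after the holding thread has exited. */
    static boolean waitTimesOut(long timeoutMs) {
        ReentrantLock writeLock = new ReentrantLock();
        // The worker acquires the lock and terminates without calling unlock().
        Thread worker = new Thread(writeLock::lock, "dies-holding-lock");
        worker.start();
        try {
            worker.join(); // the worker is gone, but the lock is still owned by it
            boolean acquired = writeLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                writeLock.unlock(); // not expected here; release it if it ever happens
            }
            return !acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Both waitTimesOut(50) and waitTimesOut(200) return true. A longer timeout only postpones the failure when the lock is never released.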
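To address this, you could consider the following:
- Use one appender per thread: an appender should not be shared between threads, so make sure each thread writes through its own APPENDER. In the Chronicle Queue versions discussed here, ChronicleQueue.acquireAppender() returns an appender bound to the calling thread. Calling it from each consumer thread gives every thread its own instance, which TaskConsumer.run() already does.
- Queue write requests: if the application needs highly concurrent writes, consider putting write requests into a queue so that they are written one at a time, in order.
- Increase the Chronicle Queue lock timeout: depending on your requirements, you could raise the write-lock timeout to allow a longer wait. This can leave writes blocked for longer. It also only helps if the lock is eventually released, not if it was never released (for example, by a thread that died while holding it).
- Check for differences between servers: compare the failing server with the working ones, including hardware, operating system, and the version of the Chronicle library. Some environmental factor may be behind the problem.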
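The second suggestion above (queueing write requests) can be sketched with the JDK alone. Producer threads enqueue records, and one dedicated thread performs every write, so only that thread would ever need an appender. SingleWriter and submit() are hypothetical names. The Consumer passed in stands in for whatever performs the actual append.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper: producers call submit() from any thread, and a single
// writer thread hands each record to the sink (a stand-in for an appender).
final class SingleWriter<T> implements AutoCloseable {
    private static final Object POISON = new Object(); // shutdown marker
    private final BlockingQueue<Object> pending = new LinkedBlockingQueue<>();
    private final Thread writer;

    SingleWriter(Consumer<? super T> sink) {
        writer = new Thread(() -> {
            try {
                while (true) {
                    Object next = pending.take(); // blocks until a record arrives
                    if (next == POISON) {
                        return; // everything submitted before close() has been written
                    }
                    @SuppressWarnings("unchecked")
                    T record = (T) next;
                    sink.accept(record); // only this thread ever touches the sink
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "single-writer");
        writer.start();
    }

    /** Enqueues a record; safe to call from many threads before close(). */
    void submit(T record) {
        pending.add(record);
    }

    /** Lets the writer drain records already submitted, then waits for it to stop. */
    @Override
    public void close() {
        pending.add(POISON);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Records submitted from one thread are written in submission order. Records from different producers are interleaved in the order they reach the queue.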
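Note that these are general suggestions based on the information provided. Resolving the problem fully may require further investigation of that particular server and of the Chronicle Queue configuration.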
Original question:
I have N sets of one producer to one consumer, and the consumers write to a Chronicle Queue. Today I found an error I had not seen before:
Exception in thread "TaskConsumer2" Exception in thread "TaskConsumer0" net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
	at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
	at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
	at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
	... 8 more
net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
	at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
	at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
	at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
	at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
	... 8 more
Is this due to multithreaded access to the Chronicle Queue? I reuse the same Chronicle Queue and use thread-local appenders. The following is a sample of how I use the class:
public class TaskConsumer implements Runnable {
    private final ChronicleQueue QUEUE;
    private CustomQueueClass queue;
    private ExcerptAppender APPENDER;
    //other code
    public TaskConsumer(ChronicleQueue queue) {
        this.QUEUE = queue;
        //instantiate queue
        //other code
    }
    private long millis;
    private long nanos;
    private ByteBuffer buffer;
    private InetAddress remoteAdd;
    private int remotePort;
    private String ni;
    private int remaining;
    private int senderId;
    private long seqNum;
    private MoldUdpHeader moldUdpHeader = new MoldUdpHeader();
    private final PrimitiveIntPair pair = new PrimitiveIntPair(0, 0);
    private final WriteBytesMarshallable marshallable = (bytes) -> bytes.writeLong(this.millis)
            .writeLong(this.nanos)
            .write(this.remoteAdd.getAddress())
            .writeInt(this.remotePort)
            .writeUtf8(this.ni)
            .writeInt(this.remaining)
            .writeInt(this.senderId)
            .writeLong(this.seqNum)
            .write(this.buffer.array(), 0, this.remaining); //sbe-style writes seqNum, remoteAddress, and the ByteBuffer
    private void runTask() {
        LOGGER.debug(logMarker, "{} {} {} {} {} | senderId: {} seqNum: {} msgCnt: {}",
                () -> ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Hong_Kong")),
                () -> remoteAdd.getHostName(), () -> remotePort, () -> ni,
                () -> remaining, () -> moldUdpHeader.getSenderId(), () -> moldUdpHeader.getSeqNum(),
                () -> moldUdpHeader.getMsgCnt());
        this.APPENDER.writeBytes(marshallable); //<--error thrown here
    }
    public void run() {
        this.APPENDER = QUEUE.acquireAppender();
        TaskHolder task = null;
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                closeAppender();
                break;
            }
            if (task == null) {
                try {
                    task = queue.getForConsuming(TIMEOUT, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (task != null) {
                buffer = task.getByteBuffer();
                if (task.getUdpChannel() != null) {
                    remoteAdd = task.getUdpChannel().getGROUP_ADDRESS();
                    remotePort = task.getUdpChannel().getPORT();
                    millis = task.getMillis();
                    nanos = task.getNanos();
                    ni = task.getUdpChannel().getNETWORK_INTERFACE().getName();
                    remaining = buffer.remaining();
                    if (DECODING.equals("TRUE")) {
                        moldUdpHeader = (MoldUdpHeader) moldUdpHeader.decode(buffer, 0);
                    }
                    senderId = moldUdpHeader.getSenderId();
                    seqNum = moldUdpHeader.getSeqNum();
                    pair.setId(moldUdpHeader.getSenderId());
                    pair.setIndex(getIndex(task.getUdpChannel()));
                    triplesHashmap.computeIfAbsent(pair.copy(), k -> (DECODING.equals("TRUE")) ?
                            new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10000000), new PacketStats()) :
                            new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10), new PacketStats())); //using a supplier to lazily instantiate
                    runTask(); //<--- error thrown here
                    synchronized (triplesHashmap.get(pair).getType3()) {
                        if (DECODING.equals("TRUE")) {
                            checkReadValueSequence(triplesHashmap.get(pair), pair, moldUdpHeader.getSeqNum());
                        } else {
                            PacketStats stats = triplesHashmap.get(pair).getType3();
                            stats.incrementPacketsRead();
                            stats.incrementBytesReadBy(remaining);
                        }
                    }
                }
                task.clearValues();
                queue.incrementReadIndex();
                task = null;
            }
        }
    }
    //other code
}
What is weird is that I have deployed the JAR onto multiple servers, but only this one server has the error; the others are working normally. I could wrap the call in a try-catch to ignore the error and retry the task recursively from the catch block, but I would like to know what is causing this and how to avoid it.
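If a retry is used at all, a bounded loop is safer than recursing from the catch block, because repeated failures cannot grow the stack. This is a minimal sketch in which Retry, runWithRetry and the isRetryable predicate are hypothetical names. It assumes, as the stack trace suggests, that the exception is unchecked, so catching RuntimeException covers it. Retrying only hides the symptom: if the write lock is never released, every attempt will time out again.

```java
import java.util.function.Predicate;

// Hypothetical helper: retries an action in a loop (no recursion) a bounded number of times.
final class Retry {

    /**
     * Runs {@code action} up to {@code maxAttempts} times. A failure is retried only if
     * {@code isRetryable} accepts it; otherwise, or after the last attempt, it is rethrown.
     * Returns the number of attempts that were needed.
     */
    static int runWithRetry(Runnable action, int maxAttempts,
                            Predicate<? super RuntimeException> isRetryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts || !isRetryable.test(e)) {
                    throw e; // give up and surface the failure rather than swallowing it
                }
                // A real caller would log e here and possibly back off before retrying.
            }
        }
    }
}
```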
Answer 1
Score: 2
It sounds like you are doing the right thing.
You can share the Chronicle Queue; a thread-local appender and tailer shouldn't have a problem. Some older versions had problems cleaning up resources, especially if a thread died. We have better control over that now.
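The idea of a thread-local appender can be illustrated with a plain java.lang.ThreadLocal. Each thread lazily gets its own instance and keeps getting the same one. This is an analogy for the acquireAppender() behaviour described in the question, not Chronicle's implementation. PerThreadAppenders and FakeAppender are hypothetical names.

```java
// Hypothetical analogy for a thread-local appender, using only the JDK.
final class PerThreadAppenders {

    /** Stand-in for an appender: it has mutable state and is not thread-safe. */
    static final class FakeAppender {
        private final Thread owner = Thread.currentThread(); // thread that created it
        private long written;

        void write() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("appender used from a different thread");
            }
            written++;
        }

        long writtenCount() {
            return written;
        }
    }

    // Each thread lazily gets its own FakeAppender on its first acquire() call.
    private static final ThreadLocal<FakeAppender> APPENDERS =
            ThreadLocal.withInitial(FakeAppender::new);

    static FakeAppender acquire() {
        return APPENDERS.get();
    }
}
```

Calling acquire() twice on one thread returns the same object, while another thread receives a different one. The owner check rejects an instance that has leaked to a second thread.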
I suggest you try 5.23.37 or 5.24ea17.
BTW, DECODING.equals("TRUE") is expensive to do every time. I suggest caching this in a local variable outside the loop.
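The suggestion to cache the comparison could look like the sketch below. DECODING is a hypothetical stand-in for the setting in the question, and DecodingFlag and countDecoded are illustrative names. The String comparison runs once per call instead of once per message. The trade-off is that a change to DECODING while the loop is running is not noticed.

```java
// Hypothetical sketch: hoist a loop-invariant string comparison out of the hot loop.
final class DecodingFlag {
    static String DECODING = "TRUE"; // stand-in for the question's configuration value

    /** Counts how many of n messages would take the decoding branch. */
    static int countDecoded(int n) {
        // Compare once, before the loop; "TRUE".equals(...) is also null-safe.
        final boolean decoding = "TRUE".equals(DECODING);
        int decoded = 0;
        for (int i = 0; i < n; i++) {
            if (decoding) { // a boolean test per message instead of a String comparison
                decoded++;
            }
        }
        return decoded;
    }
}
```

Because the cached local is effectively final, it can also be captured by lambdas such as the computeIfAbsent supplier in the question.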