What causes UnrecoverableTimeoutException and how should I fix or avoid it?


Question

This appears to be an issue caused by multi-threaded access to the Chronicle Queue. You deployed the same JAR file on multiple servers, but only one server shows this error while the others work normally. The following line of the code you provided is where the problem surfaces:

this.APPENDER.writeBytes(marshallable);  // <-- error thrown here

The error appears to be thrown on this line. The problem may occur when multiple threads try to write to the same APPENDER instance of the Chronicle Queue at the same time; the queue may not have enough concurrency support to handle that situation.

To resolve this, you can consider the following points:

  1. Use a thread-safe APPENDER: make sure each thread uses its own thread-safe APPENDER instance to avoid multi-threaded conflicts. You can use ChronicleQueue.acquireAppender() to obtain a new APPENDER instance so that every thread has its own.

  2. Queue the write requests: if your application needs highly concurrent writes, consider implementing a queue that serializes the write requests so that each write happens in order.

  3. Increase the Chronicle Queue lock timeout: depending on your needs, you can increase the queue's lock timeout to allow a longer wait, although this may block write operations for longer, depending on your application's requirements (see the sketch below).

  4. Check for environment differences between servers: compare the failing server with the working ones, including hardware, operating system, and the version of the Chronicle libraries; some environmental factor may be causing the problem.

Note that these are general suggestions based on the information you provided. To diagnose the problem in more detail, you may need to investigate the specific situation on that server and the configuration of the Chronicle Queue.
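
As a rough illustration of points 1 and 3, here is a minimal sketch, assuming Chronicle Queue 5.x. It assumes the builder's timeoutMS setting controls the 15000 ms write-lock timeout seen in the exception (worth verifying against the version in use); the 60-second value and the thread names are illustrative, and the path matches the lock file in the stack trace. Each thread acquires its own appender from the single shared queue, which mirrors what the question's code already does.

import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;

public class AppenderPerThreadExample {
    public static void main(String[] args) throws InterruptedException {
        // One shared queue; timeoutMS(...) raises the write-lock timeout
        // (the "15000 ms" in the exception message) -- 60 s is just an example value.
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("./chronicle/roll")
                .timeoutMS(60_000)
                .build()) {

            Runnable consumer = () -> {
                // Each consumer thread gets its own appender from the shared queue.
                ExcerptAppender appender = queue.acquireAppender();
                appender.writeBytes(bytes -> bytes.writeLong(System.nanoTime()));
            };

            Thread t0 = new Thread(consumer, "TaskConsumer0");
            Thread t1 = new Thread(consumer, "TaskConsumer1");
            t0.start();
            t1.start();
            t0.join();  // wait for the writers before try-with-resources closes the queue
            t1.join();
        }
    }
}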

The original question (in English):

I have N sets of 1 producer to 1 consumer. The consumer writes to a Chronicle Queue. I just found out today that there is an error I have not seen before.

Exception in thread "TaskConsumer2" Exception in thread "TaskConsumer0" net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
        at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
        at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        ... 8 more
net.openhft.chronicle.wire.UnrecoverableTimeoutException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        at net.openhft.chronicle.queue.impl.single.TableStoreWriteLock.lock(TableStoreWriteLock.java:96)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.prepareAndReturnWriteContext(StoreAppender.java:430)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:406)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writingDocument(StoreAppender.java:394)
        at net.openhft.chronicle.queue.impl.single.StoreAppender.writeBytes(StoreAppender.java:194)
        at service.producerconsumer.TaskConsumer.runTask(TaskConsumer.java:80)
        at service.producerconsumer.TaskConsumer.run(TaskConsumer.java:142)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Couldn't acquire write lock after 15000 ms for the lock file:./chronicle/roll/metadata.cq4t. Lock was held by me. You can manually unlock with net.openhft.chronicle.queue.main.UnlockMain
        ... 8 more

Is this due to multi-threaded access to the Chronicle Queue? I reuse the Chronicle Queue and use thread-local appenders; the following is a sample of how I use the class.

public class TaskConsumer implements Runnable {
    private final ChronicleQueue QUEUE;
    private CustomQueueClass queue;
    private ExcerptAppender APPENDER;
    //other code

    public TaskConsumer(ChronicleQueue queue) {
        this.QUEUE = queue;
        //instantiate queue
        //other code
    }

    private long millis;
    private long nanos;
    private ByteBuffer buffer;
    private InetAddress remoteAdd;
    private int remotePort;
    private String ni;
    private int remaining;
    private int senderId;
    private long seqNum;
    private MoldUdpHeader moldUdpHeader = new MoldUdpHeader();
    private final PrimitiveIntPair pair = new PrimitiveIntPair(0, 0);
    private final WriteBytesMarshallable marshallable = (bytes) -> bytes.writeLong(this.millis)
        .writeLong(this.nanos)
        .write(this.remoteAdd.getAddress())
        .writeInt(this.remotePort)
        .writeUtf8(this.ni)
        .writeInt(this.remaining)
        .writeInt(this.senderId)
        .writeLong(this.seqNum)
        .write(this.buffer.array(), 0, this.remaining);  //sbe-style writes seqNum, remoteAddress, and the ByteBuffer

    private void runTask() {
        LOGGER.debug(logMarker, "{} {} {} {} {} | senderId: {} seqNum: {} msgCnt: {}",
            () -> ZonedDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.of("Asia/Hong_Kong")),
            () -> remoteAdd.getHostName(), () -> remotePort, () -> ni,
            () -> remaining, () -> moldUdpHeader.getSenderId(), () -> moldUdpHeader.getSeqNum(),
            () -> moldUdpHeader.getMsgCnt());
        this.APPENDER.writeBytes(marshallable);  // <-- error thrown here
    }

    public void run() {
        this.APPENDER = QUEUE.acquireAppender();
        TaskHolder task = null;
        while (true) {
            if (Thread.currentThread().isInterrupted()) {
                closeAppender();
                break;
            }
            if (task == null) {
                try {
                    task = queue.getForConsuming(TIMEOUT, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            if (task != null) {
                buffer = task.getByteBuffer();
                if (task.getUdpChannel() != null) {
                    remoteAdd = task.getUdpChannel().getGROUP_ADDRESS();
                    remotePort = task.getUdpChannel().getPORT();
                    millis = task.getMillis();
                    nanos = task.getNanos();
                    ni = task.getUdpChannel().getNETWORK_INTERFACE().getName();
                    remaining = buffer.remaining();

                    if (DECODING.equals("TRUE")) {
                        moldUdpHeader = (MoldUdpHeader) moldUdpHeader.decode(buffer, 0);
                    }
                    senderId = moldUdpHeader.getSenderId();
                    seqNum = moldUdpHeader.getSeqNum();

                    pair.setId(moldUdpHeader.getSenderId());
                    pair.setIndex(getIndex(task.getUdpChannel()));

                    triplesHashmap.computeIfAbsent(pair.copy(), k -> (DECODING.equals("TRUE")) ?
                        new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10000000), new PacketStats()) :
                        new Triple<>(new MutableLong(Long.MIN_VALUE), new LongArrayVisitationCounter(10), new PacketStats()));  // using a supplier to lazily instantiate
                    runTask();  // <--- error thrown here
                    synchronized (triplesHashmap.get(pair).getType3()) {
                        if (DECODING.equals(&quot;TRUE&quot;)) {
                            checkReadValueSequence(triplesHashmap.get(pair), pair, moldUdpHeader.getSeqNum());
                        } else {
                            PacketStats stats = triplesHashmap.get(pair).getType3();
                            stats.incrementPacketsRead();
                            stats.incrementBytesReadBy(remaining);
                        }
                    }
                }
                task.clearValues();
                queue.incrementReadIndex();
                task = null;
            }
        }
    }

    //other code
}

What is weird is that I have deployed the same JAR onto multiple servers, but only this one server shows the error; the others are working as normal. I could wrap the call in a try-catch, ignore the error, and retry the task recursively from the catch block, but I would like to know what is causing this and how to avoid it.
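
For reference, here is a minimal sketch of a bounded retry as an alternative to the catch-and-recurse idea mentioned above; runTask() and LOGGER come from the class shown earlier, while the method name, the attempt limit, and the log message are illustrative assumptions, not part of the original code.

import net.openhft.chronicle.wire.UnrecoverableTimeoutException;

// Inside TaskConsumer: retry runTask() a bounded number of times instead of
// recursing in the catch block, and rethrow once the attempts are exhausted.
private void runTaskWithRetry() {
    final int maxAttempts = 3;  // illustrative limit
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            runTask();
            return;  // success, stop retrying
        } catch (UnrecoverableTimeoutException e) {
            if (attempt == maxAttempts) {
                throw e;  // give up and let the caller see the failure
            }
            LOGGER.warn("Write lock timed out, retrying ({}/{})", attempt, maxAttempts);
        }
    }
}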

Answer 1

Score: 2


It sounds like you are doing the right thing.
You can share the Chronicle Queue; a thread-local appender and tailer shouldn't be a problem. Some older versions had problems with the clean-up of resources, especially if a thread died. We have better control over that now.

I suggest you try 5.23.37 or 5.24ea17.

BTW, DECODING.equals("TRUE") is expensive to do every time. I suggest caching this in a local variable outside the loop.
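
A minimal sketch of that suggestion, using the question's own fields; the local variable name decoding is illustrative:

// Evaluate the flag once before entering the consume loop,
// instead of calling DECODING.equals("TRUE") for every task.
final boolean decoding = "TRUE".equals(DECODING);
while (true) {
    // ...
    if (decoding) {
        moldUdpHeader = (MoldUdpHeader) moldUdpHeader.decode(buffer, 0);
    }
    // ...
}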
