如何检查轮询停止

huangapple go评论59阅读模式
英文:

How to check when polling stopped

问题

我有一个消息流,在这个流中,我需要处理并将消息存储到数据库中。在Java中,我已经编写了轮询代码,它每20秒轮询一次流并消耗消息。

这是在一个无限循环内完成的,如下所示:

for (;;) {
    try {
        //1. 用于轮询的逻辑。
        //2. 用于处理消息的逻辑。
        //3. 用于将消息存储到数据库的逻辑。
        Thread.sleep(20000 - <上述3个步骤所需的时间>);
    } catch (Exception E) {
        //4. 异常处理。
    }
}

这个逻辑按预期运行,可以轮询流,但偶尔会发生异常或出现问题,轮询停止。

我希望有一种机制,一旦轮询停止,比如说这个for循环连续不运行了60秒,我应该收到一封邮件或一条提示。

如何在这个for循环连续不运行60秒后调用一个方法?

我考虑,每个for循环执行都会发送一个心跳,当没有从for循环收到心跳时,就会触发发送邮件的操作。

英文:

I have a message stream, where messages comes which I need to process and then store them in database. In Java, I've written polling code which polls stream and consumes messages every 20 seconds.

This is done inside an infinite for-loop, like below:

for (;;) {
    try{
        //1. Logic for polling.
        //2. Logic for processing the message.
        //3. Logic for storing the message in database.
        Thread.sleep(20000 - &lt;time taken for above 3 steps &gt;); 
    } catch(Exception E){
        //4. Exception handling.
    }
}

This logic runs as expected and the stream is polled, but once in a while it hits an exception or something goes wrong and polling stops.

I want to have a mechanism, that as soon as polling stopped, let's say this for loop is not running for 60 seconds, I should receive a mail or ping.

How can I invoke a method if this for loop is not running for 60 seconds?

I am thinking like, each for-loop execution will ping a heartbeat, and when that heartbeat pinging not received from for-loop then a mail sending is invoked.

答案1

得分: 3

以下是翻译好的部分:

有两个不同的原因导致轮询停止取得进展,每个原因都需要不同的处理:

如果逻辑抛出一个Throwable而不是一个Exception,例如Error,则catch子句不匹配,执行将离开for循环,很可能会到达线程的UncaughtExceptionHandler,其默认实现将异常记录到System.err并终止线程。为了防止这种情况发生,应该捕获Throwable而不是Exception

第二种可能性是您的逻辑中的某个步骤不会终止,例如由于无限循环、死锁、等待I/O操作或其他原因。在这种情况下,您需要获取线程转储以查看线程被卡在哪里。您可以按照以下方式自动执行此操作:

class Watchdog {
    final Duration gracePeriod;
    final Thread watchedThread;
    volatile Instant lastProgress;

    public Watchdog(Duration gracePeriod) {
        this.gracePeriod = gracePeriod;
        watchedThread = Thread.currentThread();
        everythingIsFine();

        var t = new Thread(this::keepWatch);
        t.setDaemon(true);
        t.start();
    }

    public void everythingIsFine() {
        lastProgress = Instant.now();
    }

    void keepWatch() {
        while (true) {
            var silence = Duration.between(lastProgress, Instant.now());
            if (silence.compareTo(gracePeriod) > 0) {
                System.err.println("Watchdog hasn't seen any progress for " + silence.toSeconds() + " seconds. The watched thread is currently at:");
                for (var element : watchedThread.getStackTrace()) {
                    System.err.println("\tat " + element);
                }
            }
            try {
                Thread.sleep(gracePeriod);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

您可以按照以下方式使用:

public class Test {
    void step() throws Exception {
        System.in.read();
    }

    void job() {
        var snoopy = new Watchdog(Duration.ofSeconds(2));
        for (;;) {
            try {
                step();
                snoopy.everythingIsFine();
                Thread.sleep(1000);
            } catch (Throwable t) {
                System.err.println(t);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new Test().job();
    }
}

一旦宽限期到期,WatchDog将打印类似以下的内容:

Watchdog hasn't seen any progress for 2 seconds. The watched thread is currently at:
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:293)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:289)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:276)
	at stackoverflow.Test.step(Test.java:48)
	at stackoverflow.Test.job(Test.java:55)
	at stackoverflow.Test.main(Test.java:65)
英文:

There are two different reasons why polling stops making progress, and each needs a different approach:

If the logic throws a Throwable other than an Exception, for instance an Error, the catch does not match, and execution will leave the for-loop, and likely reach the thread's UncaughtExceptionHandler, the default implementation of which logs the exception to System.err and terminates the thread. To prevent this, you should catch Throwable rather than Exception.

The second possibility is that some step in your logic doesn't terminate, for instance due to an infinite loop, a deadlock, waiting for I/O operations, or whatever. In this case, you'll want to take a thread dump to see where the thread is stuck. You can automate this as follows:

class Watchdog {
	final Duration gracePeriod;
	final Thread watchedThread;
	volatile Instant lastProgress;

	public Watchdog(Duration gracePeriod) {
		this.gracePeriod = gracePeriod;
		watchedThread = Thread.currentThread();
		everythingIsFine();
		
		var t = new Thread(this::keepWatch);
		t.setDaemon(true);
		t.start();
	}
	
	public void everythingIsFine() {
		lastProgress = Instant.now();
	}
	
	void keepWatch() {
		while (true) {
			var silence = Duration.between(lastProgress, Instant.now());
			if (silence.compareTo(gracePeriod) &gt; 0) {
				System.err.println(&quot;Watchdog hasn&#39;t seen any progress for &quot; + silence.toSeconds() + &quot; seconds. The watched thread is currently at:&quot;);
				for (var element : watchedThread.getStackTrace()) {
					System.err.println(&quot;\tat &quot; + element);
				}
			}
			try {
				Thread.sleep(gracePeriod);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

while you can use as follows:

public class Test {
	void step() throws Exception {
		System.in.read();
	}
	
	void job() {
		var snoopy = new Watchdog(Duration.ofSeconds(2));
		for (;;) {
			try {
				step();
				snoopy.everythingIsFine();
				Thread.sleep(1000);
			} catch (Throwable t) {
				System.err.println(t);
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		new Test().job();
	}
}

once the grace period elapses, the WatchDog will print something like:

<pre>
Watchdog hasn't seen any progress for 2 seconds. The watched thread is currently at:
at java.base/java.io.FileInputStream.readBytes(Native Method)
at java.base/java.io.FileInputStream.read(FileInputStream.java:293)
at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:289)
at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:276)
at stackoverflow.Test.step(Test.java:48)
at stackoverflow.Test.job(Test.java:55)
at stackoverflow.Test.main(Test.java:65)
</pre>

huangapple
  • 本文由 发表于 2023年2月19日 09:47:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/75497509.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定