英文:
Is there a way to stop a micronaut scheduled job between tests?
问题
我有一个根据属性动态调度作业的应用程序。它会监听ServiceReadyEvent,然后调度作业。下面是一个小例子,通过工厂将其添加到上下文中。
class JobsLoader(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler,
val myJobName: String
) : ApplicationEventListener<ServiceReadyEvent> {
override fun onApplicationEvent(event: ServiceReadyEvent?) {
taskScheduler.schedule("* * * * *", { println("running job $jobName") })
}
}
为了测试这段代码,我使用kotest编写了一个Micronaut测试。
@MicronautTest
class Job1Spec(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler
) {
init {
"start job loader" {
val jobsLoader = JobsLoader("job1", taskScheduler)
jobsLoader.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
}
"verify the job runs" {
//在这里编写代码以验证作业是否已运行并执行了正确的调用
}
}
}
我还为另一个作业编写了第二个测试。
@MicronautTest
class Job2Spec(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler
) {
init {
"start job loader" {
val jobsLoader = JobsLoader("job2", taskScheduler)
jobsLoader.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
}
"verify the job runs" {
//在这里编写代码以验证作业是否已运行并执行了正确的调用
}
}
}
我的问题在于,在第二个测试期间,我发现第一个作业在第二个测试运行期间仍然被调度:
[INFO] 2020-09-10 12:57:00,005 pool-2-thread-1 running job job1
[INFO] 2020-09-10 12:57:00,009 pool-4-thread-1 running job job2
除了作业的运行之外,关于第一次运行测试的其他所有内容都已停止。我发现它仍在运行,因为它在作业运行期间会抛出由于HttpClient关闭而导致的异常。
我尝试过:
- 在测试之间使用
rebuildContext = true
重新构建测试上下文。 - 在我的
JobsLoader
类中跟踪调度的futures,然后在afterSpec
函数中取消它们。但是在取消它们时,future列表始终为空。 - 手动刷新TaskScheduler和ExecutorService beans。
所有测试都通过了,但它们都充斥着异常,特别是在测试越来越多的作业时,除了我正在测试的作业之外,所有其他作业都持续失败。我不确定这是否与kotest、micronaut测试、调度程序还是一些组合,或者完全是其他问题有关(我也看到了vertx日志,但我认为那不是问题所在)。我真的需要找到一种在下一个测试运行之前终止这些作业/线程的方法。
非常感谢您的任何帮助或想法!
英文:
I have an application that dynamically schedules jobs based on properties. It listens for the ServiceReadyEvent and then schedules the job. Below is a small example that would get added to the context via a factory.
class JobsLoader(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler,
val myJobName: String
) : ApplicationEventListener<ServiceReadyEvent> {
override fun onApplicationEvent(event: ServiceReadyEvent?) {
taskScheduler.schedule("* * * * *", () -> println("running job $jobName"))
}
}
To test this code I have a Micronaut test, using kotest.
@MicronautTest
class Job1Spec(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler
) {
init {
"start job loader" {
val jobsLoader = JobsLoader("job1", taskScheduler)
jobsLoader.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
}
"verify the job runs" {
//code here to verify the job ran and performed the correct calls
}
}
}
I also have a second test for another job
@MicronautTest
class Job2Spec(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler
) {
init {
"start job loader" {
val jobsLoader = JobsLoader("job2", taskScheduler)
jobsLoader.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
}
"verify the job runs" {
//code here to verify the job ran and performed the correct calls
}
}
}
My problem lies in that during the second test I see that the first job is still scheduled during the second tests run:
[INFO] 2020-09-10 12:57:00,005 pool-2-thread-1 running job job1
[INFO] 2020-09-10 12:57:00,009 pool-4-thread-1 running job job2
Everything else about the first run of tests has stopped except for the running of the job. I found it was still running because it throws an exception due to the HttpClient being closed during the job run.
I've tried
- Rebuilding the test context between tests with
rebuildContext = true
- Tracking the scheduled futures in my
JobsLoader
class and then cancelling them in anafterSpec
function. The list of futures is always empty by the time I go to cancel them. - Refreshing the TaskScheduler and ExecutorService beans manually
The tests all pass fine, but they are littered with exceptions - especially as I get to testing more and more jobs and all but the one I'm testing continuously fails. I'm not sure if this is an issue with kotest, micronaut test, the schedulers, some combination, or something else entirely (I see vertx logs as well, but I don't think that's the issue). I really just need to figure out a way to kill these jobs/threads before the next test runs.
Any help or ideas would be HUGELY appreciated!
答案1
得分: 1
在输入了所有内容后,我立即找出了自己的问题所在。我在测试中手动创建了一个作业,这不是 micronaut 上下文的一部分,所以当我尝试取消作业时,我的未来列表始终是空的。
我更改了我的测试以加载作业并将其添加到上下文中:
var jobsLoader: JobsLoader? = null
override fun beforeSpec(spec: Spec) {
jobsLoader = JobsLoader(taskScheduler, getJobsConfiguration(), jobResolver, eventPublisher)
jobsLoader?.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
super.beforeSpec(spec)
}
override fun afterSpec(spec: Spec) {
jobsLoader?.stopJobs()
super.afterSpec(spec)
}
然后在我的作业加载器中提供了一个取消任何作业的函数。
class JobsLoader(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler,
val myJobName: String
) : ApplicationEventListener<ServiceReadyEvent> {
private val scheduledJobs = mutableListOf<ScheduledFuture<*>>()
override fun onApplicationEvent(event: ServiceReadyEvent?) {
val scheduledFuture = taskScheduler.schedule("* * * * *", { -> println("running job $jobName") })
scheduledJobs.add(scheduledFuture)
}
fun stopJobs() {
scheduledJobs.forEach { it.cancel(false) }
}
}
英文:
So after typing all that out I figured out what my issue was almost immediately. I'm manually creating a job in my test, which isn't a part of the micronaut context, so that's why my future list is always empty when I go and try to cancel the jobs.
I changed my test to load the job and add it to the context:
var jobsLoader: JobsLoader? = null
override fun beforeSpec(spec: Spec) {
jobsLoader = JobsLoader(taskScheduler, getJobsConfiguration(), jobResolver, eventPublisher)
jobsLoader?.onApplicationEvent(ServiceReadyEvent(ServiceInstance.of("test", URL("http://localhost:8080"))))
super.beforeSpec(spec)
}
override fun afterSpec(spec: Spec) {
jobsLoader?.stopJobs()
super.afterSpec(spec)
}
Then provided a function in my job loader to cancel any jobs.
class JobsLoader(
@param:Named(TaskExecutors.SCHEDULED) val taskScheduler: TaskScheduler,
val myJobName: String
) : ApplicationEventListener<ServiceReadyEvent> {
private val scheduledJobs = mutableListOf<ScheduledFuture<*>>()
override fun onApplicationEvent(event: ServiceReadyEvent?) {
val scheduledFuture = taskScheduler.schedule("* * * * *", () -> println("running job $jobName"))
scheduledJobs.add(scheduledFuture)
}
fun stopJobs() {
scheduledJobs.forEach { it.cancel(false) }
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论