英文:
Setting and reading instance variable value within rxJava chain from different Schedulers
问题
我不确定从不同调度程序的rxJava链中读取/写入实例变量的安全性。这里有一个简单的示例:
public class RxJavaThreadSafety {
private int variable = 0;
// 第一个调用
public void doWriting() {
Single.just(255)
.doOnSuccess(
newValue -> variable = newValue
)
.subscribeOn(Schedulers.io())
.subscribe();
}
// 第二个调用
public void doReadingRxChain() {
Single.fromCallable((Callable<Integer>) () -> variable)
.subscribeOn(Schedulers.computation())
.subscribe(
result -> System.out.println(result)
);
}
// 第三个调用
public void doReading() {
System.out.println(variable);
}
}
为了简单起见,假设这三个方法依次调用。
我的问题:在 io 调度程序中设置变量,然后在计算调度程序或主线程中稍后读取这个变量,这样做是线程安全的吗?
我认为这是不线程安全的,但我希望一些 rxJava 和并发专家来证明一下。
英文:
I am not sure about safety of reading/writing instance variables from rxJava chain with different schedulers. There is a small example
public class RxJavaThreadSafety {
private int variable = 0;
// First call
public void doWriting() {
Single.just(255)
.doOnSuccess(
newValue -> variable = newValue
)
.subscribeOn(Schedulers.io())
.subscribe();
}
// Second call
public void doReadingRxChain() {
Single.fromCallable((Callable<Integer>) () -> variable)
.subscribeOn(Schedulers.computation())
.subscribe(
result -> System.out.println(result)
);
}
// Third call
public void doReading() {
System.out.println(variable);
}
}
For simplicity lets assume that these three methods called one after another
My question: Does it thread safe to set variable "in" io scheduler, and lately read this variable "from" computation scheduler or main thread?
I think that is not thread safe, but i want some rxJava and concurrency experts to prove it
答案1
得分: 1
不,这是不线程安全的。
当您使用 subscribeOn
时,意味着调用 subscribe()
会将生成该项的任务添加到调度程序的工作队列中。
doWriting()
和 doReadingRxChain()
方法将任务添加到不同的调度程序中。不能保证 doWriting()
中的链路甚至会在 doReadingRxChain()
之前开始运行。例如,如果所有IO线程都在忙碌,就会发生这种情况。
还有一个更基本的问题:您在一个线程中编写 variable
的值,在另一个线程中读取它。如果没有任何并发控制,没有任何保证线程读取时会看到 variable
的新值。修复这个问题的一种方法是将变量声明为 volatile
:
private volatile int variable = 0;
英文:
No, this is not thread safe.
When you use subscribeOn
it means that calling subscribe()
adds the task for producing the item to the work queue of a scheduler.
The doWriting()
and doReadingRxChain()
methods add tasks to different schedulers. There is no guarantee that the chain in doWriting()
will even start to run before doReadingRxChain()
. This can happen for example if all IO threads are busy.
There is a more fundamental problem: you are writing the value of variable
in one thread and reading it in another. Without any concurrency controls, nothing guarantees that the new value of variable
is seen by the thread reading it. One way to fix that is declaring the variable as volatile
:
private volatile int variable = 0;
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论