Interval operator in RxJava: which part of the code runs on a background thread when I subscribe on the IO thread?

Question

  • What I am doing: I am using the interval operator from RxJava.
  • The output is posted below.
  • What I am trying to find out: as you can see, I have subscribed on the
    IO thread. Which part of the code runs in the background?

Code:

class IntervalOperatorFragment : Fragment() {

    private val TAG = IntervalOperatorFragment::class.java.simpleName
    private var _binding: FragmentOperatorIntervalBinding? = null
    private val binding get() = _binding!!

    private val INTERVAL_PERIOD = 1L
    private val MAXIMUM_PERIOD = 5L

    override fun onCreateView(
        inflater: LayoutInflater,
        container: ViewGroup?,
        savedInstanceState: Bundle?
    ): View? {
        _binding = FragmentOperatorIntervalBinding.inflate(inflater, container, false)
        return binding.root
    }

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        onClickListeners()
    }

    override fun onDestroyView() {
        super.onDestroyView()
        _binding = null
    }

    private fun onClickListeners() {
        binding.floatingActionButton.setOnClickListener {
            subscribeToObservable()
        }
    }

    /**
     * Create the observable
     */
    private fun createObservable(): Observable<Long> {
        return Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS)
                         .subscribeOn(Schedulers.io())
                         .observeOn(AndroidSchedulers.mainThread())
                         .takeWhile { value ->
                             Timber.tag(TAG).d("Thread: %s", Thread.currentThread())
                             value <= MAXIMUM_PERIOD
                         }
    }

    /**
     * Subscribe to the observable
     */
    private fun subscribeToObservable() {
        createObservable().subscribe(object : Observer<Long> {
            override fun onSubscribe(d: Disposable) {
                Timber.tag(TAG).d("Subscribe Invoked")
            }

            override fun onNext(t: Long) {
                Timber.tag(TAG).d("Value: %s", t)
            }

            override fun onError(e: Throwable) {
                Timber.tag(TAG).e("ERROR: %s", e.message)
            }

            override fun onComplete() {
                Timber.tag(TAG).d("Task is complete")
            }

        })
    }
}

Output:

2020-09-13 03:20:19.334 24452-24452/com.demo.code D/IntervalOperatorFragment: Subscribe Invoked
2020-09-13 03:20:20.345 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:20.348 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 0
2020-09-13 03:20:21.341 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:21.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 1
2020-09-13 03:20:22.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:22.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 2
2020-09-13 03:20:23.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:23.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 3
2020-09-13 03:20:24.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:24.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 4
2020-09-13 03:20:25.341 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:25.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Value: 5
2020-09-13 03:20:26.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Thread: Thread[main,5,main]
2020-09-13 03:20:26.342 24452-24452/com.demo.code D/IntervalOperatorFragment: Task is complete

Answer 1

Score: -1

From the Rx documentation: http://reactivex.io/documentation/operators/subscribeon.html
> the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called. ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears.

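If you put takeWhile before observeOn, it no longer runs on the main thread; it runs on whichever thread emits the items. Note that Observable.interval(period, unit) emits on the computation scheduler by default, so subscribeOn(Schedulers.io()) only moves the subscription itself to an IO thread. To have the ticks, and therefore takeWhile, run on the IO scheduler, pass it to interval directly: Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS, Schedulers.io()).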

Observable.interval(INTERVAL_PERIOD, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io())
    .takeWhile { value ->
        Timber.tag(TAG).d("Thread: %s", Thread.currentThread())
        value <= MAXIMUM_PERIOD
    }
    .observeOn(AndroidSchedulers.mainThread())

Anything below observeOn runs on the Scheduler passed to the observeOn operator.
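
One way to check which thread each stage runs on is to record thread names outside Android. The sketch below is only an illustration under several assumptions: it uses RxJava 3 package names (`io.reactivex.rxjava3`), a plain JVM `main`, and `Schedulers.single()` as a stand-in for `AndroidSchedulers.mainThread()`. The helper `threadsFor` is hypothetical and not part of the original code.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the name of the thread that ran takeWhile
// (placed before observeOn) and the thread that ran a stage after observeOn.
fun threadsFor(source: Observable<Long>): Pair<String, String> {
    var predicateThread = ""
    var downstreamThread = ""
    source
        .subscribeOn(Schedulers.io())
        .takeWhile { value ->
            // Runs on the thread that emits the tick.
            predicateThread = Thread.currentThread().name
            value < 2
        }
        .observeOn(Schedulers.single()) // assumption: stand-in for the Android main thread
        .doOnNext { downstreamThread = Thread.currentThread().name }
        .blockingSubscribe() // block the caller until the stream completes
    return predicateThread to downstreamThread
}

fun main() {
    // interval with its default scheduler
    val (p1, d1) = threadsFor(Observable.interval(100, TimeUnit.MILLISECONDS))
    println("default interval: takeWhile on $p1, after observeOn on $d1")

    // interval told explicitly to tick on the IO scheduler
    val (p2, d2) = threadsFor(
        Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.io())
    )
    println("io interval:      takeWhile on $p2, after observeOn on $d2")
}
```

With the default `interval`, the first name should belong to the computation pool (names starting with `RxComputationThreadPool`). With the explicit `Schedulers.io()` argument, it should belong to the IO pool (`RxCachedThreadScheduler`). In both cases the stage after `observeOn` runs on the scheduler given to `observeOn`.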
