观察在订阅者被移除/释放时关闭可关闭资源

huangapple go评论59阅读模式
英文:

Observing closeable resources to be closed when the subscriber is removed/disposed

问题

我正在处理一个小型子系统,使用RxJava 2集成了两个简单组件。

这两个组件以简单的客户端-服务器方式工作,其中第一个组件生成可观察数据,在底层打开资源。

该资源对第二个组件不可见。

此外,只要可观察对象在使用中,它必须保持打开状态,但是可观察对象无法确定何时应该关闭它。

以代码来说,一个示例实现如下:

private Disposable disposable;

public void onCreate() {
    final Maybe<Object> maybeResource = Maybe.defer(() -> {
        System.out.println("open");
        // 这里是底层的资源,它封装在可观察对象中,永远不会被公开
        final Closeable resource = () -> { };
        return Maybe.just(resource)
                .doOnDispose(() -> {
                    // 这个“析构函数”从未被调用,导致资源泄漏
                    System.out.println("close");
                    resource.close();
                })
                // 任意数据,不代表我正在处理的数据,但它将资源隐藏起来
                .map(closeable -> new Object());
    });
    disposable = maybeResource.subscribe(data -> System.out.println("process: " + data));
}

public void onUserWorflow() {
    // ...
    System.out.println("... ... ...");
    // ...
}

public void onDestroy() {
    disposable.dispose();
}

我预期的输出是:

open
process: <...>
... ... ...
close         <-- 这一行永远不会产生

但是最后一行 close 从未产生,因为 doOnDispose 方法未被调用,它并没有按照我预期的方式工作。

因此,资源从未被释放。RxJava/RxJava 2 是否有一种方法可以在释放订阅者时关闭“可关闭”资源呢?

英文:

I'm working on a small subsystem that integrates two simple components using RxJava 2.
These two components work in a simple client-server manner where the first component produces observable data opening a resource under the hood.
The resource is not exposed to the second component.
Moreover, it must be open as long as the observable is in use, however the observable object cannot determine when it should be closed.
Speaking in code, an example implementation is like this:

private Disposable disposable;

public void onCreate() {
    final Maybe&lt;Object&gt; maybeResource = Maybe.defer(() -&gt; {
        System.out.println(&quot;open&quot;);
        // here is the resource under the hood, it is encapsulated in the observable and never gets exposed
        final Closeable resource = () -&gt; { };
        return Maybe.just(resource)
                .doOnDispose(() -&gt; {
                    // this &quot;destructor&quot; is never called, resulting in a resource leak
                    System.out.println(&quot;close&quot;);
                    resource.close();
                })
                // arbitrary data, does not represent the data I&#39;m working with, but it hides the resource away
                .map(closeable -&gt; new Object());
    });
    disposable = maybeResource.subscribe(data -&gt; System.out.println(&quot;process: &quot; + data));
}

public void onUserWorflow() {
    // ...
    System.out.println(&quot;... ... ...&quot;);
    // ...
}

public void onDestroy() {
    disposable.dispose();
}

The output I'd anticipate to get is:

open
process: &lt;...&gt;
... ... ...
close         &lt;-- this is never produced

but the last line, close, is never produced as the doOnDispose method is not invoked and does not work as I might think it's supposed to.
Therefore the resource gets never released.
There is also Maybe.using that does a similar thing, but it does not allow to "span" across the "user workflow".

Is there an RxJava/RxJava 2 way that allows managing "closeable" resources closed on disposing a subscriber?

答案1

得分: 2

我猜你需要使用 Observable.create() 而不是 Maybe。类似这样:

final Observable<Object> resourceObservable = Observable.create(emitter -> {
    // 做你的事情
    emitter.onNext(new Object()); // 让 Observable 发射一些内容
    emitter.setCancellable(() -> {
        System.out.println("close");
        resource.close();
    });
});

disposable = resourceObservable.subscribe(data -> System.out.println("process: " + data));
英文:

i guess you need to use Observable.create() instead of Maybe.
Something like that:

final Observable&lt;Object&gt; resourceObservable = Observable.create&lt;Object&gt; {(emitter -&gt;
        // do you staff
        emitter.onNext(new Object()); //to make observable emit something
        emitter.setCancellable ( 
           System.out.println(&quot;close&quot;);
           resource.close(); 
        )
    );

disposable = resourceObservable.subscribe(data -&gt; System.out.println(&quot;process: &quot; + data));

huangapple
  • 本文由 发表于 2020年10月2日 07:17:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/64164337.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定