有没有更简洁的方式来使用 RxJava 适配标准的观察者/监听器服务?

huangapple go评论70阅读模式
英文:

Is there a cleaner way adapt a standard observer/listener service using RxJava?

问题

static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
    PublishSubject<MockResponse> mockResponseObservable = PublishSubject.create();
    MockService mockService = new MockService(mockResponse -> mockResponseObservable.onNext(mockResponse));
    final ConcurrentMap<String, Observable<String>> subscriptionMap = new ConcurrentHashMap<>();

    public Observable<String> getObservable(String id) {
        return Observable.using(
            () -> subscriptionMap.computeIfAbsent(
                id,
                key -> mockResponseObservable.filter(mockResponse -> mockResponse.id.equals(id))
                    .doOnSubscribe(disposable -> mockService.subscribe(id))
                    .doOnDispose(() -> {
                        mockService.unsubscribe(id);
                        subscriptionMap.remove(id);
                    })
                    .map(mockResponse -> mockResponse.value)
                    .replay(1)
                    .refCount()
            ),
            observable -> observable,
            observable -> {}
        );
    }
}
英文:

I've been tinkering with wrapping an old style listener interface using RxJava. What i've come up with seems to work, but the usage of Observable.using feels a bit awkward.

The requirements are:

  1. Only one subscription per id to the underlying service.
  2. The latest value for a given id should be cached and served to new subscribers.
  3. We must unsubscribe from the underlying service if nothing is listening to an id.

Is there a better way? The following is what I've got.

static class MockServiceRXAdapterImpl1 implements MockServiceRXAdapter {
	PublishSubject&lt;MockResponse&gt; mockResponseObservable = PublishSubject.create();
	MockService mockService = new MockService(mockResponse -&gt; mockResponseObservable.onNext(mockResponse));
	final ConcurrentMap&lt;String, Observable&lt;String&gt;&gt; subscriptionMap = new ConcurrentHashMap&lt;&gt;();

	public Observable&lt;String&gt; getObservable(String id) {
		return Observable.using(() -&gt; subscriptionMap.computeIfAbsent(
				id,
				key -&gt; mockResponseObservable.filter(mockResponse -&gt; mockResponse.id.equals(id))
						.doOnSubscribe(disposable -&gt; mockService.subscribe(id))
						.doOnDispose(() -&gt; {
							mockService.unsubscribe(id);
							subscriptionMap.remove(id);
						})
						.map(mockResponse -&gt; mockResponse.value)
						.replay(1)
						.refCount()),
				observable -&gt; observable,
				observable -&gt; {
				}
		);
	}
}

答案1

得分: 1

你可以使用 Observable.create

代码可能看起来像这样:

final Map<String, Observable<String>> subscriptionMap = new HashMap<>();
MockService mockService = new MockService();

public Observable<String> getObservable(String id) {
    log.info("looking for root observable");
    if (subscriptionMap.containsKey(id)) {
        log.info("found root observable");
        return subscriptionMap.get(id);
    } else {
        synchronized (subscriptionMap) {
            if (!subscriptionMap.containsKey(id)) {
                log.info("creating new root observable");
                final Observable<String> responseObservable = Observable.create(emitter -> {
                    MockServiceListener listener = emitter::onNext;
                    mockService.addListener(listener);
                    emitter.setCancellable(() -> {
                        mockServices.removeListener(listener);
                        mockService.unsubscribe(id);
                        synchronized (subscriptionMap) {
                            subscriptionMap.remove(id);
                        }
                    });
                    mockService.subscribe(id);
                })
                .filter(mockResponse -> mockResponse.id.equals(id))
                .map(mockResponse -> mockResponse.value)
                .replay(1)
                .refCount();

                subscriptionMap.put(id, responseObservable);
            } else {
                log.info("Another thread created the observable for us");
            }
            return subscriptionMap.get(id);
        }
    }
}
英文:

You may use Observable.create

So code may look like this

final Map&lt;String, Observable&lt;String&gt;&gt; subscriptionMap = new HashMap&lt;&gt;();
MockService mockService = new MockService();
public Observable&lt;String&gt; getObservable(String id) {
log.info(&quot;looking for root observable&quot;);
if (subscriptionMap.containsKey(id)) {
log.info(&quot;found root observable&quot;);
return subscriptionMap.get(id);
} else {
synchronized (subscriptionMap) {
if (!subscriptionMap.containsKey(id)) {
log.info(&quot;creating new root observable&quot;);
final Observable&lt;String&gt; responseObservable = Observable.create(emitter -&gt; {
MockServiceListener listener = emitter::onNext;
mockService.addListener(listener);
emitter.setCancellable(() -&gt; {
mockServices.removeListener(listener);
mockService.unsubscribe(id);
synchronized (subscriptionMap) {
subscriptionMap.remove(id);
}
});
mockService.subscribe(id);
})
.filter(mockResponse -&gt; mockResponse.id.equals(id))
.map(mockResponse -&gt; mockResponse.value)
.replay(1)
.refCount();
subscriptionMap.put(id, responseObservable);
} else {
log.info(&quot;Another thread created the observable for us&quot;);
}
return subscriptionMap.get(id);
}
}
}

答案2

得分: 0

以下是翻译好的内容:

我认为我已经使用.groupBy(...)使其工作。

在我的情况下,Response.getValue() 返回一个int,但是你可以理解:

class Adapter
{
    Subject<Response> msgSubject;
    ThirdPartyService service;
    Map<String, Observable<Integer>> observables;
    Observable<GroupedObservable<String, Response>> groupedObservables;

    public Adapter()
    {
        msgSubject = PublishSubject.<Response>create().toSerialized();
        service = new MockThirdPartyService(msgSubject::onNext);
        groupedObservables = msgSubject.groupBy(Response::getId);
        observables = Collections.synchronizedMap(new HashMap<>());
    }

    public Observable<Integer> getObservable(String id)
    {
        return observables.computeIfAbsent(id, this::doCreateObservable);
    }

    private Observable<Integer> doCreateObservable(String id)
    {
        service.subscribe(id);

        return groupedObservables
                .filter(group -> group.getKey().equals(id))
                .doOnDispose(() -> {
                    synchronized (observables)
                    {
                        service.unsubscribe(id);
                        observables.remove(id);
                    }
                })
                .concatMap(Functions.identity())
                .map(Response::getValue)
                .replay(1)
                .refCount();
    }
}
英文:

I think I've gotten it to work using .groupBy(...).

In my case Response.getValue() returns an int, but you get the idea:

class Adapter
{
Subject&lt;Response&gt; msgSubject;
ThirdPartyService service;
Map&lt;String, Observable&lt;Integer&gt;&gt; observables;
Observable&lt;GroupedObservable&lt;String, Response&gt;&gt; groupedObservables;
public Adapter()
{
msgSubject = PublishSubject.&lt;Response&gt;create().toSerialized();
service = new MockThirdPartyService( msgSubject::onNext );
groupedObservables = msgSubject.groupBy( Response::getId );
observables = Collections.synchronizedMap( new HashMap&lt;&gt;() );
}
public Observable&lt;Integer&gt; getObservable( String id )
{
return observables.computeIfAbsent( id, this::doCreateObservable );
}
private Observable&lt;Integer&gt; doCreateObservable( String id )
{
service.subscribe( id );
return groupedObservables
.filter( group -&gt; group.getKey().equals( id ))
.doOnDispose( () -&gt; {
synchronized ( observables )
{
service.unsubscribe( id );
observables.remove( id );
}
} )
.concatMap( Functions.identity() )
.map( Response::getValue )
.replay( 1 )
.refCount();
}
}

huangapple
  • 本文由 发表于 2020年4月9日 14:08:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/61114945.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定