RxJava: how to make code work asynchronously?

Question

Please correct me if I am wrong in my conclusions about asynchronous work, and suggest how to make this code run asynchronously.

This code writes the results of calculations into grid cells.
I assume that asynchronous work should display the grid cells in batches: with 8 cores, 8 cells should appear, then after some time another 8, and so on (given that I add a time delay). At the moment, however, the cells appear one by one.

Model:

public class ListRepository implements ListRepositoryInterface {
private final Integer insertValue = 1000000;

private HashMap<String, BaseUnit> unitMap = new HashMap<>();

@Inject
public ListRepository() {}

public PublishSubject<BaseUnit> exec(int inputNumber) {

    PublishSubject<BaseUnit> subject = PublishSubject.create();
    Observable<BaseUnit> observable = getListObservable(inputNumber)
            .subscribeOn(Schedulers.computation())
            .flatMap(resultList ->

                Observable.fromIterable(resultList)
                        .flatMap(listElem ->
                        Observable.fromArray(ListOperationName.values())
                                .map(operationElem -> {

                            ListUnit unit = new ListUnit(operationElem, listElem, 0);
                            calculate(unit, listElem);
                            unitMap.put(unit.getViewId(), unit);
                            return unit;
                        })
                )
            );
    observable.subscribe(subject);
    return subject;
}


private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
    return Observable.fromCallable(() -> {

        ArrayList<List<Integer>> list = new ArrayList<>();

        Integer[] populatedArray = new Integer[inputNumber];
        Arrays.fill(populatedArray, insertValue);

        list.add(new ArrayList<>(Arrays.asList(populatedArray)));
        list.add(new LinkedList<>(Arrays.asList(populatedArray)));
        list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));

        return list;
    });
}


private void calculate(ListUnit unit, List<Integer> list) {
    try {
        TimeUnit.MILLISECONDS.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    double start = getTime();

    //noinspection SynchronizationOnLocalVariableOrMethodParameter
    synchronized (list) {

        switch (unit.getOperationName()) {
            case ADD_FIRST:
                list.add(0, insertValue);
                break;
            case ADD_MID:
                list.add(list.size() / 2, insertValue);
                break;
            case ADD_LAST:
                list.add(insertValue);
                break;
            case SEARCH:
                //noinspection unused
                boolean contains = list.contains(insertValue);
                break;
            case RM_FIRST:
                list.remove(0);
                break;
            case RM_MID:
                list.remove(list.size() / 2);
                break;
            case RM_LAST:
                list.remove(list.size() - 1);
                break;
        }
    }

    unit.setTime(getTime() - start);
}

private double getTime() {
    return System.nanoTime();
}

public HashMap<String, BaseUnit> getUnitMap() {
    return unitMap;
}

}

Presenter:

public void calculate(int inputNumber) {
    fragment.showAllProgressBars();

    PublishSubject<BaseUnit> subject = repository.exec(inputNumber);

        Disposable disposable = subject.observeOn(AndroidSchedulers.mainThread())
                .subscribe(unit -> {
                    fragment.setCellText(unit.getViewId(), unit.getTimeString());

                }, Throwable::printStackTrace);

}

UPD: I have now made a test example and am trying to subscribe to the subject correctly.
If I use subject.onNext() it works asynchronously, but I assume this is wrong, because subject.hasComplete() never returns "true".
See the "TODO" comments below.

private void run() {

    Log.d("APP", "INIT");

    PublishSubject<String> subject = exec(1000000);
    subject.observeOn(AndroidSchedulers.mainThread())
        .subscribe(unit -> {
            Log.d("STRING RESULT = ", unit);
            if (subject.hasComplete()) {
                //TODO: this condition should work if we use observable.subscribe(subject), not subject.onNext
                Log.d("SUBJECT", "COMPLETED");
            }
        }, Throwable::printStackTrace);
}

private int insertValue = 1000000;

public PublishSubject<String> exec(int inputNumber) {
    PublishSubject<String> subject = PublishSubject.create();

    getListObservable(inputNumber)
        .flatMap(resultList -> getOperationsObservable()
        .flatMap(operationElem -> getResultListObservable(resultList)
            .map(listElem ->
                                    calculate(operationElem, listElem)
//TODO: should be smth like this, i.e. calculate.subscribe(subject)
//                        .subscribe(subject)
                                        .subscribe(subject::onNext)

                        )
                )
        ).subscribe();

    return subject;
}


private Observable<ArrayList<List<Integer>>> getListObservable(int inputNumber) {
    return Observable.fromCallable(() -> {

        ArrayList<List<Integer>> list = new ArrayList<>();

        Integer[] populatedArray = new Integer[inputNumber];
        Arrays.fill(populatedArray, insertValue);

        list.add(new ArrayList<>(Arrays.asList(populatedArray)));
        list.add(new LinkedList<>(Arrays.asList(populatedArray)));
        list.add(new CopyOnWriteArrayList<>(Arrays.asList(populatedArray)));

        return list;
    }).subscribeOn(Schedulers.computation());
}


private Observable<String> calculate(ListOperationName operationName, List<Integer> list) {
    return Observable.fromCallable(() -> {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        double start = getTime();

        synchronized (list) {

            switch (operationName) {
                case ADD_FIRST:
                    list.add(0, insertValue);
                    break;
                case ADD_MID:
                    list.add(list.size() / 2, insertValue);
                    break;
                case ADD_LAST:
                    list.add(insertValue);
                    break;
                case SEARCH:
                    //noinspection unused
                    boolean contains = list.contains(insertValue);
                    break;
                case RM_FIRST:
                    list.remove(0);
                    break;
                case RM_MID:
                    list.remove(list.size() / 2);
                    break;
                case RM_LAST:
                    list.remove(list.size() - 1);
                    break;
            }
        }
        return Double.toString(getTime() - start);
    }).subscribeOn(Schedulers.newThread());
}

private Observable<ListOperationName> getOperationsObservable() {
    return Observable.fromArray(ListOperationName.values());
}

private Observable<List<Integer>> getResultListObservable(ArrayList<List<Integer>> list) {
    return Observable.fromIterable(list);
}

private double getTime() {
    return System.nanoTime();
}

public enum ListOperationName {
    ADD_FIRST,
    ADD_MID,
    ADD_LAST,
    SEARCH,
    RM_FIRST,
    RM_MID,
    RM_LAST;
}

Gradle:

dependencies {
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
}

Answer 1

Score: 1

If I remove .subscribeOn(Schedulers.newThread()) from the calculate(...) method in your most recent example and make the following change to exec(...), I seem to get the desired parallelism:

public Observable<String> exec( int inputNumber )
{
	return getListObservable( inputNumber )
			.flatMap( resultList -> getOperationsObservable()
				.flatMap( operationElem -> getResultListObservable( resultList )
						.flatMap( listElem -> Observable.just( listElem )
								.subscribeOn( Schedulers.computation() )
								.flatMap( __ -> calculate( operationElem, listElem )))));
}

I removed the PublishSubject. It seems redundant: you can just subscribe to the resulting Observable, no?
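
To illustrate why this helps: in the question's first version a single subscribeOn(...) puts the whole chain on one worker thread, so every calculate(...) call runs after the previous one. Giving each inner Observable its own subscribeOn(...) lets flatMap merge work from several threads. The sketch below is only a JDK-only analogy of that difference, not RxJava code: it swaps in an ExecutorService with CompletableFuture, and the names ParallelCellsSketch, simulateCell and runTasks are hypothetical and do not come from the question or the answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelCellsSketch {

    // Hypothetical stand-in for calculate(...): sleeps like the question's code,
    // then reports the name of the thread that did the work.
    static String simulateCell(int cellIndex) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Submits taskCount cells to a pool of poolSize threads and returns,
    // in submission order, the name of the thread that computed each cell.
    static List<String> runTasks(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int cellIndex = i;
                futures.add(CompletableFuture.supplyAsync(() -> simulateCell(cellIndex), pool));
            }
            List<String> threadNames = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                threadNames.add(future.join()); // wait for each cell's result
            }
            return threadNames;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // 21 cells = 3 lists x 7 operations, as in the question.
        System.out.println("1 worker: " + runTasks(21, 1));
        System.out.println(cores + " worker(s): " + runTasks(21, cores));
    }
}
```

With one worker every cell reports the same thread, which corresponds to cells appearing one by one. With a pool sized to the number of cores the cells are spread over several threads, which is roughly the batch-wise behaviour the question expected.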

huangapple
  • Posted on July 21, 2020, 19:24:52
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63013443.html