Stream methods are not triggered after adding subscribeOn and observeOn to the method chain

Question

I am using `rxjava3`, and I don't understand why the methods in the stream are no longer called after I added `observeOn` and `subscribeOn`.

Here is the example Java code:

```java
package mytestapp.error;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");
        String apiUrl = "myApiUrl";
        try {
            App app = new App();
            app.syncNow(apiUrl);
        } catch (JSONException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private void syncNow(String apiUrl) throws JSONException, IOException {
        createOrAlterTable().observeOn(Schedulers.newThread())
                .switchMap(d -> menuTableRecords(apiUrl))
                .observeOn(Schedulers.newThread()).subscribe(res -> {
                    System.out.println(res);

                }, onError -> {
                    System.out.println(onError);
                }, () -> {
                    System.out.println("Completed!!");
                });
    }

    private @NonNull Observable<Object> createOrAlterTable()
            throws IOException, JSONException {
        // Read table from backend
        // Read last sync file
        // get user data
        return Observable.zip(readTableFromBackend(), readLastSyncFile(),
                getUserData(),
                (s1, s2, s3) -> readTableFromBackendZipperFun(s1, s2, s3)).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io()).map(
                        d -> d);
    }

    private @NonNull Observable<String> readTableFromBackend()
            throws JSONException, IOException {
        return Observable.fromArray("testing");
    }

    private @NonNull Observable<JSONObject> readLastSyncFile()
            throws JSONException {
        return Observable.fromArray(new JSONObject());
    }

    private @NonNull Observable<Boolean> getUserData() throws JSONException {
        return Observable.fromArray(true);
    }

    private JSONArray readTableFromBackendZipperFun(String sqlliteDDL,
            JSONObject lastFV, boolean userDataFlag) throws JSONException {
        System.out.println("zip ops");
        return new JSONArray();
    }

    private @NonNull Observable<String> menuTableRecords(String apiUrl)
            throws JSONException, IOException {

        return Observable.fromArray("MENU_TABLE_RECORDS");
    }
}
```

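I want each method to run on a separate thread, and the subscriber to run on a different thread as well.

What is causing the problem, and how can I fix it?

Thanks.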


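# Answer 1
**Score**: 1
It looks like the `main` method does not wait for the asynchronous work to finish, so the application simply exits before the stream runs. Recommended reading: https://github.com/ReactiveX/RxJava#simple-background-computation
In this particular case, you can work around the problem by using `blockingSubscribe` instead of plain `subscribe`.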
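
The workaround can be sketched roughly as follows. This is a minimal illustration, not the asker's exact code: the class name `BlockingSubscribeSketch`, the `run()` helper, and the simplified zip sources are assumptions made up for the example. The RxJava README notes that the standard schedulers run on daemon threads, so once `main` returns nothing keeps the JVM alive; `blockingSubscribe` makes the calling thread wait until the stream terminates.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class BlockingSubscribeSketch {

    // Hypothetical stand-in for the question's createOrAlterTable():
    // three trivial sources zipped together and subscribed on the io scheduler.
    static Observable<String> createOrAlterTable() {
        return Observable.zip(
                        Observable.just("testing"),
                        Observable.just(1),
                        Observable.just(true),
                        (ddl, lastSync, userData) -> ddl + ":" + lastSync + ":" + userData)
                .subscribeOn(Schedulers.io());
    }

    // Runs the chain and collects what the subscriber receives.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        createOrAlterTable()
                .observeOn(Schedulers.newThread())
                .switchMap(d -> Observable.just("MENU_TABLE_RECORDS"))
                .observeOn(Schedulers.newThread())
                // Blocks the calling thread until onComplete or onError,
                // so main cannot return before the stream has finished.
                .blockingSubscribe(
                        events::add,
                        error -> events.add("error: " + error),
                        () -> events.add("Completed!!"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that `blockingSubscribe` invokes its callbacks on the calling thread (here, `main`), so the final `observeOn` no longer decides where the subscriber's code runs. If the subscriber must run on its own thread, one alternative is to keep `subscribe` and have `main` wait some other way, for example on a `CountDownLatch` released in the completion and error callbacks.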

huangapple
  • Posted on September 2, 2020, 17:31:07
  • Original link: https://go.coder-hub.com/63702704.html