Handle Paging with RxJava

You could model it recursively: Observable<ApiResponse> getPageAndNext(int page) { return getResults(page) .concatMap(new Func1<ApiResponse, Observable<ApiResponse>>() { @Override public Observable<ApiResponse> call(ApiResponse response) { // Terminal case. if (response.next == null) { return Observable.just(response); } return Observable.just(response) .concatWith(getPageAndNext(response.next)); } }); } Then, to consume it, getPageAndNext(0) .concatMap(new Func1<ApiResponse, Observable<ResponseObject>>() { @Override public Observable<ResponseObject> call(ApiResponse response) { return Observable.from(response.results); … Read more

Android RxJava 2 JUnit test – getMainLooper in android.os.Looper not mocked RuntimeException

This error occurs because the default scheduler returned by AndroidSchedulers.mainThread() is an instance of LooperScheduler and relies on Android dependencies that are not available in JUnit tests. We can avoid this issue by initializing RxAndroidPlugins with a different Scheduler before the tests are run. You can do this inside of a @BeforeClass method like so: … Read more

Paginate Observable results without recursion – RxJava

JohnWowUs’ answer is great and helped me understand how to avoid the recursion effectively, but there were some points I was still confused about, so I’m posting my tweaked version. Summary: The individual pages are returned as a Single. Use a Flowable to stream each of the items contained in the pages. This means callers … Read more

Single Observable with Multiple Subscribers

After checking back with RxJava developer Dávid Karnok I’d like to propose a full explanation of what was going on here. share() is defined as publish().refCount(), i. e. the source Observable is first transformed to a ConnectableObservable by publish() but instead of having to call connect() “manually” that part is handled by refCount(). In particular, … Read more

What is the difference between flatmap and switchmap in RxJava?

According to the documentation ( http://reactivex.io/documentation/operators/flatmap.html ) the switchMap is like the flatMap, but it will only emit items from the new observable until a new event is emitted from the source observable. The marble diagram shows it well. Notice the difference in the diagrams: In switchMap the second original emission (green marble) does not … Read more

When should one use RxJava Observable and when simple Callback on Android?

For simple networking stuff, the advantages of RxJava over Callback is very limited. The simple getUserPhoto example: RxJava: api.getUserPhoto(photoId) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Photo>() { @Override public void call(Photo photo) { // do some stuff with your photo } }); Callback: api.getUserPhoto(photoId, new Callback<Photo>() { @Override public void onSuccess(Photo photo, Response response) { } }); The RxJava … Read more

Get response status code using Retrofit 2.0 and RxJava

Instead of declaring the API call like you did: Observable<MyResponseObject> apiCall(@Body body); You can also declare it like this: Observable<Response<MyResponseObject>> apiCall(@Body body); You will then have a Subscriber like the following: new Subscriber<Response<StartupResponse>>() { @Override public void onCompleted() {} @Override public void onError(Throwable e) { Timber.e(e, “onError: %”, e.toString()); // network errors, e. g. UnknownHostException, … Read more

rxjava: Can I use retry() but with delay?

You can use the retryWhen() operator to add retry logic to any Observable. The following class contains the retry logic: RxJava 2.x public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = … Read more