RxJava: defer execution of a function with new operator fromCallable()

Hello, dear reader!

RxJava 1.0.15 was released recently and it brings new operator fromCallable() for both rx.Observable and rx.Single!

Imagine you need to defer execution of function that downloads file from network or throws checked IOException.

First way

Custom Operator

Observable.create(subscriber -> {  
  try {
    subscriber.onNext(downloadFileFromNetwork());
    subscriber.onCompleted();
  } catch (IOException e) {
    subscriber.onError(e);
  }
});

// Don't use Observable.create() if you can, it's very easy to shoot yourself in the foot! (and then shoot again for each new subscriber!)

Second way

Operator Defer

Observable.defer(() -> {  
    try {
      return Observable.just(downloadFileFromNetwork());
    } catch (IOException e) {
      return Observable.error(e);
    }
  });

With defer you had to explicitly return Observable and deal with checked exceptions == write more code:

New, third way

Operator FromCallable

Observable.fromCallable(() -> downloadFileFromNetwork());  

It's a one-liner now! It deals with checked exceptions, no more weird Observable.just() and Observable.error() for such easy thing as deferring code execution!

Notice that fromCallable() does not share the result (emission) between subscribers, it'll execute passed function for each new subscriber. You can use "Connectable Observable Operators" to control lifecycle of the observable.


Observable.defer() is still useful if you need to return Observable instead of a value.

Links:

https://twitter.com/artem_zin

comments powered by Disqus