Observable factories

October 11, 2018

There are various factory methods available to create Observables. In this post we will go through some of them.

Observable.range() and rangeLong()

In a previous post, we created an Observable using an IntStream. To emit a consecutive range of integers, there is a shorter solution available:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.range(0, 1000);

        myObservable.filter(i -> i % 75 == 0)
                    .subscribe(System.out::println);

    }
}

A similar factory method is also available for Long: Observable.rangeLong()

Observable.interval()

This factory is time-based, with the help of this method, emissions can be spread out over a time period:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Observable.interval(1, TimeUnit.SECONDS)
                  .subscribe(s -> System.out.println(s + " second(s) passed!"));

        Thread.sleep(5500);

    }
}

Observable.interval() needs a separate thread (it uses the Schedulers.computation()), because it emits infinitely at the specified interval.
This means that our main method will start the Observable, but will not wait for it to finish, hence we added the Thread.sleep() call.
Even though it emits infinitely, Observable.interval() is not hot:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);

        interval.subscribe(s -> System.out.println("Observer1: " + s + " second(s) passed!"));
        Thread.sleep(2500);

        interval.subscribe(s -> System.out.println("Observer2: " + s + " second(s) passed!"));
        Thread.sleep(5500);
    }
}

When you run this code, you will notice that the second Observer prints out 0. This means that this Observable is cold.
To make it hot, we would need to use ConnectableObservable<Long>.

Observable.fromFuture()

If there is already existing code or library which uses Futures, these Futures can be turned into Observables as well:

import io.reactivex.Observable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(() -> "String from a Future");

        Observable<String> myObservable = Observable.fromFuture(future);

        myObservable.subscribe(s -> System.out.println("Observer: " + s));
    }
}

Observable.defer()

Observable.defer() is used to prove a separate state for each Observer.
Imagine that your source is stateful. If the state is changed and the Observable does not capture this state change, it might emit items which are obsolete.
For example let’s say we have a stateful Observable which provides a String message.
If an Observer subscribes, then the message is modified, then a second Observer subscribes, it will not see this change.
To solve this, we would need an Observable which can create a fresh Observable for each subscription which uses the latest value:

import io.reactivex.Observable;

public class Main {
    private static String state = "State A";

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.defer(() -> Observable.just(state));

        myObservable.subscribe(s -> System.out.println("Observer 1: " + s));

        state = "State B";

        myObservable.subscribe(s -> System.out.println("Observer 2: " + s));
    }
}

 

András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami