Cold and Hot Observables

October 10, 2018

A major characteristic of Observables is how they behave when there are multiple Observers. Based on this, we differentiate between Hot and Cold Observables.

Cold Observables

You can think of a Cold Observable as a DVD with a movie on it: each viewer can watch the whole movie, from the start, at any time.
Similarly, Cold Observables replay their emissions to each Observer, ensuring that every Observer gets all the data.
Most data-driven Observables that emit finite datasets are cold, for example those created by the factories Observable.just(), Observable.fromIterable() and Observable.fromArray().
The following example subscribes two Observers to one Observable. Each Observer receives all five emissions, and Observer 1 finishes before Observer 2 starts:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("A", "B", "C", "D", "E");

        myObservable.subscribe(s -> System.out.println("Observer 1: " + s));

        myObservable.subscribe(s -> System.out.println("Observer 2: " + s));
    }
}

Hot Observables

If we imagine Cold Observables like DVD movies, Hot Observables are like a cinema: a cinema plays the same movie to all viewers at the same time.
Hot Observables broadcast the same emissions to all Observers at the same time. If one Observer subscribes to a Hot Observable and receives some emissions, and a second Observer subscribes a bit later, the second Observer will have missed the emissions the first one already received.
Hot Observables are often used to represent events. Of course these events might contain some data, but they are time-sensitive: late Observers can miss the previously emitted data.
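
To make the "late Observers miss earlier emissions" behaviour concrete, here is a minimal plain-Java sketch. It is a toy model written for this explanation, not RxJava code: the class HotSourceSketch and its subscribe(), emit() and demo() methods are hypothetical names, and RxJava's real hot sources are considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT RxJava's implementation or API.
public class HotSourceSketch {
    // Observers subscribed right now; events go only to these.
    private final List<Consumer<String>> observers = new ArrayList<>();

    public void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    // Broadcast one event to all current observers; nothing is stored for later arrivals.
    public void emit(String event) {
        for (Consumer<String> observer : observers) {
            observer.accept(event);
        }
    }

    // Runs the scenario from the text and returns what each observer saw, in order.
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        HotSourceSketch source = new HotSourceSketch();

        source.subscribe(s -> log.add("Observer 1: " + s));
        source.emit("A");
        source.emit("B");

        // Observer 2 arrives late, after A and B have already been broadcast.
        source.subscribe(s -> log.add("Observer 2: " + s));
        source.emit("C");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model Observer 1 sees A, B and C, while Observer 2 only sees C: the source never replays A and B for the late subscriber.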

Making Cold Observables Hot

With the help of ConnectableObservable we can take any Observable, even a cold one, and turn it into a hot one.
The conversion is easy: we call the publish() method on any Observable, which returns a ConnectableObservable.
However, subscribing to a ConnectableObservable does not start the emissions; for that we need to call the connect() method.

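import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // publish() wraps the cold source in a ConnectableObservable; nothing is emitted yet
        ConnectableObservable<String> myObservable = Observable.just("A", "B", "C", "D", "E").publish();

        // Subscribing only registers the Observers
        myObservable.subscribe(s -> System.out.println("Observer 1: " + s));

        myObservable.subscribe(s -> System.out.println("Observer 2: " + s));

        // connect() starts the emissions for all registered Observers
        myObservable.connect();
    }
}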

When you run this code and compare it with the previous example, you will notice that the output looks “scrambled”: the lines of the two Observers are interleaved. The first Observer no longer processes all the emissions before the second can start; instead, each emission is propagated to every Observer before the next one is emitted (this is also called multicasting).
Multicasting is helpful when replaying the emissions is expensive: the source runs once, and every emission is shared by all Observers.
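
The cost argument can be sketched in plain Java as well. Again, this is a toy model under stated assumptions, not RxJava code: ReplayCostSketch, expensiveLoad(), subscribeCold() and connectMulticast() are hypothetical names, and expensiveLoad() merely stands in for something costly such as a query or a download.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: NOT RxJava's implementation or API.
public class ReplayCostSketch {
    // Counts how often the (pretend) expensive source is run.
    public static int loads = 0;

    // Hypothetical stand-in for an expensive operation.
    public static List<String> expensiveLoad() {
        loads++;
        return Arrays.asList("A", "B", "C");
    }

    // Cold style: each subscription re-runs the source for that one observer.
    public static void subscribeCold(Consumer<String> observer) {
        for (String item : expensiveLoad()) {
            observer.accept(item);
        }
    }

    // Multicast style: run the source once and pass each item to every observer.
    public static void connectMulticast(List<Consumer<String>> observers) {
        for (String item : expensiveLoad()) {
            for (Consumer<String> observer : observers) {
                observer.accept(item);
            }
        }
    }

    // Returns {loads with two cold subscriptions, loads with one multicast run}.
    public static int[] demo() {
        loads = 0;
        subscribeCold(s -> {});
        subscribeCold(s -> {});
        int coldLoads = loads;

        loads = 0;
        List<Consumer<String>> observers = new ArrayList<>();
        observers.add(s -> {});
        observers.add(s -> {});
        connectMulticast(observers);
        int multicastLoads = loads;

        return new int[] {coldLoads, multicastLoads};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("cold loads: " + result[0]);
        System.out.println("multicast loads: " + result[1]);
    }
}
```

With two observers, the cold style runs the source twice and the multicast style runs it once; the gap widens with every additional observer.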

András Döbröntey
