RxJava Operators: map(), scan()

October 19, 2018

map()

The map() operator transforms the emitted T items into R items by using a Function<? super T,? extends R> lambda.
For example we could take an Observable which emits Strings and parse these Strings  to LocalDate:

import io.reactivex.Observable;

import java.time.LocalDate;

public class Main {
    public static void main(String[] args) {
        Observable<String> dates = Observable.just("2018-11-12", "2019-12-12", "2019-02-12");

        dates.map(LocalDate::parse)
             .map(LocalDate::lengthOfMonth)
             .subscribe(System.out::println);

        System.out.println("-----");

        dates.map(x -> LocalDate.parse(x).lengthOfMonth())
             .subscribe(System.out::println);
    }
}

scan()

The scan() operator applies a function to the first item emitted by the Observable and then emits the result of the function as its own first emission. It behaves like a rolling aggregator: for every emission, you add it to the already accumulated result, then you emit the new incremental accumulated result.

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d", "e");

        myObservable.scan((x, y) -> x + y)
                    .subscribe(System.out::println);
    }
}

It is also possible to provide an initial value for the first argument or even to change the type of the aggregate.
For example if we want to have a rolling count of the emitted items, we could provide an initial value of 0. This initial value will be emitted first, so if we want to skip it, we can use the skip() operator:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d", "e");

        myObservable.scan(0, (total, next) -> total + 1)
                    .skip(1)
                    .subscribe(s -> System.out.println("Total number of items so far: " + s));
    }
}

scan() can be used on infinite Observables as well, because it does not depend on calling the onComplete():

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);

        interval.scan(0, (total, next) -> total + 1)
                .skip(1)
                .subscribe(s -> System.out.println("Total number of items so far: " + s));

        Thread.sleep(4500);
    }
}

 

András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami