RxJava 3 - How to create and observe Observables

dimitrilc

Introduction

RxJava 3 includes 5 core classes:

  1. Flowable,
  2. Observable,
  3. Single,
  4. Completable,
  5. Maybe.

This tutorial aims to teach the basic concepts behind Observable, which serves as a foundation for understanding the other 4 classes.

Goals

At the end of the tutorial, you will have learned:

  1. How to create an Observable.
  2. How to subscribe to an Observable using an Observer.
  3. How to subscribe to an Observable using Consumers.

Prerequisite Knowledge

  1. Intermediate Java.
  2. Basic knowledge of the Observer pattern.

Tools Required

  1. A Java IDE such as IntelliJ Community Edition.
  2. The project uses Gradle 7.2 and JDK 17, but you can use different versions if you wish.

Project Setup

To follow along with the tutorial, perform the steps below:

  1. Create a new Java Gradle project.

  2. In the build.gradle file, add the dependency for RxJava 3.

     implementation 'io.reactivex.rxjava3:rxjava:3.1.1'

  3. Create a new package called com.example under the src/main/java folder.

  4. Create a new class called Entry in the com.example package.

  5. Create the main() method inside the Entry class.

Creating an Observable

The Observable class (io.reactivex.rxjava3.core.Observable) has many static factory methods that create an Observable instance. For this tutorial, we will use intervalRange() to simulate an asynchronous stream of data while keeping the code simple.

Below is the method signature of intervalRange():

    public static @NonNull Observable<Long> intervalRange(long start,
            long count,
            long initialDelay,
            long period,
            @NonNull TimeUnit unit)

The Observable returned by intervalRange() emits a sequence of Long values. The first value, start, is emitted after initialDelay; each later value is one greater than the previous one and follows after another period. Both initialDelay and period are measured in unit. Once count values have been emitted, the Observable completes.

With the concept out of the way, we add the code into our main() method like below.

    Observable<Long> obs = Observable.intervalRange(1L, 5L, 0L, 2L, TimeUnit.SECONDS);

The line of code above creates an Observable<Long> that emits the numbers 1 through 5 to each subscriber, one every two seconds, starting immediately because the initial delay is 0.
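To make the timing concrete, here is a small plain-Java sketch of that schedule. It does not use RxJava at all; the class IntervalRangeModel and its two helper methods are hypothetical names introduced only for this illustration, and the offsets are nominal, since real timers are never exact.

```java
import java.util.Arrays;

// Simplified model of the emission schedule described above; this is not RxJava code.
class IntervalRangeModel {

    // The k-th emission (0-based) carries the value start + k.
    static long[] values(long start, long count) {
        long[] result = new long[(int) count];
        for (int k = 0; k < result.length; k++) {
            result[k] = start + k;
        }
        return result;
    }

    // The k-th emission is nominally due initialDelay + k * period after subscription.
    static long[] offsets(long count, long initialDelay, long period) {
        long[] result = new long[(int) count];
        for (int k = 0; k < result.length; k++) {
            result[k] = initialDelay + k * period;
        }
        return result;
    }

    public static void main(String[] args) {
        // Arguments of the tutorial's call: start 1, count 5, initial delay 0, period 2 (seconds).
        System.out.println("values:  " + Arrays.toString(values(1L, 5L)));   // [1, 2, 3, 4, 5]
        System.out.println("offsets: " + Arrays.toString(offsets(5L, 0L, 2L))); // [0, 2, 4, 6, 8]
    }
}
```

Under this model the last item is due about 8 seconds after subscription, which is roughly how long the tutorial's stream takes to complete.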

Subscribing with an Observer

The Observable that we created in the previous section does nothing for now. For it to be useful, we must subscribe to it. One way to subscribe to an Observable is with an Observer.
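The sketch below illustrates that nothing happens until someone subscribes. It assumes RxJava 3 is on the classpath; the class name LazyObservableDemo, the flag, and the shortened 10 ms period are choices made for this illustration only. doOnSubscribe() is used just to record the moment of subscription.

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LazyObservableDemo {

    // Returns {flag before subscribing, flag after subscribing}.
    static boolean[] run() {
        AtomicBoolean subscribed = new AtomicBoolean(false);

        // Building the pipeline only describes the work; the doOnSubscribe()
        // callback runs at the moment an observer actually subscribes.
        Observable<Long> obs = Observable
                .intervalRange(1L, 3L, 0L, 10L, TimeUnit.MILLISECONDS)
                .doOnSubscribe(d -> subscribed.set(true));

        boolean before = subscribed.get();

        // Subscribing starts the timer; the no-argument blockingSubscribe()
        // ignores the items and waits for the stream to terminate.
        obs.blockingSubscribe();

        boolean after = subscribed.get();
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("Subscribed before subscribing: " + result[0]);
        System.out.println("Subscribed after subscribing: " + result[1]);
    }
}
```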

An Observer (io.reactivex.rxjava3.core.Observer) is an interface that we implement to receive events from an Observable. It declares 4 methods that we must override:

  1. onSubscribe: The Observable will call this method when the subscription starts.
  2. onNext: The Observable will call this method when it emits a new item.
  3. onError: The Observable will call this method when it fails with an error.
  4. onComplete: The Observable will call this method when it is completed.

To subscribe an Observer to an Observable, we call one of the Observable's instance methods. For simplicity, this tutorial uses blockingSubscribe(), which blocks the calling thread until the Observable terminates. Its method signature is:

    public final void blockingSubscribe(@NonNull Observer<? super T> observer)

We can either create a dedicated Observer class and pass an instance of it to blockingSubscribe(), or create an anonymous class on the fly. The official docs tend to use an anonymous class, so we will do the same here.
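For comparison, the dedicated-class option could look roughly like the sketch below. RecordingObserver and RecordingObserverDemo are hypothetical names that are not part of the tutorial's project, and the 10 ms period is shortened only so the example finishes quickly. The observer stores each callback as a string instead of printing it.

```java
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical named Observer that records every callback it receives.
class RecordingObserver<T> implements Observer<T> {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        events.add("onSubscribe");
    }

    @Override
    public void onNext(@NonNull T item) {
        events.add("onNext " + item);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        events.add("onError " + e.getMessage());
    }

    @Override
    public void onComplete() {
        events.add("onComplete");
    }
}

class RecordingObserverDemo {

    static List<String> run() {
        RecordingObserver<Long> observer = new RecordingObserver<>();
        // blockingSubscribe() returns only after the stream has terminated,
        // so the event list is complete when this method returns.
        Observable.intervalRange(1L, 3L, 0L, 10L, TimeUnit.MILLISECONDS)
                .blockingSubscribe(observer);
        return observer.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

A named class like this is easier to reuse and to test, while the anonymous class keeps everything in one place.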

In the Entry class, create a subWithObserver() method using the code snippet below.

    private static <T> void subWithObserver(Observable<T> obs){

       // Subscribe with an anonymous Observer; the call returns once the stream terminates.
       obs.blockingSubscribe(new Observer<>(){
           // Handle to the subscription, received in onSubscribe().
           private Disposable disposable;

           @Override
           public void onSubscribe(@NonNull Disposable d) {
               this.disposable = d;
           }

           @Override
           public void onNext(@NonNull T item) {
               System.out.println("Received in Observer: " + item);
           }

           @Override
           public void onError(@NonNull Throwable e) {
               disposable.dispose();
               e.printStackTrace();
           }

           @Override
           public void onComplete() {
               disposable.dispose();
               System.out.println("Complete");
           }
       });
    }

The method above takes an Observable argument and subscribes an anonymous Observer to it. The Observer prints a line every time a new item is emitted. It also calls dispose() on the Disposable received in onSubscribe() when the Observable completes or fails.

Now we are ready to call the method in main().

    subWithObserver(obs);

And the output would be:

    Received in Observer: 1
    Received in Observer: 2
    Received in Observer: 3
    Received in Observer: 4
    Received in Observer: 5
    Complete
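The Disposable stored in onSubscribe() can also be used to stop a stream before it completes. The following is a minimal sketch of that idea, not code from the tutorial's project: EarlyDisposeDemo is a hypothetical class, and the cut-off after the second item and the 10 ms period are arbitrary choices. As far as I can tell, once the observer disposes inside onNext(), blockingSubscribe() returns without delivering further items or calling onComplete().

```java
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class EarlyDisposeDemo {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable.intervalRange(1L, 5L, 0L, 10L, TimeUnit.MILLISECONDS)
                .blockingSubscribe(new Observer<Long>() {
                    private Disposable disposable;

                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(@NonNull Long item) {
                        events.add("onNext " + item);
                        if (item == 2L) {
                            // Cancel the subscription: no more items are wanted.
                            disposable.dispose();
                        }
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        events.add("onError " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        events.add("onComplete");
                    }
                });

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```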

Subscribing with Consumers

We can also subscribe to an Observable using Consumers, each representing one callback, instead of overriding all 4 methods of Observer. For blockingSubscribe(), we can provide at most three callbacks: onNext, onError, and onComplete. The method signature for this variant of blockingSubscribe() is:

    public final void blockingSubscribe(@NonNull Consumer<? super T> onNext,
            @NonNull Consumer<? super Throwable> onError,
            @NonNull Action onComplete)

As you can see, we only have to provide two Consumers and one Action (similar to a Runnable, except that its run() method is allowed to throw). Create a method subWithConsumers() based on the code below.

    private static <T> void subWithConsumers(Observable<T> obs){
       obs.blockingSubscribe(
               item -> System.out.println("Received in Consumer: " + item),
               error -> error.printStackTrace(),
               () -> System.out.println("Complete")
       );
    }

The method above behaves almost exactly like subWithObserver(), but with much shorter code. There are also variants that take only one or two Consumers, for when you do not care about the other callbacks.

    public final void blockingSubscribe(@NonNull Consumer<? super T> onNext)
    public final void blockingSubscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError)

Finally, we can call the method in main like below.

    //subWithObserver(obs);
    subWithConsumers(obs);

This prints the same sequence as the subWithObserver() call, only with a different prefix:

    Received in Consumer: 1
    Received in Consumer: 2
    Received in Consumer: 3
    Received in Consumer: 4
    Received in Consumer: 5
    Complete
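The tutorial's stream never fails, so the onError callback is not exercised. The sketch below uses the two-argument blockingSubscribe() variant with a source that fails after two items. The class name ErrorCallbackDemo, the failing source built with Observable.just(), concatWith() and Observable.error(), and the "boom" message are illustrative assumptions and not part of the tutorial's project.

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.ArrayList;
import java.util.List;

class ErrorCallbackDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Illustrative source: emits 1 and 2, then fails instead of completing.
        Observable<Integer> failing = Observable.just(1, 2)
                .concatWith(Observable.error(new IllegalStateException("boom")));

        // Two-Consumer variant: one callback for items, one for the error.
        failing.blockingSubscribe(
                item -> log.add("Received: " + item),
                error -> log.add("Failed: " + error.getMessage())
        );

        return log;
    }

    public static void main(String[] args) {
        // Prints "Received: 1", "Received: 2", then "Failed: boom".
        run().forEach(System.out::println);
    }
}
```

Note that no completion is reported here: a stream ends with either an error or a completion, never both.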

Solution Code

    package com.example;

    import io.reactivex.rxjava3.annotations.NonNull;
    import io.reactivex.rxjava3.core.Observable;
    import io.reactivex.rxjava3.core.Observer;
    import io.reactivex.rxjava3.disposables.Disposable;

    import java.util.concurrent.TimeUnit;

    public class Entry {
       public static void main(String[] args) {
           Observable<Long> obs = Observable.intervalRange(1L, 5L, 0L, 2L, TimeUnit.SECONDS);

           //subWithObserver(obs);
           subWithConsumers(obs);
       }

       private static <T> void subWithObserver(Observable<T> obs){

           // Subscribe with an anonymous Observer; the call returns once the stream terminates.
           obs.blockingSubscribe(new Observer<>(){
               // Handle to the subscription, received in onSubscribe().
               private Disposable disposable;

               @Override
               public void onSubscribe(@NonNull Disposable d) {
                   this.disposable = d;
               }

               @Override
               public void onNext(@NonNull T item) {
                   System.out.println("Received in Observer: " + item);
               }

               @Override
               public void onError(@NonNull Throwable e) {
                   disposable.dispose();
                   e.printStackTrace();
               }

               @Override
               public void onComplete() {
                   disposable.dispose();
                   System.out.println("Complete");
               }
           });
       }

       private static <T> void subWithConsumers(Observable<T> obs){
           obs.blockingSubscribe(
                   item -> System.out.println("Received in Consumer: " + item),
                   error -> error.printStackTrace(),
                   () -> System.out.println("Complete")
           );
       }
    }

Summary

We have learned how to create an Observable and subscribe to it using either an Observer or Consumers. The full project code can be found at https://github.com/dmitrilc/DaniWebRxJavaBuilders
