Skip to main content

Reactive Spring - Getting Started

Greetings!

Reactive programming is very popular now. Every passionate Java developer wants to try out it with Webflux which is Spring's way of reactive web development. 

In this first tutorial, I am not trying to explain Reactive Programming or Spring WebFlux or anything in detail. Instead, I would like to just play with it. Let's practice a little then come to theory on another day.

Assumption: You know how to create and run a Spring Boot application.
Warning: Do not try hard, just play with it.

Full source code of this article - reactive-spring-hello

Create the project

Go to Spring initializer (https://start.spring.io/) and fill out project metadata. Add the below dependencies.
Spring Reactive Web, Lombok
Download and open the project with your desired IDE.

Say hello Reactively

Let's quickly create an endpoint (Controller) and add the below contents. Compile and run it.
http://localhost:8080/mono, http://localhost:8080/flux
package com.slmanju.reactivespringhello.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

  @GetMapping("mono")
  public Mono<String> helloMono() {
    return Mono.just("Hello Reactive Programming");
  }

  @GetMapping("flux")
  public Flux<String> helloFlux() {
    return Flux.just("Hello", "Reactive", "Programming");
  }

}

Mono and Flux

These are the two publishers which Spring offers via Project Reactor. Mono is for 0 or 1 element while Flux is for 0 or many elements. Do not think too much about it now as we just want to play with these. Just understand that you always return Mono or Flux.
Like in Java Streams, you use chains of operations with dots.
@GetMapping("flux2")
public Flux<String> helloFlux2() {
  return Flux.just("Apple", "Orange", "Banana", "Avocado", "Mango")
      .filter(fruit -> fruit.startsWith("A"))
      .map(String::toUpperCase);
}
To make this more interesting we can add a few more operations;
hint: try adding log()
@GetMapping("flux2")
public Flux<String> helloFlux2() {
  return Flux.just("Apple", "Orange", "Banana", "Avocado", "Mango")
      .doOnNext(fruit -> System.out.println("Processing " + fruit))
      .filter(fruit -> fruit.startsWith("A"))
      .map(String::toUpperCase)
      .doOnSubscribe(subscription -> System.out.println("Subscribed to Fruit stream"))
      .doOnComplete(() -> System.out.println("Fruit stream completed"))
      .doOnTerminate(() -> System.out.println("Fruit stream terminated"));
}
Let's create another example that accepts the number and return the square of it. To make this more Spring friendly, I'm creating a Service class.

A simple service

package com.slmanju.reactivespringhello.service;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class MathService {

  public Mono<Integer> square(int value) {
    int square = value * value;
    return Mono.just(square);
  }
}
@RestController
public class MathController {

  @Autowired
  private MathService mathService;

  @GetMapping("square/{input}")
  public Mono<Integer> square(@PathVariable("input") int input) {
    return mathService.square(input);
  }
}
You can run this and access http://localhost:8080/square/8, what you achieved here is NOT a reactive endpoint. Why?

Reactive Pipeline

We did the above square calculation outside the reactive pipeline. This is very important. Do not do calculations outside the reactive pipeline as it will block the main thread. Mono.just is for known values. When you have something like this, you can use fromSupplier as below.
public Mono<Integer> square(int value) {
  Supplier<Integer> supplier = () -> value * value;
  return Mono.fromSupplier(supplier);
}
Instead of returning an integer directly, let's wrap it in a data transfer object.
public class Response {
  private int result;
}
public Mono<Response> square2(int value) {
  return Mono.fromSupplier(() -> value * value)
      .map(Response::new);
}
As you can see, it is like using Java Streams (at least it feels that way). Anyway, Java 8 streams are not reactive. We will add a new endpoint to support this but will return ResponseEntity to make it more fun. I'm just showing you that we can do more operations on the pipeline.
@GetMapping("square2/{input}")
public Mono<ResponseEntity<Response>> square2(@PathVariable("input") int input) {
  return mathService.square2(input)
      .map(ResponseEntity::ok);
}

Let's do more

Now things are getting more interesting. Let's create another endpoint to return multiplications for a given number.
Flux.range creates a sequence by starting a number to a given count.
public Flux<Response> multiplications(int value) {
  return Flux.range(1, 10)
      .doOnNext(i -> System.out.println("Reactive math service is processing " + i))
      .map(i -> i * value)
      .map(Response::new);
}
@GetMapping("multiplications/{input}")
public Flux<Response> multiplications(@PathVariable("input") int input) {
  return mathService.multiplications(input);
}

Add a delay

Real-life operations will take time. It may be database access, other network calls, etc. Hence we can introduce a Thread.sleep to produce a delay.
public Flux<Response> multiplications(int value) {
  return Flux.range(1, 10)
      .doOnNext(i -> System.out.println("Reactive math service is processing " + i))
      .doOnNext(i -> {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      })
      .map(i -> i * value)
      .map(Response::new);
}
Now if you navigate to your browser (or postman, etc) and request multiplications you can see the delay we added (http://localhost:8080/multiplications/8). Even though this looks correct, this is problematic. Why?

To demonstrate that, enter the URL again and wait for 5 seconds, then click the browser's cancel request. After that check, your console result. You will see that, even though you have canceled the request still it is processing. This is how the thread request model work. It does not know that the user has canceled the request.
However, we are using reactive programming where users can cancel their subscriptions. Can we do that? Yes, instead of Thread.sleep which uses the main thread, we need to use the appropriate reactive operation - delayElements.
public Flux<Response> multiplications(int value) {
  return Flux.range(1, 10)
      .delayElements(Duration.ofSeconds(1))
      .doOnNext(i -> System.out.println("Reactive math service is processing " + i))
      .map(i -> i * value)
      .map(Response::new);
}
Now, if you close/cancel the request you will see processing is stopped. Cool isn't it?

Filter and empty

Let's do one more example, restrict our square calculation to accept only 10 to 20 values.
public Mono<Response> square3(int value) {
  return Mono.just(value)
      .filter(i -> i >= 10)
      .filter(i -> i <= 20)
      .map(Response::new)
      .doOnSuccess(System.out::println);
}
@GetMapping("square3/{input}")
public Mono<ResponseEntity<Response>> square3(@PathVariable("input") int input) {
  return mathService.square3(input)
      .map(ResponseEntity::ok);
}
If you run this with http://localhost:8080/square3/5 it returns success 200 with an empty body. That is not right in REST.
@GetMapping("square3/{input}")
public Mono<ResponseEntity<Response>> square3(@PathVariable("input") int input) {
  return mathService.square3(input)
      .map(ResponseEntity::ok)
      .defaultIfEmpty(ResponseEntity.badRequest().build());
}
That is all for today. Hope you enjoyed the show. Happy WebFluxing.

References

https://projectreactor.io/
https://www.udemy.com/course/spring-webflux/

Comments