响应式编程(下):Spring 5

引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括NoSQL、Hadoop、Spark、Storm、Kylin在内的大批新技术应运而生。其中以 RxJavaReactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是下篇,对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用。

1. Spring 5 中的响应式编程

作为 Java 世界首个响应式 Web 框架,Spring 5 最大的亮点莫过于提供了完整的端到端响应式编程的支持。

图片出处:Spring Framework Reference Documentation

左侧是传统的基于 Servlet 的 Spring Web MVC 框架,右侧是 5.0 版本新引入的基于 Reactive Streams 的 Spring WebFlux 框架,从上到下依次是 Router Functions,WebFlux,Reactive Streams 三个新组件。

  • Router Functions: 对标 @Controller、@RequestMapping 等标准的 Spring MVC 注解,提供一套函数式风格的 API,用于创建 Router,Handler 和 Filter。
  • WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
  • Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有 RxJava 和 Reactor,Spring WebFlux 默认集成的是 Reactor。

在Web容器的选择上,Spring WebFlux 既支持像 Tomcat,Jetty 这样的的传统容器(前提是支持 Servlet 3.1 Non-Blocking IO API),又支持像 Netty,Undertow 那样的异步容器。不管是何种容器,Spring WebFlux 都会将其输入输出流适配成 Flux<DataBuffer> 格式,以便进行统一处理。

值得一提的是,除了新的 Router Functions 接口,Spring WebFlux 同时支持使用老的 Spring MVC 注解声明Reactive Controller。和传统的 MVC Controller 不同,Reactive Controller 操作的是非阻塞的ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。

2. 实战

下面我将以一个简单的 Spring 5 应用为例,介绍如何使用 Spring 5 快速搭建一个响应式Web应用(以下简称 RP 应用)。

2.1 环境准备

首先,从 GitHub 下载我的这个示例应用,地址是https://github.com/emac/spring5-features-demo

然后,从 MongoDB 官网下载最新版本的MongoDB,然后在命令行下运行 mongod & 启动服务。

现在,可以先试着跑一下项目中自带的测试用例。

./gradlew clean build

2.2 依赖介绍

接下来,看一下这个示例应用里的和响应式编程相关的依赖。

compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
  • spring-boot-starter-webflux: 启用 Spring 5 的 RP(Reactive Programming)支持,这是使用 Spring 5 开发 RP 应用的必要条件,就好比 spring-boot-starter-web 之于传统的 Spring MVC 应用。
  • spring-boot-starter-data-mongodb-reactive: Spring 5 中新引入的针对 MongoDB 的 Reactive Data 扩展库,允许通过统一的 RP 风格的API操作 MongoDB。

2.3 第一种方式:MVC 注解

Spring 5 提供了 Spring MVC 注解和 Router Functions 两种方式来编写 RP 应用。首先,我先用大家最熟悉的MVC注解来展示如何编写一个最简单的 RP Controller。

示例代码

@RestController
public class RestaurantController {

    /**
     * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
     */
    private final RestaurantRepository restaurantRepository;

    /**
     * spring-boot-starter-data-mongodb-reactive提供的通用模板
     */
    private final ReactiveMongoTemplate reactiveMongoTemplate;

    public RestaurantController(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
        this.restaurantRepository = restaurantRepository;
        this.reactiveMongoTemplate = reactiveMongoTemplate;
    }

    @GetMapping("/reactive/restaurants")
    public Flux<Restaurant> findAll() {
        return restaurantRepository.findAll();
    }

    @GetMapping("/reactive/restaurants/{id}")
    public Mono<Restaurant> get(@PathVariable String id) {
        return restaurantRepository.findById(id);
    }

    @PostMapping("/reactive/restaurants")
    public Flux<Restaurant> create(@RequestBody Flux<Restaurant> restaurants) {
        return restaurants
                .buffer(10000)
                .flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
    }

    @DeleteMapping("/reactive/restaurants/{id}")
    public Mono<Void> delete(@PathVariable String id) {
        return restaurantRepository.deleteById(id);
    }
}

可以看到,实现一个 RP Controller 和一个普通的 Controller 是非常类似的,最核心的区别是,优先使用 RP 中最基础的两种数据类型,Flux(对应多值)和 Mono(单值),尤其是方法的参数和返回值。即便是空返回值,也应封装为 Mono<Void>。这样做的目的是,使得应用能够以一种统一的符合 RP 规范的方式处理数据,最理想的情况是从最底层的数据库(或者其他系统外部调用),到最上层的 Controller 层,所有数据都不落地,经由各种 FluxMono 铺设的“管道”,直供调用端。就像农夫山泉那句著名的广告词,我们不生产水,我们只是大自然的搬运工。

单元测试

和非 RP 应用的单元测试相比,RP 应用的单元测试主要是使用了一个 Spring 5 新引入的测试工具类,WebTestClient,专门用于测试 RP 应用。

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantControllerTests {

    @Test
    public void testNormal() throws InterruptedException {
        // start from scratch
        restaurantRepository.deleteAll().block();

        // prepare
        WebTestClient webClient = WebTestClient.bindToController(new RestaurantController(restaurantRepository, reactiveMongoTemplate)).build();
        Restaurant[] restaurants = IntStream.range(0, 100)
                .mapToObj(String::valueOf)
                .map(s -> new Restaurant(s, s, s))
                .toArray(Restaurant[]::new);

        // create
        webClient.post().uri("/reactive/restaurants")
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .syncBody(restaurants)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBodyList(Restaurant.class)
                .hasSize(100)
                .consumeWith(rs -> Flux.fromIterable(rs.getResponseBody())
                        .log()
                        .subscribe(r1 -> {
                            // get
                            webClient.get()
                                    .uri("/reactive/restaurants/{id}", r1.getId())
                                    .accept(MediaType.APPLICATION_JSON_UTF8)
                                    .exchange()
                                    .expectStatus().isOk()
                                    .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .expectBody(Restaurant.class)
                                    .consumeWith(r2 -> Assert.assertEquals(r1, r2));
                        })
                );
    }
}

创建 WebTestClient 实例时,首先要绑定一下待测试的 RP Controller。可以看到,和业务类一样,编写 RP 应用的单元测试,同样也是数据不落地的流式风格。

2.4 第二种方式:Router Functions

接着介绍实现 RP 应用的另一种实现方式 —— Router Functions。

Router Functions 是 Spring 5 新引入的一套 Reactive 风格(基于 Flux 和 Mono)的函数式接口,主要包括RouterFunctionHandlerFunctionHandlerFilterFunction,分别对应 Spring MVC 中的 @RequestMapping@ControllerHandlerInterceptor(或者 Servlet 规范中的 Filter)。

和 Router Functions 搭配使用的是两个新的请求/响应模型,ServerRequestServerResponse,这两个模型同样提供了 Reactive 风格的接口

示例代码

自定义 RouterFunction 和 HandlerFilterFunction
@Configuration
public class RestaurantServer implements CommandLineRunner {

    @Autowired
    private RestaurantHandler restaurantHandler;

    /**
     * 注册自定义RouterFunction
     */
    @Bean
    public RouterFunction<ServerResponse> restaurantRouter() {
        RouterFunction<ServerResponse> router = route(GET("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAll)
                .andRoute(GET("/reactive/delay/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAllDelay)
                .andRoute(GET("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::get)
                .andRoute(POST("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)).and(contentType(APPLICATION_JSON_UTF8)), restaurantHandler::create)
                .andRoute(DELETE("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::delete)
                // 注册自定义HandlerFilterFunction
                .filter((request, next) -> {
                    if (HttpMethod.PUT.equals(request.method())) {
                        return ServerResponse.status(HttpStatus.BAD_REQUEST).build();
                    }
                    return next.handle(request);
                });
        return router;
    }

    @Override
    public void run(String... args) throws Exception {
        RouterFunction<ServerResponse> router = restaurantRouter();
        // 转化为通用的Reactive HttpHandler
        HttpHandler httpHandler = toHttpHandler(router);
        // 适配成Netty Server所需的Handler
        ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        // 创建Netty Server
        HttpServer server = HttpServer.create("localhost", 9090);
        // 注册Handler并启动Netty Server
        server.newHandler(httpAdapter).block();
    }
}

可以看到,使用 Router Functions 实现 RP 应用时,你需要自己创建和管理容器,也就是说 Spring 5 并没有针对 Router Functions 提供 IoC 支持,这是 Router Functions 和 Spring MVC 相比最大的不同。除此之外,你需要通过 RouterFunction 的 API(而不是注解)来配置路由表和过滤器。对于简单的应用,这样做问题不大,但对于上规模的应用,就会导致两个问题:1)Router 的定义越来越庞大;2)由于 URI 和 Handler 分开定义,路由表的维护成本越来越高。那为什么 Spring 5 会选择这种方式定义 Router 呢?接着往下看。

自定义 HandlerFunction
@Component
public class RestaurantHandler {

    /**
     * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
     */
    private final RestaurantRepository restaurantRepository;

    /**
     * spring-boot-starter-data-mongodb-reactive提供的通用模板
     */
    private final ReactiveMongoTemplate reactiveMongoTemplate;

    public RestaurantHandler(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
        this.restaurantRepository = restaurantRepository;
        this.reactiveMongoTemplate = reactiveMongoTemplate;
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<Restaurant> result = restaurantRepository.findAll();
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> findAllDelay(ServerRequest request) {
        Flux<Restaurant> result = restaurantRepository.findAll().delayElements(Duration.ofSeconds(1));
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> get(ServerRequest request) {
        String id = request.pathVariable("id");
        Mono<Restaurant> result = restaurantRepository.findById(id);
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        Flux<Restaurant> restaurants = request.bodyToFlux(Restaurant.class);
        Flux<Restaurant> result = restaurants
                .buffer(10000)
                .flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        Mono<Void> result = restaurantRepository.deleteById(id);
        return ok().contentType(APPLICATION_JSON_UTF8).build(result);
    }
}

对比前面的 RestaurantController,由于去除了路由信息,RestaurantHandler 变得非常函数化,可以说就是一组相关的 HandlerFunction 的集合,同时各个方法的可复用性也大为提升。这就回答了上一小节提出的疑问,即以牺牲可维护性为代价,换取更好的函数特性。

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantHandlerTests extends BaseUnitTests {

    @Autowired
    private RouterFunction<ServerResponse> restaurantRouter;

    @Override
    protected WebTestClient prepareClient() {
        WebTestClient webClient = WebTestClient.bindToRouterFunction(restaurantRouter)
                .configureClient().baseUrl("http://localhost:9090").responseTimeout(Duration.ofMinutes(1)).build();
        return webClient;
    }
}

和针对 Controller 的单元测试相比,编写 Handler 的单元测试的主要区别在于初始化 WebTestClient 方式的不同,测试方法的主体可以完全复用。

3 小结

到此,有关响应式编程的介绍就暂且告一段落。回顾这两篇文章,我先是从响应式宣言说起,然后介绍了响应式编程的基本概念和关键特性,并且详解了 Spring 5 中和响应式编程相关的新特性,最后以一个示例应用结尾。希望读完这些文章,对你理解响应式编程能有所帮助。

4 参考

0

响应式编程(上):总览

引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括 NoSQL、Hadoop、Spark、Storm、Kylin 在内的大批新技术应运而生。其中以 RxJavaReactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据 4V 定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是上篇,以 Reactor 框架为例介绍响应式编程的几个关键特性。

1. 响应式宣言

敏捷宣言一样,说起响应式编程,必先提到响应式宣言。

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto

图片出处:The Reactive Manifesto

不知道是不是为了向敏捷宣言致敬,响应式宣言中也包含了 4 组关键词:

  • Responsive:可响应的。要求系统尽可能做到在任何时候都能及时响应。
  • Resilient:可恢复的。要求系统即使出错了,也能保持可响应性。
  • Elastic:可伸缩的。要求系统在各种负载下都能保持可响应性。
  • Message Driven:消息驱动的。要求系统通过异步消息连接各个组件。

可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。从这个意义上来说,动不动就蓝屏的 Windows 系统显然不是一个响应式系统。

PS: 如果你赞同响应式宣言,不妨到官网上留下的你电子签名,我的编号是 18989,试试看能不能找到我。

2. 响应式编程

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia

在上述响应式编程(后面简称 RP)的定义中,除了异步编程,还包含两个重要的关键词:

  • Data streams:即数据流,分为静态数据流(比如数组,文件)和动态数据流(比如事件流,日志流)两种。基于数据流模型,RP 得以提供一套统一的 Stream 风格的数据处理接口。和 Java 8 中的 Stream API 相比,RP API 除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
  • The propagation of change:变化传播,简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。

一个容易混淆的概念是响应式设计,虽然它的名字中也包含了“响应式”三个字,但其实和 RP 完全是两码事。响应式设计是指网页能够自动调整布局和样式以适配不同尺寸的屏幕,属于网站设计的范畴,而 RP 是一种关注系统可响应性,面向数据流的编程思想或者说编程框架。

特性

从本质上说,RP 是一种异步编程框架,和其他框架相比,RP 至少包含了以下三个特性:

  • 描述而非执行:在你最终调用 subscribe() 方法之前,从发布端到订阅端,没有任何事会发生。就好比无论多长的水管,只要水龙头不打开,水管里的水就不会流动。为了提高描述能力,RP 提供了比 Stream 丰富的多的多的API,比如 buffer()merge()onErrorMap() 等。
  • 提高吞吐量:类似于 HTTP/2 中的连接复用,RP 通过线程复用来提高吞吐量。在传统的Servlet容器中,每来一个请求就会发起一个线程进行处理。受限于机器硬件资源,单台服务器所能支撑的线程数是存在一个上限的,假设为T,那么应用同时能处理的请求数(吞吐量)必然也不会超过T。但对于一个使用 Spring 5 开发的 RP 应用,如果运行在像 Netty 这样的异步容器中,无论有多少个请求,用于处理请求的线程数是相对固定的,因此最大吞吐量就有可能超过T。
  • 背压(Backpressure)支持:简单来说,背压就是一种反馈机制。在一般的 Push 模型中,发布者既不知道也不关心订阅者的处理速度,当数据的发布速度超过处理速度时,需要订阅者自己决定是缓存还是丢弃。如果使用 RP,决定权就交回给发布者,订阅者只需要根据自己的处理能力问发布者请求相应数量的数据。你可能会问这不就是 Pull 模型吗?其实是不同的。在 Pull 模型中,订阅者每次处理完数据,都要重新发起一次请求拉取新的数据,而使用背压,订阅者只需要发起一次请求,就能连续不断的重复请求数据。

适用场景

了解了 RP 的这些特性,你可能已经猜想到 RP 有哪些适用场景了。一般来说,RP 适用于高并发、带延迟操作的场景,比如以下这些情况(的组合):

  • 一次请求涉及多次外部服务调用
  • 非可靠的网络传输
  • 高并发下的消息处理
  • 弹性计算网络

代价

Every coin has two sides.

和任何框架一样,有优势必然就有劣势。RP 的两个比较大的问题是:

  • 虽然复用线程有助于提高吞吐量,但一旦在某个回调函数中线程被卡住,那么这个线程上所有的请求都会被阻塞,最严重的情况,整个应用会被拖垮。
  • 难以调试。由于 RP 强大的描述能力,在一个典型的 RP 应用中,大部分代码都是以链式表达式的形式出现,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出错,你将很难定位到具体是哪个环节出了问题。所幸的是,RP 框架一般都会提供一些工具方法来辅助进行调试。

3. Reactor 实战

为了帮助你理解上面说的一些概念,下面我就通过几个测试用例,演示 RP 的两个关键特性:提高吞吐量和背压。完整的代码可参见我 GitHub 上的示例工程

提高吞吐量

    @Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
            }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }

用例解读:

  • 第一个测试用例使用的是多线程 + MongoDB Driver,同时起 100 个线程,每个线程往 MongoDB 中插入 10000 条数据,总共 100 万条数据,平均用时15秒左右。
  • 第二个测试用例使用的是 Reactor + MongoDB Reactive Streams Driver,同样是插入 100 万条数据,平均用时不到 10 秒,吞吐量提高了
    50%!

背压

在演示测试用例之前,先看两张图,帮助你更形象的理解什么是背压。

图片出处:Dataflow and simplified reactive programming

两张图乍一看没啥区别,但其实是完全两种不同的背压策略。第一张图,发布速度(100/s)远大于订阅速度(1/s),但由于背压的关系,发布者严格按照订阅者的请求数量发送数据。第二张图,发布速度(1/s)小于订阅速度(100/s),当订阅者请求100个数据时,发布者会积满所需个数的数据再开始发送。可以看到,通过背压机制,发布者可以根据各个订阅者的能力动态调整发布速度。

    @BeforeEach
    public void beforeEach() {
        // initialize publisher
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            s.complete();
                        }
                    }
                }, 100, 100)
        );
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }

用例解读:

  • 第一个测试用例演示了在理想情况下,即订阅者的处理速度能够跟上发布者的发布速度(以 100ms 为间隔产生 10 个数字),控制台从 0 打印到 9,一共 10 个数字,和发布端一致。
  • 第二个测试用例故意调慢了订阅者的处理速度(每 200ms 处理一个数字),同时发布者采用了 Drop 的背压策略,结果控制台只打印了一半的数字(0,2,4,6,8),另外一半的数字由于背压的原因被发布者 Drop 掉了,并没有发给订阅者。

4 小结

通过上面的介绍,不难看出 RP 实际上是一种内置了发布者订阅者模型的异步编程框架,包含了线程复用,背压等高级特性,特别适用于高并发、有延迟的场景。

下篇我将对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用,敬请期待。

5 参考

0