浅析 InnoDB 存储引擎的工作流程

InnoDB

InnoDB 是由 Innobase Oy 公司开发,该存储引擎是第一个完整支持 ACID 事务的 MySQL 存储引擎。具有插入缓存两次写自适应哈希索引等关键特性,是一个高性能、高可用的存储引擎。

整体架构

InnoDB 有多个内存块,这些内存块组合在一起组成了一个大的内存池。而 InnoDB 的内存池中会有多个后台线程,这些后台线程负责刷新内存池中的数据,和将脏页(已修改的数据页)刷新到磁盘文件。

后台线程

默认情况下,InnoDB 存储引擎有 13 个后台线程:

  • 一个 master 线程
  • 一个锁监控线程
  • 一个错误监控线程
  • 十个 IO 线程
    • 插入缓存线程
    • 日志线程
    • 读线程(默认 4 个)
    • 写线程(默认 4 个)

下面是我本机上的十个 IO 线程

--------
FILE I/O
--------
I/O thread 0 state: waiting for i/o request (insert buffer thread)
I/O thread 1 state: waiting for i/o request (log thread)
I/O thread 2 state: waiting for i/o request (read thread)
I/O thread 3 state: waiting for i/o request (read thread)
I/O thread 4 state: waiting for i/o request (read thread)
I/O thread 5 state: waiting for i/o request (read thread)
I/O thread 6 state: waiting for i/o request (write thread)
I/O thread 7 state: waiting for i/o request (write thread)
I/O thread 8 state: waiting for i/o request (write thread)
I/O thread 9 state: waiting for i/o request (write thread)
Pending normal aio reads: 0 [0, 0, 0, 0] , aio writes: 0 [0, 0, 0, 0] ,
 ibuf aio reads: 0, log i/o's: 0, sync i/o's: 0
Pending flushes (fsync) log: 0; buffer pool: 0
540 OS file reads, 89 OS file writes, 7 OS fsyncs
0.00 reads/s, 0 avg bytes/read, 0.00 writes/s, 0.00 fsyncs/s

内存池

InnoDB 存储引擎的内存池包含:缓冲池、日志缓存池、额外内存池。这些内存的大小分别由配置文件中的参数决定。其中占比最大的是缓冲池,里面包含了数据缓存页、索引、插入缓存、自适应哈希索引、锁信息和数据字典。InnoDB 会在读取数据库数据的时候,将数据缓存到缓冲池中,而在修改数据的时候,会先把缓冲池中的数据修改掉,一旦修改过的数据页就会被标记为脏页,而脏页则会被 master 线程按照一定的频率刷新到磁盘中。日志缓存则是缓存了redo-log 信息,然后再刷新到 redo-log 文件中。额外内存池则是在对一些数据结构本身分配内存时会从额外内存池中申请内存,当该区域内存不足则会到缓冲池中申请。

内存结构

Master Thread

InnoDB 存储引擎的主要工作都在一个单独的 Master Thread 中完成,其内部由四个循环体构成:主循环( loop )、后台循环( background loop )、刷新循环( flush loop )、暂停循环( suspend loop )。具体工作流程如下图所示:

mysql_thread

主循环

主要负责将缓冲池中的日志文件刷新到磁盘中、合并插入缓存、刷新缓冲池中的脏页数据到磁盘中、删除无用的 Undo 页、产生一个 checkpoint 。在主循环中会多次将脏页刷新到磁盘中,但是有一些刷新任务总会执行,有一些则根据参数来判断当前是否需要刷新。而这个参数 innodb_max_dirty_pages_pct 最大脏页比例是通过配置文件决定的,你可以根据实际情况来调整你自己的最大脏页比例,来达到最好的性能。

伪代码如下:

for (int i = 0; i<10; i++) {
    thread_sleep(1)
    do log buffer flush to disk
    if ( last_one_second_ios < 5) {
        do merge at most 5 insert buffer
    }
    if (buf_get_modified_ratio_pct > innodb_max_ditry_pages_pct) {
        do buffer pool flush 100 dirty page
    }
    if (no user activity) {
        goto background loop
    }
}
if (last_ten_second_ios < 200) {
    do buffer pool flush 100 dirty page
}
do merge at most 5 insert buffer
do log buffer flush to disk
do full pourge
if (buf_get_modifued_ratio_pct > 70%) {
    do buffer pool flush 100 dirty page
} else {
    buffer pool flush 10 dirty page
}
do fuzzy checkpoint
goto loop

后台循环

在后台循环中 InnoDB 会做这些事:删除无用的Undo页、合并插入缓存。如果当前 InnoDB 处于空闲状态,则跳转到刷新循环,否则跳转到主循环继续处理数据。

伪代码如下:

do full purge
do merge 20 insert buffer
if (not idle) {
    goto loop
} else {
    goto flush loop
}

刷新循环

一旦执行到刷新循环,InnoDB 会一直处理脏页数据,直到脏页数据达到最大脏页比例以下。这时候会跳转到暂停循环中(所有数据都处理完毕)。

伪代码如下:

flush loop:
do buffer pool flush 100 dirty page
if (buf_get_modified_ratio_pct > innodb_max_dirty_pages_pct) {
    goto flush loop
} else {
    goto suspend loop
}

暂停循环

在本循环中,InnoDB会将 Master Thread 挂起,减少内存资源使用,一直处于 waiting 状态,等待事件来唤醒。一旦有新的事件过来,就跳转到主循环中。

伪代码如下:

suspend loop:
suspend_thread()
waiting event
goto loop;=

由此可以看出,master 线程的最大的工作内容就是刷新脏页数据到磁盘了。这一步就是把缓存池中被修改的数据页同步到磁盘中。而脏页数据的刷新基本上都是由 innodb_max_dirty_pages_pct 来控制的,所以当你的服务器处理能力比较强,给 InnoDB 分配的内存池比较大,这时候可能你的脏页数据会很难达到最大脏页比,这时候你的数据基本上都在缓冲池中,可能需要很长一段时间才会到数据库磁盘文件中,也就是脏页的刷新速度会很低(MySQL 5.1之前的版本默认是 90%,后面调整到 75%)。所以实际应用中可以根据自己内存和数据库的读写量来设置这个最大脏页比。对于一次刷新脏页数量的设置,在 InnoDB Plugin 中有一个参数 innodb_adaptive_flushing 自适应刷新,InnoDB 会根据产生的重做日志速度来计算出当前最适合的刷新脏页数量。当然 InnoDB Plugin 中还有其它很多参数配置,合理利用这些配置可以极大的提升 InnoDB 存储引擎的性能。

关键特性

前面说到 InnoDB 的三大特性分别为:插入缓存、两次写、自适应哈希索引。下面就简单介绍下这三大特性。

插入缓存

当我插入一条数据,该数据只有一个 ID 索引(聚集索引数据行的物理顺序与列值的逻辑顺序相同)的时候,并且 ID 是自增长的,这时候页中的行记录按照 ID 顺序存放,所以只需要在最新页插入数据即可。但是如果我的表有多个非聚集索引:该索引中索引的逻辑顺序与磁盘上行的物理存储顺序不同,在插入的时候非聚集索引的插入不再是顺序的,这时候要离散的访问非聚集索引页,导致插入性能变低。而插入缓存则在插入的时候判断缓冲池中是否存在当前非聚集索引,如果存在则直接插入,否则先插入到一个缓存区,然后再通过 Master Thread 来合并插入缓存。这样极大的提高了数据的写性能。

两次写

两次写是为了解决在将缓冲池中的脏页刷新到磁盘的过程中,操作系统出现故障,导致当前的脏页部分写失效的问题。通过两次写在下次恢复的时候,InnoDB 会根据两次写的结果来恢复数据。

原理:在刷新脏页的时候,不是直接把脏页数据刷新到磁盘,而是将脏页先写到一个大小为2M的内存缓存中,再将这个内存缓存数据同步到磁盘的共享表空间中。当全部都写到共享表空间后,再将数据刷新到磁盘中。这样如果发生了上面描述的情况,这时候数据会在共享表空间中有个备份,恢复的时候就可以使用共享表空间的数据。

如果有数据库集群的情况下,master数据库是一定要开启两次写的,为了保证数据可靠性。而从数据库可以通过参数 skip_innodb_doublewrite 来禁止两次写功能,来提高插入效率。

自适应哈希索引

InnoDB 会监控对表示的索引查找,如果发现可以通过对索引进行哈希来优化搜索。这时候会对当前的索引建立哈希索引。称之为自适应哈希索引( AHI )。可以通过参数 innodb_adaptive_hash_index 来禁用或启用此特性。

小结

总体来说 InnoDB 的高性能体现在:插入数据的时候先保存在内存中,直接跟内存交互性能比较好,而且还有插入缓存优化,保证了高并发写操作。高可用则表现在两次写特性,保证了机器宕机或者出故障的时候数据不会丢失。这里只是简单介绍了一下 InnoDB 的工作流程和一些特性,当然 InnoDB 还有很多很多强大的功能,比如说事务、锁、索引、算法等等有兴趣的同学可以参考《 MySQL 技术内幕 InnoDB 存储引擎》这本书深入了解。

0

使用 RabbitMQ 实现 RPC

背景知识

RabbitMQ

RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是解耦的,互相不知道对方的存在。

RPC

Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。

如何使用RabbitMQ实现RPC?

使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。

但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。

但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件实现一次双向的消息传递:

客户端和服务端即是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

具体实现

MQ部分的定义

请求信息的队列

我们需要一个队列来存放请求信息,客户端向这个队列发布请求信息,服务端消费该队列处理请求。该队列不需要复杂的路由规则,直接使用 RabbitMQ 默认的 direct exchange 来路由消息即可。

响应信息的队列

存放响应信息的队列不应只有一个。如果存在多个客户端,不能保证响应信息被发布请求的那个客户端消费到。所以应为每一个客户端创建一个响应队列,这个队列应该由客户端来创建且只能由这个客户端使用并在使用完毕后删除,这里可以使用 RabbitMQ 提供的排他队列(Exclusive Queue):

channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())

并且要保证队列名唯一,声明队列时名称设为空 RabbitMQ 会生成一个唯一的队列名。
exclusive设为true表示声明一个排他队列,排他队列的特点是只能被当前的连接使用,并且在连接关闭后被删除。

一个简单的 demo(使用 pull 机制)

我们使用一个简单的 demo 来了解客户端和服务端的处理流程。

发布请求
  • 编写代码前的一个小问题

我们在声明队列时为每一个客户端声明了独有的响应队列,那服务器在发布响应时如何知道发布到哪个队列呢?其实就是客户端需要告诉服务端将响应发布到哪个队列,RabbitMQ 提供了这个支持,消息体的Properties中有一个属性reply_to就是用来标记回调队列的名称,服务器需要将响应发布到reply_to指定的回调队列中。

解决了这个问题之后我们就可以编写客户端发布请求的代码了:

// 定义响应回调队列
String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue();

// 设置回调队列到 Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .replyTo(replyQueueName)
        .build();

String request = "request";

// 发布请求
channel.basicPublish("", "rpc_queue", properties, request.getBytes());

Direct reply-to:
RabbitMQ 提供了一种更便捷的机制来实现 RPC,不需要客户端每次都定义回调队列,客户端发布请求时将replyTo设为amq.rabbitmq.reply-to,消费响应时也指定消费amq.rabbitmq.reply-to,RabbitMQ 会为客户端创建一个内部队列

消费请求

接下来是服务端处理请求的部分,接收到请求后经过处理将响应信息发布到reply_to指定的回调队列:

// 服务端 Consumer 的定义
public class RpcServer extends DefaultConsumer {

    public RpcServer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        String response = (msg + " Received");

        // 获取回调队列名
        String replyTo = properties.getReplyTo();

        // 发布响应消息到回调队列
        this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes());
    }
}

...

// 启动服务端 Consumer
channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收响应

客户端如何接收服务器的响应呢?有两种方式:1.轮询的去 pull 回调队列中的消息,2.异步的消费回调队列中的消息。我们在这里简单实现第一种方案。

GetResponse getResponse = null;

while (getResponse == null) {
    getResponse = channel.basicGet(replyQueueName, true);
}

String response = new String(getResponse.getBody());

一个简单的基于 RabbitMQ 的 RPC 模型已经实现了,但这个 demo 并不实用,因为客户端每次发送完请求都要同步的轮询等待响应消息,只能每次处理一个请求。RabbitMQ 的 pull 模式效率也比较低。

实现一个完备可用的 RPC 模式需要做的工作还有很多,要处理的关键点也比较复杂,有句话叫不要重复造轮子,spring 已经实现了一个完备可用的 RPC 模式的库,接下来我们来了解一下。

Spring Rabbit 中的实现

和上面 demo 的 pull 模式一次只能处理一个请求相对应的:如何异步的接收响应并处理多个请求呢?关键点就在于我们需要记录请求和响应并将它们关联起来,RabbitMQ 也提供了支持,Properties 中的另一个属性correlation_id用来标识一个消息的唯一 id。

参考spring-rabbit中的convertSendAndReceive方法的实现,为每一次请求生成一个唯一的correlation_id

private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);

并使用一个ConcurrentHashMap来维护correlation_id和响应信息的映射:

private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();

this.replyHolder.put(correlationId, pendingReply);

PendingReply中有一个BlockingQueue存放响应信息,在发送完请求信息后调用BlockingQueuepull方法并设置超时时间来获取响应:

private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);

public Message get(long timeout, TimeUnit unit) throws InterruptedException {
    Object reply = this.queue.poll(timeout, unit);
    return reply == null ? null : processReply(reply);
}

在获取响应后不论结果如何,都会将PendingReplyreplyHolder中移除,防止replyHolder中积压超时的响应消息:

try {
    reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);
} finally {
    this.replyHolder.remove(messageTag);
    ...
}

响应信息是何时如何被放到这个BlockingQueue中的呢?看一下RabbitTemplate接收消息的地方:

public void onMessage(Message message) {
    String messageTag;
    if (this.correlationKey == null) { // using standard correlationId property
        messageTag = message.getMessageProperties().getCorrelationId();
    } else {
        messageTag = (String) message.getMessageProperties()
                .getHeaders().get(this.correlationKey);
    }

    // 存在 correlation_id 才认为是RPC的响应信息,不存在时不处理
    if (messageTag == null) {
        logger.error("No correlation header in reply");
        return;
    }

    // 从 replyHolder 中取出 correlation_id 对应的 PendingReply
    PendingReply pendingReply = this.replyHolder.get(messageTag);
    if (pendingReply == null) {
        if (logger.isWarnEnabled()) {
            logger.warn("Reply received after timeout for " + messageTag);
        }
        throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
    }
    else {
        restoreProperties(message, pendingReply);
        // 将响应信息 add 到 BlockingQueue 中
        pendingReply.reply(message);
    }
}

以上的 spring 代码隐去了很多额外部分的处理和细节,只关注关键的部分。

至此一个完整可用的由 RabbitMQ 作为中间件实现的 RPC 模式就完成了。

总结

服务端

服务端的实现比较简单,和一般的Consumer的区别只在于需要将请求回复到replyTo指定的 queue 中并带上消息标识correlation_id即可

服务端的一点小优化:
超时的处理是由客户端来实现的,那服务端有没有可以优化的地方呢?
答案是有的:如果我们的服务端处理比较耗时,如何判断客户端是否还在等待响应呢?
我们可以使用passive参数去检查replyTo的 queue 是否存在,因为客户端声明的是内部队列,客户端如果断掉链接了这个 queue 就不存在了,这时服务端就无需处理这个消息了。

客户端

客户端承担了更多的工作量,包括:

  • 声明replyTo队列(使用amq.rabbitmq.reply-to会简单很多)
  • 维护请求和响应消息(使用唯一的correlation_id来关联)
  • 消费服务端的返回
  • 处理超时等异常情况(使用BlockingQueue来阻塞获取)

好在 spring 已经实现了一套完备可靠的代码,我们在清楚了流程和关键点之后,可以直接使用 spring 提供的RabbitTemplate,无需自己实现。

使用 MQ 实现 RPC 的意义

通过 MQ 实现 RPC 看起来比客户端和服务器直接通讯要复杂一些,那我们为什么要这样做呢?或者说这样做有什么好处:

  1. 将客户端和服务器解耦:客户端只是发布一个请求到 MQ 并消费这个请求的响应。并不关心具体由谁来处理这个请求,MQ 另一端的请求的消费者可以随意替换成任何可以处理请求的服务器,并不影响到客户端。
  2. 减轻服务器的压力:传统的 RPC 模式中如果客户端和请求过多,服务器的压力会过大。由 MQ 作为中间件的话,过多的请求而是被 MQ 消化掉,服务器可以控制消费请求的频次,并不会影响到服务器。
  3. 服务器的横向扩展更加容易:如果服务器的处理能力不能满足请求的频次,只需要增加服务器来消费 MQ 的消息即可,MQ会帮我们实现消息消费的负载均衡。
  4. 可以看出 RabbitMQ 对于 RPC 模式的支持也是比较友好地,amq.rabbitmq.reply-to, reply_to, correlation_id这些特性都说明了这一点,再加上 spring-rabbit 的实现,可以让我们很简单的使用消息队列模式的 RPC 调用。
0

原来你是这样的 Stream —— 浅析 Java Stream 实现原理

Stream 为什么会出现?

Stream 出现之前,遍历一个集合最传统的做法大概是用 Iterator,或者 for 循环。这种两种方式都属于外部迭代,然而外部迭代存在着一些问题。

  • 开发者需要自己手写迭代的逻辑,虽然大部分场景迭代逻辑都是每个元素遍历一次。
  • 如果存在像排序这样的有状态的中间操作,不得不进行多次迭代。
  • 多次迭代会增加临时变量,从而导致内存的浪费。

虽然 Java 5 引入的 foreach 解决了部分问题,但也引入了新的问题。

  • foreach 遍历不能对元素进行赋值操作
  • 遍历的时候,只有当前被遍历的元素可见,其他不可见

随着大数据的兴起,传统的遍历方式已经无法满足开发者的需求。
就像小作坊发展到一定程度要变成大工厂才能满足市场需求一样。大工厂和小作坊除了规模变大、工人不多之外,最大的区别就是多了流水线。流水线可以将工人们更高效的组织起来,使得生产力有质的飞跃。

所以不安于现状的开发者们想要开发一种更便捷,更实用的特性。

  • 它可以像流水线一样来处理数据
  • 它应该兼容常用的集合
  • 它的编码应该更简洁
  • 它应该具有更高的可读性
  • 它可以提供对数据集合的常规操作
  • 它可以拼装不同的操作

经过不懈的能力,Stream 就诞生了。加上 lambda 表达式的加成,简直是如虎添翼。

你可以用 Stream 干什么?

下面以简单的需求为例,看一下 Stream 的优势:

从一列单词中选出以字母a开头的单词,按字母排序后返回前3个。

外部迭代实现方式

List<String> list = Lists.newArrayList("are", "where", "advance", "anvato", "java", "abc");
List<String> tempList = Lists.newArrayList();
List<String> result = Lists.newArrayList();
for( int i = 0; i < list.size(); i++) {
    if (list.get(i).startsWith("a")) {
        tempList.add(list.get(i));
    }
}
tempList.sort(Comparator.naturalOrder());
result = tempList.subList(0,3);
result.forEach(System.out::println);

stream实现方式

List<String> list = Lists.newArrayList("are", "where", "anvato", "java", "abc");
list.stream().filter(s -> s.startsWith("a")).sorted().limit(3)
                .collect(Collectors.toList()).forEach(System.out::println);

Stream 是怎么实现的?

需要解决的问题:

  • 如何定义流水线?
  • 原料如何流入?
  • 如何让流水线上的工人将处理过的原料交给下一个工人?
  • 流水线何时开始运行?
  • 流水线何时结束运行?

总观全局

Stream 处理数据的过程可以类别成工厂的流水线。数据可以看做流水线上的原料,对数据的操作可以看做流水线上的工人对原料的操作。

事实上 Stream 只是一个接口,并没有操作的缺省实现。最主要的实现是 ReferencePipeline,而 ReferencePipeline 继承自 AbstractPipelineAbstractPipeline 实现了 BaseStream 接口并实现了它的方法。但 ReferencePipeline 仍然是一个抽象类,因为它并没有实现所有的抽象方法,比如 AbstractPipeline 中的 opWrapSinkReferencePipeline内部定义了三个静态内部类,分别是:Head, StatelessOp, StatefulOp,但只有 Head 不再是抽象类。

图1

流水线的结构有点像双向链表,节点之间通过引用连接。节点可以分为三类,控制数据输入的节点、操作数据的中间节点和控制数据输出的节点。

ReferencePipeline 包含了控制数据流入的 Head ,中间操作 StatelessOp, StatefulOp,终止操作 TerminalOp

Stream 常用的流操作包括:
* 中间操作(Intermediate Operations)
* 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如 filter()flatMap()flatMapToDouble()flatMapToInt()flatMapToLong()map()mapToDouble()mapToInt()mapToLong()peek()unordered()
* 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如 distinct()sorted()sorted(comparator)limit()skip()
* 终止操作(Terminal Operations)
* 非短路操作:处理完所有数据才能得到结果。如 collect()count()forEach()forEachOrdered()max()min()reduce()toArray() 等。
* 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如 anyMatch()allMatch()noneMatch()findFirst()findAny() 等。

源码分析

了解了流水线的结构和定义,接下来我们基于上面的例子逐步看一下源代码。

定义输入源

stream() 是 Collection 中的 default 方法,实际上调用的是 StreamSupport.stream() 方法,返回的是 ReferencePipeline.Head的实例。

ReferencePipeline.Head 的构造函数传递是 ArrayList 中实现的 spliterator 。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

定义流水线节点

filter() 是 Stream 中定义的方法,在 ReferencePipeline 中实现,返回 StatelessOp 的实例。

可以看到 filter() 接收的参数是谓词,可以用 lambda 表达式。StatelessOp的构造函数接收了 this,也就是 ReferencePipeline.Head 实例的引用。并且实现了 AbstractPipeline 中定义的 opWrapSink 方法。

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

sorted()limit() 的返回值和也都是 Stream 的实现类,并且都接收了 this 。不同的是 sorted() 返回的是 ReferencePipeline.StatefulOp 的子类 SortedOps.OfRef 的实例。limit() 返回的 ReferencePipeline.StatefulOp 的实例。

现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的 sorted()limit() 都只是返回了一个包含上一节点引用的中间节点。有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作

这也就是 Stream 延时执行的特性原因之所在。

参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。

previousStage.nextStage = this;

所以各个节点组成了一个双向链表的结构。

组装流水线

最后来看一下终止操作 .collect() 接收的是返回类型对应的 Collector。

此例中的 Collectors.toList() 是 Collectors 针对 ArrayList 的创建的 CollectorImpl 的实例。

@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));//1
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
        ? (R) container
        : collector.finisher().apply(container);
}

先忽略并行的情况,来看一下加注释了1的代码:

  1. ReduceOps.makeRef 接收此 Collector 返回了一个 ReduceOp(实现了 TerminalOp 接口)的实例。
  2. 返回的 ReduceOp 实例又被传递给 AbstractPipeline 中的 evaluate() 方法。
  3. evaluate 中,调用了 ReduceOp实例的 evaluateSequential 方法,并将上流水线上最后一个节点的引用和 sourceSpliterator 传递进去。
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
  1. 然后调用 ReduceOp 实例的 makeSink() 方法返回其 makeRef() 方法内部类 ReducingSink 的实例。
  2. 接着 ReducingSink 的实例作为参数和 spliterator 一起传入最后一个节点的 wrapAndCopyInto() 方法,返回值是 Sink 。

启动流水线

流水线组装好了,现在就该启动流水线了。这里的核心方法是 wrapAndCopyInto,根据方法名也能看出来这里应该做了两件事,wrapSink()copyInto()

wrapSink()

将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。参见附录I可以发现

每个节点都记录了上一节点的引用( previousStage )和每一个节点的深度( depth )。

所以这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

(Head.combinedFlags, (StatelessOp.combinedFlags, (StatefulOp.combinedFlags,(StatefulOp.combinedFlags ,TerminalOp.sink)))

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
   Objects.requireNonNull(sink);

   for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
       sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
   }
   return (Sink<P_IN>) sink;
}

copyInto()

终于到了要真正开始迭代的时候,这个方法接收两个参数 Sink<P_IN> wrappedSink, Spliterator<P_IN> spliteratorwrappedSink对应的是 Head节点后面的第一个操作节点(它相当于这串 Sink 的头),spliterator 对应着数据源。

这个时候我们回过头看一下 Sink 这个接口,它继承自 Consumer 接口,又定义了 begin()end()cancellationRequested() 方法。Sink 直译过来是水槽,如果把数据流比作水,那水槽就是水会流过的地方。begin() 用于通知水槽的水要过来了,里面会做一些准备工作,同样 end() 是做一些收尾工作。cancellationRequested() 是原来判断是不是可以停下来了。Consumer 里的accept() 是消费数据的地方。

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
   Objects.requireNonNull(wrappedSink);
   if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
       wrappedSink.begin(spliterator.getExactSizeIfKnown());//1
       spliterator.forEachRemaining(wrappedSink);//2
       wrappedSink.end();//3
   }
   else {
       copyIntoWithCancel(wrappedSink, spliterator);
   }
}

有了完整的水槽链,就可以让水流进去了。copyInto() 里做了三个动作:

  1. 通知第一个水槽(Sink)水要来了,准备一下。
  2. 让水流进水槽(Sink)里。
  3. 通知第一个水槽(Sink)水流完了,该收尾了。

突然想到宋丹丹老师的要把大象放冰箱要几步?

注:图中蓝色线表示数据实际的处理流程。

每一个 Sink 都有自己的职责,但具体表现各有不同。无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的 下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

例如 sorted() 就是一个有状态的操作,一般会有一个属于自己的容器,用来记录处自己理过的数据的状态。sorted() 是在执行 begin 的时候初始化这个容器,在执行 accept 的时候把数据放到容器中,最后在执行 end 方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

最后数据流到终止节点,终止节点将数据收集起来就结束了。

然后就没有然后了,copyInto() 返回类型是 void ,没有返回值。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

回顾

再来回顾一下整个过程。首先是将 Collection 转化为 Stream,也就是流水线的头。然后将各个中间操作节点像拼积木一样拼接起来。每个中间操作节点都定义了自己对应的 Sink,并重写了 makeSink() 方法用来返回自己的 Sink 实例。直到终止操作节点出现时才开始将 Sink 实例化并串起来。然后就是上面提到的那三步:通知、数据流入、结束。

本文介绍和分析了最常规的 stream 用法和实现,实际上 stream 还有很多高阶用法,比如利用协程实现的并行流。感兴趣的同学可以研究一下。当然既然是高阶用法,用的时候一定要多注意。

参考

附录I

以下是初始化 Head 节点和中间操作的实现。

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
//初始化Head节点的时候会执行
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
//初始化中间操作StatelessOp和StatefulOp的时候会执行
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
0

《系统架构》读书笔记:架构到底是什么?

引子

这个月读了一本书,《系统架构》。然而这本书讲的不仅仅是软件系统的架构,而是更高一个层面,它讲的是复杂系统的架构。这本书的三位作者,有两位是航空航天专业的教授和副教授,所以书里用了不少 NASA 的项目为案例,比如人类有史以来最复杂的工程——阿波罗计划。读完这本书,让我对架构的认识提升了一个高度,原先各种离散的关于架构的知识和理解,在这个框架下,终于可以融会贯通了。

系统和系统思维

首先,系统是什么?按本书的定义,系统是由一组实体和这些实体之间的关系所构成的集合,而其功能要大于这些实体各自的功能之和。后半句很重要,如果一个系统的功能,等于其部件的功能之和,那么这个系统就没有存在的意义。因为我们单独使用那些部件,也可以得到需要的功能。只有当这些部件组合时,能够涌现出新的功能,那才算是组成了一个系统。

要理解系统架构,首先要有系统思维。所谓系统思维,就是把某个疑问、某种状况或某个难题明确地视为一个系统,也即是视为一组相互关联的实体,而不是孤立的一个对象。

系统思维要有四个步骤是:
1. 确定系统整体的形式与功能。
2. 确定系统中包含的组件,组件的形式与功能。
3. 确定系统中各个组件之间的关系,并且定这些关系的形式及其功能。
4. 根据组件的功能及功能性的互动来确定系统的涌现属性。

系统思维的初级目标是理解系统是什么,更进一层的目标是为了预测系统在发生某些变化之后的情况。而最高级的目标,则是用部件来合成一个系统,这个过程也就是所谓的系统架构。

系统架构的分析

形式与功能

形式是系统的物理体现或信息体现,它有助于功能的执行。形式可以分为形式对象,以及这些形式对象之间具有的形式关系(也就是结构)。例如,汽车作为一个系统,它的形式对象就是汽车部件,而软件系统的形式对象则是模块、过程、代码和指令,而这些形式对象则通过不同的结构组装成一个系统。

系统的另一个属性就是功能。功能由过程和操作对象组成,过程在操作对象上执行之后,会改变操作对象的状态。当系统对外展现的功能对系统的外部的对象进行操作时,系统的价值就体现出来了。例如,离心泵中的电动机可以带动叶轮旋转,这是它内部的一个功能。而离心泵作为一个系统,它可以给外部对象(例如某种液体)加压,从而移动液体。这也就是它的功能和价值所在。

屏幕快照_2018-10-11_上午11.28.19

形式和功能的区别就是,形式决定了系统是什么,而功能决定了系统能做什么。架构其实就是形式与功能之间的映射。形式结构对功能起着承载作用,或者能够促进相关的性能。在复杂系统里,形式与功能的映射会非常复杂,包含很多不确定的问题,或者非理想的因素。因此架构师需要使用抽象等方法来简化架构,以便能够更好的理解和交流架构。

另外,还有一种特殊的功能,叫做解决方案无关的功能。例如,我要从上海出差去北京,那么“将旅客从上海快速、安全的运送到北京”就是解决方案无关的功能,而“使用高铁/飞机将旅客从上海运送到北京”则不是。好的系统规范书,应该是使用与特定解决方案无关的功能来描述的。如果系统规范书将人引导向某种具体的解决方案,可能会令架构师的视野变窄,从而不去探索更多的潜在选项。

从概念到架构

前面说过,架构就是功能与形式之间的映射,但对于复杂系统,这种映射往往非常复杂。架构师常常需要创造一些概念,来简单明了的解释功能是如何映射到形式的。例如前面离心泵的例子,它的解决方案无关的功能则是“移动液体”,而离心泵本身其实就是一个概念,一提到离心泵,熟悉的人一定会想到电动机、叶轮等等。其他的概念包括油电混动、高速铁路、发光二极管、快速排序、分布式缓存等等。软件开发中的各种设计模式,其实也是概念。

对于一个解决方案无关的功能,往往能提出多个不同的概念。架构师需要创造性地提出这些概念,对它们进行整理,并选定其中一个概念,将其转化为一套架构。

复杂系统的架构往往是分层的,同时我们还需要对架构进行模块化。需要注意的是,如果要对某一层的架构进行模块化,我们必须将其分解到下一层。因为只有检查各个实体在更下一层的关系,才能更好的对当前层级进行模块化。

创建系统架构

架构师

很多人常常会问,架构师的职责到底是做什么?这本书给出了很明确的回答,架构师的职责主要是以下三个方面:
- 减少歧义,确定系统的边界、目标和功能
- 发挥创造力,创建概念
- 管理复杂度,为系统选定一种分解方案

而架构师的交付成果,应该包括:
- 一套清晰、完整、连贯的目标,并且是可行的(80% 以上概率)。
- 系统所在的大环境(法律法规、行业规范等等)以及整个产品环境的描述。
- 系统的概念以及操作方式。
- 系统的功能描述(至少两层分解),除了系统对外界展现的功能,也包括系统的内部功能。
- 系统的形式(至少两层分解)和形式结构,以及功能和形式之间的映射。
- 所有的外部接口以及接口控制过程的详细描述。
- 开发成本、工期、风险、实现计划等。

消除歧义,确定目标

为了消除歧义,架构师必须首先理解上游和下游的相关因素对系统架构的影响。上有因素包括:公司策略、营销、法律法规、行业标准、技术成熟度等,下游因素包括实现(编码、制造、供应链管理)、操作、产品与系统的演化。

复杂的系统一般会涉及多个的利益相关者,他们会有不同的诉求和目标,架构师需排定各项目标之间的优先次序。首先,可以把价值视为一种交换,在交换过程中,我方的成果用来满足对方的需求,而对方的成果也同样用来满足我方的需求。其次,可以根据利益相关者对本产品的重要程度,来排列其优先次序。最后,则可以把系统的目标,展示在系统问题描述中(System Problem Statement, SPS)。

发挥创造力,创建概念

接下来,架构师就需要发挥创造性、创建概念了。创造概念,主要有两种方式,一种是无结构的方式,一种是结构化的方式;无结构的创新包括头脑风暴法、自由联想法等方法。对于一些包含多个功能的丰富概念,我们可以对其进行扩展和分解,提出对应的概念片段,而这些概念片段组合后,又会形成新的整体概念。最后通过定量和定性分析,筛选出 2~3 个作为候选概念。

管理复杂度,为系统选定一种分解方案

架构师另外一项工作,就是分解系统,管理复杂度。系统的表面复杂度就是系统的难懂程度,表面复杂度高的系统理解起来会比较困难。架构师可以通过抽象、层级化、分解及递归等手段来减少表面复杂度,但这样做可能会提升实际复杂度。其中架构师最重要的一项决策,就是对系统进行的分解。要判断一种分解方式好不好,必须先向下分解两层,并根据第二层的分解情况,来检查第一层的分解方式是否合适。另外,架构师也要选择合适的分解平面,例如可以按功能分解、按形式分解、按模块变化程度分解、按供应商分解等等。

架构决策

其实架构方面的决策也可以用一些程序化的方法进行计算,以帮助架构师进行决策,这部分比较枯燥就不细说了。不过书中提到架构决策的模式,还有点意思,一共有六个模式:
1. Decision-Option(决策-选项):一组决策,每个决策都有一套离散选项。例如,开始一个系统需要做一下两个决策,决定数据库和 API 接口的使用何种技术:数据库是用 MySQL、MongoDB 还是 Cassandra,API 接口是用 HTTP 还是 Protobuf。这就是一个 Decision-Option 决策。
2. Down-Selecting(筛选):一组二选一的决策,代表选择实体中的某个子集。例如,我要从全国各地的 候选 IDC 机房中,选择合适的机房来部署 CDN,就是一个 Down-Selecting 决策。
3. Assiging(指派):把一个实体集中的元素,指派给另一个实体集中的某个或某些元素。
4. Partitioning(分区):把一个实体集的元素划分为多个互斥的子集,并且覆盖所有元素。这是模块分解的典型决策模式。
5. Permuting(排列):在一个实体集和一个位置集之间建立一一对应关系。
6. Connecting(连接):给定一个用图中节点表示的实体集,用一组连线展示这些节点之间的关系。网络拓扑结构的决策就是一个 Connecting 问题,即选择星形拓扑、环形拓扑或者总线拓扑等等。

书里用了一个很牛逼的例子——阿波罗计划,来说明架构决策的方案。

屏幕快照_2018-10-11_上午11.30.41

总结

看完这本书,你会发现,其实所有的架构,软件也好、汽车飞机等各种机器的架构也好,都是相通的。甚至我各种生物组织、团队组织的架构,也是一样的道理。只要我们能掌握系统架构的思维,再加上各个领域的专业知识,我们就能做出一套优秀的架构。

系统架构原则

书中最后总结了系统架构的二十六个原则,我摘录在这里供大家参考:

涌现原则:当各实体拼合成一个系统时,实体之间的交互会把功能、行为、性能和其他内在属性现出来。

整体原则:每个系统都作为某一个或某些个大系统的一小部分而运作,同时,每个系统中也都包含着更小的一些系统。

聚焦原则:在任何一个点上都能发现很多影响系统的问题,而其数量已经超出了人们的理解能力。因此,我们必须找出其中最关键、最重要的那些问题,并集中精力思考它们。

二元原则:所有由人类构建而成的系统,其本身都同时存在于物理领城和信息领域中。

受益原则:好的架构必须使人受益,要想把架构做好,就要专注于功能的涌现,使得系统能够把它的主要功能通过跨越系统边界的接口对外展示出来。

价值与架构原则:价值是有着一定成本的利益。架构是由形式所承载的功能。由于利益要通过功能而体现,同时形式又与成本相关,因此,这两个论述之间形成一种特别紧密的联系。

与特定解决方案无关的功能原则:糟糕的系统规范书总是把人引向预先定好的某一套具体解决方案、功能或形式中,这可能会令系统架构师的视野变窄,从而不去探素更多的潜在选项。

架构师角色原则:架构师的角色是解决歧义、专注创新,并简化复杂度。

歧义原则:系统架构的早期阶段充满了歧义。架构师必须解决这种歧义,以便给架构团队定出目标,并持续更新该目标。

现代实践压力原则:现代产品开发过程是由同时工作着的多个分布式团队来进行的,而且还有供应参与,因此,它更加需要有优秀的架构。

架构决策原则:我们要把架构决策与其他决策分开,并且要提前花一些时间来道慎地决定这些问题,因为以后如果想变更会付出很高的代价。

遗留元素复用原则:要透相地理解遗留系统及其现属性,并在新的架构中把必要的遗留元素包据进来。

产品进化原则:系统必须进化,否则就会失去竟争力。在进行架构时,应该把系统中较为稳固的部分定义为接口,以便给元素的进化提供便利。

开端原则:在产品定义的早期阶段列出的(企业内部和企业外部的)利益相关者会对架构产生极其重大的影响。

平衡原则:有很多因素会影响并作用于系统的构想、设计、实现及操作,架构师必须在这些因素中寻求一个平衡点,使大多数重要的利益相关者得到满足。

系统问题陈述原则:对问题所做的陈述会确定系统的高层目标,并划定系统的边界。就问题陈述的正确性进行反复的辩论和完善,直到你认为满意为止。

歧议与目标原则:架构师必须解决这些歧义。以便提出几条有代表性的目标并持续地更新它们。这些目标要完备且一致,要兼具挑战性和可达成性,同时又要能够为人类所解决。

创新原则:在架构中进行创新,就是要追求一种能够解决矛盾的好架构。

表面复杂度原则:我们要对系统进行分解、抽象及分层,将其表面复杂度控制在人类所能理解的范围。

必备复杂度原则:系统的必备复杂度取决于它的功能,把系统必须实现的功能仔细描述出来。然后选择一个复杂度最低的概念。

第二定律原则:系统的实际复杂度总是会超过必备复杂度。架构师要令实际复杂度尽量接近必备复杂度。

分解原则:分解是由架构师主动做出的选择。分解会影响性能的衡量标准,会影响组织的运作了式及供应商的价值捕获潜力。

二下一上原则:要想判断出对 Level1 所做的分解是否合适,必须再向下分解一层,以确定 Level2 的各种关系。

优雅原则:对于身处其中的架构师来说。如果系统的必备复杂度较低,而且其分解方式能够同与多个分解平面相匹配,那么该系统就是优雅的。

架构健壮程度原则:好的架构要能够应对各种各样的变化。能够应对变化的那种架构,要么是比较健壮架构。要么是适应能力比较强的架构。前者能够处理环境中的变化,而后者则能够适环境中的变化。

架构决策的耦合与整理原则:可以按照指标对决策的敏感度以及决策之间的连接度来排定架构决策之间的先后顺序。

0