基于 Apollo 的 配置中心 Matrix 2.0 实践总结

配置中心

首先简单介绍一下什么是配置中心,我们为什么需要它,为什么要花力气去完善它。

微服务化的挑战

传统单体应用( monolithic apps )因种种潜在缺陷,如:随着规模的扩大,部署效率逐渐降低,团队协作效率差,系统可靠性变差,维护困难,新功能上线周期长等,迫切需要一种新的架构去解决这些问题,而微服务( microservices )架构正是当下一种正确的解法。

不过,解决一个问题的同时,往往会诞生出很多新的问题,故:微服务化的过程中伴随着很多的挑战,其中一个挑战就是有关服务(应用)配置的。

当系统从一个单体应用,被拆分成分布式系统上一个个服务节点后,配置文件也必须更着迁移(分割),这样配置就分散了,不仅如此,分散中还包含着冗余,冗余分两方面:服务与服务之间(如:有 A,B,C 三个服务调用 D 服务,那么 D 服务的地址会被复制三份,因为 A,B,C 三个服务是 share nothing 的),同服务实例之间(如:A服务的所有实例都是一样的配置,且它们在物理上很有可能是分散的,即:不在一台机器上)。

在单体应用时期,我们管理配置只需要考虑环境(develop,test,staging,producting...)这一个维度,那么现在就多了服务(应用)这个维度。

再明确一下上面说的问题:配置文件分散且冗余,映射到配置管理上就是:分散意味着需要一个个处理,冗余意味着重复操作。

为了解决这个问题,配置必须集中管理(逻辑上分散,物理上集中),而实现这个功能的系统组件(或基础设置)就是配置中心。

配置中心核心需求

既然集中管理,那么就是要将应用的配置作为一个单独的服务抽离出来了(配置不再和应用一起进代码仓库),同理也需要解决新的问题,比如:版本管理(为了支持回滚),权限管理,灰度发布等

以上讨论的还都停留在静态配置的层面上,而应用除了静态的配置(如:数据库配置,一些云服务的参数配置,服务启动后一般不会变动),还会有一些动态的配置(如:灰度开关,一些常量参数:超时时间,限流阈值等),还有理论上:

在一个大型的分布式系统中,你没有办法把整个分布式系统停下来,去做一个软件的、硬件的或者系统的升级

业务需求的一些天然动态行为(如:一些运营活动,会动态调整一些参数),加之理论上必须要支持这个特性,所以,配置中心服务还得支持动态特性,即:配置热更新。

简单总结一下,在传统巨型单体应用纷纷转向细粒度微服务架构的历史进程中,服务配置中心是微服务化不可缺少的一个系统组件,其解决的就是:分布式系统的动态配置问题。

那么我们是怎么解决的呢?那就是 Matrix 1.0 的故事了。

Matrix 1.x

Matrix 是我们自研的服务配置中心,它完成了第一步,即:配置集中管理,所有应用的配置文件都交给 Matrix 管理,拥有环境隔离,版本控制,权限管理等功能。

配置文件是在 CI 构建阶段静态注入的,不同环境注入相应的配置文件,对不同的 build 工具(如:maven,sbt)都实现了配置注入的插件,来从 Matrix 上拉取配置文件

那么为什么还要进行 Matrix 2.0 的工作呢?Matrix1.x 有什么缺陷吗?

在服务(应用)数不多的情况下,Matrix1.x 是完全够用的,但是随着业务规模的发展,问题会渐渐暴露出来。

配置冗余

Matrix 并没有解决配置冗余的问题,我们需要类似模版的东西,将冗余部分抽取成一份并共享,降低配置维护成本。

打包耦合

其次,CI 打包与配置注入耦合,意味着打包与环境耦合,一个环境对应一个包(镜像),这其实违背了容器的"一份镜像到处运行"的理念,即:镜像与环境无关,应用需要什么环境的配置,在启动阶段确定(注入)就可以了。

不支持动态更新

最后是没有配置动态(热)更新能力,前面已说过这个也是必须要支持的;只有我们的基础设施是完美的,其之上的业务才能是完美的。

所有问题,在服务规模小(管理/运维复杂度低)时,都不是问题。

随着业务复杂度的不断提高,和微服务架构的不断深化,现有的 Matrix 1.x 版本暴露出很多缺失的关键能力(比如权限管理,配置模板,热发布等),逐渐成为产品快速迭代的瓶颈之一,我们亟需对 Matrix 进行升级改造,打造一个更强大、更易用的微服务配置中心。

所以我们需要 Matrix 2.0。

Matrix 2.0

在撸起袖子开始干之前,还是得静下心来评估一下,是继续走自研,还是选择别人开源的进行二次开发?我做了一段时间的调研,显然继续自研的开发 cost 是巨大的,核心功能如:配置热更新,灰度发布,配置模版(去冗余)都得从零开始开发, 其次还要保证高性能,高可用等非功能性需求。

二次开发的选择其实也不多(这并不是什么坏事),参考网络上已有的对比与讨论,可得出以下结论:

注册中心 配置存储 时效性 数据模型 维护性 优点 缺点
disconf zookpeer 实时推送 支持传统的配置文件模式,亦支持 KV 结构数据 提供界面操作 基于分布式的 Zookeeper 来实时推送稳定性、实效性、易用性上均优于其他 代码复杂, 2016 年 12 月之后没有更新,功能不够强大,不能完全满足我们的需求
zookpeer zookpeer 实时推送 支持传统的配置文件模式,亦支持 KV 结构数 命令操作 实时推送稳定性、实效性 太底层,开发量大
diamond mysql 每隔15s拉一次全量数据 只支持 KV 结构的数据 提供界面操 简单、可靠、易用 数据模型不支持文件,使用不方便
Spring Cloud Config git 人工批量刷新 文件模式 Git 操作 简单、可靠、易用 需要依赖 GIT,并且更新 GIT
Apollo mysql 实时推送 + 定时拉取 支持传统的配置文件模式,亦支持 KV 结构数 提供界面操 架构设计和稳定性考虑比较完善,不少大厂在用,Portal 的体验不错, 更新活跃 整体架构略显复杂,和我们容器环境不太一致

就目前看来"真正能打的"就 Apollo 了,由于自研成本较大,并且 Apollo 的代码也并不复杂(是标准的 Spring Boot 项目),其功能也基本上能覆盖我们的需求,所以我们最后选择基于 Apollo 进行二次开发。

对于不了解 Apollo 的同学,下面简单介绍一下,详情可参考官方文档

Apollo

Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性。

其 Portal 界面的截图如下:

apollo-home-screenshot

目前其最新版本(1.0.0)主要提供的功能有:

  • 多环境配置管理
  • 多集群配置管理
  • 支持热发布
  • 版本发布管理(支持配置回滚)
  • 灰度发布
  • 权限管理、发布审核、操作审计
  • 客户端配置信息监控

可以看到 Apollo 提供了很多强大的功能,解决了 Matrix 1.0 待解决的问题;锦上添花的是其对 Spring 项目的支持非常好,这又能大大节省开发时间。

二次开发

部署模式改造

二次开发主要是做适应我们环境的定制改造,而非添加大的新功能(至少这个阶段不会),首当其冲的就是部署模式的改变,下图是 Apollo 的架构图:

overall-architecture

这个图是从上往下看的,第一层是 client,也就是与应用集成的 SDK,还有 Protal(包括自己的 server 和 db),它们通过一层 SLB 访问到 Meta Server,而Meta Server 是 Eureka(服务注册中心)的一层封装,用于发现 Config Service 和 Admin Service(它们启动时会向 Eureka 注册自己),也就是最后一层,这两个服务共同管理着我们的配置,配置则存储在 ConfigDB 中。

外面还有一条 Client 到 Config Service 的箭头,就是其实时推送机制的实现原理:Client 通过(HTTP)Long Polling的方式,不停的询问 Config Service,如果配置有更新(发布)则会立即返回,无更新则返回 HTTP 状态码 304。

而我们的服务都是部署在 Kubernetes 集群上的,Kubernetes 有自己的服务发现功能,包括 LB 功能,所以与 Eureka 重复了,需要将 Eureka 剥离出来,将 Meta Server 去掉,我们的部署方式如下图:

DA83200EA3E7A2A7D36D140497F16C25

所有非生产环境对应一个 portal,每个环境独立部署 config service 和 admin service,其中生产环境在单独的 k8s 集群中(单独对生产环境做了隔离,是因为 Apollo 的权限管理还不够强大,不支持区分环境的访问权限)。

这个改造分三部分,第一部分和第二部分是分别去除 Protal 和 Client 对 Meta Server 对依赖,即:去除在 Client 和 Protal 中配置的 Meta server 地址,而是直接使用 Kubernetes 暴露的服务域名(开发和 local 环境因为需要本地访问,有公网地址)。

第三部分是去除 Config Service 上的 Eureka Server,Admin Service 上的 Eureka Client,因为 Eureka 非常成熟易用,且是个声明式(基于注解和配置)的框架,所以改动起来并不麻烦,我们只是将 Eureka 的注解去掉了而已,没有把依赖去掉(如果要去掉依赖,那么工作量会很大,因为依赖的地方多,整个 Meta Server 都是对其 Client API 的封装)。

唯一的坑就是记得在配置文件中把spring.cloud.discovery.enabled设置为 false,因为 Protal 对 Admin Service 的健康检查是基于spring boot actuator,而它是自动的,检测到有 Eureka 的依赖,就会启动相关健康检查的 Endpoint。

如果想做的好一点,可以做个配置开关,启动时配置是否启用 Eureka,当然这样工作量会很大,或许官方会考虑(毕竟容器编排系统是趋势,大中小厂都会用)。

本地开发

其次我们还添加了 LOC 环境(针对本地开发),UT 环境(针对单元测试),这方面 Apollo 有方便自定义环境的方法提供。不过我们并不满意官方推荐的本地开发模式,我们希望配置中心对本地开发是透明的,对于 Spring Boot 项目,原来是基于profile 覆盖的,那么现在还是 profile 覆盖的方式,这是最好的。然而 SDK 是将 Apollo 上拉取的配置覆盖在 Spring 的配置上的,即 Spring 的 profile 机制会失效,经讨论后决定在 Client 启动时:如果检测到当前是 LOC 环境,则将 Apollo 和 Spring 的配置覆盖顺序倒置。

这里简单的说一下实现原理,SDK 对 Spring 项目的集成是通过往 Environment 的 propertySourceList 中以 addFirst 的方式添加自己的配置,放在了查找链的最前面,故达到了覆盖一切的目的,而我们在检测到是 LOC 环境时以 addLast 的方式添加 Apollo 上的配置,就不会覆盖任何配置了,原来的 profile 机制依然有效。

封装SDK

还有一个坑,就是 SDK 的初始化时间问题,或者说是 Client 拉取配置的时间点问题;任何应用都有框架级别的配置,而有些配置是在一个非常早的时间点生效的,这是"应用启动时读取配置"这种方式必须考虑的问题,Spring Boot 项目的 Logging System 就是初始化早于 Apollo 的配置初始化(覆盖)的例子,也就是说:关于 log 的配置,不会生效。

我们解决的方式就是在 Apollo 配置初始化之后,重新初始化这些模块,使得 Apollo 的配置生效,这种解法的副产物是:我们使得这些模块的配置具有了动态性(我们可以安全的在运行期去重新初始化这些模块)

为此我们封装了自己的 SDK:vapollo(依赖于 apollo-client 的 SDK),提供定制功能。

配置迁移

二次开发结束后,接着面对的问题就是配置迁移问题,因为我们选择了对 Apollo 进行二次开发,而不是对 Matrix1.x 进行扩展,故:我们需要将配置迁移过去。

我们编写了详细的配置迁移操作手册(主要涵盖了操作流程和配置规范),并设定了迁移计划,之后项目的配置会慢慢全部迁移过去,为了更安全的实施这个过程,我们在实际迁移一个简单项目后,发现还可以编写工具来帮助我们,比如配置的 diff(检查是否遗漏项,不同项)。

跳出眼前,展望未来

Matrix2.0 的工作暂时告一段落,本文虽然是总结工作内容的,但是最后我希望跳出眼前的工作,看一看未来,从而了解我们现在的不足之处。

主要分两部分:Matrix2.x 和 Matrix3.0

Matrix2.x 指现阶段工作的延续,就目前来说(实际使用一段时间后),还有很多操作上的可优化点(比如:添加更多的默认设置,让开发人员对 Apollo 更加无感知),其次是关于配置的规范也会不断的完善(什么环境必须要有那些配置,namespace 的创建和关联),甚至会有完善的配置发布流程,有了规范,就会有 review,config review(就像代码 review 那样)

世界是变化的,现阶段的工作只是符合了我们现在的环境,之后会怎样?我们的微服务架构还在发展,我们还没上 service mesh,如今基础设置都在不断的下沉,我们的配置中心需要怎样?我想也是一样的,对开发来说:我们对配置的存在会越来越无感知,我不关心这个配置项是哪来的,不关心当前是何种部署环境,不关心配置项是变化还是不变的,我只关心用到它的业务是怎样的;另一方面对配置的管理和维护也应该越来越智能,越来越自动。希望 Matrix3.0 能实现这些目标。

参考资料

  1. 微服务架构为什么需要配置中心?
  2. 阿里巴巴微服务与配置中心技术实践之道
  3. Apollo配置中心Wiki
  4. Apollo Client Spring项目集成原理
  5. Apollo源码解析
0

ZooKeeper 分布式锁实践(下篇)读写锁

ZooKeeper 分布式锁实践(上篇)排它锁 中我们通过代码实践了如何使用 ZooKeeper 组件来实现排他锁。

排他锁简单易用,但是缺点也很明显:

  • 竞争压力大:当锁被占用之后,其他获取锁的操作只能阻塞等待;当锁释放后,所有等待锁的进程会在同一时刻争抢锁的使用权。
  • 羊群响应:锁释放后,会通知所有等待锁的进程,如果等待者特别多,一时间锁的竞争压力将会特别大。

简单来说就是:通知范围太广、锁的粒度太大。我们可以分别从这两个层面去寻找解决方案:

  • 缩小通知范围:等待锁的小伙伴们按先来后到的顺序排队吧,排好队了,接下来我只需要关心我前面一个节点的状态,当前一个节点被释放,我再去抢锁。

  • 缩小锁的粒度:锁不关心业务,但是可以简单地通过操作的读、写性质来二分锁的粒度:

    • 读锁:又称共享锁,如果前面没有写节点,可以直接上锁;当前面有写节点时,则等待距离自己最近的写节点释放( 删除 )。

    • 写锁:如果前面没有节点,可以直接上锁;如果前面有节点,则等待前一个节点释放( 删除 )。

      思考:为什么不是关注前面距离自己最近的写节点?

      如果两个写节点之间有读节点,必需等待读节点释放之后再进行写节点请求,否则会有不可重复读的问题。

数据结构

和排他锁一样,我们通过 ZooKeeper 的节点来表示一个读写锁的父节点,如 /SHARE_LOCK,通过父节点下的临时自增子节点来表示一个读写操作请求,如 /SHARE_LOCK/R_0000000001 。整体数据结构如下图所示。

算法

获取锁

获取锁的算法步骤:

  1. 开始尝试获取锁
  2. 如果持久化父节点不存在,则创建父节点
  3. 如果当前临时自增子节点不存在,则创建子节点
  4. 获取父节点下的所有子节点
  5. 在所有子节点中,查找序号比当前子节点小的前置子节点( 最近的兄节点 )。有两种情况:
    • 读请求:查找比自己小的前置子节点 ( 最近的兄节点 )
    • 写请求:查找比自己小的前置子节点 ( 最近的兄节点 )
  6. 如果没有更小的前置子节点,则持有锁
  7. 如果有更小的前置子节点,则监听该子节点被释放( 删除 )的事件
  8. 释放 ( 删除 )子节点事件被触发后,重复第 1 步

释放锁

释放锁的算法与排他锁部分的释放锁算法相似,这里不再赘述。

加锁、解锁流程

加锁、解锁完整的流程图。

代码实现

子节点定义

子节点属性

  • lockName 读写锁的名称,即父节点的名称
  • name 子节点的名称,格式为 :{请求类型:R/W}_{自增序号} ( 子节点的路径为:{lockName}/{name}
  • seq 子节点的自增序号,通过解析 name 属性 _ 下划线分隔符后面的数字字符串来获取序号( ZooKeeper 创建临时自增节点时会自动分配 Int 范围内的序号 )
  • isWrite 子节点是否为写请求,通过解析 name 属性 _ 下划线分隔符前面的英文字符来判断请求类型 :
    • R :读请求
    • W :写请求

zk_lock_share_childnode

读写锁定义和初始设置

读写锁的属性

  • lockName 读写锁的名称,即父节点的路径
  • locker 获取锁的请求方,即锁的持有者,释放锁时需要验证请求者与锁的持有者是否一致
  • isWrite 请求类型:
    • 读:false
    • 写:true

读写锁的初始设置

  • 连接到 ZooKeeper 实例
  • 连接后,如果父节点不存在,则创建父节点

zk_lock_share_zksharelock_setup

尝试获取锁

尝试获取锁的算法实现

  1. 获取或创建 ZooKeeper 子节点
  2. 获取当前子节点后,遍历所有的子节点,查找:
    • front 离当前子节点最近的兄节点:序号比当前子节点的序号小、且在小于当前序号的子节点中序号是最大的
    • fontWrite 离当前子节点最近的写、兄节点:序号比当前子节点的序号小、且在小于当前序号的子节点中序号是最大的、且为写子节点
  3. 查找后,返回序号更小的兄节点:
    • 读请求:返回最近的写、兄节点,用于 Watch 监听释放( 删除 )事件
    • 写请求:返回最近的兄节点,用于 Watch 监听释放( 删除 )事件
  4. 如果没有更小的子节点,返回 None ,表示成功地获取了锁

zk_lock_share_zksharelock_trylock

zk_lock_share_zksharelock_createchildnode

同步获取锁

同步获取锁的算法实现

  1. 尝试获取锁
  2. 如果没有兄节点,则成功持有锁
  3. 如果得到更小的兄节点,则监听该兄节点的释放( 删除 )事件
  4. 收到兄节点的释放( 删除 )事件通知后,重复第 1 步

zk_lock_share_zksharelock_lock

测试验证

最后,通过一个简单的测试方法来验证读写锁的加、解锁过程:

zk_lock_share_zksharelock_test

测试结果

zk_lock_share_zksharelock_test_out

测试结果、分析

# 请求方 操作 输出
1 LOCK1_读 加锁:成功 [LOCK1] : Lock
2 LOCK2_写 加锁:等待,因为有未释放的兄节点 LOCK1
3 LOCK3_写 加锁:等待,因为有未释放的兄节点 LOCK2
4 LOCK4_读 加锁:等待,因为有未释放的兄、写节点 LOCK3
5 LOCK5_读 加锁:等待,因为有未释放的兄、写节点 LOCK3
6 LOCK1_读 解锁:成功,通知到 LOCK2 [LOCK1] : Unlock
7 LOCK2_写 收到通知,尝试加锁:成功 [LOCK2] : Lock
8 LOCK2_写 解锁:成功,通知到 LOCK3 [LOCK2] : Unlock
9 LOCK3_写 收到通知,加锁成功 [LOCK3] : Lock
10 LOCK3_写 解锁成功,通知到 LOCK4、LOCK5 [LOCK3] : Unlock
11 LOCK4_读 收到通知,尝试加锁:成功 [LOCK4] : Lock
12 LOCK5_读 收到通知,尝试加锁:成功 [LOCK5] : Lock
13 LOCK4_读 解锁:成功 [LOCK4] : Unlock
14 LOCK5_读 解锁:成功 [LOCK5] : Unlock

尾声

通过 ZooKeeper 分布式锁实践,对它的接口最直观的感受就是简单。虽然它没有直接提供加锁、解锁这样的原语,但是当你了解了它的数据结构、接口和事件设计之后,加锁、解锁功能简直呼之欲出,实现起来毫无障碍,一切都是那么地合理、妥当。

而 ZooKeeper 的能力远不止于此,就像前面提到的它能够十分轻松地实现诸如:数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁、分布式队列这些小菜,不得不佩服 ZooKeeper 设计者的抽象能力。本篇只是浅尝了 ZooKeeper 的基本能力,有关它的设计思路、实现细节仍待进一步发掘和探索。

0

ZooKeeper 分布式锁实践(上篇)排它锁

前面我们使用 Redis 实现了一个简单的分布式排它锁,它的主要问题在于无法及时得知锁状态的变化,虽然能够通过 Redis 的订阅发布模式来实现通知的功能,但实现起来比较复杂、实现成本较高。而 ZooKeeper 天生就是为分布式系统的协调工作而设计的,能够很轻松地实现日常的分布式管理工具,也就是 ZooKeeper 几大菜谱:

  • 数据发布/订阅
  • 负载均衡
  • 命名服务
  • 分布式协调/通知
  • 集群管理
  • Master选举
  • 分布式锁
  • 分布式队列

接下来,就让我们具体来看看,如何通过 ZooKeeper 的部件来实现一个分布式锁。

数据模型

对于一个锁来说,我们至少需要知道它的 ID、以及它的所有者( 只有持有者才能解锁 ),所以简单来说,锁的数据模型就是:

case class ZkLock
(
  lockName: String,
  lockOwner: String
)

那么,如何用 ZooKeeper 来存储锁对象呢?

其实非常简单,用一个 ZNode 来表示一个锁,ZNode 的路径就是锁的 ID、ZNode 的数据即是锁的所有者:

/zk-lock (data = "zk-lock-owner")

原语

锁的原语一般有两个:

  1. 加锁
  2. 解锁

加锁

加锁的一般算法步骤是:

  1. 尝试加锁
  2. 如果锁没有被占用,则加锁成功
  3. 如果锁被占用,则等待锁被释放
  4. 锁被释放后,收到锁释放通知,重复步骤 1

翻译成ZooKeeper的算法步骤就是:

  1. 尝试创建表示锁的临时节点
  2. 如果创建节点成功,则加锁成功
  3. 如果创建节点失败,则创建一个锁节点的监视器,等待锁节点的删除通知
  4. 锁节点被持有者删除后,收到锁节点的删除通知,重复步骤 1

这里有两个注意点:

  • 为什么创建临时节点?当 ZooKeeper 客户端断开与服务端的连接时,它所创建的临时节点会被删除
  • 节点删除监视器

加锁实现

  1. 数据模型:ZkLock 对象:
    case class ZkLock
    (
      lockName: String, // 锁ID = ZNode节点的路径
      lockOwner: String // 锁的所有者 = ZNode节点的数据内容
    ) {
      // 1. connect
      // 2. try lock
      // 3. lock
      // 4. unlock
    }
    
  2. 为了示例完整性,在构造锁对象时会创建一个 ZooKeeper 客户端连接,用来和 ZooKeeper 服务端通信。

    这里通过一个闭锁来同步 ZooKeeper 客户端的连接状态:构造函数会一直阻塞,直到ZooKeeper客户端连接上服务端,然后打开闭锁:

    case class ZkLock(...) {
    
      //////////////////// 1. connect ////////////////////
    
      private val connectSignal = new CountDownLatch(1)
      private val zk = new ZooKeeper(
        ZkConnection.defaultHost,
        60 * 1000,
        new Watcher {
          override def process(e: WatchedEvent): Unit = {
            if (KeeperState.SyncConnected.equals(e.getState)
              && EventType.None.equals(e.getType)) {
              connectSignal.countDown // 连接成功,打开闭锁
            }
          }
        }
      )
      connectSignal.await
    }
    

    与服务端成功建立连接之后,就可以实现下一步尝试加锁的操作了。

  3. 尝试加锁

    • 排他性:锁节点只能被一个 ZooKeeper 成功创建
    • 可重入性:如果锁节点已被创建、且加锁者与锁节点的持有者一样,也返回加锁成功
    • 非阻塞方法:不管是否成功加锁,都立即返回加锁的结果
    case class ZkLock(...) {
    
      //////////////////// 2. try lock ////////////////////
    
      def tryLock: Boolean = {
        Try {
          if (zk.exists(lockName, false) != null) {
            val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
            lockOwner.equals(existOwner) // 可重入性
          } else {
            zk.create(
              lockName, // 节点路径 = 锁ID
              lockOwner.getBytes, // 节点数据 = 锁的所有者
              Ids.OPEN_ACL_UNSAFE, // 公开访问权限
              CreateMode.EPHEMERAL // 临时节点
            )
            true
          }
        } match {
          case Success(isLocked) => isLocked
          case _ => false
        }
      }
    }
    
  4. 同步加锁
    • 同步加锁算法:尝试加锁:调用 tryLock 方法
      • 如果返回:加锁成功,则完成加锁操作
      • 如果返回:加锁失败,则:
        • 创建闭锁:同步等待锁释放通知
        • 创建锁节点路径的监视器:收到锁节点被删除的事件时,打开闭锁
        • 阻塞等待的闭锁被打开后,重新开始同步加锁( 尾递归调用
    case class ZkLock(...) {
    
      //////////////////// 3. lock ////////////////////
    
      def lock: Unit = {
        if (!tryLock) {
          val releaseSignal = new CountDownLatch(1)
          zk.exists(lockName, new Watcher {
            override def process(e: WatchedEvent): Unit = {
              if (lockName.equals(e.getPath)
                && EventType.NodeDeleted.equals(e.getType)) {
                releaseSignal.countDown
              }
            }
          })
          releaseSignal.await
          lock
        } else {
          println(s"${this} Locked ${nowHourStr} \n")
        }
      }
    }
    

解锁

解锁的算法步骤是:

  1. 锁节点是否存在
  2. 如果不存在,完成解锁
  3. 如果锁节点存在,则判断锁节点的数据( 锁的持有者 )是否和解锁者相同
  4. 如果一样,则删除锁节点,完成解锁

解锁实现

case class ZkLock(...) {

  //////////////////// 4. unlock ////////////////////

  def unlock: Unit = {
    if (zk.exists(lockName, false) != null) {
      val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
      if (lockOwner.equals(existOwner)) {
        zk.delete(lockName, -1)
        println(s"${this} Unlocked ${nowHourStr}")
      }
    }
  }
}

测试

object ZkLockDemo extends App {
  val lockName = "/zk-lock"
  val locker1 = ZkLock(lockName, "locker1")
  val locker2 = ZkLock(lockName, "locker2")

  async(() => locker1.lock)
  async(() => locker2.lock, waitTime = 1000L)

  async(() => locker1.unlock, waitTime = 2000L)
  async(() => locker2.unlock, waitTime = 3000L)

  Thread.sleep(Int.MaxValue)
}

注:async 方法用于异步地执行传入的函数来演示同步加锁、解锁过程,async 实现代码如下:

def async(action: () => Unit,
          waitTime: Long = 0L,
          delayTime: Long = 0L): Unit = {
  new Thread(new Runnable {
    override def run(): Unit = {
      Thread.sleep(waitTime)
      action()
      Thread.sleep(delayTime)
    }
  }).start()
}

执行结果:

ZkLock(/zk-lock,locker1) Locked 18:22:37.002
ZkLock(/zk-lock,locker1) Unlocked 18:22:38.897
ZkLock(/zk-lock,locker2) Locked 18:22:38.900
ZkLock(/zk-lock,locker2) Unlocked 18:22:39.901

梳理一下整个 Demo 的过程:

  1. locker1 先成功加锁
  2. locker2 在 1 秒后尝试加锁,但是锁已被占用,所以进入阻塞等待阶段
  3. locker1 在 2 秒后解锁
  4. locker2 收到锁释放通知,再次尝试加锁成功
  5. locker2 在 3 秒后解锁

缺陷

以上,一个简单的分布式排他锁就宣告完成,实现代码十分简单。

但是,它也有明显的缺陷:

  1. 排它锁的粒度大,没有区分读、写操作,如果读多写少,则十分影响性能
  2. 羊群效应:锁释放后会通知所有等待中的 ZooKeeper 客户端,然后同时发起加锁请求,瞬时压力很大

那有没有什么好的办法来解决这两个问题呢?欲知答案,请看下篇:ZooKeeper 分布式锁实践(下篇)读写锁

0

浅析 InnoDB 存储引擎的工作流程

InnoDB

InnoDB 是由 Innobase Oy 公司开发,该存储引擎是第一个完整支持 ACID 事务的 MySQL 存储引擎。具有插入缓存两次写自适应哈希索引等关键特性,是一个高性能、高可用的存储引擎。

整体架构

InnoDB 有多个内存块,这些内存块组合在一起组成了一个大的内存池。而 InnoDB 的内存池中会有多个后台线程,这些后台线程负责刷新内存池中的数据,和将脏页(已修改的数据页)刷新到磁盘文件。

后台线程

默认情况下,InnoDB 存储引擎有 13 个后台线程:

  • 一个 master 线程
  • 一个锁监控线程
  • 一个错误监控线程
  • 十个 IO 线程
    • 插入缓存线程
    • 日志线程
    • 读线程(默认 4 个)
    • 写线程(默认 4 个)

下面是我本机上的十个 IO 线程

--------
FILE I/O
--------
I/O thread 0 state: waiting for i/o request (insert buffer thread)
I/O thread 1 state: waiting for i/o request (log thread)
I/O thread 2 state: waiting for i/o request (read thread)
I/O thread 3 state: waiting for i/o request (read thread)
I/O thread 4 state: waiting for i/o request (read thread)
I/O thread 5 state: waiting for i/o request (read thread)
I/O thread 6 state: waiting for i/o request (write thread)
I/O thread 7 state: waiting for i/o request (write thread)
I/O thread 8 state: waiting for i/o request (write thread)
I/O thread 9 state: waiting for i/o request (write thread)
Pending normal aio reads: 0 [0, 0, 0, 0] , aio writes: 0 [0, 0, 0, 0] ,
 ibuf aio reads: 0, log i/o's: 0, sync i/o's: 0
Pending flushes (fsync) log: 0; buffer pool: 0
540 OS file reads, 89 OS file writes, 7 OS fsyncs
0.00 reads/s, 0 avg bytes/read, 0.00 writes/s, 0.00 fsyncs/s

内存池

InnoDB 存储引擎的内存池包含:缓冲池、日志缓存池、额外内存池。这些内存的大小分别由配置文件中的参数决定。其中占比最大的是缓冲池,里面包含了数据缓存页、索引、插入缓存、自适应哈希索引、锁信息和数据字典。InnoDB 会在读取数据库数据的时候,将数据缓存到缓冲池中,而在修改数据的时候,会先把缓冲池中的数据修改掉,一旦修改过的数据页就会被标记为脏页,而脏页则会被 master 线程按照一定的频率刷新到磁盘中。日志缓存则是缓存了redo-log 信息,然后再刷新到 redo-log 文件中。额外内存池则是在对一些数据结构本身分配内存时会从额外内存池中申请内存,当该区域内存不足则会到缓冲池中申请。

内存结构

Master Thread

InnoDB 存储引擎的主要工作都在一个单独的 Master Thread 中完成,其内部由四个循环体构成:主循环( loop )、后台循环( background loop )、刷新循环( flush loop )、暂停循环( suspend loop )。具体工作流程如下图所示:

mysql_thread

主循环

主要负责将缓冲池中的日志文件刷新到磁盘中、合并插入缓存、刷新缓冲池中的脏页数据到磁盘中、删除无用的 Undo 页、产生一个 checkpoint 。在主循环中会多次将脏页刷新到磁盘中,但是有一些刷新任务总会执行,有一些则根据参数来判断当前是否需要刷新。而这个参数 innodb_max_dirty_pages_pct 最大脏页比例是通过配置文件决定的,你可以根据实际情况来调整你自己的最大脏页比例,来达到最好的性能。

伪代码如下:

for (int i = 0; i<10; i++) {
    thread_sleep(1)
    do log buffer flush to disk
    if ( last_one_second_ios < 5) {
        do merge at most 5 insert buffer
    }
    if (buf_get_modified_ratio_pct > innodb_max_ditry_pages_pct) {
        do buffer pool flush 100 dirty page
    }
    if (no user activity) {
        goto background loop
    }
}
if (last_ten_second_ios < 200) {
    do buffer pool flush 100 dirty page
}
do merge at most 5 insert buffer
do log buffer flush to disk
do full pourge
if (buf_get_modifued_ratio_pct > 70%) {
    do buffer pool flush 100 dirty page
} else {
    buffer pool flush 10 dirty page
}
do fuzzy checkpoint
goto loop

后台循环

在后台循环中 InnoDB 会做这些事:删除无用的Undo页、合并插入缓存。如果当前 InnoDB 处于空闲状态,则跳转到刷新循环,否则跳转到主循环继续处理数据。

伪代码如下:

do full purge
do merge 20 insert buffer
if (not idle) {
    goto loop
} else {
    goto flush loop
}

刷新循环

一旦执行到刷新循环,InnoDB 会一直处理脏页数据,直到脏页数据达到最大脏页比例以下。这时候会跳转到暂停循环中(所有数据都处理完毕)。

伪代码如下:

flush loop:
do buffer pool flush 100 dirty page
if (buf_get_modified_ratio_pct > innodb_max_dirty_pages_pct) {
    goto flush loop
} else {
    goto suspend loop
}

暂停循环

在本循环中,InnoDB会将 Master Thread 挂起,减少内存资源使用,一直处于 waiting 状态,等待事件来唤醒。一旦有新的事件过来,就跳转到主循环中。

伪代码如下:

suspend loop:
suspend_thread()
waiting event
goto loop;=

由此可以看出,master 线程的最大的工作内容就是刷新脏页数据到磁盘了。这一步就是把缓存池中被修改的数据页同步到磁盘中。而脏页数据的刷新基本上都是由 innodb_max_dirty_pages_pct 来控制的,所以当你的服务器处理能力比较强,给 InnoDB 分配的内存池比较大,这时候可能你的脏页数据会很难达到最大脏页比,这时候你的数据基本上都在缓冲池中,可能需要很长一段时间才会到数据库磁盘文件中,也就是脏页的刷新速度会很低(MySQL 5.1之前的版本默认是 90%,后面调整到 75%)。所以实际应用中可以根据自己内存和数据库的读写量来设置这个最大脏页比。对于一次刷新脏页数量的设置,在 InnoDB Plugin 中有一个参数 innodb_adaptive_flushing 自适应刷新,InnoDB 会根据产生的重做日志速度来计算出当前最适合的刷新脏页数量。当然 InnoDB Plugin 中还有其它很多参数配置,合理利用这些配置可以极大的提升 InnoDB 存储引擎的性能。

关键特性

前面说到 InnoDB 的三大特性分别为:插入缓存、两次写、自适应哈希索引。下面就简单介绍下这三大特性。

插入缓存

当我插入一条数据,该数据只有一个 ID 索引(聚集索引数据行的物理顺序与列值的逻辑顺序相同)的时候,并且 ID 是自增长的,这时候页中的行记录按照 ID 顺序存放,所以只需要在最新页插入数据即可。但是如果我的表有多个非聚集索引:该索引中索引的逻辑顺序与磁盘上行的物理存储顺序不同,在插入的时候非聚集索引的插入不再是顺序的,这时候要离散的访问非聚集索引页,导致插入性能变低。而插入缓存则在插入的时候判断缓冲池中是否存在当前非聚集索引,如果存在则直接插入,否则先插入到一个缓存区,然后再通过 Master Thread 来合并插入缓存。这样极大的提高了数据的写性能。

两次写

两次写是为了解决在将缓冲池中的脏页刷新到磁盘的过程中,操作系统出现故障,导致当前的脏页部分写失效的问题。通过两次写在下次恢复的时候,InnoDB 会根据两次写的结果来恢复数据。

原理:在刷新脏页的时候,不是直接把脏页数据刷新到磁盘,而是将脏页先写到一个大小为2M的内存缓存中,再将这个内存缓存数据同步到磁盘的共享表空间中。当全部都写到共享表空间后,再将数据刷新到磁盘中。这样如果发生了上面描述的情况,这时候数据会在共享表空间中有个备份,恢复的时候就可以使用共享表空间的数据。

如果有数据库集群的情况下,master数据库是一定要开启两次写的,为了保证数据可靠性。而从数据库可以通过参数 skip_innodb_doublewrite 来禁止两次写功能,来提高插入效率。

自适应哈希索引

InnoDB 会监控对表示的索引查找,如果发现可以通过对索引进行哈希来优化搜索。这时候会对当前的索引建立哈希索引。称之为自适应哈希索引( AHI )。可以通过参数 innodb_adaptive_hash_index 来禁用或启用此特性。

小结

总体来说 InnoDB 的高性能体现在:插入数据的时候先保存在内存中,直接跟内存交互性能比较好,而且还有插入缓存优化,保证了高并发写操作。高可用则表现在两次写特性,保证了机器宕机或者出故障的时候数据不会丢失。这里只是简单介绍了一下 InnoDB 的工作流程和一些特性,当然 InnoDB 还有很多很多强大的功能,比如说事务、锁、索引、算法等等有兴趣的同学可以参考《 MySQL 技术内幕 InnoDB 存储引擎》这本书深入了解。

0

使用 RabbitMQ 实现 RPC

背景知识

RabbitMQ

RabbitMQ 是基于 AMQP 协议实现的一个消息队列(Message Queue),Message Queue 是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和消费者之间是解耦的,互相不知道对方的存在。

RPC

Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。

如何使用RabbitMQ实现RPC?

使用 RabbitMQ 实现 RPC,相应的角色是由生产者来作为客户端,消费者作为服务端。

但 RPC 调用一般是同步的,客户端和服务器也是紧密耦合的。即客户端通过 IP/域名和端口链接到服务器,向服务器发送请求后等待服务器返回响应信息。

但 MQ 的生产者和消费者是完全解耦的,那么如何用 MQ 实现 RPC 呢?很明显就是把 MQ 当作中间件实现一次双向的消息传递:

客户端和服务端即是生产者也是消费者。客户端发布请求,消费响应;服务端消费请求,发布响应。

具体实现

MQ部分的定义

请求信息的队列

我们需要一个队列来存放请求信息,客户端向这个队列发布请求信息,服务端消费该队列处理请求。该队列不需要复杂的路由规则,直接使用 RabbitMQ 默认的 direct exchange 来路由消息即可。

响应信息的队列

存放响应信息的队列不应只有一个。如果存在多个客户端,不能保证响应信息被发布请求的那个客户端消费到。所以应为每一个客户端创建一个响应队列,这个队列应该由客户端来创建且只能由这个客户端使用并在使用完毕后删除,这里可以使用 RabbitMQ 提供的排他队列(Exclusive Queue):

channel.queueDeclare(queue:"", durable:false, exclusive:true, autoDelete:false, new HashMap<>())

并且要保证队列名唯一,声明队列时名称设为空 RabbitMQ 会生成一个唯一的队列名。
exclusive设为true表示声明一个排他队列,排他队列的特点是只能被当前的连接使用,并且在连接关闭后被删除。

一个简单的 demo(使用 pull 机制)

我们使用一个简单的 demo 来了解客户端和服务端的处理流程。

发布请求
  • 编写代码前的一个小问题

我们在声明队列时为每一个客户端声明了独有的响应队列,那服务器在发布响应时如何知道发布到哪个队列呢?其实就是客户端需要告诉服务端将响应发布到哪个队列,RabbitMQ 提供了这个支持,消息体的Properties中有一个属性reply_to就是用来标记回调队列的名称,服务器需要将响应发布到reply_to指定的回调队列中。

解决了这个问题之后我们就可以编写客户端发布请求的代码了:

// 定义响应回调队列
String replyQueueName = channel.queueDeclare("", false, true, false, new HashMap<>()).getQueue();

// 设置回调队列到 Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .replyTo(replyQueueName)
        .build();

String request = "request";

// 发布请求
channel.basicPublish("", "rpc_queue", properties, request.getBytes());

Direct reply-to:
RabbitMQ 提供了一种更便捷的机制来实现 RPC,不需要客户端每次都定义回调队列,客户端发布请求时将replyTo设为amq.rabbitmq.reply-to,消费响应时也指定消费amq.rabbitmq.reply-to,RabbitMQ 会为客户端创建一个内部队列

消费请求

接下来是服务端处理请求的部分,接收到请求后经过处理将响应信息发布到reply_to指定的回调队列:

// 服务端 Consumer 的定义
public class RpcServer extends DefaultConsumer {

    public RpcServer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        String response = (msg + " Received");

        // 获取回调队列名
        String replyTo = properties.getReplyTo();

        // 发布响应消息到回调队列
        this.getChannel().basicPublish("", replyTo, new AMQP.BasicProperties(), response.getBytes());
    }
}

...

// 启动服务端 Consumer
channel.basicConsume("rpc_queue", true, new RpcServer(channel));
接收响应

客户端如何接收服务器的响应呢?有两种方式:1.轮询的去 pull 回调队列中的消息,2.异步的消费回调队列中的消息。我们在这里简单实现第一种方案。

GetResponse getResponse = null;

while (getResponse == null) {
    getResponse = channel.basicGet(replyQueueName, true);
}

String response = new String(getResponse.getBody());

一个简单的基于 RabbitMQ 的 RPC 模型已经实现了,但这个 demo 并不实用,因为客户端每次发送完请求都要同步的轮询等待响应消息,只能每次处理一个请求。RabbitMQ 的 pull 模式效率也比较低。

实现一个完备可用的 RPC 模式需要做的工作还有很多,要处理的关键点也比较复杂,有句话叫不要重复造轮子,spring 已经实现了一个完备可用的 RPC 模式的库,接下来我们来了解一下。

Spring Rabbit 中的实现

和上面 demo 的 pull 模式一次只能处理一个请求相对应的:如何异步的接收响应并处理多个请求呢?关键点就在于我们需要记录请求和响应并将它们关联起来,RabbitMQ 也提供了支持,Properties 中的另一个属性correlation_id用来标识一个消息的唯一 id。

参考spring-rabbit中的convertSendAndReceive方法的实现,为每一次请求生成一个唯一的correlation_id

private final AtomicInteger messageTagProvider = new AtomicInteger();
...
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
...
message.getMessageProperties().setCorrelationId(messageTag);

并使用一个ConcurrentHashMap来维护correlation_id和响应信息的映射:

private final Map<String, PendingReply> replyHolder = new ConcurrentHashMap<String, PendingReply>();
...
final PendingReply pendingReply = new PendingReply();

this.replyHolder.put(correlationId, pendingReply);

PendingReply中有一个BlockingQueue存放响应信息,在发送完请求信息后调用BlockingQueuepull方法并设置超时时间来获取响应:

private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);

public Message get(long timeout, TimeUnit unit) throws InterruptedException {
    Object reply = this.queue.poll(timeout, unit);
    return reply == null ? null : processReply(reply);
}

在获取响应后不论结果如何,都会将PendingReplyreplyHolder中移除,防止replyHolder中积压超时的响应消息:

try {
    reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,messageTag);
} finally {
    this.replyHolder.remove(messageTag);
    ...
}

响应信息是何时如何被放到这个BlockingQueue中的呢?看一下RabbitTemplate接收消息的地方:

public void onMessage(Message message) {
    String messageTag;
    if (this.correlationKey == null) { // using standard correlationId property
        messageTag = message.getMessageProperties().getCorrelationId();
    } else {
        messageTag = (String) message.getMessageProperties()
                .getHeaders().get(this.correlationKey);
    }

    // 存在 correlation_id 才认为是RPC的响应信息,不存在时不处理
    if (messageTag == null) {
        logger.error("No correlation header in reply");
        return;
    }

    // 从 replyHolder 中取出 correlation_id 对应的 PendingReply
    PendingReply pendingReply = this.replyHolder.get(messageTag);
    if (pendingReply == null) {
        if (logger.isWarnEnabled()) {
            logger.warn("Reply received after timeout for " + messageTag);
        }
        throw new AmqpRejectAndDontRequeueException("Reply received after timeout");
    }
    else {
        restoreProperties(message, pendingReply);
        // 将响应信息 add 到 BlockingQueue 中
        pendingReply.reply(message);
    }
}

以上的 spring 代码隐去了很多额外部分的处理和细节,只关注关键的部分。

至此一个完整可用的由 RabbitMQ 作为中间件实现的 RPC 模式就完成了。

总结

服务端

服务端的实现比较简单,和一般的Consumer的区别只在于需要将请求回复到replyTo指定的 queue 中并带上消息标识correlation_id即可

服务端的一点小优化:
超时的处理是由客户端来实现的,那服务端有没有可以优化的地方呢?
答案是有的:如果我们的服务端处理比较耗时,如何判断客户端是否还在等待响应呢?
我们可以使用passive参数去检查replyTo的 queue 是否存在,因为客户端声明的是内部队列,客户端如果断掉链接了这个 queue 就不存在了,这时服务端就无需处理这个消息了。

客户端

客户端承担了更多的工作量,包括:

  • 声明replyTo队列(使用amq.rabbitmq.reply-to会简单很多)
  • 维护请求和响应消息(使用唯一的correlation_id来关联)
  • 消费服务端的返回
  • 处理超时等异常情况(使用BlockingQueue来阻塞获取)

好在 spring 已经实现了一套完备可靠的代码,我们在清楚了流程和关键点之后,可以直接使用 spring 提供的RabbitTemplate,无需自己实现。

使用 MQ 实现 RPC 的意义

通过 MQ 实现 RPC 看起来比客户端和服务器直接通讯要复杂一些,那我们为什么要这样做呢?或者说这样做有什么好处:

  1. 将客户端和服务器解耦:客户端只是发布一个请求到 MQ 并消费这个请求的响应。并不关心具体由谁来处理这个请求,MQ 另一端的请求的消费者可以随意替换成任何可以处理请求的服务器,并不影响到客户端。
  2. 减轻服务器的压力:传统的 RPC 模式中如果客户端和请求过多,服务器的压力会过大。由 MQ 作为中间件的话,过多的请求而是被 MQ 消化掉,服务器可以控制消费请求的频次,并不会影响到服务器。
  3. 服务器的横向扩展更加容易:如果服务器的处理能力不能满足请求的频次,只需要增加服务器来消费 MQ 的消息即可,MQ会帮我们实现消息消费的负载均衡。
  4. 可以看出 RabbitMQ 对于 RPC 模式的支持也是比较友好地,amq.rabbitmq.reply-to, reply_to, correlation_id这些特性都说明了这一点,再加上 spring-rabbit 的实现,可以让我们很简单的使用消息队列模式的 RPC 调用。
0

原来你是这样的 Stream —— 浅析 Java Stream 实现原理

Stream 为什么会出现?

Stream 出现之前,遍历一个集合最传统的做法大概是用 Iterator,或者 for 循环。这种两种方式都属于外部迭代,然而外部迭代存在着一些问题。

  • 开发者需要自己手写迭代的逻辑,虽然大部分场景迭代逻辑都是每个元素遍历一次。
  • 如果存在像排序这样的有状态的中间操作,不得不进行多次迭代。
  • 多次迭代会增加临时变量,从而导致内存的浪费。

虽然 Java 5 引入的 foreach 解决了部分问题,但也引入了新的问题。

  • foreach 遍历不能对元素进行赋值操作
  • 遍历的时候,只有当前被遍历的元素可见,其他不可见

随着大数据的兴起,传统的遍历方式已经无法满足开发者的需求。
就像小作坊发展到一定程度要变成大工厂才能满足市场需求一样。大工厂和小作坊除了规模变大、工人不多之外,最大的区别就是多了流水线。流水线可以将工人们更高效的组织起来,使得生产力有质的飞跃。

所以不安于现状的开发者们想要开发一种更便捷,更实用的特性。

  • 它可以像流水线一样来处理数据
  • 它应该兼容常用的集合
  • 它的编码应该更简洁
  • 它应该具有更高的可读性
  • 它可以提供对数据集合的常规操作
  • 它可以拼装不同的操作

经过不懈的能力,Stream 就诞生了。加上 lambda 表达式的加成,简直是如虎添翼。

你可以用 Stream 干什么?

下面以简单的需求为例,看一下 Stream 的优势:

从一列单词中选出以字母a开头的单词,按字母排序后返回前3个。

外部迭代实现方式

List<String> list = Lists.newArrayList("are", "where", "advance", "anvato", "java", "abc");
List<String> tempList = Lists.newArrayList();
List<String> result = Lists.newArrayList();
for( int i = 0; i < list.size(); i++) {
    if (list.get(i).startsWith("a")) {
        tempList.add(list.get(i));
    }
}
tempList.sort(Comparator.naturalOrder());
result = tempList.subList(0,3);
result.forEach(System.out::println);

stream实现方式

List<String> list = Lists.newArrayList("are", "where", "anvato", "java", "abc");
list.stream().filter(s -> s.startsWith("a")).sorted().limit(3)
                .collect(Collectors.toList()).forEach(System.out::println);

Stream 是怎么实现的?

需要解决的问题:

  • 如何定义流水线?
  • 原料如何流入?
  • 如何让流水线上的工人将处理过的原料交给下一个工人?
  • 流水线何时开始运行?
  • 流水线何时结束运行?

总观全局

Stream 处理数据的过程可以类别成工厂的流水线。数据可以看做流水线上的原料,对数据的操作可以看做流水线上的工人对原料的操作。

事实上 Stream 只是一个接口,并没有操作的缺省实现。最主要的实现是 ReferencePipeline,而 ReferencePipeline 继承自 AbstractPipelineAbstractPipeline 实现了 BaseStream 接口并实现了它的方法。但 ReferencePipeline 仍然是一个抽象类,因为它并没有实现所有的抽象方法,比如 AbstractPipeline 中的 opWrapSinkReferencePipeline内部定义了三个静态内部类,分别是:Head, StatelessOp, StatefulOp,但只有 Head 不再是抽象类。

图1

流水线的结构有点像双向链表,节点之间通过引用连接。节点可以分为三类,控制数据输入的节点、操作数据的中间节点和控制数据输出的节点。

ReferencePipeline 包含了控制数据流入的 Head ,中间操作 StatelessOp, StatefulOp,终止操作 TerminalOp

Stream 常用的流操作包括:
* 中间操作(Intermediate Operations)
* 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如 filter()flatMap()flatMapToDouble()flatMapToInt()flatMapToLong()map()mapToDouble()mapToInt()mapToLong()peek()unordered()
* 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如 distinct()sorted()sorted(comparator)limit()skip()
* 终止操作(Terminal Operations)
* 非短路操作:处理完所有数据才能得到结果。如 collect()count()forEach()forEachOrdered()max()min()reduce()toArray() 等。
* 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如 anyMatch()allMatch()noneMatch()findFirst()findAny() 等。

源码分析

了解了流水线的结构和定义,接下来我们基于上面的例子逐步看一下源代码。

定义输入源

stream() 是 Collection 中的 default 方法,实际上调用的是 StreamSupport.stream() 方法,返回的是 ReferencePipeline.Head的实例。

ReferencePipeline.Head 的构造函数传递是 ArrayList 中实现的 spliterator 。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

定义流水线节点

filter() 是 Stream 中定义的方法,在 ReferencePipeline 中实现,返回 StatelessOp 的实例。

可以看到 filter() 接收的参数是谓词,可以用 lambda 表达式。StatelessOp的构造函数接收了 this,也就是 ReferencePipeline.Head 实例的引用。并且实现了 AbstractPipeline 中定义的 opWrapSink 方法。

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

sorted()limit() 的返回值和也都是 Stream 的实现类,并且都接收了 this 。不同的是 sorted() 返回的是 ReferencePipeline.StatefulOp 的子类 SortedOps.OfRef 的实例。limit() 返回的 ReferencePipeline.StatefulOp 的实例。

现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的 sorted()limit() 都只是返回了一个包含上一节点引用的中间节点。有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作

这也就是 Stream 延时执行的特性原因之所在。

参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。

previousStage.nextStage = this;

所以各个节点组成了一个双向链表的结构。

组装流水线

最后来看一下终止操作 .collect() 接收的是返回类型对应的 Collector。

此例中的 Collectors.toList() 是 Collectors 针对 ArrayList 的创建的 CollectorImpl 的实例。

@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));//1
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
        ? (R) container
        : collector.finisher().apply(container);
}

先忽略并行的情况,来看一下加注释了1的代码:

  1. ReduceOps.makeRef 接收此 Collector 返回了一个 ReduceOp(实现了 TerminalOp 接口)的实例。
  2. 返回的 ReduceOp 实例又被传递给 AbstractPipeline 中的 evaluate() 方法。
  3. evaluate 中,调用了 ReduceOp实例的 evaluateSequential 方法,并将上流水线上最后一个节点的引用和 sourceSpliterator 传递进去。
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
  1. 然后调用 ReduceOp 实例的 makeSink() 方法返回其 makeRef() 方法内部类 ReducingSink 的实例。
  2. 接着 ReducingSink 的实例作为参数和 spliterator 一起传入最后一个节点的 wrapAndCopyInto() 方法,返回值是 Sink 。

启动流水线

流水线组装好了,现在就该启动流水线了。这里的核心方法是 wrapAndCopyInto,根据方法名也能看出来这里应该做了两件事,wrapSink()copyInto()

wrapSink()

将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。参见附录I可以发现

每个节点都记录了上一节点的引用( previousStage )和每一个节点的深度( depth )。

所以这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

(Head.combinedFlags, (StatelessOp.combinedFlags, (StatefulOp.combinedFlags,(StatefulOp.combinedFlags ,TerminalOp.sink)))

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
   Objects.requireNonNull(sink);

   for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
       sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
   }
   return (Sink<P_IN>) sink;
}

copyInto()

终于到了要真正开始迭代的时候,这个方法接收两个参数 Sink<P_IN> wrappedSink, Spliterator<P_IN> spliteratorwrappedSink对应的是 Head节点后面的第一个操作节点(它相当于这串 Sink 的头),spliterator 对应着数据源。

这个时候我们回过头看一下 Sink 这个接口,它继承自 Consumer 接口,又定义了 begin()end()cancellationRequested() 方法。Sink 直译过来是水槽,如果把数据流比作水,那水槽就是水会流过的地方。begin() 用于通知水槽的水要过来了,里面会做一些准备工作,同样 end() 是做一些收尾工作。cancellationRequested() 是原来判断是不是可以停下来了。Consumer 里的accept() 是消费数据的地方。

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
   Objects.requireNonNull(wrappedSink);
   if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
       wrappedSink.begin(spliterator.getExactSizeIfKnown());//1
       spliterator.forEachRemaining(wrappedSink);//2
       wrappedSink.end();//3
   }
   else {
       copyIntoWithCancel(wrappedSink, spliterator);
   }
}

有了完整的水槽链,就可以让水流进去了。copyInto() 里做了三个动作:

  1. 通知第一个水槽(Sink)水要来了,准备一下。
  2. 让水流进水槽(Sink)里。
  3. 通知第一个水槽(Sink)水流完了,该收尾了。

突然想到宋丹丹老师的要把大象放冰箱要几步?

注:图中蓝色线表示数据实际的处理流程。

每一个 Sink 都有自己的职责,但具体表现各有不同。无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的 下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

例如 sorted() 就是一个有状态的操作,一般会有一个属于自己的容器,用来记录处自己理过的数据的状态。sorted() 是在执行 begin 的时候初始化这个容器,在执行 accept 的时候把数据放到容器中,最后在执行 end 方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

最后数据流到终止节点,终止节点将数据收集起来就结束了。

然后就没有然后了,copyInto() 返回类型是 void ,没有返回值。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

回顾

再来回顾一下整个过程。首先是将 Collection 转化为 Stream,也就是流水线的头。然后将各个中间操作节点像拼积木一样拼接起来。每个中间操作节点都定义了自己对应的 Sink,并重写了 makeSink() 方法用来返回自己的 Sink 实例。直到终止操作节点出现时才开始将 Sink 实例化并串起来。然后就是上面提到的那三步:通知、数据流入、结束。

本文介绍和分析了最常规的 stream 用法和实现,实际上 stream 还有很多高阶用法,比如利用协程实现的并行流。感兴趣的同学可以研究一下。当然既然是高阶用法,用的时候一定要多注意。

参考

附录I

以下是初始化 Head 节点和中间操作的实现。

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
//初始化Head节点的时候会执行
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
//初始化中间操作StatelessOp和StatefulOp的时候会执行
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
0

《系统架构》读书笔记:架构到底是什么?

引子

这个月读了一本书,《系统架构》。然而这本书讲的不仅仅是软件系统的架构,而是更高一个层面,它讲的是复杂系统的架构。这本书的三位作者,有两位是航空航天专业的教授和副教授,所以书里用了不少 NASA 的项目为案例,比如人类有史以来最复杂的工程——阿波罗计划。读完这本书,让我对架构的认识提升了一个高度,原先各种离散的关于架构的知识和理解,在这个框架下,终于可以融会贯通了。

系统和系统思维

首先,系统是什么?按本书的定义,系统是由一组实体和这些实体之间的关系所构成的集合,而其功能要大于这些实体各自的功能之和。后半句很重要,如果一个系统的功能,等于其部件的功能之和,那么这个系统就没有存在的意义。因为我们单独使用那些部件,也可以得到需要的功能。只有当这些部件组合时,能够涌现出新的功能,那才算是组成了一个系统。

要理解系统架构,首先要有系统思维。所谓系统思维,就是把某个疑问、某种状况或某个难题明确地视为一个系统,也即是视为一组相互关联的实体,而不是孤立的一个对象。

系统思维要有四个步骤是:
1. 确定系统整体的形式与功能。
2. 确定系统中包含的组件,组件的形式与功能。
3. 确定系统中各个组件之间的关系,并且定这些关系的形式及其功能。
4. 根据组件的功能及功能性的互动来确定系统的涌现属性。

系统思维的初级目标是理解系统是什么,更进一层的目标是为了预测系统在发生某些变化之后的情况。而最高级的目标,则是用部件来合成一个系统,这个过程也就是所谓的系统架构。

系统架构的分析

形式与功能

形式是系统的物理体现或信息体现,它有助于功能的执行。形式可以分为形式对象,以及这些形式对象之间具有的形式关系(也就是结构)。例如,汽车作为一个系统,它的形式对象就是汽车部件,而软件系统的形式对象则是模块、过程、代码和指令,而这些形式对象则通过不同的结构组装成一个系统。

系统的另一个属性就是功能。功能由过程和操作对象组成,过程在操作对象上执行之后,会改变操作对象的状态。当系统对外展现的功能对系统的外部的对象进行操作时,系统的价值就体现出来了。例如,离心泵中的电动机可以带动叶轮旋转,这是它内部的一个功能。而离心泵作为一个系统,它可以给外部对象(例如某种液体)加压,从而移动液体。这也就是它的功能和价值所在。

屏幕快照_2018-10-11_上午11.28.19

形式和功能的区别就是,形式决定了系统是什么,而功能决定了系统能做什么。架构其实就是形式与功能之间的映射。形式结构对功能起着承载作用,或者能够促进相关的性能。在复杂系统里,形式与功能的映射会非常复杂,包含很多不确定的问题,或者非理想的因素。因此架构师需要使用抽象等方法来简化架构,以便能够更好的理解和交流架构。

另外,还有一种特殊的功能,叫做解决方案无关的功能。例如,我要从上海出差去北京,那么“将旅客从上海快速、安全的运送到北京”就是解决方案无关的功能,而“使用高铁/飞机将旅客从上海运送到北京”则不是。好的系统规范书,应该是使用与特定解决方案无关的功能来描述的。如果系统规范书将人引导向某种具体的解决方案,可能会令架构师的视野变窄,从而不去探索更多的潜在选项。

从概念到架构

前面说过,架构就是功能与形式之间的映射,但对于复杂系统,这种映射往往非常复杂。架构师常常需要创造一些概念,来简单明了的解释功能是如何映射到形式的。例如前面离心泵的例子,它的解决方案无关的功能则是“移动液体”,而离心泵本身其实就是一个概念,一提到离心泵,熟悉的人一定会想到电动机、叶轮等等。其他的概念包括油电混动、高速铁路、发光二极管、快速排序、分布式缓存等等。软件开发中的各种设计模式,其实也是概念。

对于一个解决方案无关的功能,往往能提出多个不同的概念。架构师需要创造性地提出这些概念,对它们进行整理,并选定其中一个概念,将其转化为一套架构。

复杂系统的架构往往是分层的,同时我们还需要对架构进行模块化。需要注意的是,如果要对某一层的架构进行模块化,我们必须将其分解到下一层。因为只有检查各个实体在更下一层的关系,才能更好的对当前层级进行模块化。

创建系统架构

架构师

很多人常常会问,架构师的职责到底是做什么?这本书给出了很明确的回答,架构师的职责主要是以下三个方面:
- 减少歧义,确定系统的边界、目标和功能
- 发挥创造力,创建概念
- 管理复杂度,为系统选定一种分解方案

而架构师的交付成果,应该包括:
- 一套清晰、完整、连贯的目标,并且是可行的(80% 以上概率)。
- 系统所在的大环境(法律法规、行业规范等等)以及整个产品环境的描述。
- 系统的概念以及操作方式。
- 系统的功能描述(至少两层分解),除了系统对外界展现的功能,也包括系统的内部功能。
- 系统的形式(至少两层分解)和形式结构,以及功能和形式之间的映射。
- 所有的外部接口以及接口控制过程的详细描述。
- 开发成本、工期、风险、实现计划等。

消除歧义,确定目标

为了消除歧义,架构师必须首先理解上游和下游的相关因素对系统架构的影响。上有因素包括:公司策略、营销、法律法规、行业标准、技术成熟度等,下游因素包括实现(编码、制造、供应链管理)、操作、产品与系统的演化。

复杂的系统一般会涉及多个的利益相关者,他们会有不同的诉求和目标,架构师需排定各项目标之间的优先次序。首先,可以把价值视为一种交换,在交换过程中,我方的成果用来满足对方的需求,而对方的成果也同样用来满足我方的需求。其次,可以根据利益相关者对本产品的重要程度,来排列其优先次序。最后,则可以把系统的目标,展示在系统问题描述中(System Problem Statement, SPS)。

发挥创造力,创建概念

接下来,架构师就需要发挥创造性、创建概念了。创造概念,主要有两种方式,一种是无结构的方式,一种是结构化的方式;无结构的创新包括头脑风暴法、自由联想法等方法。对于一些包含多个功能的丰富概念,我们可以对其进行扩展和分解,提出对应的概念片段,而这些概念片段组合后,又会形成新的整体概念。最后通过定量和定性分析,筛选出 2~3 个作为候选概念。

管理复杂度,为系统选定一种分解方案

架构师另外一项工作,就是分解系统,管理复杂度。系统的表面复杂度就是系统的难懂程度,表面复杂度高的系统理解起来会比较困难。架构师可以通过抽象、层级化、分解及递归等手段来减少表面复杂度,但这样做可能会提升实际复杂度。其中架构师最重要的一项决策,就是对系统进行的分解。要判断一种分解方式好不好,必须先向下分解两层,并根据第二层的分解情况,来检查第一层的分解方式是否合适。另外,架构师也要选择合适的分解平面,例如可以按功能分解、按形式分解、按模块变化程度分解、按供应商分解等等。

架构决策

其实架构方面的决策也可以用一些程序化的方法进行计算,以帮助架构师进行决策,这部分比较枯燥就不细说了。不过书中提到架构决策的模式,还有点意思,一共有六个模式:
1. Decision-Option(决策-选项):一组决策,每个决策都有一套离散选项。例如,开始一个系统需要做一下两个决策,决定数据库和 API 接口的使用何种技术:数据库是用 MySQL、MongoDB 还是 Cassandra,API 接口是用 HTTP 还是 Protobuf。这就是一个 Decision-Option 决策。
2. Down-Selecting(筛选):一组二选一的决策,代表选择实体中的某个子集。例如,我要从全国各地的 候选 IDC 机房中,选择合适的机房来部署 CDN,就是一个 Down-Selecting 决策。
3. Assiging(指派):把一个实体集中的元素,指派给另一个实体集中的某个或某些元素。
4. Partitioning(分区):把一个实体集的元素划分为多个互斥的子集,并且覆盖所有元素。这是模块分解的典型决策模式。
5. Permuting(排列):在一个实体集和一个位置集之间建立一一对应关系。
6. Connecting(连接):给定一个用图中节点表示的实体集,用一组连线展示这些节点之间的关系。网络拓扑结构的决策就是一个 Connecting 问题,即选择星形拓扑、环形拓扑或者总线拓扑等等。

书里用了一个很牛逼的例子——阿波罗计划,来说明架构决策的方案。

屏幕快照_2018-10-11_上午11.30.41

总结

看完这本书,你会发现,其实所有的架构,软件也好、汽车飞机等各种机器的架构也好,都是相通的。甚至我各种生物组织、团队组织的架构,也是一样的道理。只要我们能掌握系统架构的思维,再加上各个领域的专业知识,我们就能做出一套优秀的架构。

系统架构原则

书中最后总结了系统架构的二十六个原则,我摘录在这里供大家参考:

涌现原则:当各实体拼合成一个系统时,实体之间的交互会把功能、行为、性能和其他内在属性现出来。

整体原则:每个系统都作为某一个或某些个大系统的一小部分而运作,同时,每个系统中也都包含着更小的一些系统。

聚焦原则:在任何一个点上都能发现很多影响系统的问题,而其数量已经超出了人们的理解能力。因此,我们必须找出其中最关键、最重要的那些问题,并集中精力思考它们。

二元原则:所有由人类构建而成的系统,其本身都同时存在于物理领城和信息领域中。

受益原则:好的架构必须使人受益,要想把架构做好,就要专注于功能的涌现,使得系统能够把它的主要功能通过跨越系统边界的接口对外展示出来。

价值与架构原则:价值是有着一定成本的利益。架构是由形式所承载的功能。由于利益要通过功能而体现,同时形式又与成本相关,因此,这两个论述之间形成一种特别紧密的联系。

与特定解决方案无关的功能原则:糟糕的系统规范书总是把人引向预先定好的某一套具体解决方案、功能或形式中,这可能会令系统架构师的视野变窄,从而不去探素更多的潜在选项。

架构师角色原则:架构师的角色是解决歧义、专注创新,并简化复杂度。

歧义原则:系统架构的早期阶段充满了歧义。架构师必须解决这种歧义,以便给架构团队定出目标,并持续更新该目标。

现代实践压力原则:现代产品开发过程是由同时工作着的多个分布式团队来进行的,而且还有供应参与,因此,它更加需要有优秀的架构。

架构决策原则:我们要把架构决策与其他决策分开,并且要提前花一些时间来道慎地决定这些问题,因为以后如果想变更会付出很高的代价。

遗留元素复用原则:要透相地理解遗留系统及其现属性,并在新的架构中把必要的遗留元素包据进来。

产品进化原则:系统必须进化,否则就会失去竟争力。在进行架构时,应该把系统中较为稳固的部分定义为接口,以便给元素的进化提供便利。

开端原则:在产品定义的早期阶段列出的(企业内部和企业外部的)利益相关者会对架构产生极其重大的影响。

平衡原则:有很多因素会影响并作用于系统的构想、设计、实现及操作,架构师必须在这些因素中寻求一个平衡点,使大多数重要的利益相关者得到满足。

系统问题陈述原则:对问题所做的陈述会确定系统的高层目标,并划定系统的边界。就问题陈述的正确性进行反复的辩论和完善,直到你认为满意为止。

歧议与目标原则:架构师必须解决这些歧义。以便提出几条有代表性的目标并持续地更新它们。这些目标要完备且一致,要兼具挑战性和可达成性,同时又要能够为人类所解决。

创新原则:在架构中进行创新,就是要追求一种能够解决矛盾的好架构。

表面复杂度原则:我们要对系统进行分解、抽象及分层,将其表面复杂度控制在人类所能理解的范围。

必备复杂度原则:系统的必备复杂度取决于它的功能,把系统必须实现的功能仔细描述出来。然后选择一个复杂度最低的概念。

第二定律原则:系统的实际复杂度总是会超过必备复杂度。架构师要令实际复杂度尽量接近必备复杂度。

分解原则:分解是由架构师主动做出的选择。分解会影响性能的衡量标准,会影响组织的运作了式及供应商的价值捕获潜力。

二下一上原则:要想判断出对 Level1 所做的分解是否合适,必须再向下分解一层,以确定 Level2 的各种关系。

优雅原则:对于身处其中的架构师来说。如果系统的必备复杂度较低,而且其分解方式能够同与多个分解平面相匹配,那么该系统就是优雅的。

架构健壮程度原则:好的架构要能够应对各种各样的变化。能够应对变化的那种架构,要么是比较健壮架构。要么是适应能力比较强的架构。前者能够处理环境中的变化,而后者则能够适环境中的变化。

架构决策的耦合与整理原则:可以按照指标对决策的敏感度以及决策之间的连接度来排定架构决策之间的先后顺序。

0

分布式锁实践之一:基于 Redis 的实现

Redis分布式锁实践

什么是分布式锁?

我们日常工作中(以及面试中)经常说到的并发问题,一般都是指进程内的并发问题,JDK 的并发包也是用以解决 JVM 进程内多线程并发问题的工具。但是,进程之间、以及跨服务器进程之间的并发问题,要如何应对?这时,就需要借助分布式锁来协调多进程 / 服务之间的交互。

分布式锁听起来很高冷、很高大上,但它本质上也是锁,因此,它也具有锁的基本特征:

  1. 原子性
  2. 互斥性

除此之外,分布式的锁有什么不一样呢?简单来说就是:

  1. 独立性

    因为分布式锁需要协调其他进程 / 服务的交互,所以它本身应该是一个独立的、职责单一的进程 / 服务。

  2. 可用性

    因为分布式锁是协调多进程 / 服务交互的基础组件,所以它的可用性直接影响了一组进程 / 服务的可用性,同时也要避免:性能、饥饿、死锁这些潜在问题。

进程锁和分布式锁的区别:

图示 -- 进程级别的锁:

图示 -- 分布式锁:

分布式锁的业界最佳实践应该非大名鼎鼎的 ZooKeeper 莫属了。但杀鸡焉用牛刀?在直接使用 ZooKeeper 实现分布式锁方式之前,我们先通过 Redis 来演练一下分布式锁算法,毕竟 Redis 相对来说简单、轻量很多,我们可以通过这个实践来详细探讨分布式锁的特性。这之后再对比地去看 ZooKeeper 的实现方式,相信会更加容易地理解。

怎么实现分布式锁?

由于 Redis 是高性能的分布式 KV 存储器,它本身就具备了分布式特性,所以我们只需要专注于实现锁的基本特征就好了。

首先来看看如何设计锁记录的数据模型:

key value
lock name lock owner

举个例子,“注册表的分布式写锁”:

lock name lock owner
registry_write 10.10.10.110:25349

注意,为保证锁的互斥性,lock owner 标识必需保证全局唯一,不会如例子中显示的那样简单。

原子性

因为 Redis 提供的方法可以认为是并发安全的,所以只要保证加、解锁操作是原子操作就可以了。也就是说,只使用一个Redis方法来完成加、解锁操作的话,那就能够保证原子性。

  • 加锁操作: set(lockName, lockOwner, ...)

    set 是原子的,所以调用一次 set 也是原子的。

  • 解锁操作:eval(deleteScript, ...)

    这里你也许会疑惑,为什么不直接使用 del(key) 来实现解锁?因为解锁的时候,需要先判断你是不是加锁的进程,不是加锁者是无权解锁的。如果任何进程都能够解锁,那锁还有什么意义?

    因为“先判断是不是加锁者、然后再解锁”是两步的复合操作,而 Redis 并没有提供一个可以实现这个复合操作的直接方法,我们只能通过在 delete script 里面进行复合操作来绕过这个问题:因为执行一条脚本的 eval 方法是原子的,所以这个解锁操作的也是原子的。

互斥性

互斥性是说,一旦有一个进程加锁成功能,那么在该进程解锁之前,其他的进程都不能加锁。

在实现互斥性的同时,注意不能打破锁的原子性。

  • 加锁操作:set(lockName, lockOwner, "NX", ...)

    第 3 个参数 NX 的含义:只有当 lockName(key) 不存在时才会设置该键值。

  • 解锁操作:

    eval(
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "return redis.call('del', KEYS[1]) else return 0 end",
        List(lockName),
        List(lockOwner)
    )
    

    当解锁者等于锁的持有者时,才会删除该键值。

超时

解锁权唯一属于锁的持有者,如果持有者进程异常退出,就永远无法解锁了。针对这种情况,我们可以在加锁时设置一个过期时间,超过这个时间没有解锁,锁会自动失效,这样其他进程就能进行加锁了。

  • 加锁操作:set(lockName, lockOwner, "NX", "PX", expireTime)

    "PX" :过期时间单位:"EX" -- 秒,"PX" -- 毫秒

    expireTime : 过期时间

代码片段 1 :加锁、解锁

// 由Scala编写

case class RedisLock(client: JedisClient,
                     lockName: String,
                     locker: String) {
  private val LOCK_SUCCESS = "OK"
  private val SET_IF_NOT_EXISTS = "NX"
  private val EXPIRE_TIME_UNIT = "PX"
  private val RELEASE_SUCCESS = 1L

  def tryLock(expire: Duration): Boolean = {
    val res = client.con.set(
      lockName, // key
      locker, // value
      SET_IF_NOT_EXISTS, // nxxx
      EXPIRE_TIME_UNIT, // expire time unit
      expire.toMillis // expire time
    )
    val isLock = LOCK_SUCCESS.equals(res)
    println(s"${locker} : ${if (isLock) "lock ok" else "lock fail"}")
    isLock
  }

  def unlock: Boolean = {
    val cmd = 
      "if redis.call('get', KEYS[1]) == ARGV[1] then " +
      "return redis.call('del', KEYS[1]) else return 0 end"
    val res = client.con.eval(
      cmd,
      List(lockName), // keys
      List(locker) // args
    )
    val isUnlock = RELEASE_SUCCESS.equals(res)
    println(s"${locker} : ${if (isUnlock) "unlock ok" else "unlock fail"}")
    isUnlock
  }
}

测试加锁:

object TryLockDemo extends App {
  val client = JedisContext.client
  val lock1 = RedisLock(client, "LOCK", "LOCKER_1")

  // Try lock
  lock1.tryLock(1000.millis)
  Thread.sleep(2000.millis.toMillis)

  // Try lock after expired
  lock1.tryLock(1000.millis)

  // Unlock
  lock1.unlock
}

测试结果:

LOCKER_1 : lock ok   # 加锁成功,1秒后锁失效
LOCKER_1 : lock ok   # 2秒之后,锁已过期释放,所以成功加锁
LOCKER_1 : unlock ok # 解锁成功

阻塞加锁

到目前为止,我们实现了简单的加解锁功能:

  • 通过 tryLock() 方法尝试加锁,会立即返回加锁的结果
  • 锁拥有者通过 unlock() 方法解锁

但在实际的加锁场景中,如果加锁失败了(锁被占用或网络错误等异常情况),我们希望锁工具有同步等待(或者说重试)的能力。面对这个需求,一般会想到两种解决方案:

  1. 简单暴力轮询
  2. Pub / Sub 订阅通知模式

因为 Redis 本身有极好的读性能,所以暴力轮询不失为一种简单高效的实现方式,接下来就让我们来尝试下实现阻塞加锁方法。

先来推演一下算法过程:

  1. 设置阻塞加锁的超时时间 timeout
  2. 如果已超时,则返回失败 false
  3. 如果未超时,则通过 tryLock() 方法尝试加锁
  4. 如果加锁成功,返回成功 true
  5. 如果加锁失败,休眠一段时间 frequency 后,重复第 2 步

代码片段 2 :阻塞加锁

def lock(expire: Duration,
         timeout: Duration,
         frequency: Duration = 500.millis): Boolean = {
  var isTimeout = false
  TimeoutUtil.delay(timeout.toMillis).map(_ => isTimeout = true)
  while (!isTimeout) {
    if (tryLock(expire)) {
      return true
    }
    Thread.sleep(frequency.toMillis)
  }
  println(s"${locker} : timeout")
  return false;
}

代码片段 -- 超时工具类:

object TimeoutUtil {

  def delay(millis: Long): Future[Unit] = {
    val promise = Promise[Unit]()
    val timer = new Timer
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        promise.success()
        timer.cancel()
      }
    }, millis)
    promise.future
  }
}

测试阻塞加锁:

object LockDemo extends App {
  val client = JedisContext.client
  val lock1 = RedisLock(client, "LOCK", "LOCKER_1")
  val lock2 = RedisLock(client, "LOCK", "LOCKER_2")

  // Lock
  lock1.lock(3000.millis, 1000.millis)
  lock2.lock(3000.millis, 1000.millis)
  lock2.lock(3000.millis, 3000.millis)

  // Unlock
  lock1.unlock
  lock2.unlock
}

测试结果:

LOCKER_1 : lock ok     # LOCKER_1 加锁成功,3 秒后锁失效
LOCKER_2 : lock fail   # LOCKER_2 尝试加锁失败
LOCKER_2 : lock fail   # LOCKER_2 重试,尝试加锁失败
LOCKER_2 : timeout     # LOCKER_2 重试超时,返回失败

LOCKER_2 : lock fail   # LOCKER_2 尝试加锁失败
LOCKER_2 : lock fail   # LOCKER_2 重试,尝试加锁失败
LOCKER_2 : lock fail
LOCKER_2 : lock fail
LOCKER_2 : lock ok     # 3 秒时间到,锁失效,LOCKER_2 加锁成功

LOCKER_1 : unlock fail # LOCKER_1 解锁失败,因为此时锁被 LOCKER_2 占有
LOCKER_2 : unlock ok   # LOCKER_2 解锁成功

更进一步

这个分布式锁的实现,有一个比较明显的缺陷,就是等待锁的进程无法实时的知道锁状态的变化,从而及时的做出响应。我们不妨思考一下,通过什么方式可以实时、高效的获得锁的状态?

作为分布式锁的业界标准,ZooKeeper 以及相关的工具库提供了更加直接、高效的支持,那么 ZooKeeper 是怎样的思路?具体又是如何实现的?欲知后事如何,且听下回分解:ZooKeeper 分布式锁实践。

0

介绍一个 MySQL 自动化运维利器 – Inception

引子

最近打算做一个 MySQL 的数据库运维平台。这里面有一个非常重要的功能就是 SQL 的审核,如果完全靠人工去实现就没必要做成一个平台了。正没头绪如何去实现的时候,google 了一下,看下有没有现成的开源方案。果不其然,github 上发现一个『去哪儿网』开源的一个数据库运维工具 Inception, 它是一个集审核、执行、备份及生成回滚语句于一身的 MySQL 自动化运维工具。

Inception 介绍

Inception 的架构图如下图所示,简单来说,Inception 就是一个 MySQL 的代理,能够帮助你审核 SQL,执行 SQL,备份 SQL 影响的记录。Inception 是一个 C/S 的软件架构。我们可以通过原生的 MySQL 客户端 去连接,也可以通过远程的接口去连接,目前执行只支持通过C/C++接口、Python接口来对Inception访问

inception-architecture.png

执行流程图如下:

image

安装 Inception

我安装的环境
OS: Ubuntu 16.04.2 LTS

安装依赖

  • 下载bison: 版本最好是2.6之前的(Ubuntu 16.04.2 LTS 版本下安装的是 bison-2.5.1),最新的可能会有问题,下载之后,需要自己编译源码来安装,具体安装方法,可以参数网上的一些说明。
  • cmake安装:apt-get install cmake
  • ncurses安装:apt-get install libncurses5-dev
  • 安装openssl:apt-get install libssl-dev
  • 安装g++:sudo apt-get install g++
  • 安装m4: apt-get install m4

编译安装 Inception

git clone https://github.com/mysql-inception/inception.git
sh inception_build.sh debug [linux]  (如果不指定就是linux平台,而如果要指定是Xcode,就后面指定Xcode)

可执行文件在 debug/sql/Debug/ 目录下面(不同平台有可能不相同)。

启动 Inception

创建一个配置文件 inc.cnf, 里面主要是配置 Inception 启动的端口,SQL 审核的策略,备份数据库的配置等等,更多可参考官方文档

[inception]
general_log=1
general_log_file=inception.log
port=6669   # Inception 的监听的端口
socket=/tmp/inc.socket
character-set-client-handshake=0
character-set-server=utf8
inception_remote_system_password=root  # 备份数据库密码
inception_remote_system_user=wzf1      # 备份数据库用户名
inception_remote_backup_port=3306      # 备份数据库端口
inception_remote_backup_host=127.0.0.1 # 备份数据库地址
inception_support_charset=utf8mb4
inception_enable_nullable=0
inception_check_primary_key=1
inception_check_column_comment=1
inception_check_table_comment=1
inception_osc_min_table_size=1
inception_osc_bin_dir=/data/temp
inception_osc_chunk_time=0.1
inception_enable_blob_type=1
inception_check_column_default_value=1

启动

./Inception --defaults-file=inc.cnf

访问
1. 通过原生的 MySQL 客户端的方式。主要注意的是,请不要将的 SQL 语句块,放到 MySQL 客户端中执行,因为这是一个自动化运维工具,如果使用交互式的命令行来使用的话没有意义,所有的 SQL 执行应该都通过接口的方式,这个方式仅仅可用来查看和设置上诉配置文件里的配置,如 inception get variables; 可查看所有的变量,更多请参考官方文档

mysql -uroot -h127.0.0.1 -P6669
  1. 通过接口的方式。下面是官方示例中的 Python 代码,需要注意的是如果使用 Python3 的 pymsql 去连接会有异常,目前的解决方案是需要修改 pymysql 的源码,具体 issue
#!/usr/bin/python
#-\*-coding: utf-8-\*-
import MySQLdb
sql='/*--user=username;--password=password;--host=127.0.0.1;--execute=1;--port=3306;*/\
inception_magic_start;\
use mysql;\
CREATE TABLE adaptive_office(id int);\
inception_magic_commit;'
try:
    conn=MySQLdb.connect(host='127.0.0.1',user='',passwd='',db='',port=9998)
    cur=conn.cursor()
    ret=cur.execute(sql)
    result=cur.fetchall()
    num_fields = len(cur.description) 
    field_names = [i[0] for i in cur.description]
    print field_names
    for row in result:
        print row[0], "|",row[1],"|",row[2],"|",row[3],"|",row[4],"|",
        row[5],"|",row[6],"|",row[7],"|",row[8],"|",row[9],"|",row[10]
    cur.close()
    conn.close()
except MySQLdb.Error,e:
     print "Mysql Error %d: %s" % (e.args[0], e.args[1])

SQL 审核 & 执行

通过 Inception 对语句进行审核时,必须要告诉 Inception 这些语句对应的数据库地址、数据库端口以及
Inception 连接数据库时使用的用户名、密码等信息,而不能简单的只是执行一条 sql 语句,所以必须要通过某种方式将这些信息传达给 Inception。

连接信息放在 /* ... */ 的注释中,真正的 SQL 语句则包括在 inception_magic_startinception_magic_commit:

/*--user=zhufeng;--password=xxxxxxxxxxx;--host=xxxxxxxxxx;
--enable-check;--port=3456;*/  
inception_magic_start;  
use mysql;  
CREATE TABLE adaptive_office(id int);  
inception_magic_commit;

连接的信息里可以配置更多的信息,比如关闭备份等等,具体请参考官方文档

审核

审核的规范见官方文档,有些规范是可配置的,可根据自己公司的规范在 Inception 的配置文件中配置。

执行

注意下,官方说是支持 DDL,DML 语句的,但是并不支持 SELECT 查询。

inception_accept.png

比如通过 Inception 执行一个建表语句:

...
inception_magic_start;  
use mysql;  
CREATE TABLE adaptive_office(id int);  
inception_magic_commit;
...

返回结果, 可见是每一条 SQL 就会返回一个可执行的结果,errlevel 非 0 时表示执行失败,下面所示中的第二条 SQL 语句 Audit completed(审核完成) 但是不符合建表的规范,更多关于返回结果的说明可见官方文档

'ID', 'stage', 'errlevel', 'stagestatus', 'errormessage', 'SQL', 'Affected_rows', 'sequence', 'backup_dbname', 'execute_time', 'sqlsha1'
1 | CHECKED | 0 | Audit completed | None | use inception_test | 0 | '0_0_0' | None | 0 | 
2 | CHECKED | 1 | Audit completed | Set engine to innodb for table 'adaptive_office'.
Set charset to one of 'utf8mb4' for table 'adaptive_office'.
Set comments for table 'adaptive_office'.
Column 'id' in table 'adaptive_office' have no comments.
Column 'id' in table 'adaptive_office' is not allowed to been nullable.
Set Default value for column 'id' in table 'adaptive_office'
Set a primary key for table 'adaptive_office'. | CREATE TABLE adaptive_office(id int) | 0 | '0_0_1' | 10_10_1_67_1028_inception_test | 0 | 

备份功能

前提条件

  • 线上服务器必须要打开 binlog,不然不会备份及生成回滚语句。
  • 参数 binlog_format 必须要设置为 mixed 或者 row 模式,通过语句:set global binlog_format=mixed/row 来设置,如果是 statement 模式,则不做备份及回滚语句的生成。
  • 被影响的行中必须存在主键,因为回滚语句的 WHERE 条件就是主键。比如,我插入一条数据并返回主键 id=1, 那么相应的它就会反向生成一个删除语句 (WHERE 的条件就是主键) DELETE FROM xx WHERE id = 1

Inception 在做 DML 操作时具有备份功能(默认开启,可通过在执行 SQL 中注释文件中指定 --disable-remote-backup),它会将所有当前语句修改的行备份下来,存储到一个指定的备份库中, 备份库通过配置 Inception 参数来指定。

关于备份数据库的命名方式,备份机器的库名组成是由线上机器的 IP 地址的点换成下划线,再加上端口号,再加上库名三部分,这三部分也是通过下划线连接起来的。例如:我执行 DML 操作的数据库地址是 192.168.1.1, 端口是 3306, 库名是 inceptiondb, 则在备份数据库中表名为:192_168_1_1_3306_inceptiondb

比如,我有一个 inception_test 库,其中有一张 userinfo 表,就两个字段:

userinfo.png

我通过 Inception 去执行一个 INSERT 一条记录:

/*--user=root;--password=xxx;--host=1.1.1.1;--execute=1;--port=3306;--sleep=0;--enable-remote-backup;*/\
inception_magic_start;\
use inception_test; \
insert into userinfo(`username`) values("test");\
inception_magic_commit;

返回的结果如下, 可以看到已经执行成功并且备份成功了:

2 | EXECUTED | 0 | Execute Successfully
Backup successfully | None | insert into userinfo(`username`) values("test") | 1 | '1533716166_25519001_1' | 1_1_1_1_3306_inception_test | 0.060 | 

查看下备份数据库中的 1_1_1_1_3306_inception_testuserinfo 表的结果, 根据 INSERT 的语句相应地生成了一条 DELETE 语句:

DELETE FROM `inception_test`.`userinfo` WHERE id=4;

那么,我需要如果正确地找到回滚的语句呢?

可以查看下备份库 1_1_1_1_3306_inception_testuserinfo 的表结构:

backup_userinfo.png

主要有两个字段:

  • rollback_statement text: 生成修改的回滚语句。
  • opid_time varchar(50): 这个列存储的是的被执行的 SQL 语句在执行时的一个序列号,这个序列号由三部分组成:timestamp(int 值,是语句被执行的时间点) + 线上服务器执行时所产生的 thread_id + 当前这条语句在所有被执行的语句块中的一个序号组成。可见上面的结果:1533716166_25519001_1, 这个序列号同时也会出现在执行返回的结果中,所有需要回滚就是根据这个序列号去备份表中查询回滚的 SQL 语句。

更多说明,请参考官方文档中的备份功能说明

最后

有了这么好用的工具,基于这个为基础,我们通过一个 WEB 应用做一个权限审批管理等功能,一个数据库运维平台就可以实现了,真的需要自己去写吗?我有发现了一个基于 Inception 实现的一个数据库运维平台 Yearning

感谢开源!

参考

0

小程序中 Redux 的使用

在我们的一款小程序中聊天部分主要是基于 Redux 来维护数据部分的。为什么使用了 Redux ?这也是符合了使用 Redux 的一些原则的。那么哪些情况使用 Redux 比较好呢?

用户的使用方式复杂
不同身份的用户有不同的使用方式(比如普通用户和管理员)
多个用户之间可以协作
与服务器大量交互,或者使用了 WebSocket
View 要从多个来源获取数据

我们的聊天功能基于 WebSocket 交互数据,使用方式较为复杂,多个地方都会影响聊天呈现的数据内容。并且与服务器交互量比较大,UI 上呈现的内容受到多个地方的影响。如下图:

 Redux 在小程序中的交互逻辑

图中展示了 Redux 的三大块业务实现部分与业务部分的交互逻辑,其中数据会反应在首页和聊天界面,而首页及聊天界面的一些操作又会通过 Action 反馈到 Redux 的数据对象上。另外 Websocket 和 Http 网络部分也会有很多数据反馈到 Redux 的数据对象上。

Redux 设计思想

简单总结为两句话:

(1)Web 应用是一个状态机,视图与状态是一一对应的。
(2)所有的状态,保存在一个对象里面。

Redux 的三大原则

  1. 单一数据源
  2. State 是只读的
  3. 使用纯函数来执行修改

其工作逻辑如下图所示:

Redux

Store

Store 就是保存数据的地方,你可以把它看成一个容器。整个应用只能有一个 Store。

import { createStore } from 'redux';
const store = createStore(fn);

通过 Store 可以获取到 State 对象,State 为时点的数据集合,即 Store 的一个快照。

import { createStore } from 'redux';
const store = createStore(fn);

const state = store.getState();

Action

State 的变化,会导致 View 的变化。但是,用户接触不到 State,只能接触到 View。所以,State 的变化必须是 View 导致的。Action 就是 View 发出的通知,表示 State 应该要发生变化了。

const action = {
  type: 'ADD_TODO',
  payload: 'Learn Redux'
};

Dispatch

store.dispatch()是 View 发出 Action 的唯一方法。

import { createStore } from 'redux';
const store = createStore(fn);

store.dispatch({
  type: 'ADD_TODO',
  payload: 'Learn Redux'
});

Subscribe

Store 允许使用store.subscribe方法设置监听函数,一旦 State 发生变化,就自动执行这个函数。

import { createStore } from 'redux';
const store = createStore(reducer);

store.subscribe(listener);

小程序

在 Redux 的使用中我们主要会去实现两个部分,一是 Action 部分,去构造定义要发送的 Action 的数据格式等,另一部分是 Reducer 部分,即 Dispatch 分发的 Action 的具体相应处理部分。Reducer 即接收原 State 和 Action,根据当前 Action 重新创建一份新的 State,然后返回这个 State。

消息的处理逻辑一开始并不是很好,发送消息、接收消息、发送中、接收中等各种消息的状态,都会单独发送不同的 Action 这也导致 Reducer 的维护变得非常困难,而且导致很多不一致的地方。

Actions_to_Action

后来改为一个 Action 做统一处理,处理起来简单了很多,将原有的多种 Action,多种接收 Action 并处理的逻辑统一成一种,当然如果是维护的不同数据那么还是需要分开来处理的。

修改后 Action 的实现

修改后消息 Action 接口如下:

onMessage(message, doctorId)

如两处修改消息 Action 的使用:

  • 接收消息,WebSocket 接收到新的消息时,因为接收到的消息分为发送出去的,和接收到的两种:
onMessage({
    ...message,
    sending: { 
        status: 0 
    }},
    message.from === config.patientId ? message.to : message.from
);
  • 发送消息,本地发送消息时,将其更新到界面上,并调用WebSocket发送接口将其发送出去。
const sendText = async (doctorId, text) => {
  const message = {
    typ: MSG.TEXT,
    content: toRealText(text),
    to: doctorId,
    mine: true,
    sending: {
      status: 1
    },
    guid: guid(),
    from: config.patientId,
    created: Date.now()
  }
  onMessage(message, doctorId)
}

修改后 Reducer 的实现

在 Reducer 统一提供一处消息 Action 的处理方式,避免之前的多处处理导致的数据不一致的情况:

[CHAT_MESSAGE] (_state, _action) {

    // 拷贝一份 state
    let state = {..._state};

    // 提取消息参数
    _handle(_state, _action);

    // 处理消息对象
    // 修改 state
    // ...

    return state;
}

修改后 UI 订阅状态

最后我们需要将 State 中维护的数据对象显示到 UI 上,在 Javascript 中我们可以使用 @connect 在页面上加上修饰,通过 @connect 实现内容的 subscribe 过程,将 messages 方法注入到页面中的 data 中。

@connect({
    messages(state) {
      const chat = state.chat[this.doctorId];
      if (chat) {
        return chat.messages;
      }
      return [];
    }
})

对于首页也是同样的道理,在首页不需要消息列表,但是需要小时列表的摘要信息以显示有多少种消息列表。也即当前维护着的对话数量。

@connect({
    sessions(state) {
        let sessions = []
        for (let id in state.chat) {
            let session = state.chat[id]
            sessions.push(session.session)
        }
        sessions.sort((a, b) => {
        return b.updated - a.updated
        })
        return sessions
    }
});

总结

小程序里使用到的内容较为简单,Redux 原本也就是简化 Web 中状态和界面简单对应关系,使用时只需要关注其三大原则即可。并尽可能地统一相同的修改操作,保持数据的统一性。

参考

http://www.redux.org.cn/

http://www.ruanyifeng.com/blog/2016/09/redux_tutorial_part_one_basic_usages.html

0