基于 Apollo 的 配置中心 Matrix 2.0 实践总结

配置中心

首先简单介绍一下什么是配置中心,我们为什么需要它,为什么要花力气去完善它。

微服务化的挑战

传统单体应用( monolithic apps )因种种潜在缺陷,如:随着规模的扩大,部署效率逐渐降低,团队协作效率差,系统可靠性变差,维护困难,新功能上线周期长等,迫切需要一种新的架构去解决这些问题,而微服务( microservices )架构正是当下一种正确的解法。

不过,解决一个问题的同时,往往会诞生出很多新的问题,故:微服务化的过程中伴随着很多的挑战,其中一个挑战就是有关服务(应用)配置的。

当系统从一个单体应用,被拆分成分布式系统上一个个服务节点后,配置文件也必须更着迁移(分割),这样配置就分散了,不仅如此,分散中还包含着冗余,冗余分两方面:服务与服务之间(如:有 A,B,C 三个服务调用 D 服务,那么 D 服务的地址会被复制三份,因为 A,B,C 三个服务是 share nothing 的),同服务实例之间(如:A服务的所有实例都是一样的配置,且它们在物理上很有可能是分散的,即:不在一台机器上)。

在单体应用时期,我们管理配置只需要考虑环境(develop,test,staging,producting...)这一个维度,那么现在就多了服务(应用)这个维度。

再明确一下上面说的问题:配置文件分散且冗余,映射到配置管理上就是:分散意味着需要一个个处理,冗余意味着重复操作。

为了解决这个问题,配置必须集中管理(逻辑上分散,物理上集中),而实现这个功能的系统组件(或基础设置)就是配置中心。

配置中心核心需求

既然集中管理,那么就是要将应用的配置作为一个单独的服务抽离出来了(配置不再和应用一起进代码仓库),同理也需要解决新的问题,比如:版本管理(为了支持回滚),权限管理,灰度发布等

以上讨论的还都停留在静态配置的层面上,而应用除了静态的配置(如:数据库配置,一些云服务的参数配置,服务启动后一般不会变动),还会有一些动态的配置(如:灰度开关,一些常量参数:超时时间,限流阈值等),还有理论上:

在一个大型的分布式系统中,你没有办法把整个分布式系统停下来,去做一个软件的、硬件的或者系统的升级

业务需求的一些天然动态行为(如:一些运营活动,会动态调整一些参数),加之理论上必须要支持这个特性,所以,配置中心服务还得支持动态特性,即:配置热更新。

简单总结一下,在传统巨型单体应用纷纷转向细粒度微服务架构的历史进程中,服务配置中心是微服务化不可缺少的一个系统组件,其解决的就是:分布式系统的动态配置问题。

那么我们是怎么解决的呢?那就是 Matrix 1.0 的故事了。

Matrix 1.x

Matrix 是我们自研的服务配置中心,它完成了第一步,即:配置集中管理,所有应用的配置文件都交给 Matrix 管理,拥有环境隔离,版本控制,权限管理等功能。

配置文件是在 CI 构建阶段静态注入的,不同环境注入相应的配置文件,对不同的 build 工具(如:maven,sbt)都实现了配置注入的插件,来从 Matrix 上拉取配置文件

那么为什么还要进行 Matrix 2.0 的工作呢?Matrix1.x 有什么缺陷吗?

在服务(应用)数不多的情况下,Matrix1.x 是完全够用的,但是随着业务规模的发展,问题会渐渐暴露出来。

配置冗余

Matrix 并没有解决配置冗余的问题,我们需要类似模版的东西,将冗余部分抽取成一份并共享,降低配置维护成本。

打包耦合

其次,CI 打包与配置注入耦合,意味着打包与环境耦合,一个环境对应一个包(镜像),这其实违背了容器的"一份镜像到处运行"的理念,即:镜像与环境无关,应用需要什么环境的配置,在启动阶段确定(注入)就可以了。

不支持动态更新

最后是没有配置动态(热)更新能力,前面已说过这个也是必须要支持的;只有我们的基础设施是完美的,其之上的业务才能是完美的。

所有问题,在服务规模小(管理/运维复杂度低)时,都不是问题。

随着业务复杂度的不断提高,和微服务架构的不断深化,现有的 Matrix 1.x 版本暴露出很多缺失的关键能力(比如权限管理,配置模板,热发布等),逐渐成为产品快速迭代的瓶颈之一,我们亟需对 Matrix 进行升级改造,打造一个更强大、更易用的微服务配置中心。

所以我们需要 Matrix 2.0。

Matrix 2.0

在撸起袖子开始干之前,还是得静下心来评估一下,是继续走自研,还是选择别人开源的进行二次开发?我做了一段时间的调研,显然继续自研的开发 cost 是巨大的,核心功能如:配置热更新,灰度发布,配置模版(去冗余)都得从零开始开发, 其次还要保证高性能,高可用等非功能性需求。

二次开发的选择其实也不多(这并不是什么坏事),参考网络上已有的对比与讨论,可得出以下结论:

注册中心 配置存储 时效性 数据模型 维护性 优点 缺点
disconf zookpeer 实时推送 支持传统的配置文件模式,亦支持 KV 结构数据 提供界面操作 基于分布式的 Zookeeper 来实时推送稳定性、实效性、易用性上均优于其他 代码复杂, 2016 年 12 月之后没有更新,功能不够强大,不能完全满足我们的需求
zookpeer zookpeer 实时推送 支持传统的配置文件模式,亦支持 KV 结构数 命令操作 实时推送稳定性、实效性 太底层,开发量大
diamond mysql 每隔15s拉一次全量数据 只支持 KV 结构的数据 提供界面操 简单、可靠、易用 数据模型不支持文件,使用不方便
Spring Cloud Config git 人工批量刷新 文件模式 Git 操作 简单、可靠、易用 需要依赖 GIT,并且更新 GIT
Apollo mysql 实时推送 + 定时拉取 支持传统的配置文件模式,亦支持 KV 结构数 提供界面操 架构设计和稳定性考虑比较完善,不少大厂在用,Portal 的体验不错, 更新活跃 整体架构略显复杂,和我们容器环境不太一致

就目前看来"真正能打的"就 Apollo 了,由于自研成本较大,并且 Apollo 的代码也并不复杂(是标准的 Spring Boot 项目),其功能也基本上能覆盖我们的需求,所以我们最后选择基于 Apollo 进行二次开发。

对于不了解 Apollo 的同学,下面简单介绍一下,详情可参考官方文档

Apollo

Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性。

其 Portal 界面的截图如下:

apollo-home-screenshot

目前其最新版本(1.0.0)主要提供的功能有:

  • 多环境配置管理
  • 多集群配置管理
  • 支持热发布
  • 版本发布管理(支持配置回滚)
  • 灰度发布
  • 权限管理、发布审核、操作审计
  • 客户端配置信息监控

可以看到 Apollo 提供了很多强大的功能,解决了 Matrix 1.0 待解决的问题;锦上添花的是其对 Spring 项目的支持非常好,这又能大大节省开发时间。

二次开发

部署模式改造

二次开发主要是做适应我们环境的定制改造,而非添加大的新功能(至少这个阶段不会),首当其冲的就是部署模式的改变,下图是 Apollo 的架构图:

overall-architecture

这个图是从上往下看的,第一层是 client,也就是与应用集成的 SDK,还有 Protal(包括自己的 server 和 db),它们通过一层 SLB 访问到 Meta Server,而Meta Server 是 Eureka(服务注册中心)的一层封装,用于发现 Config Service 和 Admin Service(它们启动时会向 Eureka 注册自己),也就是最后一层,这两个服务共同管理着我们的配置,配置则存储在 ConfigDB 中。

外面还有一条 Client 到 Config Service 的箭头,就是其实时推送机制的实现原理:Client 通过(HTTP)Long Polling的方式,不停的询问 Config Service,如果配置有更新(发布)则会立即返回,无更新则返回 HTTP 状态码 304。

而我们的服务都是部署在 Kubernetes 集群上的,Kubernetes 有自己的服务发现功能,包括 LB 功能,所以与 Eureka 重复了,需要将 Eureka 剥离出来,将 Meta Server 去掉,我们的部署方式如下图:

DA83200EA3E7A2A7D36D140497F16C25

所有非生产环境对应一个 portal,每个环境独立部署 config service 和 admin service,其中生产环境在单独的 k8s 集群中(单独对生产环境做了隔离,是因为 Apollo 的权限管理还不够强大,不支持区分环境的访问权限)。

这个改造分三部分,第一部分和第二部分是分别去除 Protal 和 Client 对 Meta Server 对依赖,即:去除在 Client 和 Protal 中配置的 Meta server 地址,而是直接使用 Kubernetes 暴露的服务域名(开发和 local 环境因为需要本地访问,有公网地址)。

第三部分是去除 Config Service 上的 Eureka Server,Admin Service 上的 Eureka Client,因为 Eureka 非常成熟易用,且是个声明式(基于注解和配置)的框架,所以改动起来并不麻烦,我们只是将 Eureka 的注解去掉了而已,没有把依赖去掉(如果要去掉依赖,那么工作量会很大,因为依赖的地方多,整个 Meta Server 都是对其 Client API 的封装)。

唯一的坑就是记得在配置文件中把spring.cloud.discovery.enabled设置为 false,因为 Protal 对 Admin Service 的健康检查是基于spring boot actuator,而它是自动的,检测到有 Eureka 的依赖,就会启动相关健康检查的 Endpoint。

如果想做的好一点,可以做个配置开关,启动时配置是否启用 Eureka,当然这样工作量会很大,或许官方会考虑(毕竟容器编排系统是趋势,大中小厂都会用)。

本地开发

其次我们还添加了 LOC 环境(针对本地开发),UT 环境(针对单元测试),这方面 Apollo 有方便自定义环境的方法提供。不过我们并不满意官方推荐的本地开发模式,我们希望配置中心对本地开发是透明的,对于 Spring Boot 项目,原来是基于profile 覆盖的,那么现在还是 profile 覆盖的方式,这是最好的。然而 SDK 是将 Apollo 上拉取的配置覆盖在 Spring 的配置上的,即 Spring 的 profile 机制会失效,经讨论后决定在 Client 启动时:如果检测到当前是 LOC 环境,则将 Apollo 和 Spring 的配置覆盖顺序倒置。

这里简单的说一下实现原理,SDK 对 Spring 项目的集成是通过往 Environment 的 propertySourceList 中以 addFirst 的方式添加自己的配置,放在了查找链的最前面,故达到了覆盖一切的目的,而我们在检测到是 LOC 环境时以 addLast 的方式添加 Apollo 上的配置,就不会覆盖任何配置了,原来的 profile 机制依然有效。

封装SDK

还有一个坑,就是 SDK 的初始化时间问题,或者说是 Client 拉取配置的时间点问题;任何应用都有框架级别的配置,而有些配置是在一个非常早的时间点生效的,这是"应用启动时读取配置"这种方式必须考虑的问题,Spring Boot 项目的 Logging System 就是初始化早于 Apollo 的配置初始化(覆盖)的例子,也就是说:关于 log 的配置,不会生效。

我们解决的方式就是在 Apollo 配置初始化之后,重新初始化这些模块,使得 Apollo 的配置生效,这种解法的副产物是:我们使得这些模块的配置具有了动态性(我们可以安全的在运行期去重新初始化这些模块)

为此我们封装了自己的 SDK:vapollo(依赖于 apollo-client 的 SDK),提供定制功能。

配置迁移

二次开发结束后,接着面对的问题就是配置迁移问题,因为我们选择了对 Apollo 进行二次开发,而不是对 Matrix1.x 进行扩展,故:我们需要将配置迁移过去。

我们编写了详细的配置迁移操作手册(主要涵盖了操作流程和配置规范),并设定了迁移计划,之后项目的配置会慢慢全部迁移过去,为了更安全的实施这个过程,我们在实际迁移一个简单项目后,发现还可以编写工具来帮助我们,比如配置的 diff(检查是否遗漏项,不同项)。

跳出眼前,展望未来

Matrix2.0 的工作暂时告一段落,本文虽然是总结工作内容的,但是最后我希望跳出眼前的工作,看一看未来,从而了解我们现在的不足之处。

主要分两部分:Matrix2.x 和 Matrix3.0

Matrix2.x 指现阶段工作的延续,就目前来说(实际使用一段时间后),还有很多操作上的可优化点(比如:添加更多的默认设置,让开发人员对 Apollo 更加无感知),其次是关于配置的规范也会不断的完善(什么环境必须要有那些配置,namespace 的创建和关联),甚至会有完善的配置发布流程,有了规范,就会有 review,config review(就像代码 review 那样)

世界是变化的,现阶段的工作只是符合了我们现在的环境,之后会怎样?我们的微服务架构还在发展,我们还没上 service mesh,如今基础设置都在不断的下沉,我们的配置中心需要怎样?我想也是一样的,对开发来说:我们对配置的存在会越来越无感知,我不关心这个配置项是哪来的,不关心当前是何种部署环境,不关心配置项是变化还是不变的,我只关心用到它的业务是怎样的;另一方面对配置的管理和维护也应该越来越智能,越来越自动。希望 Matrix3.0 能实现这些目标。

参考资料

  1. 微服务架构为什么需要配置中心?
  2. 阿里巴巴微服务与配置中心技术实践之道
  3. Apollo配置中心Wiki
  4. Apollo Client Spring项目集成原理
  5. Apollo源码解析
0

ZooKeeper 分布式锁实践(下篇)读写锁

ZooKeeper 分布式锁实践(上篇)排它锁 中我们通过代码实践了如何使用 ZooKeeper 组件来实现排他锁。

排他锁简单易用,但是缺点也很明显:

  • 竞争压力大:当锁被占用之后,其他获取锁的操作只能阻塞等待;当锁释放后,所有等待锁的进程会在同一时刻争抢锁的使用权。
  • 羊群响应:锁释放后,会通知所有等待锁的进程,如果等待者特别多,一时间锁的竞争压力将会特别大。

简单来说就是:通知范围太广、锁的粒度太大。我们可以分别从这两个层面去寻找解决方案:

  • 缩小通知范围:等待锁的小伙伴们按先来后到的顺序排队吧,排好队了,接下来我只需要关心我前面一个节点的状态,当前一个节点被释放,我再去抢锁。

  • 缩小锁的粒度:锁不关心业务,但是可以简单地通过操作的读、写性质来二分锁的粒度:

    • 读锁:又称共享锁,如果前面没有写节点,可以直接上锁;当前面有写节点时,则等待距离自己最近的写节点释放( 删除 )。

    • 写锁:如果前面没有节点,可以直接上锁;如果前面有节点,则等待前一个节点释放( 删除 )。

      思考:为什么不是关注前面距离自己最近的写节点?

      如果两个写节点之间有读节点,必需等待读节点释放之后再进行写节点请求,否则会有不可重复读的问题。

数据结构

和排他锁一样,我们通过 ZooKeeper 的节点来表示一个读写锁的父节点,如 /SHARE_LOCK,通过父节点下的临时自增子节点来表示一个读写操作请求,如 /SHARE_LOCK/R_0000000001 。整体数据结构如下图所示。

算法

获取锁

获取锁的算法步骤:

  1. 开始尝试获取锁
  2. 如果持久化父节点不存在,则创建父节点
  3. 如果当前临时自增子节点不存在,则创建子节点
  4. 获取父节点下的所有子节点
  5. 在所有子节点中,查找序号比当前子节点小的前置子节点( 最近的兄节点 )。有两种情况:
    • 读请求:查找比自己小的前置子节点 ( 最近的兄节点 )
    • 写请求:查找比自己小的前置子节点 ( 最近的兄节点 )
  6. 如果没有更小的前置子节点,则持有锁
  7. 如果有更小的前置子节点,则监听该子节点被释放( 删除 )的事件
  8. 释放 ( 删除 )子节点事件被触发后,重复第 1 步

释放锁

释放锁的算法与排他锁部分的释放锁算法相似,这里不再赘述。

加锁、解锁流程

加锁、解锁完整的流程图。

代码实现

子节点定义

子节点属性

  • lockName 读写锁的名称,即父节点的名称
  • name 子节点的名称,格式为 :{请求类型:R/W}_{自增序号} ( 子节点的路径为:{lockName}/{name}
  • seq 子节点的自增序号,通过解析 name 属性 _ 下划线分隔符后面的数字字符串来获取序号( ZooKeeper 创建临时自增节点时会自动分配 Int 范围内的序号 )
  • isWrite 子节点是否为写请求,通过解析 name 属性 _ 下划线分隔符前面的英文字符来判断请求类型 :
    • R :读请求
    • W :写请求

zk_lock_share_childnode

读写锁定义和初始设置

读写锁的属性

  • lockName 读写锁的名称,即父节点的路径
  • locker 获取锁的请求方,即锁的持有者,释放锁时需要验证请求者与锁的持有者是否一致
  • isWrite 请求类型:
    • 读:false
    • 写:true

读写锁的初始设置

  • 连接到 ZooKeeper 实例
  • 连接后,如果父节点不存在,则创建父节点

zk_lock_share_zksharelock_setup

尝试获取锁

尝试获取锁的算法实现

  1. 获取或创建 ZooKeeper 子节点
  2. 获取当前子节点后,遍历所有的子节点,查找:
    • front 离当前子节点最近的兄节点:序号比当前子节点的序号小、且在小于当前序号的子节点中序号是最大的
    • fontWrite 离当前子节点最近的写、兄节点:序号比当前子节点的序号小、且在小于当前序号的子节点中序号是最大的、且为写子节点
  3. 查找后,返回序号更小的兄节点:
    • 读请求:返回最近的写、兄节点,用于 Watch 监听释放( 删除 )事件
    • 写请求:返回最近的兄节点,用于 Watch 监听释放( 删除 )事件
  4. 如果没有更小的子节点,返回 None ,表示成功地获取了锁

zk_lock_share_zksharelock_trylock

zk_lock_share_zksharelock_createchildnode

同步获取锁

同步获取锁的算法实现

  1. 尝试获取锁
  2. 如果没有兄节点,则成功持有锁
  3. 如果得到更小的兄节点,则监听该兄节点的释放( 删除 )事件
  4. 收到兄节点的释放( 删除 )事件通知后,重复第 1 步

zk_lock_share_zksharelock_lock

测试验证

最后,通过一个简单的测试方法来验证读写锁的加、解锁过程:

zk_lock_share_zksharelock_test

测试结果

zk_lock_share_zksharelock_test_out

测试结果、分析

# 请求方 操作 输出
1 LOCK1_读 加锁:成功 [LOCK1] : Lock
2 LOCK2_写 加锁:等待,因为有未释放的兄节点 LOCK1
3 LOCK3_写 加锁:等待,因为有未释放的兄节点 LOCK2
4 LOCK4_读 加锁:等待,因为有未释放的兄、写节点 LOCK3
5 LOCK5_读 加锁:等待,因为有未释放的兄、写节点 LOCK3
6 LOCK1_读 解锁:成功,通知到 LOCK2 [LOCK1] : Unlock
7 LOCK2_写 收到通知,尝试加锁:成功 [LOCK2] : Lock
8 LOCK2_写 解锁:成功,通知到 LOCK3 [LOCK2] : Unlock
9 LOCK3_写 收到通知,加锁成功 [LOCK3] : Lock
10 LOCK3_写 解锁成功,通知到 LOCK4、LOCK5 [LOCK3] : Unlock
11 LOCK4_读 收到通知,尝试加锁:成功 [LOCK4] : Lock
12 LOCK5_读 收到通知,尝试加锁:成功 [LOCK5] : Lock
13 LOCK4_读 解锁:成功 [LOCK4] : Unlock
14 LOCK5_读 解锁:成功 [LOCK5] : Unlock

尾声

通过 ZooKeeper 分布式锁实践,对它的接口最直观的感受就是简单。虽然它没有直接提供加锁、解锁这样的原语,但是当你了解了它的数据结构、接口和事件设计之后,加锁、解锁功能简直呼之欲出,实现起来毫无障碍,一切都是那么地合理、妥当。

而 ZooKeeper 的能力远不止于此,就像前面提到的它能够十分轻松地实现诸如:数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁、分布式队列这些小菜,不得不佩服 ZooKeeper 设计者的抽象能力。本篇只是浅尝了 ZooKeeper 的基本能力,有关它的设计思路、实现细节仍待进一步发掘和探索。

0

ZooKeeper 分布式锁实践(上篇)排它锁

前面我们使用 Redis 实现了一个简单的分布式排它锁,它的主要问题在于无法及时得知锁状态的变化,虽然能够通过 Redis 的订阅发布模式来实现通知的功能,但实现起来比较复杂、实现成本较高。而 ZooKeeper 天生就是为分布式系统的协调工作而设计的,能够很轻松地实现日常的分布式管理工具,也就是 ZooKeeper 几大菜谱:

  • 数据发布/订阅
  • 负载均衡
  • 命名服务
  • 分布式协调/通知
  • 集群管理
  • Master选举
  • 分布式锁
  • 分布式队列

接下来,就让我们具体来看看,如何通过 ZooKeeper 的部件来实现一个分布式锁。

数据模型

对于一个锁来说,我们至少需要知道它的 ID、以及它的所有者( 只有持有者才能解锁 ),所以简单来说,锁的数据模型就是:

case class ZkLock
(
  lockName: String,
  lockOwner: String
)

那么,如何用 ZooKeeper 来存储锁对象呢?

其实非常简单,用一个 ZNode 来表示一个锁,ZNode 的路径就是锁的 ID、ZNode 的数据即是锁的所有者:

/zk-lock (data = "zk-lock-owner")

原语

锁的原语一般有两个:

  1. 加锁
  2. 解锁

加锁

加锁的一般算法步骤是:

  1. 尝试加锁
  2. 如果锁没有被占用,则加锁成功
  3. 如果锁被占用,则等待锁被释放
  4. 锁被释放后,收到锁释放通知,重复步骤 1

翻译成ZooKeeper的算法步骤就是:

  1. 尝试创建表示锁的临时节点
  2. 如果创建节点成功,则加锁成功
  3. 如果创建节点失败,则创建一个锁节点的监视器,等待锁节点的删除通知
  4. 锁节点被持有者删除后,收到锁节点的删除通知,重复步骤 1

这里有两个注意点:

  • 为什么创建临时节点?当 ZooKeeper 客户端断开与服务端的连接时,它所创建的临时节点会被删除
  • 节点删除监视器

加锁实现

  1. 数据模型:ZkLock 对象:
    case class ZkLock
    (
      lockName: String, // 锁ID = ZNode节点的路径
      lockOwner: String // 锁的所有者 = ZNode节点的数据内容
    ) {
      // 1. connect
      // 2. try lock
      // 3. lock
      // 4. unlock
    }
    
  2. 为了示例完整性,在构造锁对象时会创建一个 ZooKeeper 客户端连接,用来和 ZooKeeper 服务端通信。

    这里通过一个闭锁来同步 ZooKeeper 客户端的连接状态:构造函数会一直阻塞,直到ZooKeeper客户端连接上服务端,然后打开闭锁:

    case class ZkLock(...) {
    
      //////////////////// 1. connect ////////////////////
    
      private val connectSignal = new CountDownLatch(1)
      private val zk = new ZooKeeper(
        ZkConnection.defaultHost,
        60 * 1000,
        new Watcher {
          override def process(e: WatchedEvent): Unit = {
            if (KeeperState.SyncConnected.equals(e.getState)
              && EventType.None.equals(e.getType)) {
              connectSignal.countDown // 连接成功,打开闭锁
            }
          }
        }
      )
      connectSignal.await
    }
    

    与服务端成功建立连接之后,就可以实现下一步尝试加锁的操作了。

  3. 尝试加锁

    • 排他性:锁节点只能被一个 ZooKeeper 成功创建
    • 可重入性:如果锁节点已被创建、且加锁者与锁节点的持有者一样,也返回加锁成功
    • 非阻塞方法:不管是否成功加锁,都立即返回加锁的结果
    case class ZkLock(...) {
    
      //////////////////// 2. try lock ////////////////////
    
      def tryLock: Boolean = {
        Try {
          if (zk.exists(lockName, false) != null) {
            val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
            lockOwner.equals(existOwner) // 可重入性
          } else {
            zk.create(
              lockName, // 节点路径 = 锁ID
              lockOwner.getBytes, // 节点数据 = 锁的所有者
              Ids.OPEN_ACL_UNSAFE, // 公开访问权限
              CreateMode.EPHEMERAL // 临时节点
            )
            true
          }
        } match {
          case Success(isLocked) => isLocked
          case _ => false
        }
      }
    }
    
  4. 同步加锁
    • 同步加锁算法:尝试加锁:调用 tryLock 方法
      • 如果返回:加锁成功,则完成加锁操作
      • 如果返回:加锁失败,则:
        • 创建闭锁:同步等待锁释放通知
        • 创建锁节点路径的监视器:收到锁节点被删除的事件时,打开闭锁
        • 阻塞等待的闭锁被打开后,重新开始同步加锁( 尾递归调用
    case class ZkLock(...) {
    
      //////////////////// 3. lock ////////////////////
    
      def lock: Unit = {
        if (!tryLock) {
          val releaseSignal = new CountDownLatch(1)
          zk.exists(lockName, new Watcher {
            override def process(e: WatchedEvent): Unit = {
              if (lockName.equals(e.getPath)
                && EventType.NodeDeleted.equals(e.getType)) {
                releaseSignal.countDown
              }
            }
          })
          releaseSignal.await
          lock
        } else {
          println(s"${this} Locked ${nowHourStr} \n")
        }
      }
    }
    

解锁

解锁的算法步骤是:

  1. 锁节点是否存在
  2. 如果不存在,完成解锁
  3. 如果锁节点存在,则判断锁节点的数据( 锁的持有者 )是否和解锁者相同
  4. 如果一样,则删除锁节点,完成解锁

解锁实现

case class ZkLock(...) {

  //////////////////// 4. unlock ////////////////////

  def unlock: Unit = {
    if (zk.exists(lockName, false) != null) {
      val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
      if (lockOwner.equals(existOwner)) {
        zk.delete(lockName, -1)
        println(s"${this} Unlocked ${nowHourStr}")
      }
    }
  }
}

测试

object ZkLockDemo extends App {
  val lockName = "/zk-lock"
  val locker1 = ZkLock(lockName, "locker1")
  val locker2 = ZkLock(lockName, "locker2")

  async(() => locker1.lock)
  async(() => locker2.lock, waitTime = 1000L)

  async(() => locker1.unlock, waitTime = 2000L)
  async(() => locker2.unlock, waitTime = 3000L)

  Thread.sleep(Int.MaxValue)
}

注:async 方法用于异步地执行传入的函数来演示同步加锁、解锁过程,async 实现代码如下:

def async(action: () => Unit,
          waitTime: Long = 0L,
          delayTime: Long = 0L): Unit = {
  new Thread(new Runnable {
    override def run(): Unit = {
      Thread.sleep(waitTime)
      action()
      Thread.sleep(delayTime)
    }
  }).start()
}

执行结果:

ZkLock(/zk-lock,locker1) Locked 18:22:37.002
ZkLock(/zk-lock,locker1) Unlocked 18:22:38.897
ZkLock(/zk-lock,locker2) Locked 18:22:38.900
ZkLock(/zk-lock,locker2) Unlocked 18:22:39.901

梳理一下整个 Demo 的过程:

  1. locker1 先成功加锁
  2. locker2 在 1 秒后尝试加锁,但是锁已被占用,所以进入阻塞等待阶段
  3. locker1 在 2 秒后解锁
  4. locker2 收到锁释放通知,再次尝试加锁成功
  5. locker2 在 3 秒后解锁

缺陷

以上,一个简单的分布式排他锁就宣告完成,实现代码十分简单。

但是,它也有明显的缺陷:

  1. 排它锁的粒度大,没有区分读、写操作,如果读多写少,则十分影响性能
  2. 羊群效应:锁释放后会通知所有等待中的 ZooKeeper 客户端,然后同时发起加锁请求,瞬时压力很大

那有没有什么好的办法来解决这两个问题呢?欲知答案,请看下篇:ZooKeeper 分布式锁实践(下篇)读写锁

0