ZooKeeper 分布式锁实践(上篇)排它锁

前面我们使用 Redis 实现了一个简单的分布式排它锁,它的主要问题在于无法及时得知锁状态的变化,虽然能够通过 Redis 的订阅发布模式来实现通知的功能,但实现起来比较复杂、实现成本较高。而 ZooKeeper 天生就是为分布式系统的协调工作而设计的,能够很轻松地实现日常的分布式管理工具,也就是 ZooKeeper 几大菜谱:

  • 数据发布/订阅
  • 负载均衡
  • 命名服务
  • 分布式协调/通知
  • 集群管理
  • Master选举
  • 分布式锁
  • 分布式队列

接下来,就让我们具体来看看,如何通过 ZooKeeper 的部件来实现一个分布式锁。

数据模型

对于一个锁来说,我们至少需要知道它的 ID、以及它的所有者( 只有持有者才能解锁 ),所以简单来说,锁的数据模型就是:

case class ZkLock
(
  lockName: String,
  lockOwner: String
)

那么,如何用 ZooKeeper 来存储锁对象呢?

其实非常简单,用一个 ZNode 来表示一个锁,ZNode 的路径就是锁的 ID、ZNode 的数据即是锁的所有者:

/zk-lock (data = "zk-lock-owner")

原语

锁的原语一般有两个:

  1. 加锁
  2. 解锁

加锁

加锁的一般算法步骤是:

  1. 尝试加锁
  2. 如果锁没有被占用,则加锁成功
  3. 如果锁被占用,则等待锁被释放
  4. 锁被释放后,收到锁释放通知,重复步骤 1

翻译成ZooKeeper的算法步骤就是:

  1. 尝试创建表示锁的临时节点
  2. 如果创建节点成功,则加锁成功
  3. 如果创建节点失败,则创建一个锁节点的监视器,等待锁节点的删除通知
  4. 锁节点被持有者删除后,收到锁节点的删除通知,重复步骤 1

这里有两个注意点:

  • 为什么创建临时节点?当 ZooKeeper 客户端断开与服务端的连接时,它所创建的临时节点会被删除
  • 节点删除监视器

加锁实现

  1. 数据模型:ZkLock 对象:
    case class ZkLock
    (
      lockName: String, // 锁ID = ZNode节点的路径
      lockOwner: String // 锁的所有者 = ZNode节点的数据内容
    ) {
      // 1. connect
      // 2. try lock
      // 3. lock
      // 4. unlock
    }
    
  2. 为了示例完整性,在构造锁对象时会创建一个 ZooKeeper 客户端连接,用来和 ZooKeeper 服务端通信。

    这里通过一个闭锁来同步 ZooKeeper 客户端的连接状态:构造函数会一直阻塞,直到ZooKeeper客户端连接上服务端,然后打开闭锁:

    case class ZkLock(...) {
    
      //////////////////// 1. connect ////////////////////
    
      private val connectSignal = new CountDownLatch(1)
      private val zk = new ZooKeeper(
        ZkConnection.defaultHost,
        60 * 1000,
        new Watcher {
          override def process(e: WatchedEvent): Unit = {
            if (KeeperState.SyncConnected.equals(e.getState)
              && EventType.None.equals(e.getType)) {
              connectSignal.countDown // 连接成功,打开闭锁
            }
          }
        }
      )
      connectSignal.await
    }
    

    与服务端成功建立连接之后,就可以实现下一步尝试加锁的操作了。

  3. 尝试加锁

    • 排他性:锁节点只能被一个 ZooKeeper 成功创建
    • 可重入性:如果锁节点已被创建、且加锁者与锁节点的持有者一样,也返回加锁成功
    • 非阻塞方法:不管是否成功加锁,都立即返回加锁的结果
    case class ZkLock(...) {
    
      //////////////////// 2. try lock ////////////////////
    
      def tryLock: Boolean = {
        Try {
          if (zk.exists(lockName, false) != null) {
            val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
            lockOwner.equals(existOwner) // 可重入性
          } else {
            zk.create(
              lockName, // 节点路径 = 锁ID
              lockOwner.getBytes, // 节点数据 = 锁的所有者
              Ids.OPEN_ACL_UNSAFE, // 公开访问权限
              CreateMode.EPHEMERAL // 临时节点
            )
            true
          }
        } match {
          case Success(isLocked) => isLocked
          case _ => false
        }
      }
    }
    
  4. 同步加锁
    • 同步加锁算法:尝试加锁:调用 tryLock 方法
      • 如果返回:加锁成功,则完成加锁操作
      • 如果返回:加锁失败,则:
        • 创建闭锁:同步等待锁释放通知
        • 创建锁节点路径的监视器:收到锁节点被删除的事件时,打开闭锁
        • 阻塞等待的闭锁被打开后,重新开始同步加锁( 尾递归调用
    case class ZkLock(...) {
    
      //////////////////// 3. lock ////////////////////
    
      def lock: Unit = {
        if (!tryLock) {
          val releaseSignal = new CountDownLatch(1)
          zk.exists(lockName, new Watcher {
            override def process(e: WatchedEvent): Unit = {
              if (lockName.equals(e.getPath)
                && EventType.NodeDeleted.equals(e.getType)) {
                releaseSignal.countDown
              }
            }
          })
          releaseSignal.await
          lock
        } else {
          println(s"${this} Locked ${nowHourStr} \n")
        }
      }
    }
    

解锁

解锁的算法步骤是:

  1. 锁节点是否存在
  2. 如果不存在,完成解锁
  3. 如果锁节点存在,则判断锁节点的数据( 锁的持有者 )是否和解锁者相同
  4. 如果一样,则删除锁节点,完成解锁

解锁实现

case class ZkLock(...) {

  //////////////////// 4. unlock ////////////////////

  def unlock: Unit = {
    if (zk.exists(lockName, false) != null) {
      val existOwner = new String(zk.getData(lockName, null, null), "UTF-8")
      if (lockOwner.equals(existOwner)) {
        zk.delete(lockName, -1)
        println(s"${this} Unlocked ${nowHourStr}")
      }
    }
  }
}

测试

object ZkLockDemo extends App {
  val lockName = "/zk-lock"
  val locker1 = ZkLock(lockName, "locker1")
  val locker2 = ZkLock(lockName, "locker2")

  async(() => locker1.lock)
  async(() => locker2.lock, waitTime = 1000L)

  async(() => locker1.unlock, waitTime = 2000L)
  async(() => locker2.unlock, waitTime = 3000L)

  Thread.sleep(Int.MaxValue)
}

注:async 方法用于异步地执行传入的函数来演示同步加锁、解锁过程,async 实现代码如下:

def async(action: () => Unit,
          waitTime: Long = 0L,
          delayTime: Long = 0L): Unit = {
  new Thread(new Runnable {
    override def run(): Unit = {
      Thread.sleep(waitTime)
      action()
      Thread.sleep(delayTime)
    }
  }).start()
}

执行结果:

ZkLock(/zk-lock,locker1) Locked 18:22:37.002
ZkLock(/zk-lock,locker1) Unlocked 18:22:38.897
ZkLock(/zk-lock,locker2) Locked 18:22:38.900
ZkLock(/zk-lock,locker2) Unlocked 18:22:39.901

梳理一下整个 Demo 的过程:

  1. locker1 先成功加锁
  2. locker2 在 1 秒后尝试加锁,但是锁已被占用,所以进入阻塞等待阶段
  3. locker1 在 2 秒后解锁
  4. locker2 收到锁释放通知,再次尝试加锁成功
  5. locker2 在 3 秒后解锁

缺陷

以上,一个简单的分布式排他锁就宣告完成,实现代码十分简单。

但是,它也有明显的缺陷:

  1. 排它锁的粒度大,没有区分读、写操作,如果读多写少,则十分影响性能
  2. 羊群效应:锁释放后会通知所有等待中的 ZooKeeper 客户端,然后同时发起加锁请求,瞬时压力很大

那有没有什么好的办法来解决这两个问题呢?欲知答案,请看下篇:ZooKeeper 分布式锁实践(下篇)读写锁

0

我们正在招聘Java工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com

发表评论

电子邮件地址不会被公开。 必填项已用*标注