编程知识 cdmana.com

一文带你深入浅出zookeeper

zookeeper的基本概念

zookeeper是一款开源的分布式协调服务框架,为分布式环境提供了一致性服务的功能,常见应用场景有:发布订阅,主动通知,文件管理,集群管理,分布式锁等功能。zk在设计的时候满足了cp两要素,即一致性和分区容错性。

zookeeper的设计理念

这里我参考以往的一些经验,将其简单囊括为了以下几点:
一致性
如下图所示,所有的客户端一旦连接到了集群环境中,不论访问的zk是leader角色还是follower角色,每个zk节点的数据都是相同的。假设某一时刻,zk的某个节点数据被修改了,那么此时必须要将每个节点的数据都做同步之后才能继续提供外界读取节点的功能。
图片: https://uploader.shimo.im/f/0dRhY6ULCmrJZDn7.png

有头
在集群环境中,一定会有一个leader的角色充当集群领头。一旦leader挂了,就会重新选举新的机器当选leader。

数据树
zk内部存储数据是采用了树状结构,这一点有些类似于文件系统的设计,每个树状节点底下存放的子节点可以是有序排列的状态 , 如下图所示:
在这里插入图片描述

zookeeper内部的相关配置信息

常规配置文件内容
使用的zk版本为3.4.14版本

# zookeeper时间配置中的基本单位 (毫秒)
tickTime=2000
# 允许follower初始化连接到leader最大时长,它表示tickTime时间倍数 即:initLimit*tickTime 
initLimit=10
# 允许follower与leader数据同步最大时长,它表示tickTime时间倍数 
syncLimit=5
# 存放节点数据的位置
dataDir=/Users/linhao/env/zookeeper/zk-data
# 端口号
clientPort=2181
# 最大并发连接数
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#日志地址
dataLogDir=/Users/linhao/env/zookeeper/zk-log

# 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3
#自动触发清除任务时间间隔,小时为单位。默认为0,表示不自动清除。
autopurge.purgeInterval=1

内部指令整理

启动zookeeper脚本

sh ./zkServer.sh start

连接服务端指令

sh ./zkCli.sh

节点的增删改查

ls /  查询所有根节点

ls /idea 查询idea节点下边的内容

delete /idea 删除idea节点

delete /idea/node-1 删除idea节点下边的node-1子节点

create /idea/node-2 "v1" 创建idea/node-2节点 并且写入v1值

get /idea/node-1 获取相关节点

节点类型

持久化节点
默认创建的节点就是持久化类型

create  /node ""

临时节点
在zookeeper3.4版本里面,创建临时节点的时候可能会有以下异常:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 3
	at org.apache.zookeeper.ZooKeeperMain.processZKCmd(ZooKeeperMain.java:707)

临时节点正确的执行命令为:

create -e /temp ""

持久化顺序节点

create -s /seq-node ""

多次执行该指令之后可以看到以下内容:

[zk: localhost:2181(CONNECTED) 4] ls /
[dubbo, seq-node0000000010, ietty, zookeeper, idea]
[zk: localhost:2181(CONNECTED) 5] create -s /seq-node ""
Created /seq-node0000000011
[zk: localhost:2181(CONNECTED) 6] create -s /seq-node ""
Created /seq-node0000000012
[zk: localhost:2181(CONNECTED) 7] create -s /seq-node ""
Created /seq-node0000000013
[zk: localhost:2181(CONNECTED) 8] create -s /seq-node ""
Created /seq-node0000000014

此时节点的下方已经创建了多个顺序节点数据

临时顺序节点

create -e -s /seq-temp-node ""

查看节点信息

stat命令可以查看节点底下的各种属性
[zk: localhost:2181(CONNECTED) 12] stat /idea
//创建节点的事务id
cZxid = 0x1bd6
//创建节点的时间
ctime = Sun Nov 08 22:50:57 CST 2020
//修改节点的事务id
mZxid = 0x1bd6
//最后修改时间
mtime = Sun Nov 08 22:50:57 CST 2020
//子节点变更的事务ID
pZxid = 0x1bdb
//子节点的变更次数
cversion = 4
dataVersion = 0
//权限版本
aclVersion = 0
ephemeralOwner = 0x0
//数据长度
dataLength = 12
//第一层子节点的数目 不包含子子节点
numChildren = 2

acl权限设置
ACL全称为Access Control List(访问控制列表),用于控制资源的访问权限。ZooKeeper使用ACL来控制对其znode的防问。基于schemepermission的方式进行权限控制。scheme表示授权模式、id模式对应值、permission即具体的增删改权限位。
scheme认证模型

方案 描述
world 开放模式,world表示全世界都可以访问(这是默认设置)
ip ip模式,限定客户端IP防问
auth 用户密码认证模式,只有在会话中添加了认证才可以防问
digest 与auth类似,区别在于auth用明文密码,而digest 用sha-1+base64加密后的密码。在实际使用中digest 更常见。

permission权限位

权限位 权限 描述
c CREATE 可以创建子节点
d DELETE 可以删除子节点(仅下一级节点)
r READ 可以读取节点数据及显示子节点列表
w WRITE 可以设置节点数据
a ADMIN 可以设置节点访问控制列表权限

acl 相关命令

命令 描述
getAcl getAcl
setAcl setAcl
addauth addauth

world权限示例 语法: setAcl world:anyone:<权限位> 注:world模式中anyone是唯一的值,表示所有人
查看默认节点权限:

#创建一个节点
create -e /testAcl
#查看节点权限
getAcl /testAcl
#返回的默认权限表示 ,所有人拥有所有权限。
'world,'anyone: cdrwa
修改默认权限为 读写
#设置为rw权限 
setAcl /testAcl world:anyone:rw
# 可以正常读
get /testAcl
# 无法正常创建子节点
create -e /testAcl/t "hi"
# 返回没有权限的异常
Authentication is not valid : /testAcl/t

zookeeper集群
建议最少要有三台服务器。

这两种场景下会有选举发生:
1.服务节点初始化
2.或者半数以上的节点无法和leader进行连接建立

集群里面的zk机器均具有读和写的功能。

zk集群的选举

在讲解选举之前,我们需要先了解一下什么是zxid。
在zk的节点数据中,每次发生数据变动都会有一个流水id做递增的记录,这个id我们称之为zxid,不同机器的zxid可能会有所不同,越大代表当前的数据越新。实际上每个zk节点都有两个用于记录更新的id,分别是czxid和mzxid。通过名称的缩写可以翻译为:
czxid:创建节点时候的xid。
mzxid:修改节点数据时候的xid。

投票整体思路

第一轮投票全部投给自己
第二轮投票给zxid比自己大的相邻节点 如果得票超过半数,选举结束。

假设现在有3台机器参与选举,分别是1,2,3号机器,启动顺序是1,2,3

第一轮投票
1号机器投票给到1自己
2号机器投票给到2自己
3号机器投票给到3自己

第二轮投票
1号机器的id< 2号机器的id <3号机器的id
1号机器投票给到2号机器,此时2号获取选票大于总机器数目的一半,所以2号成为leader
2号投票给到3号机器,由于此时2号机器已经是leader了,所以3号机器依然只能和1号机器一起保持为follower角色。

所以经过一轮选举之后,2号机当选leader

如何保证主从节点的数据状态同步

ZAB协议
在zk集群中,保证各个server端的数据一致性其实是依靠了一个叫做zab的基础协议。zab协议的全称是zookeeper atomic broadcast zk原子广播协议,可以理解为是一种用于保证分布式环境下事务一致性的一种协议。

在zk的集群环境中,是由一台主机器用于接受集群的请求指令(暂时称之为leader),然后通过leader将指令复制传递给各个follower角色进行数据复制。
如何判断写请求提交成功
当leader将数据成功写入超过半数以上的节点的时候就算是写请求提交成功了。写请求过程中又有哪些细节呢?各位读者不要担心,在下边会有详细讲解。

事务提交的一致性
假设有两个写请求a和b。
a请求:创建一个 /t1 的节点
b请求:创建一个 /t1/a 的节点
那么此时,必须要等a请求执行的事务被完全提交了,后边b请求才能被执行。假设a请求没有提交成功,那么此时后边b请求也不会执行成功。

zk集群环境中写入数据过程中发生了什么

客户端首先连接到zk集群环境,然后将需要写入的数据提交给任意一台zk服务器,如果当前服务器是leader,则由leader接受,然后将数据广播给集群的每一台follower机器。如果接受指令的是follower,则会将请求转给leader,再广播给各个follower机器。

广播细节分为两个步骤:
Leader 可以接受客户端新的事务Proposal请求,将新的Proposal请求广播给所有的 Follower。如何确认广播出去的请求,follower确认接收呢?需要借助一个ack的标记信息响应进行反馈。

假设集群里面有一半的机器都返回了ack信号,那么此时leader就可以再次广播每个机器执行相应事务的commit操作了,并且执行之后需要在本地记录该行为到事务日志中。

崩溃恢复模式和消息广播模式
崩溃模式
zk集群中的leader可能因为网络抖动或者某些异常问题,失去了和其余follower的联系,此时剩余的follower需要重新进行选举。
或者如果集群中只有不到一半的机器能和leader进行通信,那么此时也算是进入到了崩溃模式,需要重新选举。

广播模式
由leader向各个follower发送同步数据的信号,发送各种事务信息和提交信号。

zk集群可能出现的异常场景

图片: https://uploader.shimo.im/f/eBJlYx842UmUUcoo.png

zk中节点数量过多,导致根节点加载不出来。堵塞

图片: https://uploader.shimo.im/f/3xF98AE7xqkLOmnk.png
导致大量zk请求中断

zk节点的数据变动通知特性

当客户端链接到zk服务端的时候,某个节点如果出现了数据变动,之前有监听过这个节点数据的客户端都会接收到相关信号。

可能会有什么问题?
这种主动通知是一种弱最终一致性,而且只会发送一次通知,并不能保证更新的实时准确性。

实践应用场景

网上已经有很多关于基于zk实现分布式锁,dubbo服务注册中心的这些案例,这里我列举了一个自己写的简单案例,扩散各位读者们的一些设计思路。

分布式集群环境的管理

利用了zk里面的有序节点来实现分布式集群环境下的节点管理,设计的思路页比较简单,给每个应用启动的时候加入一个插件,实时将该程序运作的相关性能属性上报到zk的特定节点中,一旦服务下线,节点的数据也会按时消失。

利用zk的临时顺序节点特性就能很好地完成这个功能:

图片: https://uploader.shimo.im/f/4dUmAyEAhe5QnNUv.png

思路整理:
往zk的一个节点(这里假设是:zk-agent 节点)下创建一批临时有序节点:

zk-agent/server000001 //假设1号机器启动
zk-agent/server000002 //假设2号机器启动
zk-agent/server000003 //假设3号机器启动

每个节点都有相关的写入数据:

{
    "serverIp":"127.0.0.1","totalCpu":12,"totalMemory":17179869184,"totalMemoryDesc":"16GB","memoryUsage":"58.9%"}

查看的信息的时候只需要通过访问zk注册中心,读取节点下边的数据即可。
当任意一台服务出现异常的时候,相关的服务也需要和zk断开连接,此时节点数据消失。

为什么不用mysql存储服务性能信息?
其实使用mysql,redis等数据库也是可行的,但是必须一点:
1.服务断开之后数据能够及时同步更新
而这一点,在使用zookeeper来落地更加适合,因为zookeeper内部的临时节点与生俱来就支持这一特性。
这里我罗列出自己的一些编码设计,供大家参考:
相关依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.5</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.11</version>
    </dependency>

    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>3.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.2</version>
    </dependency>

java代码实现:


package org.idea.zookeeper.framework.agent;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.I0Itec.zkclient.ZkClient;

import java.lang.instrument.Instrumentation;

import static org.idea.zookeeper.framework.agent.OSOptCommonConstants.*;

/** * @author idea * @date created in 5:39 下午 2020/11/9 */
public class ZkAgent {
    

    private static ZkClient zkClient;

    private static String nodePath;

    public static void premain(String args, Instrumentation instrumentation) {
    
        System.out.println("[ZkAgent] begin init ");
        if (args != null) {
    
            System.out.println(args);
        }
        zkClient = new ZkClient(ZK_SERVER);
        init();
    }

    /** * 初始化操作 */
    public static void init() {
    
        if (!zkClient.exists(ROOT_PATH)) {
    
            zkClient.createPersistent(ROOT_PATH, "");
        }
        //当前创建的节点路径
        nodePath = zkClient.createEphemeralSequential(SERVER_PATH, getOsBeanDesStr());
        System.out.println("创建节点:" + nodePath);
        final Thread thread = new Thread(new Runnable() {
    
            @Override
            public void run() {
    
                while (true) {
    
                    //守护线程计算相关内存信息
                    updateData(nodePath, getOsBeanDesStr());
                    try {
    
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
    
                        e.printStackTrace();
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /** * 获取节点的描述信息 * * @return */
    public static String getOsBeanDesStr() {
    
        //获取系统内部相关信息
        OSBean osBean = CPUMonitorUtil.getOSBean();
        ObjectMapper objectMapper = new ObjectMapper();
        try {
    
            return objectMapper.writeValueAsString(osBean);
        } catch (JsonProcessingException e) {
    
            e.printStackTrace();
        }
        return null;
    }

    /** * 更新节点数据信息 * * @param path * @param data */
    public static void updateData(String path, Object data) {
    
        if (zkClient.exists(path)) {
    
            zkClient.writeData(path, data);
        } else {
    
            zkClient.createEphemeral(path, data);
        }
    }
}

通过结合jdk的Instrumentation技术,在启动程序的时候正好初始化获取操作系统内部的性能指标。创建一个守护线程往节点定时刷新上报系统性能指标,一旦主进程挂了,守护线程也随之消失,从而实现和zk断开会话,节点消失。

查看zk内部节点的信息我这里暂时只设计了一个简单的命令窗口:

package org.idea.zookeeper.framework.agent;
/** * 获取操作系统内部的一些属性信息服务 * * @author linhao * @date created in 8:51 下午 2020/11/9 */
public interface OSInfoService {
    

    /** * 根据opts来查看不同的数据信息 * */
    void doOptionInfo();
}

package org.idea.zookeeper.framework.agent;

import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.Scanner;

import static org.idea.zookeeper.framework.agent.OSOptCommonConstants.*;

/** * @author linhao * @date created in 10:13 下午 2020/11/9 */
public class OSInfoServiceImpl implements OSInfoService {
    

    private ZkClient zkClient;

    public OSInfoServiceImpl() {
    
        System.out.println("初始化连接zk[" + ZK_SERVER + "]=====");
        this.zkClient = new ZkClient(ZK_SERVER);
    }

    @Override
    public void doOptionInfo() {
    
        while (true) {
    
            System.out.println("请输出执行指令:【】");
            Scanner scanner = new Scanner(System.in);
            int opts = scanner.nextInt();
            if (LIST_ALL == opts) {
    
                List<String> childrenList = zkClient.getChildren(ROOT_PATH);
                System.out.println(childrenList);
                for (String childPath : childrenList) {
    
                    Object nodeData = zkClient.readData(ROOT_PATH +"/"+ childPath);
                    System.out.println("【serverInfo】:" + nodeData);
                }
            } else if(EXIT == opts){
    
                return;
            }
        }
    }


    public static void main(String[] args) {
    
        OSInfoServiceImpl osInfoServiceOptioner = new OSInfoServiceImpl();
        osInfoServiceOptioner.doOptionInfo();
    }

}

将插件打包成jar包,然后通过注入javaagent指令后启动:

-javaagent:/Users/idea/IdeaProjects/my-github/zookeeper-framework/zookeeper-agent/target/zk-agent.jar

实践结果
同时启动三个应用
图片: https://uploader.shimo.im/f/Kue0F0AhM3A2TQJ6.png

然后查看监控中心:
图片: https://uploader.shimo.im/f/5CuacLNobzKC7Bbo.png

这里只是一个粗糙版本的管理平台,如果大家有兴趣可以根据这种思路再去加入一些更加详细的监控数据,例如:将控制台查看信息转换为通过漂亮美观的web界面来渲染,加入节点的监听功能,动态监听服务的下线和上线等等。

文章答疑解惑

ps:因为本文首发是在 java知音 公众号平台,下边是其中一位读者的提问:

大佬好,有两点疑问:1集群中是否只有leader负责写操作(leader读+写+选举、follwer读+选举、observer读); 2 leader选举比较,先比较zxid(比较高32位年号和低32位事务编号),再比较机器编号

首先第一点
以下是我个人的思考:
第一个问题:
leader负责读和写。
follwer负责读和写加选举(当网络出现异常的时候,失去了leader,此时就会重新进入选举状态)。当follwer接收到写命令的时候,会将该指令发送给leader,由leader再将该指令通过广播的形式发送Proposal给到各个follwer节点。在文中讲解写入节点数据细节那里我有提及,可以看下。
observer其实是zk集群后期的一种性能优化设计,类似follwer一样可以接收读和写的指令然后转发给leader,只不过它不会参加到leader广播proposal之后确认ack是否过半数的投票环节。

第二个问题:
读者的理解是正确的,
ZXID是一个长度64位的数字,其中低32位是按照数字递增,任何数据的变更都会导致,低32位的数字简单加1。高32位是leader周期编号,每当选举出一个新的leader时,新的leader就从本地事物日志中取出ZXID,然后解析出高32位的周期编号,进行加1,再将低32位的全部设置为0。这样就保证了每次新选举的leader后,保证了ZXID的递增性。
myid是每台服务器独立配置的一个唯一id,只有当zxid都一致的时候才会再比对myid。

版权声明
本文为[Danny_idea]所创,转载请带上原文链接,感谢
https://blog.csdn.net/Danny_idea/article/details/109687367

Scroll to Top