zookeeper

一、初识

1.1 简介

  1. Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
  2. 从设计模式角度来理解,Zookeeper是一个基于观察者模式设计的分布式服务管理框架。
  3. 它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化 ,Zookeeper就将负责通知已经在Zookeeper上注册
    的那些观察者做出相应的反应。

image.png

1.2 特点

image.png

  1. Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
  2. 集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
  3. 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
  4. 更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行。
  5. 数据更新原子性,一次数据更新要么成功,要么失败。
  6. 实时性,在一定时间范围内,Client能读到最新数据。

1.3 数据结构

  1. ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一
    个ZNode。
  2. 每一个ZNode默认能够存储1M B的数据,每个ZNode都可以通过其路径唯一标识。
    image.png

1.4 应用场景

统一命名服务

在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。 例如:IP不容易记住,而域名容易记住。
image.png

统一配置管理

分布式环境下,配置文件同步非常常见。

  1. 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
  2. 对配置文件修改后,希望能够快速同步到各个节点上。

配置管理可交由ZooKeeper实现

  1. 可将配置信息写入ZooKeeper上的一个Znode。
  2. 各个客户端服务器监听这个Znode。
  3. 一 旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

image.png

统一集群管理

分布式环境中,实时掌握每个节点的状态是必要的,有时需要根据节点实时状态做出一些调整。

ZooKeeper可以实现实时监控节点状态变化

  1. 可将节点信息写入ZooKeeper上的一个ZNode。
  2. 监听这个ZNode可获取它的实时状态变化。

image.png

服务器节点动态上下线

image.png

软负载均衡

在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
image.png

二、安装

2.1 本地模式

1.安装
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /opt
mv /opt/apache-zookeeper-3.6.2-bin /opt/zookeeper-3.6.2
2. 配置修改
# 复制配置样本
mv /opt/zookeeper-3.6.2/conf/zoo_sample.cfg /opt/zookeeper-3.6.2/conf/zoo.cfg

# 创建数据目录
mkdir /opt/zookeeper-3.6.2/zkData

#修改dataDir路径
vim /opt/zookeeper-3.6.2/conf/zoo.cfg
# dataDir=/opt/zookeeper-3.6.2/zkData	
3. 启动
#3.1 启动
/opt/zookeeper-3.6.2/bin/zkServer.sh start

#3.2 检测
jps

#3.3 查看状态
/opt/zookeeper-3.6.2/bin/zkServer.sh status
# 4001 QuorumPeerMain

#3.4 启动客户端
/opt/zookeeper-3.6.2/bin/zkCli.sh

#3.5 退出客户端
quit

#3.6 停止zookeeper
/opt/zookeeper-3.6.2/bin/zkServer.sh stop
4. 配置参数
  1. tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒,用于心跳机制。
  2. initLimit =10:集群中的Follower与Leader之间初始连接时能容忍的最多心跳数。
  3. syncLimit =5:集群中的Follower与Leader之间最大响应心跳数,超时则从服务器列表中删除Follwer。
  4. dataDir:数据文件目录+数据持久化路径
  5. clientPort =2181:客户端连接端口

2.2 分布式模式

1. 单机部署

按2.1的步骤,在3台linux中部署zookeeper。

2. 新增标识

在3台linux中分别给zookeeper增加myid文件,里面分别放zookeeper的编号1,2,3

vim /opt/zookeeper-3.6.2/zkData/myid
3. 在每个cfg配置文件中添加集群节点
vim /opt/zookeeper-3.6.2/conf/zoo.cfg

#增加如下配置
#######################cluster##########################
# 2888端口:服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
# 3888端口:万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,用来执行选举时服务器相互通信的端口。
# 记得在防火墙开启2128,2888,3888节点
server.1=192.168.100.5:2888:3888
server.2=192.168.100.6:2888:3888
server.3=hadoop104:2888:3888
4. 分别启动zookeeper

三、原理

3.1 选举机制

  1. 半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
  2. Zookeeper 工作时有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。
演示

假设有五台服务器组成的 Zookeeper 集群,它们的 id 从 1-5,同时它们都是新启动的,也就是没有历史数据。
image.png

  1. 服务器 1 启动,发起一次选举。
    1.1 服务器 1 投自己一票。
    1.2 此时服务器 1 票数一票,不够半数以上(3 票),选举无法完成,服务器 1 状态保持为 LOOKING;
  2. 服务器 2 启动,再发起一次选举。
    2.1 服务器 1 和 2 分别投自己一票并交换选票信息:
    2.2 此时服务器 1 发现服务器 2 的 ID 比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。
    2.3 此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器 1,2 状态保持 LOOKING
  3. 服务器 3 启动,发起一次选举并交换选票信息。
    3.1 此时服务器 1 和 2 都会更改选票为服务器 3。
    3.2 此次投票结果:服务器 1 为 0 票,服务器 2 为 0 票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。
    3.3 服务器 1,2 更改状态为 FOLLOWING,服务器 3 更改状态为 LEADING;
  4. 服务器 4 启动,发起一次选举。
    4.1 此时服务器 1,2,3 已经不是 LOOKING 状态,不会更改选票信息。
    4.2 交换选票信息结果:服务器 3 为 3 票,服务器 4 为 1 票。
    4.3 此时服务器 4 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING;
  5. 服务器 5 启动,同 4 一样当FOLLOWING。

3.2 节点类型

image.png

  1. 持久节点(Persistent):客户端和服务器端断开连接后,创建的节点不删除。
  2. 短暂节点(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除,也称临时节点。
  3. 顺序标识:创建znode时可以设置顺序标识,znode名称后会附加一个顺序号,顺序号是一个单调递增的计数器,由父节点维护。
    3.1 注:在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

3.3 监听器原理

image.png

  1. 在main线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)。
  2. 通过connect线程将注册的监听事件发送给Zookeeper。
    2.1 监听节点数据的变化 get path [watch]
    2.2 监听子节点增减的变化 ls path [watch]
  3. 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中。
  4. Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程。
  5. listener线程内部调用了process()方法

3.5 写数据流程

  1. Client 向 ZooKeeper 的Server1 上写数据,发送一个写请求。
  2. 如果Server1不是Leader,那么Server1 会把接受到的请求进一步转发给Leader,因为每个ZooKeeper的Server里面有一个是Leader。
  3. Leader 会将写请求广播给各个Server,比如Server1和Server2,各个Server会将该写请求加入待写队列,并向Leader发送成功信息。
  4. 当Leader收到半数以上 Server 的成功信息,说明该写操作可以执行。Leader会向各个Server 发送提交信息,各个Server收到信息后会落实队列里的写请求,此时写成功。
  5. Server1会进一步通知 Client 数据写成功了,这时就认为整个写操作成功。

image.png

四、实战

4.1 客户端命令

启动客户端
/opt/zookeeper-3.6.2/bin/zkCli.sh
显示客户端命令
help
查看当前 znode 中所包含的内容
ls /
查看当前节点详细数据
ls -s /
  1. czxid:创建节点的事务ID(zxid),每个修改都有唯一的 zxid,按发生顺序递增。
  2. ctime:znode创建的时间戳
  3. mzxid:znode最后更新的事务ID
  4. mtime:znode最后修改的时间戳
  5. pZxid:znode最后更新的子节点事务ID
  6. cversion:znode子节点修改次数
  7. dataversion:znode数据变化号
  8. aclVersion:znode访问控制列表的变化号
  9. ephemeralOwner:临时节点拥有者的 session id。
  10. dataLength:znode的数据长度
  11. numChildren:znode子节点数量
创建普通节点
create /sanguo "jinlian"
获得节点的值
get /sanguo
创建短暂节点
create -e /sanguo/wuguo
# 退出当前客户端然后再重启客户端,再次查看根目录下短暂节点已经删除
创建带序号的节点
create -s /sanguo/weiguo/xiaoqiao "jinlian"
修改节点数据值
set /sanguo/weiguo "simayi"
节点的值变化监听
get /sanguo watch
#在另一台zookeeper上修改/sanguo的值
set /sanguo "xisi"
子节点变化监听(路径变化)
ls /sanguo watch

# 在另一台zookeeper上创建/sanguo子节点
create /sanguo/jin "simayi"
删除节点
delete /sanguo/jin
递归删除节点
rmr /sanguo/shuguo
查看节点状态
stat /sanguo

4.2 API调用

  1. 搭建springboot项目
  2. 配置日志logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="logger.path" value="D:/logs" />

    <!-- 彩色日志 -->
    <!-- 彩色日志依赖的渲染类 -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
    <!-- 彩色日志格式 -->
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>


    <!--输出到控制台-->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>


    <!--输出到文件-->
    <!-- 时间滚动输出 level为 DEBUG 日志 -->
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${logger.path}/log_debug.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志归档 -->
            <fileNamePattern>${logger.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录debug级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>debug</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 INFO 日志 -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logger.path}/log_info.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 每天日志归档路径以及格式 -->
            <fileNamePattern>${logger.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录info级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 WARN 日志 -->
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logger.path}/log_warn.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logger.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录warn级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>warn</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>


    <!-- 时间滚动输出 level为 ERROR 日志 -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logger.path}/log_error.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logger.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录ERROR级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <!--
        root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
        level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
        不能设置为INHERITED或者同义词NULL。默认是DEBUG
        可以包含零个或多个元素,标识这个appender将会添加到这个logger。
    -->

    <root level="info">
        <appender-ref ref="CONSOLE" />
        <appender-ref ref="DEBUG_FILE" />
        <appender-ref ref="INFO_FILE" />
        <appender-ref ref="WARN_FILE" />
        <appender-ref ref="ERROR_FILE" />
    </root>

</configuration>
  1. 创建zookeeper客户端
package site.javaee.zookeeper;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.List;

@Slf4j
class ZookeeperTest {

    private static ZooKeeper zkClient = null;

    //创建zookeeper客户端
    public static void init() throws Exception {
        String connectString = "zoo1:2181";
        int sessionTimeout = 2000;
        zkClient = new ZooKeeper(connectString, sessionTimeout, (event) -> {
            // 收到事件通知后的回调函数(用户的业务逻辑)
            System.out.println(event.getType() + "--" +
                    event.getPath());
            // 再次启动监听
            try {
                zkClient.getChildren("/", true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    //创建节点
    public static void createNode() throws KeeperException, InterruptedException {
        String path = zkClient.create("/javaee", "WalkerDogW".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }

    //获取子节点并监听节点变化
    public static void getChildren() throws Exception {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        // 延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }

    //判断zNode是否存在
    public static void exist() throws Exception {
        Stat stat = zkClient.exists("/eclipse", false);
        System.out.println(stat == null ? "not exist" : "exist");
    }


    public static void main(String[] args) throws Exception {
        log.info("gg");
        init();
        createNode();
    }

}

Q.E.D.

知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议